The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.

NAME

AnyEvent::RabbitMQ::RPC - RPC queues via RabbitMQ

SYNOPSIS

use AnyEvent::RabbitMQ::RPC;

my $rpc = AnyEvent::RabbitMQ::RPC->new(
    host   => 'localhost',
    port   => 5672,
    user   => 'guest',
    pass   => 'guest',
    vhost  => '/',
    serialize => 'Storable',
);

print $rpc->call(
    name => 'MethodName',
    args => { some => "data" },
);

DESCRIPTION

AnyEvent::RabbitMQ::RPC provides an AnyEvent-based reliable job queue atop the RabbitMQ event server. This can be used as a replacement for similar reliable job queue/RPC client-worker models, such as TheSchwartz.

RPC classes can register calls that they can handle, and/or use call to request another client perform work.

METHODS

new

Create a new RPC object. Either an existing AnyEvent::RabbitMQ object can be passed using the connection argument, or the all of the provided parameters will be passed through to "connect" in AnyEvent::RabbitMQ on a new object. In the latter case, common parameters include host, port, user, pass, and vhost.

If you wish to pass complex data structures back and forth to remote workers, a value must be passed for serialize. Both worker and client must be configured to use the same serialization method. The available options are:

YAML

Use "Dump" in YAML::Any and "Load" in YAML::Any to serialize and deserialize data.

JSON

Use "objToJson" in JSON::Any and "jsonToObj" in JSON::Any to serialize and deserialize.

Storable

Use "nfreeze" in Storable and "thaw" in Storable to serialize and deserialize.

Two callback points, on_success and on_failure, are provided. on_success will be passed the initialized AnyEvent::RabbitMQ::RPC object; on_failure will be passed the reason for the failure. If no on_success is provided, this call will block using an AnyEvent::CondVar until the connection is established.

register name => STRING, run => SUBREF

Establishes that the current process knows how to run the job named STRING, whose definition is provided by the surboutine SUBREF. The subroutine will be called whenever a job is removed from the queue; it will be called with the argument passed to /call, which may be more than a string if serialize was set during "new".

Due to a limitation of AnyEvent::RabbitMQ, false values returned by the subroutine are transformed into the true-valued string 0E0. Subroutines which fail to execute to completion (via die or other runtime execution failure) will re-insert the job into the queue for the next worker to process.

Returning non-string values requires that both worker and client have been created with the same (non-empty) value of serialize passed to "new".

A callback on_failure may optionally be passed, which will be called with an error message if suitable channels cannoot be configured on the RabbitMQ server.

call name => STRING, args => VALUE

Submits a job to the job queue. The VALUE provided must be a string, unless serialize was passed to "new".

Three callbacks exist:

on_reply

Called when the job has been completed successfully, and will be passed the return value of the job. Returning non-string values requires that both worker and client have been created with the same (non-empty) value of serialize passed to "new".

on_sent

Called once the job has been submitted, with a true value if the job was submitted successfully. A false value will be passed if the requisite channels could not be configured, and the job was not submitted sucessfully.

on_failure

Called if there was an error submitting the job, and is passed the reason for the failure. If on_sent was also provided, this is also called after on_sent is called with a false value.

If no value for on_reply is provided, and the call function is not in void context, a AnyEvent::CondVar is used to automatically block until a reply is received from a worker; the return value of the reply is then returned from "call".

connection

Returns the AnyEvent::RabbitMQ connection used by this object.

channel

Returns the AnyEvent::RabbitMQ::Channel used by this object.

rpc_queue queue => NAME, on_success => CALLBACK

Creates the queue with the given name, used to schedule jobs. These queues are durable, and thus persist across program invocations and RabbitMQ restarts.

The on_success callback is called once the queue is known to exist. The on_failure may alternately be called with a reason if the queue creation fails.

reply_queue on_success => CALLBACK

Creates a temporary queue used to reply to job requests. These queues are anonymous and ephemeral, and are torn down after each RPC call.

The on_success callback is called with the name of the queue that has been created. The on_failure may alternately be called with a reason if the queue creation fails.

AUTHOR

Alex Vandiver <alexmv@bestpractical.com>

BUGS

All bugs should be reported via http://rt.cpan.org/Public/Dist/Display.html?Name=AnyEvent-RabbitMQ-RPC or bug-AnyEvent-RabbitMQ-RPC@rt.cpan.org.

LICENSE AND COPYRIGHT

This software is Copyright (c) 2012 by Best Practical Solutions

This is free software, licensed under:

The GNU General Public License, Version 2, June 1991