Alex Vandiver


AnyEvent::RabbitMQ::RPC - RPC queues via RabbitMQ


    use AnyEvent::RabbitMQ::RPC;

    my $rpc = AnyEvent::RabbitMQ::RPC->new(
        host   => 'localhost',
        port   => 5672,
        user   => 'guest',
        pass   => 'guest',
        vhost  => '/',
        serialize => 'Storable',
    );

    print $rpc->call(
        name => 'MethodName',
        args => { some => "data" },
    );

AnyEvent::RabbitMQ::RPC provides an AnyEvent-based reliable job queue atop the RabbitMQ event server. This can be used as a replacement for similar reliable job queue/RPC client-worker models, such as TheSchwartz.

RPC classes can register calls that they can handle, and/or use call to request that another client perform work.



new

Create a new RPC object. Either an existing AnyEvent::RabbitMQ object can be passed using the connection argument, or all of the provided parameters will be passed through to "connect" in AnyEvent::RabbitMQ on a new object. In the latter case, common parameters include host, port, user, pass, and vhost.

If you wish to pass complex data structures back and forth to remote workers, a value must be passed for serialize. Both worker and client must be configured to use the same serialization method. The available options are:


YAML

Use "Dump" in YAML::Any and "Load" in YAML::Any to serialize and deserialize data.


JSON

Use "objToJson" in JSON::Any and "jsonToObj" in JSON::Any to serialize and deserialize data.


Storable

Use "nfreeze" in Storable and "thaw" in Storable to serialize and deserialize data.
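To illustrate what the Storable option does to a value in transit, the following minimal sketch round-trips a nested structure through nfreeze and thaw directly. It does not use AnyEvent::RabbitMQ::RPC at all; the variable names are illustrative only.

```perl
use strict;
use warnings;
use Storable qw(nfreeze thaw);

# A nested structure like the one a client might pass as `args`.
my $args = { some => "data", list => [ 1, 2, 3 ] };

# nfreeze() turns the structure into a portable byte string
# (network byte order), suitable for sending between hosts.
my $frozen = nfreeze($args);

# thaw() rebuilds an equivalent, independent structure from the bytes.
my $copy = thaw($frozen);

print "some = $copy->{some}\n";
print "list = @{ $copy->{list} }\n";
```

Plain strings need no such encoding, which is why serialize only has to be set when complex data is exchanged.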

Two callback points, on_success and on_failure, are provided. on_success will be passed the initialized AnyEvent::RabbitMQ::RPC object; on_failure will be passed the reason for the failure. If no on_success is provided, this call will block using an AnyEvent::CondVar until the connection is established.
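As a rough sketch of the non-blocking form, the example below passes both callbacks and uses its own AnyEvent condition variable to wait for the outcome. It assumes a RabbitMQ server reachable on localhost with the default guest account; the variable name $connected is illustrative.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::RabbitMQ::RPC;

# Condition variable used to hand the result back to the main program.
my $connected = AnyEvent->condvar;

AnyEvent::RabbitMQ::RPC->new(
    host       => 'localhost',
    port       => 5672,
    user       => 'guest',
    pass       => 'guest',
    vhost      => '/',
    on_success => sub {
        my ($rpc) = @_;            # the initialized RPC object
        $connected->send($rpc);
    },
    on_failure => sub {
        my ($reason) = @_;         # why the connection failed
        $connected->croak("Connection failed: $reason");
    },
);

# recv returns the RPC object, or dies with the message given to croak.
my $rpc = $connected->recv;
```

In a larger event-driven program the on_success callback would typically continue the work itself instead of waking a blocking recv.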

register name => STRING, run => SUBREF

Establishes that the current process knows how to run the job named STRING, whose definition is provided by the subroutine SUBREF. The subroutine will be called whenever a job is removed from the queue; it will be called with the argument passed to "call", which may be more than a string if serialize was set during "new".

Due to a limitation of AnyEvent::RabbitMQ, false values returned by the subroutine are transformed into the true-valued string 0E0. Subroutines which fail to execute to completion (via die or other runtime execution failure) will re-insert the job into the queue for the next worker to process.
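The string 0E0 is a common Perl idiom for "zero but true". The short sketch below, which is plain Perl and independent of this module, shows why a client receiving it sees a true value that is still numerically zero.

```perl
use strict;
use warnings;

my $value = "0E0";

# Any non-empty string other than "0" is true in boolean context.
my $is_true = $value ? 1 : 0;

# In numeric context the string parses as 0 times 10^0, which is 0.
my $as_number = $value + 0;

print "true=$is_true number=$as_number\n";
```

Since every false return value is mapped to the same string, a client cannot tell from the reply alone which false value the worker actually returned.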

Returning non-string values requires that both worker and client have been created with the same (non-empty) value of serialize passed to "new".

A callback on_failure may optionally be passed, which will be called with an error message if suitable channels cannot be configured on the RabbitMQ server.
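A worker process might look roughly like the following sketch. It assumes a RabbitMQ server on localhost with the default guest account, reuses the job name MethodName from the synopsis, and returns a hash reference, which is why serialize is set; the key name received is illustrative.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::RabbitMQ::RPC;

# No on_success callback is given, so new() blocks until connected.
my $rpc = AnyEvent::RabbitMQ::RPC->new(
    host      => 'localhost',
    port      => 5672,
    user      => 'guest',
    pass      => 'guest',
    vhost     => '/',
    serialize => 'Storable',
);

$rpc->register(
    name => 'MethodName',
    run  => sub {
        my ($args) = @_;    # whatever the client passed as `args`
        # Returning a reference requires the same `serialize`
        # setting on both worker and client.
        return { received => $args->{some} };
    },
    on_failure => sub {
        my ($message) = @_;
        warn "Could not set up channels: $message\n";
    },
);

# Keep the event loop running so that jobs can be delivered.
AnyEvent->condvar->recv;
```

If the run subroutine dies, the job is re-inserted into the queue as described above, so handlers are best written so that a retry is harmless.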

call name => STRING, args => VALUE

Submits a job to the job queue. The VALUE provided must be a string, unless serialize was passed to "new".

Three callbacks exist:


on_reply

Called when the job has been completed successfully, and will be passed the return value of the job. Returning non-string values requires that both worker and client have been created with the same (non-empty) value of serialize passed to "new".


on_sent

Called once the job has been submitted, with a true value if the job was submitted successfully. A false value will be passed if the requisite channels could not be configured, and the job was not submitted successfully.


on_failure

Called if there was an error submitting the job, and is passed the reason for the failure. If on_sent was also provided, this is also called after on_sent is called with a false value.

If no value for on_reply is provided, and the call function is not in void context, an AnyEvent::CondVar is used to automatically block until a reply is received from a worker; the return value of the reply is then returned from "call".
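The three callbacks can be combined as in the sketch below. It assumes a RabbitMQ server on localhost with the default guest account and a worker that has registered MethodName; the condition variable $done is only there so that this small script waits for the outcome.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::RabbitMQ::RPC;

# Blocks until connected, since no on_success callback is given.
my $rpc = AnyEvent::RabbitMQ::RPC->new(
    host      => 'localhost',
    port      => 5672,
    user      => 'guest',
    pass      => 'guest',
    vhost     => '/',
    serialize => 'Storable',
);

my $done = AnyEvent->condvar;

$rpc->call(
    name    => 'MethodName',
    args    => { some => "data" },
    on_sent => sub {
        my ($ok) = @_;      # true if the job was submitted
        warn "Job was not submitted\n" unless $ok;
    },
    on_reply => sub {
        my ($result) = @_;  # the worker's return value
        $done->send($result);
    },
    on_failure => sub {
        my ($reason) = @_;  # why submission failed
        $done->croak("RPC failed: $reason");
    },
);

# Returns the worker's reply, or dies if on_failure fired.
my $result = $done->recv;
```

Because on_reply is supplied, call itself does not block here; the script blocks only at the final recv.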


connection

Returns the AnyEvent::RabbitMQ connection used by this object.


channel

Returns the AnyEvent::RabbitMQ::Channel used by this object.

rpc_queue queue => NAME, on_success => CALLBACK

Creates the queue with the given name, used to schedule jobs. These queues are durable, and thus persist across program invocations and RabbitMQ restarts.

The on_success callback is called once the queue is known to exist. The on_failure callback may alternatively be called with a reason if the queue creation fails.

reply_queue on_success => CALLBACK

Creates a temporary queue used to reply to job requests. These queues are anonymous and ephemeral, and are torn down after each RPC call.

The on_success callback is called with the name of the queue that has been created. The on_failure callback may alternatively be called with a reason if the queue creation fails.


Alex Vandiver <>


All bugs should be reported via or


This software is Copyright (c) 2012 by Best Practical Solutions

This is free software, licensed under:

  The GNU General Public License, Version 2, June 1991