The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

NAME

AnyEvent::RabbitMQ::Simple - Easy to use asynchronous AMQP client

VERSION

version 0.02

SYNOPSIS

    use strict;
    use warnings;
    use AnyEvent::RabbitMQ::Simple;

    # create main loop
    my $loop = AE::cv;

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        host       => '127.0.0.1',
        port       => 5672,
        user       => 'username',
        pass       => 'password',
        vhost      => '/',
        timeout    => 1,
        tls        => 0,
        verbose    => 0,
        confirm_publish => 1,
        prefetch_count => 10,

        failure_cb => sub {
            my ($event, $details, $why) = @_;
            if ( ref $why ) {
                my $method_frame = $why->method_frame;
                $why = $method_frame->reply_text;
            }
            $loop->croak("[ERROR] $event($details): $why" );
        },

        # routing layout
        # [========== exchanges ===================] [===== queues ==============]
        # [             (type/routing key)         ] [        (routing key) ]
        #  logger ----------> stats -------------->   stats-logs
        #   |(fanout)           (direct)                (mail.stats)
        #   |  |
        #   |  | \----------> errors ------------->   ftp-error-logs
        #   |  |              | (topic:*.error.#)       (ftp.error.#)
        #   |  |              |
        #   |  |              \------------------->   mail-error-logs
        #   |  |                                        (mail.error.#)
        #   |  |
        #   |   \-----------> info --------------->   info-logs
        #   |                   (topic:*.info.#)        (*.info.#)
        #   |
        #    \------------------------------------>   debug-queue


        # declare exchanges
        exchanges => [
            'logger' => {
                durable => 0,
                type => 'fanout',
                internal => 0,
                auto_delete => 1,
            },
            'stats' => {
                durable => 0,
                type => 'direct',
                internal => 0,
                auto_delete => 1,
            },
            'errors' => {
                durable => 0,
                type => 'topic',
                internal => 0,
                auto_delete => 1,
            },
            'info' => {
                durable => 0,
                type => 'topic',
                internal => 0,
                auto_delete => 1,
            },
        ],

        # declare queues
        queues => [
            'debug-queue' => {
                durable => 0,
                auto_delete => 1,
            },
            'stats-logs' => {
                durable => 0,
                auto_delete => 1,
            },
            'ftp-error-logs' => {
                durable => 0,
                auto_delete => 1,
            },
            'mail-error-logs' => {
                durable => 0,
                auto_delete => 1,
            },
            'info-logs' => {
                durable => 0,
                auto_delete => 1,
            },
        ],

        # exchange to exchange bindings, with optional routing key
        bind_exchanges => [
            { 'stats'   =>   'logger'                 },
            { 'errors'  => [ 'logger', '*.error.#' ]  },
            { 'info'    => [ 'logger', '*.info.#'  ]  },
        ],


        # queue to exchange bindings, with optional routing key
        bind_queues => [
            { 'debug-queue'     =>   'logger'                   },
            { 'ftp-error-logs'  => [ 'errors', 'ftp.error.#'  ] },
            { 'mail-error-logs' => [ 'errors', 'mail.error.#' ] },
            { 'info-logs'       => [ 'info',   'info.#'       ] },
            { 'stats-logs'      => [ 'stats',  'mail.stats'   ] },
        ],

    );

    # publisher timer
    my $t;

    # connect and set up channel
    my $conn = $rmq->connect();
    $conn->cb(
        sub {
            print "waiting for channel..\n";
            my $channel = shift->recv or $loop->croak("Could not open channel");

            print "************* consuming\n";
            for my $q ( qw( debug-queue ftp-error-logs mail-error-logs info-logs stats-logs ) ) {
                consume($channel, $q);
            }

            print "************* starting publishing\n";
            $t = AE::timer 0, 1.0, sub { publish($channel, "message prepared at ". scalar(localtime) ) };
        }
    );

    # consumes from requested queue
    sub consume {
        my ($channel, $queue) = @_;

        my $consumer_tag;

        $channel->consume(
            queue => $queue,
            no_ack => 0,
            on_success => sub {
                my $frame = shift;
                $consumer_tag = $frame->method_frame->consumer_tag;
                print "************* consuming from $queue with $consumer_tag\n";
            },
            on_consume => sub {
                my $res = shift;
                my $body = $res->{body}->payload;
                print "+++++++++++++ consumed($queue): $body\n";
                $channel->ack(
                    delivery_tag => $res->{deliver}->method_frame->delivery_tag
                );
            },
            on_failure => sub {
                print "************* failed to consume($queue)\n";
            }
        );
    }

    # randomly generates routing key and message body
    sub publish {
        my ($channel, $msg) = @_;

        unless ( $channel->is_open ) {
            warn "Cannot publish, channel closed";
            return;
        }

        my @system = qw( mail ftp web );
        my @levels = qw( debug info error stats );

        my $routing_key = $system[rand @system] .'.'. $levels[ rand @levels ];

        $msg = sprintf("[%s] %s", uc($routing_key), $msg);
        print "\n------- publishing: $msg\n";
        $channel->publish(
            routing_key => $routing_key,
            exchange => 'logger',
            body => $msg,
            on_ack => sub {
                print "------- published: $msg\n";
            },
            on_return => sub {
                print "************* failed to publish: $msg\n";
            }
        );
    }

    # wait forever or die on error
    my $done = $loop->recv;

DESCRIPTION

This module is meant to simplify the process of setting up the RabbitMQ channel, so you can start publishing and/or consuming messages without chaining on_success callbacks.

METHODS

new

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        ...
    );

Returns configured object using following parameters:

host

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        host => '127.0.0.1', # default
        ...
    );

Host IP.

port

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        port => 5672, # default
        ...
    );

Port number.

vhost

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        vhost => '/', # default
        ...
    );

Virtual host namespace.

user

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        user => 'guest', # default
        ...
    );

User name.

pass

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        pass => 'guest', # default
        ...
    );

Password.

tune

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        tune => {
            heartbeat => $connection_heartbeat,
            channel_max => $max_channel_number,
            frame_max => $max_frame_size
        },
        ...
    );

Optional connection tuning options.

timeout

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        timeout => 0, # default
        ...
    );

Connection timeout.

tls

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        tls => 0, # default
        ...
    );

Use TLS.

verbose

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        verbose => 0, # default
        ...
    );

Turn on protocol debug.

confirm_publish

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        confirm_publish => 0, # default
        ...
    );

Turn on confirm mode on channel. If set it enables the on_ack callback of channel's publish method.

prefetch_count

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        prefetch_count => 0, # default
        ...
    );

Specify the number of prefetched messages when consuming from the channel.

exchange

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        exchange => 'name_of_exchange',
        ...
    );

Optional name of exchange to declare with its default configuration options.

See "declare_exchange (%args)" in AnyEvent::RabbitMQ::Channel for details.

exchanges

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        exchanges => [
            'name_of_exchange' => {
                durable => 1,
                type => 'fanout',
                ... # other exchange configuration parameters
            },
            ...
        ],
        ...
    );

Optional list of exchanges to declare with their configuration options.

See "declare_exchange (%args)" in AnyEvent::RabbitMQ::Channel for details.

queue

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        queue => 'name_of_queue',
        ...
    );

Optional name of queue to declare with its default configuration options.

If no queues were declared or empty name has been specified a unique generated queue name will be available:

    my $gen_queue = $rmq->gen_queue;

See "declare_queue" in AnyEvent::RabbitMQ::Channel for details.

queues

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        queues => [
            'name_of_queue' => {
                durable => 1,
                no_ack => 0,
                ... # other queue configuration parameters
            },
            ...
        ],
        ...
    );

Optional list of queues to declare with their configuration options.

See "declare_queue" in AnyEvent::RabbitMQ::Channel for details.

bind_exchanges

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        bind_exchanges => [
            # without routing key
            { 'destination1' => 'source' },

            # with routing key
            { 'destination2'  => [ 'source', 'routing_key' ]  },
            ...
        ],
        ...
    );

Optional list of exchange-to-exchange bindings.

See "bind_exchange" in AnyEvent::RabbitMQ::Channel for details.

bind_queues

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        bind_queues => [
            # without routing key
            { 'queue1' => 'exchange' },

            # with routing key
            { 'queue2'  => [ 'exchange', 'routing_key' ]  },
            ...
        ],
        ...
    );

Optional list of queue-to-exchange bindings.

See "bind_queue" in AnyEvent::RabbitMQ::Channel for details.

failure_cb

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        failure_cb => sub {
            my ($event, $details, $why) = @_;
            if ( ref $why ) {
                my $method_frame = $why->method_frame;
                $why = $method_frame->reply_text;
            }
            $loop->croak("[ERROR] $event($details): $why" );
        },
        ...
    );

Required catch-all error handling callback. The value of $event is one of:

ConnectOnFailure
ConnectOnReadFailure
ConnectOnReturn
ConnectOnClose
OpenChannelOnFailure
OpenChannelOnReturn
OpenChannelOnClose
DeclareExchangeOnFailure

Value of $details has following format: name:$name_of_exchange.

BindExchangeOnFailure

Value of $details has following format: source:$name_of_source_exchange, destination:$name_of_destination_exchange.

DeclareQueueOnFailure

Value of $details has following format: name:$name_of_queue.

BindQueueOnFailure

Value of $details has following format: queue:$name_of_queue, exchange:$name_of_exchange.

ConfirmChannelOnFailure
QosChannelOnFailure

connect

    my $conn = $rmq->connect();
    $conn->cb(
        sub {
            my $channel = shift->recv or $loop->croak("Could not open channel");

            ...
        }
    );

Returns the AnyEvent condvar that returns AnyEvent::RabbitMQ::Channel object after all the configuration steps were successful.

disconnect

    $rmq->disconnect();

Disconnects from RabbitMQ server.

gen_queue

    my $gen_queue = $rmq->gen_queue;

Name of the generated queue if no queues were declared (or queue with empty name has been specified).

SEE ALSO

AUTHOR

Alex J. G. Burzyński <ajgb@cpan.org>

COPYRIGHT AND LICENSE

This software is copyright (c) 2016 by Alex J. G. Burzyński <ajgb@cpan.org>.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.