NAME

Net::AMQP::RabbitMQ::Batch - simple batch processing of messages for RabbitMQ.

SYNOPSIS

    use Carp;
    use Net::AMQP::RabbitMQ::Batch;

    my $rb = Net::AMQP::RabbitMQ::Batch->new('localhost', { user => 'guest', password => 'guest' }) or croak;
    $rb->process({
        from_queue  => 'test_in',
        routing_key => 'test_out',
        handler     => \&msg_handler,
        batch       => {
            size          => 10,        # batch size
            timeout       => 2,         #
            ignore_size   => 0          # ignore in/out batches size mismatch
        },
        ignore_errors => 0,             # ignore handler errors
        publish_options => {
            exchange => 'exchange_out', # exchange name, default is 'amq.direct'
        },
    });

    sub msg_handler {
        my $messages = shift;
        # work with 10 messages
        return $messages;
    }

DESCRIPTION

Suppose you read messages from a queue, process them, and publish the results, but you would like to do this in batches, processing many messages at once.

This module:

  • gets messages from the input queue and publishes them using a routing key

  • uses your handler to batch process messages

  • preserves messages: if processing fails, nothing is lost from the input queue and nothing is published

USAGE

Define a message handler:

    sub msg_handler {
        my $messages = shift;
        # work with the arrayref of messages
        return $messages;
    }

$messages is an arrayref of message objects, each of which looks like this:

    {
        body => 'Magic Transient Payload', # the reconstructed body
        routing_key => 'nr_test_q',        # route the message took
        delivery_tag => 1,                 # (used for acks)
        ....
        # Not all of these will be present. Consult the RabbitMQ reference for more details.
        props => { ... }
    }

The handler should return an arrayref of message objects (only body is required):

    [
        { body => 'Processed message' },
        ...
    ]
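
As an illustration of the handler contract described above, here is a minimal sketch of a handler that upper-cases each message body. The name uppercase_handler is hypothetical and not part of the module. It returns exactly one output message per input message, so the input and output batch sizes match.

```perl
use strict;
use warnings;

# Hypothetical handler: upper-cases the body of every message in the batch.
# Input:  arrayref of message hashrefs (each has at least a "body" key).
# Output: arrayref of hashrefs containing only the required "body" key.
sub uppercase_handler {
    my ($messages) = @_;

    # One output message per input message, in the same order.
    my @processed = map { +{ body => uc( $_->{body} ) } } @{$messages};

    return \@processed;
}
```

Such a handler would be passed to process() as handler => \&uppercase_handler.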

Connect to RabbitMQ:

    my $rb = Net::AMQP::RabbitMQ::Batch->new('localhost', { user => 'guest', password => 'guest' }) or croak;

And process a batch:

    $rb->process({
        from_queue  => 'test_in',
        routing_key => 'test_out',
        handler     => \&msg_handler,
        batch       => { size => 10 }
    });

You may want to wrap this call in a while(1) {...} loop. See process_in_batches.pl or process_in_forked_batches.pl for examples.
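
The loop suggested above could be sketched roughly as follows. The helper run_batches is hypothetical and not part of the module. It assumes only that $rb has a process() method, and it does not rely on the return value of process(), which is not documented here.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::AMQP::RabbitMQ::Batch):
# calls process() repeatedly for as long as $keep_running returns true.
# $rb can be any object that provides a process() method.
sub run_batches {
    my ( $rb, $options, $keep_running ) = @_;

    my $batches = 0;
    while ( $keep_running->() ) {
        $rb->process($options);
        $batches++;
    }

    return $batches;    # number of process() calls made
}

# Possible use, assuming $rb and %options are set up as shown earlier:
#
#   my $running = 1;
#   $SIG{INT} = $SIG{TERM} = sub { $running = 0 };
#   run_batches( $rb, \%options, sub { $running } );
```

The flag is checked only between calls to process(), so a batch that is already in progress finishes before the loop exits.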

METHODS

process()

Known Issues

  • An infinite timeout cannot be set (use a very large integer instead)

  • Individual message processing is not possible

  • No tests yet, which is very sad :(

AUTHORS

Alex Svetkin

LICENSE

MIT