Net::AMQP::RabbitMQ::Batch - simple batch processing of messages for RabbitMQ.
my $rb = Net::AMQP::RabbitMQ::Batch->new('localhost', { user => 'guest', password => 'guest' }) or croak; $rb->process({ from_queue => 'test_in', routing_key => 'test_out', handler => \&msg_handler, batch => { size => 10, # batch size timeout => 2, # ignore_size => 0 # ignore in/out batches size mismatch }, ignore_errors => 0, # ignore handler errors publish_options => { exchange => 'exchange_out', # exchange name, default is 'amq.direct' }, }); sub msg_handler { my $messages = shift; # work with 10 messages return $messages; }
Assume you read messages from a queue, process them and publish. But you would like to do it in batches, processing many messages at once.
This module:
gets messages from in queue and publish them by routing key
uses your handler to batch process messages
keeps persistency - if processing fails, nothing lost from input queue, nothing published
Define a messages handler:
sub msg_handler { my $messages = shift; # works with hashref of messages return $messages; }
$messages is an arrayref of message objects:
$messages
{ body => 'Magic Transient Payload', # the reconstructed body routing_key => 'nr_test_q', # route the message took delivery_tag => 1, # (used for acks) .... # Not all of these will be present. Consult the RabbitMQ reference for more details. props => { ... } }
Handler should return arrayref of message objects (only body is required):
body
[ { body => 'Processed message' }, ... ]
Connect to RabbitMQ:
my $rb = Net::AMQP::RabbitMQ::Batch->new('localhost', { user => 'guest', password => 'guest' }) or croak;
And process a batch:
$rb->process({ from_queue => 'test_in', routing_key => 'test_out', handler => \&msg_handler, batch => { size => 10 } });
You might like to wrap it with while(1) {...} loop. See process_in_batches.pl or process_in_forked_batches.pl for example.
while(1) {...}
Can not set infinity timeout (use very long int)
No individual messages processing possible
No tests yet which is very sad :(
Alex Svetkin
MIT
To install Net::AMQP::RabbitMQ::Batch, copy and paste the appropriate command in to your terminal.
cpanm
cpanm Net::AMQP::RabbitMQ::Batch
CPAN shell
perl -MCPAN -e shell install Net::AMQP::RabbitMQ::Batch
For more information on module installation, please visit the detailed CPAN module installation guide.