RabbitMQ::Consumer::Batcher - batch consumer of RMQ messages
    use feature 'say';
    use AnyEvent;
    use AnyEvent::RabbitMQ::PubSub;
    use AnyEvent::RabbitMQ::PubSub::Consumer;
    use RabbitMQ::Consumer::Batcher;
    use Safe::Isa;     # assumed source of $_isa
    use Path::Tiny;    # assumed source of path()

    # $stats, $log and decode_payload() are application-specific
    # and are not defined in this example.

    my ($rmq_connection, $channel) = AnyEvent::RabbitMQ::PubSub::connect(
        host  => 'localhost',
        port  => 5672,
        user  => 'guest',
        pass  => 'guest',
        vhost => '/',
    );

    my $exchange = {
        exchange    => 'my_test_exchange',
        type        => 'topic',
        durable     => 0,
        auto_delete => 1,
    };

    my $queue = {
        queue       => 'my_test_queue',
        auto_delete => 1,
    };

    my $routing_key = 'my_rk';

    my $consumer = AnyEvent::RabbitMQ::PubSub::Consumer->new(
        channel     => $channel,
        exchange    => $exchange,
        queue       => $queue,
        routing_key => $routing_key,
    );
    $consumer->init();    # declares channel, queue and binding

    my $batcher = RabbitMQ::Consumer::Batcher->new(
        batch_size => $consumer->prefetch_count,
        on_add     => sub {
            my ($batcher, $msg) = @_;

            my $decode_payload = decode_payload($msg->{header}, $msg->{body}->payload());
            return $decode_payload;
        },
        on_add_catch => sub {
            my ($batcher, $msg, $exception) = @_;

            if ($exception->$_isa('failure') && $exception->{payload}{stats_key}) {
                $stats->increment($exception->{payload}{stats_key});
            }

            if ($exception->$_isa('failure') && $exception->{payload}{reject}) {
                $batcher->reject($msg);
                $log->error("consume failed - reject: $exception\n" . $msg->{body}->payload());
            }
            else {
                $batcher->reject_and_republish($msg);
                $log->error("consume failed - republish: $exception");
            }
        },
        on_batch_complete => sub {
            my ($batcher, $batch) = @_;

            path(...)->spew(join "\t", map { $_->value() } @$batch);
        },
        on_batch_complete_catch => sub {
            my ($batcher, $batch, $exception) = @_;

            $log->error("save messages to file failed: $exception");
        },
    );

    my $cv = AnyEvent->condvar();
    $consumer->consume($cv, $batcher->consume_code())->then(sub {
        say 'Consumer was started...';
    });
If you need to process RabbitMQ messages in batches, this module is for you.
This module works well with AnyEvent::RabbitMQ::PubSub::Consumer.
The idea of this module is as follows. In the on_add phase, each message is validated and, if it is corrupted, it can be rejected. In the on_batch_complete phase, we process messages that we do not want to lose. If a problem occurs in this phase, the messages are republished.
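The two phases can be illustrated with a small standalone sketch. It is not the module's implementation: make_batcher and its simplified callback signatures are hypothetical names introduced here, and plain strings stand in for RabbitMQ messages.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the batching flow; NOT the module's own code.
sub make_batcher {
    my (%args) = @_;
    my @batch;

    return sub {
        my ($raw) = @_;
        my $value;

        # Phase 1 (on_add): validate or decode one message.
        if (!eval { $value = $args{on_add}->($raw); 1 }) {
            $args{on_add_catch}->($raw, $@);    # e.g. reject a corrupted message
            return;
        }

        push @batch, $value;
        return if @batch < $args{batch_size};

        # Phase 2 (on_batch_complete): process the full batch.
        my @full = splice @batch;               # take all items, empty the buffer
        if (!eval { $args{on_batch_complete}->(\@full); 1 }) {
            $args{on_batch_complete_catch}->(\@full, $@);    # e.g. republish
        }
        return;
    };
}

my (@rejected, @saved);

my $consume = make_batcher(
    batch_size => 2,
    on_add     => sub {
        my ($raw) = @_;
        die "corrupted\n" unless $raw =~ /^\d+$/;
        return $raw + 0;
    },
    on_add_catch            => sub { my ($raw, $err) = @_; push @rejected, $raw },
    on_batch_complete       => sub { my ($batch) = @_; push @saved, [@$batch] },
    on_batch_complete_catch => sub { my ($batch, $err) = @_; warn "batch failed: $err" },
);

$consume->($_) for qw(1 oops 2 3);
```

In this sketch the corrupted input is handed to on_add_catch immediately, while valid items wait in the buffer until batch_size of them have been collected.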
batch_size: the maximum batch size (the trigger for on_batch_complete).
batch_size must be prefetch_count or bigger!
This attribute is required.
on_add: this callback is called after each single message is consumed. It is useful for decoding the message, for example.
The return value of the callback is used as the value of the batch item (RabbitMQ::Consumer::Batcher::Item).
By default, the payload of the message is used as the item in the batch:
    return sub {
        my ($batcher, $msg) = @_;
        return $msg->{body}->payload();
    }
Parameters passed to the callback:
$batcher
    the RabbitMQ::Consumer::Batcher instance itself
$msg
    the consumed message (see "on_consume" in AnyEvent::RabbitMQ::Channel)
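As an illustration of decoding in on_add, here is a minimal sketch that parses a JSON payload with the core JSON::PP module. My::FakeBody is a hypothetical stand-in for the real message body object. The only thing assumed about the body is the payload() method used in the default callback above.

```perl
use strict;
use warnings;
use JSON::PP ();

# Hypothetical stand-in for the message body; only payload() is assumed.
package My::FakeBody {
    sub new     { my ($class, $payload) = @_; return bless { payload => $payload }, $class }
    sub payload { return $_[0]{payload} }
}

my $json = JSON::PP->new->utf8;

my $on_add = sub {
    my ($batcher, $msg) = @_;

    # decode() dies on malformed JSON, so a broken payload
    # would surface as an exception from this callback.
    return $json->decode($msg->{body}->payload());
};

my $msg  = { body => My::FakeBody->new('{"id":42}') };
my $item = $on_add->(undef, $msg);
print "decoded id: $item->{id}\n";
```

Because the callback throws on invalid input, a corrupted message never becomes a batch item.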
on_add_catch: this callback is called if the on_add callback throws.
By default, the message is rejected:
    return sub {
        my ($batcher, $msg, $exception) = @_;
        $batcher->reject($msg);
    }
In addition to $batcher and $msg, the callback receives:
$exception
    the exception string
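A handler can also choose between rejecting and republishing based on the exception. The sketch below is one possible policy, not the module's behaviour: My::MockBatcher is a hypothetical mock that only records calls, and the "invalid payload" prefix is an assumed convention of this example.

```perl
use strict;
use warnings;

# Hypothetical mock that records which method was called.
package My::MockBatcher {
    sub new { my ($class) = @_; return bless { calls => [] }, $class }
    sub reject               { my ($self, @items) = @_; push @{ $self->{calls} }, 'reject' }
    sub reject_and_republish { my ($self, @items) = @_; push @{ $self->{calls} }, 'reject_and_republish' }
}

my $on_add_catch = sub {
    my ($batcher, $msg, $exception) = @_;

    if ($exception =~ /^invalid payload/) {
        # Assumed convention: permanent failures start with "invalid payload".
        $batcher->reject($msg);
    }
    else {
        # Anything else is treated as transient and retried.
        $batcher->reject_and_republish($msg);
    }
    return;
};

my $batcher = My::MockBatcher->new();
$on_add_catch->($batcher, {}, "invalid payload: missing id\n");
$on_add_catch->($batcher, {}, "database timeout\n");
print join(', ', @{ $batcher->{calls} }), "\n";
```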
on_batch_complete: this callback is triggered when the batch is complete (the number of items reaches batch_size).
Parameters passed to the callback:
$batcher
    the RabbitMQ::Consumer::Batcher instance itself
$batch
    the batch, an ArrayRef of RabbitMQ::Consumer::Batcher::Item
Example on_batch_complete CodeRef (the item values are strings):
    return sub {
        my ($batcher, $batch) = @_;
        print join "\n", map { $_->value() } @$batch;
        $batcher->ack($batch);
    }
on_batch_complete_catch: this callback is called if the on_batch_complete callback throws.
After this callback returns, reject_and_republish is called on the batch.
If you need to replace the reject_and_republish of the batch with (for example) reject, you can do:
    return sub {
        my ($batcher, $batch, $exception) = @_;
        $batcher->reject($batch);
        # batch_clean must be called, because reject_and_republish
        # will be called too after this exception handler
        $batcher->batch_clean();
    }
$batch
    ArrayRef of RabbitMQ::Consumer::Batcher::Items
consume_code: returns a sub{} for handling messages in the consume method of AnyEvent::RabbitMQ::PubSub::Consumer:
    $consumer->consume($cv, $batcher->consume_code());
ack(@items): acks all @items (instances of RabbitMQ::Consumer::Batcher::Item or RabbitMQ::Consumer::Batcher::Message).
reject(@items): rejects all @items (instances of RabbitMQ::Consumer::Batcher::Item or RabbitMQ::Consumer::Batcher::Message).
reject_and_republish(@items): rejects and republishes all @items (instances of RabbitMQ::Consumer::Batcher::Item or RabbitMQ::Consumer::Batcher::Message).
Dependencies are listed in the cpanfile.
To resolve the dependencies, use Carton (or Carmel, which is more experimental):
    carton install
To run the tests, use minil test:
    carton exec minil test
If you don't have a Perl environment, the best way is to use Docker:
    docker run -it -v "$PWD":/tmp/work -w /tmp/work avastsoftware/perl-extended carton install
    docker run -it -v "$PWD":/tmp/work -w /tmp/work avastsoftware/perl-extended carton exec minil test
Docker runs as root by default, so all files created inside the container will be owned by root.
One solution is to change the permissions inside the container:
    docker run -it -v "$PWD":/tmp/work -w /tmp/work avastsoftware/perl-extended bash -c "carton install; chmod -R 0777 ."
or to change them after the docker command (but you must have root rights).
Copyright (C) Avast Software.
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
Jan Seidl <seidl@avast.com>