use Moose;
use Time::HiRes qw(usleep);
use Promises qw(deferred collect);
=head1 NAME
AnyEvent::RabbitMQ::PubSub::Consumer - rabbitmq consumer
=cut
has channel => (
is => 'ro', isa => 'AnyEvent::RabbitMQ::Channel', required => 1
);
has exchange => (
is => 'ro', isa => 'HashRef', required => 1
);
has queue => (
is => 'ro', isa => 'HashRef', required => 1
);
has routing_key => (
is => 'ro', isa => 'Str', default => '#'
);
has prefetch_count => (
is => 'ro', isa => 'Int', default => 5,
);
=head1 METHODS
=head2 init()
set prefetch_count
declare exchange and queue
=cut
sub init {
my ($self) = @_;
$self->channel->qos(prefetch_count => $self->prefetch_count);
my $cv = AnyEvent->condvar;
$self->declare_exchange_and_queue()
->then( sub { $self->bind_queue() })
->then( sub { $cv->send() })
->catch(sub { $cv->croak(@_) });
$cv->recv();
return
}
=head2 consume($cv, $on_consume)
run consume C<$on_consume> code on channel
return L<Promise>
my $cv = AnyEvent->condvar();
$self->consume(
$cv,
sub {
my ($consumer, $msg) = @_;
...
}
)->then(sub {
say 'Consumer was started...';
});
=cut
sub consume {
my ($self, $cv, $on_consume) = @_;
my $d = deferred();
$self->channel->consume(
queue => $self->queue->{queue},
no_ack => 0,
on_success => sub { $d->resolve() },
on_cancel => sub {AnyEvent::RabbitMQ::PubSub::_report_error($cv, @_)},
on_failure => sub {AnyEvent::RabbitMQ::PubSub::_report_error($cv, @_)},
on_consume => sub { $on_consume->($self, @_) },
);
return $d->promise
}
=head2 reject_and_republish($msg)
reject (drop) message
and after 10ms (to avoid 100% CPU)
republish message back (to end of queue)
=cut
sub reject_and_republish {
my ($self, $msg) = @_;
usleep 10_000; # wait 10 ms before republish to avoid 100 % CPU
$self->reject($msg);
$msg->{header}{headers}{trials}++;
$self->channel->publish(
body => $msg->{body}->{payload},
header => $msg->{header},
exchange => "",
routing_key => $self->queue->{queue},
);
}
=head2 reject($msg)
reject (drop) message
=cut
sub reject {
my ($self, $msg) = @_;
warn "Message to reject not specified" if !defined $msg;
my $delivery_tag = $msg->{deliver}{method_frame}{delivery_tag};
$self->channel->reject(delivery_tag => $delivery_tag);
}
=head2 ack($msg)
ack C<$msg> same as
$consumer->channel->ack(delivery_tag => $msg->{deliver}{method_frame}{delivery_tag});
=cut
sub ack {
my ($self, $msg) = @_;
warn "Message to ack not specified" if !defined $msg;
my $delivery_tag = $msg->{deliver}{method_frame}{delivery_tag};
$self->channel->ack(delivery_tag => $delivery_tag);
}
sub declare_exchange_and_queue {
my ($self, $cv) = @_;
return collect(
$self->declare_exchange(),
$self->declare_queue(),
)->then(sub {
return @{ $_[0] }
});
}
sub declare_queue {
my ($self) = @_;
my $d = deferred;
$self->channel->declare_queue(
%{ $self->queue },
on_success => sub { $d->resolve() },
on_failure => sub { $d->reject(@_) },
);
return $d->promise()
}
sub declare_exchange {
my ($self) = @_;
my $d = deferred;
$self->channel->declare_exchange(
%{ $self->exchange },
on_success => sub { $d->resolve() },
on_failure => sub { $d->reject(@_) },
);
return $d->promise()
}
sub bind_queue {
my ($self) = @_;
my $d = deferred;
$self->channel->bind_queue(
queue => $self->queue->{queue},
exchange => $self->exchange->{exchange},
routing_key => $self->routing_key,
on_success => sub { $d->resolve() },
on_failure => sub { $d->reject(@_) },
);
return $d->promise()
}
1