The Perl Toolchain Summit 2025 Needs You: You can help 🙏 Learn more

use strict;
use Carp qw(croak);
use Redis;
our $SerealEncoder;
our $SerealDecoder;
getters => [qw(server port queue_name db _redis_conn)],
};
sub new {
my ($class, %params) = @_;
for (qw(server port queue_name)) {
croak("Need '$_' parameter")
if not exists $params{$_};
}
my $self = bless({
(map {$_ => $params{$_}} qw(server port queue_name) ),
db => $params{db} || 0,
_redis_conn => undef,
} => $class);
$self->{_redis_conn} = Redis->new(
%{$params{redis_options} || {}},
encoding => undef, # force undef for binary data
server => join(":", $self->server, $self->port),
);
$self->_redis_conn->select($self->db) if $self->db;
return $self;
}
sub enqueue_item {
my $self = shift;
croak("Need exactly one item to enqeue")
if not @_ == 1;
my ($blob) = $self->_serialize($_[0]);
$self->_redis_conn->lpush($self->queue_name, $blob);
}
sub enqueue_items {
my $self = shift;
return if not @_;
my $qn = $self->queue_name;
my $conn = $self->_redis_conn;
my @blobs = $self->_serialize(@_);
$conn->lpush($qn, @blobs);
}
sub claim_item {
my ($self) = @_;
my ($rv) = $self->_deserialize( $self->_redis_conn->rpop($self->queue_name) );
return $rv;
}
sub claim_items {
my ($self, $n) = @_;
$n ||= 1;
my $conn = $self->_redis_conn;
my $qn = $self->queue_name;
my @elem;
$conn->rpop($qn, sub {push @elem, $_[0]}) for 1..$n;
$conn->wait_all_responses;
return $self->_deserialize( grep defined, @elem );
}
sub flush_queue {
my $self = shift;
$self->_redis_conn->del($self->queue_name);
}
sub queue_length {
my $self = shift;
my ($len) = $self->_redis_conn->llen($self->queue_name);
return $len;
}
sub _serialize {
my $self = shift;
$SerealEncoder ||= Sereal::Encoder->new({stringify_undef => 1, warn_undef => 1});
return map $SerealEncoder->encode($_), @_;
}
sub _deserialize {
my $self = shift;
$SerealDecoder ||= Sereal::Decoder->new();
return map defined($_) ? $SerealDecoder->decode($_) : $_, @_;
}
1;
__END__
=head1 NAME
Queue::Q::NaiveFIFO::Redis - In-memory Redis implementation of the NaiveFIFO queue
=head1 SYNOPSIS
use Queue::Q::NaiveFIFO::Redis;
my $q = Queue::Q::NaiveFIFO::Redis->new(
server => 'myredisserver',
port => 6379,
queue_name => 'my_work_queue',
);
$q->enqueue_item("foo");
$q->enqueue_item({ bar => "baz" }); # any Sereal-serializable data structure
my $foo = $q->claim_item;
my $bar = $q->claim_item;
=head1 DESCRIPTION
Implements interface defined in L<Queue::Q::NaiveFIFO>:
an implementation based on Redis lists.
The data structures passed to C<enqueue_item> are serialized
using Sereal (cf. L<Sereal::Encoder>, L<Sereal::Decoder>), so
any data structures supported by that can be enqueued.
=head1 METHODS
All methods of L<Queue::Q::NaiveFIFO> plus:
=head2 new
Constructor. Takes named parameters. Required parameters are
the C<server> hostname or address, the Redis C<port>, and
the name of the Redis key to use as the C<queue_name>.
You may optionally specify a Redis C<db> number to use.
Since this module will establish the Redis connection,
you may pass in a hash reference of options that are valid
for the constructor of the L<Redis> module. This can be
passed in as the C<redis_options> parameter.
=head2 claim_item($timeout_secs)
The claim_item method has an optional parameter here, which
is the timeout in seconds it will wait for a new item.
Default wait time is one second. Using a timeout > 0 sec, no
additional sleep() calls are needed and items will be available
to the consumer without a delay.
=head1 AUTHOR
Steffen Mueller, E<lt>smueller@cpan.orgE<gt>
=head1 COPYRIGHT AND LICENSE
Copyright (C) 2012 by Steffen Mueller
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.1 or,
at your option, any later version of Perl 5 you may have available.
=cut