MangoX::Queue - A MongoDB queue implementation using Mango
MangoX::Queue is a MongoDB-backed queue implementation that uses Mango to support both blocking and non-blocking queues.
MangoX::Queue makes no attempt to handle the Mango connection, database or collection - pass in a collection to the constructor and MangoX::Queue will use it. The collection can be plain, capped or sharded.
    use DateTime;
    use Mango;
    use MangoX::Queue;

    my $mango = Mango->new("mongodb://localhost:27017");
    my $collection = $mango->db('my_db')->collection('my_queue');

    my $queue = MangoX::Queue->new(collection => $collection);

    # To add a job
    my $id = enqueue $queue 'test'; # Blocking
    enqueue $queue 'test' => sub { my $id = shift; }; # Non-blocking

    # To set options
    my $id = enqueue $queue priority => 1, created => DateTime->now, 'test'; # Blocking
    enqueue $queue priority => 1, created => DateTime->now, 'test' => sub { my $id = shift; }; # Non-blocking

    # To watch for a specific job status
    watch $queue $id; # Blocking
    watch $queue $id, 'Complete' => sub { # Non-blocking
        # Job status is 'Complete'
    };

    # To fetch a job
    my $job = fetch $queue; # Blocking
    fetch $queue sub { # Non-blocking
        my ($job) = @_;
        # ...
    };

    # To get a job by id
    my $job = get $queue $id; # Blocking
    get $queue $id => sub { my $job = shift; }; # Non-blocking

    # To requeue a job
    my $id = requeue $queue $job; # Blocking
    requeue $queue $job => sub { my $id = shift; }; # Non-blocking

    # To dequeue a job
    dequeue $queue $id; # Blocking
    dequeue $queue $id => sub { }; # Non-blocking

    # To consume a queue
    while(my $job = consume $queue) { # Blocking
        # ...
    }
    my $consumer = consume $queue sub { # Non-blocking
        my ($job) = @_;
        # ...
    };

    # To stop consuming a queue
    release $queue $consumer;

    # To listen for events
    on $queue enqueued => sub {
        my ($queue, $job) = @_;
    };
    on $queue dequeued => sub {
        my ($queue, $job) = @_;
    };
    on $queue consumed => sub {
        my ($queue, $job) = @_;
    };

    # To register a plugin
    plugin $queue 'MangoX::Queue::Plugin::Statsd';
MangoX::Queue implements the following attributes.
    my $collection = $queue->collection;
    $queue->collection($mango->db('foo')->collection('bar'));
    my $queue = MangoX::Queue->new(collection => $collection);
The Mango::Collection representing the MongoDB queue collection.
    my $delay = $queue->delay;
    $queue->delay(MangoX::Queue::Delay->new);
The MangoX::Queue::Delay responsible for dynamically controlling the delay between queue queries.
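To illustrate what dynamically controlling the delay can mean in practice, the sketch below models a polling delay that grows while the queue stays empty and resets once a job arrives. It is not the MangoX::Queue::Delay implementation: the function name `next_delay`, the linear back-off policy, and the millisecond values are all assumptions chosen for illustration.

```perl
use strict;
use warnings;

# Hypothetical back-off policy in milliseconds; NOT taken from MangoX::Queue::Delay.
my %policy = (start => 100, step => 100, max => 1000);

# Return the delay to use before the next queue check.
# $current is the previous delay; $found_job says whether the last check returned a job.
sub next_delay {
    my ($current, $found_job) = @_;
    return $policy{start} if $found_job;      # reset as soon as work arrives
    my $next = $current + $policy{step};      # otherwise back off linearly
    return $next > $policy{max} ? $policy{max} : $next;
}

# Simulate five queue checks: three empty, one with a job, one empty.
my $delay = $policy{start};
for my $found (0, 0, 0, 1, 0) {
    $delay = next_delay($delay, $found);
    printf "found=%d next delay=%dms\n", $found, $delay;
}
```

Capping the delay bounds the latency a newly enqueued job can see, while the growth step reduces the number of queries issued against an idle collection.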
my $plugins = $queue->plugins;
Returns a hash containing the plugins registered with this queue.
    my $retries = $queue->retries;
    $queue->retries(5);
The number of times a job will be picked up from the queue before it is marked as failed.
    my $timeout = $queue->timeout;
    $queue->timeout(10);
The time (in seconds) a job is allowed to stay in Retrieved state before it is released back into Pending state. Defaults to 60 seconds.
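The way retries and timeout might interact can be sketched with an in-memory model. This is an illustration, not the module's code: the helper `fetch_job`, the field names `attempt` and `retrieved`, and the rule that a job is failed at fetch time once its attempts reach the retry limit are all assumptions.

```perl
use strict;
use warnings;

# In-memory stand-in for queue documents; the field names are assumptions.
my @jobs = (
    { id => 1, status => 'Pending',   attempt => 0, retrieved => 0   },
    { id => 2, status => 'Retrieved', attempt => 1, retrieved => 100 },
    { id => 3, status => 'Retrieved', attempt => 5, retrieved => 100 },
);

# Hypothetical helper: pick the next available job at time $now (seconds).
# A job is available if it is Pending, or if it has been Retrieved for
# longer than $timeout. Jobs that already used up $retries are marked Failed.
sub fetch_job {
    my ($jobs, $now, $timeout, $retries) = @_;
    for my $job (@$jobs) {
        my $stale = $job->{status} eq 'Retrieved'
                 && $now - $job->{retrieved} > $timeout;
        next unless $job->{status} eq 'Pending' || $stale;
        if ($job->{attempt} >= $retries) {
            $job->{status} = 'Failed';
            next;
        }
        $job->{status}    = 'Retrieved';
        $job->{retrieved} = $now;
        $job->{attempt}++;
        return $job;
    }
    return;    # nothing available
}

my $now    = 200;                               # 100 seconds after jobs 2 and 3 were retrieved
my $first  = fetch_job(\@jobs, $now, 60, 5);    # the Pending job
my $second = fetch_job(\@jobs, $now, 60, 5);    # a timed-out job, handed out again
my $third  = fetch_job(\@jobs, $now, 60, 5);    # nothing left; job 3 is marked Failed
```

In this model a timed-out job never needs a separate sweep: it simply becomes eligible again the next time a consumer asks for work.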
MangoX::Queue inherits from Mojo::EventEmitter and emits the following events.
    on $queue consumed => sub {
        my ($queue, $job) = @_;
        # ...
    };
Emitted when an item is consumed (either via consume or fetch).
    on $queue dequeued => sub {
        my ($queue, $job) = @_;
        # ...
    };
Emitted when an item is dequeued.
    on $queue enqueued => sub {
        my ($queue, $job) = @_;
        # ...
    };
Emitted when an item is enqueued.
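The callback signature used by these events (the emitter first, then the job) can be demonstrated without a database. The sketch below uses `TinyEmitter`, a hypothetical core-Perl stand-in written for this example; it only mimics the `on` and `emit` calling convention of Mojo::EventEmitter and is not part of MangoX::Queue or Mojolicious.

```perl
use strict;
use warnings;

# TinyEmitter is a hypothetical stand-in, NOT Mojo::EventEmitter itself.
package TinyEmitter;

sub new {
    my $class = shift;
    return bless { events => {} }, $class;
}

# Register a callback for a named event.
sub on {
    my ($self, $name, $cb) = @_;
    push @{ $self->{events}{$name} }, $cb;
    return $cb;
}

# Call every subscriber with the emitter first, then the event arguments.
sub emit {
    my ($self, $name, @args) = @_;
    $_->($self, @args) for @{ $self->{events}{$name} || [] };
    return $self;
}

package main;

my $queue = TinyEmitter->new;
my @seen;

$queue->on(enqueued => sub {
    my ($emitter, $job) = @_;
    push @seen, "enqueued $job->{id}";
});
$queue->on(dequeued => sub {
    my ($emitter, $job) = @_;
    push @seen, "dequeued $job->{id}";
});

# Simulate the queue announcing two lifecycle steps for one job.
$queue->emit(enqueued => { id => 42 });
$queue->emit(dequeued => { id => 42 });

print "$_\n" for @seen;
```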
MangoX::Queue implements the following methods.
    # In blocking mode
    while(my $job = consume $queue) {
        # ...
    }
    while(my $job = $queue->consume) {
        # ...
    }

    # In non-blocking mode
    consume $queue sub {
        my ($job) = @_;
        # ...
    };
    $queue->consume(sub {
        my ($job) = @_;
        # ...
    });
Waits for jobs to arrive on the queue, sleeping between queue checks using MangoX::Queue::Delay or Mojo::IOLoop.
Currently sets the status to 'Retrieved' before returning the job.
    my $job = fetch $queue;
    dequeue $queue $job;
Dequeues a job. Currently removes it from the collection.
    enqueue $queue 'job name';
    enqueue $queue [ 'some', 'data' ];
    enqueue $queue +{ foo => 'bar' };

    $queue->enqueue('job name');
    $queue->enqueue([ 'some', 'data' ]);
    $queue->enqueue({ foo => 'bar' });
Add an item to the queue.
Currently uses priority 1 with a job status of 'Pending'.
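The calling convention shown in the synopsis (optional key/value options, then the job, then an optional callback) can be sketched as a small argument parser that applies the documented defaults. The helper name `build_job_document`, the `data` field, and the use of epoch seconds for `created` are assumptions made for this example; the module's actual document layout may differ.

```perl
use strict;
use warnings;

# Hypothetical helper mirroring the calling convention in the synopsis:
#   enqueue $queue [option => value, ...], $job [, $callback]
sub build_job_document {
    my @args = @_;

    # A trailing code reference is the non-blocking callback.
    my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;

    # The job payload is the last remaining argument; the rest are options.
    my $payload = pop @args;
    my %options = @args;

    my %document = (
        priority => 1,            # default stated in the documentation
        status   => 'Pending',    # default stated in the documentation
        created  => time,         # assumption: epoch seconds instead of DateTime
        %options,                 # caller-supplied options override the defaults
        data     => $payload,     # assumption: the field name is illustrative
    );
    return (\%document, $callback);
}

my ($doc, $cb)  = build_job_document(priority => 5, 'test', sub { });
my ($plain_doc) = build_job_document([ 'some', 'data' ]);
```

Placing `%options` after the defaults in the hash constructor is what lets a caller override `priority` or `created` without any extra merging code.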
    # In blocking mode
    my $job = fetch $queue;
    my $job = $queue->fetch;

    # In non-blocking mode
    fetch $queue sub {
        my ($job) = @_;
        # ...
    };
    $queue->fetch(sub {
        my ($job) = @_;
        # ...
    });
Fetch a single job from the queue, returning undef if no jobs are available.
Currently sets job status to 'Retrieved'.
my $job = get $queue $id;
Gets a job from the queue by ID. Doesn't change the job status.
my $options = $queue->get_options;
Returns the Mango::Collection options hash used by find_and_modify to identify and update available queue items.
Waits for a job to enter a given status (this describes the watch method, whose example closes the list of methods).
    my $consumer = consume $queue sub {
        # ...
    };
    release $queue $consumer;
Releases a non-blocking consumer from watching a queue.
    my $job = fetch $queue;
    requeue $queue $job;
Requeues a job. Sets the job status to 'Pending'.
    my $job = fetch $queue;
    $job->{status} = 'Failed';
    update $queue $job;
Updates a job in the queue.
    # In blocking mode
    my $id = enqueue $queue 'test';
    watch $queue $id, 'Complete'; # blocks until job is complete

    # In non-blocking mode
    my $id = enqueue $queue 'test';
    watch $queue $id, 'Complete' => sub {
        # ...
    };
Mojolicious, Mango