MangoX::Queue - A MongoDB queue implementation using Mango
MangoX::Queue is a MongoDB backed queue implementation using Mango to support blocking and non-blocking queues.
MangoX::Queue makes no attempt to handle the Mango connection, database or collection - pass in a collection to the constructor and MangoX::Queue will use it. The collection can be plain, capped or sharded.
For an introduction to MangoX::Queue, see MangoX::Queue::Tutorial.
Non-blocking mode requires a running Mojo::IOLoop.
my $queue = MangoX::Queue->new(collection => $mango_collection); # To add a job enqueue $queue 'test' => sub { my $id = shift; }; # To set options enqueue $queue priority => 1, created => DateTime::Tiny->now, 'test' => sub { my $id = shift; }; # To watch for a specific job status watch $queue $id, 'Complete' => sub { # Job status is 'Complete' }; # To fetch a job fetch $queue sub { my ($job) = @_; # ... }; # To get a job by id get $queue $id => sub { my $job = shift; }; # To requeue a job requeue $queue $job => sub { my $id = shift; }; # To dequeue a job dequeue $queue $id => sub { }; # To consume a queue my $consumer = consume $queue sub { my ($job) = @_; # ... }; # To stop consuming a queue release $queue $consumer; # To listen for errors on $queue error => sub { my ($queue, $error) = @_; };
my $queue = MangoX::Queue->new(collection => $mango_collection); # To add a job my $id = enqueue $queue 'test'; # To set options my $id = enqueue $queue priority => 1, created => DateTime::Tiny->now, 'test'; # To watch for a specific job status watch $queue $id; # To fetch a job my $job = fetch $queue; # To get a job by id my $job = get $queue $id; # To requeue a job my $id = requeue $queue $job; # To dequeue a job dequeue $queue $id; # To consume a queue while(my $job = consume $queue) { # ... }
my $queue = MangoX::Queue->new(collection => $mango_collection); # To listen for events on $queue enqueued => sub ( my ($queue, $job) = @_; }; on $queue dequeued => sub ( my ($queue, $job) = @_; }; on $queue consumed => sub { my ($queue, $job) = @_; }; # To register a plugin plugin $queue 'MangoX::Queue::Plugin::Statsd';
MangoX::Queue implements the following attributes.
my $collection = $queue->collection; $queue->collection($mango->db('foo')->collection('bar')); my $queue = MangoX::Queue->new(collection => $collection);
The Mango::Collection representing the MongoDB queue collection.
my $delay = $queue->delay; $queue->delay(MangoX::Queue::Delay->new);
The MangoX::Queue::Delay responsible for dynamically controlling the delay between queue queries.
my $concurrent_job_limit = $queue->concurrent_job_limit; $queue->concurrent_job_limit(20);
The maximum number of concurrent jobs (jobs consumed from the queue and unfinished). Defaults to 10.
This only applies to jobs on the queue in non-blocking mode. MangoX::Queue has an internal counter that is incremented when a job has been consumed from the queue (in non-blocking mode). The job returned is a MangoX::Queue::Job instance and has a descructor method that is called to decrement the internal counter. See MangoX::Queue::Job for more details.
Set to -1 to disable queue concurrency limits. Use with caution, this could result in out of memory errors or an extremely slow event loop.
my $plugins = $queue->plugins;
Returns a hash containing the plugins registered with this queue.
my $retries = $queue->retries; $queue->retries(5);
The number of times a job will be picked up from the queue before it is marked as failed.
my $timeout = $queue->timeout; $queue->timeout(10);
The time (in seconds) a job is allowed to stay in Retrieved state before it is released back into Pending state. Defaults to 60 seconds.
MangoX::Queue inherits from Mojo::EventEmitter and emits the following events.
Events are emitted only for actions on the current queue object, not the entire queue.
on $queue consumed => sub { my ($queue, $job) = @_; # ... };
Emitted when an item is consumed (either via consume or fetch)
on $queue dequeued => sub { my ($queue, $job) = @_; # ... };
Emitted when an item is dequeued
on $queue enqueued => sub { my ($queue, $job) = @_; # ... };
Emitted when an item is enqueued
on $queue enqueued => sub { my ($queue, $concurrent_job_limit) = @_; # ... };
Emitted when a job is found but the </concurrent_job_limit> limit has been reached.
MangoX::Queue implements the following methods.
# In blocking mode while(my $job = consume $queue) { # ... } # In non-blocking mode consume $queue sub { my ($job) = @_; # ... };
Waits for jobs to arrive on the queue, sleeping between queue checks using MangoX::Queue::Delay or Mojo::IOLoop.
Currently sets the status to 'Retrieved' before returning the job.
my $job = fetch $queue; dequeue $queue $job;
Dequeues a job. Currently removes it from the collection.
my $id = enqueue $queue 'job name'; my $id = enqueue $queue [ 'some', 'data' ]; my $id = enqueue $queue +{ foo => 'bar' };
Add an item to the queue in blocking mode. The default priority is 1 and status is 'Pending'.
You can set queue options including priority, created and status.
my $id = enqueue $queue, priority => 1, created => time, status => 'Pending', +{ foo => 'bar' };
For non-blocking mode, pass in a coderef as the final argument.
my $id = enqueue $queue 'job_name' => sub { # ... }; my $id = enqueue $queue priority => 1, +{ foo => 'bar', } => sub { # ... };
Sets the status to 'Pending' by default.
# In blocking mode my $job = fetch $queue; # In non-blocking mode fetch $queue sub { my ($job) = @_; # ... };
Fetch a single job from the queue, returning undef if no jobs are available.
Currently sets job status to 'Retrieved'.
# In non-blocking mode get $queue $id => sub { my ($job) = @_; # ... }; # In blocking mode my $job = get $queue $id;
Gets a job from the queue by ID. Doesn't change the job status.
You can also pass in a job instead of an ID.
$job = get $queue $job;
my $options = $queue->get_options;
Returns the Mango::Collection options hash used by find_and_modify to identify and update available queue items.
my $consumer = consume $queue sub { # ... }; release $queue $consumer;
Releases a non-blocking consumer from watching a queue.
my $job = fetch $queue; requeue $queue $job;
Requeues a job. Sets the job status to 'Pending'.
my $job = fetch $queue; $job->{status} = 'Failed'; update $queue $job;
Updates a job in the queue.
Wait for a job to enter a certain status.
# In blocking mode my $id = enqueue $queue 'test'; watch $queue $id, 'Complete'; # blocks until job is complete # In non-blocking mode my $id = enqueue $queue 'test'; watch $queue $id, 'Complete' => sub { # ... };
Jobs can be queued in advance by setting a delay_until attribute:
enqueue $queue delay_until => (time + 20), "job name";
Errors are reported by MangoX::Queue using callbacks and Mojo::EventEmitter
To listen for all errors on a queue, subscribe to the 'error' event:
$queue->on(error => sub { my ($queue, $job, $error) = @_; # ... });
To check for errors against an individual update, enqueue or dequeue call, you can check for an error argument to the callback sub:
enqueue $queue +$job => sub { my ($job, $error) = @_; if($error) { # ... } }
MangoX::Queue::Tutorial, Mojolicious, Mango
To install MangoX::Queue, copy and paste the appropriate command in to your terminal.
cpanm
cpanm MangoX::Queue
CPAN shell
perl -MCPAN -e shell install MangoX::Queue
For more information on module installation, please visit the detailed CPAN module installation guide.