NAME

Minion::Backend::Redis - Redis backend for Minion job queue

SYNOPSIS

  use Minion::Backend::Redis;
  my $backend = Minion::Backend::Redis->new('redis://127.0.0.1:6379/5');

  # Minion
  use Minion;
  my $minion = Minion->new(Redis => 'redis://127.0.0.1:6379');

  # Mojolicious (via Mojolicious::Plugin::Minion)
  $self->plugin(Minion => { Redis => 'redis://127.0.0.1:6379/2' });

  # Mojolicious::Lite (via Mojolicious::Plugin::Minion)
  plugin Minion => { Redis => 'redis://x:s3cret@127.0.0.1:6379' };

DESCRIPTION

Minion::Backend::Redis is a backend for Minion based on Mojo::Redis. Note that Redis Server version 2.8.0 or newer is required to use this backend.

CAUTION

This is a slightly hackish modification of the original code by Dan Book to use Mojo::Redis instead of Mojo::Redis2.

Due to the original code being written against an older Minion version, "history" is currently unimplemented.

PERFORMANCE

You can run examples/minion_bench.pl to get some performance metrics.

  Clean start with 10000 jobs
  Enqueued 10000 jobs in 52.6373450756073 seconds (189.979/s)
  4 workers finished 1000 jobs each in 76.6429250240326 seconds (52.190/s)
  4 workers finished 1000 jobs each in 64.2053661346436 seconds (62.300/s)
  Requesting job info 100 times
  Received job info 100 times in 0.783659934997559 seconds (127.606/s)
  Requesting stats 100 times
  Received stats 100 times in 0.595925092697144 seconds (167.806/s)
  Repairing 100 times
  Repaired 100 times in 0.28698992729187 seconds (348.444/s)
  Acquiring locks 1000 times
  Acquired locks 1000 times in 2.0602331161499 seconds (485.382/s)
  Releasing locks 1000 times
  Releasing locks 1000 times in 1.19675707817078 seconds (835.591/s)
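
The per-second figures in this output are consistent with a simple division of the operation count by the elapsed wall-clock time. A minimal sketch of that arithmetic, where rate is a hypothetical helper and not part of examples/minion_bench.pl:

```perl
use strict;
use warnings;

# Hypothetical helper: operations per second, rounded to three decimals
sub rate {
  my ($count, $elapsed) = @_;
  return sprintf '%.3f', $count / $elapsed;
}

# Figures copied from the benchmark output above
my $enqueue_rate = rate(10000, 52.6373450756073);

# Four workers finished 1000 jobs each, so 4000 jobs in total
my $dequeue_rate = rate(4 * 1000, 76.6429250240326);

print "enqueue: $enqueue_rate/s\n";
print "dequeue: $dequeue_rate/s\n";
```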

ATTRIBUTES

Minion::Backend::Redis inherits all attributes from Minion::Backend and implements the following new ones.

redis

  my $redis = $backend->redis;
  $backend  = $backend->redis(Mojo::Redis->new);

Mojo::Redis object used to store all data.

METHODS

Minion::Backend::Redis inherits all methods from Minion::Backend and implements the following new ones.

new

  my $backend = Minion::Backend::Redis->new;
  my $backend = Minion::Backend::Redis->new('redis://x:s3cret@localhost:6379/5');

Construct a new Minion::Backend::Redis object.
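
The connection string follows the usual redis:// URL layout. The sketch below only illustrates how such a URL breaks down into its parts; parse_redis_url is a hypothetical helper, the backend itself hands the URL to Mojo::Redis, and reading the path as the database index is an assumption based on common Redis URL conventions.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Minion::Backend::Redis
sub parse_redis_url {
  my $url = shift;
  my ($userinfo, $host, $port, $db) = $url =~ m{
    \Aredis://
    (?:([^\@/]*)\@)?    # optional "user:password"
    ([^:/]+)            # host
    (?::(\d+))?         # optional port
    (?:/(\d+))?         # optional database index (assumed meaning of the path)
    \z
  }x or return;

  my ($user, $password) = defined $userinfo ? split(/:/, $userinfo, 2) : ();
  return {
    user     => $user,
    password => $password,
    host     => $host,
    port     => $port // 6379,    # assumption: default Redis port
    db       => $db // 0,         # assumption: database 0 when omitted
  };
}

my $full  = parse_redis_url('redis://x:s3cret@localhost:6379/5');
my $plain = parse_redis_url('redis://127.0.0.1:6379');
print "$full->{host} db $full->{db}, $plain->{host} db $plain->{db}\n";
```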

broadcast

  my $bool = $backend->broadcast('some_command');
  my $bool = $backend->broadcast('some_command', [@args]);
  my $bool = $backend->broadcast('some_command', [@args], [$id1, $id2, $id3]);

Broadcast remote control command to one or more workers.

dequeue

  my $job_info = $backend->dequeue($worker_id, 0.5);
  my $job_info = $backend->dequeue($worker_id, 0.5, {queues => ['important']});

Wait a given amount of time in seconds for a job, dequeue it and transition from inactive to active state, or return undef if queues were empty.

These options are currently available:

id
  id => '10023'

Dequeue a specific job.

queues
  queues => ['important']

One or more queues to dequeue jobs from, defaults to the queue named "default".

These fields are currently available:

args
  args => ['foo', 'bar']

Job arguments.

id
  id => '10023'

Job ID.

retries
  retries => 3

Number of times job has been retried.

task
  task => 'foo'

Task name.
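
To show how the returned job info is typically consumed, here is a minimal sketch of one dequeue and report cycle. FakeBackend, perform_one and the task table are hypothetical and exist only so the example runs without a Redis server. Only the dequeue, finish_job and fail_job call signatures are taken from this documentation.

```perl
use strict;
use warnings;

# Hypothetical in-memory stand-in for the backend, only here so the sketch
# runs without a Redis server. It mimics the documented call signatures.
{
  package FakeBackend;

  sub new { bless {queue => [], jobs => {}}, shift }

  # Return the next job info hash, or undef if the queue is empty
  sub dequeue {
    my ($self, $worker_id, $wait, $options) = @_;
    return shift @{$self->{queue}};
  }

  sub finish_job {
    my ($self, $job_id, $retries, $result) = @_;
    $self->{jobs}{$job_id} = {state => 'finished', result => $result};
    return 1;
  }

  sub fail_job {
    my ($self, $job_id, $retries, $result) = @_;
    $self->{jobs}{$job_id} = {state => 'failed', result => $result};
    return 1;
  }
}

# Dequeue at most one job, run the matching task and report the outcome.
# Returns the job id, or undef if no job was available.
sub perform_one {
  my ($backend, $worker_id, $tasks) = @_;

  my $job = $backend->dequeue($worker_id, 0.5, {queues => ['default']});
  return undef unless $job;

  my ($id, $retries) = @{$job}{qw(id retries)};
  my $result;
  my $ok = eval {
    my $code = $tasks->{$job->{task}} or die "Unknown task\n";
    $result = $code->(@{$job->{args}});
    1;
  };
  if ($ok) { $backend->finish_job($id, $retries, $result) }
  else     { $backend->fail_job($id, $retries, "$@") }

  return $id;
}

my $backend = FakeBackend->new;
push @{$backend->{queue}},
  {id => '10023', retries => 0, task => 'add',  args => [2, 3]},
  {id => '10024', retries => 0, task => 'boom', args => []};

my %tasks = (
  add  => sub { $_[0] + $_[1] },
  boom => sub { die "Something went wrong!\n" },
);

# Keep going until the queue is empty
1 while defined perform_one($backend, 1, \%tasks);
```

With the real backend, the worker id would come from register_worker, and the loop would normally keep polling instead of stopping at the first empty dequeue.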

enqueue

  my $job_id = $backend->enqueue('foo');
  my $job_id = $backend->enqueue(foo => [@args]);
  my $job_id = $backend->enqueue(foo => [@args] => {priority => 1});

Enqueue a new job with inactive state.

These options are currently available:

attempts
  attempts => 25

Number of times performing this job will be attempted, with a delay based on "backoff" in Minion after the first attempt, defaults to 1.

delay
  delay => 10

Delay job for this many seconds (from now).

notes
  notes => {foo => 'bar', baz => [1, 2, 3]}

Hash reference with arbitrary metadata for this job.

parents
  parents => [$id1, $id2, $id3]

One or more existing jobs this job depends on, and that need to have transitioned to the state finished before it can be processed.

priority
  priority => 5

Job priority, defaults to 0. Jobs with a higher priority get performed first.

queue
  queue => 'important'

Queue to put job in, defaults to the queue named "default".
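
The options above interact when a job is picked for processing: the job has to be in a requested queue, its delay has to have passed, all of its parents have to be finished, and higher priorities win. The following sketch models those rules on plain hashes. It is not how this backend stores or selects jobs in Redis; next_job and parents_finished are hypothetical names, and breaking priority ties by lower id is an assumption.

```perl
use strict;
use warnings;

# True if every parent of the job is in the set of finished job ids
sub parents_finished {
  my ($job, $finished) = @_;
  for my $parent (@{$job->{parents} || []}) {
    return 0 unless $finished->{$parent};
  }
  return 1;
}

# Pick the next job to perform from a list of job hashes
sub next_job {
  my ($jobs, $queues, $now) = @_;
  my %finished = map { $_->{id} => 1 } grep { $_->{state} eq 'finished' } @$jobs;
  my %wanted   = map { $_ => 1 } @$queues;

  my @ready = grep {
         $_->{state} eq 'inactive'
      && $wanted{$_->{queue}}
      && $_->{delayed} <= $now
      && parents_finished($_, \%finished)
  } @$jobs;

  # Higher priority first, ties broken by lower id (an assumption)
  my ($next)
    = sort { $b->{priority} <=> $a->{priority} || $a->{id} <=> $b->{id} } @ready;
  return $next;
}

my @jobs = (
  {id => 1, state => 'finished', queue => 'default', priority => 0, delayed => 100},
  {id => 2, state => 'inactive', queue => 'default', priority => 0, delayed => 100},
  {id => 3, state => 'inactive', queue => 'default', priority => 5, delayed => 100, parents => [1]},
  {id => 4, state => 'inactive', queue => 'default', priority => 9, delayed => 500},
  {id => 5, state => 'inactive', queue => 'default', priority => 9, delayed => 100, parents => [2]},
  {id => 6, state => 'inactive', queue => 'important', priority => 9, delayed => 100},
);

my $next = next_job(\@jobs, ['default'], 200);
print "next job: $next->{id}\n";
```

Job 4 is still delayed, job 5 waits for an unfinished parent and job 6 sits in another queue, so job 3 is selected ahead of the lower priority job 2.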

fail_job

  my $bool = $backend->fail_job($job_id, $retries);
  my $bool = $backend->fail_job($job_id, $retries, 'Something went wrong!');
  my $bool = $backend->fail_job(
    $job_id, $retries, {msg => 'Something went wrong!'});

Transition from active to failed state, and if there are attempts remaining, transition back to inactive with an exponentially increasing delay based on "backoff" in Minion.
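
To give a feel for how quickly the retry delay grows, this sketch assumes the default "backoff" callback as given in the Minion documentation, (retries ** 4) + 15 seconds. An application that sets its own callback will see different delays.

```perl
use strict;
use warnings;

# Assumed default from the Minion documentation: (retries ** 4) + 15
sub backoff {
  my $retries = shift;
  return $retries**4 + 15;
}

# Delay in seconds after 0 to 5 previous retries
my @delays = map { backoff($_) } 0 .. 5;
print join(', ', @delays), "\n";
```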

finish_job

  my $bool = $backend->finish_job($job_id, $retries);
  my $bool = $backend->finish_job($job_id, $retries, 'All went well!');
  my $bool = $backend->finish_job($job_id, $retries, {msg => 'All went well!'});

Transition from active to finished state.

history

  my $history = $backend->history;

Get history information for job queue. This method is not implemented by this backend yet (see "CAUTION").

Once implemented, these fields would be available:

daily
  daily => [{epoch => 12345, finished_jobs => 95, failed_jobs => 2}, ...]

Hourly counts for processed jobs from the past day.

list_jobs

  my $results = $backend->list_jobs($offset, $limit);
  my $results = $backend->list_jobs($offset, $limit, {state => 'inactive'});

Returns information about jobs in batches.

  # Check job state
  my $results = $backend->list_jobs(0, 1, {ids => [$job_id]});
  my $state = $results->{jobs}[0]{state};

  # Get job result
  my $results = $backend->list_jobs(0, 1, {ids => [$job_id]});
  my $result = $results->{jobs}[0]{result};

These options are currently available:

ids
  ids => ['23', '24']

List only jobs with these ids.

queue
  queue => 'important'

List only jobs in this queue.

state
  state => 'inactive'

List only jobs in this state.

task
  task => 'test'

List only jobs for this task.

These fields are currently available:

args
  args => ['foo', 'bar']

Job arguments.

attempts
  attempts => 25

Number of times performing this job will be attempted.

children
  children => ['10026', '10027', '10028']

Jobs depending on this job.

created
  created => 784111777

Epoch time job was created.

delayed
  delayed => 784111777

Epoch time job was delayed to.

finished
  finished => 784111777

Epoch time job was finished.

notes
  notes => {foo => 'bar', baz => [1, 2, 3]}

Hash reference with arbitrary metadata for this job.

parents
  parents => ['10023', '10024', '10025']

Jobs this job depends on.

priority
  priority => 3

Job priority.

queue
  queue => 'important'

Queue name.

result
  result => 'All went well!'

Job result.

retried
  retried => 784111777

Epoch time job has been retried.

retries
  retries => 3

Number of times job has been retried.

started
  started => 784111777

Epoch time job was started.

state
  state => 'inactive'

Current job state, usually active, failed, finished or inactive.

task
  task => 'foo'

Task name.

worker
  worker => '154'

Id of worker that is processing the job.
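
Because results come back in batches, collecting every matching job means advancing the offset until a batch comes back short. A minimal sketch of that loop, using a hypothetical FakePagedBackend so it runs without Redis; all_jobs is a hypothetical helper too, and only the list_jobs call signature and the jobs key are taken from this documentation.

```perl
use strict;
use warnings;

# Hypothetical stand-in that mimics list_jobs($offset, $limit, \%options)
{
  package FakePagedBackend;

  sub new { my ($class, @jobs) = @_; bless {jobs => [@jobs]}, $class }

  sub list_jobs {
    my ($self, $offset, $limit, $options) = @_;
    my $state = $options->{state};
    my @match = grep { !$state || $_->{state} eq $state } @{$self->{jobs}};

    my $last = $offset + $limit - 1;
    $last = $#match if $last > $#match;
    my @page = $offset <= $last ? @match[$offset .. $last] : ();
    return {jobs => \@page};
  }
}

# Fetch all matching jobs, $batch at a time
sub all_jobs {
  my ($backend, $options, $batch) = @_;
  my $offset = 0;
  my @all;
  while (1) {
    my $results = $backend->list_jobs($offset, $batch, $options);
    my @jobs    = @{$results->{jobs}};
    push @all, @jobs;
    last if @jobs < $batch;    # a short batch means nothing is left
    $offset += $batch;
  }
  return \@all;
}

my $backend = FakePagedBackend->new(
  map { {id => $_, state => $_ % 2 ? 'inactive' : 'finished'} } 1 .. 5);

my $inactive = all_jobs($backend, {state => 'inactive'}, 2);
print join(',', map { $_->{id} } @$inactive), "\n";
```

Jobs that change state between two calls can be skipped or seen twice with this approach, so it suits reporting better than bookkeeping.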

list_locks

  my $results = $backend->list_locks($offset, $limit);
  my $results = $backend->list_locks($offset, $limit, {names => ['foo']});

Returns information about locks in batches.

  # Get the total number of results (without limit)
  my $num = $backend->list_locks(0, 100, {names => ['bar']})->{total};

  # Check expiration time
  my $results = $backend->list_locks(0, 1, {names => ['foo']});
  my $expires = $results->{locks}[0]{expires};

These options are currently available:

names
  names => ['foo', 'bar']

List only locks with these names.

These fields are currently available:

expires
  expires => 784111777

Epoch time this lock will expire.

name
  name => 'foo'

Lock name.

list_workers

  my $results = $backend->list_workers($offset, $limit);
  my $results = $backend->list_workers($offset, $limit, {ids => [23]});

Returns information about workers in batches.

  # Check worker host
  my $results = $backend->list_workers(0, 1, {ids => [$worker_id]});
  my $host    = $results->{workers}[0]{host};

These options are currently available:

ids
  ids => ['23', '24']

List only workers with these ids.

These fields are currently available:

host
  host => 'localhost'

Worker host.

jobs
  jobs => ['10023', '10024', '10025', '10029']

Ids of jobs the worker is currently processing.

notified
  notified => 784111777

Epoch time worker sent the last heartbeat.

pid
  pid => 12345

Process id of worker.

started
  started => 784111777

Epoch time worker was started.

status
  status => {queues => ['default', 'important']}

Hash reference with whatever status information the worker would like to share.

lock

  my $bool = $backend->lock('foo', 3600);
  my $bool = $backend->lock('foo', 3600, {limit => 20});

Try to acquire a named lock that will expire automatically after the given amount of time in seconds.

These options are currently available:

limit
  limit => 20

Number of shared locks with the same name that can be active at the same time, defaults to 1.
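
The interaction between expiration and limit can be modelled in a few lines. This is only an in-memory illustration of the semantics described above, not the Redis implementation. LockTable, acquire and release are hypothetical names, the current time is passed in explicitly to keep the example deterministic, and releasing the holder that would expire first is an assumption.

```perl
use strict;
use warnings;

# Hypothetical in-memory model of named locks with a shared-lock limit
{
  package LockTable;

  sub new { bless {holders => {}}, shift }

  # Expiry times of holders of $name that are still valid at time $now
  sub _live {
    my ($self, $name, $now) = @_;
    return grep { $_ > $now } @{$self->{holders}{$name} || []};
  }

  sub acquire {
    my ($self, $name, $duration, $options, $now) = @_;
    my $limit = $options->{limit} || 1;
    my @live  = $self->_live($name, $now);
    my $ok    = @live < $limit ? 1 : 0;
    push @live, $now + $duration if $ok;
    $self->{holders}{$name} = \@live;
    return $ok;
  }

  sub release {
    my ($self, $name, $now) = @_;
    my @live = sort { $a <=> $b } $self->_live($name, $now);
    my $ok   = @live ? 1 : 0;
    shift @live;    # assumption: drop the holder that would expire first
    $self->{holders}{$name} = \@live;
    return $ok;
  }
}

my $locks = LockTable->new;

# With a limit of 2, the third attempt at the same time fails
my @got = map { $locks->acquire('foo', 3600, {limit => 2}, 0) } 1 .. 3;

# One hour later both holders have expired, so the lock is free again
my $later = $locks->acquire('foo', 3600, {limit => 2}, 3600);

print "@got then $later\n";
```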

note

  my $bool = $backend->note($job_id, foo => 'bar');

Change a metadata field for a job.

receive

  my $commands = $backend->receive($worker_id);

Receive remote control commands for worker.

register_worker

  my $worker_id = $backend->register_worker;
  my $worker_id = $backend->register_worker($worker_id);
  my $worker_id = $backend->register_worker(
    $worker_id, {status => {queues => ['default', 'important']}});

Register worker or send heartbeat to show that this worker is still alive.

These options are currently available:

status
  status => {queues => ['default', 'important']}

Hash reference with whatever status information the worker would like to share.

remove_job

  my $bool = $backend->remove_job($job_id);

Remove failed, finished or inactive job from queue.

repair

  $backend->repair;

Repair worker registry and job queue if necessary.

reset

  $backend->reset;

Reset job queue.

retry_job

  my $bool = $backend->retry_job($job_id, $retries);
  my $bool = $backend->retry_job($job_id, $retries, {delay => 10});

Transition job back to inactive state. Already inactive jobs may also be retried to change options.

These options are currently available:

attempts
  attempts => 25

Number of times performing this job will be attempted.

delay
  delay => 10

Delay job for this many seconds (from now).

priority
  priority => 5

Job priority.

queue
  queue => 'important'

Queue to put job in.

stats

  my $stats = $backend->stats;

Get statistics for jobs and workers.

These fields are currently available:

active_jobs
  active_jobs => 100

Number of jobs in active state.

active_workers
  active_workers => 100

Number of workers that are currently processing a job.

delayed_jobs
  delayed_jobs => 100

Number of jobs in inactive state that are scheduled to run at a specific time in the future. Note that this field is EXPERIMENTAL and might change without warning!

enqueued_jobs
  enqueued_jobs => 100000

Rough estimate of how many jobs have ever been enqueued. Note that this field is EXPERIMENTAL and might change without warning!

failed_jobs
  failed_jobs => 100

Number of jobs in failed state.

finished_jobs
  finished_jobs => 100

Number of jobs in finished state.

inactive_jobs
  inactive_jobs => 100

Number of jobs in inactive state.

inactive_workers
  inactive_workers => 100

Number of workers that are currently not processing a job.

uptime
  uptime => 1000

Uptime in seconds.
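
The raw counters are often more useful when combined. A small sketch that derives a worker total, a busy ratio and a failure rate from a stats hash; summarize is a hypothetical helper and the numbers are made up for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: derive a few summary figures from a stats hash
sub summarize {
  my $stats   = shift;
  my $workers = $stats->{active_workers} + $stats->{inactive_workers};
  my $done    = $stats->{finished_jobs} + $stats->{failed_jobs};
  return {
    workers      => $workers,
    busy         => $workers ? $stats->{active_workers} / $workers : 0,
    failure_rate => $done    ? $stats->{failed_jobs} / $done       : 0,
  };
}

# Made-up values using the documented field names
my $summary = summarize({
  active_workers => 3,  inactive_workers => 1,
  finished_jobs  => 95, failed_jobs      => 5,
});

printf "%d workers, %.0f%% busy, %.0f%% of processed jobs failed\n",
  $summary->{workers}, $summary->{busy} * 100, $summary->{failure_rate} * 100;
```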

unlock

  my $bool = $backend->unlock('foo');

Release a named lock.

unregister_worker

  $backend->unregister_worker($worker_id);

Unregister worker.

BUGS

Report any issues on the public bugtracker.

AUTHOR

Dan Book <dbook@cpan.org>

COPYRIGHT AND LICENSE

This software is Copyright (c) 2017 by Dan Book.

This is free software, licensed under:

  The Artistic License 2.0 (GPL Compatible)

SEE ALSO

Minion, Mojo::Redis