NAME

Redis::JobQueue - Object interface for creating and executing job queues, as well as monitoring the status and results of jobs

VERSION

This documentation refers to Redis::JobQueue version 0.16

SYNOPSIS

    #-- Common
    use Redis::JobQueue qw( DEFAULT_SERVER DEFAULT_PORT );

    my $connection_string = DEFAULT_SERVER.':'.DEFAULT_PORT;
    my $jq = Redis::JobQueue->new( redis => $connection_string );

    #-- Producer
    my $job = $jq->add_job(
        {
            queue       => 'xxx',
            workload    => \'Some stuff up to 512MB long',
            expire      => 12*60*60,            # 12h,
        }
        );

    #-- Worker
    sub xxx {
        my $job = shift;

        my $workload = ${$job->workload};
        # do something with workload;

        $job->result( 'XXX JOB result comes here, up to 512MB long' );
    }

    while ( $job = $jq->get_next_job(
        queue       => 'xxx',
        blocking    => 1,
        ) )
    {
        $job->status( 'working' );
        $jq->update_job( $job );

        # do my stuff
        xxx( $job );

        $job->status( 'completed' );
        $jq->update_job( $job );
    }

    #-- Consumer
    my $id = $ARGV[0];
    my $status = $jq->get_job_status( $id );

    if ( $status eq 'completed' )
    {
        # now safe to remove it from JobQueue, since it's completed
        my $job = $jq->load_job( $id );

        $jq->delete_job( $id );
        print "Job result: ", ${$job->result}, "\n";
    }
    else
    {
        print "Job is not complete, has current '$status' status\n";
    }

For a brief working example of Redis::JobQueue usage, see the "An Example" section.

For a description of the data structure that Redis::JobQueue uses on the Redis server, see the "JobQueue data structure" section.

ABSTRACT

The Redis::JobQueue package is a set of Perl modules which provides a simple job queue with Redis server capabilities.

DESCRIPTION

The user modules in this package provide an object oriented API. The job queues interface and the jobs are all represented by objects. This makes a simple and powerful interface to these services.

The main features of the package are:

  • Contains various reusable components that can be used separately or together.

  • Provides an object oriented model of communication.

  • Supports working with data structures on the Redis server.

  • Supports automatic creation of job queues, job status monitoring, updating of the job data set, obtaining a consistent job from the queue, removing jobs, and classification of possible errors.

  • Simple methods for organizing producer, worker and consumer clients.

CONSTRUCTOR

new( redis => $server, timeout => $timeout )

It generates a Redis::JobQueue object to communicate with the Redis server and can be called as either a class method or an object method. If invoked with no arguments the constructor new creates and returns a Redis::JobQueue object that is configured to work with the default settings.

If invoked with the first argument being an object of Redis::JobQueue class or Redis class, then the new object attribute values are taken from the object of the first argument. It does not create a new connection to the Redis server. A created object uses the default value "DEFAULT_TIMEOUT" when a Redis class object is passed to the new constructor, as Redis class does not support the timeout attribute while waiting for a message from the queue.

new optionally takes arguments. These arguments are in key-value pairs.

This example illustrates a new() call with all the valid arguments:

    my $jq = Redis::JobQueue->new(
        redis   => "$server:$port", # Connection info for Redis which hosts queue
        timeout => $timeout,        # Default wait time (in seconds)
                                    # for blocking call of get_next_job.
                                    # Set 0 for unlimited wait time
        );

The following examples illustrate other uses of the new method:

    $jq = Redis::JobQueue->new();
    my $next_jq = Redis::JobQueue->new( $jq );

    my $redis = Redis->new( server => "$server:$port" );
    $next_jq = Redis::JobQueue->new(
        $redis,
        timeout => $timeout,
        );

An error will cause the program to halt (confess) if an argument is not valid.

METHODS

An error will cause the program to halt (confess) if an argument is not valid.

ATTENTION: In the Redis module the synchronous commands throw an exception on receipt of an error reply, or return a non-error reply directly.

add_job( $pre_job, LPUSH => 1 )

Adds a job to the queue on the Redis server. At the same time creates and returns a new Redis::JobQueue::Job object with a new unique identifier. Job status is set to "STATUS_CREATED".

The first argument should be a Redis::JobQueue::Job object or a reference to a hash describing Redis::JobQueue::Job object attributes.

add_job optionally takes arguments. These arguments are in key-value pairs.

This example illustrates an add_job() call with all the valid arguments:

    my $pre_job = {
        id           => '4BE19672-C503-11E1-BF34-28791473A258',
        queue        => 'lovely_queue', # required
        job          => 'strong_job',   # optional attribute
        expire       => 12*60*60,
        status       => 'created',
        meta_data    => scalar( localtime ),
        workload     => \'Some stuff up to 512MB long',
        result       => \'JOB result comes here, up to 512MB long',
        };

    my $job = Redis::JobQueue::Job->new(
        id           => $pre_job->{id},
        queue        => $pre_job->{queue},  # required
        job          => $pre_job->{job},    # optional attribute
        expire       => $pre_job->{expire},
        status       => $pre_job->{status},
        meta_data    => $pre_job->{meta_data},
        workload     => $pre_job->{workload},
        result       => $pre_job->{result},
        );

    my $resulting_job = $jq->add_job( $job );
    # or
    $resulting_job = $jq->add_job(
        $pre_job,
        LPUSH       => 1,
        );

If the optional LPUSH argument is true, the job is placed at the front of the queue rather than at its end.

The TTL of the job data set on the Redis server is set in accordance with the expire attribute of the Redis::JobQueue::Job object.

Method returns the object corresponding to the added job.

get_job_status( $job )

Status of the job is requested from the Redis server. Job ID is obtained from the argument. The argument can be either a string or a Redis::JobQueue::Job object.

Returns the status string, or undef if the job data does not exist on the Redis server.

The following examples illustrate uses of the get_job_status method:

    my $status = $jq->get_job_status( $id );
    # or
    $status = $jq->get_job_status( $job );

get_job_meta_data( $job )

Gets the job meta-data (such as custom attributes) from the Redis server. Takes a single argument: either a string (the job ID) or a Redis::JobQueue::Job object.

Returns the meta-data string, or undef if the job data does not exist on the Redis server.

The following examples illustrate uses of the get_job_meta_data method:

    my $meta_data = $jq->get_job_meta_data( $id );
    # or
    $meta_data = $jq->get_job_meta_data( $job );

load_job( $job )

Job data is loaded from the Redis server. Job ID is obtained from the argument. The argument can be either a string or a Redis::JobQueue::Job object.

Method returns the object corresponding to the loaded job. Returns undef if the job is not on the Redis server.

The following examples illustrate uses of the load_job method:

    $job = $jq->load_job( $id );
    # or
    $job = $jq->load_job( $job );

get_next_job( queue => $queue_name, blocking => 1 )

Selects the first job identifier in the queue.

get_next_job takes arguments in key-value pairs. You can specify a queue name or a reference to an array of names of queues. Queues from the list are processed in random order.

If the optional blocking argument is true, the get_next_job method waits for at most the period specified in the timeout attribute of the queue object (use timeout = 0 for an unlimited wait time), and all queues are processed in a single request to the Redis server. Otherwise the result is returned immediately and each queue is processed in a separate request. By default blocking is false (0).

Method returns the job object corresponding to the received job identifier. Returns undef if there is no job in the queue.

These examples illustrate get_next_job calls with all the valid arguments:

    $job = $jq->get_next_job(
        queue       => 'xxx',
        blocking    => 1,
        );
    # or
    $job = $jq->get_next_job(
        queue       => [ 'aaa', 'bbb' ],
        blocking    => 1,
        );

The TTL of the job data is reset on the Redis server in accordance with the expire attribute of the job object.
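
The worker side described above can be wrapped in a small helper. The following is only a sketch: run_worker and the %$handlers table are hypothetical names, not part of the package, and the status strings are the values documented for STATUS_WORKING and STATUS_COMPLETED. Note that with timeout = 0 a blocking get_next_job waits indefinitely, so the loop ends only when the call returns undef after a non-zero timeout.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Redis::JobQueue): take jobs from the
# given queues until get_next_job returns undef, dispatching each job
# to the handler registered for its queue name.
sub run_worker {
    my ( $jq, $handlers ) = @_;    # $handlers: { queue_name => \&code }

    my $processed = 0;
    while ( my $job = $jq->get_next_job(
        queue       => [ sort keys %$handlers ],
        blocking    => 1,
        ) )
    {
        $job->status( 'working' );      # value of STATUS_WORKING
        $jq->update_job( $job );

        # The handler is expected to call $job->result( ... ) itself
        $handlers->{ $job->queue }->( $job );

        $job->status( 'completed' );    # value of STATUS_COMPLETED
        $jq->update_job( $job );
        ++$processed;
    }

    return $processed;
}
```

A call might look like run_worker( $jq, { xxx => \&xxx, yyy => \&yyy } ), where xxx and yyy are worker functions such as those in the SYNOPSIS.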

update_job( $job )

Saves the changes to the job data to the Redis server. Job ID is obtained from the argument. The argument must be a Redis::JobQueue::Job object.

Returns undef if the job is not on the Redis server; otherwise returns the number of attributes that were updated.

Changes to the expire attribute are ignored.

The following example illustrates the use of the update_job method:

    $jq->update_job( $job );

The TTL of the job data is reset on the Redis server in accordance with the expire attribute of the job object.

delete_job( $job )

Deletes the job data in Redis server. Job ID is obtained from the argument. The argument can be either a string or a Redis::JobQueue::Job object.

Returns undef if the job is not on the Redis server, and true otherwise.

The following examples illustrate uses of the delete_job method:

    $jq->delete_job( $job );
    # or
    $jq->delete_job( $id );

Use this method immediately after receiving the results of the job to release memory on the Redis server early.

When the job is deleted, the data set on the Redis server is changed as follows:

  • All fields except status are removed.

  • The status field is set to STATUS_DELETED.

  • The TTL of the data set hash is set to 24h if the job had expire = 0.

  • The hash of the job data is deleted automatically in accordance with the established TTL value (expire).

To see a description of the used Redis::JobQueue data structure (on Redis server) look at the "JobQueue data structure" section.
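
The advice to delete a job as soon as its result has been received can be sketched as a consumer-side helper. take_result is a hypothetical name, and the sketch assumes that the result accessor returns a scalar reference, as in the SYNOPSIS.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Redis::JobQueue): return the result
# of a completed job and delete the job from the server at once.
# Returns undef if the job is missing or not yet completed.
sub take_result {
    my ( $jq, $id ) = @_;

    my $status = $jq->get_job_status( $id );
    return unless defined $status and $status eq 'completed';

    my $job = $jq->load_job( $id );
    return unless $job;             # the job disappeared in the meantime

    # Assumption: result is normally a scalar reference
    my $ref    = $job->result;
    my $result = ref $ref ? ${$ref} : $ref;

    $jq->delete_job( $id );         # early release of server memory

    return $result;
}
```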

get_jobs

Gets a list of job ids on the Redis server.

The following example illustrates the use of the get_jobs method:

    @jobs = $jq->get_jobs;
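
Combined with get_job_status, the list of ids can be turned into a simple overview of the server. This is a sketch only; count_jobs_by_status is a hypothetical helper name.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Redis::JobQueue): count the jobs on
# the server per status string.
sub count_jobs_by_status {
    my ( $jq ) = @_;

    my %count;
    foreach my $id ( $jq->get_jobs ) {
        my $status = $jq->get_job_status( $id );
        next unless defined $status;    # job vanished between the two calls
        ++$count{ $status };
    }

    return \%count;                     # e.g. { completed => 2, working => 1 }
}
```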

server

Returns the address of the Redis server used by the queue (in the form 'host:port').

The following example illustrates the use of the server method:

    $redis_address = $jq->server;

ping

This command is used to test if a connection is still alive.

Returns 1 if a connection is still alive or 0 otherwise.

The following example illustrates the use of the ping method:

    $is_alive = $jq->ping;
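
A client that starts before the Redis server is ready could poll ping a limited number of times. The sketch below is an assumption about how one might do this; wait_until_alive, $max_tries and $delay are hypothetical names. The eval is a precaution in case the underlying client throws instead of returning 0.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Redis::JobQueue): poll ping until the
# connection answers or the number of tries is exhausted.
sub wait_until_alive {
    my ( $jq, $max_tries, $delay ) = @_;

    for my $try ( 1 .. $max_tries ) {
        return 1 if eval { $jq->ping };
        sleep $delay if $delay and $try < $max_tries;
    }

    return 0;
}
```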

quit

Ask the Redis server to close the connection.

The following example illustrates the use of the quit method:

    $jq->quit;

timeout

Accessor for the timeout attribute.

The method returns the current value of the attribute if called without arguments.

A non-negative integer can be passed to set a new maximum time to wait for data from the queue (in the "get_next_job" method). Use timeout = 0 for an unlimited wait time.
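
Because the accessor both reads and sets the value, a timeout can be changed for a single call and then restored. The following is a sketch under that assumption; with_timeout is a hypothetical helper name.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Redis::JobQueue): run $code with a
# temporary timeout and restore the previous value afterwards, even if
# $code throws.
sub with_timeout {
    my ( $jq, $seconds, $code ) = @_;

    my $saved = $jq->timeout;       # read the current value
    $jq->timeout( $seconds );       # set the temporary value

    my @result;
    my $ok  = eval { @result = $code->(); 1 };
    my $err = $@;

    $jq->timeout( $saved );
    die $err unless $ok;

    return wantarray ? @result : $result[0];
}
```

For example, with_timeout( $jq, 5, sub { $jq->get_next_job( queue => 'xxx', blocking => 1 ) } ) would wait at most 5 seconds for a job.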

max_datasize

Accessor for the max_datasize attribute.

The method returns the current value of the attribute if called without arguments.

A non-negative integer can be passed to set a new maximum size of the data in the attributes of a Redis::JobQueue::Job object.

The max_datasize attribute value is used in the constructor and in operations that write job data to the Redis server.

The constructor uses the smaller of 512MB and the maxmemory limit from the redis.conf file.
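
A producer may want to check the size of a workload before calling add_job rather than handle an error afterwards. The sketch below is an assumption, not the package's own check: fits_max_datasize is a hypothetical helper, and it assumes the data is a byte string, so that length counts bytes.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Redis::JobQueue): true if the data
# behind $data_ref is no longer than the current max_datasize.
# Assumption: ${$data_ref} is a byte string, not decoded characters.
sub fits_max_datasize {
    my ( $jq, $data_ref ) = @_;

    return length( ${$data_ref} ) <= $jq->max_datasize;
}
```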

last_errorcode

Returns the code of the last identified error.

For a description of the identified errors, see the "DIAGNOSTICS" section.

EXPORT

None by default.

Additional constants are available for import and can be used to define parameter values.

These are the defaults:

DEFAULT_SERVER

Default Redis local server - 'localhost'.

DEFAULT_PORT

Default Redis server port - 6379.

DEFAULT_TIMEOUT

Maximum time (in seconds) to wait for a message from the queue - 0 for an unlimited timeout.

NAMESPACE

Namespace used for keys on the Redis server - 'JobQueue'.

STATUS_CREATED

Text of the status of the job after it is created - '_created_'.

STATUS_WORKING

Text of the status of the job at run-time - 'working'. Must be set from the worker function.

STATUS_COMPLETED

Text of the status of the job at the conclusion - 'completed'. Must be set at the conclusion of the worker function.

STATUS_DELETED

Text of the status of the job after removal - '_deleted_'.

Error codes

For a description of the identified errors, see the "DIAGNOSTICS" section.

These are the defaults:

Redis::JobQueue::EXPIRE_DELETED

TTL (24h) for the hash of a deleted data set, if the job had expire = 0.

DIAGNOSTICS

Use the "last_errorcode" method to analyse possible errors.

A Redis error will cause the program to halt (confess). In addition to errors in the Redis module, the package detects the errors "EMISMATCHARG", "EDATATOOLARGE", "EMAXMEMORYPOLICY" and "EJOBDELETED". All errors recognized by Redis::JobQueue set the corresponding value in "last_errorcode" and raise an exception (confess). Unidentified errors also raise an exception ("last_errorcode" remains equal to 0). The initial value of $@ is preserved.

The user has the choice:

The "last_errorcode" method recognizes the following codes:

ENOERROR

No error.

EMISMATCHARG

This means that you didn't give the right argument to new or to another method.

EDATATOOLARGE

This means that the data is too large.

ENETWORK

This means that an error in connection to Redis server was detected.

EMAXMEMORYLIMIT

This means that the command is not allowed when used memory > maxmemory.

EMAXMEMORYPOLICY

This means that the job was removed by maxmemory-policy.

EJOBDELETED

This means that the job was removed prior to use.

EREDIS

This means that another Redis error message was detected.
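
Since every recognized error raises an exception and sets "last_errorcode", a caller can wrap an operation in eval and retry only for a chosen code. The following is a sketch under those assumptions; call_with_retry is a hypothetical helper, and the code to retry on (for example ENETWORK) is passed in by the caller.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Redis::JobQueue): run $action and
# repeat it up to $max_tries times while it fails with $retry_code.
# Any other error, or the last failure, is rethrown unchanged.
sub call_with_retry {
    my ( $jq, $retry_code, $max_tries, $action ) = @_;

    my $err;
    for my $try ( 1 .. $max_tries ) {
        my @result;
        if ( eval { @result = $action->(); 1 } ) {
            return wantarray ? @result : $result[0];
        }
        $err = $@;

        # Give up at once on any error other than the one being retried
        last if $jq->last_errorcode != $retry_code;
    }

    die $err;
}
```

A call might look like call_with_retry( $jq, ENETWORK, 3, sub { $jq->add_job( $pre_job ) } ). A real client would probably also sleep between attempts.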

An Example

The example shows one possible way to handle errors.

    #-- Common ---------------------------------------------------------------
    use Redis::JobQueue qw(
        DEFAULT_SERVER
        DEFAULT_PORT
        STATUS_CREATED
        STATUS_WORKING
        STATUS_COMPLETED

        ENOERROR
        EMISMATCHARG
        EDATATOOLARGE
        ENETWORK
        EMAXMEMORYLIMIT
        EMAXMEMORYPOLICY
        EJOBDELETED
        EREDIS
        );

    my $server = DEFAULT_SERVER.':'.DEFAULT_PORT;   # the Redis Server

    # One possible way to handle errors
    sub exception {
        my $jq  = shift;
        my $err = shift;

        if ( $jq->last_errorcode == ENOERROR )
        {
            # For example, to ignore
            return unless $err;
        }
        elsif ( $jq->last_errorcode == EMISMATCHARG )
        {
            # Necessary to correct the code
        }
        elsif ( $jq->last_errorcode == EDATATOOLARGE )
        {
            # You must use the control data length
        }
        elsif ( $jq->last_errorcode == ENETWORK )
        {
            # For example, sleep
            #sleep 60;
            # and return code to repeat the operation
            #return "to repeat";
        }
        elsif ( $jq->last_errorcode == EMAXMEMORYLIMIT )
        {
            # For example, return code to restart the server
            #return "to restart the redis server";
        }
        elsif ( $jq->last_errorcode == EMAXMEMORYPOLICY )
        {
            # For example, return code to recreate the job
            my ( $id ) = $err =~ /^(\S+)/;
            #return "to recreate $id";
        }
        elsif ( $jq->last_errorcode == EJOBDELETED )
        {
            # For example, return code to ignore
            my ( $id ) = $err =~ /^(\S+)/;
            #return "to ignore $id";
        }
        elsif ( $jq->last_errorcode == EREDIS )
        {
            # Independently analyze the $err
        }
        else
        {
            # Unknown error code
        }
        die $err if $err;
    }

    my $jq;

    eval {
        $jq = Redis::JobQueue->new(
            redis   => $server,
            timeout => 1,   # DEFAULT_TIMEOUT = 0 for an unlimited timeout
            );
    };
    exception( $jq, $@ ) if $@;

    #-- Producer -------------------------------------------------------------
    #-- Adding new job

    my $job;
    eval {
        $job = $jq->add_job(
            {
                queue       => 'xxx',
                workload    => \'Some stuff up to 512MB long',
                expire      => 12*60*60,
            } );
    };
    exception( $jq, $@ ) if $@;
    print 'Added job ', $job->id, "\n" if $job;

    eval {
        $job = $jq->add_job(
            {
                queue       => 'yyy',
                workload    => \'Some stuff up to 512MB long',
                expire      => 12*60*60,
            } );
    };
    exception( $jq, $@ ) if $@;
    print 'Added job ', $job->id, "\n" if $job;

    #-- Worker ---------------------------------------------------------------
    #-- Run your jobs

    sub xxx {
        my $job = shift;

        my $workload = ${$job->workload};
        # do something with workload;
        print "XXX workload: $workload\n";

        $job->result( 'XXX JOB result comes here, up to 512MB long' );
    }

    sub yyy {
        my $job = shift;

        my $workload = ${$job->workload};
        # do something with workload;
        print "YYY workload: $workload\n";

        $job->result( \'YYY JOB result comes here, up to 512MB long' );
    }

    eval {
        while ( my $job = $jq->get_next_job(
            queue       => [ 'xxx','yyy' ],
            blocking    => 1,
            ) )
        {
            my $id = $job->id;

            my $status = $jq->get_job_status( $id );
            print "Job '", $id, "' was '$status' status\n";

            $job->status( STATUS_WORKING );
            $jq->update_job( $job );

            $status = $jq->get_job_status( $id );
            print "Job '", $id, "' has new '$status' status\n";

            # do my stuff
            if ( $job->queue eq 'xxx' )
            {
                xxx( $job );
            }
            elsif ( $job->queue eq 'yyy' )
            {
                yyy( $job );
            }

            $job->status( STATUS_COMPLETED );
            $jq->update_job( $job );

            $status = $jq->get_job_status( $id );
            print "Job '", $id, "' has last '$status' status\n";
        }
    };
    exception( $jq, $@ ) if $@;

    #-- Consumer -------------------------------------------------------------
    #-- Check the job status

    eval {
        # For example:
        # my $status = $jq->get_job_status( $ARGV[0] );
        # or:
        my @jobs = $jq->get_jobs;

        foreach my $id ( @jobs )
        {
            my $status = $jq->get_job_status( $id );
            print "Job '$id' has '$status' status\n";
        }
    };
    exception( $jq, $@ ) if $@;

    #-- Fetching the result

    eval {
        # For example:
        # my $id = $ARGV[0];
        # or:
        my @jobs = $jq->get_jobs;

        foreach my $id ( @jobs )
        {
            my $status = $jq->get_job_status( $id );
            print "Job '$id' has '$status' status\n";

            if ( $status eq STATUS_COMPLETED )
            {
                my $job = $jq->load_job( $id );

                # now safe to delete it from JobQueue, since it's completed
                $jq->delete_job( $id );

                print 'Job result: ', ${$job->result}, "\n";
            }
            else
            {
                print "Job is not complete, has current '$status' status\n";
            }
        }
    };
    exception( $jq, $@ ) if $@;

    #-- Closes and cleans up -------------------------------------------------

    eval { $jq->quit };
    exception( $jq, $@ ) if $@;

JobQueue data structure

The currently selected database is used (default = 0).

While working, the package creates and uses these data structures on the Redis server:

    #-- To store the job data:
    # HASH    Namespace:id

For example:

    $ redis-cli
    redis 127.0.0.1:6379> KEYS JobQueue:*
    1) "JobQueue:478B9C84-C5B8-11E1-A2C5-D35E0A986783"
    2) "JobQueue:478C81B2-C5B8-11E1-B5B1-16670A986783"
    3) "JobQueue:89116152-C5BD-11E1-931B-0A690A986783"
    #      |                 |
    #   Namespace            |
    #                     Job id (UUID)
    ...
    redis 127.0.0.1:6379> hgetall JobQueue:478B9C84-C5B8-11E1-A2C5-D35E0A986783
    1) "queue"                                  # hash key
    2) "xxx"                                    # the key value
    3) "job"                                    # hash key
    4) "Some description"                       # the key value
    5) "workload"                               # hash key
    6) "Some stuff up to 512MB long"            # the key value
    7) "expire"                                 # hash key
    8) "43200"                                  # the key value
    9) "status"                                 # hash key
    10) "_created_"                             # the key value
    11) "meta_data"                             # hash key
    12) "Sat Mar  9 09:11:27 2013"              # the key value

After you create ("add_job" method) or modify ("update_job" method) a job, the data set is kept for the time specified by the expire attribute (in seconds). For example:

    redis 127.0.0.1:6379> TTL JobQueue:478B9C84-C5B8-11E1-A2C5-D35E0A986783
    (integer) 42062

Hash of the job data is deleted when you delete the job ("delete_job" method).

    #-- To store the job queue (the list of created but not yet requested jobs):
    # LIST    JobQueue:queue:queue_name

For example:

    redis 127.0.0.1:6379> KEYS JobQueue:queue:*
    ...
    4) "JobQueue:queue:xxx"
    5) "JobQueue:queue:yyy"
    #      |       |    |
    #   Namespace  |    |
    #    Fixed key word |
    #            Queue name
    ...
    redis 127.0.0.1:6379> LRANGE JobQueue:queue:xxx 0 -1
    1) "478B9C84-C5B8-11E1-A2C5-D35E0A986783 1344070066"
    2) "89116152-C5BD-11E1-931B-0A690A986783 1344070067"
    #                        |                   |
    #                     Job id (UUID)          |
    #                                      Expire time (UTC)
    ...

A job queue is created automatically when there is data for it to contain, and is deleted automatically when its contents are exhausted.
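
The key layout shown above can be expressed in a few lines of Perl, which may help when inspecting the server by hand. This is only a sketch based on the redis-cli output in this section: job_key, queue_key and parse_queue_entry are hypothetical helpers, and 'JobQueue' is the documented value of NAMESPACE.

```perl
use strict;
use warnings;

# Hypothetical helpers (not part of Redis::JobQueue), based on the
# key names and list entries shown in the redis-cli session above.

# Key of the hash that stores the data of one job
sub job_key {
    my ( $id ) = @_;
    return "JobQueue:$id";
}

# Key of the list that stores the ids of the jobs waiting in a queue
sub queue_key {
    my ( $queue_name ) = @_;
    return "JobQueue:queue:$queue_name";
}

# Split a list entry of the form "<job id> <expire time>"
sub parse_queue_entry {
    my ( $entry ) = @_;
    my ( $id, $expire_time ) = split ' ', $entry, 2;
    return ( $id, $expire_time );
}
```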

DEPENDENCIES

To install and use this package, Perl version 5.010 or later is recommended. Some modules within this package depend on other packages that are distributed separately from Perl. We recommend that you have the following packages installed before you install the Redis::JobQueue package:

    Data::UUID
    Mouse
    Params::Util
    Redis

Redis::JobQueue package has the following optional dependencies:

    Net::EmptyPort
    Test::Distribution
    Test::Exception
    Test::Kwalitee
    Test::Perl::Critic
    Test::Pod
    Test::Pod::Coverage
    Test::RedisServer

If the optional modules are missing, some "prereq" tests are skipped.

BUGS AND LIMITATIONS

The use of maxmemory-policy all* in the redis.conf file could lead to a serious (but hard to detect) problem, as the Redis server may delete the job queue lists.

We strongly recommend using the option maxmemory in the redis.conf file if the data set may be large.

The Redis::JobQueue package was written, tested, and found working on recent Linux distributions.

There are no known bugs in this package.

Please report problems to the "AUTHOR".

Patches are welcome.

MORE DOCUMENTATION

All modules contain detailed information on the interfaces they provide.

SEE ALSO

The basic operation of the Redis::JobQueue package modules:

Redis::JobQueue - Object interface for creating and executing job queues, as well as monitoring the status and results of jobs.

Redis::JobQueue::Job - Object interface for creating and manipulating jobs.

Redis - Perl binding for Redis database.

AUTHOR

Sergey Gladkov, <sgladkov@trackingsoft.com>

CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Vlad Marchenko

COPYRIGHT AND LICENSE

Copyright (C) 2012-2013 by TrackingSoft LLC. All rights reserved.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.