NAME

Redis::JobQueue - Job queue management implemented using Redis server.

VERSION

This documentation refers to Redis::JobQueue version 1.19

SYNOPSIS

    use 5.010;
    use strict;
    use warnings;

    #-- Common
    use Redis::JobQueue qw(
        DEFAULT_SERVER
        DEFAULT_PORT
    );

    my $connection_string = DEFAULT_SERVER.':'.DEFAULT_PORT;
    my $jq = Redis::JobQueue->new( redis => $connection_string );

    #-- Producer
    my $job = $jq->add_job(
        {
            queue       => 'xxx',
            workload    => \'Some stuff',
            expire      => 12*60*60,            # 12h,
        }
    );

    #-- Worker
    sub xxx {
        my $job = shift;

        my $workload = ${ $job->workload };
        # do something with workload;

        $job->result( 'XXX JOB result comes here' );
    }

    while ( $job = $jq->get_next_job(
            queue       => 'xxx',
            blocking    => 1,
        ) ) {
        $job->status( 'working' );
        $jq->update_job( $job );

        # do my stuff
        xxx( $job );

        $job->status( 'completed' );
        $jq->update_job( $job );
    }

    #-- Consumer
    my $id = $ARGV[0];
    my $status = $jq->get_job_data( $id, 'status' );

    if ( $status eq 'completed' ) {
        # it is now safe to remove it from JobQueue, since it's completed
        my $job = $jq->load_job( $id );

        $jq->delete_job( $id );
        say 'Job result: ', ${ $job->result };
    } else {
        say "Job is not complete, has current '$status' status";
    }

To see a brief but working code example of the Redis::JobQueue package usage look at the "An Example" section.

Description of the used by Redis::JobQueue data structures (on Redis server) is provided in "JobQueue data structure stored in Redis" section.

ABSTRACT

The Redis::JobQueue package is a set of Perl modules which allows creation of a simple job queue based on Redis server capabilities.

DESCRIPTION

The main features of the package are:

  • Supports the automatic creation of job queues, job status monitoring, updating the job data set, obtaining a consistent job from the queue, removing jobs, and the classification of possible errors.

  • Contains various reusable components that can be used separately or together.

  • Provides an object oriented API.

  • Support of storing arbitrary job-related data structures.

  • Simple methods for organizing producer, worker, and consumer clients.

Atributes

id

An id that uniquely identifies the job, scalar.

queue

Queue name in which job is placed, scalar.

expire

For how long (seconds) job data structures will be kept in memory.

status

Job status, scalar. See Redis::JobQueue::Job EXPORT section for the list of pre-defined statuses. Can be also set to any arbitrary value.

workload, result

User-set data structures which will be serialized before stored in Redis server. Suitable for passing large amounts of data.

*

Any other custom-named field passed to "constructor" or "update_job" method will be stored as metadata scalar in Redis server. Suitable for storing scalar values with fast access (will be serialized before stored in Redis server).

EXPORT

None by default.

The following additional constants, defining defaults for various parameters, are available for export:

Redis::JobQueue::MAX_DATASIZE

Maximum size of the data stored in workload, result: 512MB.

DEFAULT_SERVER

Default address of the Redis server - 'localhost'.

DEFAULT_PORT

Default port of the Redis server - 6379.

DEFAULT_TIMEOUT

Maximum time (in seconds) to wait for a new job from the queue, 0 - unlimited.

DEFAULT_CONNECTION_TIMEOUT

Default socket timeout for connection, number of seconds: 0.1 .

DEFAULT_OPERATION_TIMEOUT

Default socket timeout for read and write operations, number of seconds: 1.

NAMESPACE

Namespace used for keys on the Redis server - 'JobQueue'.

Error codes are identified

More details about error codes are provided in "DIAGNOSTICS" section.

Possible error codes:

E_NO_ERROR

0 - No error

E_MISMATCH_ARG

1 - Invalid argument of new or other method.

E_DATA_TOO_LARGE

2 - Provided data is too large.

E_NETWORK

3 - Error connecting to Redis server.

E_MAX_MEMORY_LIMIT

4 - Command failed because its execution requires more than allowed memory, set in maxmemory.

E_JOB_DELETED

5 - Job's data was removed.

E_REDIS

6 - Other error on Redis server.

CONSTRUCTOR

new( redis => $server, timeout => $timeout, check_maxmemory => $mode, ... )

Creates a new Redis::JobQueue object to communicate with Redis server. If invoked without any arguments, the constructor new creates and returns a Redis::JobQueue object that is configured with the default settings and uses local redis server.

In the parameters of the constructor 'redis' may be either a Redis object or a server string or a hash reference of parameters to create a Redis object.

Optional check_maxmemory boolean argument (default is true) defines if attempt is made to find out maximum available memory from Redis.

In some cases Redis implementation forbids such request, but setting <check_maxmemory> to false can be used as a workaround.

This example illustrates a new() call with all the valid arguments:

    my $coll = Redis::CappedCollection->create(
        redis   => { server => "$server:$port" },   # Redis object
                                    # or hash reference to parameters to create a new Redis object.
        timeout => $timeout,        # wait time (in seconds)
                                    # for blocking call of get_next_job.
                                    # Set 0 for unlimited wait time
        check_maxmemory => $mode,   # boolean argument (default is true)
                                    # defines if attempt is made to find out
                                    # maximum available memory from Redis.
        reconnect_on_error  => 1,   # Boolean argument - default is true and conservative_reconnect is true.
                                    # Controls ability to force re-connection with Redis on error.
        connection_timeout  => DEFAULT_CONNECTION_TIMEOUT,  # Socket timeout for connection,
                                    # number of seconds (can be fractional).
                                    # NOTE: Changes external socket configuration.
        operation_timeout   => DEFAULT_OPERATION_TIMEOUT,   # Socket timeout for read and write operations,
                                    # number of seconds (can be fractional).
                                    # NOTE: Changes external socket configuration.
    );
  • According to Redis documentation:

    This module consider that any data sent to the Redis server is a raw octets string, even if it has utf8 flag set. And it doesn't do anything when getting data from the Redis server.

  • Non-serialize-able fields (like status or message) passed in UTF-8 can not be correctly restored from Redis server. To avoid potential data corruction, passing UTF-8 encoded value causes error.

  • "DEFAULT_TIMEOUT" value is used when a Redis class object is passed to the new constructor without additional timeout argument.

This example illustrates new() call with all possible arguments:

    my $jq = Redis::JobQueue->new(
        redis   => "$server:$port", # Connection info for Redis which hosts queue
        timeout => $timeout,        # wait time (in seconds)
                                    # for blocking call of get_next_job.
                                    # Set 0 for unlimited wait time
    );
    # or
    my $jq = Redis::JobQueue->new(
        redis => {                  # The hash reference of parameters
                                    # to create a Redis object
            server => "$server:$port",
        },
    );

The following examples illustrate other uses of the new method:

    $jq = Redis::JobQueue->new();
    my $next_jq = Redis::JobQueue->new( $jq );

    my $redis = Redis->new( server => "$server:$port" );
    $next_jq = Redis::JobQueue->new(
        $redis,
        timeout => $timeout,
    );
    # or
    $next_jq = Redis::JobQueue->new(
        redis   => $redis,
        timeout => $timeout,
    );

An invalid argument causes die (confess).

METHODS

The following methods are available for object of the Kafka::Producer class:

timeout

Accessor to the timeout attribute.

Returns current value of the attribute if called without an argument.

Non-negative integer value can be used to specify a new value of the maximum waiting time for queue (of the "get_next_job" method). Use timeout = 0 for an unlimited wait time.

reconnect_on_error

Controls ability to force re-connection with Redis on error.

connection_timeout

Controls socket timeout for Redis server connection, number of seconds (can be fractional).

NOTE: Changes external socket configuration.

operation_timeout

Controls socket timeout for Redis server read and write operations, number of seconds (can be fractional).

NOTE: Changes external socket configuration.

max_datasize

Provides access to the max_datasize attribute.

Returns current value of the attribute if called with no argument.

Non-negative integer value can be used to specify a new value for the maximum size of data in the attributes of a Redis::JobQueue::Job object.

The check is done before sending data to the module Redis, after possible processing by methods of module Storable (attributes workload, result and meta_data). It is automatically serialized.

The max_datasize attribute value is used in the constructor and data entry job operations on the Redis server.

The constructor uses the smaller of the values of 512MB and maxmemory limit from a redis.conf file.

last_errorcode

Returns the code of the last identified error.

See "DIAGNOSTICS" section for description of error codes.

last_error

Returns error message of the last identified error.

See "DIAGNOSTICS" section for more info on errors.

add_job( $job_data, LPUSH => 1 )

Adds a job to the queue on the Redis server.

The first argument should be either Redis::JobQueue::Job object (which is modified by the method) or a reference to a hash representing Redis::JobQueue::Job - in the latter case a new Redis::JobQueue::Job object is created.

Returns a Redis::JobQueue::Job object with a new unique identifier.

Job status is set to STATUS_CREATED.

add_job optionally takes arguments in key-value pairs.

The following example illustrates a add_job() call with all possible arguments:

    my $job_data = {
        id           => '4BE19672-C503-11E1-BF34-28791473A258',
        queue        => 'lovely_queue', # required
        job          => 'strong_job',   # optional attribute
        expire       => 12*60*60,
        status       => 'created',
        workload     => \'Some stuff',
        result       => \'JOB result comes here',
    };

    my $job = Redis::JobQueue::Job->new( $job_data );

    my $resulting_job = $jq->add_job( $job );
    # or
    $resulting_job = $jq->add_job(
        $pre_job,
        LPUSH       => 1,
    );

If used with the optional argument LPUSH => 1, the job is placed at the beginnig of the queue and will be returned by the next call to get_next_job.

TTL of job data on Redis server is set in accordance with the "expire" attribute of the Redis::JobQueue::Job object. Make sure it's higher than the time needed to process all the jobs in the queue.

get_job_data( $job, $data_key )

Data of the job is requested from the Redis server. First argument can be either a job ID or Redis::JobQueue::Job object.

Returns undef when the job was not found on Redis server.

The method returns the jobs data from the Redis server. See Redis::JobQueue::Job for the list of standard jobs data fields.

The method returns a reference to a hash of the standard jobs data fields if only the first argument is specified.

If given a key name $data_key, it returns data corresponding to the key or undef when the value is undefined or key is not in the data or metadata.

The following examples illustrate uses of the get_job_data method:

    my $data_href = $jq->get_job_data( $id );
    # or
    $data_href = $jq->get_job_data( $job );
    # or
    my $data_key = 'foo';
    my $data = $jq->get_job_data( $job->id, $data_key );

You can specify a list of names of key data or metadata. In this case it returns the corresponding list of data. For example:

    my ( $status, $foo ) = $jq->get_job_data( $job->id, 'status', 'foo' );

See meta_data for more informations about the jobs metadata.

get_job_meta_fields( $job )

The list of names of metadata fields of the job is requested from the Redis server. First argument can be either a job ID or Redis::JobQueue::Job object.

Returns empty list when the job was not found on Redis server or the job does not have metadata.

The following examples illustrate uses of the get_job_meta_fields method:

    my @fields = $jq->get_job_meta_fields( $id );
    # or
    @fields = $jq->get_job_meta_fields( $job );
    # or
    @fields = $jq->get_job_meta_fields( $job->id );

See meta_data for more informations about the jobs metadata.

load_job( $job )

Loads job data from the Redis server. The argument is either job ID or a Redis::JobQueue::Job object.

Method returns the object corresponding to the loaded job. Returns undef if the job is not found on the Redis server.

The following examples illustrate uses of the load_job method:

    $job = $jq->load_job( $id );
    # or
    $job = $jq->load_job( $job );

get_next_job( queue => $queue_name, blocking => 1 )

Selects the job identifier which is at the beginning of the queue.

get_next_job takes arguments in key-value pairs. You can specify a queue name or a reference to an array of queue names. Queues from the list are processed in random order.

By default, each queue is processed in a separate request with the result returned immediately if a job is found (waiting) in that queue. If no waiting job found, returns undef. In case optional blocking argument is true, all queues are processed in a single request to Redis server and if no job is found waiting in queue(s), get_next_job execution will be paused for up to timeout seconds or until a job becomes available in any of the listed queues.

Use timeout = 0 for an unlimited wait time. Default - blocking is false (0).

Method returns the job object corresponding to the received job identifier. Returns the undef if there is no job in the queue.

These examples illustrate a get_next_job call with all the valid arguments:

    $job = $jq->get_next_job(
        queue       => 'xxx',
        blocking    => 1,
    );
    # or
    $job = $jq->get_next_job(
        queue       => [ 'aaa', 'bbb' ],
        blocking    => 1,
    );

TTL job data for the job resets on the Redis server in accordance with the "expire" attribute of the job object.

get_next_job_id( queue => $queue_name, blocking => 1 )

Like "get_next_job", but returns job identifier only.

TTL job data for the job does not reset on the Redis server.

update_job( $job )

Saves job data changes to the Redis server. Job ID is obtained from the argument, which can be a Redis::JobQueue::Job object.

Returns the number of attributes that were updated if the job was found on the Redis server and undef if it was not. When you change a single attribute, returns 2 because updated also changes.

Changing the "expire" attribute is ignored.

The following examples illustrate uses of the update_job method:

    $jq->update_job( $job );

TTL job data for the job resets on the Redis server in accordance with the "expire" attribute of the job object.

delete_job( $job )

Deletes job's data from Redis server. The Job ID is obtained from the argument, which can be either a string or a Redis::JobQueue::Job object.

Returns true if job and its metadata was successfully deleted from Redis server. False if jobs or its metadata wasn't found.

The following examples illustrate uses of the delete_job method:

    $jq->delete_job( $job );
    # or
    $jq->delete_job( $id );

Use this method soon after receiving the results of the job to free memory on the Redis server.

See description of the Redis::JobQueue data structure used on the Redis server at the "JobQueue data structure stored in Redis" section.

Note that job deletion time is proportional to number of jobs currently in the queue.

get_job_ids

Gets list of job IDs on the Redis server. These IDs are identifiers of job data structures, not only the identifiers which get derived from the queue by "get_next_job".

The following examples illustrate simple uses of the get_job_ids method (IDs of all existing jobs):

    @jobs = $jq->get_job_ids;

These are identifiers of jobs data structures related to the queue determined by the arguments.

get_job_ids takes arguments in key-value pairs. You can specify a queue name or job status (or a reference to an array of queue names or job statuses) to filter the list of identifiers.

You can also specify an argument queued (true or false).

When filtering by the names of the queues and queued is set to true, only the identifiers of the jobs which have not yet been derived from the queues using "get_next_job" are returned.

Filtering by status returns the task IDs whose status is exactly the same as the specified status.

The following examples illustrate uses of the get_job_ids method:

    # filtering the identifiers of jobs data structures
    @ids = $jq->get_job_ids( queue => 'some_queue' );
    # or
    @ids = $jq->get_job_ids( queue => [ 'foo_queue', 'bar_queue' ] );
    # or
    @ids = $jq->get_job_ids( status => STATUS_COMPLETED );
    # or
    @ids = $jq->get_job_ids( status => [ STATUS_COMPLETED, STATUS_FAILED ] );

    # filter the IDs are in the queues
    @ids = $jq->get_job_ids( queued => 1, queue => 'some_queue' );
    # etc.

server

Returns the address of the Redis server used by the queue (in form of 'host:port').

The following example illustrates use of the server method:

    $redis_address = $jq->server;

ping

This command is used to test connection to Redis server.

Returns 1 if a connection is still alive or 0 otherwise.

The following example illustrates use of the ping method:

    $is_alive = $jq->ping;

External connections to the server object (eg, C <$redis = Redis->new( ... );>), and the queue object can continue to work after calling ping only if the method returned 1.

If there is no connection to the Redis server (methods return 0), the connection to the server closes. In this case, to continue working with the queue, you must re-create the Redis::JobQueue object with the "new" method. When using an external connection to the server, to check the connection to the server you can use the $redis->echo( ... ) call. This is useful to avoid closing the connection to the Redis server unintentionally.

quit

Closes connection to the Redis server.

The following example illustrates use of the quit method:

    $jq->quit;

It does not close the connection to the Redis server if it is an external connection provided to queue constructor as existing Redis object. When using an external connection (eg, $redis = Redis-> new (...);), to close the connection to the Redis server, call $redis->quit after calling this method.

queue_status

Gets queue status from the Redis server. Queue name is obtained from the argument. The argument can be either a string representing a queue name or a Redis::JobQueue::Job object.

Returns a reference to a hash describing state of the queue or a reference to an empty hash when the queue wasn't found.

The following examples illustrate uses of the queue_status method:

    $qstatus = $jq->queue_status( $queue_name );
    # or
    $qstatus = $jq->queue_status( $job );

The returned hash contains the following information related to the queue:

  • length

    The number of jobs in the active queue that are waiting to be selected by "get_next_job" and then processed.

  • all_jobs

    The number of ALL jobs tagged with the queue, i.e. including those that were processed before and other jobs, not presently waiting in the active queue.

  • max_job_age

    The age of the oldest job (the lifetime of the queue) in the active queue.

  • min_job_age

    The age of the youngest job in the active queue.

  • lifetime

    Time it currently takes for a job to pass through the queue.

Statistics based on the jobs that have not yet been removed. Some fields may be missing if the status of the job prevents determining the desired information (eg, there are no jobs in the queue).

queue_length

Gets queue length from the Redis server. Queue name is obtained from the argument. The can be a string representing a queue name or a Redis::JobQueue::Job object.

If queue does not exist, it is interpreted as an empty list and 0 is returned.

The following examples illustrate uses of the queue_length method:

    $queue_length = $jq->queue_length( $queue_name );
    # or
    $queue_length = $jq->queue_status( $job );

DIAGNOSTICS

Use the method to retrieve last error for analysis: "last_errorcode".

A Redis error will cause the program to halt (confess). In addition to errors in the Redis module detected errors "E_MISMATCH_ARG", "E_DATA_TOO_LARGE", "E_JOB_DELETED". All recognizable errors in Redis::JobQueue lead to the installation of the corresponding value in the "last_errorcode" and cause an exception (confess). Unidentified errors cause an exception ("last_errorcode" remains equal to 0). The initial value of $@ is preserved.

The user has the choice:

An Example

This example shows handling for possible errors.

    use 5.010;
    use strict;
    use warnings;

    #-- Common ---------------------------------------------------------------
    use Redis::JobQueue qw(
        DEFAULT_SERVER
        DEFAULT_PORT

        E_NO_ERROR
        E_MISMATCH_ARG
        E_DATA_TOO_LARGE
        E_NETWORK
        E_MAX_MEMORY_LIMIT
        E_JOB_DELETED
        E_REDIS
    );
    use Redis::JobQueue::Job qw(
        STATUS_CREATED
        STATUS_WORKING
        STATUS_COMPLETED
    );

    my $server = DEFAULT_SERVER.':'.DEFAULT_PORT;   # the Redis Server

    # Example of error handling
    sub exception {
        my $jq  = shift;
        my $err = shift;

        if ( $jq->last_errorcode == E_NO_ERROR ) {
            # For example, to ignore
            return unless $err;
        } elsif ( $jq->last_errorcode == E_MISMATCH_ARG ) {
            # Necessary to correct the code
        } elsif ( $jq->last_errorcode == E_DATA_TOO_LARGE ) {
            # You must use the control data length
        } elsif ( $jq->last_errorcode == E_NETWORK ) {
            # For example, sleep
            #sleep 60;
            # and return code to repeat the operation
            #return "to repeat";
        } elsif ( $jq->last_errorcode == E_JOB_DELETED ) {
            # For example, return code to ignore
            my $id = $err =~ /^(\S+)/;
            #return "to ignore $id";
        } elsif ( $jq->last_errorcode == E_REDIS ) {
            # Independently analyze the $err
        } else {
            # Unknown error code
        }
        die $err if $err;
    }

    my $jq;

    eval {
        $jq = Redis::JobQueue->new(
            redis   => $server,
            timeout => 1,   # DEFAULT_TIMEOUT = 0 for an unlimited timeout
        );
    };
    exception( $jq, $@ ) if $@;

    #-- Producer -------------------------------------------------------------
    #-- Adding new job

    my $job;
    eval {
        $job = $jq->add_job(
            {
                queue       => 'xxx',
                workload    => \'Some stuff',
                expire      => 12*60*60,
            }
        );
    };
    exception( $jq, $@ ) if $@;
    say( 'Added job ', $job->id ) if $job;

    eval {
        $job = $jq->add_job(
            {
                queue       => 'yyy',
                workload    => \'Some stuff',
                expire      => 12*60*60,
            }
        );
    };
    exception( $jq, $@ ) if $@;
    say( 'Added job ', $job->id ) if $job;

    #-- Worker ---------------------------------------------------------------

    #-- Run your jobs

    sub xxx {
        my $job = shift;

        my $workload = ${$job->workload};
        # do something with workload;
        say "XXX workload: $workload";

        $job->result( 'XXX JOB result comes here' );
    }

    sub yyy {
        my $job = shift;

        my $workload = ${$job->workload};
        # do something with workload;
        say "YYY workload: $workload";

        $job->result( \'YYY JOB result comes here' );
    }

    eval {
        while ( my $job = $jq->get_next_job(
                queue       => [ 'xxx','yyy' ],
                blocking    => 1,
            ) ) {
            my $id = $job->id;

            my $status = $jq->get_job_data( $id, 'status' );
            say "Job '$id' was '$status' status";

            $job->status( STATUS_WORKING );
            $jq->update_job( $job );

            $status = $jq->get_job_data( $id, 'status' );
            say "Job '$id' has new '$status' status";

            # do my stuff
            if ( $job->queue eq 'xxx' ) {
                xxx( $job );
            } elsif ( $job->queue eq 'yyy' ) {
                yyy( $job );
            }

            $job->status( STATUS_COMPLETED );
            $jq->update_job( $job );

            $status = $jq->get_job_data( $id, 'status' );
            say "Job '$id' has last '$status' status";
        }
    };
    exception( $jq, $@ ) if $@;

    #-- Consumer -------------------------------------------------------------

    #-- Check the job status

    eval {
        # For example:
        # my $status = $jq->get_job_data( $ARGV[0], 'status' );
        # or:
        my @ids = $jq->get_job_ids;

        foreach my $id ( @ids ) {
            my $status = $jq->get_job_data( $id, 'status' );
            say "Job '$id' has '$status' status";
        }
    };
    exception( $jq, $@ ) if $@;

    #-- Fetching the result

    eval {
        # For example:
        # my $id = $ARGV[0];
        # or:
        my @ids = $jq->get_job_ids;

        foreach my $id ( @ids ) {
            my $status = $jq->get_job_data( $id, 'status' );
            say "Job '$id' has '$status' status";

            if ( $status eq STATUS_COMPLETED ) {
                my $job = $jq->load_job( $id );

                # it is now safe to remove it from JobQueue, since it is completed
                $jq->delete_job( $id );

                say 'Job result: ', ${$job->result};
            } else {
                say "Job is not complete, has current '$status' status";
            }
        }
    };
    exception( $jq, $@ ) if $@;

    #-- Closes and cleans up -------------------------------------------------

    eval { $jq->quit };
    exception( $jq, $@ ) if $@;

JobQueue data structure stored in Redis

The following data structures are stored on Redis server:

    #-- To store the job data:
    # HASH    Namespace:id

For example:

    $ redis-cli
    redis 127.0.0.1:6379> KEYS JobQueue:*
    1) "JobQueue:478B9C84-C5B8-11E1-A2C5-D35E0A986783"
    2) "JobQueue:478C81B2-C5B8-11E1-B5B1-16670A986783"
    3) "JobQueue:89116152-C5BD-11E1-931B-0A690A986783"
    #      |                 |
    #   Namespace            |
    #                     Job id (UUID)
    ...
    redis 127.0.0.1:6379> hgetall JobQueue:478B9C84-C5B8-11E1-A2C5-D35E0A986783
    1) "queue"                                  # hash key
    2) "xxx"                                    # the key value
    3) "job"                                    # hash key
    4) "Some description"                       # the key value
    5) "workload"                               # hash key
    6) "Some stuff"                             # the key value
    7) "expire"                                 # hash key
    8) "43200"                                  # the key value
    9) "status"                                 # hash key
    10) "_created_"                             # the key value
    ...

Each call to "add_job" or "update_job" methods renews objects expiration accroding to "expire" attribute (seconds). For example:

    redis 127.0.0.1:6379> TTL JobQueue:478B9C84-C5B8-11E1-A2C5-D35E0A986783
    (integer) 42062

Hash containing job data is deleted when you delete the job ("delete_job" method). Job is also removed from the LIST object.

    # -- To store the job queue (the list created but not yet requested jobs):
    # LIST    JobQueue:queue:queue_name:job_name

For example:

    redis 127.0.0.1:6379> KEYS JobQueue:queue:*
    ...
    4) "JobQueue:queue:xxx"
    5) "JobQueue:queue:yyy"
    #      |       |    |
    #   Namespace  |    |
    #    Fixed key word |
    #            Queue name
    ...
    redis 127.0.0.1:6379> LRANGE JobQueue:queue:xxx 0 -1
    1) "478B9C84-C5B8-11E1-A2C5-D35E0A986783 1344070066"
    2) "89116152-C5BD-11E1-931B-0A690A986783 1344070067"
    #                        |                   |
    #                     Job id (UUID)          |
    #                                      Expire time (UTC)
    ...

Job queue data structures are created automatically when job is placed in the queue and deleted when all jobs are removed from the queue.

DEPENDENCIES

In order to install and use this package it's recommended to use Perl version 5.010 or later. Some modules within this package depend on other packages that are distributed separately from Perl. We recommend that you have the following packages installed before you install Redis::JobQueue package:

    Data::UUID
    Digest::SHA1
    List::MoreUtils
    Mouse
    Params::Util
    Redis
    Storable

Redis::JobQueue package has the following optional dependencies:

    Net::EmptyPort
    Test::Deep
    Test::Exception
    Test::NoWarnings
    Test::RedisServer

If the optional modules are missing, some "prereq" tests are skipped.

BUGS AND LIMITATIONS

By design Redis::JobQueue uses freeze before storing job data on Redis (workload, result containers and the custom-named fields). This ensures that among other things, UTF8-encoded strings are safe when passed this way. The other main string job data fields (id, queue, job, status, message) are not processed in any way and passed to Redis as-is. They are designed as an easy and fast way for software developer to store some internal / supplemental data among job details.

For the main string job data fields (id, queue, job, status, message) you can do one of the following:

  • forcefully downgrade string to ASCII (see perldoc utf8) before attempting to pass it to Redis::JobQueue

  • use freeze (Storable) before passing it to Redis::JobQueue

  • store such string as part of workload / result data structures

Needs Redis server version 2.8 or higher as module uses Redis Lua scripting.

The use of maxmemory-policy all* in the redis.conf file could lead to a serious (but hard to detect) problem as Redis server may delete the job queue objects.

It may not be possible to use this module with the cluster of Redis servers because full name of some Redis keys may not be known at the time of the call to Lua script ('EVAL' or 'EVALSHA' command). Redis server may not be able to forward the request to the appropriate node in the cluster.

We strongly recommend using of maxmemory option in the redis.conf file if the data set may be large.

WARN: Not use maxmemory less than for example 70mb (60 connections) for avoid 'used_memory > maxmemory' problem.

The Redis::JobQueue package was written, tested, and found working on recent Linux distributions.

There are no known bugs in this package.

Please report problems to the "AUTHOR".

Patches are welcome.

MORE DOCUMENTATION

All modules contain detailed information on the interfaces they provide.

SEE ALSO

The basic operation of the Redis::JobQueue package modules:

Redis::JobQueue - Object interface for creating and executing jobs queues, as well as monitoring the status and results of jobs.

Redis::JobQueue::Job - Object interface for creating and manipulating jobs.

Redis::JobQueue::Util - String manipulation utilities.

Redis - Perl binding for Redis database.

SOURCE CODE

Redis::JobQueue is hosted on GitHub: https://github.com/TrackingSoft/Redis-JobQueue

AUTHOR

Sergey Gladkov, <sgladkov@trackingsoft.com>

Please use GitHub project link above to report problems or contact authors.

CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Sergiy Zuban

Vlad Marchenko

COPYRIGHT AND LICENSE

Copyright (C) 2012-2016 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.