NAME

Forks::Queue - queue that can be shared safely across processes

VERSION

0.15

SYNOPSIS

  use Forks::Queue;
  $q = Forks::Queue->new( impl => ..., style => 'lifo' );

  # put items on queue
  $q->put("a scalar item");
  $q->put(["an","arrayref","item"]);
  $q->put({"a"=>"hash","reference"=>"item"});
  $q->put("list","of","multiple",["items"]);
  $q->end;        # no more jobs will be added to queue

  # retrieve items from queue, possibly after a fork
  $item = $q->get;
  $item = $q->peek;      # get item without removing it
  @up_to_10_items = $q->get(10);
  $remaining_items = $q->pending;

DESCRIPTION

Interface for a queue object that can be shared across processes and threads. Available implementations are Forks::Queue::File, Forks::Queue::Shmem, and Forks::Queue::SQLite.
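As a rough illustration of sharing a queue across a fork, the sketch below has a parent process put items on the queue while a child process consumes them. It assumes the default implementation works on your platform; the items and the printed messages are invented for the example.

```perl
use strict;
use warnings;
use Forks::Queue;

# Create the queue before forking so the child inherits it.
my $q = Forks::Queue->new( style => 'fifo' );

my $pid = fork();
die "fork failed: $!" unless defined $pid;

if ($pid == 0) {
    # Child: get() blocks while the queue is empty and returns undef
    # once the queue is empty and end() has been called.
    while (defined( my $item = $q->get )) {
        my $desc = ref $item ? ref($item) . " reference" : $item;
        print "child $$ got: $desc\n";
    }
    exit 0;
}

# Parent: add some work, then signal that no more items are coming.
$q->put("a scalar item");
$q->put([ "an", "arrayref", "item" ]);
$q->end;

waitpid $pid, 0;
```

Calling end in the parent is what lets the child's loop finish; without it the child would block on get indefinitely.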

METHODS

Many of these methods pass or return "items". For this distribution, an "item" is any scalar or reference that can be serialized and shared across processes.

This will include scalars and most unblessed references

  "42"
  [1,2,3,"forty-two"]
  { name=>"a job", timestamp=>time, input=>{foo=>[19],bar=>\%bardata} }

but will generally preclude data with blessed references and code references

  { name => "bad job", callback => \&my_callback_routine }
  [ 46, $url13, File::Temp->new ]

Many of the methods of Forks::Queue have analogues in the Thread::Queue class, and many scripts using Thread::Queue can be easily transformed to use Forks::Queue.

new

    $queue = Forks::Queue->new( %opts )

Instantiates a new queue object with the given configuration.

If one of the options is impl, the constructor from that Forks::Queue subclass will be invoked.

Other options that should be supported on all implementations include

  • style => 'fifo' | 'lifo'

    Indicates whether the "get" method will return items in first-in-first-out order or last-in-first-out order (and which end of the queue the "peek" method will examine)

  • limit => int

    A maximum size for the queue. Set to a non-positive value to specify an unlimited size queue.

  • on_limit => 'block' | 'fail' | 'tq-compat'

    Dictates what the queue should do when an attempt is made to add items beyond the queue's limit. If block, the queue will block and wait until items are removed from the queue. If fail, the queue will warn and return immediately without changing the queue.

    The setting tq-compat is similar to block, but has the additional effect where the "insert" method operates without regard to the queue limit. This behavior is compatible with the way queue limits and the insert method work in the Thread::Queue package.

    See the "put", "enqueue", "push", "unshift", and "insert" methods, which increase the length of the queue and may be affected by this setting.

  • join => bool

    If true, expects that the queue referred to by this constructor has already been created in another process, and that the current process should access the existing queue. This allows a queue to be shared across unrelated processes (i.e., processes that do not have a parent-child relationship).

      # my_daemon.pl - may run "all the time" in the background
      $q = Forks::Queue::File->new(file=>'/var/spool/foo/q17');
      # creates new queue object
      ... 
    
      # worker.pl - may run periodically for a short time, launched from
      #             cron or from command line, but not from the daemon
      $q = Forks::Queue->new( impl => 'File', join => 1,
                              file => '/var/spool/foo/q17' );
      # the new queue attaches to existing file at /var/spool/foo/q17
      ...

    join is not necessary for child processes forked from a process with an existing queue:

      $q = Forks::Queue->new(...)
      ...
      if (fork() == 0) {
          # $q already exists and the child process can begin using it,
          # no need for a  Forks::Queue  constructor with  join
          ...
      }

  • persist => bool

    Active Forks::Queue objects affect your system, writing to disk or writing to memory, and in general they clean themselves up when they detect that no more processes are using the queue. The persist option, if set to true, instructs the queue object to leave its state intact after destruction.

    An obvious use case for this option is debugging, to examine the state of the queue after abnormal termination of your program.

    A second use case is to create persistent queues -- queues that are shared not only among different processes, but among different processes that are running at different times. The persistent queue can be used by supplying both the persist and the join options to the Forks::Queue constructor.

        $queue_file = "/tmp/persistent.job.queue";
        $join = -f $queue_file;
        $q = Forks::Queue->new( impl => 'File', file => $queue_file,
                                join => $join, persist => 1 );
        ... work with the queue ...
        # the queue remains intact if this program exits or aborts

  • list => ARRAYREF

    Initializes the contents of the queue with the argument to the list option. The argument must be an array reference.

    If the join option is specified, the contents of the list could be added to an already existing queue.

See the global "%OPTS" variable for information about default values for many of these settings.
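To make the style option concrete, here is a small single-process sketch that loads the same three items into a FIFO queue and a LIFO queue and takes one item from each. The item values are placeholders, and the comments restate the ordering described above rather than any additional guarantee.

```perl
use strict;
use warnings;
use Forks::Queue;

my $fifo = Forks::Queue->new( style => 'fifo' );
my $lifo = Forks::Queue->new( style => 'lifo' );

# Append the same items, in the same order, to both queues.
$_->put( "first", "second", "third" ) for $fifo, $lifo;

my $from_fifo = $fifo->get;   # FIFO: the oldest item comes out first
my $from_lifo = $lifo->get;   # LIFO: the most recently added item comes out first

print "fifo returned: $from_fifo\n";
print "lifo returned: $from_lifo\n";
```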

put

enqueue

    $count = $queue->put(@items);
    $count = $queue->enqueue(@items)

Places one or more "items" on the queue, and returns the number of items successfully added to the queue.

Adding items to the queue will fail if the "end" method of the queue had previously been called from any process.

See the "limit" method and the "on_limit" option for how the put method behaves when adding items would cause the queue to exceed its maximum size.

The enqueue method name is provided for compatibility with Thread::Queue.

push

    $count = $queue->push(@items)

Equivalent to "put", adding items to the end of the queue and returning the number of items successfully added. The most recent items appended to the queue by push or put will be the first items taken from the queue by "pop" or by "get" with LIFO style queues, and the last items removed by "shift" or "get" with FIFO style queues.

If the items added to the queue would cause the queue to exceed its queue size limit (as determined by the "limit" attribute), this method will either block until queue capacity is available, or issue a warning about the uninserted items and return the number of items added, depending on the queue's setting for "on_limit".
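The interaction between "limit" and "on_limit" can be sketched with a deliberately tiny queue. This example assumes on_limit => 'fail', adds items one at a time, and counts how many were accepted; the warning handler is only there to show the warning that the text above says is issued.

```perl
use strict;
use warnings;
use Forks::Queue;

# A queue that holds at most 2 items and refuses, rather than blocks,
# when an insertion would exceed that size.
my $q = Forks::Queue->new( limit => 2, on_limit => 'fail' );

my $added = 0;
{
    # Route any warning from the rejected put() to standard output.
    local $SIG{__WARN__} = sub { print "warning: $_[0]" };
    for my $item (qw(a b c)) {
        $added += $q->put($item) || 0;   # put returns the number of items added
    }
}

print "accepted $added of 3 items; ", $q->pending, " pending\n";
```

With on_limit => 'block' the third put would instead wait until another process removed an item, so that setting only makes sense when a consumer is running.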

unshift

    $count = $queue->unshift(@items)

Equivalent to insert(0,@items), adding items to the front of the queue, and returning the number of items successfully added. In FIFO queues, items added to the queue with unshift will be the last items taken from the queue by "get", and in LIFO queues, they will be the first items taken from the queue by "get".

This method is inefficient for some queue implementations.

end

    $queue->end

Indicates that no more items are to be put on the queue, so that when a process tries to retrieve an item from an empty queue, it will not block and wait for new items to be added. Causes any processes blocking on a "get"/"dequeue"/"shift"/"pop" call to become unblocked and return undef. This method may be called from any process that has access to the queue.

Calling end on a queue more than once will generate a warning message, even if the caller is not the same process/thread that made the original end call.

get

dequeue

    $item = $queue->get;
    $item = $queue->dequeue;

    @items = $queue->get($count);
    @items = $queue->dequeue($count);

Attempts to retrieve one or more "items" from the queue. If the queue is empty, and if "end" has not been called on the queue, this call blocks until an item is available or until the "end" method has been called from some other process. If the queue is empty and "end" has been called, this method returns an empty list in list context or undef in scalar context.

If a $count argument is supplied, returns up to $count items or however many items are currently available on the queue, whichever is fewer. If "end" has not been called, the call still blocks until at least one item is available. See "get_nb" for a non-blocking version of this method. The return value of this function when a $count argument is supplied is always a list, so if you evaluate it in scalar context you will get the number of items retrieved from the queue, not the items themselves.

    $job = $q->get;         # $job is an item from the queue
    $job = $q->get(1);      # returns # of items retrieved, not an actual item!
    ($job) = $q->get(1);    # $job is an item from the queue

The only important difference between get and dequeue is what happens when there is a $count argument, and the queue currently has more than zero but fewer than $count items available. In this case, the get call will return all of the available items. The dequeue method will block until at least $count items are available on the queue, or until the "end" method has been called on the queue. This dequeue behavior is consistent with the behavior of the "dequeue" method in Thread::Queue.

pop

    $item = $queue->pop
    @items = $queue->pop($count)

Retrieves one or more items from the "back" of the queue. For LIFO style queues, the "get" method is equivalent to this method. Like "get", this method blocks while the queue is empty and the "end" method has not been called on the queue.

If a $count argument is supplied, returns up to $count items or however many items are currently available on the queue, whichever is fewer. Like the "get" call, this method blocks when waiting for input; see "pop_nb" for a non-blocking version of the method. Also like "get", you should be wary of using this method in scalar context if you provide a $count argument.

shift

    $item = $queue->shift
    @items = $queue->shift($count)

Retrieves one or more items from the "front" of the queue. For FIFO style queues, the "get" method is equivalent to this method. Like "get", this method blocks while the queue is empty and the "end" method has not been called on the queue.

If a $count argument is supplied, returns up to $count items or however many items are currently available on the queue, whichever is fewer. Like the "get" call, this method blocks when waiting for input; see "shift_nb" for a non-blocking version of the method. Also like "get", you should be wary of using this method in scalar context if you provide a $count argument.

get_nb

dequeue_nb

pop_nb

shift_nb

    $item = $queue->XXX_nb
    @items = $queue->XXX_nb($count)

Non-blocking versions of the "get", "dequeue", "pop", and "shift" methods. These functions return immediately if there are no items in the queue to retrieve, returning undef in the case with no arguments and an empty list when a $count argument is supplied.
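A typical use of the non-blocking calls is to drain whatever is currently queued without waiting for more. The following sketch, with made-up job names, takes one batch with a $count argument and then loops with the no-argument form until it returns undef.

```perl
use strict;
use warnings;
use Forks::Queue;

my $q = Forks::Queue->new;
$q->put( map { "job $_" } 1 .. 5 );

# With a count: returns a list of up to 3 items, or an empty list.
my @batch = $q->get_nb(3);
print "first batch: @batch\n";

# Without a count: returns one item, or undef if the queue is empty.
my @rest;
while (defined( my $item = $q->get_nb )) {
    push @rest, $item;
}
print "remaining: @rest\n";
print "pending now: ", $q->pending, "\n";
```

Because the loop stops on undef, this pattern cannot distinguish an empty queue from one on which "end" has been called; it simply returns control to the caller either way.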

get_timed

dequeue_timed

shift_timed

pop_timed

    $item = $queue->XXX_timed($timeout)
    @items = $queue->XXX_timed($timeout,$count)

Timed versions of "get", "dequeue", "shift", and "pop" that take a $timeout argument and will stop blocking after $timeout seconds have elapsed.

If a $count argument is supplied to dequeue_timed, the function will wait up to $timeout seconds for at least $count items to be available on the queue. After $timeout seconds have passed, the function will return up to $count available items.

For other timed methods, supplying a $count argument for a queue with more than zero but fewer than $count items available will return all available items without blocking.
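The timed calls can be tried on an empty queue, as in the sketch below. The text above does not spell out what is returned when the timeout expires, so this example only assumes that the result can be tested with defined, and it uses a whole number of seconds for $timeout.

```perl
use strict;
use warnings;
use Forks::Queue;

my $q = Forks::Queue->new;    # nothing queued, and end() is never called

my $start  = time;
my $item   = $q->get_timed(1);    # give up after about 1 second
my $waited = time - $start;

if (defined $item) {
    print "got an item: $item\n";
} else {
    print "no item arrived after roughly $waited second(s)\n";
}
```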

peek

    $item = $queue->peek
    $item = $queue->peek($index)
    $item = $queue->peek_front
    $item = $queue->peek_back

Returns an item from the queue without removing it. The peek_front and peek_back methods inspect the item at the front and the back of the queue, respectively. The generic peek method is equivalent to peek_front for FIFO style queues and peek_back for LIFO style queues. If an index is specified, returns the item at that position in the queue (where position 0 is the head of the queue). Negative indices are supported, so a call to $queue->peek(-2), for example, would return the second to last item in the queue.

If the queue is empty or if the specified index is larger than the number of elements currently in the queue, these methods will return undef without blocking.

Note that unlike the "peek" method in Thread::Queue, Forks::Queue::peek returns a copy of the item on the queue, so manipulating a reference returned from peek will not affect the item on the queue.
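The copy semantics of peek can be checked with a short sketch. The hash keys used here (name, tries) are invented for the example.

```perl
use strict;
use warnings;
use Forks::Queue;

my $q = Forks::Queue->new;
$q->put({ name => "job", tries => 0 });

# peek returns a copy, so this change stays local to $copy.
my $copy = $q->peek;
$copy->{tries} = 99;

# A second peek still sees the item as it was queued.
my $again = $q->peek;
print "tries in local copy: $copy->{tries}\n";
print "tries on the queue:  $again->{tries}\n";
print "items still pending: ", $q->pending, "\n";
```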

extract

    $item = $queue->extract
    $item = $queue->extract($index)
    @items = $queue->extract($index,$count)

Removes and returns the specified number of items from the queue at the specified index position, to provide random access to the queue. The method is non-blocking and may return fewer than the number of items requested (or zero items) if there are not enough items in the queue to satisfy the request.

If the $count argument is not provided, the method will return (if available) a single item. If the $index argument is also not provided, it will return the first item on the queue exactly like the "get_nb" method with no arguments.

Negative $index values are supported, in which case this method will extract the corresponding items at the back of the queue.

Like get() vs. get($count), the return value is always a scalar when no $count argument is provided, and always a list when it is.
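A brief sketch of random access with extract follows. It uses a FIFO queue and placeholder items, and it relies only on the indexing convention described for peek (position 0 is the head of the queue); the extracted values are printed rather than assumed.

```perl
use strict;
use warnings;
use Forks::Queue;

my $q = Forks::Queue->new( style => 'fifo' );
$q->put(qw(a b c d e));

# No $count: the result is a single scalar, taken from index 2.
my $one = $q->extract(2);

# With $count: the result is a list of up to 2 items, taken from the
# back of the queue because the index is negative.
my @tail = $q->extract( -2, 2 );

print "single item: $one\n";
print "from the back: @tail\n";
print "left on the queue: ", $q->pending, "\n";
```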

insert

    $count = $queue->insert($index, @list)

Provides random access to the queue, inserting the items specified in @list into the queue after index position $index. Negative $index values are supported, which indicate that the items should be inserted after that position relative to the back of the queue. Returns the number of items that were inserted into the queue.

If the queue has a "limit" set, and inserting all the items on the list would cause the queue size to exceed the limit, the setting of the queue's "on_limit" parameter will govern how this method will behave. See the "on_limit" setting for details.

This method is inefficient for some queue implementations.

pending

    $num_items_avail = $queue->pending

Returns the total number of items available on the queue. There is no guarantee that the number of available items will not change between a call to pending and a subsequent call to "get".

clear

    $queue->clear

Removes all items from the queue.

status

    $status = $queue->status

Returns a hash reference with meta information about the queue. The information should at least include the number of items remaining in the queue. Individual implementations may provide additional information in this return value.

limit

    $max = $queue->limit
    $queue->limit($new_limit)
    $queue->limit($new_limit,$on_limit)
    $queue->limit = $new_limit    # limit as lvalue requires Perl >=v5.14

Returns or updates the maximum size of the queue. With no args, returns the existing maximum queue size, with a non-positive value indicating that the queue does not have a maximum size.

The return value also acts as an lvalue through which the maximum queue size can be set, and allows the limit method to be used in the same way as the limit method in Thread::Queue. Note: the lvalue feature requires Perl v5.14 or better.

If arguments are provided, the first argument is used to set the maximum queue size. A non-positive queue size can be specified to indicate that the queue does not have a maximum size.

The second argument, if provided, updates the behavior of the queue when an attempt is made to add items beyond the maximum size. See "on_limit" for the recognized values of this argument and how they affect the behavior of the "put"/"push"/"enqueue", "unshift", "insert", and "dequeue" methods.
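The getter and two-argument setter forms of limit might be used as in this sketch. It avoids the lvalue form so that it does not depend on the Perl version, and the values 10 and 'block' are arbitrary choices for the example.

```perl
use strict;
use warnings;
use Forks::Queue;

my $q = Forks::Queue->new;

# With no arguments, limit returns the current maximum size;
# a non-positive value means the queue is unbounded.
my $before = $q->limit;

# Set a maximum of 10 items and ask the queue to block when full.
$q->limit( 10, 'block' );
my $after = $q->limit;

print "limit before: $before\n";
print "limit after:  $after\n";
```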

VARIABLES

%OPTS

Global hash containing the set of default options for all Forks::Queue constructors. Initially this hash contains the key-value pairs

        impl            "File"
        style           "fifo"
        limit           -1
        on_limit        "fail"

but they may be changed at any time to affect all subsequently constructed Forks::Queue objects. The global options can also be set at import time with additional arguments for the use statement.

    use Forks::Queue impl => 'SQLite';    # use SQLite queues by default
    $Forks::Queue::OPTS{impl} = 'SQLite'; # equivalent run-time call
    
    use Forks::Queue
        on_limit => 'block', limit => 10; # finite, blocking queues by default
    $Forks::Queue::OPTS{limit} = 10;
    $Forks::Queue::OPTS{on_limit} = 'block';  # equivalent run-time calls

ENVIRONMENT

Some environment variable settings that can affect this module:

  • FORKS_QUEUE_IMPL

    Specifies a default implementation to use, overriding the initial setting of $Forks::Queue::OPTS{"impl"}, in cases where the Forks::Queue constructor is invoked without passing an impl option.

  • FORKS_QUEUE_DEBUG

    If set to a true value, outputs information about the activity of the queues to standard error.

  • FORKS_QUEUE_NOTIFY

    If set to a false value, disables use of signals on POSIX-y platforms that may help improve queue performance.

  • FORKS_QUEUE_DIR

    Specifies a directory to use for temporary queue files in the File and SQLite implementations. If this directory is not specified, the implementations will try to make a reasonable choice based on your platform and other environment settings.

DEPENDENCIES

The Forks::Queue module and all its current implementations require the JSON module.

SEE ALSO

Thread::Queue, File::Queue, Queue::Q, MCE::Queue, Queue::DBI, Directory::Queue.

SUPPORT

You can find documentation for this module with the perldoc command.

    perldoc Forks::Queue

LICENSE AND COPYRIGHT

Copyright (c) 2017-2020, Marty O'Brien.

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.10.1 or, at your option, any later version of Perl 5 you may have available.

See http://dev.perl.org/licenses/ for more information.