IPC::DirQueue - disk-based many-to-many task queue


    # enqueueing process
    my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
    $dq->enqueue_file("filename");

    # dequeueing process
    my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
    my $job = $dq->pickup_queued_job();
    if (!$job) { print "no jobs left\n"; exit; }
    # ... do something interesting with $job->get_data_path() ...
    $job->finish();


This module implements a FIFO queueing infrastructure, using a directory as the communications and storage media. No daemon process is required to manage the queue; all communication takes place via the filesystem.

A common UNIX system design pattern is to use a tool like lpr as a task queueing system; for example, lpr has been used as an MP3 jukebox.

However, lpr isn't as efficient as it could be. When used in this way, you have to restart each task processor for every new task. If you have a lot of startup overhead, this can be very inefficient. With IPC::DirQueue, a processing server can run persistently and cache data needed across multiple tasks efficiently; it will not be restarted unless you restart it.

Multiple enqueueing and dequeueing processes on multiple hosts (NFS-safe locking is used) can run simultaneously, and safely, on the same queue.

Since multiple dequeuers can run simultaneously, this provides a good way to process a variable level of incoming tasks using a pre-defined number of worker processes.

If you need more CPU power working on a queue, you can simply start another dequeuer to help out. If you need less, kill off a few dequeuers.

If you need to take down the server to perform some maintenance or upgrades, just kill the dequeuer processes, perform the work, and start up new ones. Since there's no 'socket' or similar point of failure aside from the directory itself, the queue will just quietly fill with waiting jobs until the new dequeuer is ready.

Arbitrary 'name = value' metadata pairs can be transferred alongside data files. In fact, in some cases, you may find it easier to send unused and empty data files, and just use the 'metadata' fields to transfer the details of what will be worked on.


$dq = IPC::DirQueue->new ($opts);

Create a new batch-queue object, suitable for either enqueueing jobs or picking up already-queued jobs for processing.

$opts is a reference to a hash, which may contain the following options:

dir => $path_to_directory (no default)

Name the directory where the queue files are stored. This is required.

data_file_mode => $mode (default: 0666)

The chmod-style file mode for data files. This should be specified as a string with a leading 0. It will be affected by the current process umask.

queue_file_mode => $mode (default: 0666)

The chmod-style file mode for queue control files. This should be specified as a string with a leading 0. It will be affected by the current process umask.
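
As a rough illustration of how the umask interacts with these modes, the sketch below assumes ordinary POSIX semantics, in which permission bits set in the umask are cleared from the requested mode. The helper name effective_mode is hypothetical and is not part of IPC::DirQueue.

```perl
use strict;
use warnings;

# Hypothetical helper: compute the permissions a file would end up with
# when created with the given mode string under the given umask.
sub effective_mode {
    my ($mode_string, $umask) = @_;
    $umask = umask() unless defined $umask;   # default: this process's umask
    my $mode = oct($mode_string);             # "0666" is parsed as octal
    return $mode & ~$umask & 07777;           # umask bits are cleared
}

printf "%04o\n", effective_mode("0666", 022);   # prints 0644
printf "%04o\n", effective_mode("0666", 077);   # prints 0600
```

With the default mode of 0666, a typical umask of 022 therefore yields files that are writable only by their owner.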

$dq->enqueue_file ($filename [, $metadata [, $pri] ] );

Enqueue a new job for processing. Returns 1 if the job was enqueued, or undef on failure.

$metadata is an optional hash reference; every item of metadata will be available to worker processes on the IPC::DirQueue::Job object, in the $job->{metadata} hashref. Note that using this channel for metadata brings with it several restrictions:

1. it requires that the metadata be stored as 'name' => 'value' string pairs

2. neither 'name' nor 'value' may contain newline (\n) or NUL (\0) characters

3. 'name' cannot contain colon characters

4. 'name' cannot start with a capital 'Q' and be 4 characters in length

If those restrictions are broken, that metadatum will be silently dropped.
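
Because offending pairs are dropped silently, it can be useful to check metadata before enqueueing. The following is a minimal sketch that mirrors the four restrictions above; unsafe_metadata_keys is a hypothetical helper, not part of the module, and treating an undefined value as "not a string" is an assumption.

```perl
use strict;
use warnings;

# Hypothetical pre-flight check mirroring the four documented rules.
# Returns the (sorted) list of names that would break a rule.
sub unsafe_metadata_keys {
    my ($metadata) = @_;
    my @bad;
    for my $name (sort keys %$metadata) {
        my $value = $metadata->{$name};
        my $ok = defined $value               # assumption: undef is not a string
            && !ref $value                    # rule 1: plain string values only
            && $name  !~ /[\n\0]/             # rule 2: no newline or NUL in name
            && $value !~ /[\n\0]/             # rule 2: no newline or NUL in value
            && $name  !~ /:/                  # rule 3: no colon in name
            && $name  !~ /\AQ.{3}\z/s;        # rule 4: not 'Q' plus 3 characters
        push @bad, $name unless $ok;
    }
    return @bad;
}

my @bad = unsafe_metadata_keys({
    user  => "alice",
    "a:b" => "colon in name",
    note  => "two\nlines",
    QSIZ  => "four characters starting with Q",
});
print "would be dropped: @bad\n";   # prints: would be dropped: QSIZ a:b note
```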

An optional priority can be specified; lower priorities are run first. Priorities range from 0 to 99, and 50 is default.

$dq->enqueue_fh ($filehandle [, $metadata [, $pri] ] );

Enqueue a new job for processing. Returns 1 if the job was enqueued, or undef on failure. $pri and $metadata are as described in $dq->enqueue_file().

$dq->enqueue_string ($string [, $metadata [, $pri] ] );

Enqueue a new job for processing. The job data is entirely read from $string. Returns 1 if the job was enqueued, or undef on failure. $pri and $metadata are as described in $dq->enqueue_file().
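
For tasks that are fully described by their metadata, one possible approach is to enqueue an empty string and pass the details as metadata pairs. The sketch below assumes that enqueue_string() accepts an empty string; submit_task is a hypothetical wrapper, not part of the module.

```perl
use strict;
use warnings;

# $dq is expected to be an IPC::DirQueue object.
sub submit_task {
    my ($dq, $metadata, $pri) = @_;
    $pri = 50 unless defined $pri;               # documented default priority
    # Empty data payload: the metadata pairs carry the whole task.
    my $ok = $dq->enqueue_string("", $metadata, $pri);
    die "could not enqueue task\n" unless $ok;   # undef signals failure
    return 1;
}

# Usage (requires IPC::DirQueue to be installed):
#   my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
#   submit_task($dq, { action => "resize", image => "/tmp/a.png" }, 10);
```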

$job = $dq->pickup_queued_job();

Pick up the next job in the queue, so that it can be processed.

If no job is available for processing, either because the queue is empty or because other worker processes are already working on them, undef is returned; otherwise, a new instance of IPC::DirQueue::Job is returned.

Note that the job is marked as active until $job->finish() is called.
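
A minimal sketch of a dequeuer that processes everything it can currently pick up might look like this. drain_queue and the $handler callback are hypothetical names, and error handling is deliberately omitted: if $handler dies, the current job stays marked as active.

```perl
use strict;
use warnings;

# Process every job that can currently be picked up and return the number
# handled. $dq is expected to be an IPC::DirQueue object; $handler is a
# caller-supplied code reference.
sub drain_queue {
    my ($dq, $handler) = @_;
    my $count = 0;
    while (my $job = $dq->pickup_queued_job()) {
        $handler->($job->get_data_path(), $job->{metadata});
        $job->finish();      # the job is no longer marked as active
        $count++;
    }
    return $count;           # the loop ends when undef is returned
}

# Usage (requires IPC::DirQueue to be installed):
#   my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
#   my $n  = drain_queue($dq, sub { my ($path, $meta) = @_; print "$path\n" });
```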

$job = $dq->wait_for_queued_job ([ $timeout [, $pollinterval] ]);

Wait for a job to be queued within the next $timeout seconds.

If there is already a job ready for processing, this will return immediately. If one is not available, it will sleep, wake up periodically, check for job availability, and either carry on sleeping or return the new job if one is now available.

If a job becomes available, a new instance of IPC::DirQueue::Job is returned. If the timeout is reached, undef is returned.

If $timeout is not specified, or is less than 1, this function will wait indefinitely.

The optional parameter $pollinterval indicates how frequently to wake up and check for new jobs. It is specified in seconds, and floating-point precision is supported. The default is 1.

Note that if $timeout is not a round multiple of $pollinterval, the nearest round multiple of $pollinterval greater than $timeout will be used instead. Also note that $timeout is used as an integer.
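
The rounding rule can be worked through numerically. The helper below is a sketch of the arithmetic implied by the description above, not the module's own code; effective_wait is a hypothetical name, and it returns undef to stand for "wait indefinitely".

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Hypothetical helper: the longest time, in seconds, that a call would wait
# according to the documented rules.
sub effective_wait {
    my ($timeout, $pollinterval) = @_;
    $pollinterval = 1 unless defined $pollinterval;       # documented default
    return undef if !defined $timeout || $timeout < 1;    # waits indefinitely
    $timeout = int($timeout);                             # used as an integer
    my $polls = ceil($timeout / $pollinterval);           # whole poll cycles
    return $polls * $pollinterval;
}

print effective_wait(10, 3), "\n";     # prints 12
print effective_wait(10, 2.5), "\n";   # prints 10
print effective_wait(7.9, 2), "\n";    # prints 8
```

In the first case, 10 is not a multiple of 3, so the wait is rounded up to four polls of 3 seconds each.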

$job = $dq->visit_all_jobs($visitor, $visitcontext);

Visit all the jobs in the queue, in a read-only mode. Used to list the entire queue.

The callback function $visitor will be called for each job in the queue, like so:

  &$visitor ($visitcontext, $job);

$visitcontext is whatever you pass in that variable above. $job is a new, read-only instance of IPC::DirQueue::Job representing that job.

If a job is active (being processed), the $job object also contains the following additional data:

  'active_host': the hostname on which the job is active
  'active_pid': the process ID of the process which picked up the job
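
As an illustration, a queue listing could be built on top of this callback interface roughly as follows. This sketch assumes that 'active_host' and 'active_pid' are stored as hash keys on the job object and that get_data_path() is usable on read-only jobs; summarize_queue is a hypothetical name.

```perl
use strict;
use warnings;

# Collect a one-line summary for every job in the queue.
# $dq is expected to be an IPC::DirQueue object.
sub summarize_queue {
    my ($dq) = @_;
    my @lines;
    my $visitor = sub {
        my ($context, $job) = @_;    # invoked as &$visitor($visitcontext, $job)
        my $state = defined $job->{active_pid}
            ? "active on $job->{active_host} (pid $job->{active_pid})"
            : "waiting";
        push @$context, $job->get_data_path() . ": $state";
    };
    $dq->visit_all_jobs($visitor, \@lines);   # \@lines is the visit context
    return @lines;
}

# Usage (requires IPC::DirQueue to be installed):
#   my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
#   print "$_\n" for summarize_queue($dq);
```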


If interrupted or terminated, dequeueing processes should be careful to either call $job->finish() or $job->return_to_queue() on any active tasks before exiting -- otherwise those jobs will remain marked active.
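
One way to follow this advice is to turn SIGINT and SIGTERM into exceptions while a job is being processed, so that cleanup code always runs. This is a sketch under that assumption rather than a recommended implementation; process_job_safely and $handler are hypothetical names, and a signal arriving outside the eval block is not covered.

```perl
use strict;
use warnings;

# Run $handler on $job; finish the job on success, hand it back on failure
# or interruption. $job is expected to be an IPC::DirQueue::Job object.
sub process_job_safely {
    my ($job, $handler) = @_;
    my $ok = eval {
        # Convert the signals into exceptions so the cleanup below still runs.
        local $SIG{INT}  = sub { die "interrupted by SIGINT\n" };
        local $SIG{TERM} = sub { die "interrupted by SIGTERM\n" };
        $handler->($job);
        1;
    };
    if ($ok) {
        $job->finish();
        return 1;
    }
    my $error = $@ || "unknown error\n";   # save before any further calls
    $job->return_to_queue();               # let another dequeuer pick it up
    die $error;                            # propagate so the caller can exit
}
```

The caller can wrap this in its own eval, or simply let the exception end the process once the job has been returned.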

Stale locks are normally dealt with automatically. If a lock is still active after about 10 minutes of inactivity, the other dequeuers on that machine will probe the process ID listed in that lock file using kill(0). If that process ID is no longer running, the lock is presumed likely to be stale. If a given timeout (10 minutes plus a random value between 0 and 256 seconds) has elapsed since the lock file was last modified, the lock file is deleted.
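
The decision described above can be sketched as a small predicate. This is illustrative only and is not the module's internal code; lock_looks_stale and its parameters are hypothetical, and treating EPERM as "process still exists" is an added assumption.

```perl
use strict;
use warnings;
use Errno qw(EPERM);

# Decide whether a lock should be treated as stale.
#   $pid        - process ID recorded in the lock file
#   $lock_mtime - last modification time of the lock file (epoch seconds)
#   $now        - current time (epoch seconds)
#   $jitter     - optional; defaults to a random value from 0 to 256 seconds
sub lock_looks_stale {
    my ($pid, $lock_mtime, $now, $jitter) = @_;
    $jitter = int(rand(257)) unless defined $jitter;
    my $timeout = 10 * 60 + $jitter;

    # Modified too recently: leave the lock alone.
    return 0 if $now - $lock_mtime < $timeout;

    # kill(0) sends no signal; it only tests whether the PID can be signalled.
    # EPERM means the process exists but belongs to another user.
    return 0 if kill(0, $pid) || $! == EPERM;

    return 1;
}
```

Because kill(0) can only probe processes on the local machine, a check like this is meaningful only on the host that created the lock.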

Note: this means that if the dequeueing processes are spread among multiple machines, and there is no longer a dequeuer running on the machine that initially 'locked' the task, it will never be unlocked.




Justin Mason <dq /at/>


IPC::DirQueue is distributed under the same license as perl itself.


The latest version of this library is likely to be available from CPAN.