
NAME

MCE::Queue - Hybrid queues (normal including priority) for Many-core Engine

VERSION

This document describes MCE::Queue version 1.500

SYNOPSIS

   use MCE;
   use MCE::Queue;
   use List::MoreUtils qw(part);   ## part() is used by the first task

   my @dirs = (".");

   my $D = MCE::Queue->new( queue => \@dirs );
   my $F = MCE::Queue->new();

   ## Notice the use of dequeue_nb (non-blocking) by the first task
   ## and dequeue (blocking) by the second task. The first task is
   ## recursive: its workers enqueue subdirectories back onto $D.

   my $mce = MCE->new(

      user_tasks => [{
         max_workers => 4,

         task_end => sub {

            ## Signal workers that no more work remains. The number 4
            ## is the number of workers in the 2nd task performing
            ## the read; each of them receives one undef.

            $F->enqueue((undef) x 4);
         },

         user_func => sub {

            ## Pause briefly to give task worker 1 time to add items.
            select(undef, undef, undef, 0.05) if (MCE->task_wid > 1);

            ## Worker will loop until no more directories.
            while (defined (local $_ = $D->dequeue_nb)) {
               my ($files, $dirs) = part { -d $_ ? 1 : 0 } glob("$_/*");

               $D->enqueue(@$dirs ) if defined $dirs;
               $F->enqueue(@$files) if defined $files;
            }

            MCE->say(\*STDERR, "(D) worker has ended");

            return;
         }

      },{
         max_workers => 4,

         user_func => sub {

            ## Worker will loop until no more files.
            while (defined (local $_ = $F->dequeue)) {
               MCE->say($_);
            }

            MCE->say(\*STDERR, "(F) worker has ended");

            return;
         }
      }]
   );

   $mce->run;

DESCRIPTION

This module provides a queue interface supporting both normal and priority queues, built on the IPC engine behind MCE. Queue data resides under the manager process. MCE::Queue also allows a worker to create any number of local queues, which are not available to other workers or to the manager process. Think of a CPU having L3 (shared) and L1 (local) cache.

The structure of the MCE::Queue object is shown below. It allows normal queues to run as fast as an array. Priority queues are nearly as fast: each operation needs only a brief hash lookup to check whether the priority exists, plus adding or removing the key when needed. The heap array contains only priorities, not the data itself, so the heap order is maintained only when a priority is added or removed.

   ## Normal queue data
   $_queue->{_datq} = [];

   ## Priority data { p1 => [ ], p2 => [ ], pN => [ ] }
   $_queue->{_datp} = {};

   ## Priority heap [ pN, p2, p1 ] ## in heap order
   ## fyi, _datp will always dequeue before _datq
   $_queue->{_heap} = [];

   ## Priority order (default)
   $_queue->{_porder} = $MCE::Queue::HIGHEST;

   ## Priority type (default)
   $_queue->{_type} = $MCE::Queue::FIFO;
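As a rough illustration of how these fields cooperate, the following single-process sketch models the structure in plain Perl, assuming HIGHEST priority order and FIFO type. The subroutine names (enqueue, enqueuep, dequeue) are hypothetical stand-ins, not MCE::Queue internals, and the sketch re-sorts the heap with sort for brevity rather than inserting by binary search.

```perl
use strict;
use warnings;

# Hypothetical model of _datq/_datp/_heap (HIGHEST order, FIFO type).
my %Q = ( _datq => [], _datp => {}, _heap => [] );

# Normal data: append to the plain array.
sub enqueue { my ($q, @items) = @_; push @{ $q->{_datq} }, @items; return; }

# Priority data: only a new priority touches the heap.
sub enqueuep {
   my ($q, $p, @items) = @_;
   unless (exists $q->{_datp}{$p}) {
      $q->{_datp}{$p} = [];
      # Keep priorities highest first (plain sort keeps the sketch short).
      @{ $q->{_heap} } = sort { $b <=> $a } @{ $q->{_heap} }, $p;
   }
   push @{ $q->{_datp}{$p} }, @items;
   return;
}

# Priority data always dequeues before normal data.
sub dequeue {
   my ($q) = @_;
   if (@{ $q->{_heap} }) {
      my $p    = $q->{_heap}[0];
      my $item = shift @{ $q->{_datp}{$p} };
      # Remove the hash key and heap entry once that priority is drained.
      unless (@{ $q->{_datp}{$p} }) {
         delete $q->{_datp}{$p};
         shift @{ $q->{_heap} };
      }
      return $item;
   }
   return shift @{ $q->{_datq} };   # undef when empty
}

enqueue(\%Q, 'a', 'b');
enqueuep(\%Q, 5, 'p5');
enqueuep(\%Q, 9, 'p9');

my @out;
while (defined(my $x = dequeue(\%Q))) { push @out, $x }
print "@out\n";   # p9 p5 a b
```

Priority data drains first, highest priority first, and only then does the normal queue empty.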

IMPORT

Two options are available for overriding the default values used when creating new queues (porder applies to priority queues only).

   use MCE::Queue porder => $MCE::Queue::HIGHEST,
                  type   => $MCE::Queue::FIFO;

   use MCE::Queue;       ## same as above

       porder => $HIGHEST = Highest priority items are dequeued first
                 $LOWEST  = Lowest priority items are dequeued first

       type   => $FIFO    = First in, first out
                 $LILO    =    (Synonym for FIFO)
                 $LIFO    = Last in, first out
                 $FILO    =    (Synonym for LIFO)
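The effect of the type and porder values can be pictured with plain Perl arrays. This is a conceptual sketch only: it assumes FIFO removes items from the head and LIFO from the tail of an array filled with push, and it does not call MCE::Queue.

```perl
use strict;
use warnings;

# type: items are appended with push; FIFO takes from the head,
# LIFO takes from the tail.
my @fifo = (1, 2, 3);
my @lifo = (1, 2, 3);

my @fifo_out = map { shift @fifo } 1 .. 3;   # 1 2 3
my @lifo_out = map { pop @lifo }   1 .. 3;   # 3 2 1

# porder: which pending priority is served first.
my @priorities = (3, 10, 7);
my @highest = sort { $b <=> $a } @priorities;   # 10 7 3
my @lowest  = sort { $a <=> $b } @priorities;   # 3 7 10

print "FIFO: @fifo_out\nLIFO: @lifo_out\n";
print "HIGHEST: @highest\nLOWEST: @lowest\n";
```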

THREE RUN MODES

MCE::Queue can be utilized under the following conditions:

    A) use MCE;           B) use MCE::Queue;    C) use MCE::Queue;
       use MCE::Queue;       use MCE;

A) Loading MCE prior to inclusion of MCE::Queue

The dequeue method blocks for both the manager process and the workers. All data resides under the manager process; workers send and request data through IPC.

Creating a queue from a worker process causes the queue to run in local mode. The data resides under that worker process and is not available to other workers or even to the manager process.

B) Loading MCE::Queue prior to inclusion of MCE

Queues behave as if running in local mode, for the manager and the workers alike, for the duration of the script. I cannot think of a use case for this, but wanted to mention the behavior in the event MCE::Queue is loaded prior to MCE.

C) Loading MCE::Queue without MCE

The dequeue method is non-blocking in this mode, which behaves like local mode when MCE is not present. As with local queuing, this mode is fast due to minimal overhead and zero IPC.

Basically, the MCE module is not a requirement for using MCE::Queue.

API DOCUMENTATION

->new ( [ queue => \@array ] )

This creates a new queue. Available options are queue, porder, type, and gather. The gather option is mainly for use with MCE, when you want item(s) passed to a callback function that adds them to the queue.

   use MCE;
   use MCE::Queue;

   my $q1 = MCE::Queue->new();
   my $q2 = MCE::Queue->new( queue => [ 0, 1, 2 ] );

   my $q3 = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
   my $q4 = MCE::Queue->new( porder => $MCE::Queue::LOWEST  );

   my $q5 = MCE::Queue->new( type => $MCE::Queue::FIFO );
   my $q6 = MCE::Queue->new( type => $MCE::Queue::LIFO );

Multiple queues may point to the same callback function. Please note that the first argument for the callback function is the queue object itself.

   sub _append {
      my ($Q, @items) = @_;
      $Q->enqueue(@items);
   }

   my $q7 = MCE::Queue->new( gather => \&_append );
   my $q8 = MCE::Queue->new( gather => \&_append );

   ## Items are diverted to the gather callback function.
   $q7->enqueue( 'apple', 'orange' );

The gather option is useful when you want to hold items temporarily until they can be output in order. Although a queue is not required to gather data in MCE, this is simply a demonstration of the gather option in the context of a queue.

   use MCE;
   use MCE::Queue;

   my ($_order_id, %_tmp);

   sub _preserve_order {
      my ($Q, $chunk_id, $result) = @_;

      $_tmp{$chunk_id} = $result;

      while (1) {
         last unless exists $_tmp{$_order_id};
         $Q->enqueue( $_tmp{$_order_id} );
         delete $_tmp{$_order_id++};
      }

      return;
   }

   my @squares; my $q = MCE::Queue->new(
      queue => \@squares, gather => \&_preserve_order
   );

   $_order_id = 1;  ## The first chunk_id equals 1.

   my $mce = MCE->new(
      chunk_size => 1, input_data => [ 1 .. 100 ],
      user_func => sub {
         $q->enqueue( MCE->chunk_id, $_ * $_ );
      }
   );

   $mce->run;

   print "@squares\n";

->clear ( void )

Clears the queue of any items, leaving it empty. Each queue comes with a socket used for blocking behind the scenes, so use the clear method, rather than emptying the underlying array directly, when you want to clear the queue's contents.

   my @a; my $q = MCE::Queue->new( queue => \@a );

   @a = ();     ## no, the blocking socket may become out of sync
   $q->clear;   ## ok

->enqueue ( $item [, $item, ... ] )

Adds a list of items onto the end of the normal queue.

->enqueuep ( $p, $item [, $item, ... ] )

Adds a list of items onto the end of the priority queue for priority $p.

->dequeue ( [ $count ] )

Returns the requested number of items (default is 1) from the queue. Priority data will always dequeue first from the priority queue before any data from the normal queue.

The method will block if the queue contains zero items. If the queue contains fewer than the requested number of items, the method will not block; it returns the remaining items, padded with undef up to the count requested.

The $count argument, used for requesting the number of items, is beneficial when workers pass arguments through the queue. For this release, always remember to dequeue using the same multiple for $count that the items were enqueued in. This is unlike Thread::Queue, which blocks until the requested number of items are available.
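The count semantics above can be modeled as follows. This is a simplified, non-blocking sketch: the hypothetical dequeue_count helper takes up to $count items and pads the result with undef, whereas the real dequeue blocks when the queue is empty. The example stores (name, value) pairs and dequeues two at a time, showing how an incomplete final group surfaces as undef.

```perl
use strict;
use warnings;

# Hypothetical, non-blocking model of dequeue($count).
sub dequeue_count {
   my ($aref, $count) = @_;
   $count = 1 unless defined $count;
   my @items = splice(@$aref, 0, $count);     # up to $count items
   push @items, (undef) x ($count - @items);  # pad with undef
   return @items;
}

# Producer stored (name, value) pairs; the last pair is incomplete.
my @queue = ( alpha => 1, beta => 2, 'gamma' );

my @pairs;
while (@queue) {
   my ($name, $value) = dequeue_count(\@queue, 2);   # same multiple: 2
   push @pairs, [ $name, $value ];
}

for my $pair (@pairs) {
   printf "%s => %s\n", $pair->[0], defined $pair->[1] ? $pair->[1] : 'undef';
}
```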

->dequeue_nb ( [ $count ] )

Returns the requested number of items (default is 1) from the queue. As with dequeue, priority data always dequeues first. This method is non-blocking and returns undef in the absence of data in the queue.

->insert ( $index, $item [, $item, ... ] )

Adds the list of items to the queue at the specified index.

->insertp ( $p, $index, $item [, $item, ... ] )

Adds the list of items to the queue at the specified index with priority.
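A minimal sketch of the insert semantics, assuming index 0 refers to the head of a FIFO queue (the next item to be dequeued). The insert_at helper is hypothetical; it simply splices items into the normal array or into the array held for one priority.

```perl
use strict;
use warnings;

# Hypothetical helper: insert @items before position $index (0 = head).
sub insert_at {
   my ($aref, $index, @items) = @_;
   splice(@$aref, $index, 0, @items);
   return;
}

my @datq = qw(a b c d);              # normal queue, head first
my %datp = ( 5 => [qw(x y)] );       # data for priority 5

insert_at(\@datq, 2, 'NEW');         # assumed analogue of ->insert(2, 'NEW')
insert_at($datp{5}, 0, 'URGENT');    # assumed analogue of ->insertp(5, 0, 'URGENT')

print "@datq\n";                     # a b NEW c d
print "@{ $datp{5} }\n";             # URGENT x y
```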

->pending ( void )

Returns the number of items remaining in the queue. This includes both normal and priority data.

->peek ( [ $index ] )

Returns an item from the normal queue, at the specified index, without dequeuing anything. It defaults to the head of the queue if index is not specified.

->peekp ( $p [, $index ] )

Returns an item from the priority queue for priority $p, at the specified index, without dequeuing anything. It defaults to the head of the queue if index is not specified.

->peekh ( [ $index ] )

Returns an item from the heap, at the specified index.

->heap ( void )

Returns an array containing the heap data. Heap data consists of priority numbers, not the data.
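The peek family reads without removing anything. The sketch below mirrors the method names as plain subroutines over a hand-built copy of the structure from the DESCRIPTION; these subroutines are hypothetical and are not the module's implementation.

```perl
use strict;
use warnings;

# Hand-built structure (HIGHEST order): priority 9 is served first.
my %Q = (
   _datq => [qw(a b c)],
   _datp => { 9 => ['p9'], 5 => [qw(p5a p5b)] },
   _heap => [9, 5],
);

# Read-only accessors; the index defaults to 0 (the head).
sub peek  { my ($q, $i) = @_; return $q->{_datq}[ $i // 0 ]; }

sub peekp {
   my ($q, $p, $i) = @_;
   return unless exists $q->{_datp}{$p};   # unknown priority
   return $q->{_datp}{$p}[ $i // 0 ];
}

sub peekh { my ($q, $i) = @_; return $q->{_heap}[ $i // 0 ]; }
sub heap  { my ($q) = @_; return @{ $q->{_heap} }; }

print peek(\%Q), ' ', peekp(\%Q, 5, 1), ' ', peekh(\%Q, 1), "\n";   # a p5b 5
print join(' ', heap(\%Q)), "\n";                                   # 9 5
```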

ACKNOWLEDGEMENTS

The main reason for writing MCE::Queue was to have a Thread::Queue-like module for workers spawned as children with MCE. When searching for queue modules on CPAN, I was pleasantly surprised at the number of modules out there for Perl. What stood out immediately were all the priority queues, heap queues, and whether or not FIFO/LIFO or highest/lowest first options were available. It seemed like a daunting task to undertake. And so, I began, failed, tried again, failed again, and finally on the 3rd attempt was pleased with the results.

The following resources helped me create MCE::Queue for MCE.

POE::Queue::Array

Two if statements were borrowed for checking if the item belongs at the end or head of the queue.

List::BinarySearch

After glancing over the bsearch_num_pos method for returning the best insert position, I wrote a couple of variations of it to accommodate the highest/lowest order routines in MCE::Queue.

The private _heap_insert_low and _heap_insert_high methods inside MCE::Queue are simply the 2 if statements and the binary search algorithm.
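To show the idea behind those two if statements plus binary search, here is a sketch of inserting a priority into a heap array kept in descending order (HIGHEST first). The heap_insert_high subroutine is an illustration written for this document, not the private _heap_insert_high method; a LOWEST-order variant would reverse the comparisons.

```perl
use strict;
use warnings;

# Illustrative sorted insert into a descending array of priorities.
sub heap_insert_high {
   my ($heap, $p) = @_;

   # Fast path 1: empty heap, or $p belongs at the end.
   if (!@$heap || $p <= $heap->[-1]) { push @$heap, $p; return; }

   # Fast path 2: $p belongs at the head.
   if ($p >= $heap->[0]) { unshift @$heap, $p; return; }

   # Binary search for the first index whose value is not greater than $p.
   my ($lo, $hi) = (0, $#$heap);
   while ($lo < $hi) {
      my $mid = int(($lo + $hi) / 2);
      if ($heap->[$mid] > $p) { $lo = $mid + 1 } else { $hi = $mid }
   }
   splice(@$heap, $lo, 0, $p);
   return;
}

my @heap;
heap_insert_high(\@heap, $_) for (5, 9, 1, 7, 3);
print "@heap\n";   # 9 7 5 3 1
```

The two fast paths cover the common cases of a new lowest or highest priority in constant time; only a priority landing in the middle pays for the search.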

Heap-Priority, List::Priority

At this point, I thought, why not make both normal queues and priority queues efficient? With that in mind, I also provided options allowing folks to choose FIFO/LIFO and highest/lowest order for the queue. The data structure in MCE::Queue is described above.

MCE workers may also benefit from being able to create local queues not available to other workers or to the manager process. This is the reason for the three run modes described earlier in this document.

Thread::Queue

Because MCE supports both child processes and threads, Thread::Queue was used as a guide for naming and documenting the methods in MCE::Queue. The two are not 100% compatible, so pay close attention to the dequeue method in MCE::Queue when requesting the number of items to dequeue.

   ->enqueuep( $p, $item [, $item, ... ] );    ## Extension (p)
   ->enqueue( $item [, $item, ... ] );

   ->dequeue( [ $count ] );      ## Priority data dequeues first
   ->dequeue_nb( [ $count ] );

   ->pending();                  ## Counts both normal/priority data
                                 ## in the queue

Parallel-DataPipe

The idea for the recursive synopsis used in this document came from an example in the Parallel::DataPipe documentation.

INDEX

MCE

AUTHOR

Mario E. Roy, <marioeroy AT gmail DOT com>

LICENSE

This program is free software; you can redistribute it and/or modify it under the terms of either: the GNU General Public License as published by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.