
NAME

MCE::Queue - Hybrid queues (normal including priority) for Many-core Engine

VERSION

This document describes MCE::Queue version 1.519

SYNOPSIS

   use MCE;
   use MCE::Queue;

   my $F = MCE::Queue->new(fast => 1);
   my $consumers = 8;

   my $mce = MCE->new(

      task_end => sub {
         my ($mce, $task_id, $task_name) = @_;

         $F->enqueue((undef) x $consumers)
            if $task_name eq 'dir';
      },

      user_tasks => [{
         max_workers => 1, task_name => 'dir',

         user_func => sub {
            ## Create a "standalone queue" only accessible to this worker.
            ## See included examples for running with multiple workers.
            my $D = MCE::Queue->new(queue => [ MCE->user_args->[0] ]);

            while (defined (my $dir = $D->dequeue_nb)) {
               my (@files, @dirs); foreach (glob("$dir/*")) {
                  if (-d $_) { push @dirs, $_; next; }
                  push @files, $_;
               }
               $D->enqueue(@dirs ) if scalar @dirs;
               $F->enqueue(@files) if scalar @files;
            }
         }
      },{
         max_workers => $consumers, task_name => 'file',

         user_func => sub {
            while (defined (my $file = $F->dequeue)) {
               MCE->say($file);
            }
         }
      }]

   )->run({ user_args => [ $ARGV[0] || '.' ] });

   __END__

   Results from files_mce.pl and files_thr.pl, both included with MCE.
   Usage:
      time ./files_mce.pl /usr 0 | wc -l
      time ./files_mce.pl /usr 1 | wc -l
      time ./files_thr.pl /usr   | wc -l

   Darwin (OS)    /usr:    216,271 files
      MCE::Queue, fast => 0 :    4.17s
      MCE::Queue, fast => 1 :    2.62s
      Thread::Queue         :    4.14s

   Linux (VM)     /usr:    186,154 files
      MCE::Queue, fast => 0 :   12.57s
      MCE::Queue, fast => 1 :    3.36s
      Thread::Queue         :    5.91s

   Solaris (VM)   /usr:    603,051 files
      MCE::Queue, fast => 0 :   39.04s
      MCE::Queue, fast => 1 :   18.08s
      Thread::Queue      * Perl not built to support threads

DESCRIPTION

This module provides a queue interface supporting both normal and priority queues, built on the IPC engine behind MCE. Queue data resides in the manager process. MCE::Queue also lets a worker create any number of local queues, which are not available to other workers or to the manager process. Think of a CPU with a shared L3 cache and a per-core L1 cache.

The structure of the MCE::Queue object is shown below. Normal queues run as fast as a plain array. Priority queues are nearly as fast: each operation needs only a brief hash lookup to check whether the priority exists, plus adding or removing that key when needed. The heap array holds only the priorities, not the data itself, so the heap order is maintained only as necessary while running.

   ## Normal queue data
   $_queue->{_datq} = [];

   ## Priority data { p1 => [ ], p2 => [ ], pN => [ ] }
   $_queue->{_datp} = {};

   ## Priority heap [ pN, p2, p1 ] ## in heap order
   ## fyi, _datp will always dequeue before _datq
   $_queue->{_heap} = [];

   ## Priority order (default)
   $_queue->{_porder} = $MCE::Queue::HIGHEST;

   ## Priority type (default)
   $_queue->{_type} = $MCE::Queue::FIFO;
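
To make the layout above concrete, here is a minimal pure-Perl sketch of how the three fields could cooperate. It is a hypothetical model, not MCE::Queue's source: the helper names enqueue_normal, enqueue_priority, and dequeue_one are made up, it assumes highest-first order with FIFO within each priority, and it re-sorts the heap when a new priority appears instead of searching for the insert position.

```perl
use strict;
use warnings;

# Hypothetical model of the layout described above; not MCE::Queue's code.
# Highest priority first, FIFO within each priority.
my $q = { _datq => [], _datp => {}, _heap => [] };

sub enqueue_normal {
   my ($q, @items) = @_;
   push @{ $q->{_datq} }, @items;
}

sub enqueue_priority {
   my ($q, $p, @items) = @_;
   # Only a new priority touches the heap; existing ones just append data.
   unless (exists $q->{_datp}{$p}) {
      $q->{_datp}{$p} = [];
      # Keep the heap sorted highest-first (a full sort, for brevity).
      @{ $q->{_heap} } = sort { $b <=> $a } @{ $q->{_heap} }, $p;
   }
   push @{ $q->{_datp}{$p} }, @items;
}

sub dequeue_one {
   my ($q) = @_;
   # Priority data always dequeues before normal data.
   if (@{ $q->{_heap} }) {
      my $p    = $q->{_heap}[0];
      my $item = shift @{ $q->{_datp}{$p} };
      # Drop the priority key and its heap entry once its list is empty.
      unless (@{ $q->{_datp}{$p} }) {
         delete $q->{_datp}{$p};
         shift @{ $q->{_heap} };
      }
      return $item;
   }
   return shift @{ $q->{_datq} };
}

enqueue_normal($q, 'n1', 'n2');
enqueue_priority($q, 5, 'p5a', 'p5b');
enqueue_priority($q, 9, 'p9');

my @out;
while (defined(my $item = dequeue_one($q))) { push @out, $item }
print "@out\n";   # p9 p5a p5b n1 n2
```

Keeping only priority numbers in _heap means the heap changes only when a priority appears or empties out; ordinary enqueue and dequeue calls touch just a hash entry and an array.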

IMPORT

Three options are available for overriding the default values for new queues (porder applies to priority queues only).

   use MCE::Queue porder => $MCE::Queue::HIGHEST,
                  type   => $MCE::Queue::FIFO,
                  fast   => 0;

   use MCE::Queue;       ## same as above

       porder => $HIGHEST = Highest priority items are dequeued first
                 $LOWEST  = Lowest priority items are dequeued first

       type   => $FIFO    = First in, first out
                 $LILO    =    (Synonym for FIFO)
                 $LIFO    = Last in, first out
                 $FILO    =    (Synonym for LIFO)
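
The effect of porder and type on dequeue order can be sketched in pure Perl. The drain helper below is hypothetical (it is not part of MCE::Queue) and takes the options as plain strings rather than the $MCE::Queue constants; it simply lists items in the order a queue configured that way would hand them out, assuming type applies within each priority.

```perl
use strict;
use warnings;

# Hypothetical helper illustrating the two options; not MCE::Queue's API.
# porder: 'highest' or 'lowest' decides which priority is served first.
# type:   'fifo' takes from the head of each list, 'lifo' from the tail.
sub drain {
   my (%opt) = @_;
   my %datp = %{ $opt{data} };   # priority => [ items in arrival order ]
   my @heap = $opt{porder} eq 'highest'
      ? sort { $b <=> $a } keys %datp
      : sort { $a <=> $b } keys %datp;
   my @out;
   for my $p (@heap) {
      my @items = @{ $datp{$p} };
      push @out, $opt{type} eq 'fifo' ? @items : reverse @items;
   }
   return @out;
}

my %data = ( 1 => [ 'a', 'b' ], 3 => [ 'c', 'd' ] );
my @hf = drain(data => \%data, porder => 'highest', type => 'fifo');
my @ll = drain(data => \%data, porder => 'lowest',  type => 'lifo');
print "@hf\n";   # c d a b
print "@ll\n";   # b a d c
```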

THREE RUN MODES

MCE::Queue can be utilized under the following conditions:

    A) use MCE;           B) use MCE::Queue;    C) use MCE::Queue;
       use MCE::Queue;       use MCE;

A) Loading MCE prior to inclusion of MCE::Queue

The dequeue method blocks in both the manager process and the workers. All data resides in the manager process; workers send and request data through IPC.

Creating a queue inside a worker process causes the queue to run in local mode. Its data resides in that worker process and is not available to other workers or to the manager process.

B) Loading MCE::Queue prior to inclusion of MCE

Queues behave as if running in local mode, in both the manager and the workers, for the duration of the script. I cannot think of a use case for this, but wanted to mention the behavior in case MCE::Queue is loaded before MCE.

C) Loading MCE::Queue without MCE

In this mode the dequeue method is non-blocking, and queues behave as in local mode because MCE is not present. As with local queuing, this mode is fast thanks to minimal overhead and zero IPC.

Essentially, the MCE module is not a prerequisite for using MCE::Queue.

API DOCUMENTATION

->new ( [ queue => \@array ] )

This creates a new queue. Available options are queue, porder, type, fast, and gather. The gather option is mainly for use with MCE when you want to pass items to a callback function that adds them to the queue.

The 'fast' option speeds up ->dequeue operations and is not enabled by default. It benefits queues that do not call ->clear or ->dequeue_nb and do not alter the optional count value (e.g. ->dequeue($count)) while running. In short, do not enable 'fast' if $count varies dynamically.

   use MCE;
   use MCE::Queue;

   my $q1 = MCE::Queue->new();
   my $q2 = MCE::Queue->new( queue => [ 0, 1, 2 ] );

   my $q3 = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
   my $q4 = MCE::Queue->new( porder => $MCE::Queue::LOWEST  );

   my $q5 = MCE::Queue->new( type => $MCE::Queue::FIFO );
   my $q6 = MCE::Queue->new( type => $MCE::Queue::LIFO );

   my $q7 = MCE::Queue->new( fast => 1 );

Multiple queues may point to the same callback function. Note that the first argument passed to the callback function is the queue object itself.

   sub _append {
      my ($Q, @items) = @_;
      $Q->enqueue(@items);
   }

   my $q7 = MCE::Queue->new( gather => \&_append );
   my $q8 = MCE::Queue->new( gather => \&_append );

   ## Items are diverted to the gather callback function.
   $q7->enqueue( 'apple', 'orange' );

The gather option is useful for holding items temporarily until they can be output in order. A queue is not required to gather data in MCE; this is simply a demonstration of the gather option in the context of a queue.

   use MCE;
   use MCE::Queue;

   my ($_order_id, %_tmp);

   sub _preserve_order {
      my ($Q, $chunk_id, $result) = @_;

      $_tmp{$chunk_id} = $result;

      while (1) {
         last unless exists $_tmp{$_order_id};
         $Q->enqueue( $_tmp{$_order_id} );
         delete $_tmp{$_order_id++};
      }

      return;
   }

   my @squares; my $q = MCE::Queue->new(
      queue => \@squares, gather => \&_preserve_order
   );

   $_order_id = 1;  ## The first chunk_id equals 1.

   my $mce = MCE->new(
      chunk_size => 1, input_data => [ 1 .. 100 ],
      user_func => sub {
         $q->enqueue( MCE->chunk_id, $_ * $_ );
      }
   );

   $mce->run;

   print "@squares\n";
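
The reordering in _preserve_order can be checked without MCE by feeding it chunk IDs out of order. The sketch below is hypothetical: MockQueue stands in for the queue object and implements only enqueue, and a plain loop replaces the workers that would normally trigger the gather callback.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the queue object; only enqueue is needed here.
package MockQueue;
sub new     { my ($class) = @_; return bless { items => [] }, $class }
sub enqueue { my ($self, @items) = @_; push @{ $self->{items} }, @items }

package main;

my ($_order_id, %_tmp) = (1);   # the first chunk_id equals 1

# Equivalent to _preserve_order above: buffer each result by chunk_id,
# then flush every consecutive result starting at the next expected id.
sub _preserve_order {
   my ($Q, $chunk_id, $result) = @_;
   $_tmp{$chunk_id} = $result;
   while (exists $_tmp{$_order_id}) {
      $Q->enqueue( $_tmp{$_order_id} );
      delete $_tmp{$_order_id++};
   }
   return;
}

my $q = MockQueue->new;
# Chunks arrive out of order, as they can from parallel workers.
for my $chunk_id (3, 1, 4, 2, 5) {
   _preserve_order($q, $chunk_id, $chunk_id * $chunk_id);
}
print "@{ $q->{items} }\n";   # 1 4 9 16 25
```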

->clear ( void )

Clears the queue of any items, leaving it empty. Each queue comes with a socket used for blocking behind the scenes, so use the clear method rather than emptying the underlying array directly.

   my @a; my $q = MCE::Queue->new( queue => \@a );

   @a = ();     ## no, the blocking socket may fall out of sync
   $q->clear;   ## ok

->enqueue ( $item [, $item, ... ] )

Appends a list of items onto the end of the normal queue.

->enqueuep ( $p, $item [, $item, ... ] )

Appends a list of items onto the end of the priority queue for priority $p.

->dequeue ( [ $count ] )

Returns the requested number of items (default is 1) from the queue. Priority data will always dequeue first from the priority queue before any data from the normal queue.

The method blocks if the queue is empty. If the queue contains fewer than the requested number of items, the method does not block; it returns the remaining items, padded with undef up to the count requested.

The $count argument is useful when workers pass parameters through the queue in fixed-size groups. For this release, always remember to dequeue using the same multiple for the count. This differs from Thread::Queue, which blocks until the requested number of items is available.
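
The count semantics can be modeled with a plain array. The dequeue_count helper below is a hypothetical, non-blocking sketch of the behavior described above; it does not reproduce the blocking on an empty queue or the priority data. It returns $count values, padding missing slots with undef, and the usage shows parameters enqueued in pairs and always dequeued with a count of 2.

```perl
use strict;
use warnings;

# Hypothetical, non-blocking model of ->dequeue($count) on a normal queue.
# Returns $count values: the available items, then undef for missing slots.
sub dequeue_count {
   my ($queue, $count) = @_;
   $count = 1 unless defined $count;
   my @items;
   for (1 .. $count) {
      push @items, @$queue ? shift(@$queue) : undef;
   }
   return @items;
}

# Parameters travel in pairs, so every dequeue uses the same count of 2.
my @queue = ('job1', 10, 'job2', 20, 'job3');   # the last pair is incomplete
my @pairs;
while (@queue) {
   my ($name, $arg) = dequeue_count(\@queue, 2);
   push @pairs, $name . '=' . (defined $arg ? $arg : 'undef');
}
print "@pairs\n";   # job1=10 job2=20 job3=undef
```

Dequeuing with a count of 3 on the same data would misalign every pair after the first, which is why a consistent multiple matters.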

->dequeue_nb ( [ $count ] )

Returns the requested number of items (default is 1) from the queue. As with dequeue, priority data always dequeues first. This method is non-blocking and returns undef in the absence of data in the queue.

->insert ( $index, $item [, $item, ... ] )

Adds the list of items to the queue at the specified index.

->insertp ( $p, $index, $item [, $item, ... ] )

Adds the list of items to the queue for priority $p at the specified index.
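
A minimal sketch of insert and insertp, assuming they amount to a splice into the normal array or into an existing priority's array from the layout in the DESCRIPTION section. The insert_at helper is hypothetical: it clamps an index past the end to an append and does not handle a priority that is not yet in the heap. MCE::Queue's own index handling may differ.

```perl
use strict;
use warnings;

# Hypothetical layout mirroring _datq and _datp from the DESCRIPTION section.
my %q = ( _datq => [ 'a', 'd' ], _datp => { 5 => [ 'x', 'z' ] } );

# Insert @items into @$list at $index; an index past the end appends.
sub insert_at {
   my ($list, $index, @items) = @_;
   $index = @$list if $index > @$list;
   splice @$list, $index, 0, @items;
}

insert_at($q{_datq}, 1, 'b', 'c');     # like ->insert(1, 'b', 'c')
insert_at($q{_datp}{5}, 1, 'y');       # like ->insertp(5, 1, 'y')

print "@{ $q{_datq} }\n";      # a b c d
print "@{ $q{_datp}{5} }\n";   # x y z
```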

->pending ( void )

Returns the number of items in the queue. This includes both normal and priority data.

->peek ( [ $index ] )

Returns an item from the normal queue, at the specified index, without dequeuing anything. It defaults to the head of the queue if index is not specified.

->peekp ( $p [, $index ] )

Returns an item from the queue for priority $p, at the specified index, without dequeuing anything. It defaults to the head of the queue if index is not specified.

->peekh ( [ $index ] )

Returns an item from the heap, at the specified index.

->heap ( void )

Returns an array containing the heap data, which consists of priority numbers, not the queued data.
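
The read-only methods can be pictured against the same layout. The helpers below are a hypothetical sketch that mirrors the descriptions above, not the module's source; in particular, defaulting peekh to index 0 and returning heap as a plain list are assumptions.

```perl
use strict;
use warnings;

# Hypothetical read-only helpers over the layout from the DESCRIPTION
# section; they mirror the documented behavior, not the module's source.
my $q = {
   _datq => [ 'n1', 'n2' ],
   _datp => { 9 => [ 'p9' ], 5 => [ 'p5a', 'p5b' ] },
   _heap => [ 9, 5 ],                       # highest-first heap order
};

# Counts normal items plus the items stored under every priority.
sub pending {
   my ($q) = @_;
   my $n = @{ $q->{_datq} };
   $n += @{ $q->{_datp}{$_} } for @{ $q->{_heap} };
   return $n;
}

# The index defaults to 0 (the head) when omitted.
sub peek  { my ($q, $i) = @_; return $q->{_datq}[ $i // 0 ] }
sub peekh { my ($q, $i) = @_; return $q->{_heap}[ $i // 0 ] }
sub heap  { my ($q) = @_; return @{ $q->{_heap} } }

sub peekp {
   my ($q, $p, $i) = @_;
   return unless exists $q->{_datp}{$p};   # unknown priority
   return $q->{_datp}{$p}[ $i // 0 ];
}

my $pending = pending($q);
my $second  = peek($q, 1);
my $p5_next = peekp($q, 5, 1);
my $top     = peekh($q);
my @prios   = heap($q);
print "$pending $second $p5_next $top @prios\n";   # 5 n2 p5b 9 9 5
```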

ACKNOWLEDGEMENTS

The main reason for writing MCE::Queue was to have a Thread::Queue-like module for workers spawned as children. I was pleasantly surprised by the number of queuing modules on CPAN. What stood out immediately were the many priority and heap queues, and whether they offered FIFO/LIFO or highest/lowest-first options. Hence MCE::Queue supports both normal and priority queues.

The following resources helped me create MCE::Queue for MCE.

POE::Queue::Array

Two if statements were adopted for checking if the item belongs at the end or head of the queue.

List::Binary::Search

After reviewing the bsearch_num_pos method, which returns the best insert position, I wrote a couple of variations of it so that MCE::Queue could accommodate the highest/lowest order routines.
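
The combination described here, cheap checks at the end and head followed by a binary search, can be sketched as follows for a heap kept in dequeue order. This is a hypothetical helper written for illustration, not code from MCE::Queue, POE::Queue::Array, or bsearch_num_pos; a true third argument selects highest-first order and a false one selects lowest-first.

```perl
use strict;
use warnings;

# Hypothetical sketch: insert priority $p into @$heap, kept in dequeue order.
sub insert_priority {
   my ($heap, $p, $highest) = @_;
   # $before->($x, $y) is true when $x should be dequeued before $y.
   my $before = $highest ? sub { $_[0] > $_[1] } : sub { $_[0] < $_[1] };

   # The two cheap checks: belongs at the end, or at the head.
   if (!@$heap || !$before->($p, $heap->[-1])) { push    @$heap, $p; return }
   if ($before->($p, $heap->[0]))              { unshift @$heap, $p; return }

   # Binary search for the first index whose priority $p should precede.
   my ($lo, $hi) = (0, $#$heap);
   while ($lo < $hi) {
      my $mid = int(($lo + $hi) / 2);
      if ($before->($p, $heap->[$mid])) { $hi = $mid } else { $lo = $mid + 1 }
   }
   splice @$heap, $lo, 0, $p;
   return;
}

my @high = (9, 5, 2);
insert_priority(\@high, $_, 1) for (7, 10, 1);
my @low = (1, 4, 8);
insert_priority(\@low, $_, 0) for (5, 0, 9);
print "@high\n";   # 10 9 7 5 2 1
print "@low\n";    # 0 1 4 5 8 9
```

The head and end checks cover the common cases of a new highest or lowest priority in constant time, leaving the logarithmic search for priorities that land in the middle.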

Heap-Priority, List::Priority

At this point, I thought: why not make both normal queues and priority queues efficient? With that in mind, I also provided options that let folks choose LIFO/LILO and highest/lowest order for the queue. The data structure in MCE::Queue is described above.

MCE workers also benefit from being able to create local queues that are not available to other workers or to the manager process. Hence the three run modes described earlier in this document.

Thread::Queue

Because MCE supports both child processes and threads, Thread::Queue served as a template for identifying and documenting the methods in MCE::Queue. The two are not 100% compatible, so pay close attention to the dequeue method when requesting a number of items to dequeue.

   ->enqueuep( $p, $item [, $item, ... ] );    ## Extension (p)
   ->enqueue( $item [, $item, ... ] );

   ->dequeue( [ $count ] );      ## Priority data dequeues first
   ->dequeue_nb( [ $count ] );

   ->pending();                  ## Counts both normal/priority data
                                 ## in the queue

Parallel-DataPipe

The idea for a queue recursion example came from reading its synopsis.

INDEX

MCE

AUTHOR

Mario E. Roy, <marioeroy AT gmail DOT com>

LICENSE

This program is free software; you can redistribute it and/or modify it under the terms of either: the GNU General Public License as published by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.