MCE::Queue - Hybrid queues (normal including priority) for Many-core Engine
This document describes MCE::Queue version 1.519
   use MCE;
   use MCE::Queue;

   my $F = MCE::Queue->new(fast => 1);
   my $consumers = 8;

   my $mce = MCE->new(

      task_end => sub {
         my ($mce, $task_id, $task_name) = @_;
         $F->enqueue((undef) x $consumers) if $task_name eq 'dir';
      },

      user_tasks => [{
         max_workers => 1,
         task_name   => 'dir',

         user_func   => sub {
            ## Create a "standalone queue" only accessible to this worker.
            ## See included examples for running with multiple workers.
            my $D = MCE::Queue->new(queue => [ MCE->user_args->[0] ]);

            while (defined (my $dir = $D->dequeue_nb)) {
               my (@files, @dirs); foreach (glob("$dir/*")) {
                  if (-d $_) { push @dirs, $_; next; }
                  push @files, $_;
               }
               $D->enqueue(@dirs ) if scalar @dirs;
               $F->enqueue(@files) if scalar @files;
            }
         }
      },{
         max_workers => $consumers,
         task_name   => 'file',

         user_func   => sub {
            while (defined (my $file = $F->dequeue)) {
               MCE->say($file);
            }
         }
      }]

   )->run({ user_args => [ $ARGV[0] || '.' ] });

   __END__

   Results from files_mce.pl and files_thr.pl; included with MCE.
   Usage:
      time ./files_mce.pl /usr 0 | wc -l
      time ./files_mce.pl /usr 1 | wc -l
      time ./files_thr.pl /usr   | wc -l

   Darwin (OS)    /usr:    216,271 files
      MCE::Queue, fast => 0 :    4.17s
      MCE::Queue, fast => 1 :    2.62s
      Thread::Queue         :    4.14s

   Linux (VM)     /usr:    186,154 files
      MCE::Queue, fast => 0 :   12.57s
      MCE::Queue, fast => 1 :    3.36s
      Thread::Queue         :    5.91s

   Solaris (VM)   /usr:    603,051 files
      MCE::Queue, fast => 0 :   39.04s
      MCE::Queue, fast => 1 :   18.08s
      Thread::Queue         :    * Perl not built to support threads
This module provides a queue interface supporting normal and priority queues, utilizing the IPC engine behind MCE. Data resides under the manager process. MCE::Queue also allows a worker to create any number of local queues, which are not available to other workers or to the manager process. Think of a CPU having L3 (shared) and L1 (local) cache.
The structure of the MCE::Queue object is provided below. It allows normal queues to run as fast as a plain array. Priority queues are nearly as fast, because each operation needs only a brief hash lookup to check whether the priority exists, plus adding or removing that key when necessary. The heap array contains only priorities, not the data itself, so the heap order needs managing only when a priority is added or removed.
   ## Normal queue data
   $_queue->{_datq} = [];

   ## Priority data { p1 => [ ], p2 => [ ], pN => [ ] }
   $_queue->{_datp} = {};

   ## Priority heap [ pN, p2, p1 ] ## in heap order
   ## fyi, _datp will always dequeue before _datq
   $_queue->{_heap} = [];

   ## Priority order (default)
   $_queue->{_porder} = $MCE::Queue::HIGHEST;

   ## Priority type (default)
   $_queue->{_type} = $MCE::Queue::FIFO;
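The following is a minimal, single-process sketch of the layout above, written only to illustrate how the three fields cooperate. ToyQueue is a hypothetical class, not MCE::Queue's implementation. It covers only the default HIGHEST/FIFO settings and has no IPC or blocking. It also keeps the heap ordered with a plain sort for brevity; MCE::Queue's acknowledgements mention a binary-search insert instead.

```perl
#!/usr/bin/perl
# Hypothetical ToyQueue: a single-process illustration of the
# _datq / _datp / _heap layout. NOT MCE::Queue's actual code.
use strict;
use warnings;

package ToyQueue;

sub new {
    my ($class) = @_;
    return bless {
        _datq => [],   # normal queue data
        _datp => {},   # priority => [ items ]
        _heap => [],   # priorities only, highest first (HIGHEST order)
    }, $class;
}

sub enqueue {
    my ($self, @items) = @_;
    push @{ $self->{_datq} }, @items;
    return;
}

sub enqueuep {
    my ($self, $p, @items) = @_;
    # The heap is touched only when this priority is new.
    unless (exists $self->{_datp}{$p}) {
        @{ $self->{_heap} } = sort { $b <=> $a } @{ $self->{_heap} }, $p;
        $self->{_datp}{$p} = [];
    }
    push @{ $self->{_datp}{$p} }, @items;
    return;
}

sub dequeue_nb {
    my ($self) = @_;
    # Priority data always dequeues before normal data.
    if (@{ $self->{_heap} }) {
        my $p    = $self->{_heap}[0];
        my $item = shift @{ $self->{_datp}{$p} };   # FIFO within a priority
        unless (@{ $self->{_datp}{$p} }) {
            # Priority exhausted: remove its key and its heap entry.
            delete $self->{_datp}{$p};
            shift @{ $self->{_heap} };
        }
        return $item;
    }
    return shift @{ $self->{_datq} };   # undef when empty (non-blocking)
}

sub pending {
    my ($self) = @_;
    my $n = scalar @{ $self->{_datq} };
    $n += scalar @{ $self->{_datp}{$_} } for @{ $self->{_heap} };
    return $n;
}

sub heap { return @{ $_[0]{_heap} } }

package main;

my $q = ToyQueue->new;
$q->enqueue('a', 'b');
$q->enqueuep(5, 'p5-1', 'p5-2');
$q->enqueuep(9, 'p9');

my @heap  = $q->heap;      # (9, 5): priorities only
my $count = $q->pending;   # 5: normal plus priority items

my @order;
while (defined(my $item = $q->dequeue_nb)) { push @order, $item; }
print "@order\n";          # prints: p9 p5-1 p5-2 a b
```

Ordinary pushes and shifts stay on plain arrays; only the arrival or exhaustion of a priority key touches the heap.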
Three options are available for overriding the default values for new queues (porder applies to priority queues only).
   use MCE::Queue porder => $MCE::Queue::HIGHEST,
                  type   => $MCE::Queue::FIFO,
                  fast   => 0;

   use MCE::Queue; ## same as above

   porder => $HIGHEST = Highest priority items are dequeued first
             $LOWEST  = Lowest priority items are dequeued first

   type   => $FIFO    = First in, first out
             $LILO    = (Synonym for FIFO)
             $LIFO    = Last in, first out
             $FILO    = (Synonym for LIFO)
MCE::Queue can be utilized under the following conditions:
A) use MCE; use MCE::Queue;
The dequeue method blocks for the manager process and for workers. All data resides under the manager process. Workers send and request data through IPC.
Creating a queue from a worker process causes the queue to run in local mode. The data resides under that worker process and is not available to other workers or to the manager process.
B) use MCE::Queue; use MCE;
Queues behave as if running in local mode for the manager and workers for the duration of the script. I cannot think of a use case for this, but wanted to mention the behavior in the event MCE::Queue is loaded prior to MCE.
C) use MCE::Queue; (without MCE)
The dequeue method is non-blocking in this mode. This behaves like local mode when MCE is not present. As with local queuing, this mode is speedy due to minimal overhead and zero IPC.
Essentially, the MCE module is not a prerequisite for using MCE::Queue.
->new( [ queue => \@array ] )
This creates a new queue. Available options are queue, porder, type, fast, and gather. The gather option is mainly for running with MCE when you want to pass item(s) to a callback function for adding to the queue.
The 'fast' option speeds up ->dequeue operations and is not enabled by default. It is beneficial for queues that do not call ->clear or ->dequeue_nb and do not alter the optional count value while running (e.g. ->dequeue($count)). In short, do not enable 'fast' if $count varies dynamically.
   use MCE;
   use MCE::Queue;

   my $q1 = MCE::Queue->new();
   my $q2 = MCE::Queue->new( queue  => [ 0, 1, 2 ] );

   my $q3 = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
   my $q4 = MCE::Queue->new( porder => $MCE::Queue::LOWEST  );

   my $q5 = MCE::Queue->new( type   => $MCE::Queue::FIFO );
   my $q6 = MCE::Queue->new( type   => $MCE::Queue::LIFO );

   my $q7 = MCE::Queue->new( fast   => 1 );
Multiple queues may point to the same callback function. Please note that the first argument for the callback function is the queue object itself.
   sub _append {
      my ($Q, @items) = @_;
      $Q->enqueue(@items);
   }

   my $q7 = MCE::Queue->new( gather => \&_append );
   my $q8 = MCE::Queue->new( gather => \&_append );

   ## Items are diverted to the gather callback function.
   $q7->enqueue( 'apple', 'orange' );
The gather option is useful for temporarily holding items until they can be output in order. A queue is not required to gather data in MCE; this is simply a demonstration of the gather option in the context of a queue.
   use MCE;
   use MCE::Queue;

   my ($_order_id, %_tmp);

   sub _preserve_order {
      my ($Q, $chunk_id, $result) = @_;

      $_tmp{$chunk_id} = $result;

      while (1) {
         last unless exists $_tmp{$_order_id};
         $Q->enqueue( $_tmp{$_order_id} );
         delete $_tmp{$_order_id++};
      }

      return;
   }

   my @squares; my $q = MCE::Queue->new(
      queue => \@squares, gather => \&_preserve_order
   );

   $_order_id = 1;   ## The first chunk_id equals 1

   my $mce = MCE->new(
      chunk_size => 1, input_data => [ 1 .. 100 ],
      user_func => sub {
         $q->enqueue( MCE->chunk_id, $_ * $_ );
      }
   );

   $mce->run;

   print "@squares\n";
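The reordering done by _preserve_order can be checked without MCE by simulating chunks that arrive out of order. This is a simplified stand-in built on assumptions: @arrivals is made-up input, and results are pushed onto @squares directly instead of calling $Q->enqueue.

```perl
#!/usr/bin/perl
# Single-process check of the order-preserving gather logic.
# No MCE or MCE::Queue needed; arrival order is simulated.
use strict;
use warnings;

my @squares;        # stands in for the queue's backing array
my %tmp;            # holding area: chunk_id => result
my $order_id = 1;   # next chunk_id allowed through

# Same idea as _preserve_order, appending to @squares directly.
sub preserve_order {
    my ($chunk_id, $result) = @_;
    $tmp{$chunk_id} = $result;
    # Release every consecutive result that is now available.
    while (exists $tmp{$order_id}) {
        push @squares, delete $tmp{$order_id++};
    }
    return;
}

# Simulated (chunk_id, input) pairs arriving from workers out of order.
my @arrivals = ( [3, 3], [1, 1], [5, 5], [2, 2], [4, 4] );
preserve_order($_->[0], $_->[1] * $_->[1]) for @arrivals;

print "@squares\n";   # prints: 1 4 9 16 25
```

Results wait in %tmp until every lower chunk_id has been released, so the output order matches the input order regardless of arrival order.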
->clear()
Clears the queue of any items. This has the effect of nulling the queue. Each queue comes with a socket used for blocking behind the scenes, so use the clear method, rather than emptying the underlying array directly, when you want to clear the queue's contents.
   my @a; my $q = MCE::Queue->new( queue => \@a );

   @a = ();     ## no, the block socket may become out of sync
   $q->clear;   ## ok
->enqueue( $item [, $item, ... ] )
Appends a list of items onto the end of the normal queue.
->enqueuep( $p, $item [, $item, ... ] )
Appends a list of items onto the end of the priority queue with priority $p.
->dequeue( [ $count ] )
Returns the requested number of items (default is 1) from the queue. Priority data will always dequeue first from the priority queue before any data from the normal queue.
The method will block if the queue contains zero items. If the queue contains fewer than the requested number of items, the method will not block, but will return the remaining items followed by undef, up to the count requested.
The optional $count is beneficial when workers pass parameters through the queue. For this release, always remember to dequeue using the same multiple for the count. This is unlike Thread::Queue, which blocks until the requested number of items is available.
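The sketch below mimics, on a plain Perl array, the return shape described for dequeue($count) when fewer items remain than requested. It also shows why a fixed multiple keeps parameter pairs aligned. take() is a hypothetical helper; it does not model blocking on an empty queue or any IPC.

```perl
#!/usr/bin/perl
# Hypothetical take(): returns $count items from the head of an array,
# padding with undef when fewer remain (mirrors the documented shape).
use strict;
use warnings;

sub take {
    my ($aref, $count) = @_;
    $count = 1 unless defined $count;
    # shift on an empty array yields undef, giving the padding.
    return map { shift @$aref } 1 .. $count;
}

# Workers pass (name, value) pairs, so always dequeue in multiples of 2.
my @queue = ('alpha', 10, 'beta', 20, 'gamma');
my @pairs;
while (@queue) {
    my ($name, $value) = take(\@queue, 2);
    push @pairs, [ $name, $value ];
}
# @pairs: [alpha,10], [beta,20], [gamma,undef]
```

Because every call uses the same count, each name stays paired with its value; mixing counts (say, one call with 3) would shift every later pair out of alignment.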
->dequeue_nb( [ $count ] )
Returns the requested number of items (default is 1) from the queue. As with dequeue, priority data always dequeues first. This method is non-blocking and returns undef in the absence of data from the queue.
->insert( $index, $item [, $item, ... ] )
Adds the list of items to the queue at the specified index.
->insertp( $p, $index, $item [, $item, ... ] )
Adds the list of items to the queue at the specified index with priority $p.
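As a rough illustration of "at the specified index", Perl's splice can place items before a given position in a normal array or in one priority's array. This is a sketch on plain data that assumes index 0 is the head of a FIFO queue. How MCE::Queue counts indexes for LIFO queues is not covered here.

```perl
#!/usr/bin/perl
# Plain-data illustration of insert/insertp semantics using splice.
# Assumes a FIFO queue where index 0 is the head; not MCE::Queue code.
use strict;
use warnings;

my @datq = qw(a b c);            # normal queue data
my %datp = ( 5 => [qw(x y)] );   # data for priority 5

# Like insert(1, 'NEW'): place items before the element at index 1.
splice @datq, 1, 0, 'NEW';

# Like insertp(5, 0, 'FIRST'): place items at the head of priority 5.
splice @{ $datp{5} }, 0, 0, 'FIRST';

print "@datq | @{ $datp{5} }\n";   # prints: a NEW b c | FIRST x y
```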
->pending()
Returns the number of items in the queue. This includes both normal and priority data.
->peek( [ $index ] )
Returns an item from the normal queue, at the specified index, without dequeuing anything. It defaults to the head of the queue if index is not specified.
->peekp( $p [, $index ] )
Returns an item from the queue with priority $p, at the specified index, without dequeuing anything. It defaults to the head of the queue if index is not specified.
->peekh( [ $index ] )
Returns an item from the heap, at the specified index.
->heap()
Returns an array containing the heap data. Heap data consists of priority numbers, not the data.
The main reason for writing MCE::Queue was to have a Thread::Queue-like module for workers spawned as children. I was pleasantly surprised at the number of modules on CPAN for queuing. What stood out immediately were all the priority queues and heap queues, and whether FIFO/LIFO or highest/lowest-first options were available. Hence MCE::Queue supports both normal and priority queues.
The following resources helped me in creating MCE::Queue for MCE.
Two if statements were adopted for checking if the item belongs at the end or head of the queue.
After glancing over the bsearch_num_pos method, which returns the best insert position, I found that a couple of variations of it were in order for MCE::Queue to accommodate the highest/lowest order routines.
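A binary search for the insert position in both highest-first and lowest-first order might look like the sketch below, with the two quick head/tail checks tried first. insert_pos() is a hypothetical helper written for illustration. It is not the bsearch_num_pos code or MCE::Queue's routine. It assumes the heap holds unique priorities, which is the case when a priority is added to the heap only the first time it is seen.

```perl
#!/usr/bin/perl
# Hypothetical insert_pos(): where should priority $p go in a sorted
# heap array? $highest_first selects HIGHEST (descending) or LOWEST
# (ascending) order. Assumes unique priorities in the heap.
use strict;
use warnings;

sub insert_pos {
    my ($heap, $p, $highest_first) = @_;
    my $n = scalar @$heap;

    # "ahead" is true when an existing entry must stay in front of $p.
    my $ahead = $highest_first
        ? sub { $_[0] > $p }
        : sub { $_[0] < $p };

    # Quick checks: does $p belong at the head or at the tail?
    return 0  if $n == 0 || !$ahead->($heap->[0]);
    return $n if $ahead->($heap->[-1]);

    # Otherwise, binary search for the first entry not ahead of $p.
    my ($lo, $hi) = (0, $n);
    while ($lo < $hi) {
        my $mid = int(($lo + $hi) / 2);
        if ($ahead->($heap->[$mid])) { $lo = $mid + 1 } else { $hi = $mid }
    }
    return $lo;
}

my @high = (9, 7, 3);
splice @high, insert_pos(\@high, 5, 1), 0, 5;   # (9, 7, 5, 3)

my @low = (1, 4, 8);
splice @low, insert_pos(\@low, 6, 0), 0, 6;     # (1, 4, 6, 8)
```

Only the comparison flips between the two orders, which is one way a single search routine can serve both highest-first and lowest-first heaps.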
At this point, I thought: why not make both normal queues and priority queues efficient? With that in mind, I also provided options allowing folks to choose LIFO/LILO and highest/lowest order for the queue. The data structure in MCE::Queue is described above.
MCE workers also benefit from being able to create local queues that are not available to other workers or to the manager process. Hence the 3 run modes described earlier in this document.
Because MCE supports both child processes and threads, Thread::Queue was used as a template for identifying and documenting the methods in MCE::Queue. The two are not 100% compatible; pay close attention to the dequeue method when requesting the number of items to dequeue.
   ->enqueuep( $p, $item [, $item, ... ] );   ## Extension (p)
   ->enqueue( $item [, $item, ... ] );
   ->dequeue( [ $count ] );      ## Priority data dequeues first
   ->dequeue_nb( [ $count ] );
   ->pending();                  ## Counts both normal/priority data
                                 ## in the queue
The idea for a queue recursion example came from reading this synopsis.
MCE
Mario E. Roy, <marioeroy AT gmail DOT com>
This program is free software; you can redistribute it and/or modify it under the terms of either: the GNU General Public License as published by the Free Software Foundation; or the Artistic License.
See http://dev.perl.org/licenses/ for more information.