
NAME

MCE - Many-Core Engine for Perl. Provides parallel processing capabilities.

VERSION

This document describes MCE version 1.499_001

DESCRIPTION

Many-Core Engine (MCE) for Perl helps enable a new level of performance by maximizing the use of all available cores. MCE spawns a pool of workers up front and therefore does not fork a new process for each element of data. Instead, MCE follows a bank-queuing model: imagine the line as the data and the bank tellers as the parallel workers. MCE enhances that model by handing the next n elements (a chunk) of the input stream to the next available worker.

Both chunking and input data are optional in MCE. One can simply use MCE to have many workers run in parallel.
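To make the chunking idea concrete, here is a minimal pure-Perl sketch (MCE itself is not used) of cutting an input list into chunks of n elements, each tagged with a chunk id starting at 1. The helper make_chunks is hypothetical; MCE hands each chunk to whichever worker is free next rather than building all chunks up front.

```perl
use strict;
use warnings;

# Hypothetical helper: split a list into chunks of at most $size
# elements. Returns a list of [ $chunk_id, \@chunk ] pairs, with
# chunk ids starting at 1 (as MCE's chunk_id does).
sub make_chunks {
    my ($size, @items) = @_;
    die "chunk size must be >= 1\n" if $size < 1;
    my @chunks;
    my $chunk_id = 0;
    while (@items) {
        my @chunk = splice(@items, 0, $size);   # take the next n elements
        push @chunks, [ ++$chunk_id, \@chunk ];
    }
    return @chunks;
}

# Usage: 10 items with a chunk size of 4 yields chunks of 4, 4, and 2.
my @chunks = make_chunks(4, 1 .. 10);
for my $c (@chunks) {
    my ($chunk_id, $chunk_ref) = @$c;
    print "chunk $chunk_id: @$chunk_ref\n";
}
```

In MCE, the last chunk may likewise hold fewer than chunk_size elements.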

SYNOPSIS

This is a simple use case of MCE running with 4 workers.

   use MCE;

   my $mce = MCE->new(
      max_workers => 4,
      user_func => sub {
         my ($self) = @_;
         print "Hello from ", $self->wid, "\n";
      }
   );

   $mce->run;

      ## Beginning with MCE 1.5, all public methods can also be
      ## called directly as package methods, e.g. MCE->wid, MCE->run.

   -- Output

   Hello from 3
   Hello from 1
   Hello from 2
   Hello from 4

EXPORT_CONST, CONST

The anonymous user_func above can be written with one fewer line. MCE defines 3 exportable constants. Using them in lieu of 0, 1, 2 makes accessing the elements of @_ more legible.

SELF CHUNK CID

   ## The following exports SELF => 0, CHUNK => 1, CID => 2

   use MCE EXPORT_CONST => 1;
   use MCE CONST => 1;              ## Allowed in 1.415 and later

   user_func => sub {
    # my ($self, $chunk_ref, $chunk_id) = @_;
      print "Hello from ", $_[SELF]->wid, "\n";
   }

   ## MCE 1.5 allows any public method to be called directly.

   use MCE;

   user_func => sub {
    # my ($self, $chunk_ref, $chunk_id) = @_;
      print "Hello from ", MCE->wid, "\n";
   }
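The exported constants are ordinary index constants into @_. The plain-Perl sketch below (independent of MCE) defines the same three values with the core constant pragma and calls a user_func-style sub with made-up stand-in arguments; under MCE the first argument would be the real MCE instance.

```perl
use strict;
use warnings;

# Same values MCE exports via EXPORT_CONST / CONST.
use constant { SELF => 0, CHUNK => 1, CID => 2 };

# A user_func-style sub that reads its arguments by name rather than
# by numeric index.
sub user_func {
    my $first = $_[CHUNK]->[0];
    return "chunk " . $_[CID] . " starts with $first";
}

my $fake_self = {};   # stand-in for the MCE instance (illustration only)
print user_func($fake_self, [ 'a', 'b' ], 7), "\n";
```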

OVERRIDING DEFAULTS

In addition to EXPORT_CONST, five more arguments can be passed when loading MCE. These override the defaults configured in MCE.pm.

   use Sereal qw(encode_sereal decode_sereal);

   use MCE MAX_WORKERS => 4,                    ## Default 1
           CHUNK_SIZE  => 100,                  ## Default 1
           TMP_DIR     => "/path/to/app/tmp",   ## $MCE::Signal::tmp_dir
           FREEZE      => \&encode_sereal,      ## \&Storable::freeze
           THAW        => \&decode_sereal;      ## \&Storable::thaw

   my $mce = MCE->new( ... );

new ( [ options ] )

Below, a new instance is configured with all available options.

   use MCE;

   my $mce = MCE->new(

      max_workers  => 8,                 ## Default $MCE::MAX_WORKERS

          ## Number of workers to spawn. This can be set automatically
          ## with MCE 1.412 and later releases.

          ## max_workers => 'auto',      ## = MCE::Util::get_ncpu()
          ## max_workers => 'Auto-1',    ## = MCE::Util::get_ncpu() - 1
          ## max_workers => 'AUTO + 3',  ## = MCE::Util::get_ncpu() + 3
          ## max_workers => 'AUTO * 1.5',
          ## max_workers => 'auto / 1.333 + 2',

      chunk_size   => 2000,              ## Default $MCE::CHUNK_SIZE

          ## A value less than or equal to 8192 is the number of
          ## records; a value greater than 8192 is the number of
          ## bytes. MCE reads till the end of the record before
          ## calling user_func.

          ## chunk_size =>     1,        ## Consists of 1 record
          ## chunk_size =>  1000,        ## Consists of 1000 records
          ## chunk_size => 16384,        ## Approximately 16384 bytes
          ## chunk_size => 50000,        ## Approximately 50000 bytes

      tmp_dir      => $tmp_dir,          ## Default $MCE::TMP_DIR

          ## Default is $MCE::Signal::tmp_dir which points to
          ## $ENV{TEMP} if defined. Otherwise, tmp_dir points
          ## to a location under /tmp.

      freeze       => \&encode_sereal,   ## Default $MCE::FREEZE
      thaw         => \&decode_sereal,   ## Default $MCE::THAW

          ## Release 1.412 allows freeze and thaw to be overridden.
          ## Just include a serialization module prior to loading MCE.

          ## use Sereal qw(encode_sereal decode_sereal);
          ## use JSON::XS qw(encode_json decode_json);
          ## use MCE;

      gather       => \@a,               ## Default undef

          ## Release 1.5 allows for gathering of data to an array
          ## reference, a MCE::Queue or Thread::Queue object, or
          ## a code reference. One invokes gathering by calling
          ## the gather method as often as needed.

          ## gather => $Q,
          ## gather => \&preserve_order,

      input_data   => $input_file,       ## Default undef
      RS           => "\n::\n",          ## Default undef

          ## input_data => '/path/to/file' for input file
          ## input_data => \@array for input array
          ## input_data => \*FILE_HNDL for file handle
          ## input_data => \$scalar to treat like a file

          ## Use the sequence option if simply wanting to loop
          ## through a sequence of numbers instead.

          ## Release 1.4 and later allows one to specify the
          ## record separator (RS) applicable to input files,
          ## file handles, and scalar references.

      use_slurpio  => 1,                 ## Default 0
      use_threads  => 1,                 ## Default 0 or 1

          ## Whether or not to enable slurpio when reading files
          ## (passes raw chunk to user function).

          ## By default MCE does forking (spawns child processes).
          ## MCE also supports threads via 2 threading libraries.
          ##
          ## The use of threads in MCE requires that you include
          ## threads support prior to loading MCE. The use_threads
          ## option defaults to 1 when a thread library is present.
          ##
          ##    use threads;                  use forks;
          ##    use threads::shared;   (or)   use forks::shared;
          ##
          ##    use MCE;                      use MCE;

      spawn_delay  => 0.035,             ## Default undef
      submit_delay => 0.002,             ## Default undef
      job_delay    => 0.150,             ## Default undef

          ## Time to wait, in fractional seconds, before spawning
          ## each worker, before submitting parameters to each worker,
          ## and before commencing the job (staggered) for each worker.
          ## For example, use job_delay when wanting to stagger
          ## many workers connecting to a database.

      on_post_exit => \&on_post_exit,    ## Default undef
      on_post_run  => \&on_post_run,     ## Default undef

          ## Execute code block immediately after a worker exits
          ## (exit, MCE->exit, or die). Execute code block after
          ## running a job (MCE->process or MCE->run).

          ## One can take action immediately after a worker exits
          ## or wait until after the job has completed.

      user_args    => { env => 'test' }, ## Default undef

          ## MCE release 1.4 adds a new parameter to allow one to
          ## specify arbitrary arguments such as a string, an ARRAY
          ## or a HASH reference. Workers can access this directly:
          ##    my $args = $self->{user_args};

      user_begin   => \&user_begin,      ## Default undef
      user_func    => \&user_func,       ## Default undef
      user_end     => \&user_end,        ## Default undef

          ## Think of user_begin, user_func, user_end like the awk
          ## scripting language:
          ##    awk 'BEGIN { ... } { ... } END { ... }'

          ## MCE workers call user_begin once per job, then
          ## call user_func repeatedly until no chunks remain.
          ## Afterwards, user_end is called.

      user_error   => \&user_error,      ## Default undef
      user_output  => \&user_output,     ## Default undef

          ## When workers call the following functions, MCE will
          ## pass the data to user_error/user_output if defined.
          ## MCE->sendto('stderr', 'Sending to STDERR');
          ## MCE->sendto('stdout', 'Sending to STDOUT');

      stderr_file  => 'err_file',        ## Default STDERR
      stdout_file  => 'out_file',        ## Default STDOUT

          ## Or to file. User_error/user_output take precedence.

      flush_file   => 1,                 ## Default 0
      flush_stderr => 1,                 ## Default 0
      flush_stdout => 1,                 ## Default 0

          ## Flush sendto file, standard error, or standard output.

      interval     => {
          delay => 0.005 [, max_nodes => 4, node_id => 1 ]
      },

          ## For use with the yield method introduced in MCE 1.5.
          ## Both max_nodes & node_id are optional and default to 1.
          ## Delay is the amount of time between intervals.

      sequence     => {                  ## Default undef
          begin => -1, end => 1 [, step => 0.1 [, format => "4.1f" ] ]
      },

          ## For looping through a sequence of numbers in parallel.
          ## STEP, if omitted, defaults to 1 if BEGIN is smaller than
          ## END or -1 if BEGIN is greater than END. The FORMAT string
          ## (without the %) is passed to sprintf behind the scenes,
          ## e.g. $seq_n_formatted = sprintf("%4.1f", $seq_n);

          ## Leave input_data set to undef when specifying the sequence
          ## option. One cannot specify both options together.

          ## Release 1.4 allows one to specify an array reference
          ## instead (3rd & 4th values are optional):
          ##
          ##    sequence => [ -1, 1, 0.1, "4.1f" ]

      task_end     => \&task_end,        ## Default undef

          ## MCE 1.5 allows this to be specified at the top level.
          ## This is called by the manager process after the task
          ## has completed processing.

      task_name    => 'string',          ## Default 'MCE'

          ## Added in MCE 1.5. This is beneficial for user_tasks.
          ## Each task can be specified with a different name value.
          ## It allows for task_end to be specified at the top level.
          ## The task_name value is passed as the 3rd arg to task_end.

      user_tasks   => [                  ## Default undef
         { ... },                        ## Options for task 0
         { ... },                        ## Options for task 1
         { ... },                        ## Options for task 2
      ],

          ## Takes a list of hash references, each allowing up to 12
          ## options. All other MCE options are ignored. Input_data
          ## is applicable for the first task only.
          ##   task_name, max_workers, chunk_size, input_data, sequence,
          ##   task_end, user_args, user_begin, user_end, user_func,
          ##   gather, use_threads

          ## Options not specified here will default to same option
          ## specified at the top level.

   );
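The chunk_size comments above state that values greater than 8192 are a byte count and that MCE reads up to the end of the current record before calling user_func. The pure-Perl sketch below illustrates that rule on an in-memory file handle; read_chunk is a hypothetical helper, a tiny byte count is used so the effect is visible, and MCE's actual reader differs (it coordinates read offsets among workers).

```perl
use strict;
use warnings;

# Hypothetical helper: read roughly $bytes bytes from $fh, then keep
# reading up to the end of the current record so that no record is
# split across two chunks. Returns undef once the input is exhausted.
sub read_chunk {
    my ($fh, $bytes, $rs) = @_;
    local $/ = $rs;                          # readline stops at $rs
    my $n = read($fh, my $buf, $bytes);
    return undef unless $n;                  # end of input (or error)
    if (length($buf) < length($rs) || substr($buf, -length($rs)) ne $rs) {
        my $rest = <$fh>;                    # finish the partial record
        $buf .= $rest if defined $rest;
    }
    return $buf;
}

my $data = join '', map { "record $_\n" } 1 .. 5;   # 9 bytes per record
open my $fh, '<', \$data or die "cannot open in-memory handle: $!";
while (defined(my $chunk = read_chunk($fh, 12, "\n"))) {
    my $records = () = $chunk =~ /\n/g;
    print "chunk with $records records\n";
}
close $fh;
```

With 12-byte reads over 9-byte records, each read stops mid-record and is extended to the next newline, so the chunks hold 2, 2, and 1 records.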

RUNNING

The run method calls spawn and kicks off the job; workers call user_begin, user_func, and user_end. Run then shuts down the workers. Call the spawn method early whenever the main process needs to build large data structures before running, so that the workers are spawned before that memory is allocated.

   MCE->spawn();                         ## This is optional

   MCE->run();                           ## Call run or process below

   ## Acquire data arrays and/or input files. The same pool of
   ## workers is used.

   MCE->process(\@input_data_1);         ## Process arrays
   MCE->process(\@input_data_2);
   MCE->process(\@input_data_n);

   MCE->process('input_file_1');         ## Process files
   MCE->process('input_file_2');
   MCE->process('input_file_n');

   ## Shutdown workers afterwards.

   MCE->shutdown();

SEQUENCE

Release 1.3 and above allows workers to loop through a sequence of numbers computed mathematically, without the overhead of an array. Unlike input_data, the sequence can be specified separately for each user_tasks entry; input_data applies to the first task only.

See a demo at the end of this documentation applying sequences with the user_tasks option including chunking.

Sequence can be specified using an array or a hash reference.

   use MCE;

   my $mce = MCE->new(
      max_workers => 3,

    # sequence => [ 10, 19, 0.7, "4.1f" ],

      sequence => {
         begin => 10, end => 19, step => 0.7, format => "4.1f"
      },

      user_func => sub {
         my ($self, $n, $chunk_id) = @_;
         print $n, " from ", MCE->wid(), " id ", $chunk_id, "\n";
      }
   );

   MCE->run();

   -- Output (sorted afterwards, notice wid and chunk_id in output)

   10.0 from 1 id 1
   10.7 from 2 id 2
   11.4 from 3 id 3
   12.1 from 1 id 4
   12.8 from 2 id 5
   13.5 from 3 id 6
   14.2 from 1 id 7
   14.9 from 2 id 8
   15.6 from 3 id 9
   16.3 from 1 id 10
   17.0 from 2 id 11
   17.7 from 3 id 12
   18.4 from 1 id 13
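For intuition about which values a sequence produces, here is a pure-Perl sketch of the expansion rules described under the sequence option (default STEP direction, FORMAT passed to sprintf). expand_sequence is a hypothetical helper, not part of MCE, and it builds a list, whereas MCE avoids materializing an array. For the example above it yields the same 13 values, 10.0 through 18.4.

```perl
use strict;
use warnings;

# Hypothetical helper mirroring the documented sequence rules:
# STEP defaults to 1 or -1 depending on direction, and FORMAT (without
# the leading %) is handed to sprintf. Each value is computed as
# begin + i * step rather than by repeated addition, which avoids
# accumulating floating-point error.
sub expand_sequence {
    my ($begin, $end, $step, $format) = @_;
    $step //= ($begin <= $end) ? 1 : -1;
    die "step must be non-zero\n" if $step == 0;
    my $count = int(($end - $begin) / $step + 1e-9) + 1;
    return () if $count < 1;             # step points away from end
    return map {
        my $n = $begin + $_ * $step;
        defined $format ? sprintf("%$format", $n) : $n;
    } 0 .. $count - 1;
}

my @seq = expand_sequence(10, 19, 0.7, "4.1f");
print scalar(@seq), " values: @seq\n";
```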

USER_TASKS

The 1.2 release introduced the user_tasks option to allow for multiple roles. This option takes an array of tasks, and each task allows for 12 MCE options. Input_data can be specified inside the first task or at the top level only; otherwise it is ignored.

   max_workers, chunk_size, input_data, sequence, task_end, task_name,
   gather, user_args, user_begin, user_end, user_func, use_threads

Sequence and chunk_size were added in 1.3. User_args was introduced in 1.4. Task_name and input_data are new options allowed in 1.5. In addition, one can specify task_end at the top level. Task_end also receives 2 additional arguments, $task_id and $task_name (shown below).

Options not specified here will default to the same option specified at the top level. The task_end option is called by the manager process when all workers for that role have completed processing.

Forking and threading can be intermixed among tasks unless running Cygwin. The run method will continue running until all workers have completed processing.

Visit this URL for further reading on the user_tasks option. http://code.google.com/p/many-core-engine-perl/wiki/MCE_Tasks

   use MCE;

   my $mce = MCE->new(
      input_data => $list_file,

      task_end   => sub {
         my ($self, $task_id, $task_name) = @_;
         print "Task [$task_id -- $task_name] completed processing\n";
      },

      user_tasks => [{
         task_name   => 'a',
         max_workers => 2,
         user_func   => \&parallel_task1,
         use_threads => 0

      },{
         task_name   => 'b',
         max_workers => 4,
         user_func   => \&parallel_task2,
         use_threads => 1

      }]
   );

   MCE->run();
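The inheritance rule described above (task options fall back to the top level, only the 12 listed options are honored, and input_data applies to the first task only) can be modeled in plain Perl. effective_task_options is a hypothetical helper that reflects only the documented behavior, not MCE's internal code.

```perl
use strict;
use warnings;

# The 12 options a user_tasks entry may carry (per the list above).
my @TASK_OPTS = qw(
    max_workers chunk_size input_data sequence task_end task_name
    gather user_args user_begin user_end user_func use_threads
);

# Hypothetical helper: build the effective options for task $task_id.
# Options missing from the task fall back to the top level; input_data
# is honored for the first task (task 0) only.
sub effective_task_options {
    my ($top, $task, $task_id) = @_;
    my %opts;
    for my $key (@TASK_OPTS) {
        if (exists $task->{$key}) {
            $opts{$key} = $task->{$key};
        }
        elsif (exists $top->{$key}) {
            $opts{$key} = $top->{$key};
        }
    }
    delete $opts{input_data} if $task_id > 0;
    return \%opts;
}

my $top   = { input_data => '/path/to/list', max_workers => 8 };
my $task1 = effective_task_options($top, { max_workers => 2, task_name => 'b' }, 1);
print join(', ', map { "$_=$task1->{$_}" } sort keys %$task1), "\n";
```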

SYNTAX for ON_POST_EXIT

Often, one may want to capture the exit status. The on_post_exit option, if defined, is executed immediately after a worker exits via exit (children), MCE->exit (children and threads), or die.

The format of $e->{pid} is PID_123 for children and THR_123 for threads.

   sub on_post_exit {
      my ($self, $e) = @_;
      print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
   }

   sub user_func {
      my $self = $_[0];
      MCE->exit(0, 'ok', 'pebbles');
   }

   my $mce = MCE->new(
      on_post_exit => \&on_post_exit,
      user_func    => \&user_func
   );

   MCE->run();

   -- Output

   2: PID_7625: 0: ok: pebbles

SYNTAX for ON_POST_RUN

The on_post_run option, if defined, is executed immediately after running MCE->process or MCE->run. This option receives an array reference of hashes.

The difference between on_post_exit and on_post_run is that the former is called immediately whereas the latter is called after all workers have completed processing or running.

   sub on_post_run {
      my ($self, $status_ref) = @_;
      foreach my $e ( @{ $status_ref } ) {
         print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
      }
   }

   sub user_func {
      my $self = $_[0];
      MCE->exit(0, 'ok', 'pebbles');
   }

   my $mce = MCE->new(
      on_post_run => \&on_post_run,
      user_func   => \&user_func
   );

   MCE->run();
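A callback usually does more than print. Below is a plain-Perl sketch of a helper one might call from on_post_run to summarize the status array; summarize_exits and the sample data are hypothetical. The greatest status it computes is the same quantity the status method is documented to report after a run.

```perl
use strict;
use warnings;

# Summarize the array of exit-status hashes passed to on_post_run.
# Each hash carries wid, pid, status, msg, and id keys, as printed in
# the example above. Returns the greatest status seen and the wids of
# workers that exited with a non-zero status.
sub summarize_exits {
    my ($status_ref) = @_;
    my $greatest = 0;
    my @failed;
    for my $e (@$status_ref) {
        $greatest = $e->{status} if $e->{status} > $greatest;
        push @failed, $e->{wid} if $e->{status} != 0;
    }
    return ($greatest, \@failed);
}

# Hand-made sample data in the documented shape (not real MCE output).
my @sample = (
    { wid => 1, pid => 'PID_101', status => 0, msg => 'ok',   id => 'a' },
    { wid => 2, pid => 'PID_102', status => 2, msg => 'fail', id => 'b' },
);
my ($greatest, $failed) = summarize_exits(\@sample);
print "greatest status $greatest; failed wids: @$failed\n";
```

Inside on_post_run, the call would simply be summarize_exits($status_ref).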

SYNTAX for USER_BEGIN and USER_END

The user_begin and user_end options, if specified, behave similarly to awk 'BEGIN { ... } { ... } END { ... }'. These are called once per worker during a run.

   sub user_begin {                   ## Called once at the beginning
      my $self = shift;
      $self->{wk_total_rows} = 0;
   }

   sub user_func {                    ## Called while processing
      my $self = shift;
      $self->{wk_total_rows} += 1;
   }

   sub user_end {                     ## Called once at the end
      my $self = shift;
      printf "## %d: Processed %d rows\n",
         MCE->wid(), $self->{wk_total_rows};
   }

   my $mce = MCE->new(
      user_begin => \&user_begin,
      user_func  => \&user_func,
      user_end   => \&user_end
   );

   MCE->run();
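To make the call order concrete, here is a pure-Perl sketch of what a single worker does during a run, with no MCE or parallelism involved. run_one_worker is hypothetical and a plain hash stands in for $self; under MCE each worker runs this sequence independently over the chunks it happens to receive.

```perl
use strict;
use warnings;

# Hypothetical serial driver that mimics the documented call order for
# one worker: user_begin once, user_func once per chunk, user_end once.
# A plain hash stands in for the MCE instance ($self).
sub run_one_worker {
    my ($begin, $func, $end, @chunks) = @_;
    my $self = {};
    $begin->($self);
    $func->($self, $chunks[$_], $_ + 1) for 0 .. $#chunks;   # chunk ids start at 1
    return $end->($self);
}

my $rows = run_one_worker(
    sub { $_[0]{wk_total_rows} = 0 },              # like user_begin
    sub { $_[0]{wk_total_rows} += @{ $_[1] } },    # like user_func
    sub { $_[0]{wk_total_rows} },                  # like user_end
    [ 'a', 'b' ], [ 'c' ],
);
print "processed $rows rows\n";
```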

SYNTAX for USER_FUNC with USE_SLURPIO => 0

When processing input data, MCE can pass an array of rows or a slurped chunk. Below, a reference to an array containing the chunk data is processed.

e.g. $chunk_ref = [ record1, record2, record3, ... ]

   sub user_func {

      my ($self, $chunk_ref, $chunk_id) = @_;

      foreach my $row ( @{ $chunk_ref } ) {
         $self->{wk_total_rows} += 1;
         print $row;
      }
   }

   my $mce = MCE->new(
      chunk_size  => 100,
      input_data  => "/path/to/file",
      user_func   => \&user_func,
      use_slurpio => 0
   );

   MCE->run();

SYNTAX for USER_FUNC with USE_SLURPIO => 1

Here, a reference to a scalar containing the raw chunk data is processed.

   sub user_func {

      my ($self, $chunk_ref, $chunk_id) = @_;

      my $count = () = $$chunk_ref =~ /abc/g;   ## count all matches
   }

   my $mce = MCE->new(
      chunk_size  => 16000,
      input_data  => "/path/to/file",
      user_func   => \&user_func,
      use_slurpio => 1
   );

   MCE->run();
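A standalone sketch (plain Perl, no MCE) of working with a slurped chunk: counting matches and, when per-record processing is still needed, splitting the raw chunk back into records. The sample chunk text is made up.

```perl
use strict;
use warnings;

# With use_slurpio => 1, user_func receives a reference to the raw chunk.
my $raw = "abc 1\nxyz 2\nabc abc 3\n";
my $chunk_ref = \$raw;

# Count every match: the /g modifier is required; without it the
# list-context match yields at most one element.
my $count = () = $$chunk_ref =~ /abc/g;

# Recover individual records when needed, by reading the scalar
# through an in-memory file handle ($/ is the separator, "\n" here).
open my $fh, '<', $chunk_ref or die "cannot open in-memory handle: $!";
my @records = <$fh>;
close $fh;

print "$count matches in ", scalar(@records), " records\n";
```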

SYNTAX for USER_ERROR and USER_OUTPUT

Output coming from MCE->sendto('stderr/stdout', ...) can be intercepted by specifying the user_error and user_output options. On receiving such output, MCE passes it to user_error or user_output instead, in a serialized fashion. This is handy when wanting to filter, modify, and/or direct the output elsewhere.

   sub user_error {                   ## Redirect STDERR to STDOUT
      my $error = shift;
      print STDOUT $error;
   }

   sub user_output {                  ## Redirect STDOUT to STDERR
      my $output = shift;
      print STDERR $output;
   }

   sub user_func {
      my ($self, $chunk_ref, $chunk_id) = @_;
      my $count = 0;

      foreach my $row ( @{ $chunk_ref } ) {
         MCE->sendto('stdout', $row);
         $count += 1;
      }

      MCE->sendto('stderr', "$chunk_id: processed $count rows\n");
   }

   my $mce = MCE->new(
      chunk_size  => 1000,
      input_data  => "/path/to/file",
      user_error  => \&user_error,
      user_output => \&user_output,
      user_func   => \&user_func
   );

   MCE->run();

METHODS for MANAGER PROCESS and WORKERS

The methods listed below are callable by the main process and workers.

abort ( void )

The abort method is applicable when processing input_data only. This causes all workers to abort after processing the current chunk.

Workers write the next offset position to the queue socket for the next available worker. In essence, the abort method writes the last offset position. Workers, on requesting the next offset position, will think the end of input_data has been reached and leave the chunking loop.

   $self->abort();
   MCE->abort();

chunk_id ( void )

Returns the chunk_id for the current chunk. The value starts at 1. Chunking applies to workers processing input_data or sequence. The value is set to 0 for the manager process.

   my $chunk_id = $self->chunk_id();
   my $chunk_id = MCE->chunk_id();

chunk_size ( void )

Returns the chunk_size used by MCE.

   my $chunk_size = $self->chunk_size();
   my $chunk_size = MCE->chunk_size();

freeze ( $object_ref )

Calls the internal freeze method to serialize an object. The default serialization routines are handled by Storable. Both freeze and thaw can be overridden when including MCE.

   my $frozen = $self->freeze([ 0, 2, 4 ]);
   my $frozen = MCE->freeze([ 0, 2, 4 ]);

max_workers ( void )

Returns the value for max_workers used by MCE.

   my $max_workers = $self->max_workers();
   my $max_workers = MCE->max_workers();

sess_dir ( void )

Returns the session directory used by the MCE instance. This is defined during spawning and removed during shutdown.

   my $sess_dir = $self->sess_dir();
   my $sess_dir = MCE->sess_dir();

task_id ( void )

Returns the task ID. This applies to the user_tasks option (starts at 0).

   my $task_id = $self->task_id();
   my $task_id = MCE->task_id();

task_name ( void )

Returns the task_name value specified via the task_name option when configuring MCE.

   my $task_name = $self->task_name();
   my $task_name = MCE->task_name();

task_wid ( void )

Returns the task worker ID (applies to user_tasks). The value starts at 1 for each task configured within user_tasks. The value is set to 0 for the manager process.

   my $task_wid = $self->task_wid();
   my $task_wid = MCE->task_wid();

thaw ( $frozen )

Calls the internal thaw method to deserialize the frozen object.

   my $object_ref = $self->thaw($frozen);
   my $object_ref = MCE->thaw($frozen);
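Since the defaults are Storable's routines (see freeze above), the round trip that freeze and thaw perform by default can be sketched with Storable directly. This illustrates the default behavior only; if FREEZE/THAW were overridden (e.g. with Sereal), the frozen bytes would differ.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# Round-trip a nested structure through Storable's freeze/thaw, the
# default serializer pair.
my $frozen     = freeze({ list => [ 0, 2, 4 ], name => 'mce' });
my $object_ref = thaw($frozen);

print "name=$object_ref->{name} list=@{ $object_ref->{list} }\n";
```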

tmp_dir ( void )

Returns the temporary directory used by MCE.

   my $tmp_dir = $self->tmp_dir();
   my $tmp_dir = MCE->tmp_dir();

user_args ( void )

Returns the arguments specified via the user_args option.

   my ($arg1, $arg2, $arg3) = $self->user_args();
   my ($arg1, $arg2, $arg3) = MCE->user_args();

wid ( void )

Returns the MCE worker ID. Starts at 1 for each MCE instance. The value is set to 0 for the manager process.

   my $wid = $self->wid();
   my $wid = MCE->wid();

METHODS for MANAGER PROCESS only

Methods listed below are callable by the main process only.

forchunk ( $input_data [, { options } ], sub { ... } )

Forchunk, foreach, and forseq are sugar methods in MCE. Workers are automatically spawned, the code block is executed in parallel, and shutdown is called. Do not call these methods if workers must persist afterwards.

Specifying options is optional. Valid options are the same as for the process method.

   ## Declare a MCE instance.

   my $mce = MCE->new(
      chunk_size  => 20,
      max_workers => $max_workers
   );

   ## Arguments inside the code block are the same as for user_func.

   MCE->forchunk(\@input_data, sub {
      my ($self, $chunk_ref, $chunk_id) = @_;

      foreach ( @{ $chunk_ref } ) {
         MCE->sendto("stdout", "$chunk_id: $_\n");
      }
   });

   ## Passing chunk_size as an option.

   MCE->forchunk(\@input_data, { chunk_size => 30 }, sub {
      ...
   });

foreach ( $input_data [, { options } ], sub { ... } )

Foreach implies chunk_size => 1, which cannot be overridden. Arguments inside the code block are the same as for user_func, even though each chunk holds a single element. MCE is both a chunking engine and a parallel engine in one, so the arguments within the block are the same whether calling foreach or forchunk.

   my $mce = MCE->new(
      max_workers => $max_workers
   );

   MCE->foreach(\@input_data, sub {
      my ($self, $chunk_ref, $chunk_id) = @_;
      my $row = $chunk_ref->[0];
      MCE->sendto("stdout", "$chunk_id: $row\n");
   });

Below, an anonymous array is passed as input data, e.g. when wanting to parallelize a serial loop with MCE.

   ## Serial loops.

   for (my $i = 0; $i < $max; $i++) {
      ...  ## Runs serially
   }

   for my $i (0 .. $max - 1) {
      ...  ## Runs serially
   }

   ## Parallel loop via MCE.

   MCE->foreach([ (0 .. $max - 1) ], sub {
      my ($self, $chunk_ref, $chunk_id) = @_;
      my $i = $chunk_ref->[0];
      ...  ## Runs in parallel
   });

forseq ( $sequence_spec [, { options } ], sub { ... } )

Sequence can be specified using a hash or an array reference.

   my $mce = MCE->new(
      max_workers => 3
   );

   MCE->forseq({ begin => 15, end => 10, step => -1 }, sub {
      my ($self, $n, $chunk_id) = @_;
      print $n, " from ", MCE->wid(), "\n";
   });

   MCE->forseq([ 20, 40 ], sub {
      my ($self, $n, $chunk_id) = @_;
      my $result = `ping 192.168.1.${n}`;
      ...
   });

MCE 1.415 allows chunk_size > 1. Below, $n_seq is a reference to an array of sequence numbers. Chunking reduces IPC overhead behind the scenes. Chunk size is 1 when not specified.

   MCE->forseq([ 20, 80 ], { chunk_size => 10 }, sub {
      my ($self, $n_seq, $chunk_id) = @_;
      for my $n ( @{ $n_seq } ) {
         my $result = `ping 192.168.1.${n}`;
         ...
      }
   });
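For a step of 1, the numbers in each chunk can be computed arithmetically from the chunk id, without building an array. Below is a pure-Perl sketch; seq_chunk is a hypothetical helper, not MCE's implementation, and it only illustrates which numbers each $n_seq would hold for forseq([ 20, 80 ], { chunk_size => 10 }, ...).

```perl
use strict;
use warnings;

# Hypothetical helper: return the numbers belonging to chunk $chunk_id
# (1-based) of the integer sequence begin..end with step 1, computed
# arithmetically. Returns an array reference, or nothing once past the end.
sub seq_chunk {
    my ($begin, $end, $size, $chunk_id) = @_;
    my $first = $begin + ($chunk_id - 1) * $size;
    return () if $first > $end;                  # past the end: no chunk
    my $last = $first + $size - 1;
    $last = $end if $last > $end;                # final chunk may be short
    return [ $first .. $last ];
}

# Walk the chunks of 20..80 with chunk_size 10 until none remain.
my $chunk_id = 1;
while (my $n_seq = seq_chunk(20, 80, 10, $chunk_id)) {
    printf "chunk %d: %d..%d\n", $chunk_id, $n_seq->[0], $n_seq->[-1];
    $chunk_id++;
}
```

The 61 numbers from 20 to 80 form 6 full chunks plus a final chunk holding only 80.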

process ( $input_data [, { options } ] )

The process method will spawn workers automatically if not already spawned. It sets input_data => $input_data and calls run(0) so that workers are not shut down automatically. Specifying options is optional.

Allowable options { key => value, ... } are:

   chunk_size input_data job_delay spawn_delay submit_delay RS
   flush_file flush_stderr flush_stdout stderr_file stdout_file
   on_post_exit on_post_run sequence user_args user_begin user_end
   user_func user_error user_output use_slurpio

Options remain persistent going forward unless changed. Setting user_begin, user_end, or user_func will cause already spawned workers to shut down and re-spawn automatically. Therefore, define these during instantiation if possible.

The code below will cause workers to re-spawn after each process call. This matters little for small jobs. Alternatively, one can create a few MCE instances, e.g. when the code blocks differ altogether.

   MCE->new( max_workers => 'auto' );

   MCE->process( {
      user_begin => sub { ... },   ## connect to DB
      user_func  => sub { ... },   ## process each row
      user_end   => sub { ... },   ## close handle to DB
   }, \@input_data );

   MCE->process( {
      user_begin => sub { ... },   ## connect to DB
      user_func  => sub { ... },   ## process each file
      user_end   => sub { ... },   ## close handle to DB
   }, "/list/of/files" );

To have persistent workers, it's better to do the following instead. Workers stay alive until told to shut down or until the script exits. The example below also overrides the default for max_workers.

   use MCE MAX_WORKERS => 'auto';

   MCE->new(
      user_begin => sub { ... },   ## connect to DB
      user_func  => sub { ... },   ## process each chunk or row or host
      user_end   => sub { ... },   ## close handle to DB
   );

   MCE->spawn();     ## Not necessary, unless you want to spawn early

   MCE->process("/one/very_big_file/_mce_/will_chunk_in_parallel");
   MCE->process(\@array_of_files_to_grep);
   MCE->process("/path/to/host/list");

   MCE->process($array_ref);
   MCE->process($array_ref, { stdout_file => $output_file });

   ## This was not allowed before. Fixed in 1.415.
   MCE->process({ sequence => { begin => 10, end => 90, step => 2 } });
   MCE->process({ sequence => [ 10, 90, 2 ] });

   MCE->shutdown();

restart_worker ( $wid )

One can restart a worker that has died or exited. The job below never ends because each exiting worker is restarted. Reusing the wid of the worker that exited is recommended, as shown.

   my $mce = MCE->new(

      on_post_exit => sub {
         my ($self, $e) = @_;
         print "$e->{wid}: $e->{pid}: status $e->{status}: $e->{msg}";

         MCE->restart_worker($e->{wid});
      },

      user_begin => sub {
         my $self = $_[0];
         ## Not interested in die messages going to STDERR.
         ## The die handler calls MCE->exit(255, $_[0]).
         close STDERR;
      },

      user_tasks => [{
         max_workers => 5,
         user_func   => sub {
            my $self = $_[0]; sleep MCE->wid();
            MCE->exit(3, "exited from " . MCE->wid() . "\n");
         }
      },{
         max_workers => 4,
         user_func   => sub {
            my $self = $_[0]; sleep MCE->wid();
            die("died from " . MCE->wid() . "\n");
         }
      }]
   );

   MCE->run();

   -- Output

   1: PID_85388: status 3: exited from 1
   2: PID_85389: status 3: exited from 2
   1: PID_85397: status 3: exited from 1
   3: PID_85390: status 3: exited from 3
   1: PID_85399: status 3: exited from 1
   4: PID_85391: status 3: exited from 4
   2: PID_85398: status 3: exited from 2
   1: PID_85401: status 3: exited from 1
   5: PID_85392: status 3: exited from 5
   1: PID_85404: status 3: exited from 1
   6: PID_85393: status 255: died from 6
   3: PID_85400: status 3: exited from 3
   2: PID_85403: status 3: exited from 2
   1: PID_85406: status 3: exited from 1
   7: PID_85394: status 255: died from 7
   1: PID_85410: status 3: exited from 1
   8: PID_85395: status 255: died from 8
   4: PID_85402: status 3: exited from 4
   2: PID_85409: status 3: exited from 2
   1: PID_85412: status 3: exited from 1
   9: PID_85396: status 255: died from 9
   3: PID_85408: status 3: exited from 3
   1: PID_85416: status 3: exited from 1

   ...

run ( [ $auto_shutdown [, { options } ] ] )

The run method, by default, spawns workers, processes once, and shuts down workers. Set $auto_shutdown to 0 when wanting workers to persist after running (default is 1).

Specifying options is optional. Valid options are the same as for the process method.

   my $mce = MCE->new( ... );

   MCE->run(0);                          ## Disables auto-shutdown

send ( $data_ref )

The send method is useful when wanting to spawn workers early to minimize memory consumption and afterwards send data individually to each worker. One cannot call send more times than the number of workers spawned. Workers store the received data as $self->{user_data}.

The data which can be sent is restricted to an ARRAY, HASH, or PDL reference. Workers begin processing immediately after receiving data. Workers set $self->{user_data} to undef after processing. One cannot specify input_data, sequence, or user_tasks for the MCE instance to receive user data via the "send" method.

Any options passed, e.g. run(0, { options }), are ignored because workers start running immediately after receiving user data.

   my $mce = MCE->new(
      max_workers => 5,

      user_func => sub {
         my ($self) = @_;
         my $data = $self->{user_data};
         my $first_name = $data->{first_name};
         print MCE->wid, ": Hello from $first_name\n";
      }
   );

   MCE->spawn();    ## Optional, send will spawn if necessary.

   MCE->send( { first_name => "Theresa" } );
   MCE->send( { first_name => "Francis" } );
   MCE->send( { first_name => "Padre"   } );
   MCE->send( { first_name => "Anthony" } );

   MCE->run(0);     ## Wait for workers to complete processing.

   -- Output

   2: Hello from Theresa
   5: Hello from Anthony
   3: Hello from Francis
   4: Hello from Padre

shutdown ( void )

The run method will automatically spawn workers, run once, and shut down workers. The process method leaves workers waiting for the next job after processing. Call shutdown after processing all jobs.

   my $mce = MCE->new( ... );

   MCE->spawn();

   MCE->process(\@input_data_1);         ## Processing multiple arrays
   MCE->process(\@input_data_2);
   MCE->process(\@input_data_n);

   MCE->process('input_file_1');         ## Processing multiple files
   MCE->process('input_file_2');
   MCE->process('input_file_n');

   MCE->shutdown();

spawn ( void )

Workers are normally spawned automatically. The spawn method is beneficial when wanting to spawn workers early.

   my $mce = MCE->new( ... );

   MCE->spawn();

status ( void )

The greatest exit status is saved among workers exiting during a run. Call this after running or processing to check if a worker exited with a non-zero value. For more granularity, look at the on_post_exit or on_post_run options for callback support.

   my $mce = MCE->new( ... );

   MCE->run();

   my $greatest_exit_status = MCE->status();

METHODS for WORKERS only

Methods listed below are callable by workers only.

do ( 'callback_func' [, $arg1, ... ] )

MCE can serialize data transfers from worker processes via the helper methods do and sendto. The main MCE thread processes these requests serially. The Storable Perl module is used to pass data from a worker process to the main MCE thread. The callback function can optionally return a reply.

   [ $reply = ] MCE->do('callback' [, $arg1, ... ]);

Passing arguments to a callback function using references and a scalar:

   sub callback {
      my ($array_ref, $hash_ref, $scalar_ref, $scalar) = @_;
      ...
   }

   MCE->do('main::callback', \@a, \%h, \$s, 'hello');
   MCE->do('callback', \@a, \%h, \$s, 'hello');

MCE detects whether the caller wants a void, list, hash, or scalar return value.

   MCE->do('callback' [, $arg1, ... ]);

   my @array  = MCE->do('callback' [, $arg1, ... ]);
   my %hash   = MCE->do('callback' [, $arg1, ... ]);
   my $scalar = MCE->do('callback' [, $arg1, ... ]);
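For readers curious how a Perl subroutine can tell calling contexts apart, the sketch below uses Perl's built-in wantarray inside a hypothetical record_context function. It is not MCE's implementation. Note that at the language level a hash assignment is list context, so wantarray alone reports the same value for the list and hash cases.

```perl
use strict;
use warnings;

our @seen;    # records the calling context of each call

# Hypothetical stand-in for a do() callback (not MCE's code). Perl's
# built-in wantarray returns undef in void context, true in list context,
# and false (but defined) in scalar context.
sub record_context {
    my $want = wantarray;
    push @seen, !defined $want ? 'void' : $want ? 'list' : 'scalar';
    return;
}

record_context();                  # void context
my @array  = record_context();     # list context
my %hash   = record_context();     # a hash assignment is also list context
my $scalar = record_context();     # scalar context

print join(', ', @seen), "\n";     # void, list, list, scalar
```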

exit ( [ $status [, $message [, $id ] ] ] )

The worker exits MCE entirely. The optional $id can be used to pass a primary key or a string along with the message. See the on_post_exit and on_post_run options for callback support.

   MCE->exit();             ## default is 0
   MCE->exit(1);
   MCE->exit(2, 'error: entire chunk failed', $chunk_id);
   MCE->exit(0, 'ok', 'pebbles');

gather ( $arg1 [, $arg2, ... ] )

A worker can submit data to the location specified via the gather option by calling this method.

   MCE->gather(\@results, $chunk_id);
   MCE->gather([ $arg1, $arg2 ]);

last ( void )

The worker immediately leaves the chunking loop or user_func. Call it from inside foreach, forchunk, forseq, or user_func.

   my @list = (1 .. 80);

   MCE->forchunk(\@list, { chunk_size => 2 }, sub {

      my ($self, $chunk_ref, $chunk_id) = @_;
      MCE->last if ($chunk_id > 4);

      my @output = ();

      foreach my $rec ( @{ $chunk_ref } ) {
         push @output, $rec, "\n";
      }

      MCE->sendto('stdout', @output);
   });

   -- Output (each chunk above consists of 2 elements)

   1
   2
   3
   4
   5
   6
   7
   8

next ( void )

The worker starts the next iteration of the chunking loop. Call it from inside foreach, forchunk, forseq, or user_func.

   my @list = (1 .. 80);

   MCE->forchunk(\@list, { chunk_size => 4 }, sub {

      my ($self, $chunk_ref, $chunk_id) = @_;
      MCE->next if ($chunk_id < 20);

      my @output = ();

      foreach my $rec ( @{ $chunk_ref } ) {
         push @output, $rec, "\n";
      }

      MCE->sendto('stdout', @output);
   });

   -- Output (each chunk above consists of 4 elements)

   77
   78
   79
   80
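To see why the two outputs above contain exactly those numbers, the serial sketch below (plain Perl, no MCE) assigns 1-based chunk ids to @list and keeps only the chunks that are not skipped. It models which items appear in the output, not how MCE schedules chunks among workers; surviving_items is a hypothetical helper.

```perl
use strict;
use warnings;

# Serial sketch, no MCE involved: assign 1-based chunk ids to @list the way
# the examples above do, and keep only items from chunks that are not skipped.
sub surviving_items {
    my ($list, $chunk_size, $keep) = @_;   # $keep->($chunk_id) returns true to keep
    my @out;
    for my $i (0 .. $#$list) {
        my $chunk_id = int($i / $chunk_size) + 1;
        push @out, $list->[$i] if $keep->($chunk_id);
    }
    return @out;
}

my @list = (1 .. 80);

# last: chunk_size => 2, leaving once chunk_id > 4, so chunks 1 .. 4 remain.
my @after_last = surviving_items(\@list, 2, sub { $_[0] <= 4 });

# next: chunk_size => 4, skipping while chunk_id < 20, so only chunk 20 remains.
my @after_next = surviving_items(\@list, 4, sub { $_[0] >= 20 });

print "@after_last\n";   # 1 2 3 4 5 6 7 8
print "@after_next\n";   # 77 78 79 80
```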

print, printf, say

Use the print, printf, and say methods to serialize output among workers. These are syntactic sugar for the sendto method. They behave like Perl's native functions, except that a bareword filehandle must be quoted, and the filehandle argument, including a glob handle, must be followed by a comma. say is like print, but implicitly appends a newline.

   MCE->print("$output_msg\n");
   MCE->print("STDERR", "$error_msg\n");
   MCE->print($fh, $log_msg);

   MCE->printf("%s: %d\n", $name, $age);
   MCE->printf("STDOUT", "%s: %d\n", $name, $age);
   MCE->printf($fh, "%s: %d\n", $name, $age);

   MCE->say($output_msg);
   MCE->say("STDERR", $error_msg);
   MCE->say($fh, $log_msg);

sendto ( 'to_string', $arg1, ... )

Workers call the sendto method to serialize data to standard output, standard error, or the end of a file. The main process or thread performs the actual write.

Release 1.100 added the ability to pass multiple arguments.

syntax for 1.00x

Release 1.00x supported only one data argument. For 'file', /path/to/file is the 3rd argument.

   MCE->sendto('stdout', \@array);
   MCE->sendto('stdout', \$scalar);
   MCE->sendto('stdout', $scalar);

   MCE->sendto('stderr', \@array);
   MCE->sendto('stderr', \$scalar);
   MCE->sendto('stderr', $scalar);

   MCE->sendto('file', \@array, '/path/to/file');
   MCE->sendto('file', \$scalar, '/path/to/file');
   MCE->sendto('file', $scalar, '/path/to/file');

syntax for 1.100 and later releases

Notice the syntax change for appending to a file.

   MCE->sendto('stdout', $arg1 [, $arg2, ... ]);
   MCE->sendto('stderr', $arg1 [, $arg2, ... ]);
   MCE->sendto('file:/path/to/file', $arg1 [, $arg2, ... ]);

Passing a reference is no longer necessary beginning with 1.100.

   MCE->sendto("stdout", @a, "\n", %h, "\n", $s, "\n");

To retain 1.00x compatibility, sendto outputs the content of the reference when exactly one data argument is given and that argument is a reference.

   MCE->sendto('stdout', \@array);
   MCE->sendto('stderr', \$scalar);
   MCE->sendto('file:/path/to/file', \@array);

Otherwise, references such as \@array and \$scalar are output as reference strings (e.g. ARRAY(0x...)), not as their content. In short, the output matches what the print statement would produce.

   ## print STDOUT "hello\n", \@array, \$scalar, "\n";

   MCE->sendto('stdout', "hello\n", \@array, \$scalar, "\n");
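The compatibility rule can be modelled in a few lines. sendto_text below is a hypothetical helper, not MCE's implementation: it returns the text that would be written for a given argument list, expanding a lone ARRAY or SCALAR reference and otherwise joining the arguments exactly as print does with the default empty $, separator.

```perl
use strict;
use warnings;

# Hypothetical model of the sendto output rule (not MCE's code).
sub sendto_text {
    my @args = @_;

    # 1.00x compatibility: a single reference argument has its content output.
    if (@args == 1 && ref $args[0] eq 'ARRAY')  { return join '', @{ $args[0] } }
    if (@args == 1 && ref $args[0] eq 'SCALAR') { return ${ $args[0] } }

    # Otherwise behave like print: references stringify, e.g. ARRAY(0x...).
    return join '', @args;
}

my @array  = ("a\n", "b\n");
my $scalar = "c\n";

print sendto_text(\@array);                              # "a\n" then "b\n"
print sendto_text(\$scalar);                             # "c\n"
print sendto_text("hello\n", \@array, \$scalar, "\n");   # references, not content
```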

sync ( void )

A barrier sync operation means that each worker stops at that point until all workers reach the barrier. Barrier syncing is useful for many computer algorithms.

Barrier synchronization is supported for task 0 only. Note: Workers are assigned task_id 0 when omitting user_tasks. All workers assigned task_id 0 must call sync whenever barrier syncing.

Barrier synchronization (sync) was added to MCE 1.406.

   use MCE;

   sub user_func {

      my ($self) = @_;
      my $wid = MCE->wid();

      MCE->sendto('stdout', "a: $wid\n");
      MCE->sync;

      MCE->sendto('stdout', "b: $wid\n");
      MCE->sync;

      MCE->sendto('stdout', "c: $wid\n");
      MCE->sync;
   }

   my $mce = MCE->new(
      max_workers => 4,
      user_func   => \&user_func
   );

   MCE->run();

   -- Output (without barrier synchronization)

   a: 1
   a: 2
   b: 1
   b: 2
   c: 1
   c: 2
   a: 3
   b: 3
   c: 3
   a: 4
   b: 4
   c: 4

   -- Output (with barrier synchronization)

   a: 1
   a: 2
   a: 4
   a: 3
   b: 2
   b: 1
   b: 3
   b: 4
   c: 1
   c: 4
   c: 2
   c: 3
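The property visible in the second output can be checked with a serial simulation. The sketch below does not use MCE and involves no real parallelism; it only reproduces the ordering guarantee, with List::Util's shuffle standing in for the nondeterministic completion order within a phase.

```perl
use strict;
use warnings;
use List::Util qw(shuffle);

# Serial simulation (no MCE, no real parallelism) of the ordering that a
# barrier guarantees: within a phase workers may finish in any order, but
# no worker logs phase "b" until every worker has logged phase "a".
my @workers = (1 .. 4);
my @log;

for my $phase (qw(a b c)) {
    # Arbitrary completion order within the phase.
    push @log, "$phase: $_" for shuffle(@workers);

    # The end of this loop body plays the role of MCE->sync: the next
    # phase only starts once all workers have been logged for this one.
}

print "$_\n" for @log;
```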

Consider the following example, where sync is called inside a for loop along with the do method. A stall can occur for workers calling sync the 2nd or 3rd time while other workers are still sending results via the do or sendto methods. Solving this in MCE would require yet another semaphore lock file, which was not added in order to keep resource usage low.

Therefore, keep this in mind when mixing MCE->sync with MCE->do or MCE->sendto inside a loop.

   sub user_func {

      my ($self) = @_;
      my @result;

      for (1 .. 3) {
         ## ... compute algorithm ...

         MCE->sync;

         ## ... compute algorithm ...

         MCE->sync;

         MCE->do('aggregate_result', \@result);  ## or MCE->sendto

         MCE->sync;      ## The sync operation is also needed here to
                         ## prevent MCE from stalling.
      }
   }

yield ( void )

There may be times when an MCE-driven app is too fast. The interval option combined with the yield method, both introduced with MCE 1.5, allows one to add a "grace" factor to the app.

A use case is an app configured with 160 workers running on a box with 24 logical cores. Data is polled from a database containing over 2.5 million rows. Workers chunk away at 300 rows per chunk and perform SNMP gets (300 sockets per worker) to poll 25 metrics from each device. In this scenario, the load on the box eventually rises to 90+. In addition, iptables reaches its contention point, causing the entire app to fail.

This is solved by having workers yield among themselves in a synchronized fashion. A delay of 0.005 seconds between intervals is all that is needed; even 0.003 works. The load on the box hovers between 23 and 27 for the duration of the run, and the poll completes in under 17 minutes. That is quite fast considering the app polls 62,500,000 metrics combined: roughly 3,676,470 per minute, or 61,275 per second, from a single node.

   ## Both max_nodes and node_id are optional (default is 1).

   interval => {
      delay => 0.005, max_nodes => $max_nodes, node_id => $node_id
   }
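The throughput figures quoted above follow from the row count, the metrics per device, and the duration. Because the text says over 2.5 million rows and under 17 minutes, the rates computed below are, if anything, conservative.

```perl
use strict;
use warnings;

# Worked arithmetic for the throughput figures quoted above.
my $rows            = 2_500_000;   # rows (devices) polled from the database
my $metrics_per_row = 25;          # metrics polled from each device
my $minutes         = 17;          # duration bound for the poll

my $total_metrics = $rows * $metrics_per_row;   # 62,500,000
my $per_minute    = $total_metrics / $minutes;  # about 3,676,470.6
my $per_second    = $per_minute / 60;           # about 61,274.5

printf "%d total, %d per minute, %.0f per second\n",
    $total_metrics, $per_minute, $per_second;
```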

A 4 node setup can poll 10 million devices without the additional overhead of a distribution agent. The only differences among the 4 nodes are node_id and the where clause used to query the data. MAC addresses are random enough that the data divides evenly by any power of 2, so the distribution key lies in the MAC address itself. In fact, the 2nd character from the right is all that is needed to take advantage of that randomness.

   Query NodeID 1: ... AND substr(MAC, -2, 1) IN ('0', '1', '2', '3')
   Query NodeID 2: ... AND substr(MAC, -2, 1) IN ('4', '5', '6', '7')
   Query NodeID 3: ... AND substr(MAC, -2, 1) IN ('8', '9', 'A', 'B')
   Query NodeID 4: ... AND substr(MAC, -2, 1) IN ('C', 'D', 'E', 'F')
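The same partitioning can be expressed on the client side. node_for_mac below is a hypothetical helper that mirrors the WHERE clauses above: it reads the 2nd hex character from the right and maps its value 0 to 15 onto node ids. It assumes the MAC string ends in two hex digits and that the node count is a power of 2 no larger than 16.

```perl
use strict;
use warnings;

# Hypothetical helper mirroring the WHERE clauses above: map a MAC address
# to a node using the 2nd hex character from the right. Assumes the MAC
# string ends in two hex digits and $max_nodes is a power of 2 up to 16.
sub node_for_mac {
    my ($mac, $max_nodes) = @_;
    my $digit = hex substr($mac, -2, 1);           # 0 .. 15
    return int($digit * $max_nodes / 16) + 1;      # 1 .. $max_nodes
}

for my $mac (qw(00:1A:2B:3C:4D:0E 00:1A:2B:3C:4D:5E
                00:1A:2B:3C:4D:A0 00:1A:2B:3C:4D:F1)) {
    printf "%s -> node %d\n", $mac, node_for_mac($mac, 4);
}
```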

Below, user_tasks is configured to simulate 4 nodes. The demonstration uses 2 workers per task to minimize the output size. Input comes from the sequence option.

   use Time::HiRes qw(time);
   use MCE;

   my $d = shift || 0.10;

   sub create_task {

      my ($node_id) = @_;

      my $seq_size  = 6;
      my $seq_start = ($node_id - 1) * $seq_size + 1;
      my $seq_end   = $seq_start + $seq_size - 1;

      return {
         max_workers => 2, sequence => [ $seq_start, $seq_end ],
         interval => { delay => $d, max_nodes => 4, node_id => $node_id }
      };
   }

   sub user_begin {

      my ($self) = @_;

      ## The yield method causes this worker to wait for its next
      ## time interval slot before running. Yield has no effect
      ## without the "interval" option.

      ## Yielding is beneficial inside a user_begin block. A use case
      ## is wanting to stagger database connections among workers in
      ## order to not impact the DB server.

      MCE->yield;

      MCE->sendto( "STDOUT",
         sprintf("Node %2d: %0.5f -- Worker %2d: %12s -- Started\n",
            MCE->task_id + 1, time(), MCE->task_wid, '')
      );

      return;
   }

   {
      my $prev_time = time();

      sub user_func {

         my ($self, $seq_n, $chunk_id) = @_;

         ## Yield simply waits for the next time interval.
         MCE->yield;

         ## Calculate how long this worker has waited.
         my $curr_time = time();
         my $time_waited = $curr_time - $prev_time;

         $prev_time = $curr_time;

         MCE->sendto( "STDOUT",
            sprintf("Node %2d: %0.5f -- Worker %2d: %12.5f -- Seq_N %3d\n",
               MCE->task_id + 1, time(), MCE->task_wid, $time_waited, $seq_n)
         );

         return;
      }
   }

   ## Simulate a 4 node environment passing node_id to create_task.

   print "Node_ID  Current_Time        Worker_ID  Time_Waited     Comment\n";

   MCE->new(
      user_begin => \&user_begin,
      user_func  => \&user_func,

      user_tasks => [
         create_task(1),
         create_task(2),
         create_task(3),
         create_task(4)
      ]

   )->run;

   -- Output (notice Current_Time below, stays 0.10 apart)

   Node_ID  Current_Time        Worker_ID  Time_Waited     Comment
   Node  1: 1374807976.74634 -- Worker  1:              -- Started
   Node  2: 1374807976.84634 -- Worker  1:              -- Started
   Node  3: 1374807976.94638 -- Worker  1:              -- Started
   Node  4: 1374807977.04639 -- Worker  1:              -- Started
   Node  1: 1374807977.14634 -- Worker  2:              -- Started
   Node  2: 1374807977.24640 -- Worker  2:              -- Started
   Node  3: 1374807977.34649 -- Worker  2:              -- Started
   Node  4: 1374807977.44657 -- Worker  2:              -- Started
   Node  1: 1374807977.54636 -- Worker  1:      0.90037 -- Seq_N   1
   Node  2: 1374807977.64638 -- Worker  1:      1.00040 -- Seq_N   7
   Node  3: 1374807977.74642 -- Worker  1:      1.10043 -- Seq_N  13
   Node  4: 1374807977.84643 -- Worker  1:      1.20045 -- Seq_N  19
   Node  1: 1374807977.94636 -- Worker  2:      1.30037 -- Seq_N   2
   Node  2: 1374807978.04638 -- Worker  2:      1.40040 -- Seq_N   8
   Node  3: 1374807978.14641 -- Worker  2:      1.50042 -- Seq_N  14
   Node  4: 1374807978.24644 -- Worker  2:      1.60045 -- Seq_N  20
   Node  1: 1374807978.34628 -- Worker  1:      0.79996 -- Seq_N   3
   Node  2: 1374807978.44631 -- Worker  1:      0.79996 -- Seq_N   9
   Node  3: 1374807978.54634 -- Worker  1:      0.79996 -- Seq_N  15
   Node  4: 1374807978.64636 -- Worker  1:      0.79997 -- Seq_N  21
   Node  1: 1374807978.74628 -- Worker  2:      0.79996 -- Seq_N   4
   Node  2: 1374807978.84632 -- Worker  2:      0.79997 -- Seq_N  10
   Node  3: 1374807978.94634 -- Worker  2:      0.79996 -- Seq_N  16
   Node  4: 1374807979.04636 -- Worker  2:      0.79996 -- Seq_N  22
   Node  1: 1374807979.14628 -- Worker  1:      0.80001 -- Seq_N   5
   Node  2: 1374807979.24631 -- Worker  1:      0.80000 -- Seq_N  11
   Node  3: 1374807979.34634 -- Worker  1:      0.80001 -- Seq_N  17
   Node  4: 1374807979.44636 -- Worker  1:      0.80000 -- Seq_N  23
   Node  1: 1374807979.54628 -- Worker  2:      0.80000 -- Seq_N   6
   Node  2: 1374807979.64631 -- Worker  2:      0.80000 -- Seq_N  12
   Node  3: 1374807979.74633 -- Worker  2:      0.80000 -- Seq_N  18
   Node  4: 1374807979.84636 -- Worker  2:      0.80000 -- Seq_N  24

The interval.pl example is included with MCE.
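The Seq_N values in the output above follow from create_task's arithmetic: node k receives the numbers (k - 1) * 6 + 1 through k * 6. The sketch below reproduces that in plain Perl (seq_range is a hypothetical helper) and adds one inference drawn from the timings shown, not from MCE's source: with 4 nodes of 2 workers taking turns every 0.10 seconds, each worker's own slot recurs every 0.80 seconds, which matches the steady Time_Waited column.

```perl
use strict;
use warnings;

# Per-node sequence ranges, using the same arithmetic as create_task above.
sub seq_range {
    my ($node_id, $seq_size) = @_;
    my $seq_start = ($node_id - 1) * $seq_size + 1;
    return ($seq_start, $seq_start + $seq_size - 1);
}

printf "Node %d: %d .. %d\n", $_, seq_range($_, 6) for 1 .. 4;

# Inferred from the output, not from MCE's source: with 4 nodes x 2 workers
# taking turns every $delay seconds, each worker's own slot recurs every
# $delay * 8 seconds, matching the steady Time_Waited of about 0.80.
my ($delay, $max_nodes, $workers_per_node) = (0.10, 4, 2);
my $period = $delay * $max_nodes * $workers_per_node;
printf "period: %.2f seconds\n", $period;
```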

EXAMPLES

MCE comes with various examples showing real-world use cases, from parallelizing something as small as cat (try with -n) to searching for patterns and aggregating word counts.

INCLUDED with DISTRIBUTION

   barrier_sync.pl
             A barrier sync demonstration.

   cat.pl    Concatenation script, similar to the cat binary.
   egrep.pl  Egrep script, similar to the egrep binary.
   wc.pl     Word count script, similar to the wc binary.

   findnull.pl
             A parallel driven script to report lines containing
             null fields. It's many times faster than the binary
             egrep command. Try against a large file containing
             very long lines.

   foreach.pl, forseq.pl, forchunk.pl
             These take the same sqrt example from Parallel::Loops
             and measure the overhead of the engine. The number
             indicates the size of @input which can be submitted
             and whose results are displayed in under 1 second.

             Parallel::Loops is based on Parallel::ForkManager.
             MCE utilizes a pool of workers.

             Parallel::Loops:     600  Forking each @input is expensive
             MCE foreach....:  22,500  Sends result after each @input
             MCE forseq.....:  66,000  Loops through sequence of numbers
             MCE forchunk...: 450,000  Chunking reduces overhead

   interval.pl
             Demonstration of the interval option appearing in MCE 1.5.

   matmult/matmult_base.pl, matmult_mce.pl, strassen_mce.pl
             Various matrix multiplication demonstrations benchmarking
             PDL, PDL + MCE, as well as parallelizing Strassen's
             divide-and-conquer algorithm. Also included are 2 plain
             Perl examples.

   scaling_pings.pl
             Performs a ping test and reports failing IPs to
             standard output.

   seq_demo.pl
             A demonstration of the new sequence option appearing
             in MCE 1.3. Run with seq_demo.pl | sort

   tbray/wf_mce1.pl, wf_mce2.pl, wf_mce3.pl
             An implementation of wide finder utilizing MCE.
             As fast as MMAP IO when the file resides in the OS FS cache.
             2x ~ 3x faster when reading directly from disk.

CHUNK_SIZE => 1 (in essence, no chunking of input data)

Imagine a long-running process that needs to parallelize an array across a pool of workers. Note: the sequence option can be used instead when simply looping through a sequence of numbers in parallel, one number at a time.

Below, a callback function for displaying results is used. The logic shows how one can display results immediately while still preserving the output order as if processing serially. The %result hash is a temporary cache to store results for out-of-order replies.

   use MCE;

   my @input_data  = (0 .. 18000 - 1);
   my $max_workers = 3;
   my $order_id    = 1;
   my %result;

   ## Callback function for displaying results.

   sub display_result {

      my ($wk_result, $chunk_id) = @_;
      $result{$chunk_id} = $wk_result;

      while (1) {
         last unless (exists $result{$order_id});

         printf "i: %d sqrt(i): %f\n",
            $input_data[$order_id - 1], $result{$order_id};

         delete $result{$order_id};
         $order_id++;
      }
   }

   ## Compute via MCE.

   my $mce = MCE->new(
      input_data  => \@input_data,
      max_workers => $max_workers,
      chunk_size  => 1,

      user_func => sub {
         my ($self, $chunk_ref, $chunk_id) = @_;
         my $wk_result = sqrt($chunk_ref->[0]);

         MCE->do('display_result', $wk_result, $chunk_id);
      }
   );

   MCE->run();

FOREACH sugar METHOD

   ## Compute via MCE. Foreach implies chunk_size => 1.

   my $mce = MCE->new(
      max_workers => $max_workers
   );

   ## Worker calls code block passing a reference to an array containing
   ## one item. Use $chunk_ref->[0] to retrieve the single element.

   MCE->foreach(\@input_data, sub {

      my ($self, $chunk_ref, $chunk_id) = @_;
      my $wk_result = sqrt($chunk_ref->[0]);

      MCE->do('display_result', $wk_result, $chunk_id);
   });
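The reordering logic in display_result can be exercised without MCE. The serial sketch below feeds a hypothetical accept_result function the same (result, chunk_id) pairs a worker would send, deliberately out of order, and collects what would be printed. It keeps the %result cache and the $order_id counter from the example above.

```perl
use strict;
use warnings;

# Serial sketch of the reorder cache used by display_result: results may
# arrive in any chunk_id order, but are emitted strictly in order.
my $order_id = 1;
my %result;
my @emitted;

sub accept_result {
    my ($wk_result, $chunk_id) = @_;
    $result{$chunk_id} = $wk_result;

    # Flush every consecutive result that is now available.
    while (exists $result{$order_id}) {
        push @emitted, delete $result{$order_id};
        $order_id++;
    }
}

# Simulated out-of-order replies: chunk_id N carries sqrt($input_data[N - 1]).
accept_result(sqrt(2), 3);   # cached; chunks 1 and 2 are still missing
accept_result(sqrt(0), 1);   # flushes chunk 1 only
accept_result(sqrt(1), 2);   # flushes chunks 2 and 3

printf "%f\n", $_ for @emitted;   # 0.000000, 1.000000, 1.414214
```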

CHUNKING INPUT_DATA

Chunking reduces overhead many fold. Instead of passing a single item from @input_data, a chunk of $chunk_size items is sent to the next available worker. The sequence option can be used as well when simply looping through a sequence of numbers in parallel with chunking applied.

   use MCE;

   my @input_data  = (0 .. 385000 - 1);
   my $max_workers = 3;
   my $chunk_size  = 500;
   my $order_id    = 1;
   my %result;

   ## Callback function for displaying results.

   sub display_result {

      my ($wk_result, $chunk_id) = @_;
      $result{$chunk_id} = $wk_result;

      while (1) {
         last unless (exists $result{$order_id});
         my $i = ($order_id - 1) * $chunk_size;

         foreach ( @{ $result{$order_id} } ) {
            printf "i: %d sqrt(i): %f\n", $input_data[$i++], $_;
         }

         delete $result{$order_id};
         $order_id++;
      }
   }

   ## Compute via MCE.

   my $mce = MCE->new(
      input_data  => \@input_data,
      max_workers => $max_workers,
      chunk_size  => $chunk_size,

      user_func => sub {
         my ($self, $chunk_ref, $chunk_id) = @_;
         my @wk_result;

         foreach ( @{ $chunk_ref } ) {
            push @wk_result, sqrt($_);
         }

         MCE->do('display_result', \@wk_result, $chunk_id);
      }
   );

   MCE->run();

FORCHUNK sugar METHOD

   ## Compute via MCE.

   my $mce = MCE->new(
      max_workers => $max_workers,
      chunk_size  => $chunk_size
   );

   ## Below, $chunk_ref is a reference to an array containing the next
   ## $chunk_size items from @input_data.

   MCE->forchunk(\@input_data, sub {

      my ($self, $chunk_ref, $chunk_id) = @_;
      my @wk_result;

      foreach ( @{ $chunk_ref } ) {
         push @wk_result, sqrt($_);
      }

      MCE->do('display_result', \@wk_result, $chunk_id);
   });
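With chunk_size => 500, the 385,000 inputs travel in 770 chunks rather than 385,000 single-item hand-offs, which is where the overhead savings come from. The sketch below computes that count and the index range of @input_data covered by a given chunk_id, matching the ($order_id - 1) * $chunk_size offset used in display_result; chunk_bounds is a hypothetical helper.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Hypothetical helper: the index range of @input_data covered by a chunk,
# matching display_result's ($order_id - 1) * $chunk_size starting offset.
sub chunk_bounds {
    my ($chunk_id, $chunk_size, $n_items) = @_;
    my $first = ($chunk_id - 1) * $chunk_size;
    my $last  = $first + $chunk_size - 1;
    $last = $n_items - 1 if $last > $n_items - 1;   # the final chunk may be short
    return ($first, $last);
}

my ($n_items, $chunk_size) = (385_000, 500);
my $n_chunks = ceil($n_items / $chunk_size);        # 770 chunks

my ($first, $last) = chunk_bounds(2, $chunk_size, $n_items);   # 500 .. 999
printf "%d chunks; chunk 2 covers indices %d .. %d\n", $n_chunks, $first, $last;
```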

DEMO APPLYING SEQUENCES WITH USER_TASKS

One may specify the sequence option per task. The following is taken directly from the seq_demo.pl example. Think of this demonstration as 3 mini-MCEs running simultaneously in parallel. Chunking can also be configured independently per task.

   use MCE;

   ## Run with seq_demo.pl | sort

   sub user_func {
      my ($self, $seq_n, $chunk_id) = @_;

      my $wid      = MCE->wid();
      my $task_id  = MCE->task_id();
      my $task_wid = MCE->task_wid();

      if (ref $seq_n eq 'ARRAY') {
         ## Received the next "chunked" sequence of numbers
         ## e.g. when chunk_size > 1, $seq_n will be an array ref above

         foreach (@{ $seq_n }) {
            printf(
               "task_id %d: seq_n %s: chunk_id %d: wid %d: task_wid %d\n",
               $task_id,    $_,       $chunk_id,   $wid,   $task_wid
            );
         }
      }
      else {
         printf(
            "task_id %d: seq_n %s: chunk_id %d: wid %d: task_wid %d\n",
            $task_id,    $seq_n,   $chunk_id,   $wid,   $task_wid
         );
      }
   }

   ## Each task can be configured independently.

   my $mce = MCE->new(
      user_tasks => [{
         max_workers => 2,
         chunk_size  => 1,
         sequence    => { begin => 11, end => 19, step => 1 },
         user_func   => \&user_func
      },{
         max_workers => 2,
         chunk_size  => 5,
         sequence    => { begin => 21, end => 29, step => 1 },
         user_func   => \&user_func
      },{
         max_workers => 2,
         chunk_size  => 3,
         sequence    => { begin => 31, end => 39, step => 1 },
         user_func   => \&user_func
      }]
   );

   MCE->run();

   -- Output

   task_id 0: seq_n 11: chunk_id 1: wid 1: task_wid 1
   task_id 0: seq_n 12: chunk_id 2: wid 2: task_wid 2
   task_id 0: seq_n 13: chunk_id 3: wid 1: task_wid 1
   task_id 0: seq_n 14: chunk_id 4: wid 2: task_wid 2
   task_id 0: seq_n 15: chunk_id 5: wid 1: task_wid 1
   task_id 0: seq_n 16: chunk_id 6: wid 2: task_wid 2
   task_id 0: seq_n 17: chunk_id 7: wid 1: task_wid 1
   task_id 0: seq_n 18: chunk_id 8: wid 2: task_wid 2
   task_id 0: seq_n 19: chunk_id 9: wid 1: task_wid 1
   task_id 1: seq_n 21: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 22: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 23: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 24: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 25: chunk_id 1: wid 3: task_wid 1
   task_id 1: seq_n 26: chunk_id 2: wid 4: task_wid 2
   task_id 1: seq_n 27: chunk_id 2: wid 4: task_wid 2
   task_id 1: seq_n 28: chunk_id 2: wid 4: task_wid 2
   task_id 1: seq_n 29: chunk_id 2: wid 4: task_wid 2
   task_id 2: seq_n 31: chunk_id 1: wid 5: task_wid 1
   task_id 2: seq_n 32: chunk_id 1: wid 5: task_wid 1
   task_id 2: seq_n 33: chunk_id 1: wid 5: task_wid 1
   task_id 2: seq_n 34: chunk_id 2: wid 6: task_wid 2
   task_id 2: seq_n 35: chunk_id 2: wid 6: task_wid 2
   task_id 2: seq_n 36: chunk_id 2: wid 6: task_wid 2
   task_id 2: seq_n 37: chunk_id 3: wid 5: task_wid 1
   task_id 2: seq_n 38: chunk_id 3: wid 5: task_wid 1
   task_id 2: seq_n 39: chunk_id 3: wid 5: task_wid 1
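The chunk_id values for tasks 1 and 2 follow from splitting each task's sequence into chunk_size pieces. The sketch below is a serial, plain-Perl model (chunk_sequence is hypothetical and assumes a positive step); it reproduces which numbers share a chunk_id, not which worker receives each chunk.

```perl
use strict;
use warnings;

# Hypothetical helper: split a { begin, end, step } sequence into chunks of
# $chunk_size numbers, numbering chunks from 1 as chunk_id does above.
sub chunk_sequence {
    my ($begin, $end, $step, $chunk_size) = @_;
    my (@chunks, @current);
    for (my $n = $begin; $n <= $end; $n += $step) {   # assumes a positive step
        push @current, $n;
        if (@current == $chunk_size) { push @chunks, [@current]; @current = () }
    }
    push @chunks, [@current] if @current;   # the final chunk may be short
    return @chunks;
}

my $chunk_id = 0;
for my $chunk (chunk_sequence(21, 29, 1, 5)) {
    $chunk_id++;
    print "chunk_id $chunk_id: @$chunk\n";   # 21..25, then 26..29
}
```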

MULTIPLE WORKERS RUNNING IN PARALLEL

Both input_data and sequence options are optional in MCE. One can simply use MCE to run multiple workers in parallel. The "do" and "sendto" methods can be used to pass data back to the manager process. One doesn't have to wait until the worker has completed processing to pass data back. Both "do" and "sendto" are processed serially by the main process on a first come, first served basis. All 4 workers run in parallel in the demonstration below.

   use MCE;

   sub report_stats {
      my ($wid, $msg, $hash_ref) = @_;
      print "Worker $wid says $msg: ", $hash_ref->{'counter'}, "\n";
   }

   my $mce = MCE->new(
      max_workers => 4,

      user_func => sub {
         my ($self) = @_;
         my $wid = MCE->wid();

         if ($wid == 1) {
            my %hash = ('counter' => 0);
            while (1) {
               $hash{'counter'} += 1;
               MCE->do('report_stats', $wid, 'Hello there', \%hash);
               last if ($hash{'counter'} == 4);
               sleep 2;
            }
         }

         else {
            my %hash = ('counter' => 0);
            while (1) {
               $hash{'counter'} += 1;
               MCE->do('report_stats', $wid, 'Welcome ...', \%hash);
               last if ($hash{'counter'} == 2);
               sleep 4;
            }
         }

         MCE->sendto('stdout', "Worker $wid is exiting\n");
      }
   );

   MCE->run;

   -- Output (Worker 2 gets there first in the 2nd run below)

   $ ./demo.pl
   Worker 1 says Hello there: 1
   Worker 2 says Welcome ...: 1
   Worker 3 says Welcome ...: 1
   Worker 4 says Welcome ...: 1
   Worker 1 says Hello there: 2
   Worker 2 says Welcome ...: 2
   Worker 3 says Welcome ...: 2
   Worker 1 says Hello there: 3
   Worker 2 is exiting
   Worker 3 is exiting
   Worker 4 says Welcome ...: 2
   Worker 4 is exiting
   Worker 1 says Hello there: 4
   Worker 1 is exiting

   $ ./demo.pl
   Worker 2 says Welcome ...: 1
   Worker 1 says Hello there: 1
   Worker 4 says Welcome ...: 1
   Worker 3 says Welcome ...: 1
   Worker 1 says Hello there: 2
   Worker 2 says Welcome ...: 2
   Worker 4 says Welcome ...: 2
   Worker 3 says Welcome ...: 2
   Worker 2 is exiting
   Worker 4 is exiting
   Worker 1 says Hello there: 3
   Worker 3 is exiting
   Worker 1 says Hello there: 4
   Worker 1 is exiting

DEFAULT INPUT SCALAR

Beginning with MCE 1.5, the input scalar $_ is localized prior to calling user_func for input_data and sequences of numbers. The following rules apply.

## use_slurpio => 1:

      $_ is a reference to the buffer e.g. $_ = \$_buffer;
      $_ is a ref regardless of whether chunk_size is 1 or greater

      user_func => sub {
       # my ($self, $chunk_ref, $chunk_id) = @_;
         print ${ $_ };    ## $_ is same as $chunk_ref
      }

## chunk_size is greater than 1, use_slurpio => 0:

      $_ is a reference to an array. $_ = \@_records; $_ = \@_seq_n;
      $_ is same as $chunk_ref or $_[CHUNK]

      user_func => sub {
       # my ($self, $chunk_ref, $chunk_id) = @_;
         for my $row ( @{ $_ } ) {
            print $row, "\n";
         }
      }

      use MCE CONST => 1;

      user_func => sub {
       # my ($self, $chunk_ref, $chunk_id) = @_;
         for my $row ( @{ $_[CHUNK] } ) {
            print $row, "\n";
         }
      }

## chunk_size equals 1, use_slurpio => 0:

      $_ contains the actual value. $_ = $_buffer; $_ = $seq_n;

      ## Note that $_ and $chunk_ref are not the same below.
      ## $chunk_ref is a reference to an array.

      user_func => sub {
       # my ($self, $chunk_ref, $chunk_id) = @_;
         print $_, "\n";   ## Same as $chunk_ref->[0]
      }

      MCE->foreach("/path/to/file", sub {
       # my ($self, $chunk_ref, $chunk_id) = @_;
         print $_;         ## Same as $chunk_ref->[0];
      });

      ## However, that's not the case for the forseq method.
      ## Both $_ and $n_seq are the same when chunk_size => 1.

      MCE->forseq([ 1, 9 ], sub {
       # my ($self, $n_seq, $chunk_id) = @_;
         print $_, "\n";   ## Same as $n_seq
      });
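The rules for use_slurpio => 0 can be modelled with Perl's local. call_user_func below is a hypothetical dispatcher, not MCE's internals: it sets $_ to the single value when chunk_size is 1 and to the array reference otherwise, passes undef in place of $self, and relies on local to restore the caller's $_ afterwards.

```perl
use strict;
use warnings;

# Hypothetical dispatcher (not MCE's internals) modelling the rules above
# for use_slurpio => 0: $_ holds the single value when chunk_size is 1 and
# the array reference otherwise. local restores the caller's $_ afterwards.
sub call_user_func {
    my ($user_func, $chunk_size, $chunk_ref, $chunk_id) = @_;
    local $_ = $chunk_size == 1 ? $chunk_ref->[0] : $chunk_ref;
    return $user_func->(undef, $chunk_ref, $chunk_id);   # undef stands in for $self
}

my @seen;
$_ = 'outer';

call_user_func(sub { push @seen, $_ },     1, [42],     1);   # $_ is 42
call_user_func(sub { push @seen, ref $_ }, 5, [1 .. 5], 1);   # $_ is an ARRAY ref

print "@seen; \$_ is still '$_'\n";   # 42 ARRAY; $_ is still 'outer'
```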

In case you missed it, forseq now applies the chunk_size option. Note that a sequence can also be specified using an array reference. The two forms below specify the same sequence.

   MCE->forseq( { begin => 10, end => 40000, step => 2 }, ... );

The code block receives a reference to an array containing the next 5 numbers of the sequence. Chunk 1 (chunk_id = 1) contains 10, 12, 14, 16, 18. $n_seq is a reference to an array, same as $_, because chunk_size is greater than 1.

   MCE->forseq( [ 10, 40000, 2 ], { chunk_size => 5 }, sub {
    # my ($self, $n_seq, $chunk_id) = @_;
      my @result;
      for my $n ( @{ $_ } ) {
         ## ... do work, appending to @result for each of the 5 numbers ...
      }
      ## ... do something with @result afterwards ...
   });
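The size of that run is easy to work out. The arithmetic below assumes begin and end are both inclusive, as the chunk 1 contents above and the seq_demo output imply; $chunk_start is a hypothetical helper giving the first number in chunk k.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Size of the forseq example above: [ 10, 40000, 2 ] with chunk_size => 5.
my ($begin, $end, $step, $chunk_size) = (10, 40_000, 2, 5);

my $count    = int(($end - $begin) / $step) + 1;        # 19,996 numbers
my $n_chunks = ceil($count / $chunk_size);               # 4,000 chunks
my $last_len = $count - ($n_chunks - 1) * $chunk_size;   # final chunk holds 1

# Chunk k (1-based) starts at begin + (k - 1) * chunk_size * step.
my $chunk_start = sub { $begin + ($_[0] - 1) * $chunk_size * $step };

printf "%d numbers, %d chunks, last chunk starts at %d with %d number(s)\n",
    $count, $n_chunks, $chunk_start->($n_chunks), $last_len;
```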

REQUIREMENTS

Perl 5.8.0 or later. The PDL::IO::Storable module is required in scripts parallelizing PDL with MCE.

SEE ALSO

MCE::Queue, MCE::Signal, MCE::Subs, MCE::Util

SOURCE

The source is hosted at http://code.google.com/p/many-core-engine-perl/

AUTHOR

Mario E. Roy, <marioeroy AT gmail DOT com>

COPYRIGHT AND LICENSE

Copyright (C) 2012-2013 by Mario E. Roy

This program is free software; you can redistribute it and/or modify it under the terms of either: the GNU General Public License as published by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.