NAME

MCE::Core - Documentation describing the core API for Many-core Engine

VERSION

This document describes MCE::Core version 1.499_005

SYNOPSIS

This is a simplistic use case of MCE running with 4 workers.

   use MCE;

   my $mce = MCE->new(
      max_workers => 4,
      user_func => sub {
         my ($self) = @_;
         print "Hello from ", $self->wid, "\n";
      }
   );

   $mce->run;

      ## Beginning with MCE 1.5, all public methods can be called
      ## directly as package methods, e.g. MCE->wid, MCE->run.

   -- Output

   Hello from 3
   Hello from 1
   Hello from 2
   Hello from 4

EXPORT_CONST, CONST

The anonymous user_func above can be written using one fewer line. MCE defines 3 exportable constants. Using them in lieu of 0, 1, 2 makes accessing the array elements more legible.

SELF CHUNK CID

   ## The following exports SELF => 0, CHUNK => 1, CID => 2

   use MCE EXPORT_CONST => 1;
   use MCE CONST => 1;              ## Same thing in 1.415 and later

   user_func => sub {
    # my ($self, $chunk_ref, $chunk_id) = @_;
      print "Hello from ", $_[SELF]->wid, "\n";
   }

   ## MCE 1.5 allows any public method to be called directly.

   use MCE;

   user_func => sub {
    # my ($self, $chunk_ref, $chunk_id) = @_;
      print "Hello from ", MCE->wid, "\n";
   }

new ( [ options ] )

Below, a new instance is configured with all available options.

   use MCE;

   my $mce = MCE->new(

      max_workers  => 8,                 ## Default $MCE::MAX_WORKERS

          ## Number of workers to spawn. This can be set automatically
          ## with MCE 1.412 and later releases.

          ## max_workers => 'auto',      ## = MCE::Util::get_ncpu()
          ## max_workers => 'Auto-1',    ## = MCE::Util::get_ncpu() - 1
          ## max_workers => 'AUTO + 3',  ## = MCE::Util::get_ncpu() + 3
          ## max_workers => 'AUTO * 1.5',
          ## max_workers => 'auto / 1.333 + 2',

      chunk_size   => 2000,              ## Default $MCE::CHUNK_SIZE

          ## Less than or equal to 8192 is number of records.
          ## Greater than 8192 is number of bytes. MCE reads
          ## till the end of record before calling user_func.

          ## chunk_size =>     1,        ## Consists of 1 record
          ## chunk_size =>  1000,        ## Consists of 1000 records
          ## chunk_size => 16384,        ## Approximate 16384 bytes
          ## chunk_size => 50000,        ## Approximate 50000 bytes

      tmp_dir      => $tmp_dir,          ## Default $MCE::TMP_DIR

          ## Default is $MCE::Signal::tmp_dir which points to
          ## $ENV{TEMP} if defined. Otherwise, tmp_dir points
          ## to a location under /tmp.

      freeze       => \&encode_sereal,   ## Default $MCE::FREEZE
      thaw         => \&decode_sereal,   ## Default $MCE::THAW

          ## Release 1.412 allows freeze and thaw to be overridden.
          ## Just include a serialization module prior to loading MCE.

          ## use Sereal qw(encode_sereal decode_sereal);
          ## use JSON::XS qw(encode_json decode_json);
          ## use MCE;

      gather       => \@a,               ## Default undef

          ## Release 1.5 allows for gathering of data to an array
          ## reference, a MCE::Queue or Thread::Queue object, or
          ## a code reference. One invokes gathering by calling
          ## the gather method as often as needed.

          ## gather => $Q,
          ## gather => \&preserve_order,

      input_data   => $input_file,       ## Default undef
      RS           => "\n::\n",          ## Default undef

          ## input_data => '/path/to/file' for input file
          ## input_data => \@array for input array
          ## input_data => \*FILE_HNDL for file handle
          ## input_data => \$scalar to treat like a file

          ## Use the sequence option if simply wanting to loop
          ## through a sequence of numbers instead.

          ## Release 1.4 and later allows one to specify the
          ## record separator (RS) applicable to input files,
          ## file handles, and scalar references.

      use_slurpio  => 1,                 ## Default 0
      use_threads  => 1,                 ## Default 0 or 1

          ## Whether or not to enable slurpio when reading files
          ## (passes raw chunk to user function).

          ## By default MCE does forking (spawns child processes).
          ## MCE also supports threads via 2 threading libraries.
          ##
          ## The use of threads in MCE requires that you include
          ## threads support prior to loading MCE. The use_threads
          ## option defaults to 1 when a thread library is present.
          ##
          ##    use threads;                  use forks;
          ##    use threads::shared;   (or)   use forks::shared;
          ##
          ##    use MCE;                      use MCE;

      spawn_delay  => 0.035,             ## Default undef
      submit_delay => 0.002,             ## Default undef
      job_delay    => 0.150,             ## Default undef

          ## Time to wait, in fractional seconds, before spawning
          ## each worker, submitting parameters to each worker,
          ## and commencing the job (staggered) for each worker.
          ## For example, use job_delay when wanting to stagger
          ## many workers connecting to a database.

      on_post_exit => \&on_post_exit,    ## Default undef
      on_post_run  => \&on_post_run,     ## Default undef

          ## Execute code block immediately after a worker exits
          ## (exit, MCE->exit, or die). Execute code block after
          ## running a job (MCE->process or MCE->run).

          ## One can take action immediately after a worker exits
          ## or wait until after the job has completed.

      user_args    => { env => 'test' }, ## Default undef

          ## MCE release 1.4 adds a new parameter to allow one to
          ## specify arbitrary arguments such as a string, an ARRAY
          ## or a HASH reference. Workers can access this directly:
          ##    my $args = $self->{user_args};

      user_begin   => \&user_begin,      ## Default undef
      user_func    => \&user_func,       ## Default undef
      user_end     => \&user_end,        ## Default undef

          ## Think of user_begin, user_func, user_end like the awk
          ## scripting language:
          ##    awk 'BEGIN { ... } { ... } END { ... }'

          ## MCE workers call user_begin once per job, then
          ## call user_func repeatedly until no chunks remain.
          ## Afterwards, user_end is called.

      user_error   => \&user_error,      ## Default undef
      user_output  => \&user_output,     ## Default undef

          ## When workers call the following functions, MCE will
          ## pass the data to user_error/user_output if defined.
          ## MCE->sendto('stderr', 'Sending to STDERR');
          ## MCE->sendto('stdout', 'Sending to STDOUT');

      stderr_file  => 'err_file',        ## Default STDERR
      stdout_file  => 'out_file',        ## Default STDOUT

          ## Alternatively, send output to a file. The user_error
          ## and user_output options take precedence.

      flush_file   => 1,                 ## Default 0
      flush_stderr => 1,                 ## Default 0
      flush_stdout => 1,                 ## Default 0

          ## Flush sendto file, standard error, or standard output.

      interval     => {
          delay => 0.005 [, max_nodes => 4, node_id => 1 ]
      },

          ## For use with the yield method introduced in MCE 1.5.
          ## Both max_nodes & node_id are optional and default to 1.
          ## Delay is the amount of time between intervals.

      sequence     => {                  ## Default undef
          begin => -1, end => 1 [, step => 0.1 [, format => "%4.1f" ] ]
      },

          ## For looping through a sequence of numbers in parallel.
          ## STEP, if omitted, defaults to 1 if BEGIN is smaller than
          ## END or -1 if BEGIN is greater than END. The FORMAT string
          ## is passed to sprintf behind the scene (% can be omitted).
          ## e.g. $seq_n_formatted = sprintf("%4.1f", $seq_n);

          ## Leave input_data set to undef when specifying the sequence
          ## option. One cannot specify both options together.

          ## Release 1.4 allows one to specify an array reference
          ## instead (3rd & 4th values are optional):
          ##
          ##    sequence => [ -1, 1, 0.1, "%4.1f" ]

      task_end     => \&task_end,        ## Default undef

          ## MCE 1.5 allows this to be specified at the top level.
          ## This is called by the manager process after the task
          ## has completed processing.

      task_name    => 'string',          ## Default 'MCE'

          ## Added in MCE 1.5. This is beneficial for user_tasks.
          ## Each task can be specified with a different name value.
          ## It allows for task_end to be specified at the top level.
          ## The task_name value is passed as the 3rd arg to task_end.

      user_tasks   => [                  ## Default undef
         { ... },                        ## Options for task 0
         { ... },                        ## Options for task 1
         { ... },                        ## Options for task 2
      ],

          ## Takes a list of hash references, each allowing up to 12
          ## options. All other MCE options are ignored. Input_data
          ## is applicable for the first task only.
          ##   task_name, max_workers, chunk_size, input_data, sequence,
          ##   task_end, user_args, user_begin, user_end, user_func,
          ##   gather, use_threads

          ## Options not specified here will default to same option
          ## specified at the top level.

   );
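
The gather => \&preserve_order form mentioned among the options can be sketched as follows. This is a minimal illustration, not MCE code: it assumes workers call MCE->gather($chunk_id, $data) and that MCE hands those arguments to the callback unchanged. The make_ordered_gather name and the sample data are hypothetical. Chunks are simulated arriving out of order to show the buffering.

```perl
use strict;
use warnings;

## Hypothetical factory: returns a callback suitable for gather => ...
## It buffers results by chunk_id and releases them in ascending order.
sub make_ordered_gather {
   my ($out_ref) = @_;
   my %pending;
   my $next_id = 1;                      ## Chunk ids start at 1

   return sub {
      my ($chunk_id, $data) = @_;        ## Assumed argument order
      $pending{$chunk_id} = $data;

      while (exists $pending{$next_id}) {
         push @{ $out_ref }, delete $pending{$next_id};
         $next_id++;
      }
      return;
   };
}

## Simulate workers finishing chunks 3, 1, 2 (out of order).
my @ordered;
my $gather = make_ordered_gather(\@ordered);
$gather->(@{ $_ }) for [ 3, 'c' ], [ 1, 'a' ], [ 2, 'b' ];

print join(',', @ordered), "\n";         ## a,b,c
```

Under those assumptions, the returned closure would be passed as gather => make_ordered_gather(\@ordered) when constructing the MCE instance.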

OVERRIDING DEFAULTS

The following lists the 5 options which may be overridden when loading the module.

   use Sereal qw(encode_sereal decode_sereal);

   use MCE max_workers => 4,                    ## Default 1
           chunk_size  => 100,                  ## Default 1
           tmp_dir     => "/path/to/app/tmp",   ## $MCE::Signal::tmp_dir
           freeze      => \&encode_sereal,      ## \&Storable::freeze
           thaw        => \&decode_sereal       ## \&Storable::thaw
   ;

   my $mce = MCE->new( ... );

There is a simpler way to enable Sereal with MCE 1.5. The following attempts to use Sereal if available; otherwise it falls back to Storable for serialization.

   use MCE Sereal => 1;

   ## Serialization is through Sereal if available.
   my $mce = MCE->new( ... );
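
The fallback can be pictured with the sketch below. It is not MCE's actual loading code, only an illustration of choosing a freeze/thaw pair at run time; the $freeze and $thaw variable names are hypothetical. It relies on the functional interfaces of Sereal::Encoder and Sereal::Decoder when those modules are installed, and on the core Storable module otherwise.

```perl
use strict;
use warnings;
use Storable ();

## Pick a freeze/thaw pair: Sereal when installed, Storable otherwise.
my ($freeze, $thaw) = (\&Storable::freeze, \&Storable::thaw);

if (eval { require Sereal::Encoder; require Sereal::Decoder; 1 }) {
   $freeze = \&Sereal::Encoder::encode_sereal;
   $thaw   = \&Sereal::Decoder::decode_sereal;
}

## Round-trip a small structure through the chosen pair.
my $copy = $thaw->( $freeze->([ 1, 2, 3 ]) );
print "@{ $copy }\n";                    ## 1 2 3
```

Either pair accepts a reference and returns an equivalent structure, which is why the two are interchangeable for the freeze and thaw options.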

RUNNING

The run method calls spawn and kicks off the job; workers then call user_begin, user_func, and user_end. Run shuts down the workers afterwards. Call the spawn method early when the main process needs to build large data structures prior to running.

   MCE->spawn();                         ## This is optional

   MCE->run();                           ## Call run or process below

   ## Acquire data arrays and/or input_files. The same pool of
   ## workers is used.

   MCE->process(\@input_data_1);         ## Process arrays
   MCE->process(\@input_data_2);
   MCE->process(\@input_data_n);

   MCE->process('input_file_1');         ## Process files
   MCE->process('input_file_2');
   MCE->process('input_file_n');

   ## Shutdown workers afterwards.

   MCE->shutdown();

SYNTAX for ON_POST_EXIT

Often, one may want to capture the exit status. The on_post_exit option, if defined, is executed immediately after a worker exits via exit (children), MCE->exit (children and threads), or die.

The format of $e->{pid} is PID_123 for children and THR_123 for threads.

   sub on_post_exit {
      my ($self, $e) = @_;
      print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
   }

   sub user_func {
      my $self = $_[0];
      MCE->exit(0, 'ok', 'pebbles');
   }

   my $mce = MCE->new(
      on_post_exit => \&on_post_exit,
      user_func    => \&user_func
   );

   MCE->run();

   -- Output

   2: PID_7625: 0: ok: pebbles

SYNTAX for ON_POST_RUN

The on_post_run option, if defined, is executed immediately after running MCE->process or MCE->run. This option receives an array reference of hashes.

The difference between on_post_exit and on_post_run is that the former is called immediately whereas the latter is called after all workers have completed processing or running.

   sub on_post_run {
      my ($self, $status_ref) = @_;
      foreach my $e ( @{ $status_ref } ) {
         print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
      }
   }

   sub user_func {
      my $self = $_[0];
      MCE->exit(0, 'ok', 'pebbles');
   }

   my $mce = MCE->new(
      on_post_run => \&on_post_run,
      user_func   => \&user_func
   );

   MCE->run();
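
Because on_post_run receives the whole status list at once, it is a convenient place to summarize failures. The sketch below runs without MCE: the status list is hypothetical sample data shaped like the argument described in this section (hashes with wid, pid, status, msg, and id keys).

```perl
use strict;
use warnings;

## Hypothetical status list shaped like the on_post_run argument.
my $status_ref = [
   { wid => 1, pid => 'PID_101', status => 0, msg => 'ok',     id => 'a' },
   { wid => 2, pid => 'PID_102', status => 2, msg => 'failed', id => 'b' },
   { wid => 3, pid => 'PID_103', status => 0, msg => 'ok',     id => 'c' },
];

## Keep only the entries with a non-zero exit status.
my @failed = grep { $_->{status} != 0 } @{ $status_ref };

printf "%d of %d workers failed\n", scalar(@failed), scalar(@{ $status_ref });
print  "wid $_->{wid} ($_->{pid}): $_->{msg}\n" for @failed;
```

The same grep could sit inside an on_post_run callback, operating on its second argument.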

SYNTAX for SEQUENCE

Release 1.3 and above allows workers to loop through a sequence of numbers computed mathematically, without the overhead of an array. Unlike input_data, which applies to the first task only, the sequence can be specified separately for each user_tasks entry.

See the seq_demo.pl example, included with this distribution, on applying sequences with the user_tasks option including chunking.

Sequence can be specified using an array or a hash reference.

   use MCE;

   my $mce = MCE->new(
      max_workers => 3,

    # sequence => [ 10, 19, 0.7, "%4.1f" ],

      sequence => {
         begin => 10, end => 19, step => 0.7, format => "%4.1f"
      },

      user_func => sub {
         my ($self, $n, $chunk_id) = @_;
         print $n, " from ", MCE->wid(), " id ", $chunk_id, "\n";
      }
   );

   MCE->run();

   -- Output (sorted afterwards, notice wid and chunk_id in output)

   10.0 from 1 id 1
   10.7 from 2 id 2
   11.4 from 3 id 3
   12.1 from 1 id 4
   12.8 from 2 id 5
   13.5 from 3 id 6
   14.2 from 1 id 7
   14.9 from 2 id 8
   15.6 from 3 id 9
   16.3 from 1 id 10
   17.0 from 2 id 11
   17.7 from 3 id 12
   18.4 from 1 id 13
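
The values in the output can be reproduced without MCE. The sketch below is an illustration only and assumes a positive step; how MCE computes the sequence internally is not described here. Each value is derived as begin + k * step rather than by repeated addition, which avoids accumulating floating-point error.

```perl
use strict;
use warnings;

## Values of the example sequence, computed as begin + k * step.
my ($begin, $end, $step, $format) = (10, 19, 0.7, "%4.1f");

my @seq;
for (my $k = 0; ; $k++) {
   my $n = $begin + $k * $step;
   last if $n > $end;                    ## Stop once past the end value
   push @seq, sprintf($format, $n);
}

print scalar(@seq), " values: $seq[0] .. $seq[-1]\n";
## 13 values: 10.0 .. 18.4
```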

SYNTAX for USER_BEGIN and USER_END

The user_begin and user_end options, if specified, behave similarly to awk 'BEGIN { ... } { ... } END { ... }'. These are called once per worker during a run.

   sub user_begin {                   ## Called once at the beginning
      my $self = shift;
      $self->{wk_total_rows} = 0;
   }

   sub user_func {                    ## Called while processing
      my $self = shift;
      $self->{wk_total_rows} += 1;
   }

   sub user_end {                     ## Called once at the end
      my $self = shift;
      printf "## %d: Processed %d rows\n",
         MCE->wid(), $self->{wk_total_rows};
   }

   my $mce = MCE->new(
      user_begin => \&user_begin,
      user_func  => \&user_func,
      user_end   => \&user_end
   );

   MCE->run();

SYNTAX for USER_FUNC with USE_SLURPIO => 0

When processing input data, MCE can pass an array of rows or a slurped chunk. Below, a reference to an array containing the chunk data is processed.

e.g. $chunk_ref = [ record1, record2, record3, ... ]

   sub user_func {

      my ($self, $chunk_ref, $chunk_id) = @_;

      foreach my $row ( @{ $chunk_ref } ) {
         $self->{wk_total_rows} += 1;
         print $row;
      }
   }

   my $mce = MCE->new(
      chunk_size  => 100,
      input_data  => "/path/to/file",
      user_func   => \&user_func,
      use_slurpio => 0
   );

   MCE->run();

SYNTAX for USER_FUNC with USE_SLURPIO => 1

Here, a reference to a scalar containing the raw chunk data is processed.

   sub user_func {

      my ($self, $chunk_ref, $chunk_id) = @_;

      my $count = () = $$chunk_ref =~ /abc/g;
   }

   my $mce = MCE->new(
      chunk_size  => 16000,
      input_data  => "/path/to/file",
      user_func   => \&user_func,
      use_slurpio => 1
   );

   MCE->run();
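
Working with a slurped chunk can be tried outside MCE. In the sketch below, $buffer is hypothetical sample data standing in for what $chunk_ref points to. It shows two common operations: counting every match in the raw chunk, and walking the records one at a time through an in-memory file handle.

```perl
use strict;
use warnings;

## Hypothetical chunk, standing in for what $chunk_ref points to.
my $buffer    = "abc\nxyz abc\nnothing here\nabcabc\n";
my $chunk_ref = \$buffer;

## List assignment in scalar context yields the number of matches;
## the /g modifier is needed to match more than once.
my $count = () = $$chunk_ref =~ /abc/g;

## Records can still be read one at a time via an in-memory handle.
open my $fh, '<', $chunk_ref or die "open failed: $!";
my $lines = 0;
$lines++ while <$fh>;
close $fh;

print "$count matches in $lines lines\n";   ## 4 matches in 4 lines
```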

SYNTAX for USER_ERROR and USER_OUTPUT

Output coming from MCE->sendto('stderr', ...) and MCE->sendto('stdout', ...) can be intercepted by specifying the user_error and user_output options. On receiving output, MCE directs it to user_error or user_output in a serialized fashion instead. This is handy when wanting to filter, modify, and/or direct the output elsewhere.

   sub user_error {                   ## Redirect STDERR to STDOUT
      my $error = shift;
      print STDOUT $error;
   }

   sub user_output {                  ## Redirect STDOUT to STDERR
      my $output = shift;
      print STDERR $output;
   }

   sub user_func {
      my ($self, $chunk_ref, $chunk_id) = @_;
      my $count = 0;

      foreach my $row ( @{ $chunk_ref } ) {
         MCE->sendto('stdout', $row);
         $count += 1;
      }

      MCE->sendto('stderr', "$chunk_id: processed $count rows\n");
   }

   my $mce = MCE->new(
      chunk_size  => 1000,
      input_data  => "/path/to/file",
      user_error  => \&user_error,
      user_output => \&user_output,
      user_func   => \&user_func
   );

   MCE->run();

SYNTAX for USER_TASKS and TASK_END

This option takes an array of tasks. Each task allows for 12 MCE options. Input_data can be specified inside the first task or at the top level only; otherwise it is ignored.

   max_workers, chunk_size, input_data, sequence, task_end, task_name,
   gather, user_args, user_begin, user_end, user_func, use_threads

Sequence and chunk_size were added in 1.3. User_args was introduced in 1.4. Task_name and input_data are new options allowed in 1.5. In addition, one can specify task_end at the top level. Task_end also receives 2 additional arguments, $task_id and $task_name (shown below).

Options not specified here will default to the same option specified at the top level. The task_end option is called by the manager process when all workers for that task have completed processing.

Forking and threading can be intermixed among tasks unless running Cygwin. The run method will continue running until all workers have completed processing.

   use MCE;

   my $mce = MCE->new(
      input_data => $list_file,

      task_end   => sub {
         my ($self, $task_id, $task_name) = @_;
         print "Task [$task_id -- $task_name] completed processing\n";
      },

      user_tasks => [{
         task_name   => 'a',
         max_workers => 2,
         user_func   => \&parallel_task1,
         use_threads => 0

      },{
         task_name   => 'b',
         max_workers => 4,
         user_func   => \&parallel_task2,
         use_threads => 1

      }]
   );

   MCE->run();
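
The defaulting rule for task options can be modelled as a simple merge. The sketch below is not MCE's internal code; effective_task is a hypothetical helper, and it does not model the rule that input_data applies to the first task only.

```perl
use strict;
use warnings;

## The 12 options a task may carry, as listed in this section.
my @task_opts = qw(
   max_workers chunk_size input_data sequence task_end task_name
   gather user_args user_begin user_end user_func use_threads
);

## Hypothetical helper: fill unspecified task options from the top level.
sub effective_task {
   my ($top, $task) = @_;
   my %opts;
   for my $name (@task_opts) {
      my $value = exists $task->{$name} ? $task->{$name} : $top->{$name};
      $opts{$name} = $value if defined $value;
   }
   return \%opts;
}

my %top  = ( max_workers => 4, chunk_size => 100 );
my $task = effective_task(\%top, { task_name => 'a', max_workers => 2 });

print join(', ', map { "$_=$task->{$_}" } sort keys %{ $task }), "\n";
## chunk_size=100, max_workers=2, task_name=a
```

Here the task keeps its own max_workers and picks up chunk_size from the top level.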

DEFAULT INPUT SCALAR

Beginning with MCE 1.5, the input scalar $_ is localized prior to calling user_func for input_data and sequence of numbers. The following applies.

use_slurpio => 1
   $_ is a reference to the buffer e.g. $_ = \$_buffer;
   $_ is a ref regardless of whether chunk_size is 1 or greater

   user_func => sub {
    # my ($self, $chunk_ref, $chunk_id) = @_;
      print ${ $_ };    ## $_ is same as $chunk_ref
   }

chunk_size is greater than 1, use_slurpio => 0
   $_ is a reference to an array. $_ = \@_records; $_ = \@_seq_n;
   $_ is same as $chunk_ref or $_[CHUNK]

   user_func => sub {
    # my ($self, $chunk_ref, $chunk_id) = @_;
      for my $row ( @{ $_ } ) {
         print $row, "\n";
      }
   }

   use MCE CONST => 1;

   user_func => sub {
    # my ($self, $chunk_ref, $chunk_id) = @_;
      for my $row ( @{ $_[CHUNK] } ) {
         print $row, "\n";
      }
   }

chunk_size equals 1, use_slurpio => 0
   $_ contains the actual value. $_ = $_buffer; $_ = $seq_n;

   ## Note that $_ and $chunk_ref are not the same below.
   ## $chunk_ref is a reference to an array.

   user_func => sub {
    # my ($self, $chunk_ref, $chunk_id) = @_;
      print $_, "\n";   ## Same as $chunk_ref->[0];
   }

   MCE->foreach("/path/to/file", sub {
    # my ($self, $chunk_ref, $chunk_id) = @_;
      print $_;         ## Same as $chunk_ref->[0];
   });

   ## However, that's not the case for the forseq method.
   ## Both $_ and $n_seq are the same when chunk_size => 1.

   MCE->forseq([ 1, 9 ], sub {
    # my ($self, $n_seq, $chunk_id) = @_;
      print $_, "\n";   ## Same as $n_seq
   });

Sequence can be specified using a hash or an array reference. The hash form below is equivalent to the array form [ 10, 40, 2 ]; the example afterwards uses the array form with a larger end value.

   MCE->forseq( { begin => 10, end => 40, step => 2 }, ... );

The code block receives a reference to an array containing the next 5 sequence numbers. Chunk 1 (chunk_id = 1) contains 10,12,14,16,18. $n_seq is a reference to an array, same as $_, due to chunk_size being greater than 1.

   MCE->forseq( [ 10, 40000, 2 ], { chunk_size => 5 }, sub {
    # my ($self, $n_seq, $chunk_id) = @_;
      my @result;
      for my $n ( @{ $_ } ) {
         ## ... do work, append to @result for each of the 5 values
      }
      ## ... do something with @result afterwards
   });
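
The three cases described in this section can be summarized as a small decision function. The sketch below is illustrative only; input_scalar_for is a hypothetical helper, not part of MCE, and it simply returns what $_ would hold for a given chunk of records.

```perl
use strict;
use warnings;

## Hypothetical helper mirroring the three cases in this section.
sub input_scalar_for {
   my ($records, $chunk_size, $use_slurpio) = @_;

   if ($use_slurpio) {                   ## Reference to the raw buffer
      my $buffer = join '', @{ $records };
      return \$buffer;
   }
   return $records if $chunk_size > 1;   ## Reference to an array
   return $records->[0];                 ## The actual value
}

my $slurped = input_scalar_for([ "a\n", "b\n" ], 2, 1);
my $chunked = input_scalar_for([ "a\n", "b\n" ], 2, 0);
my $single  = input_scalar_for([ "a\n" ],        1, 0);

print ref($slurped), "\n";               ## SCALAR
print ref($chunked), "\n";               ## ARRAY
print $single;                           ## a
```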

INDEX

MCE

AUTHOR

Mario E. Roy, <marioeroy AT gmail DOT com>

LICENSE

This program is free software; you can redistribute it and/or modify it under the terms of either: the GNU General Public License as published by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.