
NAME

MCE::Flow - Parallel flow model for building creative applications

VERSION

This document describes MCE::Flow version 1.499_002

DESCRIPTION

MCE::Flow is great for writing custom apps that maximize the use of all available cores. It is trivial to parallelize the chained map below with mce_stream. Afterwards, let's have MCE::Flow compute the same thing in parallel.

   ## Native map function.
   my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;

   ## Same with MCE::Stream (from right to left).
   @a = mce_stream
        sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
     
   mce_stream \@a,
        sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
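
Using MCE::Flow directly, one minimal way to compute the same result in parallel is a single task plus the gather option. This is only a sketch (not taken from the module's own examples); the inline gather callback preserves output order by chunk_id, much like the _preserve_order routine used in the fuller walk-through below.

   use MCE::Flow;

   my @a; my %tmp; my $order_id = 1;

   mce_flow {
      gather => sub {
         ## Manager-side callback: receives whatever workers pass to
         ## MCE->gather; here, a result array ref and the chunk_id.
         my ($ans_ref, $chunk_id) = @_;
         $tmp{$chunk_id} = $ans_ref;
         while (exists $tmp{$order_id}) {
            push @a, @{ $tmp{$order_id} };
            delete $tmp{$order_id++};
         }
      }
   },
   sub {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      my @ans = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } @{ $chunk_ref };
      MCE->gather(\@ans, $chunk_id);
   }, 1..10000;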

Okay, let's have some fun. The following example utilizes MCE::Queue. MCE::Flow makes it easy to harness user_tasks within MCE.

   use MCE::Flow;
   use MCE::Queue;

This calls for preserving output order. Always remember to set $_order_id to 1 before running.

   my ($_gather_ref, $_order_id, %_tmp); 

   sub _preserve_order {

      $_tmp{$_[1]} = $_[0];

      while (1) {
         last unless exists $_tmp{$_order_id};
         push @{ $_gather_ref }, @{ $_tmp{$_order_id} };
         delete $_tmp{$_order_id++};
      }

      return;
   }

We need 2 queues for the flow of data between the 3 tasks. Notice the task_end function and how it checks the $task_name value to determine which task has ended.

   my $b = MCE::Queue->new;
   my $c = MCE::Queue->new;

   sub task_end {
      my ($mce, $task_id, $task_name) = @_;

      if (defined $mce->{user_tasks}->[$task_id + 1]) {
         my $N_workers = $mce->{user_tasks}->[$task_id + 1]->{max_workers};

         if ($task_name eq 'a') {
            $b->enqueue((undef) x $N_workers);
         }
         elsif ($task_name eq 'b') {
            $c->enqueue((undef) x $N_workers);
         }
      }

      return;
   }

Now on to the 3 sub-tasks. The first one reads input and begins the flow. The 2nd dequeues, performs the calculation, and enqueues the result into the next queue. Finally, the last sub-task calls the gather method. Gather can be called as often as needed. Data is frozen between tasks to avoid double serialization. This is the fastest approach in MCE with the least overhead.

   sub task_a {
      my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;

      push @ans, map { $_ * 2 } @{ $chunk_ref };
      $b->enqueue(MCE->freeze([ \@ans, $chunk_id ]));
   }

   sub task_b {
      my ($mce) = @_;

      while (1) {
         my $chunk = $b->dequeue; last unless defined $chunk;
         my @ans; $chunk = MCE->thaw($chunk);

         push @ans, map { $_ * 3 } @{ $chunk->[0] };
         $c->enqueue(MCE->freeze([ \@ans, $chunk->[1] ]));
      }

      return;
   }

   sub task_c {
      my ($mce) = @_;

      while (1) {
         my $chunk = $c->dequeue; last unless defined $chunk;
         my @ans; $chunk = MCE->thaw($chunk);

         push @ans, map { $_ * 4 } @{ $chunk->[0] };
         MCE->gather(\@ans, $chunk->[1]);
      }

      return;
   }

To put it all together, MCE::Flow automatically builds an MCE instance behind the scenes and runs it for you. The task_name option can take an array reference. The same is true for max_workers (a sketch with per-task max_workers follows the example below).

   my @a; $_gather_ref = \@a; $_order_id = 1;

   mce_flow {
      gather => \&_preserve_order, task_name => [ 'a', 'b', 'c' ],
      task_end => \&task_end

   }, \&task_a, \&task_b, \&task_c, 1..10000;

   print "@a" . "\n";
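
Since max_workers also accepts an array reference (one entry per task), the same call might look as follows. The worker counts are arbitrary and shown only for illustration; remember to reset $_order_id to 1 and point $_gather_ref at a fresh array before running again.

   mce_flow {
      gather => \&_preserve_order, task_name => [ 'a', 'b', 'c' ],
      max_workers => [ 2, 4, 2 ],   ## illustrative worker counts
      task_end => \&task_end

   }, \&task_a, \&task_b, \&task_c, 1..10000;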

What if speed is not a concern and you want to rid the code of all the MCE->freeze and MCE->thaw calls? Simply enqueue and dequeue 2 items at a time.

   $b->enqueue(\@ans, $chunk_id);

   ...

   my ($chunk_ref, $chunk_id) = $b->dequeue(2);
   last unless defined $chunk_ref;

   ...
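
For example, task_b from above might then look as follows. This is only a sketch; it relies on MCE::Queue serializing the array reference for us, which is what makes the explicit MCE->freeze and MCE->thaw calls unnecessary.

   sub task_b {
      my ($mce) = @_;

      while (1) {
         ## Dequeue 2 items at a time: the data ref and the chunk_id.
         my ($chunk_ref, $chunk_id) = $b->dequeue(2);
         last unless defined $chunk_ref;

         my @ans = map { $_ * 3 } @{ $chunk_ref };
         $c->enqueue(\@ans, $chunk_id);
      }

      return;
   }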

The task_end function must be updated as well, because workers now dequeue 2 items at a time. Therefore, we must double the number of undefs enqueued into each queue.

   if ($task_name eq 'a') {
      $b->enqueue((undef) x ($N_workers * 2));
   }
   elsif ($task_name eq 'b') {
      $c->enqueue((undef) x ($N_workers * 2));
   }

For faster data serialization behind the scenes, MCE::Flow can make use of Sereal if it is available on the system, silently falling back to Storable otherwise. Both chunk_size and max_workers default to 'auto'.

   use MCE::Flow Sereal => 1, chunk_size => 'auto', max_workers => 'auto';

API

mce_flow
   ## mce_flow is imported into the calling script.

   mce_flow {
      ## Both this (mce_options) and input data are optional.
   },
   sub { ... }, sub { ... }, sub { ... }, 1..10000;

mce_flow_f

TODO ...
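
While this section is marked TODO in this release, the sketch below mirrors the file-reading form found in the companion MCE models. The path is a placeholder, and the default line-oriented chunking is assumed.

   mce_flow_f sub {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      ## With default options, @{ $chunk_ref } holds this chunk's lines.
      MCE->print($_) for @{ $chunk_ref };
   }, '/path/to/input.txt';   ## placeholder path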

mce_flow_s

TODO ...
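
Also marked TODO in this release; the sketch below follows the sequence form used by the companion MCE models, taking begin and end values (a step and format string may optionally follow).

   mce_flow_s sub {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      ## With default options, @{ $chunk_ref } holds this chunk's numbers.
      my $sum = 0; $sum += $_ for @{ $chunk_ref };
      MCE->print("chunk $chunk_id sum: $sum\n");
   }, 1, 10000;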

init
   MCE::Flow::init {

      ## This form is available for configuring MCE options
      ## before running.

      user_begin => sub {
         print "## ", MCE->task_name, ": ", MCE->wid, "\n";
      },
      user_end => sub {
         ...
      }
   };

finish
   MCE::Flow::finish();   ## This is called automatically.

INDEX

MCE

AUTHOR

Mario E. Roy, <marioeroy AT gmail DOT com>

LICENSE

This program is free software; you can redistribute it and/or modify it under the terms of either: the GNU General Public License as published by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.