NAME

MCE::Hobo - A threads-like parallelization module

VERSION

This document describes MCE::Hobo version 1.826

SYNOPSIS

   use MCE::Hobo;

   MCE::Hobo->create( sub { print "Hello from hobo\n" } )->join();

   sub parallel {
      my ($arg1) = @_;
      print "Hello again, $arg1\n" if defined($arg1);
      print "Hello again, $_\n"; # same thing
   }

   MCE::Hobo->create( \&parallel, $_ ) for 1 .. 3;

   my @hobos    = MCE::Hobo->list();
   my @running  = MCE::Hobo->list_running();
   my @joinable = MCE::Hobo->list_joinable();
   my $count    = MCE::Hobo->pending();

   # Joining is orderly, i.e. hobo1 is joined first, then hobo2, then hobo3.
   $_->join() for @hobos;

   # Joining occurs immediately as hobo(s) complete execution.
   1 while MCE::Hobo->waitone();

   my $hobo = mce_async { foreach (@files) { ... } };

   $hobo->join();

   if ( my $err = $hobo->error() ) {
      warn "Hobo error: $err\n";
   }

   # Get a hobo's object
   $hobo = MCE::Hobo->self();

   # Get a hobo's ID
   $pid = MCE::Hobo->pid();  # $$
   $pid = $hobo->pid();
   $tid = MCE::Hobo->tid();  # $$  same thing
   $tid = $hobo->tid();

   # Test hobo objects
   if ( $hobo1 == $hobo2 ) {
      ...
   }

   # Give other hobos a chance to run
   MCE::Hobo->yield();
   MCE::Hobo->yield(0.05);

   # Return context, wantarray aware
   my ($value1, $value2) = $hobo->join();
   my $value = $hobo->join();

   # Check hobo's state
   if ( $hobo->is_running() ) {
      sleep 1;
   }
   if ( $hobo->is_joinable() ) {
      $hobo->join();
   }

   # Send a signal to a hobo
   $hobo->kill('SIGUSR1');

   # Exit a hobo
   MCE::Hobo->exit();

DESCRIPTION

A Hobo is a migratory worker inside the machine that carries the asynchronous gene. Hobos are equipped with threads-like capability for running code asynchronously. Unlike threads, each hobo is a unique process to the underlying OS. The IPC is managed by MCE::Shared, which runs on all the major platforms including Cygwin.
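
Because each hobo is a separate process, a plain Perl variable modified inside a hobo is not visible to the manager process. Only the return value travels back through join. The following is a minimal sketch of that isolation, assuming a UNIX-like system. The variable names are illustrative only.

```perl
use strict;
use warnings;
use MCE::Hobo;

my $counter = 0;

# The hobo runs in its own process and works on its own copy of $counter.
my $hobo = MCE::Hobo->create( sub {
   $counter += 100;            # changes the child's copy only
   return ( $$, $counter );    # return values are passed back to join()
} );

# List context: receive both return values.
my ( $child_pid, $child_counter ) = $hobo->join();

print "manager $$ sees counter = $counter\n";
print "hobo $child_pid saw counter = $child_counter\n";
```

To share data between hobos, use MCE::Shared instead of plain variables.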

An exception was made on the Windows platform: MCE::Hobo 1.807 through 1.816 spawned threads instead of child processes. For consistency, release 1.817 reverted to spawning children on all supported platforms.

MCE::Hobo may be used standalone or together with MCE, and can also run alongside threads.

   use MCE::Hobo;
   use MCE::Shared;

   # synopsis: head -20 file.txt | perl script.pl

   my $ifh = MCE::Shared->handle( "<", \*STDIN  );  # shared
   my $ofh = MCE::Shared->handle( ">", \*STDOUT );
   my $ary = MCE::Shared->array();

   sub parallel_task {
      my ( $id ) = @_;

      while ( <$ifh> ) {
         printf {$ofh} "[ %4d ] %s", $., $_;

       # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n";    # dereferencing
         $ary->set( $. - 1, "[ ID $id ] read line $.\n" );  # faster via OO
      }
   }

   my $hobo1 = MCE::Hobo->new( "parallel_task", 1 );
   my $hobo2 = MCE::Hobo->new( \&parallel_task, 2 );
   my $hobo3 = MCE::Hobo->new( sub { parallel_task(3) } );

   $_->join for MCE::Hobo->list();  # ditto: MCE::Hobo->waitall();

   # search array (total one round-trip via IPC)
   my @vals = $ary->vals( "val =~ / ID 2 /" );

   print {*STDERR} join("", @vals);

API DOCUMENTATION

$hobo = MCE::Hobo->create( FUNCTION, ARGS )
$hobo = MCE::Hobo->new( FUNCTION, ARGS )

This creates a new hobo that begins execution with FUNCTION as the entry point, optionally passing ARGS as its list of parameters. It returns the corresponding MCE::Hobo object, or undef if hobo creation failed.

FUNCTION may either be the name of a function, an anonymous subroutine, or a code ref.

   my $hobo = MCE::Hobo->create( "func_name", ... );
       # or
   my $hobo = MCE::Hobo->create( sub { ... }, ... );
       # or
   my $hobo = MCE::Hobo->create( \&func, ... );

$hobo = MCE::Hobo->create( { options }, FUNCTION, ARGS )

Options may be specified via a hash reference. At this time, posix_exit and hobo_timeout are the only supported options. Set posix_exit to skip all END and destructor processing. Set hobo_timeout, in seconds, if you want the hobo process to terminate after some time. The default is 0, meaning no timeout.

Many modules on CPAN are neither thread-safe nor safe to use across many processes. Set the posix_exit option explicitly if your application crashes because a module's DESTROY or END block does not account for the process ID ($$.$tid) under which the object was constructed, e.g. Cache::BDB.

Constructing a Hobo inside a thread implies posix_exit => 1. The same applies when any of the following modules is present: CGI, FCGI, Curses, Gearman::Util, Gearman::XS, Mojo::IOLoop, Prima, Tk, or Wx.

   my $hobo1 = MCE::Hobo->create( { posix_exit => 1 }, sub {
      ...
   } );

   $hobo1->join;

   my $hobo2 = MCE::Hobo->create( { hobo_timeout => 3 }, sub {
      sleep 1 for ( 1 .. 9 );
   } );

   $hobo2->join;

   if ( $hobo2->error() eq "Hobo timed out\n" ) {
      ...
   }

The new() method is an alias for create().

mce_async { BLOCK } ARGS;
mce_async { BLOCK };

mce_async runs the block asynchronously, similarly to MCE::Hobo->create(). It returns the hobo object, or undef if hobo creation failed.

   my $hobo = mce_async { foreach (@files) { ... } };

   $hobo->join();

   if ( my $err = $hobo->error() ) {
      warn("Hobo error: $err\n");
   }
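
The example above does not show the ARGS form. Here is a minimal sketch of it, assuming the arguments after the block are delivered in @_, the same way create() passes ARGS to FUNCTION. The values 2 and 3 are arbitrary.

```perl
use strict;
use warnings;
use MCE::Hobo;   # exports mce_async

# Arguments following the block are passed to it in @_.
my $hobo = mce_async {
   my ( $x, $y ) = @_;
   return $x + $y;
} 2, 3;

my $sum = $hobo->join();   # scalar context

if ( my $err = $hobo->error() ) {
   warn "Hobo error: $err\n";
}

print "sum = $sum\n";
```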

$hobo->join()

This waits for the corresponding hobo to complete its execution. In non-void context, join() returns the value(s) of the entry point function.

The context (void, scalar or list) for the return value(s) of join is determined at the time of joining and is mostly wantarray-aware.

   my $hobo1 = MCE::Hobo->create( sub {
      my @res = qw(foo bar baz);
      return (@res);
   });

   my @res1 = $hobo1->join();  # ( foo, bar, baz )
   my $res1 = $hobo1->join();  #   baz

   my $hobo2 = MCE::Hobo->create( sub {
      return 'foo';
   });

   my @res2 = $hobo2->join();  # ( foo )
   my $res2 = $hobo2->join();  #   foo

$hobo1->equal( $hobo2 )

Tests whether two hobo objects refer to the same hobo. Hobo comparison is based on process IDs. The == and != operators are overloaded to provide the more natural forms.

    if ( $hobo1 == $hobo2 ) {
        print("Hobos are the same\n");
    }
    # or
    if ( $hobo1 != $hobo2 ) {
        print("Hobos differ\n");
    }

$hobo->error()

Hobos are executed in an eval context. This method will return undef if the hobo terminates normally. Otherwise, it returns the value of $@ associated with the hobo's execution status in its eval context.
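
Here is a minimal sketch, assuming a UNIX-like system. It runs one hobo that returns normally and one that dies, then inspects error() on each after joining. The task bodies are placeholders.

```perl
use strict;
use warnings;
use MCE::Hobo;

my $ok  = MCE::Hobo->create( sub { return 'fine' } );
my $bad = MCE::Hobo->create( sub { die "something broke\n" } );

# Join first; error() then reports $@ from each hobo's eval context.
$_->join() for ( $ok, $bad );

for my $hobo ( $ok, $bad ) {
   my $err = $hobo->error();
   if ( defined $err ) {
      print "hobo ", $hobo->pid(), " failed: $err";
   }
   else {
      print "hobo ", $hobo->pid(), " completed normally\n";
   }
}
```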

$hobo->exit()

This sends 'SIGQUIT' to the hobo, notifying it to exit. It returns the hobo object to allow for method chaining. Be sure to join the hobo, either immediately or later, so that it does not linger as a zombie (defunct) process.

   $hobo->exit()->join();

   ...

   $hobo->join();  # later
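
Below is a self-contained version of the pattern above, sketched for a UNIX-like system. The hobo loops until exit() notifies it, and the manager joins it right away so that no zombie remains. The half-second pause is arbitrary.

```perl
use strict;
use warnings;
use MCE::Hobo;
use Time::HiRes qw(sleep);

# A hobo that would run forever unless told to exit.
my $hobo = MCE::Hobo->create( sub {
   sleep 0.1 while 1;
} );

sleep 0.5;   # let it start working

# exit() sends SIGQUIT and returns the object, so join() can be chained.
$hobo->exit()->join();

print "hobos not yet joined: ", MCE::Hobo->pending(), "\n";
```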

MCE::Hobo->exit()

A hobo can be exited at any time by calling MCE::Hobo->exit(). This behaves the same as exit(status) when called from the main process.

MCE::Hobo->finish()

This class method is called automatically by END, but may also be called explicitly. It destroys the two shared objects that MCE::Hobo keeps in MCE::Shared. An error is emitted via croak if there are active hobos not yet joined.

   MCE::Hobo->create( 'task1', $_ ) for 1 .. 4;

   $_->join for MCE::Hobo->list();

   MCE::Hobo->create( 'task2', $_ ) for 1 .. 4;

   $_->join for MCE::Hobo->list();

   MCE::Hobo->create( 'task3', $_ ) for 1 .. 4;

   $_->join for MCE::Hobo->list();

   MCE::Hobo->finish();

$hobo->is_running()

Returns true if the hobo is still running.

$hobo->is_joinable()

Returns true if the hobo has finished running and has not yet been joined.
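
Here is a minimal polling sketch, assuming a UNIX-like system. The manager checks is_running() between short pauses instead of blocking in join(). The loop counter is a hypothetical stand-in for useful work. Time::HiRes sleep is used in the manager because yield() is described for hobos.

```perl
use strict;
use warnings;
use MCE::Hobo;
use Time::HiRes qw(sleep);

my $hobo = MCE::Hobo->create( sub { sleep 1; return 42 } );

# Poll rather than block: do other work while the hobo runs.
my $polls = 0;
while ( $hobo->is_running() ) {
   $polls++;      # placeholder for useful work
   sleep 0.1;     # brief pause between checks
}

print "joinable\n" if $hobo->is_joinable();

# The hobo has finished, so join() returns promptly.
my $result = $hobo->join();
print "result $result after $polls polls\n";
```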

$hobo->kill( 'SIG...' )

Sends the specified signal to the hobo. Returns the hobo object to allow for method chaining. As with exit, be sure to join the hobo, either immediately or eventually, so that it does not linger as a zombie (defunct) process.

   $hobo->kill('SIG...')->join();

The following is a parallel demonstration comparing MCE::Shared against Redis and Redis::Fast on a Fedora 23 VM. Joining begins after all workers have been notified to quit.

   use Time::HiRes qw(time sleep);  # HiRes sleep for the 0.2s pause below

   use Redis;
   use Redis::Fast;

   use MCE::Hobo;
   use MCE::Shared;

   my $redis = Redis->new();
   my $rfast = Redis::Fast->new();
   my $array = MCE::Shared->array();

   sub parallel_redis {
      my ($_redis) = @_;
      my ($count, $quit, $len) = (0, 0);

      # do not exit from inside handler; use a flag to exit loop
      $SIG{'QUIT'} = sub { $quit = 1 };

      while (1) {
         $len = $_redis->rpush('list', $count++);
         last if $quit;
      }

      $count;
   }

   sub parallel_array {
      my ($count, $quit, $len) = (0, 0);

      # do not exit from inside handler
      $SIG{'QUIT'} = sub { $quit = 1 };

      while (1) {
         $len = $array->push($count++);
         last if $quit;
      }

      $count;
   }

   sub benchmark_this {
      my ($desc, $num_hobos, $timeout, $code, @args) = @_;
      my ($start, $total) = (time(), 0);

      MCE::Hobo->new($code, @args) for 1..$num_hobos;
      sleep $timeout;

      # joining is not immediate; ok
      $_->kill('QUIT') for MCE::Hobo->list();

      # joining later; ok
      $total += $_->join() for MCE::Hobo->list();

      printf "$desc <> duration: %0.03f secs, count: $total\n",
         time() - $start;

      sleep 0.2;
   }

   benchmark_this('Redis      ', 8, 5.0, \&parallel_redis, $redis);
   benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
   benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);

MCE::Hobo->list()

Returns a list of all hobos not yet joined.

   @hobos = MCE::Hobo->list();

MCE::Hobo->list_running()

Returns a list of all hobos that are still running.

   @hobos = MCE::Hobo->list_running();

MCE::Hobo->list_joinable()

Returns a list of all hobos that have completed running and are thus ready to be joined without blocking.

   @hobos = MCE::Hobo->list_joinable();

MCE::Hobo->pending()

Returns a count of all hobos not yet joined.

   $count = MCE::Hobo->pending();
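
The list and count methods combine into a simple reaping loop. This is a minimal sketch, assuming a UNIX-like system. It joins hobos as they finish, keyed by PID, and the task sub is a hypothetical placeholder.

```perl
use strict;
use warnings;
use MCE::Hobo;
use Time::HiRes qw(sleep);

sub task {
   my ($id) = @_;
   sleep 0.2 * $id;
   return $id * 10;
}

MCE::Hobo->create( \&task, $_ ) for 1 .. 4;

my %results;

# pending() counts hobos not yet joined; loop until all are reaped.
while ( MCE::Hobo->pending() ) {
   # list_joinable() returns only finished hobos, so join() won't block.
   for my $hobo ( MCE::Hobo->list_joinable() ) {
      my $pid = $hobo->pid();
      $results{$pid} = $hobo->join();   # scalar context
   }
   sleep 0.05;
}

print "$_ => $results{$_}\n" for sort keys %results;
```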

$hobo->result()

Returns the result obtained by join, waitone, or waitall. If the process has not yet exited, waits for the corresponding hobo to complete its execution.

   use MCE::Hobo;
   use Time::HiRes qw(sleep);

   sub task {
      my ($id) = @_;
      sleep $id * 0.333;
      return $id;
   }

   MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );

   # 1 while MCE::Hobo->waitone;

   while ( my $hobo = MCE::Hobo->waitone() ) {
      my $err = $hobo->error() // 'no error';
      my $res = $hobo->result();
      my $pid = $hobo->pid();

      print "[$pid] $err : $res\n";
   }

Like join described above, the context (void, scalar or list) for the return value(s) is determined at the time result is called and is mostly wantarray-aware.

   my $hobo1 = MCE::Hobo->create( sub {
      my @res = qw(foo bar baz);
      return (@res);
   });

   my @res1 = $hobo1->result();  # ( foo, bar, baz )
   my $res1 = $hobo1->result();  #   baz

   my $hobo2 = MCE::Hobo->create( sub {
      return 'foo';
   });

   my @res2 = $hobo2->result();  # ( foo )
   my $res2 = $hobo2->result();  #   foo

MCE::Hobo->self()

Class method that allows a hobo to obtain its own MCE::Hobo object.

$hobo->pid()
$hobo->tid()

Returns the ID of the hobo.

   pid: $$  process id
   tid: $$  same thing

MCE::Hobo->pid()
MCE::Hobo->tid()

Class methods that allow a hobo to obtain its own ID.

   pid: $$  process id
   tid: $$  same thing
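
Inside a hobo, the object returned by self(), the class method pid(), and $$ should all agree. The manager should see the same value through $hobo->pid(). Here is a minimal sketch of that check, assuming a UNIX-like system:

```perl
use strict;
use warnings;
use MCE::Hobo;

my $hobo = MCE::Hobo->create( sub {
   my $me = MCE::Hobo->self();   # this hobo's own MCE::Hobo object
   return ( $me->pid(), MCE::Hobo->pid(), $$ );
} );

my @ids = $hobo->join();

# Compare what the hobo reported with what the manager sees.
print "hobo reported: @ids\n";
print "manager sees:  ", $hobo->pid(), "\n";
```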

MCE::Hobo->waitone()
MCE::Hobo->waitall()

Meaningful for the manager process only. These wait for one or all hobos to complete execution, then return the corresponding hobo(s). If no hobo exists, waitone returns undef and waitall returns an empty list.

   use MCE::Hobo;
   use Time::HiRes qw(sleep);

   sub task {
      my $id = shift;
      sleep $id * 0.333;
      return $id;
   }

   MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );

   # join, traditional use case (each option below reaps every hobo; use one)
   $_->join() for MCE::Hobo->list();

   # waitone, simplistic use case
   1 while MCE::Hobo->waitone();

   # waitone
   while ( my $hobo = MCE::Hobo->waitone() ) {
      my $err = $hobo->error() // 'no error';
      my $res = $hobo->result();
      my $pid = $hobo->pid();

      print "[$pid] $err : $res\n";
   }

   # waitall
   my @hobos = MCE::Hobo->waitall();

   for ( @hobos ) {
      my $err = $_->error() // 'no error';
      my $res = $_->result();
      my $pid = $_->pid();

      print "[$pid] $err : $res\n";
   }

MCE::Hobo->yield( floating_seconds )

Lets this hobo yield CPU time to other hobos. By default, the class method calls sleep(0.0005) on UNIX and sleep(0.015) on Windows, including Cygwin.

   MCE::Hobo->yield();
   MCE::Hobo->yield(0.05);

CROSS-PLATFORM TEMPLATE FOR BINARY EXECUTABLE

Making an executable is possible with the PAR::Packer module. On the Windows platform, threads, threads::shared, and exiting via threads are all necessary for the binary to exit successfully.

   # https://metacpan.org/pod/PAR::Packer
   # https://metacpan.org/pod/pp
   #
   #   pp -o demo.exe demo.pl
   #   ./demo.exe

   use strict;
   use warnings;

   use if $^O eq "MSWin32", "threads";
   use if $^O eq "MSWin32", "threads::shared";

   use Time::HiRes (); # include minimum dependencies for MCE::Hobo
   use Storable ();

   use IO::FDPass ();  # optional: for MCE::Shared->condvar, handle, queue
   use Sereal ();      # optional: faster serialization, may omit Storable

   use MCE::Hobo;      # 1.808 or later on Windows
   use MCE::Shared;

   my $seq_a = MCE::Shared->sequence( 1, 30 );

   sub task {
      my ( $id ) = @_;
      while ( defined ( my $num = $seq_a->next ) ) {
         print "$id: $num\n";
      }
   }

   MCE::Hobo->new( \&task, $_ ) for 1 .. 2;
   MCE::Hobo->waitall;

   threads->exit(0) if $INC{"threads.pm"};

CREDITS

The inspiration for MCE::Hobo comes from wanting threads-like behavior for processes. Both can run side-by-side, including safe use by MCE workers. Likewise, the documentation resembles that of threads.

The inspiration for waitall and waitone comes from Parallel::WorkUnit.

SEE ALSO

INDEX

MCE, MCE::Shared

AUTHOR

Mario E. Roy, <marioeroy AT gmail DOT com>