NAME

AnyEvent::ProcessPool - Asynchronously runs code concurrently in a pool of perl processes

VERSION

version 0.07

SYNOPSIS

  use AnyEvent::ProcessPool;

  my $pool = AnyEvent::ProcessPool->new(
    workers => 8,
    limit   => 10,
    include => ['lib', 'some/lib/path'],
  );

  my $condvar = $pool->async(sub{
    # do task type stuff...
  });

  # Block until result is ready
  my $result = $condvar->recv;

DESCRIPTION

Executes code using a pool of forked Perl subprocesses. Supports a configurable pool size, automatic restarting of processes after a configurable number of requests, and closures (with the caveat that changes made in the worker are not propagated back to the parent process).

CONSTRUCTOR

workers

Optional attribute specifying the number of worker processes to launch. Defaults to the number of CPUs.

limit

Optional attribute that causes a worker process to be restarted after it has performed limit tasks. This can be useful when calling code that may leak memory. When unspecified or set to zero, a worker process is restarted only if it fails unexpectedly.

include

An optional array ref of paths to add to the perl command string used to start the sub-process worker.

METHODS

async

Executes the supplied code ref in a worker sub-process. Remaining (optional) arguments are passed unchanged to the code ref in the worker process. Returns a condvar that will block and return the task result when recv is called on it.

Alternatively, the name of a task class may be supplied. The class must implement the methods 'new' (as a constructor) and 'run'. When using a task class, the arguments are passed to the constructor ('new') and the result of 'run' is returned as the task result.

  # With an anonymous subroutine
  my $cv = $pool->async(sub{ ... });

  # With a code ref
  my $cv = $pool->async(\&do_stuff);

  # With optional parameter list
  my $cv = $pool->async(sub{ ... }, $arg1, $arg2, ...);

  # With a task class
  my $cv = $pool->async('My::Task', $arg1, ...);
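The task class contract described above can be sketched as follows. The class name My::Task::Sum and its contents are hypothetical, chosen only for illustration; the last lines call 'new' and 'run' directly in the current process to show how arguments flow in and how the result comes out, without involving a pool.

```perl
# Hypothetical task class; in practice this would probably live in its own
# file (for example lib/My/Task/Sum.pm) so that worker processes can load it.
package My::Task::Sum;
use strict;
use warnings;

# Constructor: receives the arguments that follow the class name in async().
sub new {
  my ($class, @numbers) = @_;
  return bless { numbers => [@numbers] }, $class;
}

# run: its return value becomes the task result.
sub run {
  my $self  = shift;
  my $total = 0;
  $total += $_ for @{ $self->{numbers} };
  return $total;
}

package main;

# Exercise the new/run contract locally, in this process.
my $total = My::Task::Sum->new(1, 2, 3)->run;
print "total: $total\n";
```

With a pool, the equivalent call would be $pool->async('My::Task::Sum', 1, 2, 3). This assumes the class can be found by the worker process, which is presumably what the include attribute is for.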

join

Blocks until all pending tasks have completed. This does not prevent new tasks from being queued while waiting (for example, in the callback of an already queued task's condvar).
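As a minimal sketch of how join might be used, the following queues several tasks, waits for all of them, and then collects the results. It assumes the module is installed and uses only the constructor, async, join, and recv as described in this document; the variable names and the squaring task are illustrative.

```perl
use strict;
use warnings;
use AnyEvent::ProcessPool;

my $pool = AnyEvent::ProcessPool->new(workers => 2);

# Queue one task per input, keeping each condvar alongside its input.
my @pending = map {
  my $n = $_;
  [ $n, $pool->async(sub { my $x = shift; $x * $x }, $n) ];
} 1 .. 5;

# Block until every queued task has completed.
$pool->join;

# The tasks are finished, so recv is not expected to block here.
my %square_of;
for my $entry (@pending) {
  my ($n, $cv) = @$entry;
  $square_of{$n} = $cv->recv;
}

print "$_ squared is $square_of{$_}\n" for sort { $a <=> $b } keys %square_of;
```

Calling recv on each condvar without join would also work; join is mainly convenient when the condvars themselves are not kept around, for example when results are handled entirely in callbacks.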

PIPELINES

Pipelines are an alternative way of using the process pool. See AnyEvent::ProcessPool::Pipeline for details.

  use AnyEvent::ProcessPool::Pipeline;

  pipeline workers => 4,
    in  { get_next_task() }
    out { do_stuff_with_result(shift->recv) };

DIAGNOSTICS

Task errors

Error messages resulting from a die or croak in task code executed in a worker process are rethrown in the parent process when the condition variable's recv method is called.
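Because the error is rethrown from recv, it can be trapped with an ordinary eval in the parent. The following is a sketch under the assumption that the module is installed; the error text "task failed" is invented for the example, and the exact form of the rethrown message may differ.

```perl
use strict;
use warnings;
use AnyEvent::ProcessPool;

my $pool = AnyEvent::ProcessPool->new(workers => 1);

# This task dies inside the worker process.
my $cv = $pool->async(sub { die "task failed\n" });

# The error surfaces in the parent only when recv is called.
my ($result, $error);
eval {
  $result = $cv->recv;
  1;
} or do {
  $error = $@ || 'unknown error';
};

if (defined $error) {
  warn "task error caught in parent: $error";
}
```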

"AnyEvent::ProcessPool::Worker: ..." (warning)

When a worker sub-process writes to STDERR, the process pool re-emits the message as a warning on its own STDERR.

"error launching worker process: ..."

Thrown when a worker sub-process failed to launch due to an execution error.

"worker terminated in response to signal: ..."

Thrown when a worker sub-process exits as a result of a signal received.

"worker terminated with non-zero exit status: ..."

Thrown when a worker sub-process terminates with a non-zero exit code. The worker will be automatically restarted.

INCOMPATIBILITIES

Will not work on MSWin32 (although Cygwin should be fine) due to lack of support for non-blocking writes to process pipes (see the notes in AnyEvent::Open3::Simple).

SEE ALSO

Parallel::ForkManager

Highly reliable, but somewhat arcane, blocking, and can be tricky to integrate into non-blocking code.

Coro::ProcessPool

Similar in function, but runs only under Coro (which, as of version 6.513, has experimental support for Perl 5.22).

AUTHOR

Jeff Ober <sysread@fastmail.fm>

COPYRIGHT AND LICENSE

This software is copyright (c) 2017 by Jeff Ober.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.