The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

NAME

Future::Workflow::Pipeline - a pipeline of processing stages

DESCRIPTION

Instances of this class implement a "pipeline", a sequence of data-processing stages. Each stage is represented by a function that is passed a single argument and should return a result. The pipeline itself stores a function that will be passed each eventual result.

Queueing

In front of every stage there exists a queue of pending items. If the first stage is currently busy when /push_input is called, the item is accepted into its queue instead. Items will be taken from the queue in the order they were pushed when the stage's work function finishes with prior items.

If the queue between stages is full, then items will remain pending in prior stages. Ultimately this back-pressure will make its way back to the /push_input method at the beginning of the pipeline.

CONSTRUCTOR

   $pipeline = Future::Workflow::Pipeline->new;

The constructor takes no additional parameters.

METHODS

set_output

   $pipeline->set_output( $code );

      await $code->( $result );

Sets the destination output for the pipeline. Each completed work item will be passed to the invoked function, which is expected to return a Future.

set_output_sync

   $pipeline->set_output_sync( $code );

      $code->( $result );

Similar to "set_output", where the output function is called synchronously, returning when it has finished.

append_stage

   $pipeline->append_stage( $code, %args );

      $result = await $code->( $item );

Appends a pipeline stage that is implemented by an asynchronous function. Each work item will be passed in by invoking the function, and it is expected to return a Future which will eventually yield the result of that stage.

The following optional named args are recognised:

concurrent => NUM

Allow this number of outstanding items concurrently.

max_queue => NUM

If defined, no more than this number of items can be enqueued. If undefined, no limit is applied.

This value can be zero, which means that any attempts to push more items will remain pending until the work function is free to deal with it; i.e. no queueing will be permitted.

on_failure => CODE
   $on_failure->( $f )

Provides a callback event function for handling a failure thrown by the stage code. If not provided, the default behaviour is to print the failure message as a warning.

Note that this handler cannot turn a failure into a successful result or otherwise resume or change behaviour of the pipeline. For error-correction you will have to handle that inside the stage function code itself. This handler is purely the last stop of error handling, informing the user of an otherwise-unhandled error before ignoring it.

append_stage_sync

   $pipeline->append_stage_sync( $code, %args )

      $result = $code->( $item );

Similar to "append_stage", where the stage function is called synchronously, returning its result immediately.

Because of this, the concurrent named parameter is not permitted.

push_input

   await $pipeline->push_input( $item );

Adds a new work item into the pipeline, which will pass through each of the stages and eventually invoke the output function.

AUTHOR

Paul Evans <leonerd@leonerd.org.uk>