Future::Workflow::Pipeline - a pipeline of processing stages
Future::Workflow::Pipeline
Instances of this class implement a "pipeline", a sequence of data-processing stages. Each stage is represented by a function that is passed a single argument and should return a result. The pipeline itself stores a function that will be passed each eventual result.
In front of every stage there exists a queue of pending items. If the first stage is currently busy when /push_input is called, the item is accepted into its queue instead. Items will be taken from the queue in the order they were pushed when the stage's work function finishes with prior items.
/push_input
If the queue between stages is full, then items will remain pending in prior stages. Ultimately this back-pressure will make its way back to the /push_input method at the beginning of the pipeline.
$pipeline = Future::Workflow::Pipeline->new;
The constructor takes no additional parameters.
$pipeline->set_output( $code ); await $code->( $result );
Sets the destination output for the pipeline. Each completed work item will be passed to the invoked function, which is expected to return a Future.
Future
$pipeline->set_output_sync( $code ); $code->( $result );
Similar to "set_output", where the output function is called synchronously, returning when it has finished.
$pipeline->append_stage( $code, %args ); $result = await $code->( $item );
Appends a pipeline stage that is implemented by an asynchronous function. Each work item will be passed in by invoking the function, and it is expected to return a Future which will eventually yield the result of that stage.
The following optional named args are recognised:
Allow this number of outstanding items concurrently.
If defined, no more than this number of items can be enqueued. If undefined, no limit is applied.
This value can be zero, which means that any attempts to push more items will remain pending until the work function is free to deal with it; i.e. no queueing will be permitted.
$on_failure->( $f )
Provides a callback event function for handling a failure thrown by the stage code. If not provided, the default behaviour is to print the failure message as a warning.
Note that this handler cannot turn a failure into a successful result or otherwise resume or change behaviour of the pipeline. For error-correction you will have to handle that inside the stage function code itself. This handler is purely the last stop of error handling, informing the user of an otherwise-unhandled error before ignoring it.
$pipeline->append_stage_sync( $code, %args ) $result = $code->( $item );
Similar to "append_stage", where the stage function is called synchronously, returning its result immediately.
Because of this, the concurrent named parameter is not permitted.
concurrent
await $pipeline->push_input( $item );
Adds a new work item into the pipeline, which will pass through each of the stages and eventually invoke the output function.
Paul Evans <leonerd@leonerd.org.uk>
To install Future::Workflow, copy and paste the appropriate command in to your terminal.
cpanm
cpanm Future::Workflow
CPAN shell
perl -MCPAN -e shell install Future::Workflow
For more information on module installation, please visit the detailed CPAN module installation guide.