NAME

Thread::Pipeline - multithreaded pipeline manager

VERSION

version 0.004

SYNOPSIS

    my %blocks = (
        map1 => { sub => \&mapper, num_threads => 2, main_input => 1, out => 'map2' },
        map2 => { sub => \&another_mapper, num_threads => 5, out => [ 'log', 'reduce' ] },
        reduce => { sub => \&reducer, need_finalize => 1, out => '_out' },
        log => { sub => \&logger },
    );

    # create pipeline
    my $pipeline = Thread::Pipeline->new( \%blocks );

    # fill its input queue
    for my $data_item ( @data_array ) {
        $pipeline->enqueue( $data_item );
    }

    # say that there's nothing more to process
    $pipeline->no_more_data();

    # get results from pipeline's output queue
    my @results = $pipeline->get_results();

METHODS

new

    my $pl = Thread::Pipeline->new( $blocks_description );

Constructor. Creates the pipeline object and initializes its blocks if a description is given.

The blocks description is either a hashref { $id => $descr, ... } or an arrayref [ $id => $lite_descr, ... ] (see add_block). For an arrayref, the constructor assumes a direct chain of blocks and automatically adds the 'main_input' and 'out' fields.
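The relation between the two forms can be sketched in plain Perl. The helper expand_chain below is hypothetical and not part of Thread::Pipeline; it only illustrates the description above. It assumes that the first block of the chain gets 'main_input', that every block's 'out' is the id of the next block, and that the last block sends to '_out'. The real constructor may differ in these details.

```perl
use strict;
use warnings;

# Hypothetical helper (NOT part of Thread::Pipeline): expands a chain
# [ $id => $lite_descr, ... ] into the hash form { $id => $descr, ... }.
sub expand_chain {
    my ($chain) = @_;
    my @pairs = @$chain;
    my @ids   = @pairs[ grep { $_ % 2 == 0 } 0 .. $#pairs ];   # ids in chain order
    my %lite  = @pairs;

    my %blocks;
    for my $i ( 0 .. $#ids ) {
        my $id = $ids[$i];
        $blocks{$id} = {
            %{ $lite{$id} },
            # assumption: the last block feeds the pipeline's main output
            out => ( $i < $#ids ? $ids[ $i + 1 ] : '_out' ),
            # assumption: the first block is the pipeline's main input
            ( $i == 0 ? ( main_input => 1 ) : () ),
        };
    }
    return \%blocks;
}

# Block ids 'parse' and 'count' are made up for this illustration.
my $blocks = expand_chain( [
    parse => { sub => sub { return uc $_[0] },     num_threads => 2 },
    count => { sub => sub { return length $_[0] } },
] );

print "$_ -> $blocks->{$_}{out}\n" for sort keys %$blocks;
```

Under these assumptions, passing the arrayref to new would be equivalent to passing the expanded hashref.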

add_block

    my %block_info = (
        sub => \&worker_sub,
        num_threads => $num_of_threads,
        out => $next_block_id,
    );
    $pl->add_block( $block_id => \%block_info );

Adds a new block to the pipeline. The block's worker threads and its incoming queue are created.

Block info is a hashref with the following keys:

    * sub - worker coderef (required)
    * num_threads - number of parallel worker threads, default 1
    * out - id of the block where processed data should be sent
        (or an arrayref of ids, as in the SYNOPSIS),
        use '_out' for pipeline's main output
    * main_input - mark this block as the default target of pipeline's enqueue
    * post_sub - code that runs when all threads of the worker have ended
    * need_finalize - run worker one more time with undef after queue has finished

A worker is a sub that processes data. It is executed for every item in the block's queue and receives two parameters: the data item and a reference to the pipeline itself. It should return a list of data items to be sent to the next pipeline block.

    sub worker_sub {
        my ($data_portion, $pipeline_ref) = @_;

        # if block has 'need_finalize' option
        if ( !defined $data_portion ) {
            # ... queue has finished, finalize work
            return;
        }

        # ... do anything with data
        return @out_data_items;
    }
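As an illustration of the 'need_finalize' contract, here is a minimal sketch of a reducer that collects a running sum and emits it only on the final call with undef. The name sum_reducer is hypothetical. The sketch assumes a single worker thread (so that all items reach the same $sum) and assumes that values returned from the finalize call are forwarded like any other return value. The direct calls at the end only stand in for what the pipeline would do with the block's queue.

```perl
use strict;
use warnings;

# Hypothetical reducer for a block declared with need_finalize => 1.
my $sum = 0;

sub sum_reducer {
    my ($item, $pipeline) = @_;

    if ( !defined $item ) {
        return $sum;    # queue has finished: emit the total
    }

    $sum += $item;
    return;             # empty list: nothing is passed on yet
}

# Direct calls standing in for the pipeline feeding the block's queue.
my @out = map { sum_reducer($_) } 1 .. 4;    # regular items return nothing
push @out, sum_reducer(undef);               # finalize call returns the sum

print "@out\n";    # prints "10"
```

Returning an empty list for regular items keeps intermediate state out of the next block, so only the aggregated value travels further.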

enqueue

    $pl->enqueue( $data, %opts );

Puts the data item into a block's queue.

Options:

    * block - id of block, default is pipeline's main input block

no_more_data

    $pl->no_more_data( %opts );

Tells the pipeline that there is nothing more to process.

get_results

    my @result = $pl->get_results();

Waits for all pipeline operations to finish, then returns the contents of the pipeline's output queue.
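The methods above can be combined into a small end-to-end sketch. The block id 'double' and the subs double_item and run_pipeline are hypothetical names chosen for this illustration. The sketch makes no assumption about the order in which results arrive and sorts them explicitly. It runs the pipeline only if Thread::Pipeline can be loaded.

```perl
use strict;
use warnings;

# Worker for a hypothetical 'double' block: one item in, one item out.
sub double_item {
    my ($item, $pipeline) = @_;
    return $item * 2;
}

# Builds a one-block pipeline, feeds it and collects the results.
sub run_pipeline {
    my @input = @_;
    require Thread::Pipeline;

    my $pl = Thread::Pipeline->new( {
        double => {
            sub         => \&double_item,
            num_threads => 2,
            main_input  => 1,
            out         => '_out',
        },
    } );

    $pl->enqueue($_) for @input;
    $pl->no_more_data();

    # no particular arrival order is assumed here, so sort the results
    return sort { $a <=> $b } $pl->get_results();
}

# Run only when the module is available.
if ( eval { require Thread::Pipeline; 1 } ) {
    print join( ',', run_pipeline( 1 .. 5 ) ), "\n";
}
```

Calling no_more_data before get_results follows the order used in the SYNOPSIS.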

get_threads_num

    my $num = $pl->get_threads_num($block_id);

Returns the number of worker threads of the given block.

AUTHOR

liosha <liosha@cpan.org>

COPYRIGHT AND LICENSE

This software is copyright (c) 2012 by liosha.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.