NAME

Thread::Workers - Creates a boss which feeds a queue consumed by workers.

SYNOPSIS

This module aims to be lightweight with limited features. Its primary aim is to provide simple Boss/Worker thread management while keeping dependencies low.

You can add workers after creating the pool, but you cannot remove them at this time. Under the hood, commands are passed through a shared variable, and reads and writes to it are guarded by a Thread::Semaphore. A Thread::Queue feeds the pipe, which workers check at a periodic interval.

The work checks against the queue are non-blocking and threads sleep when no work is found. The workers provide your work callback's return value to a shared log, which can optionally be processed by your boss via a callback. You may also set a drain callback, which will pause all workers and the boss, then refeed your queue to the boss.

This module is currently experimental and under development. It will be solidified over time, but it works as advertised. It's up to you to ensure your callbacks use thread-safe modules, or that you wrap your non-thread-safe modules appropriately!

  use 5.012; #or higher
  use Thread::Workers;

  my $pool = Thread::Workers->new();
  $pool->set_boss_fetch_cb(\&function_returns_work);
  $pool->set_boss_log_cb(\&function_processes_worker_returns);
  $pool->set_drain_cb(\&function_gets_unworked_queue_on_drain);
  $pool->set_worker_work_cb(\&function_does_work);
  $pool->start_boss();
  $pool->start_workers();
  $pool->add_worker();
  $pool->pause_workers();
  $pool->wake_workers();
  
  #internal control loops
  # we have orders to increase the load! add 500 workers
  for (1..500) { 
    $pool->add_worker();
  }

  #time to cleanup

  $pool->pause_boss();    #signal boss thread to pause
  $pool->pause_workers(); #stop the workers, may leave unfinished items in queue.
  $pool->drain();       #drains the queue of new work
  $pool->kill_boss();
  $pool->kill_workers();
  
  # Or if you don't care
  $pool->destroy();     #kills and joins all workers and the boss. you should probably clean up the object now :)
  # Or! 
  $pool->stop_finish_work(); #gracefully stop boss and finish work queue, then shut down workers.

DESCRIPTION

Thread::Workers uses threads, Thread::Semaphore, and Thread::Queue to create a pool of workers which are fed work by a boss thread. The boss thread could be fed data from a socket listening on the master thread, or could have a routine to check a database for work.
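The pattern described above can be sketched with the core modules alone. This is not the module's actual implementation, only a minimal illustration under stated assumptions: the names worker_loop, $log_lock, and $stop are hypothetical, the main thread plays the boss, and the work callback simply doubles a number. It requires a perl built with thread support.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;
use Thread::Semaphore;
use Time::HiRes qw(sleep);

my $queue    = Thread::Queue->new();       # work pipe fed by the boss
my $log_lock = Thread::Semaphore->new(1);  # guards writes to the shared log
my @log  :shared;                          # worker return values
my $stop :shared = 0;                      # set to 1 to ask workers to exit

# Worker: poll the queue without blocking and sleep when it is empty.
sub worker_loop {
    my ($work_cb, $interval) = @_;
    while (1) {
        my $item = $queue->dequeue_nb();
        if (defined $item) {
            my $ret = $work_cb->($item);
            $log_lock->down();
            push @log, "$item=$ret";
            $log_lock->up();
        }
        else {
            last if $stop;      # queue is empty and shutdown was requested
            sleep $interval;
        }
    }
    return;
}

my @workers = map {
    threads->create(\&worker_loop, sub { $_[0] * 2 }, 0.05)
} 1 .. 3;

# Boss role, played here by the main thread: feed the queue.
$queue->enqueue($_) for 1 .. 6;

# Ask the workers to exit once the queue is drained, then join them.
$stop = 1;
$_->join() for @workers;

my @sorted_log = sort @log;
print "$_\n" for @sorted_log;
```

Because the stop flag is only honoured when the queue is empty, every item enqueued before the flag is set still gets processed. Thread::Workers wraps this kind of loop behind its callbacks and intervals.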

EXAMPLE

    use Thread::Workers;

    sub fetch_data {
        my $db = Some::DB->new();
        my $work = $db->get_data();
        # if you have an array of items and wish it to be processed you can do
        # my %hash = map { $_ => $work->[$_] } 0 .. $#{$work}; # or something
        # the hash keys represent a 'priority' so to speak.
        # an array or a scalar being put into the work queue are passed directly
        # to a worker to be processed. if you have a single hash item you wish to pass,
        # do something like return [ %hash ]
        return $work;
    }

    sub work_data {
        my $work = shift;
        # process the work.
        # we can log a return by returning a value.
        return do_something_with($work);
    }
    sub work_log {
        my $log = shift; # this is an array of hashes. each array item is { workitem => $original_work_item, return => $return_from_worker };
        do_something_with_the_log($log);
        #maybe push into a DB?
    }
    my $workers = Thread::Workers->new(threadinterval => 5, bossinterval => 5, totalthreads => 15);
    $workers->set_boss_log_cb(\&work_log);
    $workers->set_boss_fetch_cb(\&fetch_data);
    $workers->set_worker_work_cb(\&work_data);
    $workers->start_boss();
    $workers->start_workers();

    # would probably do other things in your code to spin in loops.
    # In my own code, I'm doing system monitoring and injecting some jobs locally, handling logging of the boss/worker subs,
    # and other tasks.

The boss expects the work to be presented with the keys being unique integers. The integers correspond to the order they are placed into the queue.

    my %jobs =
    (
        0 => { transid  => $objref }, # you can pass a 'command' with an object
        1 => { step_all => '123'   }, # scalar, maybe you just want a simple scalar for a worker.
        2 => { cmd1     => {
                            something => 'data',
                            jobid => 'blah',
                            location => 'your moms house'
                           }         #or whatever your callback expects
             },
    );

This will create 3 separate "work items" to be placed into the queue in the 0->1->2 order for execution.
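If your work arrives as a plain list, a small helper can assign the integer keys for you. The sketch below is illustrative only: jobs_from_list is a hypothetical helper, not part of Thread::Workers, and the item values are placeholders. Whether your fetch callback should hand back the hash itself or a reference to it is not stated here, so treat the return type as an assumption.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Thread::Workers): key each work item
# by its position so the keys are unique integers starting at 0.
sub jobs_from_list {
    my @items = @_;
    my %jobs = map { $_ => $items[$_] } 0 .. $#items;
    return \%jobs;
}

my $jobs = jobs_from_list(
    { transid  => 'T-1' },
    { step_all => '123' },
    { cmd1     => { something => 'data', jobid => 'blah' } },
);

# Perl hashes are unordered, so sort the keys numerically to recover
# the intended 0->1->2 sequence.
my @order = sort { $a <=> $b } keys %{$jobs};
my @names = map { (keys %{ $jobs->{$_} })[0] } @order;
print join(' -> ', @names), "\n";
```

Sorting with <=> rather than the default string sort matters once you have ten or more items, since a string sort would place key 10 before key 2.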

If you need to feed your workers with a single block of data from a hash, you *must* assign it this way.

    my %job =
    (
        0 => {
                cmd1 => {
                            something =>'data',
                            somethingelse => 'data',
                            jobid =>'121'   # whatever your worker callback is expecting
                        }
            },
    );

If the worker returns data, say a value for 'step 0', it will be given to the log queue as an array of hashes. Let's say the worker logged { timestamp => '201209030823', jobid => '121', return => 'success' }

The log queue will have the following presentation to the boss log callback:

    my @log =
    (
        {
            job => 'cmd1',      #name of the job
            return => {         #value of return is worker_cb return
                        timestamp => '201209030823',
                        jobid => '121',
                        return => 'success'
                      }   #i put interesting data to log about the work item processed here
        },
    );

Whether you set a log callback or not, the log is flushed at the end of every boss interval. Use it or lose it.
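Since the log is discarded after each interval, a log callback has to copy out whatever it wants to keep. The following is a minimal sketch of such a callback, assuming it receives a reference to an array of { job => ..., return => ... } hashes as laid out above. The %status_count tally and the sample entries are illustrative stand-ins, not real module output.

```perl
use strict;
use warnings;

my %status_count;   # tally kept outside the callback so it outlives each flush

# Sketch of a log callback: walks the array of log entries and counts
# how many work items ended with each status.
sub work_log {
    my ($log) = @_;
    for my $entry (@{$log}) {
        my $ret = $entry->{return};
        # The worker return may be a plain scalar or a hash that
        # carries its own 'return' key.
        my $status = ref($ret) eq 'HASH' ? $ret->{return} : $ret;
        $status_count{ $status // 'undef' }++;
    }
    return;
}

# Stand-in for what the boss would pass in on one interval.
work_log([
    { job => 'cmd1',
      return => { timestamp => '201209030823', jobid => '121', return => 'success' } },
    { job => 'cmd2', return => 'failed' },
]);

print "$_: $status_count{$_}\n" for sort keys %status_count;
```

If the callback runs inside the boss thread, a plain hash like this one is private to that thread. To read the tally from another thread you would likely need to declare it with threads::shared, or push the data to an external store such as a database.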

Currently there is no signal to tell the boss to refeed its queue back upstream, though the Thread::Queue object can be accessed via $pool->{_queue}. Future revisions will include a callback for this ability.

SEE ALSO

    threads
    Thread::Queue
    Thread::Semaphore

If this module doesn't suit your needs, see the following projects:

Thread::Pool - very similar in goals to this project with a larger feature set.

Gearman::Client - client/server worker pool

TheSchwartz (or Helios) - DBI fed backend to pools of workers

Beanstalk::Client and/or Beanstalk::Pool - another client/server worker pool

MooseX::Workers - Like this, but powered by mighty Moose antlers.

IO::Async::Routine - just what it sounds like!

Hopkins - "a better cronjob" with work queues, async driven backend

AUTHOR

Kal Aeolian, <kalielaeolian@gmail.com>

COPYRIGHT AND LICENSE

Copyright (C) 2012 by Kal Aeolian

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.14.2 or, at your option, any later version of Perl 5 you may have available.