NAME

Thread::Workers - Creates a boss which feeds a queue consumed by workers.

DESCRIPTION

Thread::Workers utilizes threads, Thread::Sempahore, and Thread::Queue to create a pool of workers which are serviced with work by a boss thread. The boss thread could be fed data from a socket listening on the master thread, or could have a routine to check a database for work.

Use non-thread safe modules with care!! If your boss is the only one to access mongo, you're doing it right. ;)

SYNOPSIS

This module aims to be lightweight with limited features. Its primary aim is to provide simple Boss/Worker thread management while keeping dependencies low.

You can add workers after creating the pool, but you cannot remove them at this time. Under the hood, command passing is through a shared variable, and reads/writes are controlled through a Thread::Semaphore access. A Thread::Queue feeds the pipe which workers check after a periodic interval.

The work checks against the queue are non-blocking and threads sleep when no work is found. The workers provide your work callback's return value to a shared log, which can optionally be processed by your boss via a callback. You may also set a drain callback, which will pause all workers and the boss, then refeed your queue to the boss.

This is currently in experimental and development state and will be solidified more over time, but it works as advertised. Its up to you to ensure your callbacks are using thread safe modules, or you wrap your non-thread safe modules appropriately!

EXAMPLE

  use 5.012; #or higher
  use Thread::Workers;

  my $pool = Thread::Workers->new();
  
  # Other options include:
  # my $pool = Thread::Workers->new( threadcount => 5, threadinterval => 30, bossinterval => 30 );
  # Thread interval = how often the children will check for work, default is 5 secs. Boss interval = how often the boss checks for work, default is 1 sec.
  
  # This should be a function ref that will get some work. It puts it in the queue for processing. See below for how to structure this.
  $pool->set_boss_fetch_cb(\&function_returns_work);
  
  # When a worker is completed it may have work to hand back to the boss, maybe a log or return code of success/fail. This optional function ref processes that return.
  $pool->set_boss_log_cb(\&function_processes_worker_returns);
  
  # When the program is shutdown you may have unfinished work on the queue. This optional call will deal with that queue.
  $pool->set_drain_cb(\&function_gets_unworked_queue_on_drain);
  
  # This is the heart of your worker. This function ref is spawned on each worker and does the actual work that the boss sticks into the queue.
  # Note! This should be thread safe.
  $pool->set_worker_work_cb(\&function_does_work);
  
  # This will start the boss fetching work
  $pool->start_boss();
  
  # This will start the workers taking work from the queue
  $pool->start_workers();
  
  # This adds a single worker to the pool
  $pool->add_worker();
  
  # Pause the workers after their current job finishes
  $pool->pause_workers();
  
  # Start the workers 
  $pool->wake_workers();
  
  #internal control loops
  # we have orders to increase the load! add 500 workers. its cleaner to add it as Thread::Workers->new(threadcount => 500);
  for (1..500) { 
    $pool->add_worker();
  }

  #time to cleanup

  $pool->pause_boss(); #signal boss thread to die
  $pool->pause_workers; #stop the workers, may leave unfinished items in queue.
  $pool->drain();       #drains the queue of new work
  $pool->kill_boss();
  $pool->kill_workers();
  
  # Or if you don't care
  $pool->destroy();     #kills and joins all workers and the boss. you should probably clean up the object now :)
  
  # Or if you do care 
  $pool->stop_finish_work(); #gracefully stop boss and finish work queue, then shut down workers.

EXAMPLE CALLBACKS

    use Thread::Workers;
    
    sub fetch_data {
         my $obj = Some:DB->new();
         my $work = $db->get_data();
        # if you have an array of items and wish it to be processed you can do
        # my %hash = map { (0..$#{$work}) => $_ } @{$work}; # or something
        # the hask keys represent a 'priority' so to speak.
        # an array or a scalar being put into the work queue are passed directly
        # to a worker to be processed. if you have a single hash item you wish to pass,
        # do something like return [ %hash ]
        return $work;
    }

    sub work_data {
        my $work = shift;
        # process the work.
        # we can log a return by returning a value.
        return do_something_with($work);
    }
    sub work_log {
        my $log = shift; # this is an array of hashes. each array item is { workitem => $original_work_item, return => $return_from_worker };
        do_something_with_the_log($log);
        #maybe push into a DB?
    }
    my $workers = Thread::Workers->new(threadinterval => 5, bossinterval => 5, totalthreads => 15);
    $workers->set_boss_log_cb->(\&work_log);
    $workers->set_boss_fetch_cb->(\&fetch_data);
    $workers->set_workers_work_cb->(\&work_data);
    $workers->start_boss();
    $workers->start_workers();

    # would probably do other things in your code to spin in loops.
    # In my own code, I'm doing system monitoring and injecting some jobs locally, handling logging of the boss/worker subs,
    # and other tasks.

WORK DATA STRUCTURE

The boss fetch work function must return data in a manner Boss::Workers knows how to deal with. This can be a hash/array/scalar;

HASH

The boss expects the work to be presented with the keys being unique integers. The integers correspond to the order they are placed into the queue.

    my %jobs = 
    { 
        0 => $scalar # pass a simple string as an object to work on
        1 => { transid  => $objref } # you can pass a 'command' with an object
        2 => { step_all => '123'   } # scalar, maybe you just want a simple scalar for a worker.
        3 => { cmd1     => { 
                            something => 'data', 
                            jobid => 'blah', 
                            location => 'your moms house'
                           }         #or whatever your callback expects
             }
    };
    

The above would create 4 separate "work items" to be placed into the queue in the 0->1->2 order for execution.

ARRAY

    my @job = 
    [
        $scalar,
        { transid => $objref },
        #etc
    ];

A scalar is even acceptable, it will just place it 'as is' on the queue.

It is expected that your worker callback knows how to handle the data in whatever fashion you are presenting it in on the queue.

LOG QUEUE

If the client returns data, say for 'step 0' it returned a value, it will be given to the log queue as an array of hashes. Lets say the worker finishes with this:

    return { timestamp => '201209030823', jobid => 'cmd1', return = 'success' };

The log queue will have the following presentation to the boss log callback:

    my @log =
    [ 
        { 
            job => 'cmd1', #name of the job if any, otherwise an integer of some sort.
            return => {         
                        timestamp => '201209030823', 
                        jobid => '121', 
                        return => 'success' 
                      }   
        },
    ]

Whether you set a log callback or not, the log is flushed at the end of every boss interval. Use it or lose it.

If you wish to feed your data back upstream (for example, you are SIGINT'ing or stopping your program for some reason and you don't want to lose your queue):

$obj->stop_finish_work();

This will allow the workers to gracefully finish, return whatever they have, then drains the queue through the drain_cb that was set.

SEE ALSO

    threads
    Thread::Queue
    Thread::Sempahore

If this module doesn't suit your needs, see the following projects:

Thread::Pool - very similar in goals to this project with a larger feature set.

Gearman::Client - client/server worker pool

TheSchwartz (or Helios) - DBI fed backend to pools of workers

Beanstalk::Client and/or Beanstalk::Pool - another client/server worker pool

MooseX::Workers - Like this, but powered by mighty Moose antlers.

IO::Async::Routine - just what it sounds like!

Hopkins - "a better cronjob" with work queues, async driven backend

AUTHOR

Kal Aeolian, <kalielaeolian@gmail.com<gt>

COPYRIGHT AND LICENSE

Copyright (C) 2012 by Kal Aeolian

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.14.2 or, at your option, any later version of Perl 5 you may have available.