NAME
POEx::WorkerPool::Role::WorkerPool::Worker - A role that provides common semantics for Workers
VERSION
version 1.100910
PUBLIC_ATTRIBUTES
job_classes
is: ro, isa: ArrayRef[ClassName], required: 1
In order for the serializer on the other side of the process boundary to rebless jobs, it needs to make sure the jobs' classes are loaded.
This attribute is used to indicate which classes need to be loaded.
uuid
is: ro, isa: Str
This attribute holds the basis for the aliases for the Worker and its PubSub. It defaults to Data::UUID->new()->create_str().
pubsub_alias
is: ro, isa: Str
This holds the alias of the associated PubSub component for this Worker.
status
is: rw, isa: Bool
status indicates whether the Worker is currently processing its queue.
jobs
traits: Array, is: ro, isa: ArrayRef[DoesJob]
This is the FIFO queue of jobs this worker is responsible for processing.
The following handles are provided:
{
_enqueue_job => 'push',
_dequeue_job => 'shift',
count_jobs => 'count',
}
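The handle names above map onto ordinary array operations. A minimal plain-Perl sketch of the same FIFO behavior follows; the array and the job names are placeholders, and this is not the role's actual implementation:

```perl
use strict;
use warnings;

# Plain array standing in for the jobs attribute (hypothetical data).
my @jobs;

# _enqueue_job => 'push': new jobs go on the tail of the queue.
push @jobs, 'job-a', 'job-b', 'job-c';

# count_jobs => 'count': number of jobs currently queued.
my $count = scalar @jobs;

# _dequeue_job => 'shift': the oldest job comes off the head first.
my $next = shift @jobs;

print "queued: $count, next: $next, remaining: ", scalar(@jobs), "\n";
```

Because jobs are pushed on one end and shifted off the other, they are processed in the order they were enqueued.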
max_jobs
is: ro, isa: Int
This determines the fill mark for the job queue: once the queue holds this many jobs, further attempts to enqueue are rejected.
PROTECTED_ATTRIBUTES
child_wheel
is: ro, isa: Wheel
child_wheel holds this Worker's POE::Wheel::Run instance
guts_loader
is: ro, isa: GutsLoader
guts_loader holds the instance of the GutsLoader used as the Program for the child_wheel
PRIVATE_ATTRIBUTES
_in_process
is: rw, isa: DoesJob
This private attribute holds the currently processing Job
_completed_jobs
is: rw, isa: ScalarRef
This private attribute is a counter of the number of jobs completed in the current processing cycle.
PUBLIC_METHODS
is_[not_]active
These are convenience methods for checking the status of the Worker.
enqueue_job
(DoesJob $job)
enqueue_job takes an object with the Job role and places it into the queue after a few basic checks: whether the Worker is currently processing and whether the job queue has met the max_jobs limitation. If either is true, an EnqueueError is thrown.
This method fires +PXWP_JOB_ENQUEUED to the associated PubSub component on success.
Subscribers will need to have the following signature:
method handler (SessionID :$worker_id, DoesJob $job ) is Event
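The two checks described for enqueue_job can be modeled in plain Perl. This is only a sketch: the hash-based worker, the try_enqueue name, and the error strings are hypothetical stand-ins, and it assumes that "met the max_jobs limitation" means the queue length has reached max_jobs:

```perl
use strict;
use warnings;

# Hypothetical stand-in for a Worker: a hash with the documented fields.
my %worker = (status => 0, max_jobs => 2, jobs => []);

# Mirrors the documented checks: refuse while processing or when full.
sub try_enqueue {
    my ($w, $job) = @_;
    die "cannot enqueue: worker is processing\n" if $w->{status};
    die "cannot enqueue: queue is full\n"
        if @{ $w->{jobs} } >= $w->{max_jobs};
    push @{ $w->{jobs} }, $job;
    return scalar @{ $w->{jobs} };
}

try_enqueue(\%worker, 'job-a');
try_enqueue(\%worker, 'job-b');

# A third job exceeds max_jobs, so the call dies and eval traps it.
my $ok  = eval { try_enqueue(\%worker, 'job-c'); 1 };
my $err = $ok ? '' : $@;
print "queued: ", scalar(@{ $worker{jobs} }), "; error: $err";
```

A caller that wants to keep submitting work would catch the error and retry once the Worker has finished its current cycle.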
enqueue_jobs
(ArrayRef[DoesJob] $jobs)
enqueue_jobs does the same thing as enqueue_job, but it acts on an array of jobs. The worker fires the +PXWP_JOB_ENQUEUED event via PubSub for each job that is successfully enqueued.
start_processing
is Event
start_processing kicks the Worker into gear and prevents adding jobs until the current queue has been processed. If there are no jobs in the queue, StartError will be thrown. This method fires the +PXWP_START_PROCESSING event via PubSub.
Subscribers should have the following signature:
method handler (SessionID :$worker_id, Int :$count_jobs)
halt
is Event
halt will destroy the child process and unset the associated alias, ensuring that the Session will stop.
PROTECTED_METHODS
guts_error_handler
(Str $op, Int $error_num, Str $error_str, WheelID $id, Str $handle_name) is Event
guts_error_handler is the handler given to POE::Wheel::Run to handle errors that may crop up during operation of the wheel. It will post the arguments via +PXWP_WORKER_CHILD_ERROR using PubSub.
Subscribers will need the following signature:
method handler
(
SessionID :$worker_id,
Str :$operation,
Int :$error_number,
Str :$error_string,
WheelID :$wheel_id,
Str :$handle_name
) is Event
This method will then issue a SIGTERM signal to the child process, forcing it to rebuild after exiting.
guts_exited
(Str $chld, Int $pid, Int $exit_val) is Event
This is the SIGCHLD handler for the child process. It will post the arguments via +PXWP_WORKER_CHILD_EXIT using PubSub.
Subscribers will need to have the following signature:
method handler
(
SessionID :$worker_id,
Int :$process_id,
Int :$exit_value
) is Event
The wheel will then be cleared.
after _start
is Event
_start is advised to create a new PubSub component specific for this Worker and publish all of the various events that the Worker can fire.
after _stop
is Event
_stop is advised to terminate the associated PubSub component by calling its destroy event.
guts_output
(JobStatus $job_status) is Event
This is the StdoutEvent for the child POE::Wheel::Run. It handles all of the child output, which is in the form of JobStatus hashrefs. The following describes the potential events from the child and the actions taken:
+PXWP_JOB_COMPLETE
Action:
_in_process is cleared and _completed_jobs for this session is
incremented. yields() to _process_queue.
PubSub Event:
+PXWP_JOB_COMPLETE
PubSub Signature:
method handler(SessionID :$worker_id, DoesJob :$job, Ref :$msg)
Notes:
The :$msg argument will contain the output from the Job's execution
+PXWP_JOB_PROGRESS
Action:
PubSub event posted.
PubSub Event:
+PXWP_JOB_PROGRESS
PubSub Signature:
method handler
(
SessionID :$worker_id,
DoesJob :$job,
Int :$percent_complete,
Ref :$msg,
)
Notes:
The :$msg argument will contain the output from the last step executed
for multi-step jobs
+PXWP_JOB_FAILED
Action:
_in_process is cleared and _failed_jobs for this session is
incremented. yields() to _process_queue.
PubSub Event:
+PXWP_JOB_FAILED
PubSub Signature:
method handler(SessionID :$worker_id, DoesJob :$job, Ref :$msg)
Notes:
The :$msg argument will contain the exception generated from the Job
+PXWP_JOB_START
Action:
PubSub event posted.
PubSub Event:
+PXWP_JOB_START
PubSub Signature:
method handler
(
SessionID :$worker_id,
DoesJob :$job,
)
Notes:
This is an indication that the child process received the Job and is
beginning execution.
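The per-status actions listed for guts_output amount to a dispatch on the status type. A rough plain-Perl model follows. It assumes the status hashref carries its type under a key named type, which the text above does not state, and the handle_status sub, the %stats counters, and the @published list are hypothetical stand-ins for the real attributes and for PubSub:

```perl
use strict;
use warnings;

# Per-cycle counters, standing in for _completed_jobs and _failed_jobs.
my %stats = (completed => 0, failed => 0);
my @published;    # records what would be posted via PubSub

# One handler per status type; the 'type' key is an assumed field name.
my %on_status = (
    PXWP_JOB_START    => sub { },
    PXWP_JOB_PROGRESS => sub { },
    PXWP_JOB_COMPLETE => sub { $stats{completed}++ },
    PXWP_JOB_FAILED   => sub { $stats{failed}++ },
);

sub handle_status {
    my ($status) = @_;
    my $handler = $on_status{ $status->{type} }
        or die "unknown status type: $status->{type}\n";
    $handler->($status);
    push @published, $status->{type};    # every status is republished
    return;
}

foreach my $type (qw(PXWP_JOB_START PXWP_JOB_PROGRESS
                     PXWP_JOB_COMPLETE PXWP_JOB_FAILED)) {
    handle_status({ type => $type });
}

print "completed=$stats{completed} failed=$stats{failed} ",
      "published=", scalar(@published), "\n";
```

Only the COMPLETE and FAILED statuses change the counters; START and PROGRESS are passed through to subscribers unchanged.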
die_signal_handler
(Str $sig, HashRef) is Event
This is the handler for any exceptions that are thrown. All exceptions will be relayed to the consumer via a PubSub published event: +PXWP_WORKER_ERROR
Subscribers will need the following signature:
method handler(SessionID :$worker_id, IsaError :$exception, HashRef :$original) is Event
Note that $original will be what was given to us by POE.
PRIVATE_METHODS
_process_queue
is Event
This private event is the queue processor. As jobs are dequeued for processing, +PXWP_JOB_DEQUEUED will be fired via PubSub. Subscribers will need the following signature:
method handler(SessionID :$worker_id, DoesJob :$job) is Event
Once the queue has been depleted +PXWP_STOP_PROCESSING will be fired via PubSub. Subscribers will need the following signature:
method handler
(
SessionID :$worker_id,
Int :$completed_jobs,
Int :$failed_jobs
) is Event
Then the run stats will be cleared, and the status will be toggled so that the Worker may again accept jobs.
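The cycle described for _process_queue can be modeled as a simple loop. The real Worker is event driven and handles one job per event, so this synchronous sketch only illustrates the ordering of the events and the reset at the end; run_job, the job names, and the @events list are hypothetical:

```perl
use strict;
use warnings;

my @queue  = ('job-a', 'job-b', 'job-c');   # hypothetical pending jobs
my $active = 1;                             # status: currently processing
my %stats  = (completed => 0, failed => 0);
my @events;                                 # stand-in for PubSub

# Hypothetical job runner: pretend 'job-b' fails and the rest succeed.
sub run_job { return $_[0] ne 'job-b' }

while (defined(my $job = shift @queue)) {
    push @events, "PXWP_JOB_DEQUEUED $job";
    if (run_job($job)) { $stats{completed}++ }
    else               { $stats{failed}++ }
}

# Queue depleted: report the run stats, clear them, and accept jobs again.
push @events, "PXWP_STOP_PROCESSING $stats{completed} $stats{failed}";
%stats  = (completed => 0, failed => 0);
$active = 0;

print "$_\n" for @events;
```

The final event carries the counts for the cycle that just ended, which is why the counters are reset only after it has been recorded.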
_process_job
(DoesJob $job) is Event
This private event takes the given job and feeds it to the child process to be processed. If the child process doesn't exist for whatever reason, +PXWP_WORKER_INTERNAL_ERROR will be fired via PubSub. Subscribers need the following signature:
method handler(SessionID :$worker_id, Ref :$msg)
This event also places the given job into the _in_process attribute.
_failed_jobs
is: rw, isa: ScalarRef
This private attribute is a counter of the number of jobs that failed in the current processing cycle.
AUTHOR
Nicholas Perez <nperez@cpan.org>
COPYRIGHT AND LICENSE
This software is copyright (c) 2010 by Infinity Interactive.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.