AnyEvent::Beanstalk::Worker - Event-driven FSA for beanstalk queues
use strict;
use warnings;
use feature 'say';
use AnyEvent;
use AnyEvent::Beanstalk::Worker;
use Data::Dumper;
use JSON;

my $w = AnyEvent::Beanstalk::Worker->new(
    concurrency       => 10,
    initial_state     => 'reserved',
    beanstalk_watch   => 'jobs',
    beanstalk_decoder => sub { eval { decode_json(shift) }; },
);

$w->on(reserved => sub {
    my $self = shift;
    my ($qjob, $qresp) = @_;

    say "Got a job: " . Dumper($qjob->decode);

    $self->emit(my_next_state => $qjob);
});

$w->on(my_next_state => sub {
    my $self = shift;
    my $job  = shift;

    ## do something with job ...
    my $job_is_ready = 1;    ## placeholder for your own readiness check

    ## maybe not ready yet?
    unless ($job_is_ready) {
        return $self->finish(release => $job->id, { delay => 60 });
    }

    ## all done!
    $self->finish(delete => $job->id);
});

$w->start;
AnyEvent->condvar->recv;
AnyEvent::Beanstalk::Worker implements a simple, abstract finite-state automaton for beanstalk queues. It can handle a configurable number of concurrent jobs, and implements graceful worker shutdown.
You are encouraged to subclass AnyEvent::Beanstalk::Worker and implement your own init method, for example so that your object has access to anything it needs in subsequent states.
The "SUPPLEMENTAL" section below contains additional information about the various technologies this module uses.
AnyEvent::Beanstalk::Worker implements these methods:
Create a new object. The new method accepts the following arguments:
Specify an initial state to move to after a job has been reserved. The handler for this state receives an AnyEvent::Beanstalk::Job object and the beanstalk queue response (a string such as "RESERVED"). Defaults to undefined; supply an initial state if you want your worker to do anything more than reserve and delete jobs from the queue.
How many concurrent jobs this worker will handle. Set this to a higher number to process more jobs simultaneously. Defaults to 1.
How many jobs this worker will handle before it exits. 0 means the worker will never exit. Defaults to 0.
How many TERM or INT signals must be received before we quit, regardless of outstanding jobs. Defaults to 3.
The hostname of the beanstalk server. Defaults to 'localhost'.
The port of the beanstalk server. Defaults to 11300.
A reference to a subroutine responsible for decoding a beanstalk job. See AnyEvent::Beanstalk.
The beanstalk tube to watch. Set this to the same tube your producers add jobs to. See AnyEvent::Beanstalk.
The default log level. Defaults to 4 (meaning "error"). See AnyEvent::Log.
How long in seconds to wait for a job from beanstalk. Defaults to 1 second. After this time, the loop will run again looking for additional events before trying to reserve another job.
How long in seconds a job should wait before another worker can take it. Defaults to 3 seconds.
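The beanstalk_decoder used in the examples wraps decode_json in eval, so a malformed job body decodes to undef instead of killing the worker. A minimal sketch of that behavior, assuming the core JSON::PP module in place of JSON (JSON falls back to JSON::PP when no faster backend is installed):

```perl
use strict;
use warnings;
use JSON::PP qw(decode_json);

## Same shape as the decoder passed to new() in the examples:
## eval turns a JSON parse error into undef instead of a die.
my $decoder = sub { eval { decode_json(shift) }; };

my $good = $decoder->('{"url":"http://example.com/"}');
my $bad  = $decoder->('not json');

print "url: $good->{url}\n";
print "bad job decodes to ", (defined $bad ? "a value" : "undef"), "\n";
```

In a state handler, check the result of `$qjob->decode` for undef before using it.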
Called at the end of new; by default this is an empty method. If you want your worker object to have access to additional "things" (such as a web user agent object), subclass AnyEvent::Beanstalk::Worker and implement init:
package WebWorker;
use strict;
use warnings;
use parent 'AnyEvent::Beanstalk::Worker';
use Mojo::UserAgent;

## called at the end of new(): give every state handler a user agent
sub init { shift->{ua} = Mojo::UserAgent->new }

1;
Now we can use our WebWorker class:
use strict;
use warnings;
use WebWorker;
use JSON;

my $w = WebWorker->new(
    concurrency       => 50,
    initial_state     => 'reserved',
    beanstalk_watch   => 'web-jobs',
    beanstalk_decoder => sub { eval { decode_json(shift) }; },
);

$w->on(reserved => sub {
    my $self = shift;
    my ($job, $resp) = @_;

    ## non-blocking GET; the callback fires when the response arrives
    $self->{ua}->get($job->decode->{url}, sub { $self->emit(page_found => $job) });
});

$w->on(page_found => sub {...});
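The init hook follows a common Perl pattern: the base class constructor builds the object and then calls $self->init, which a subclass can override. The sketch below uses hypothetical class names (My::BaseWorker, My::WebWorker) and a plain hash in place of Mojo::UserAgent; it illustrates the pattern only and is not the module's actual constructor.

```perl
use strict;
use warnings;

package My::BaseWorker {
    ## Hypothetical base class: new() stores its arguments, then calls init().
    sub new {
        my ($class, %args) = @_;
        my $self = bless {%args}, $class;
        $self->init;
        return $self;
    }

    ## Empty by default, like the documented init.
    sub init { }
}

package My::WebWorker {
    our @ISA = ('My::BaseWorker');

    ## Override init to attach a resource every state handler can reach
    ## through $self (a plain hash stands in for a user agent here).
    sub init { shift->{ua} = { agent => 'sketch/0.1' } }
}

my $plain = My::BaseWorker->new(concurrency => 1);
my $web   = My::WebWorker->new(concurrency => 50);

print "plain worker has ua: ", (exists $plain->{ua} ? "yes" : "no"), "\n";
print "web worker ua agent: $web->{ua}{agent}\n";
```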
Starts the worker. Before start is invoked, the worker does not receive or emit events.
$w->start;
Tries to stop the worker. If max_stop_tries is reached or there are no outstanding jobs, the worker exits immediately. If max_stop_tries has not yet been reached and the worker has outstanding jobs, control returns to the event loop until the jobs complete or max_stop_tries is reached. Sending a SIGINT or SIGTERM invokes stop.
$w->stop;
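The stop rule can be read as a small decision function. This sketch is an assumption based on the description above (each INT or TERM signal counts as one stop attempt); stop_decision is a hypothetical name, not code taken from the module.

```perl
use strict;
use warnings;

## Hypothetical helper: given how many stop attempts have been made so
## far (including this one), the max_stop_tries setting, and the number
## of outstanding jobs, decide what the worker should do.
sub stop_decision {
    my ($tries, $max_stop_tries, $outstanding) = @_;

    return 'exit' if $outstanding == 0;           # nothing left to wait for
    return 'exit' if $tries >= $max_stop_tries;   # patience exhausted
    return 'wait';                                # let jobs finish in the loop
}

## With the default max_stop_tries of 3 and 5 jobs still running:
for my $try (1 .. 3) {
    printf "stop attempt %d: %s\n", $try, stop_decision($try, 3, 5);
}
```

Under this reading, one Ctrl-C lets in-flight jobs drain, while max_stop_tries of them force an immediate exit.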
Should be called when a worker is finished with a job. The first argument is the beanstalk method to call: release, delete, or bury.
The second argument is the beanstalk job id. An optional third argument is passed through to the beanstalk method named in the first argument (for example, { delay => 60 } for release).
$w->finish(delete => $job->id);
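Choosing among release, delete, and bury usually depends on how the job went. The hypothetical helper below maps an outcome to the argument list for finish; the outcome names and the 60-second delay (borrowed from the synopsis) are illustrative assumptions.

```perl
use strict;
use warnings;

## Hypothetical mapping from a job outcome to the arguments for
## $w->finish(...): (beanstalk method, job id, optional args hashref).
sub finish_args {
    my ($outcome, $id) = @_;

    return (delete  => $id)                  if $outcome eq 'done';
    return (release => $id, { delay => 60 }) if $outcome eq 'retry_later';
    return (bury    => $id)                  if $outcome eq 'failed';
    die "unknown outcome '$outcome'\n";
}

## In a state handler: $self->finish(finish_args($outcome, $job->id));
my @args = finish_args(retry_later => 42);
print join(' ', $args[0], $args[1], "delay=$args[2]{delay}"), "\n";
```

Buried jobs stay out of circulation until they are kicked back into the ready queue, which makes bury a reasonable choice for jobs that will never succeed as-is.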
Registers an event listener.
$w->on(some_state => sub {
    my ($self, @args) = @_;
    ## ... work with @args ...
    $self->emit(next_state => @args);
});
Emits an event with optional arguments.
$self->emit(a_state => ());
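Conceptually, on and emit behave like a registry of callbacks keyed by event name. The hypothetical TinyEmitter package below is a minimal sketch of that idea, with handlers receiving the emitter as their first argument as in the examples above. It is not the module's implementation; in particular, it calls handlers synchronously, which the real module may not do.

```perl
use strict;
use warnings;

package TinyEmitter {
    sub new { bless { handlers => {} }, shift }

    ## Register a handler for an event name.
    sub on {
        my ($self, $event, $cb) = @_;
        push @{ $self->{handlers}{$event} }, $cb;
        return $self;
    }

    ## Call every handler for the event, passing the emitter first.
    ## Handlers run synchronously, right away.
    sub emit {
        my ($self, $event, @args) = @_;
        $_->($self, @args) for @{ $self->{handlers}{$event} || [] };
        return $self;
    }
}

my @trace;
my $e = TinyEmitter->new;

$e->on(reserved => sub {
    my ($self, $job) = @_;
    push @trace, "reserved $job";
    $self->emit(my_next_state => $job);    # move to the next state
});
$e->on(my_next_state => sub {
    my ($self, $job) = @_;
    push @trace, "next $job";
});

$e->emit(reserved => 'job-1');
print "$_\n" for @trace;
```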
AnyEvent::Beanstalk::Worker emits some events internally, but in most cases these are of no interest to users of the module. The module also provides its own handlers for each of these events. You may override these handlers (via on), but only if you know what you're doing.
When you use this module, you emit your own states and provide your own handlers for them, beginning with the handler for the event named in the constructor's initial_state argument; the module emits that event for you once a job has been reserved from the queue.
The following list of internal events is provided for completeness only; you should generally neither emit nor handle these events:
AnyEvent::Beanstalk::Worker implements the following attributes.
This is a handle to the internal AnyEvent::Beanstalk object.
This returns the number of outstanding jobs this worker is handling.
This returns the number of jobs this worker has reserved and begun work on.
Sets or gets the number of jobs this worker can handle at the same time.
AnyEvent::Beanstalk::Worker receives the following signals:
An INT signal causes the worker to invoke its stop method, which waits for outstanding jobs to finish (subject to max_stop_tries) before shutting down.
A TERM signal is handled in the same way as INT.
A USR2 signal will bump the log level of the worker up until it reaches trace; after trace it wraps around and starts again at critical. See AnyEvent::Log for available log levels.
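AnyEvent::Log numbers its levels from 1 (fatal) to 9 (trace), with critical at 3 and error, this module's default, at 4. The wrap-around described above can be sketched as a pure function; next_log_level is a hypothetical name, and the one-step-per-signal arithmetic is an assumption based on the description.

```perl
use strict;
use warnings;

## AnyEvent::Log level numbers.
my %name = (
    1 => 'fatal', 2 => 'alert', 3 => 'critical', 4 => 'error', 5 => 'warn',
    6 => 'note',  7 => 'info',  8 => 'debug',    9 => 'trace',
);

## Hypothetical: each USR2 raises the level by one; after trace (9)
## it wraps around to critical (3).
sub next_log_level {
    my $level = shift;
    return $level >= 9 ? 3 : $level + 1;
}

## Starting from the default (4, error), send USR2 seven times.
my $level = 4;
my @seen;
for (1 .. 7) {
    $level = next_log_level($level);
    push @seen, $name{$level};
}
print join(' -> ', 'error', @seen), "\n";
```

From a shell, `kill -USR2 <pid>` sends the signal to a running worker.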
AnyEvent::Beanstalk::Worker implements logging via AnyEvent::Log; it probably doesn't do this as well as it could and more work needs to be done here.
The eg directory has several working examples of using this module, including one that shows how to subclass it.
This section contains additional information that is not needed to use this module but may be useful to those unfamiliar with the underlying technologies.
This module represents the current results of an ongoing experiment with queues (beanstalk, AnyEvent::Beanstalk), non-blocking and asynchronous events (AnyEvent), and state machines as a way to make event-driven programming easier to understand.
beanstalkd is a small, fast work queue written in C. When you need to do lots of jobs (work units, or whatever you want to call them) such as sending an email, fetching and parsing a web page, or processing an image, a producer (a small program that creates jobs) adds jobs to the queue. One or more consumer workers then ask the queue for jobs and work on them. When a consumer worker is done with a job, it deletes the job from the queue and asks for another one.
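The producer/consumer cycle can be modeled with an in-memory stand-in for beanstalkd. The TinyQueue package below is hypothetical and ignores priorities, delays, and time-to-run; it only shows the put, reserve, delete flow. A real consumer talks to beanstalkd over the network, for example through AnyEvent::Beanstalk.

```perl
use strict;
use warnings;

package TinyQueue {
    sub new { bless { next_id => 1, ready => [], reserved => {} }, shift }

    ## Producer side: add a job, return its id.
    sub put {
        my ($self, $data) = @_;
        my $id = $self->{next_id}++;
        push @{ $self->{ready} }, { id => $id, data => $data };
        return $id;
    }

    ## Consumer side: take the oldest ready job, or return nothing.
    sub reserve {
        my $self = shift;
        my $job  = shift @{ $self->{ready} } or return;
        $self->{reserved}{ $job->{id} } = $job;
        return $job;
    }

    ## Consumer side: the job is done, remove it for good.
    sub delete_job {
        my ($self, $id) = @_;
        return defined(delete $self->{reserved}{$id});
    }
}

my $q = TinyQueue->new;

## Producer: enqueue some work units.
$q->put("send email $_") for 1 .. 3;

## Consumer: reserve, work, delete, ask again.
my @done;
while (my $job = $q->reserve) {
    push @done, $job->{data};      # "work" on the job
    $q->delete_job($job->{id});    # tell the queue we're finished
}
print "$_\n" for @done;
```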
AnyEvent is an elegantly designed, generic interface to a variety of event loops.
The idea behind state machines is that you have a "machine" (or a program modeling a machine) with a set of states and a set of events that, when triggered, change the machine's state. For example, we could model a web crawler as a state machine. Our states will be get url, fetch, parse, and add url, and our events will be got url, fetched, parsed, and added.
                   +---------+
                   | get url |
                   +-/-----^-+
         (got url)  /       \
                   /         \  (added)
            +-----v-+     +---\-----+
            | fetch |     | add url |
            +-----\-+     +-^-------+
        (fetched)  \       /
                    \     /  (parsed)
                   +-v---/-+
                   | parse |
                   +-------+
In the get url state, we take a URL from a list of URLs (perhaps we seed it with one URL), then we emit the got url event. This moves our machine to the fetch state. In the fetch state, we make an HTTP GET request for that URL and then emit the fetched event, which moves our machine to the parse state, where we parse the incoming web page. When parsing is done we emit parsed, which moves us to the add url state; there we add any URLs we found to the queue, emit added, and start over in get url.
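The crawler's transitions can be written down as a table from event to next state, which is the bookkeeping the worker's on handlers encode. This sketch is hypothetical: it replaces HTTP and HTML parsing with a hard-coded link map and uses a seen-set so that the crawl terminates.

```perl
use strict;
use warnings;

## Transition table: event => state it moves the machine to.
my %next_state = (
    'got url' => 'fetch',
    fetched   => 'parse',
    parsed    => 'add url',
    added     => 'get url',
);

## Hypothetical "web": page => links found on it.
my %links = (
    '/'  => [ '/a', '/b' ],
    '/a' => ['/b'],
    '/b' => ['/'],
);

my @todo = ('/');    # seed with one URL
my (%seen, @log);
my ($state, $url, $found) = ('get url');

while (1) {
    my $event;

    if ($state eq 'get url') {
        $url = shift @todo;
        last unless defined $url;        # queue empty: stop crawling
        next if $seen{$url}++;           # already crawled; stay in 'get url'
        $event = 'got url';
    }
    elsif ($state eq 'fetch') {
        push @log, "fetch $url";         # a real crawler would GET the page
        $event = 'fetched';
    }
    elsif ($state eq 'parse') {
        $found = $links{$url} || [];     # stand-in for HTML parsing
        $event = 'parsed';
    }
    elsif ($state eq 'add url') {
        push @todo, @$found;
        $event = 'added';
    }

    $state = $next_state{$event};        # the event drives the transition
}

print "$_\n" for @log;
```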
If we use our WebWorker class above, the result might look like this:
#!/usr/bin/env perl
use strict;
use warnings;
use feature 'say';
use AnyEvent;
use WebWorker;

my $w = WebWorker->new(
    concurrency     => 1,
    max_stop_tries  => 1,
    initial_state   => 'fetch',
    beanstalk_watch => "urls",
);

## do this before we call start()
$w->beanstalk->use("urls")->recv;

$w->on(fetch => sub {
    my ($self, $job, $resp) = @_;

    say STDERR "fetching " . $job->data;
    $self->{ua}->get($job->data, sub { $self->emit(receive => $job, @_) });
});

$w->on(receive => sub {
    my ($self, $job, undef, $tx) = @_;

    if (my $err = $tx->error) {
        warn "Moved or some error: $err->{message}\n";
        return $self->finish(delete => $job->id);
    }

    unless (($tx->res->headers->content_type // '') =~ /html/i) {
        warn "Not HTML; skipping\n";
        return $self->finish(delete => $job->id);
    }

    say STDERR "parsing " . $job->data;
    eval {
        $tx->res->dom->at("html body")->find('a[href]')
          ->each(sub { $self->emit(add_url => shift->{href}) });
    };

    return $self->finish(delete => $job->id);
});

$w->on(add_url => sub {
    my ($self, $url) = @_;

    return unless $url =~ /^http/;

    $self->beanstalk->put(
        { priority => 100, ttr => 15, delay => 1, data => $url },
        sub { say STDERR "URL $url added" }
    );
});

$w->start;
AnyEvent->condvar->recv;
We've just written a simple (and impolite: it ought to honor robots.txt) web crawler.
See eg/web-state.pl and eg/web-state-add.pl for this example.
I couldn't find any gentle introductions to event loops; I was going to write one myself but realized it would probably turn into a book. Besides, I'm not qualified to write said book. With that disclaimer, here is a brief, "close enough" introduction to event loops that may help some readers build an approximate mental model, good enough to start writing event-driven code.
An event loop can be as simple as this:
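my @events   = ();    ## event queue, fed by signals, I/O, timers, ...
my %watchers = ();    ## event type => list of handler coderefs

while (1) {
    ## a real loop would block here until the system delivers an event
    next unless @events;

    my $event = shift @events;    ## FIFO: oldest event first
    handle($event);
}

sub handle {
    my $event = shift;
    $_->($event) for @{ $watchers{ $event->{type} } || [] };
}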
The @events list (or queue, since events are read as a FIFO) might be populated asynchronously from system events, such as receiving signals, network data, disk I/O, timers, or other sources. The handle() subroutine checks the %watchers hash to see if there are any watchers or handlers for this event and calls those subroutines as needed. Some of these subroutines may add more events to the event queue. Then the loop starts again.
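The loop above never exits; to watch it actually dispatch events, here is a variant that stops when the queue is empty. The event types (tick, tock) and the seeded event are made up for illustration, and nothing here is asynchronous: a real event loop would wait on the operating system instead of exiting.

```perl
use strict;
use warnings;

my @events   = ({ type => 'tick', n => 1 });    # seed event
my %watchers = (
    tick => [ sub {
        my $event = shift;
        ## Handlers may queue more events; this is how work chains together.
        push @events, { type => 'tock', n => $event->{n} } if $event->{n} <= 2;
    } ],
    tock => [ sub {
        my $event = shift;
        push @events, { type => 'tick', n => $event->{n} + 1 };
    } ],
);

my @handled;

sub handle {
    my $event = shift;
    push @handled, "$event->{type}$event->{n}";
    $_->($event) for @{ $watchers{ $event->{type} } || [] };
}

## FIFO: shift takes the oldest event; stop when nothing is left.
while (my $event = shift @events) {
    handle($event);
}

print join(' ', @handled), "\n";
```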
Most of the time you never see the event loop; you just start it. For example, when I'm programming with EV, this is usually all I ever see of it:
EV::run;
EV receives all kinds of events from the system, and you can tell it about additional events as well. You then register handlers that fire when a particular kind of event is received.
beanstalkd, by Keith Rarick: http://kr.github.io/beanstalkd/
AnyEvent::Beanstalk, by Graham Barr: AnyEvent::Beanstalk
AnyEvent, by Marc Lehmann: http://anyevent.schmorp.de
Scott Wiersdorf, <scott@perlcode.org>
Copyright (C) 2014 by Scott Wiersdorf
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.16.1 or, at your option, any later version of Perl 5 you may have available.