Author image Scott Wiersdorf
and 1 contributors

NAME

AnyEvent::Beanstalk::Worker - Event-driven FSA for beanstalk queues

SYNOPSIS

  use AnyEvent::Beanstalk::Worker;
  use Data::Dumper;
  use JSON;

  my $w = AnyEvent::Beanstalk::Worker->new(
      concurrency       => 10,
      initial_state     => 'reserved',
      beanstalk_watch   => 'jobs',
      beanstalk_decoder => sub {
          eval { decode_json(shift) };
      }
  );

  $w->on(reserved => sub {
      my $self = shift;
      my ($qjob, $qresp) = @_;

      say "Got a job: " . Dumper($qjob->decode);

      shift->emit( my_next_state => $qjob );
  });

  $w->on(my_next_state => sub {
      my $self = shift;
      my $job  = shift;

      ## do something with job
      ...

      ## maybe not ready yet?
      unless ($job_is_ready) {
          return $self->finish(release => $job->id, { delay => 60 });
      }

      ## all done!
      $self->finish(delete => $job->id);
  });

  $w->start;
  AnyEvent->condvar->recv;

DESCRIPTION

AnyEvent::Beanstalk::Worker implements a simple, abstract finite-state automaton for beanstalk queues. It can handle a configurable number of concurrent jobs, and implements graceful worker shutdown.

You are encouraged to subclass AnyEvent::Beanstalk::Worker and implement your own init function, for example, so your object has access to anything you need in subsequent states.

The "SUPPLEMENTAL" section below contains additional information about the various technolgies this module uses.

METHODS

AnyEvent::Beanstalk::Worker implements these methods:

new

Create a new object. The new method accepts the following arguments:

initial_state

Specify an initial state to move to after a job has been reserved. The handler for this state should expect to receive an AnyEvent::Beanstalk::Job object and the beanstalk queue response (a string such as "RESERVED"). Default is undefined--you should supply an initial state if you want your worker to do anything more than accepting and deleting jobs from the queue.

concurrency

How many concurrent jobs this worker will handle. Set this to a higher number to process more jobs simultaneously. Defaults to 1.

max_jobs

How many jobs this worker will handle before it exits. 0 means the worker will never exit. Defaults to 0.

max_stop_tries

How many TERM or INT signals must be received before we quit, regardless of outstanding jobs. Defaults to 3.

beanstalk_host

The hostname of the beanstalk server. Defaults to 'localhost'.

beanstalk_port

The port of the beanstalk server. Defaults to 11300.

beanstalk_decoder

A reference to a subroutine responsible for decoding a beanstalk job. See AnyEvent::Beanstalk.

beanstalk_watch

The beanstalk tube to watch. Set this to the same tube your producers add jobs to. See AnyEvent::Beanstalk.

log_level

The default log level. Defaults to 4 (meaning "error"). See AnyEvent::Log.

reserve_timeout

How long in seconds to wait for a job from beanstalk. Defaults to 1 second. After this time, the loop will run again looking for additional events before trying to reserve another job.

release_delay

How long in seconds a job should wait before another worker can take it. Defaults to 3 seconds.

init

Called at the end of new; by default this is an empty method. If you want your worker object to have access to additional "things" (such as a web user agent object), subclass AnyEvent::Beanstalk::Worker and implement init:

  package WebWorker;

  use parent 'AnyEvent::Beanstalk::Worker';
  use Mojo::UserAgent;
  sub init { shift->{ua} = Mojo::UserAgent->new }

  1;

Now we can use our WebWorker class:

  use WebWorker;
  use JSON;

  my $w = WebWorker->new(
      concurrency       => 50,
      initial_state     => 'reserved',
      beanstalk_watch   => 'web-jobs',
      beanstalk_decoder => sub {
          eval { decode_json(shift) };
      }
  );

  $w->on(reserved => sub {
      my $self = shift;
      my ($job, $resp) = @_;
      $self->{ua}->get($job->decode->{url},
                       sub { $self->emit(page_found => $job) });
  });

  $w->on(page_found => sub {...});

start

Starts the worker. Before start is invoked, the worker does not receive or emit events.

  $w->start;

stop

Tries to stop the worker. If max_stop_tries is reached or there are no outstanding jobs, the worker exits immediately. If max_stop_tries has not yet been reached and the worker has outstanding jobs, control returns to the event loop until the jobs complete or max_stop_tries is reached. Sending a SIGINT or SIGTERM invokes stop.

  $w->stop;

finish

Should be called when a worker is finished with a job. The first argument is the beanstalk method to call: release, delete, or bury.

The second argument is the beanstalk job id. An optional third argument will be passed to the beanstalk method invoked in the first argument.

  $w->finish(delete => $job->id);

on

Registers an event listener.

  $w->on(some_state => sub {
      my $self = shift;

      ...

      $self->emit(next_state => @args);
  });

emit

Emits an event with optional arguments.

  $self->emit(a_state => ());

EVENTS

AnyEvent::Beanstalk::Worker emits some events internally, but these should not be interesting to anyone using the module in most cases. This module also provides its own handlers for each of these events. You may override these handlers (via on), but you should know what you're doing if you do that.

If you use this module, you should emit your own states and provide your own state handlers for those events, beginning with the handler for the event you indicated in the constructor's initial_state argument, which this module will emit for you once a job has been reserved from the queue.

The following list of internal events is provided for completeness only and you should generally not emit nor handle these events:

start

reserved

ATTRIBUTES

AnyEvent::Beanstalk::Worker implements the following attributes.

beanstalk

This is a handle to the internal AnyEvent::Beanstalk object.

job_count

This returns the number of outstanding jobs this worker is handling.

handled_jobs

This returns the number of jobs this worker has reserved and begun work on.

concurrency

Sets or gets the number of jobs this worker can handle at the same time.

SIGNALS

AnyEvent::Beanstalk::Worker receives the following signals:

INT

A INT signal will cause the worker to invoke its stop method, which will process any outstanding events before shutting down.

TERM

A TERM signal is handled in the same way as INT.

USR2

A USR2 signal will bump the log level of the worker up until it reaches trace; after trace it wraps around and starts again at critical. See AnyEvent::Log for available log levels.

LOGGING

AnyEvent::Beanstalk::Worker implements logging via AnyEvent::Log; it probably doesn't do this as well as it could and more work needs to be done here.

EXAMPLES

The eg directory has several working examples of using this module, including one that shows how to subclass it.

SUPPLEMENTAL

This section contains additional information not directly needed to use this module, but may be useful for those unfamiliar with any of the underlying technologies.

Caveat

This module represents the current results of an ongoing experiment involving queues (beanstalk, AnyEvent::Beanstalk), non-blocking and asynchronous events (AnyEvent), and state machines as means of a simpler to understand method of event-driven programming.

Introduction to beanstalk

beanstalkd is a small, fast work queue written in C. When you need to do lots of jobs (work units--call them what you will), such as sending an email, fetching and parsing a web page, image processing, etc.), a producer (a small worker that creates jobs) adds jobs to the queue. One or more consumer workers come along and ask for jobs from the queue, and then work on them. When the consumer worker is done, it deletes the job from the queue and asks for another job.

Introduction to AnyEvent

AnyEvent is an elegantly designed, generic interface to a variety of event loops.

Introduction to state machines

The idea behind state machines is you have a "machine" (or program modeling a machine) with a set of states and a set of events that when triggered alter the state of the machine. For example, we could model a web crawler as a state machine. Our states will be get url, fetch, parse, and add url, and our events will be got url, fetched, parsed, and added.

                +---------+
                | get url |
                +-/-----^-+
      (got url)  /       \
                /         \ (added)
         +-----v-+     +---\-----+
         | fetch |     | add url |
         +-----\-+     +-^-------+
      (fetched) \       /
                 \     / (parsed)
                +-v---/-+
                | parse |
                +-------+

In the get url state, we take a URL from a list of URLs (perhaps we seed it with one URL), then we emit the got url event. This causes our machine to move to the fetch state. In the fetch state, we make an HTTP GET request on that URL and then emit the fetched event, which moves our machine to the parse state where we parse the incoming web page. Then we add any URLs we find into the queue and start over.

If we use our WebWorker class above, the result might look like this:

    #!/usr/bin/env perl
    use strict;
    use warnings;
    use feature 'say';

    use WebWorker;

    my $w = WebWorker->new
      ( concurrency     => 1,
        max_stop_tries  => 1,
        initial_state   => 'fetch',
        beanstalk_watch => "urls" );

    ## do this before we call start()
    $w->beanstalk->use("urls")->recv;

    $w->on(fetch => sub {
        my ($self, $job, $resp) = @_;

        say STDERR "fetching " . $job->data;
        $w->{ua}->get($job->data, sub { $self->emit(receive => $job, @_) });
    });

    $w->on(receive => sub {
        my ($self, $job, undef, $tx) = @_;

        if ( $tx->error ) {
            warn "Moved or some error: " . $tx->error;
            return $self->finish(delete => $job->id);
        }

        unless ($tx->res->headers->content_type =~ /html/i) {
            warn "Not HTML; skipping\n";
            return $self->finish(delete => $job->id);
        }

        say STDERR "parsing " . $job->data;
        eval {
            $tx->res->dom->at("html body")->find('a[href]')
              ->each(sub { $self->emit(add_url => shift->{href}) });
        };

        return $self->finish(delete => $job->id);
    });

    $w->on(add_url => sub {
        my ($self, $url) = @_;

        return unless $url =~ /^http/;

        $self->beanstalk
          ->put({ priority => 100,
                  ttr      => 15,
                  delay    => 1,
                  data     => $url },
                sub { say STDERR "URL $url added" });
    });

    $w->start;

    AnyEvent->condvar->recv;

We've just written a simple (and impolite--should read robots.txt) web crawler.

See eg/web-state.pl and eg/web-state-add.pl for this example.

Introduction to event loops

I couldn't find any gentle introductions into event loops; I was going to write one myself but realized it would probably turn into a book. Additionally, I'm not qualified to write said book. With that disclaimer, here is a brief, "close enough" introduction to event loops which may help some people get an approximate mental model, good enough to begin event programming.

An event loop can be as simple as this:

    my @events = ();
    my %watchers = ();

    while (1) {
        my $event = pop @events;
        handle($event);
    }

    sub handle {
        my $event = shift;

        $_->($event) for @{$watchers{$event->{type}}};
    }

The @events list (or queue, since events are read as a FIFO) might be populated asynchronously from system events, such as receiving signals, network data, disk I/O, timers, or other sources. The handle() subroutine checks the %watchers hash to see if there are any watchers or handlers for this event and calls those subroutines as needed. Some of these subroutines may add more events to the event queue. Then the loop starts again.

Most of the time you never see the event loop--you just start it. For example, most of the time when I'm programming with EV, this is all I ever see of it:

    EV::run;

EV receives all kinds of events from the system, but you can tell it about more events. Then you register event handlers to fire off when a particular kind of event is received.

SEE ALSO

beanstalkd, by Keith Rarick: http://kr.github.io/beanstalkd/

AnyEvent::Beanstalk, by Graham Barr: AnyEvent::Beanstalk

AnyEvent, by Marc Lehmann: http://anyevent.schmorp.de

AUTHOR

Scott Wiersdorf, <scott@perlcode.org>

COPYRIGHT AND LICENSE

Copyright (C) 2014 by Scott Wiersdorf

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.16.1 or, at your option, any later version of Perl 5 you may have available.