Doug Hoyte

NAME

AnyEvent::Task - Client/server-based asynchronous worker pool

SYNOPSIS 1: PASSWORD HASHING

Server

    use AnyEvent::Task::Server;
    use Authen::Passphrase::BlowfishCrypt;

    my $dev_urandom;
    my $server = AnyEvent::Task::Server->new(
                   listen => ['unix/', '/tmp/anyevent-task.socket'],
                   setup => sub {
                     open($dev_urandom, "/dev/urandom") || die "open urandom: $!";
                   },
                   interface => {
                     hash => sub {
                       my ($plaintext) = @_;
                       read($dev_urandom, my $salt, 16) == 16 || die "bad read from urandom";
                       return Authen::Passphrase::BlowfishCrypt->new(cost => 10,
                                                                     salt => $salt,
                                                                     passphrase => $plaintext)
                                                               ->as_crypt;

                     },
                     verify => sub {
                       my ($crypted, $plaintext) = @_;
                       return Authen::Passphrase::BlowfishCrypt->from_crypt($crypted)
                                                               ->match($plaintext);
                     },
                   },
                 );

    $server->run; # or AE::cv->recv

Client

    use AnyEvent::Task::Client;

    my $client = AnyEvent::Task::Client->new(
                   connect => ['unix/', '/tmp/anyevent-task.socket'],
                 );

    my $checkout = $client->checkout( timeout => 5, );

    my $cv = AE::cv;

    $checkout->hash('secret',
      sub {
        my ($checkout, $crypted) = @_;

        print "Hashed password is $crypted\n";

        $checkout->verify($crypted, 'secret',
          sub {
            my ($checkout, $result) = @_;
            print "Verify result is $result\n";
            $cv->send;
          });
      });

    $cv->recv;

Output

    Hashed password is $2a$10$NwTOwxmTlG0Lk8YZMT29/uysC9RiZX4jtWCx.deBbb2evRjCq6ovi
    Verify result is 1

SYNOPSIS 2: DBI

Server

    use AnyEvent::Task::Server;
    use DBI;

    my $dbh;

    AnyEvent::Task::Server->new(
      listen => ['unix/', '/tmp/anyevent-task.socket'],
      setup => sub {
        $dbh = DBI->connect("dbi:SQLite:dbname=/tmp/junk.sqlite3","","",{ RaiseError => 1, });
      },
      interface => sub {
        my ($method, @args) = @_;
        $dbh->$method(@args);
      },
    )->run;

Client

    use AnyEvent::Task::Client;

    my $client = AnyEvent::Task::Client->new(
                   connect => ['unix/', '/tmp/anyevent-task.socket'],
                 );

    my $dbh = $client->checkout;

    my $cv = AE::cv;

    $dbh->do(q{ CREATE TABLE user(username TEXT PRIMARY KEY, email TEXT); },
      sub { });

    ## Requests will queue up on the checkout and execute in order:

    $dbh->do(q{ INSERT INTO user (username, email) VALUES (?, ?) },
             undef, 'jimmy',
                    'jimmy@example.com',
      sub { });

    $dbh->selectrow_hashref(q{ SELECT * FROM user }, sub {
      my ($dbh, $user) = @_;
      print "username: $user->{username}, email: $user->{email}\n";
      $cv->send;
    });

    $cv->recv;

Output

    username: jimmy, email: jimmy@example.com

DESCRIPTION

WARNING: The above client examples don't implement error handling. See the "ERROR HANDLING" section for details on how to add this.

The synopsis makes this module sound much more complicated than it actually is. Worker processes are forked off by a server when a client needs one, and the client can communicate asynchronously with many workers.

Another way of saying it is that AnyEvent::Task is a fork-on-demand but persistent-worker server (AnyEvent::Task::Server) combined with an asynchronous interface to a request queue and pooled-worker client (AnyEvent::Task::Client).

Both client and server are of course built with AnyEvent because it's awesome. However, workers can't use AnyEvent (yet).

A server is started with AnyEvent::Task::Server->new. It should be passed at least the listen and interface arguments. Keep the returned server object around for as long as you want the server to be running. interface is the code that should handle each request; see the "INTERFACE" section below for its specification. A setup coderef can be passed in to run some code when a new worker is forked. A checkout_done coderef can be passed in to run some code whenever a checkout is released (see below).

A client is started with AnyEvent::Task::Client->new. You only need to pass connect to this. Keep the returned client object around as long as you wish the client to be connected.

After both the server and client are initialised, each process must enter AnyEvent's "main loop" in some way, possibly just AE::cv->recv.

In the client process, you may call the checkout method on the client object. It returns a checkout object that can be used to run code on a remote worker process in a non-blocking manner. The checkout method doesn't require any arguments, but passing timeout is recommended.

You can treat a checkout object either as an object that proxies its method calls to a worker process or as a function that does the same. Pass the arguments to the method (or function) call as usual, followed by a callback as the last argument. This callback will be called once the worker process has returned the results, and will normally be passed two arguments: the checkout object and the return value. If the worker throws an exception, an error is raised in the dynamic context of the callback (see the "ERROR HANDLING" section).
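The following toy sketch illustrates how a single object can behave both ways in plain Perl, using AUTOLOAD for method calls and an overloaded &{} operator for coderef calls. ToyCheckout and its dispatch argument are hypothetical, and this is not how AnyEvent::Task implements checkouts: a real checkout sends each request to a worker process and runs the callback asynchronously, whereas this sketch calls a local dispatcher synchronously.

```perl
use strict;
use warnings;

package ToyCheckout;   # hypothetical, not part of AnyEvent::Task

our $AUTOLOAD;

# Invoking the object as a coderef ($checkout->(@args, $cb)) is routed
# to _request with no method name.
use overload
    '&{}'    => sub { my $self = shift; return sub { $self->_request(undef, @_) } },
    fallback => 1;

sub new {
    my ($class, %args) = @_;
    # 'dispatch' stands in for the remote worker's interface coderef
    return bless { dispatch => $args{dispatch} }, $class;
}

# Any unknown method call ($checkout->hash(...)) lands here.
sub AUTOLOAD {
    my $self = shift;
    (my $method = $AUTOLOAD) =~ s/.*:://;
    return $self->_request($method, @_);
}

sub DESTROY { }   # keep AUTOLOAD from treating destruction as a request

sub _request {
    my ($self, $method, @args) = @_;
    my $cb = pop @args;   # the callback is always the last argument
    # As with a coderef interface, the method name (if any) is prepended
    my $result = $self->{dispatch}->(defined $method ? ($method, @args) : @args);
    $cb->($self, $result);   # callback gets (checkout, return value)
    return;
}

package main;

my $checkout = ToyCheckout->new(dispatch => sub { join ',', @_ });
$checkout->hash('secret', sub { print "$_[1]\n" });   # prints hash,secret
$checkout->('x', 'y', sub { print "$_[1]\n" });       # prints x,y
```

The method-call form passes the method name through to the dispatcher, while the coderef form omits it, mirroring the two interface calling conventions described in the INTERFACE section.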

DESIGN

Each client maintains a "pool" of connections to worker processes. Every time a checkout is issued, it is placed into a first-come, first-served queue. Once a worker process becomes available, it is associated with that checkout until that checkout is garbage collected, which in perl means as soon as it is no longer referenced. Each checkout also maintains a queue of requests, so that as soon as this worker process is allocated, the requests are also serviced on a first-come, first-served basis.

timeout can be passed as a keyword argument to checkout. Once a request is queued up on that checkout, a timer of timeout seconds (default is 30, undef means infinity) is started. If the request completes during this timeframe, the timer is cancelled. If the timer expires, the worker connection is terminated and an exception is thrown in the dynamic context of the callback (see the "ERROR HANDLING" section).

Note that since timeouts are associated with a checkout, the client process can be started before the server and as long as the server is started within timeout seconds, no requests will be lost. The client will continually try to acquire worker processes until a server is available, and once one is available it will attempt to fill all queued checkouts.

Because of checkout queuing, the maximum number of worker processes a client will attempt to obtain can be limited with the max_workers argument when creating a client object. If there are more live checkouts than max_workers, the remaining checkouts will have to wait until one of the other workers becomes available. Because of timeouts, some checkouts may never be serviced if the system can't handle the load (the timeout error should be handled to indicate the service is temporarily unavailable).

The min_workers argument can be used to pre-fork "hot-standby" worker processes when creating the client. The default is 2, though this may change to 0 in the future.
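To make the queueing rules concrete, here is a deliberately simplified, synchronous model of the client side: checkouts wait in a first-come, first-served queue, at most max_workers workers are ever started, and releasing a checkout hands its worker to the next waiting checkout. ToyPool and its methods are hypothetical. There are no sockets, timers, forks, or min_workers pre-forking here, and the real client releases a worker when the checkout object is destroyed rather than through an explicit release call.

```perl
use strict;
use warnings;

# Hypothetical, synchronous model of the client-side checkout queue.
# Not the module's implementation: no sockets, timers or forking.
package ToyPool;

sub new {
    my ($class, %args) = @_;
    return bless {
        max_workers => $args{max_workers},
        idle        => [],   # worker ids returned to the pool
        spawned     => 0,    # workers "forked" so far
        waiting     => [],   # FIFO queue of checkouts awaiting a worker
    }, $class;
}

# Queue a checkout; it is assigned a worker as soon as one is available.
sub checkout {
    my ($self, $name) = @_;
    my $co = { name => $name, worker => undef };
    push @{ $self->{waiting} }, $co;
    $self->_assign;
    return $co;
}

# Releasing a checkout returns its worker to the pool.
sub release {
    my ($self, $co) = @_;
    push @{ $self->{idle} }, $co->{worker};
    $co->{worker} = undef;
    $self->_assign;
}

sub _assign {
    my ($self) = @_;
    while (@{ $self->{waiting} }) {
        my $worker;
        if (@{ $self->{idle} }) {
            $worker = shift @{ $self->{idle} };
        } elsif ($self->{spawned} < $self->{max_workers}) {
            $worker = ++$self->{spawned};
        } else {
            last;   # at max_workers: remaining checkouts must wait
        }
        my $co = shift @{ $self->{waiting} };   # first come, first served
        $co->{worker} = $worker;
    }
}

package main;

my $pool = ToyPool->new(max_workers => 2);
my @co = map { $pool->checkout($_) } qw(a b c);
print join(' ', map { defined $_->{worker} ? $_->{worker} : '-' } @co), "\n";  # 1 2 -
$pool->release($co[0]);
printf "%s\n", defined $co[2]{worker} ? $co[2]{worker} : '-';                   # 1
```

The third checkout gets no worker until the first one is released, which is the situation in which a real checkout's timeout can expire under heavy load.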

STARTING THE SERVER

Often you will want to start the client and server as completely separate processes as indicated in the synopsis.

Technically, running the server and the client in the same process is possible but is highly discouraged since the server will fork() when the client desires a worker process. In this case, all descriptors in use by the client would be duped into the worker process and the worker may have to close these extra descriptors. Also, forking a busy client may be memory-inefficient.

Since it's more of a bother than it's worth to run the server and the client in the same process, there is an alternate server constructor, AnyEvent::Task::Server::fork_task_server for when you'd like to run both. It can be passed the same arguments as the regular new constructor:

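    use AnyEvent::Task::Server;

    ## If not called in void context, this returns the keep-alive pipe and PID:
    ## my ($keepalive_pipe, $pid) =
    AnyEvent::Task::Server::fork_task_server(
      listen => ['unix/', '/tmp/anyevent-task.socket'],
      interface => sub {
        return "Hello from PID $$";
      },
    );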

The only differences between this and the regular constructor are that this will fork a process which becomes the server, and that it will install a "keep-alive" pipe between the server and the client. The server uses this keep-alive pipe to detect when its parent (the client process) exits.

If AnyEvent::Task::Server::fork_task_server is called in a void context, then the reference to this keep-alive pipe is pushed onto @AnyEvent::Task::Server::children_sockets. Otherwise, the keep-alive pipe and the server's PID are returned. Closing the pipe will terminate the server gracefully. kill the PID to terminate it immediately.
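The keep-alive mechanism relies on a standard Unix property: once every copy of a pipe's write end is closed (explicitly, or because the process holding it exited), the reader sees end-of-file. The sketch below shows that idea with plain pipe and fork. It is a hypothetical illustration, not fork_task_server's code, and the module may use a socket rather than a pipe internally.

```perl
use strict;
use warnings;
use POSIX ();

# Hypothetical sketch of the keep-alive idea: the child blocks reading a
# pipe and treats EOF (parent closed its end or exited) as a shutdown signal.
pipe(my $reader, my $writer) or die "pipe: $!";

my $pid = fork();
die "fork: $!" unless defined $pid;

if ($pid == 0) {
    # Child ("server"): close our copy of the write end, otherwise we
    # would never see EOF.
    close $writer;
    my $buf;
    # sysread returns 0 on EOF, i.e. once every write end has been closed.
    1 while sysread($reader, $buf, 4096);
    POSIX::_exit(0);   # graceful shutdown
}

# Parent ("client"): keep $writer open for as long as the child should live.
close $reader;
# ... normal client work would happen here ...
close $writer;   # signals the child to shut down
waitpid($pid, 0);
my $status = $?;
printf "child wait status: %d\n", $status;   # 0
```

If the parent process dies instead of closing $writer explicitly, the kernel closes the descriptor and the child sees the same EOF.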

Since the fork_task_server constructor forks and requires using AnyEvent in both the parent and child processes, it is important that you not install any AnyEvent watchers before calling it. The usual caveats about forking AnyEvent applications apply (see AnyEvent docs).

INTERFACE

There are two formats possible for the interface option when creating a server. The first (and most general) is a coderef. This coderef will be passed the list of arguments that were sent when the checkout was called in the client process (without the trailing callback of course).

As described above, you can use a checkout object as a coderef or as an object with methods. If the checkout is invoked as an object, the method name is prepended to the arguments passed to interface:

    interface => sub {
      my ($method, @args) = @_;
    },

If the checkout is invoked as a coderef, method is omitted:

    interface => sub {
      my (@args) = @_;
    },

The second format possible for interface is a hash ref. This is a simple method-dispatch feature where the name of the method invoked on the checkout object is used as the key to look up which coderef to run in the worker:

    interface => {
      method1 => sub {
        my (@args) = @_;
      },
      method2 => sub {
        my (@args) = @_;
      },
    },
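A hash-ref interface behaves much like a coderef interface that dispatches on the method name. The helper below sketches that equivalence; hashref_to_coderef is hypothetical and not part of AnyEvent::Task, and how the real server reports an unknown method is not described here, so the die message is an assumption.

```perl
use strict;
use warnings;

# Hypothetical helper: turn a hash-ref interface into the equivalent
# coderef interface that receives ($method, @args).
sub hashref_to_coderef {
    my ($interface) = @_;
    return sub {
        my ($method, @args) = @_;
        my $handler = $interface->{$method}
            or die "unknown method '$method'\n";
        return $handler->(@args);
    };
}

my $dispatch = hashref_to_coderef({
    add   => sub { my ($x, $y) = @_; return $x + $y },
    upper => sub { return uc $_[0] },
});

printf "%s\n", $dispatch->('add', 2, 3);     # 5
printf "%s\n", $dispatch->('upper', 'hi');   # HI
```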

Note that since the protocol between the client and the worker process is JSON-based, all arguments and return values must be serializable to JSON. This includes most perl scalars like strings, a limited range of numeric types, and hash and array references with no cyclic references.
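If you are unsure whether a value will make it across the JSON protocol, one option is to try encoding it yourself before sending. This sketch uses the core JSON::PP module; json_safe is a hypothetical helper, and AnyEvent::Task's own encoder settings may differ from the JSON::PP defaults used here.

```perl
use strict;
use warnings;
use JSON::PP ();

# Hypothetical helper (not part of AnyEvent::Task): returns 1 if $data can
# be encoded as JSON, 0 otherwise.
sub json_safe {
    my ($data) = @_;
    return eval { JSON::PP->new->allow_nonref->encode($data); 1 } ? 1 : 0;
}

printf "%d\n", json_safe({ user => 'jimmy', ids => [1, 2, 3] });   # 1
printf "%d\n", json_safe(sub { 1 });                                # 0: code refs
printf "%d\n", json_safe(bless {}, 'Some::Class');                  # 0: blessed objects
```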

A future backwards compatible RPC protocol may use Storable or something else, although note that you can already serialise an object with Storable manually, send the resulting string over the existing protocol, and then deserialise it in the worker.
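The manual Storable approach might look like the following. The Point class, the Base64 step, and the use of JSON::PP to stand in for the wire protocol are all assumptions for illustration; nfreeze/thaw (Storable), encode_base64/decode_base64 (MIME::Base64), and JSON::PP are real core modules.

```perl
use strict;
use warnings;
use Storable qw(nfreeze thaw);
use MIME::Base64 qw(encode_base64 decode_base64);
use JSON::PP ();

package Point;   # hypothetical example class
sub new { my ($class, %args) = @_; return bless {%args}, $class }
sub x   { return $_[0]{x} }

package main;

# Client side: freeze the object, then Base64-encode it, since frozen
# Storable data is binary and may not survive a JSON text protocol intact.
my $point   = Point->new(x => 3, y => 4);
my $encoded = encode_base64(nfreeze($point), '');
my $wire    = JSON::PP->new->encode([$encoded]);   # stands in for the protocol

# Worker side: reverse the steps to get a copy of the original object.
my ($payload) = @{ JSON::PP->new->decode($wire) };
my $copy = thaw(decode_base64($payload));
printf "%s %d\n", ref($copy), $copy->x;   # Point 3
```

The worker needs the class loaded (for example with use) to call methods on the thawed object; thaw restores the blessing but does not load any code.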

LOGGING

Because workers run in a separate process, they can't directly use logging contexts in the client process. That is why this module is integrated with Log::Defer.

A Log::Defer object is created on demand in the worker process, and once the worker is done with an operation, any messages in the object will be extracted and sent back to the client. The client then merges these into its main Log::Defer object that was passed in when creating the checkout.

In your server code, use AnyEvent::Task::Logger. It exports the function logger which returns a Log::Defer object:

    use AnyEvent::Task::Server;
    use AnyEvent::Task::Logger;

    AnyEvent::Task::Server->new(
      listen => ['unix/', '/tmp/anyevent-task.socket'],
      interface => sub {
        logger->info('about to compute some operation');
        {
          my $timer = logger->timer('computing some operation');
          select undef,undef,undef, 1; ## sleep for 1 second
        }
      },
    )->run;

Note: Portable server code should not call sleep because on some systems it will interfere with the recoverable worker timeout feature implemented with SIGALRM.
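The sketch below shows the general alarm()/die idiom that a SIGALRM-based timeout can be built on, and why select() is used as a sleep in the server example above: perlfunc warns that sleep may be implemented internally with alarm on some systems, so mixing the two is usually a mistake. run_with_timeout is a hypothetical helper; this is not AnyEvent::Task's code.

```perl
use strict;
use warnings;

# Hypothetical helper: run $code, aborting it if it takes longer than
# $seconds. Uses the standard alarm()/die/eval idiom.
sub run_with_timeout {
    my ($seconds, $code) = @_;
    my $ok = eval {
        local $SIG{ALRM} = sub { die "timeout\n" };
        alarm $seconds;
        $code->();
        alarm 0;
        1;
    };
    alarm 0;   # make sure no alarm is left pending if $code died
    return 'done' if $ok;
    return $@ eq "timeout\n" ? 'timed out' : "error: $@";
}

# select() sleeps without touching the alarm timer
printf "%s\n", run_with_timeout(2, sub { select undef, undef, undef, 0.1 });   # done
printf "%s\n", run_with_timeout(1, sub { select undef, undef, undef, 3 });     # timed out
```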

In your client code, pass a Log::Defer object in when you create a checkout:

    use AnyEvent::Task::Client;
    use Log::Defer;

    my $client = AnyEvent::Task::Client->new(
                   connect => ['unix/', '/tmp/anyevent-task.socket'],
                 );

    my $log_defer_object = Log::Defer->new(sub {
                                             my $msg = shift;
                                             use Data::Dumper; ## or whatever
                                             print Dumper($msg);
                                           });

    $log_defer_object->info('going to compute some operation in a worker');

    my $checkout = $client->checkout(log_defer_object => $log_defer_object);

    my $cv = AE::cv;

    $checkout->(sub {
      $log_defer_object->info('finished some operation');
      $cv->send;
    });

    $cv->recv;

When run, the above client will print something like this:

    $VAR1 = {
          'start' => '1363232705.96839',
          'end' => '1.027309',
          'logs' => [
                      [
                        '0.000179',
                        30,
                        'going to compute some operation in a worker'
                      ],
                      [
                        '0.023881061050415',
                        30,
                        'about to compute some operation'
                      ],
                      [
                        '1.025965',
                        30,
                        'finished some operation'
                      ]
                    ],
          'timers' => {
                        'computing some operation' => [
                                                        '0.024089061050415',
                                                        '1.02470206105041'
                                                      ]
                      }
        };

ERROR HANDLING

If you expect some operation to throw an exception, in a synchronous program you might wrap it in eval like this:

    use feature 'say';

    my $crypted;

    eval {
      $crypted = hash('secret');
    };

    if ($@) {
      say "hash failed: $@";
    } else {
      say "hashed password is $crypted";
    }

But in an asynchronous program, typically hash would initiate some kind of asynchronous operation and then return immediately. The error might come back at any time in the future, in which case you need a way to map the exception that is thrown back to your original context.

AnyEvent::Task accomplishes this mapping with Callback::Frame.

Callback::Frame lets you preserve error handlers (and local variables) across asynchronous callbacks. Callback::Frame is not tied to AnyEvent::Task, AnyEvent or any other async framework and can be used with almost all callback-based libraries.

However, when using AnyEvent::Task, libraries that you use in the client must be AnyEvent compatible. This restriction obviously does not apply to your server code (that being one of the main purposes of AnyEvent::Task -- accessing blocking resources from an asynchronous program).

As an example usage of Callback::Frame, here is how we would handle errors thrown from a worker process running the hash method in an asynchronous client program:

    use Callback::Frame;
    use feature 'say';

    frame(code => sub {

      $client->checkout->hash('secret', sub {
        my ($checkout, $crypted) = @_;
        say "Hashed password is $crypted";
      });

    }, catch => sub {

      my $back_trace = shift;
      say "Error is: $@";
      say "Full back-trace: $back_trace";

    })->(); ## <-- frame is created and then executed

Of course if hash is something like a bcrypt hash function it is very unlikely to raise an exception so maybe it's a bad example. Or maybe it's a really good example: In addition to errors that occur while running your callbacks, AnyEvent::Task uses Callback::Frame to throw errors if the worker process times out, so if the bcrypt work factor is really cranked up it might hit the default 30 second time limit.

Reforking of workers after errors

If a worker throws an error, the client receives the error but the worker process stays running. As long as the client has a reference to the checkout, it can still be used to communicate with that worker, so you can access error states, roll back transactions, or clean something up.

Once the checkout is released, however, by default the worker will be shut down instead of returning to the client's worker pool as in the normal case where no errors were thrown. This can be prevented by setting the dont_refork_after_error option in the client options. This only really matters if your setup routines take a long time and errors are being thrown frequently.

There are exceptions to returning workers that threw errors back into the worker pool: workers that have thrown fatal errors such as loss of connection or hung worker timeout errors. These errors are stored in the checkout and for as long as the checkout exists, any operations on it will return the stored fatal error. The worker connection is closed and a new worker process is forked.

Rationale for Callback::Frame

Why not just call the callback but set $@ to indicate an error has occurred? This is the approach taken with AnyEvent::DBI and AnyEvent::Worker for example but I believe the Callback::Frame interface is superior to this. The problem is that exceptions are supposed to be an out-of-band message and code that doesn't handle them will have the exceptions bubbled up, usually to a top-level error handler. Invoking the callback when an error occurs forces exceptions to be handled in-band.

Why not just have AnyEvent::Task expose an error callback? I believe Callback::Frame is superior to this also: With error callbacks you still have to write error handler callbacks everywhere an error might be thrown instead of having a single "catch-all" top-level error handler.

Callback::Frame provides an error handler stack so you can have nested error handlers (similar to nested evals). This is useful when you wish to have a top-level "bail-out" error handler and also nested error handlers that know how to retry or recover from an error in an async sub-operation.

Callback::Frame helps you maintain the dynamic state (error handlers and dynamic variables) installed for a single connection. In other words, any errors that occur while servicing that connection will be able to be caught by an error handler specific to that connection. This lets you send an error response and also collect all associated log messages in a Log::Defer object specific to that connection.

Callback::Frame is designed to be easily used with libraries that don't know about Callback::Frame. fub is a shortcut for frame with just the code argument. Instead of passing sub { ... } into libraries you can pass in fub { ... }. When invoked, this wrapped callback will first re-establish any error handlers that you installed with frame and then run your actual callback code. Error callbacks should be populated with fub { die "..." }. It's important that all callbacks be created with fub (or frame) even if you don't expect them to fail, so that the dynamic context is preserved for nested callbacks that might.

COMPARISON WITH HTTP

Why a custom protocol, client, and server? Can't we just use something like HTTP?

It depends.

AnyEvent::Task clients send discrete messages and receive ordered, discrete replies from workers, much like HTTP. The AnyEvent::Task protocol can be extended in a backwards compatible manner like HTTP. AnyEvent::Task communication can be pipelined and possibly in the future even compressed like HTTP.

AnyEvent::Task servers (currently) all obey a very specific implementation policy: they are like CGI servers in that each process they fork is guaranteed to be handling only one connection at once, so it can perform blocking operations without worrying about holding up other connections.

But since a single process can handle many requests in a row without exiting, the AnyEvent::Task server is more like a FastCGI server. The difference however is that while a client holds a checkout it is guaranteed an exclusive lock on that process. With a FastCGI server it is assumed that requests are stateless so you can't necessarily be sure you'll get the same process for two consecutive requests. In fact, if an error is thrown in the FastCGI handler you may never get the same process back again, preventing you from being able to recover from the error, retry, or at least collect process state for logging reasons.

The fundamental difference between the AnyEvent::Task protocol and HTTP is that in AnyEvent::Task the client is the dominant protocol orchestrator whereas in HTTP it is the server.

In AnyEvent::Task, the client manages the worker pool and the client decides if/when the worker process should terminate. In the normal case, a client will just return the worker to its worker pool. A worker is supposed to accept commands for as long as possible until the client dismisses it.

Client processes can be started and checkouts can be obtained before the server is even started. The client will continue to try to connect to the server to obtain worker processes until either the server starts or the checkout's timeout period lapses.

The client decides the timeout for each checkout and different clients can have different timeouts while connecting to the same server.

The client even decides how many minimum and maximum workers it requires at once. The server is really just a simple fork-on-demand server and most of the sophistication is in the asynchronous client.

SEE ALSO

The AnyEvent::Task github repo

AnyEvent::Task is integrated with Callback::Frame. In order to handle exceptions in a meaningful way, you will need to use this module.

There are about a million CPAN modules that do similar things.

This module is designed to be used in a non-blocking, process-based unix program. Depending on your exact requirements you might find something else useful: Parallel::ForkManager, Thread::Pool, an HTTP server of some kind, &c.

If you're into AnyEvent, AnyEvent::DBI and AnyEvent::Worker (based on AnyEvent::DBI), and AnyEvent::ForkObject send and receive commands from worker processes similar to this module. AnyEvent::Worker::Pool also has an implementation of a worker pool. AnyEvent::Gearman can interface with Gearman services.

If you're into POE there is POE::Component::Pool::DBI, POEx::WorkerPool, POE::Component::ResourcePool, POE::Component::PreforkDispatch, Cantella::Worker, &c.

BUGS

This module is still being developed although the interface should be mostly stable.

AUTHOR

Doug Hoyte, <doug@hcsw.org>

COPYRIGHT & LICENSE

Copyright 2012-2013 Doug Hoyte.

This module is licensed under the same terms as perl itself.