NAME

Beekeeper::Worker - Base class for creating services

VERSION

Version 0.01

SYNOPSIS

  package MyApp::Worker;
  
  use Beekeeper::Worker ':log';
  use base 'Beekeeper::Worker';

  sub on_startup {
      my $self = shift;
      
      $self->accept_notifications(
          'myapp.msg' => 'got_message',
      );
      
      $self->accept_jobs(
          'myapp.sum' => 'do_sum',
      );

      log_debug 'Ready';
  }
  
  sub authorize_request {
      my ($self, $req) = @_;
  
      return REQUEST_AUTHORIZED;
  }

  sub got_message {
      my ($self, $params) = @_;
      warn $params->{message};
  }
  
  sub do_sum {
      my ($self, $params) = @_;
      return $params->[0] + $params->[1];
  }

DESCRIPTION

Base class for creating services.

METHODS

CONSTRUCTOR

Beekeeper::Worker objects are created automatically by Beekeeper::WorkerPool after spawning new processes.

METHODS

on_startup

This method is executed in a fresh worker process immediately after it is spawned. At that point the connection to the broker has just been established and the logger has been initialized.

It is a placeholder intended to be overridden in subclasses, whose implementations perform startup tasks and declare which jobs and notifications will be handled.

This is the place to initialize, for example, persistent database or cache connections.

After this method returns the worker will wait for incoming events to handle.

on_shutdown

This method is executed just before a worker process is stopped.

It can be overridden as needed; the default implementation does nothing.
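
As a rough illustration of the two hooks above, the sketch below opens a resource in on_startup and releases it in on_shutdown. It assumes the worker object is a hash reference in which a subclass may keep its own state, which is not stated in this document. The package name, the 'scratch' and 'cache' keys, and the temporary file (a stand-in for a database or cache connection) are all hypothetical. The base class is left out so the snippet runs on its own.

```perl
package MyApp::Worker::Lifecycle;

use strict;
use warnings;
use File::Temp ();

# In a real worker this package would also declare:
#   use base 'Beekeeper::Worker';

sub on_startup {
    my $self = shift;

    # Open a long-lived resource once per worker process.
    # A temp file stands in for a database or cache connection.
    $self->{scratch} = File::Temp->new;

    # Per-process state that callbacks can reuse later (hypothetical key).
    $self->{cache} = {};
}

sub on_shutdown {
    my $self = shift;

    # Release resources explicitly before the process stops.
    $self->{scratch}->close if $self->{scratch};
    delete @$self{qw( scratch cache )};
}

package main;

# Drive the hooks by hand with a plain blessed hash, for illustration only.
# Real workers are created and driven by Beekeeper::WorkerPool.
my $worker = bless {}, 'MyApp::Worker::Lifecycle';
$worker->on_startup;
$worker->on_shutdown;
```

Keeping the setup and teardown symmetric makes it easier to see that everything opened at startup is released at shutdown.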

authorize_request( $req )

This method MUST be overridden in your worker classes, as the default behavior is to deny the execution of any request.

When a request is received this method is called before executing the corresponding callback, and it must return the exported constant REQUEST_AUTHORIZED in order to authorize it. Returning any other value will result in the request being ignored.

This is the place to handle application authentication and authorization.
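
A minimal sketch of an allow-list check follows. It rests on two assumptions that are not confirmed by this document: that the request exposes its method name as $req->{method}, and that a plain empty return is an acceptable way to deny. The %ALLOWED table and the package name are hypothetical. REQUEST_AUTHORIZED is defined locally as a stand-in only so that the snippet compiles on its own; a real worker receives the constant from Beekeeper::Worker and must not redefine it.

```perl
package MyApp::Worker::Auth;

use strict;
use warnings;

# Stand-in so this snippet compiles without Beekeeper installed.
# A real worker gets this constant from Beekeeper::Worker.
use constant REQUEST_AUTHORIZED => 'stand-in value';

# Hypothetical allow-list of methods this service is willing to run.
my %ALLOWED = map { $_ => 1 } qw( myapp.sum myapp.msg );

sub authorize_request {
    my ($self, $req) = @_;

    # Assumption: the method name is available as $req->{method}.
    my $method = $req->{method};

    # Any return value other than REQUEST_AUTHORIZED denies the request.
    return unless defined $method && $ALLOWED{$method};

    return REQUEST_AUTHORIZED;
}

1;
```

Denying by default and authorizing only known methods mirrors the behavior of the base class, which rejects everything unless told otherwise.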

log_handler

By default, all workers use a Beekeeper::Logger logger, which logs errors and warnings to files and also to a topic on the message bus. The command line tool bkpr-log allows inspecting the logs of the entire system in real time.

To replace this default log mechanism with another of your choice, override the class method log_handler and make it return an object implementing a log method.
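
A replacement logger could look roughly like the sketch below. The argument list of log (a level followed by the message parts) is an assumption, since this document does not specify it. The class MyApp::StderrLogger and its 'out' option are hypothetical names. The worker package omits the base class so the snippet runs on its own.

```perl
package MyApp::StderrLogger;

use strict;
use warnings;

sub new {
    my ($class, %args) = @_;

    # 'out' is a hypothetical option: any writable filehandle,
    # defaulting to STDERR.
    return bless { out => $args{out} || \*STDERR }, $class;
}

# Assumption: the worker calls $logger->log( $level, @message_parts ).
sub log {
    my ($self, $level, @msg) = @_;

    my $line = sprintf "[%s] %s\n", $level, join(' ', @msg);
    print { $self->{out} } $line;

    return $line;
}

package MyApp::Worker::Logging;

# In a real worker this package would also declare:
#   use base 'Beekeeper::Worker';

sub log_handler {
    my $self = shift;   # class name or object, unused here

    return MyApp::StderrLogger->new;
}

1;
```

Passing the output handle to the constructor keeps the logger easy to test, because a test can supply an in-memory filehandle instead of STDERR.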

For convenience you can import the ':log' symbols, which expose to your class the functions log_fatal, log_alert, log_critical, log_error, log_warn, log_warning, log_notice, log_info, log_debug and log_trace.

These call the underlying log method of your logger class if the severity is equal to or higher than $Beekeeper::Worker::LogLevel, which allows warnings by default. You can raise the log level to include debug info with the --debug option of bkpr, or from the class config in the file pool.config.json.

Using these functions makes it easy to switch logging backends at a later date.

All warnings and errors generated by the execution of the worker code are logged, unless you specifically catch and ignore them.

Client methods

In order to make RPC calls to other services, the methods send_notification, do_job, do_async_job, do_background_job and wait_all_jobs are automatically imported from Beekeeper::Client.

accept_notifications ( $method => $callback, ... )

Make this worker start accepting the specified notifications from the message bus.

$method is a string with the format "{service_class}.{method}". A default or fallback handler can be specified using a wildcard as "{service_class}.*".

$callback is a method name or a coderef that will be called when a notification is received. When executed, the callback receives two parameters: $params, which contains the notification data itself, and $req, which is a Beekeeper::JSONRPC::Notification object (usually redundant unless you need to inspect request headers).

Notifications are not expected to return a value; any value returned from the callback is ignored.

The callback is executed within an eval block. If it dies, the error is logged and the worker continues running.

Example:

  package MyWorker;
  use base 'Beekeeper::Worker';
  
  sub on_startup {
      my $self = shift;
      
      $self->accept_notifications(
          'foo.bar' => 'bar',       # call $self->bar for notifications 'foo.bar'
          'foo.baz' => $coderef,    # call $self->$coderef for notifications 'foo.baz'
          'foo.*'   => 'fallback',  # call $self->fallback for any other 'foo.*'
      );
  }  
  
  sub bar {
       my ($self, $params, $req) = @_;
       
       # $self is a MyWorker object
       # $params is a ref to the notification data
       # $req is a Beekeeper::JSONRPC::Notification object
  }
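
The fallback rule can be pictured with the small lookup below. This is only an illustration of the semantics described above (an exact entry is used when present, otherwise the "{service_class}.*" entry), not Beekeeper's actual implementation. The function resolve_callback and the %callbacks table are hypothetical.

```perl
use strict;
use warnings;

# Resolve a notification name against a table of registered callbacks.
# An exact entry wins; otherwise "{service_class}.*" is tried as fallback.
sub resolve_callback {
    my ($callbacks, $method) = @_;

    return $callbacks->{$method} if exists $callbacks->{$method};

    # Strip the final ".method" part to obtain the service class.
    my ($service) = $method =~ m/^(.+)\.[^.]+$/
        or return;

    # Undefined when no wildcard was registered for this service class.
    return $callbacks->{"$service.*"};
}

my %callbacks = (
    'foo.bar' => 'bar',
    'foo.*'   => 'fallback',
);

print resolve_callback(\%callbacks, 'foo.bar'), "\n";   # prints "bar"
print resolve_callback(\%callbacks, 'foo.qux'), "\n";   # prints "fallback"
```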

accept_jobs ( $method => $callback, ... )

Make this worker start accepting the specified RPC requests from the message bus.

$method is a string with the format "{service_class}.{method}". A default or fallback handler can be specified using a wildcard as "{service_class}.*".

$callback is a method name or a coderef that will be called when a request is received. When executed, the callback receives two parameters: $params, which contains the request parameters, and $req, which is a Beekeeper::JSONRPC::Request object (usually redundant unless you need to inspect request headers).

The value or data ref returned by the callback will be sent back to the caller as the response.

The callback is executed within an eval block. If it dies, the error is logged, the caller receives a generic error response, and the worker continues running.

Example:

  package MyWorker;
  use base 'Beekeeper::Worker';
  
  sub on_startup {
      my $self = shift;
      
      $self->accept_jobs(
          'foo.inc' => 'inc',       # call $self->inc for requests to 'foo.inc'
          'foo.baz' => $coderef,    # call $self->$coderef for requests to 'foo.baz'
          'foo.*'   => 'fallback',  # call $self->fallback for any other 'foo.*'
      );
  }
  
  sub inc {
       my ($self, $params, $req) = @_;
       
       # $self is a MyWorker object
       # $params is a ref to the parameters of the request
       # $req is a Beekeeper::JSONRPC::Request object
       
       return $params->{number} + 1;
  }
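
The error isolation described above can be sketched as follows. This is not Beekeeper's code: the function run_job and the layout of the response hash are hypothetical, and the warn call merely stands in for the worker's logger. It shows the general eval idiom under which a dying callback produces a generic error for the caller while the process keeps running.

```perl
use strict;
use warnings;

# Run a job callback so that a die() cannot take the worker down.
# The response layout used here is hypothetical.
sub run_job {
    my ($callback, $params) = @_;

    my $result;
    my $ok = eval { $result = $callback->($params); 1 };

    unless ($ok) {
        my $err = $@ || 'unknown error';
        warn "Job failed: $err";                 # stand-in for the logger
        return { error => 'Internal error' };    # no details for the caller
    }

    return { result => $result };
}

my $good = run_job( sub { $_[0]{number} + 1 }, { number => 41 } );
my $bad  = run_job( sub { die "db down\n" },   {} );

print $good->{result}, "\n";   # prints 42
print $bad->{error},   "\n";   # prints "Internal error"
```

Testing the return value of eval instead of $@ avoids the case where $@ is cleared or overwritten before it is inspected.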

stop_accepting_notifications ( $method, ... )

Make this worker stop accepting the specified notifications from the message bus.

$method must be one of the strings used previously in accept_notifications.

stop_accepting_jobs ( $method, ... )

Make this worker stop accepting the specified RPC requests from the message bus.

$method must be one of the strings used previously in accept_jobs.

stop_working

Make this worker stop processing RPC requests and exit. Unprocessed jobs will be resent to another worker by the message broker; unprocessed notifications will be ignored.

This is the default handler for the TERM signal. If this method is called manually, WorkerPool will respawn the worker immediately after it exits.

SEE ALSO

Beekeeper::Client, Beekeeper::WorkerPool, Beekeeper::Logger.

AUTHOR

José Micó, jose.mico@gmail.com

COPYRIGHT AND LICENSE

Copyright 2015 José Micó.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language itself.

This software is distributed in the hope that it will be useful, but it is provided “as is” and without any express or implied warranties. For details, see the full text of the license in the file LICENSE.