NAME

Beekeeper::Worker - Base class for creating services

VERSION

Version 0.06

SYNOPSIS

  package MyApp::Worker;
  
  use Beekeeper::Worker ':log';
  use base 'Beekeeper::Worker';
  
  sub on_startup {
      my $self = shift;
      
      $self->accept_notifications(
          'myapp.msg' => 'got_message',
      );
      
      $self->accept_remote_calls(
          'myapp.sum' => 'do_sum',
      );
  
      log_info 'Ready';
  }
  
  sub authorize_request {
      my ($self, $req) = @_;
  
      return BKPR_REQUEST_AUTHORIZED;
  }
  
  sub got_message {
      my ($self, $params) = @_;
      warn $params->{message};
  }
  
  sub do_sum {
      my ($self, $params) = @_;
      return $params->[0] + $params->[1];
  }

DESCRIPTION

Base class for creating services.

METHODS

CONSTRUCTOR

Beekeeper::Worker objects are created automatically by Beekeeper::WorkerPool after spawning new processes.

METHODS

on_startup

This method is executed on a fresh worker process immediately after it was spawned, after connecting to the broker and initializing the logger.

The default implementation is just a placeholder, intended to be overrided in subclasses.

This is the place to perform startup tasks (like creating database or cache connections) and declare which calls and notifications the worker will accept.

After this method returns the worker will wait for incoming events to handle.

on_shutdown

This method is executed just before a worker process is stopped.

It can be overrided as needed, the default implementation does nothing.

authorize_request( $req )

This method MUST be overrided in worker classes, as the default behavior is to deny the execution of any request.

When a request is received this method is called before executing the corresponding handler, and it must return the exported constant BKPR_REQUEST_AUTHORIZED in order to authorize it. Returning any other value will result in the request being ignored.

This is the place to handle application authentication and authorization.

Parameter $req is either a Beekeeper::JSONRPC::Notification or a Beekeeper::JSONRPC::Request object.

log_handler

By default, all workers use a Beekeeper::Logger logger which logs errors and warnings to files and also to a topic on the message bus. The command line tool bkpr-log allows to inspect in real time the logs from the message bus.

This method can be overrided in worker classes in order to replace the default log mechanism for another one. To do so, the new implementation must return an object implementing a log method (see Beekeeper::Logger::log for reference).

For convenience you can import the ':log' symbols and expose to your class the functions log_fatal, log_alert, log_critical, log_error, log_warn, log_warning, log_notice, log_info, log_debug and log_trace.

These will call the underlying log method of the logger class if the severity is equal or higher than $Beekeeper::Worker::LogLevel, which is set to LOG_INFO by default. The log level can be set to LOG_DEBUG with the --debug option of bkpr, or setting a "debug" option to a true value in config file pool.config.json.

Using these functions makes very easy to switch logging backends at a later date.

All warnings and errors generated by the execution of the worker code are logged (unless their severity is below the current log level).

RPC call methods

In order to make RPC calls to another services, methods send_notification, call_remote, call_remote_async, fire_remote and wait_async_calls are automatically imported from Beekeeper::Client.

accept_notifications ( $method => $callback, ... )

Make this worker start accepting specified notifications from message bus.

$method is a string with the format "{service_class}.{method}". A default or fallback handler can be specified using a wildcard as "{service_class}.*".

$callback is the method handler (a method name or a coderef) that will be called when a notification is received. When executed, the handler will receive two parameters $params (which contains the notification data itself) and $req which is a Beekeeper::JSONRPC::Notification object (usually redundant unless it is necessary to inspect the MQTT properties of the notification).

Notifications are not expected to return a value. Any value returned from notification handlers will be ignored.

The handler is executed within an eval block. If it dies the error will be logged but the worker will continue running.

Example:

  package MyWorker;
  
  use Beekeeper::Worker ':log';
  use base 'Beekeeper::Worker';
  
  sub on_startup {
      my ($self) = @_;
      
      $self->accept_notifications(
          'foo.bar' => 'bar',       # call $self->bar       for notifications 'foo.bar'
          'foo.baz' =>  $coderef,   # call $coderef->()     for notifications 'foo.baz'
          'foo.*'   => 'fallback',  # call $self->fallback  for any other 'foo.*'
      );
  }  
  
  sub bar {
       my ($self, $params, $req) = @_;
       
       # $self is a MyWorker object
       # $params is a ref to the notification data
       # $req is a Beekeeper::JSONRPC::Notification object
  
       log_warn "Got a notification foo.bar";
  }

accept_remote_calls ( $method => $callback, ... )

Make this worker start accepting specified RPC requests from message bus.

$method is a string with the format "{service_class}.{method}". A default or fallback handler can be specified using a wildcard as "{service_class}.*".

$callback is the method handler (a method name or a coderef) that will be called when a request is received. When executed, the handler will receive two parameters $params (which contains the notification data itself) and $req which is a Beekeeper::JSONRPC::Request object.

The value or reference returned by the handler will be sent back to the caller as response (unless the response is deferred with $req-\async_response>).

The handler is executed within an eval block. If it dies the error will be logged and the caller will receive a generic error response, but the worker will continue running.

Example:

  package MyWorker;
  
  use Beekeeper::Worker ':log';
  use base 'Beekeeper::Worker';
  
  sub on_startup {
       my ($self) = @_;
      
      $self->accept_remote_calls(
          'foo.inc' => 'increment',  # call $self->increment  for requests to 'foo.inc'
          'foo.baz' =>  $coderef,    # call $coderef->()      for requests to 'foo.baz'
          'foo.*'   => 'fallback',   # call $self->fallback   for any other 'foo.*'
      );
  }
  
  sub increment {
       my ($self, $params, $req) = @_;
       
       # $self is a MyWorker object
       # $params is a ref to the parameters of the request
       # $req is a Beekeeper::JSONRPC::Request object
  
       log_warn "Got a call to foo.inc";
  
       return $params->{number} + 1;
  }

Remote calls can be processed concurrently by means of calling $req-\async_response> to tell Beekeeper that the response for the request will be deferred until it is available, freeing the worker to accept more requests. Once the response is ready, it must be sent back to the caller with $req-\send_response>.

This handler process requests concurrently:

  sub increment {
      my ($self, $params, $req) = @_;
  
      my $number = $params->{number};
  
      $req->async_response;
  
      my $t; $t = AnyEvent->timer( after => 1, cb => sub {
          undef $t;
          $req->send_response( $number + 1 );
      });
  }

Note that callback closures will not be executed in Beekeeper scope but in the event loop one, so uncatched exceptions in these closures will cause the worker to die and be respawn.

Asynchronous method handlers use system resources more efficiently, but are significantly harder to write and debug.

stop_accepting_notifications ( $method, ... )

Make this worker stop accepting specified notifications from message bus.

$method must be one of the strings used previously in accept_notifications.

stop_accepting_calls ( $method, ... )

Make this worker stop accepting specified RPC requests from message bus.

$method must be one of the strings used previously in accept_remote_calls.

stop_working

Make this worker stop accepting new RPC requests, process all requests already received, execute on_shutdown method, and then exit.

This is the default signal handler for TERM signal.

Please note that it is not possible to stop worker pools calling this method, as WorkerPool will immediately respawn another worker after the current one exits.

SEE ALSO

Beekeeper::Client, Beekeeper::Logger, Beekeeper::WorkerPool.

AUTHOR

José Micó, jose.mico@gmail.com

COPYRIGHT AND LICENSE

Copyright 2015-2021 José Micó.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language itself.

This software is distributed in the hope that it will be useful, but it is provided “as is” and without any express or implied warranties. For details, see the full text of the license in the file LICENSE.