Jose Luis Martinez Torres
and 1 contributors

NAME

SQS::Worker - A light framework for processing messages from SQS queues

DESCRIPTION

SQS::Worker is a light framework that allows you to just code asyncronous tasks that consume messages from an SQS Queue. The framework takes care of launching the necessary processes (workers), and executes your code on incoming messages, so you can focus on writing the important part (behavior)

Also, since you're surely going to be deserializing the messages that come from the queue, SQS::Worker provides you with ways to easily consume JSON messages, for example.

It comes in the form of a Moose role that is to be composed into the end user code that wants to receive and process messages from an SQS queue.

The worker runs uninterrumped, fetching messages from it's configured queue, one at a time and then executing the process_message of the worker class.

The worker consumer can compose further funcionality by consuming more roles from the SQS::Worker namespace.

USAGE

Simple usage

        package YourWorker;

        use Moose;
        with 'SQS::Worker';

        sub process_message {
                my ($self,$message) = @_;

    # $message is a Paws::SQS::Message
                # do something with the message 
        }

Composing automatic json decoding to perl data structure

        package YourWorker;
  use Moose;
        with 'SQS::Worker', 'SQS::Worker::DecodeJson';

        sub process_mesage {
                my ($self, $data) = @_;
                
                # Do something with the data, already parsed into a structure
                my $name = $data->{name};

    # You get a logger attached to the worker so you can log stuff
    $c->log->info("I processed a message for $name");
        }

Bundled roles

SQS::Worker::DecodeJson decodes the message body in json format and passes

SQS::Worker::DecodeStorable decodes the message body in Perl storable format

SQS::Worker::Multiplex dispatches to different methods via a dispatch table

SQS::Worker::SNS decodes a message sent from SNS and inflates it to a SNS::Notfication

Creating your own processing module

Create a Moose role that wraps functionality around the method process_message

  package PrefixTheMessage;
    use Moose::Role;

    around process_message => sub {
      my ($orig, $self, $message) = @_;
      return 'prefixed ' . $message->Body;
    };

  1;

And then use it inside your consumers

  package YourWorker;
  
        use Moose;
        with 'SQS::Worker', 'PrefixTheMessage';
  
        sub process_mesage {
                my ($self, $message) = @_;
    # surprise! $message is prefixed!
  }
  
  1;

Composing roles

The worker roles can be composed (if it makes sense), so your worker could implement

  with 'SQS::Worker', 'SQS::Worker::DecodeJson', 'SQS::Worker::Multiplex';

to decode a message in json format that will then dispatch the json to the multiplex worker

Error handling

Any exception thrown from process_message will be treated as a failed message. Different message processors treat failed messages in different ways:

Message processors

SQS::Consumers::Default Messages processed before deleting them from the queue. If a message fails, it will be treated by SQS as an unprocessed message, and will reappear in the queue to be processed again by SQS (or delivered to a dead letter queue after N redeliveries if your SQS queue is configured appropiately

SQS::Consumers::DeleteAlways Message deleted, then processed. If a message fails it will not be reprocessed ever

Running the worker

Running the worker can be done via the spawn_worker command that comes bundled with the distribution

  spawn_worker --worker YourWorker --queue_url sqs_endpoint_url --region aws_sqs_region --log_conf log4perl_config_file_path

You can also control if the message should be deleted upon reception (before the message is actually processed) with

 spawn_worker --worker YourClass --queue_url sqs_endpoint_url --region aws_sqs_region --log_conf log4perl_config_file_path --consumer DeleteAlways

or you can create an instance of your object and invoke run:

  my $worker_instance = YourWorker->new(
    queue_url => $args->queue_url,
    region    => $args->region,
    log => Log::Log4perl->get_logger('async'),
    processor => $args->_consumer,
  );
  $worker_instance->run

Credentials

SQS::Worker uses the same credential system as Paws to authenticate to SQS: so, in a nutshell, it will work if you:

  • have the credentials in the home of the user launching the script, in the ~/.aws/credentials file.

  • assign an IAM role to the EC2 instance that is running the code (if deploying the code inside an EC2 instance)

  • set environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY

SEE ALSO

Paws

COPYRIGHT and LICENSE

Copyright (c) 2016 by CAPSiDE

This code is distributed under the Apache 2 License. The full text of the license can be found in the LICENSE file included with this module.

AUTHORS

Jose Luis Martinez, Albert Hilazo, Pau Cervera and Loic Prieto