The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

NAME

App::MultiModule::Tutorial - How to make this thing go

DESCRIPTION

This tutorial walks you through constructing the example that can be found in ex/ex1 in this module distribution. That shell script creates some config, which in turn references some .pm files that can be found in lib/MultiModuleTest/

You will have everything you need in order to run this tutorial after you cpan or cpanm in App::MultiModule.

Goal

This silly example creates a simple pipline with two tasks.

The first task emits a message every second, with an incrementing counter field, as well as a configurable output string. It can also receive messages to set the counter field to any value. Finally, this task is stateful, so it will not lose track of its count between restarts and/or crashes.

The second task consumes these messages and displays the counter and the output string.

The Tasks

Create the following two files as Example1.pm and Example2.pm, and place them in a directory called MultiModuleTest/ where you plan to run this tutorial.

Example1

This task will be the 'producer': it will emit a message every second that contains the incrementing counter and configured output string.

Here is the annotated source: package MultiModuleTest::Example1; #this acts like a typical Perl Module

 use strict;use warnings;
    #generally a good idea

 use parent 'App::MultiModule::Task';
    #tasks should inherit this class

 sub message {
    #the only required method; every task must have this

    my $self = shift;
        #Perl OOP boilerplate

    my $message = shift;
        #this argument always comes in as a HASH reference

     my %args = @_;
        #other interesting things come in here, but are not always
        #used or useful depending on the task

     if(defined $message->{new_ct}) {
        #someone sent us a message with 'new_ct' set; that means
        #they want us to start counting from that value

        $self->{state}->{ct} = $message->{new_ct};
            #see some of the 'state' setup below

         print STDERR "Example1: set ct to $message->{new_ct}\n";
            #show that we took the change
     }
 }

 sub set_config {
    #optional, but usually necessary to do useful stuff.
    #This is called when a task receives config, and is the only
    #way a task can do any necessary setup in order to produce
    #messages.
    #Also note that set_config() is called every time config
    #changes.  Code in this method must not assume only a single
    #invocation.

    my $self = shift;
        #as in message()

    my $config = shift;
        #this argument always comes in as a HASH reference

    my %args = @_;
        #as in message()

    $self->{config} = $config;
        #not necessary in this example, but is a common pattern
        #if future calls to message() need access to config

    $self->{state} = { ct => 0 } unless $self->{state};
        #Initialize our counter, but only the first time. This task is
        #marked as 'stateful' (see the code below), so after we are first
        #run, future runs (between restarts and crashes) will have 
        #$self->{state} already 'filled in'.

    $self->named_recur(
        #perldoc App::MultiModule::Core
        #this method schedules the passed CODE reference to be run
        #on the specified interval

        recur_name => 'Example1',
            #this string is the process global uniqueness identifier;
            #that is, given that set_config() can be called any number
            #of times, named_recur(), with a unique recur_name field,
            #will schedule the repeating call to the passed CODE ref
            #only once.

        repeat_interval => 1,
            #how often to run, in seconds.

        work => sub {  #What we're going to do on an interval
            my $message = {
                #a 'message' is nothing special, except it must
                #always be a HASH reference at the top level

                ct => $self->{state}->{ct}++,
                    #increment the counter in the saved state, and
                    #put that into the 'ct' field of the message.

                outstr => $config->{outstr},
                    #simply pass in the configured 'output' string.
            };
            $self->emit($message);
                #key point!
                #Tasks do not send messages to other tasks.  They simply
                #emit() messages to the MultiModule framework.  Separate
                #config to the Router task is fully responsible for how
                #messages flow.
        },
    );
 }

 #tasks default to not stateful; in order to become stateful, just
 #make the is_stateful() method return some kind of true thing
 sub is_stateful {
    return 'yes!';
 }
 1;

Example2

This task will be the 'consumer': it will receive the produced messages and output the counter and output string.

Here is the source:

 package MultiModuleTest::Example2;
 use strict;use warnings;
 use parent 'App::MultiModule::Task';
 sub message {
     my $self = shift; my $message = shift;

     if($message->{ct} and $message->{outstr}) {
        #defensive programming here; it is best not to assume that
        #every message we get has the fields we expect.

        print STDERR "Example2: received a message: $message->{ct} ($message->{outstr})\n"
            #just output the counter and output string
     }
 }

 #note what's missing: this task doesn't need any config, and
 #it needs no state handling.

 1;

The Config

Place this in a file named 'test.conf' in the directory where you are running this tutorial.

Simply put, the task name is the HASH key, and the value is the configuration sent to that task. It's really that simple.

Here it is:

 {   Example1 => { #this HASH reference is passed as the 
                   #second argument (after $self) to 
                   #the set_config() method, as illustrated
                   #above
        outstr => 'howdy'
     },
     Router => { #Just skim over this config, then read the
                 #detailed description below. Then re-read
                 #this config, and it'll make sense.

         routes => [
             #all of the routes.  A 'route' is, minimally, a
             #match and a forward.  If a message matches the
             #config in this match, the message is then handled
             #by the config in the related forward.
             {   match => {
                     source => 'Example1'
                        #any message that has a key named
                        #'source' whose value is 'Example1'
                 },
                 forwards => [
                     {   qname => 'Example2' }
                        #send this message to the queue named
                        #'Example2'
                 ]
             }
         ]
     }
 }

This is pretty much implemented in

https://metacpan.org/release/Message-Router

The matching syntax is implemented in

https://metacpan.org/release/Message-Match

Here's a part of the SYNOPSIS from Message::Match:

    mmatch(
        {a => 'b', c => 'd'},   #message
        {a => 'b'}              #match
    ); #true
    mmatch(
        {a => 'b', c => {x => 'y'}, #message
        {c => {x => 'y'}}           #match
    ); #true
    mmatch(
        {a => 'b', c => 'd'},   #message
        {x => 'y'}              #match
    ); #false

In our case, any message that contains the key 'source', which has value 'Example1' matches.

You'll note that Example1.pm only emits messages like this: { ct => 12, outstr => 'howdy' }

No 'source' is mentioned.

The emit() method automatically sets the 'source' field to be the name of the Task that called emit().

This is one of the most basic, deterministic ways to route messages between tasks.

So, the Router task will forward every message that comes from Example1 to Example2.

In MultiModule, task names and queue names always match. You send messages to a task by putting them on the queue whose name matches that task.

The Output

So here's an example run, copied from the bin/MultiModule perldoc:

 $ bin/MultiModule -p MultiModuleTest:: -c test.conf
 ...
 Example2: received a message: 1 (howdy)
 Example2: received a message: 2 (howdy)
 Example2: received a message: 3 (howdy)
 Example2: received a message: 4 (howdy)
 Example2: received a message: 5 (howdy)
 ^Ccaught SIGINT. starting orderly exit
 $ bin/MultiModule -p MultiModuleTest:: -c test.conf
 Example2: received a message: 6 (howdy)
 Example2: received a message: 7 (howdy)
 ...

In another window: $ strans '{new_ct => 999}' Example1

Back to the first window: Example2: received a message: 8 (howdy) Example2: received a message: 9 (howdy) Example2: received a message: 10 (howdy) Example1: received a message: $VAR1 = { '.ipc_transit_meta' => { 'send_ts' => 1234567890 }, 'new_ct' => 999 }; Example1: set ct to 999 Example2: received a message: 11 (howdy) Example2: received a message: 999 (howdy) Example2: received a message: 1000 (howdy) Example2: received a message: 1001 (howdy) Example2: received a message: 1002 (howdy) ^Ccaught SIGINT. starting orderly exit

You'll notice that the 11 appeared even after the ct was set to 999. That's expected; the message containing 11 was already on its way to Example2 when the set arrived.

What is 'strans'? It's the command-line client to send arbitrary messages via IPC::Transit, which is the module used for all of this communication.

There's a LOT MORE!

But that's the end of this tutorial.