
Message Passing for the Non-Blocked Mind

Introduction and Terminology

This is a tutorial about how to get into the swing of the new AnyEvent::MP module, which allows programs to transparently pass messages within a process and to other processes on the same or a different host.

What kind of messages? Basically, a message here means a list of Perl strings, numbers, hash references and array references: anything that can be expressed as a JSON text (as JSON is used by default in the protocol). Here are two examples:

   write_log => 1251555874, "action was successful.\n"
   123, ["a", "b", "c"], { foo => "bar" }
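A quick way to check whether some data qualifies as a message is to round-trip it through a JSON encoder. The following sketch uses the core module JSON::PP and assumes it behaves close enough to the default serializer for this purpose; it does not call AnyEvent::MP and makes no claim about the exact wire format.

```perl
use strict;
use warnings;
use JSON::PP ();

# A message is a flat Perl list; wrap it in an array reference so that
# it can be encoded as a single JSON text.
my @message = (123, ["a", "b", "c"], { foo => "bar" });

my $json    = JSON::PP->new->canonical;   # canonical: sorted hash keys
my $text    = $json->encode (\@message);
my @decoded = @{ $json->decode ($text) };

print "$text\n";   # [123,["a","b","c"],{"foo":"bar"}]

# Code references (and other non-JSON data) cannot be part of a message:
# JSON::PP refuses to encode them and dies.
my $ok = eval { $json->encode ([ sub { 1 } ]); 1 };
print $ok ? "encodable\n" : "not encodable\n";   # not encodable
```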

When using AnyEvent::MP it is customary to use a descriptive string, indicating the type of the message, as the first element of a message. This element is called a tag in AnyEvent::MP, as some API functions (rcv) support matching it directly.

Suppose you want to send a ping message with your current time to somewhere; this is what such a message might look like (in Perl syntax):

   ping => 1251381636

Now that we know what a message is, to which entities are those messages being passed? They are passed to ports. A port is a destination for messages but also a context to execute code: when a runtime error occurs while executing code belonging to a port, the exception will be raised on the port and can even travel to interested parties on other nodes, which makes supervision of distributed processes easy.

How do these ports relate to things you know? Each port belongs to a node, and a node is just the UNIX process that runs your AnyEvent::MP application.

Each node is distinguished from other nodes running on the same or another host in a network by its node ID. A node ID is simply a unique string chosen manually or assigned by AnyEvent::MP in some way (UNIX nodename, random string...).

Here is a diagram showing how nodes, ports and UNIX processes relate to each other. The setup consists of two nodes (more are of course possible): node A (in UNIX process 7066) with the ports ABC and DEF, and node B (in UNIX process 8321) with the ports FOO and BAR.

  |- PID: 7066 -|                  |- PID: 8321 -|
  |             |                  |             |
  | Node ID: A  |                  | Node ID: B  |
  |             |                  |             |
  |   Port ABC =|= <----\ /-----> =|= Port FOO   |
  |             |        X         |             |
  |   Port DEF =|= <----/ \-----> =|= Port BAR   |
  |             |                  |             |
  |-------------|                  |-------------|

The strings for the port IDs here are just for illustrative purposes: even though ports in AnyEvent::MP are also identified by strings, they can't be chosen manually and are assigned by the system dynamically. These port IDs are unique within a network and can also be used to identify senders or as message tags, for instance.

The next sections will explain the API of AnyEvent::MP by going through a few simple examples. Later some more complex idioms are introduced, which are hopefully useful to solve some real world problems.

Passing Your First Message

As a start, let's have a look at the messaging API. The following example is just a demo to show the basic elements of message passing with AnyEvent::MP.

The example should print: Ending with: 123, in a rather complicated way, by passing some message to a port.

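   use AnyEvent;
   use AnyEvent::MP;

   # condition variable used to wait for the result
   my $end_cv = AnyEvent->condvar;

   # create a new local port; $port holds its port ID (a string)
   my $port = port;

   # call this callback for every message tagged "test" on $port
   rcv $port, test => sub {
      my ($data) = @_;
      $end_cv->send ($data);
   };

   # send the message (test => 123) to our own port
   snd $port, test => 123;

   # block until the callback has delivered the value
   print "Ending with: " . $end_cv->recv . "\n";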

It already uses most of the essential functions inside AnyEvent::MP: first there is the port function, which creates a port and returns its port ID, a simple string.

This port ID can be used to send messages to the port and install handlers to receive messages on the port. Since it is a simple string it can be safely passed to other nodes in the network when you want to refer to that specific port (usually used for RPC, where you need to tell the other end which port to send the reply to - messages in AnyEvent::MP have a destination, but no source).
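Because messages carry no sender, a request that expects an answer has to name its reply port explicitly. The pure-Perl toy below models ports as strings in a hash that maps port IDs to callbacks. The names `%ports`, `toy_port` and `toy_snd` are hypothetical stand-ins for port and snd, used only to illustrate the reply-port idiom; the toy delivers messages immediately by calling the callback and says nothing about how AnyEvent::MP generates IDs or delivers messages.

```perl
use strict;
use warnings;

my %ports;          # hypothetical: port ID => callback
my $next_id = 0;

# Create a "port": allocate a string ID and remember its callback.
sub toy_port {
   my ($cb) = @_;
   my $id = "port" . ++$next_id;
   $ports{$id} = $cb;
   return $id;
}

# Deliver a message (a plain list) to the port with the given ID.
sub toy_snd {
   my ($id, @msg) = @_;
   my $cb = $ports{$id} or die "no such port: $id";
   $cb->(@msg);
}

# A server port that doubles a number and answers on the reply port
# named inside the request, since the message itself has no source.
my $server = toy_port (sub {
   my ($tag, $n, $reply_port) = @_;
   toy_snd $reply_port, result => 2 * $n;
});

my @answers;
my $client = toy_port (sub { my ($tag, $value) = @_; push @answers, $value });

# The client passes its own port ID along with the request.
toy_snd $server, double => 21, $client;
print "answer: $answers[0]\n";   # answer: 42
```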

The next function is rcv:

   rcv $port, test => sub { ... };

It installs a receiver callback on the port specified as the first argument (it only works for "local" ports, i.e. ports created on the same node). The next argument, in this example test, specifies a tag to match. This means that whenever a message whose first element is the string test is received, the callback is called with the remaining parts of that message.

Messages can be sent with the snd function, which is used like this in the example above:

   snd $port, test => 123;

This will send the message 'test', 123 to the port with the port ID stored in $port. Since in this case the receiver has a tag match on test it will call the callback with the first argument being the number 123.

The callback is a typical AnyEvent idiom: it just passes that number on to the condition variable $end_cv, which will then pass the value to the print. Condition variables are out of the scope of this tutorial and not often used with ports, so please consult AnyEvent::Intro about them.
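To make the tag matching of rcv and snd concrete, here is a toy, pure-Perl dispatcher that mimics the behaviour described above: the first element of a message selects a callback, which then receives the remaining elements. The names `%handlers` and `dispatch` are hypothetical, and the toy's choice to drop messages without a handler is its own; this is not how AnyEvent::MP implements rcv internally.

```perl
use strict;
use warnings;

# Hypothetical tag => callback table, standing in for rcv registrations.
my @log;
my %handlers = (
   ping      => sub { my ($time) = @_; push @log, "ping at $time" },
   write_log => sub { my ($time, $text) = @_; push @log, "log at $time: $text" },
);

# Split off the first element (the tag), look up its handler and pass
# the remaining elements on, much like rcv $port, tag => sub { ... }.
sub dispatch {
   my ($tag, @args) = @_;
   my $cb = $handlers{$tag}
      or return 0;          # no handler for this tag: the toy drops it
   $cb->(@args);
   return 1;
}

dispatch (ping      => 1251381636);
dispatch (write_log => 1251555874, "action was successful.");
dispatch (unknown   => 1);
print "$_\n" for @log;
```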

Passing messages inside just one process is boring. Before we can move on and do interprocess message passing we first have to make sure some things have been set up correctly for our nodes to talk to each other.

System Requirements and System Setup

Before we can start with real IPC we have to make sure some things work on your system.

First we have to set up a shared secret: for two AnyEvent::MP nodes to be able to communicate with each other over the network, both must be set up with the same shared secret, so they can prove their trustworthiness to each other.

The easiest way to set this up is to use the aemp utility:

   aemp gensecret

This creates a $HOME/.perl-anyevent-mp config file and generates a random shared secret. You can copy this file to any other system and then communicate with it over the network (via TCP). You can also select your own shared secret (aemp setsecret), and for increased security requirements you can even create (or configure) a TLS certificate (aemp gencert), causing connections to be not just securely authenticated, but also encrypted and protected against tampering.

Connections will only be successfully established when the nodes that want to connect to each other have the same shared secret (or successfully verify the TLS certificate of the other side, in which case no shared secret is required).

If something does not work as expected, and for example tcpdump shows that the connections are closed almost immediately, you should make sure that ~/.perl-anyevent-mp is the same on all hosts/user accounts that you try to connect with each other!

That is all for now; you will find some more advanced fiddling with the aemp utility later.

Passing Messages Between Processes

The Receiver

Let's split the previous example up into two programs: one for the sender and one for the receiver. First the receiver application, in full:

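   use AnyEvent;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   # configure this node from the "eg_simple_receiver" profile
   initialise_node "eg_simple_receiver";

   my $port = port;

   # make the port visible to all nodes under the group "eg_receivers"
   AnyEvent::MP::Global::register $port, "eg_receivers";

   rcv $port, test => sub {
      my ($data) = @_;

      print "Received data: " . $data . "\n";
   };

   # wait forever, handling incoming messages
   AnyEvent->condvar->recv;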

AnyEvent::MP::Global

Now, that wasn't too bad, was it? Ok, let's step through the new functions and modules that have been used.

For starters, there is now an additional module being used: AnyEvent::MP::Global. This module provides us with a global registry, which lets us register ports in groups that are visible on all nodes in a network.

What is this useful for? Well, the port IDs are random-looking strings, assigned by AnyEvent::MP. We cannot know those port IDs in advance, so we don't know which port ID to send messages to, especially when the message is to be passed between different nodes (or UNIX processes). To find the right port of another node in the network we will need to communicate this somehow to the sender. And exactly that is what AnyEvent::MP::Global provides.

Especially in larger, more anonymous networks this is handy: imagine you have a few database backends, a few web frontends and some processing distributed over a number of hosts. All of these would simply register themselves in the appropriate group, and your web frontends could then find a database backend by looking in that group.

initialise_node And The Network

Now, let's have a look at the new function, initialise_node:

   initialise_node "eg_simple_receiver";

Before we are able to send messages to other nodes we have to initialise ourselves to become a "distributed node". Initialising a node means naming the node, optionally binding some TCP listeners so that other nodes can contact it, and connecting to a predefined set of seed addresses so the node can discover the existing network - and the existing network can discover the node!

The first argument, the string "eg_simple_receiver", is the so-called profile to use: A profile holds some information about the application that is going to be a node in an AnyEvent::MP network. Customarily you don't specify a profile name at all: in this case, AnyEvent::MP will use the POSIX nodename.

The profile allows you to set the node ID that your application will use (the node ID defaults to the profile name if not specified). You can also set binds in the profile, meaning that you can define TCP ports that the application will listen on for incoming connections from other nodes of the network.

You should also configure seeds in the profile: a seed is just the TCP address of some other node in the network. To explain this in more detail, we have to look at the topology of an AnyEvent::MP network. The topology is called a fully connected mesh; here is an example with four nodes:

   N1--N2
   | \/ |
   | /\ |
   N3--N4

Now imagine another node, N5, wants to connect itself to that network:

   N1--N2
   | \/ |    N5
   | /\ |
   N3--N4

The new node needs to know the binds of all nodes already connected. This is exactly what the seeds are for: let's assume that the new node (N5) uses the TCP address of node N2 as its seed. This causes it to connect to N2:

   N1--N2____
   | \/ |    N5
   | /\ |
   N3--N4

N2 then tells N5 about the binds of the other nodes it is connected to, and N5 creates the rest of the connections:

    /--------\
   N1--N2____|
   | \/ |    N5
   | /\ |   /|
   N3--N4--- |
    \________/

All done: N5 is now happily connected to the rest of the network.
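The join procedure can be simulated in a few lines of plain Perl. The sketch below is a hypothetical model, not AnyEvent::MP code (`%links`, `connect_nodes` and `join_via_seed` are made-up names, and the real module exchanges binds over the network): it stores the mesh as a hash of neighbour sets, lets N5 connect to its seed N2, and then connects N5 to every node N2 already knows. It also checks that the result has the n(n-1)/2 connections a fully connected mesh of n nodes needs.

```perl
use strict;
use warnings;

my %links;   # hypothetical: node => { neighbour => 1, ... }

# Record an (undirected) connection between two nodes.
sub connect_nodes {
   my ($x, $y) = @_;
   $links{$x}{$y} = $links{$y}{$x} = 1;
}

# Start with the fully connected four-node mesh from the diagram.
my @mesh = qw(N1 N2 N3 N4);
for my $i (0 .. $#mesh) {
   connect_nodes $mesh[$i], $mesh[$_] for $i + 1 .. $#mesh;
}

# A new node first connects to its seed, learns the seed's neighbours,
# and then connects to each of them.
sub join_via_seed {
   my ($new, $seed) = @_;
   connect_nodes $new, $seed;
   connect_nodes $new, $_ for grep { $_ ne $new } keys %{ $links{$seed} };
}

join_via_seed ("N5", "N2");

# Each connection is stored twice (once per endpoint).
my $nodes       = keys %links;
my $connections = 0;
$connections += keys %{ $links{$_} } for keys %links;
$connections /= 2;

print "$nodes nodes, $connections connections\n";   # 5 nodes, 10 connections
```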

Setting Up The Profiles

So much for the profile. Now let's set up the eg_simple_receiver profile for later use. For the receiver we just give it a bind:

   aemp profile eg_simple_receiver setbinds localhost:12266

We use localhost in the example, but in the real world you usually want to use the "real" IP address of your node, so other hosts can connect to it. Of course, you can specify many binds, and it is also perfectly useful to run multiple nodes on the same host. Just keep in mind that other nodes will try to connect to those addresses, and this had better succeed if you want your network to be in good working condition.

While we are at it, we set up the profile for the sender in the second part of this example, too. We will call the sender profile eg_simple_sender. For the sender we set up a seed pointing to the receiver:

   aemp profile eg_simple_sender setseeds localhost:12266
   aemp profile eg_simple_sender setbinds

You might wonder why we set the binds to be empty here: actually, the "fully" in "fully connected mesh" is not the complete truth. If you don't configure any binds for a node profile, it will parse and try to resolve the node ID to find addresses to bind to. In this case we do not want that, so we explicitly specify an empty binds list, and the node will not listen on any TCP ports.

Nodes without listeners will not be able to send messages to other nodes without listeners, but they can still talk to all other nodes. For this example, as well as in many cases in the real world, we can live with this restriction, and it makes it easier to avoid DNS (in case your DNS setup is broken, this eliminates one potential problem :).
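The restriction above boils down to a simple rule: a connection needs at least one side that listens. The predicate below makes that rule explicit. `can_connect` and the node records are hypothetical and only model the rule as stated in this tutorial, not any function of AnyEvent::MP.

```perl
use strict;
use warnings;

# Hypothetical node records: a node ID plus its list of binds.
my %receiver = (id => "eg_simple_receiver", binds => ["localhost:12266"]);
my %sender   = (id => "eg_simple_sender",   binds => []);
my %sender2  = (id => "another_sender",     binds => []);

# Two nodes can connect if at least one of them listens on some address,
# because the other one can then open the TCP connection to it.
sub can_connect {
   my ($x, $y) = @_;
   my $listens = @{ $x->{binds} } || @{ $y->{binds} };
   return $listens ? 1 : 0;
}

printf "sender -> receiver: %s\n", can_connect (\%sender, \%receiver) ? "yes" : "no";
printf "sender -> sender2:  %s\n", can_connect (\%sender, \%sender2)  ? "yes" : "no";
```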

Whee, setting up nodes can be complicated at first, but you only have to do it once per network, and you can leave this boring task to the admins or end-users that want to use your stuff :)

Registering The Receiver

Coming back to our example, we have now introduced the basic purpose of AnyEvent::MP::Global and initialise_node and its use of profiles. We also set up our profiles for later use and now we will finally continue talking about the receiver.

Let's look at the next line(s):

   my $port = port;
   AnyEvent::MP::Global::register $port, "eg_receivers";

The port function has already been discussed. It simply creates a new port and returns the port ID. The register function, however, is new: The first argument is the port ID that we want to add to a global group, and its second argument is the name of that global group.

You can choose the name of such a global group freely (prefixing your package name is highly recommended!). The purpose of such a group is to store a set of port IDs. This set is made available throughout the whole AnyEvent::MP network, so that each node can see which ports belong to that group.

Later we will see how the sender looks for the ports in this global group to send messages to them.

The last step in the example is to set up a receiver callback for those messages, just as was discussed in the first example. We again match for the tag test. The difference is that this time we don't exit the application after receiving the first message. Instead we continue to wait for new messages indefinitely.

The Sender

Ok, now let's take a look at the sender code:

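   use AnyEvent;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   # configure this node from the "eg_simple_sender" profile
   initialise_node "eg_simple_sender";

   # every second, look up the receivers and send each one the current time
   my $find_timer =
      AnyEvent->timer (after => 0, interval => 1, cb => sub {
         my $ports = AnyEvent::MP::Global::find "eg_receivers"
            or return;   # no receivers known yet, try again next time

         snd $_, test => time
            for @$ports;
      });

   # keep the event loop (and the timer) running forever
   AnyEvent->condvar->recv;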

It's even less code. The call to initialise_node serves the same purpose as in the receiver; we just specify a different profile, the one we set up without binds.

Next we set up a timer that repeatedly (every second) calls this chunk of code:

   my $ports = AnyEvent::MP::Global::find "eg_receivers"
      or return;

   snd $_, test => time
      for @$ports;

The only new function here is the find function of AnyEvent::MP::Global. It searches in the global group named eg_receivers for ports. If none are found, it returns undef, which makes our code return instantly and wait for the next round, as nobody is interested in our message.

As soon as the receiver application has connected and the information about the newly added port in the receiver has propagated to the sender node, find returns an array reference that contains the port ID of the receiver port(s).

We then just send a message with a tag and the current time to every port in the global group.
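Conceptually, the global registry behaves like a shared hash that maps group names to sets of port IDs. The single-process sketch below models only the behaviour described in this tutorial: register adds a port ID to a named group, and find returns undef for an empty group and an array reference otherwise. `toy_register` and `toy_find` are hypothetical names; the real AnyEvent::MP::Global additionally propagates this information between nodes.

```perl
use strict;
use warnings;

my %groups;   # hypothetical: group name => { port ID => 1 }

# Add a port ID to a group; a set means registering twice is harmless.
sub toy_register {
   my ($port, $group) = @_;
   $groups{$group}{$port} = 1;
}

# Return undef if nobody registered, else an array reference of port IDs.
sub toy_find {
   my ($group) = @_;
   my @ports = sort keys %{ $groups{$group} || {} };
   return @ports ? \@ports : undef;
}

my $before = toy_find ("eg_receivers");     # undef: no receivers yet

toy_register ("portA", "eg_receivers");
toy_register ("portB", "eg_receivers");
toy_register ("portA", "eg_receivers");      # duplicate, ignored by the set

my $after = toy_find ("eg_receivers");
print defined $before ? "found\n" : "none yet\n";
print "receivers: @$after\n";                # receivers: portA portB
```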

Multiple Receivers

You can even run multiple receivers - the only problem is that they will use the same node ID. To avoid this problem, you can either not specify a profile name at all and rely on DNS and your POSIX node name, or you can use a special feature called "anonymous nodes":

   aemp profile eg_simple_receiver setnodeid anon/

The special name anon/ will be replaced by a random string each time the node starts. This way you can start many receivers (as long as they do not bind to the same TCP port, they cannot collide with each other), and all of them will receive the central time signal.
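Why unique node IDs matter can be shown with a small simulation: if node IDs are keys in a lookup table, nodes sharing an ID overwrite each other, while randomised IDs keep them apart. The `anon/` prefix comes from the text above; the random suffix (eight hex digits here) and the helper `anon_node_id` are hypothetical choices, not the format AnyEvent::MP actually uses.

```perl
use strict;
use warnings;

# Hypothetical: generate "anon/" followed by eight random hex digits.
sub anon_node_id {
   return "anon/" . join "", map { sprintf "%x", int rand 16 } 1 .. 8;
}

# Three receivers started from the same fixed profile share one node ID,
# so a table keyed by node ID can only hold one of them.
my %fixed = map { ("eg_simple_receiver" => "receiver $_") } 1 .. 3;

# With anonymous IDs every receiver gets its own entry (barring the
# astronomically unlikely case of a random collision).
my %anon = map { (anon_node_id () => "receiver $_") } 1 .. 3;

printf "fixed IDs: %d node(s), anonymous IDs: %d node(s)\n",
   scalar keys %fixed, scalar keys %anon;
```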

That's all for now - next time we will teach you about monitoring by writing a simple chat client and server :)

SEE ALSO

AnyEvent

AnyEvent::Handle

AnyEvent::MP

AnyEvent::MP::Global

AUTHOR

  Robin Redeker <elmex@ta-sa.org>