NAME

Net::STOMP::Client::Tutorial - Getting started with STOMP and Net::STOMP::Client

INTRODUCTION

Here is a five-part tutorial to get started with Net::STOMP::Client.

A basic knowledge of STOMP is required. For this, you can read the STOMP protocol specification.

PART 1: CONNECTING TO A BROKER

Net::STOMP::Client, like similar modules under the Net::* hierarchy, provides an object-oriented interface to a network protocol.

CREATING AN OBJECT

In order to connect to a broker, you first have to create an object. This object will later be used to interact with the broker. When the module creates the object, it tries to connect to the broker using either plain TCP or SSL. Nothing is done at the STOMP level.

To create the object, you of course need to specify where to connect to. This can be done either with a single uri parameter:

  $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");

or with the pair of host and port parameters:

  $stomp = Net::STOMP::Client->new(
      host => "mybroker",
      port => 6163,
  );

USING SSL

Using SSL is more complex since more parameters have to be given. Note: IO::Socket::SSL is used behind the scenes so you can refer to its documentation for more information. Here is how this could be done:

  $stomp = Net::STOMP::Client->new(
      uri => "stomp+ssl://mybroker:6162",
      sockopts => {
          # path of the directory containing trusted certificates
          SSL_ca_path   => "/etc/ssl/ca",
          # client certificate to present
          SSL_cert_file => "/etc/ssl/client-cert.pem",
          # client private key
          SSL_key_file  => "/etc/ssl/client-key.pem",
          # passphrase of the client private key
          SSL_passwd_cb => sub { return("secret") },
      },
  );

GETTING PEER INFORMATION

Once connected at the TCP or SSL level, you can get information about the broker using the peer method. For instance:

  $peer = $stomp->peer();
  printf("connected to broker %s (IP %s), port %d\n",
      $peer->host(), $peer->addr(), $peer->port());

CONNECTING

After creating the broker object, you must start a STOMP session by sending a CONNECT frame. This is as simple as:

  $stomp->connect();

If authentication is required, you must pass extra information at this stage. For instance with:

  $stomp->connect(
      login    => "guest",
      passcode => "welcome",
  );

At this point, the session has been established and you can now send and/or receive messages.

GETTING SERVER INFORMATION

Once connected at the STOMP level, you can get information about the broker using the server and version methods. For instance:

  printf("speaking STOMP %s with server %s\n",
      $stomp->version(), $stomp->server() || "UNKNOWN");

DISCONNECTING

When you are done with messaging, you should gracefully end the session by sending a DISCONNECT frame with:

  $stomp->disconnect();

Note: STOMP does not support reconnection. Once the session has been ended, the broker object cannot be used anymore.

WRAP UP

Putting all this together, here is a complete program that simply connects, starts and ends a session, printing information along the way.

  use Net::STOMP::Client;
  $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
  $peer = $stomp->peer();
  printf("connected to broker %s (IP %s), port %d\n",
      $peer->host(), $peer->addr(), $peer->port());
  $stomp->connect();
  printf("speaking STOMP %s with server %s\n",
      $stomp->version(), $stomp->server() || "UNKNOWN");
  printf("session %s started\n", $stomp->session());
  $stomp->disconnect();
  printf("session ended\n");

PART 2: SENDING MESSAGES

SENDING MESSAGES

A message is made of a header (a list of key/value pairs) and a body. Both are optional.

To send a message, you have to issue a SEND frame. For instance:

  $stomp->send(
      destination => "/queue/test",
      subject     => "this is a test",
      time        => time(),
      body        => "Hello world!\n",
  );

USING RECEIPTS

By default, you do not get any confirmation that the message has indeed been received by the broker. If you want such a confirmation, you have to use receipts. The following code snippet sends two messages with a receipt header containing a pseudo-unique id and waits for matching RECEIPT frames coming from the broker. This is easy because the Net::STOMP::Client module keeps track of which receipts are expected and have not been received yet.

  $stomp->send(
      destination => "/queue/test",
      body        => "Test of receipts 1...\n",
      receipt     => $stomp->uuid(),
  );
  $stomp->send(
      destination => "/queue/test",
      body        => "Test of receipts 2...\n",
      receipt     => $stomp->uuid(),
  );
  # wait at most 10 seconds for all pending receipts
  $stomp->wait_for_receipts(timeout => 10);
  # complain if some receipts are still pending
  die("Receipt not received!\n") if $stomp->receipts();

Note: all STOMP frames can carry a receipt header so this is not restricted to message sending.
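The bookkeeping behind receipts can be pictured with a few lines of plain Perl. This is only a minimal sketch of the idea, not the module's actual implementation: note_sent, note_received and %pending are hypothetical names, and the sketch relies solely on the protocol rule that a RECEIPT frame carries a receipt-id header matching the receipt header of the frame it confirms.

```perl
use strict;
use warnings;

my %pending;   # receipt ids sent but not yet confirmed by the broker

# remember the receipt id carried by an outgoing frame (hypothetical helper)
sub note_sent {
    my(%header) = @_;
    $pending{$header{receipt}} = 1 if defined($header{receipt});
}

# forget the id once the matching RECEIPT frame arrives (hypothetical helper)
sub note_received {
    my($command, %header) = @_;
    return unless $command eq "RECEIPT";
    delete($pending{$header{"receipt-id"}});
}

note_sent(destination => "/queue/test", receipt => "r-1");
note_sent(destination => "/queue/test", receipt => "r-2");
note_sent(destination => "/queue/test");   # no receipt requested
note_received("RECEIPT", "receipt-id" => "r-1");

my @still_pending = sort(keys(%pending));
print("still pending: @still_pending\n");
```

With Net::STOMP::Client you do not have to write this yourself: the receipts method plays the role of the pending list.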

USING TRANSACTIONS

In addition, you can use transactions to group the sending of several messages so that either none or all of them get handled by the broker.

  # create a unique transaction id
  $tid = $stomp->uuid();
  # begin the transaction
  $stomp->begin(transaction => $tid);
  # send two messages as part of this transaction
  $stomp->send(
      destination => "/queue/test1",
      body        => "message 1",
      transaction => $tid,
  );
  $stomp->send(
      destination => "/queue/test2",
      body        => "message 2",
      transaction => $tid,
  );
  # either abort or commit ($failed stands for your own error check)
  if ($failed) {
      # abort/rollback the transaction
      $stomp->abort(transaction => $tid);
      # no messages have been delivered to the broker
  } else {
      # commit the transaction
      $stomp->commit(transaction => $tid);
      # both messages have been delivered to the broker
  }
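The abort-or-commit decision above can also be driven by exceptions. The following is a minimal sketch under stated assumptions: send_in_transaction is a hypothetical helper, it uses only the uuid, begin, send, commit and abort methods shown in this tutorial, and it assumes that these methods die on error. My::FakeStomp is a stand-in class that merely records calls so that the sketch runs without a broker; in a real program you would pass a connected Net::STOMP::Client object instead.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a connected Net::STOMP::Client object: it only
# records the method calls it receives so the sketch can run without a broker.
package My::FakeStomp;
sub new    { my($class, %opt) = @_; return(bless({ calls => [], %opt }, $class)) }
sub uuid   { return("tx-1") }
sub begin  { my($self) = @_; push(@{ $self->{calls} }, "begin");  return($self) }
sub commit { my($self) = @_; push(@{ $self->{calls} }, "commit"); return($self) }
sub abort  { my($self) = @_; push(@{ $self->{calls} }, "abort");  return($self) }
sub send   {
    my($self, %frame) = @_;
    die("simulated failure\n") if $frame{body} eq ($self->{fail_on} // "");
    push(@{ $self->{calls} }, "send");
    return($self);
}

package main;

# Hypothetical helper: send all messages in one transaction, commit if
# everything went fine, abort otherwise. Returns 1 on commit, 0 on abort.
sub send_in_transaction {
    my($stomp, @messages) = @_;
    my $tid = $stomp->uuid();
    $stomp->begin(transaction => $tid);
    my $ok = eval {
        $stomp->send(%{$_}, transaction => $tid) foreach @messages;
        1;
    };
    if ($ok) {
        $stomp->commit(transaction => $tid);
    } else {
        warn("aborting transaction $tid: $@");
        $stomp->abort(transaction => $tid);
    }
    return($ok ? 1 : 0);
}

my @messages = (
    { destination => "/queue/test1", body => "message 1" },
    { destination => "/queue/test2", body => "message 2" },
);
my $good = My::FakeStomp->new();
my $bad  = My::FakeStomp->new(fail_on => "message 2");
my $good_result = send_in_transaction($good, @messages);
my $bad_result  = send_in_transaction($bad,  @messages);
printf("good: %s\n", join(" ", @{ $good->{calls} }));
printf("bad:  %s\n", join(" ", @{ $bad->{calls} }));
```

Keeping the begin, send and commit/abort calls in one helper makes it harder to forget the transaction header on one of the messages.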

PART 3: RECEIVING MESSAGES

USING SUBSCRIPTIONS

In order to receive frames, you first have to subscribe to one or more destinations. This is as easy as:

  $stomp->subscribe(destination => "/queue/test");

When you are done, you simply unsubscribe with:

  $stomp->unsubscribe(destination => "/queue/test");

It is good practice to add an id header to uniquely identify the subscription. All messages that are part of this subscription will have a matching subscription header. This id can also be used to unsubscribe.

In fact, the code above only works with STOMP 1.0. In STOMP 1.1 and above, the id header has been made mandatory so you must use something like:

  $stomp->subscribe(
      destination => "/queue/test",
      id          => "testsub",
  );
  # received messages will contain: subscription:testsub
  $stomp->unsubscribe(id => "testsub");

RECEIVING FRAMES

While you are subscribed to some destinations, the broker may decide at any time to send you MESSAGE frames. You can process these frames with a simple loop:

  while ($frame = $stomp->wait_for_frames()) {
      # ... do something with the received frame ...
  }

The code above is blocking and will loop forever. You can add a timeout option to have a non-blocking loop:

  while (1) {
      # wait at most one second for a new frame
      $frame = $stomp->wait_for_frames(timeout => 1);
      # do what is appropriate
      if ($frame) {
          # ... do something with the received frame ...
      } else {
          # nothing received
      }
  }

Because of the asynchronous nature of STOMP, receiving messages is a bit tricky: you cannot know a priori which types of frames will be sent when. For instance, you may want to send messages (with receipts) while you are subscribed to some destinations and you may receive a MESSAGE frame while you would like to wait for a RECEIPT frame, or vice versa.

The wait_for_frames method described above will wait for any frame, not only MESSAGE frames. It is up to you to check whether what you receive is a MESSAGE frame. This can be done with something like:

  if ($frame->command() eq "MESSAGE") {
      # ... do something with the received message ...
  } else {
      # something other than a MESSAGE frame
  }
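When more than two kinds of frames have to be handled, the test above can be generalized into a dispatch table keyed by the frame command. Here is a minimal sketch: %handler and handle_frame are hypothetical names, and My::Frame is a stand-in class exposing only the command and body methods so that the code runs without a broker. In a real program the frames would come from wait_for_frames.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a received frame, exposing only command() and
# body(); in a real program the frames would come from wait_for_frames().
package My::Frame;
sub new     { my($class, %opt) = @_; return(bless({ %opt }, $class)) }
sub command { return($_[0]{command}) }
sub body    { return($_[0]{body}) }

package main;

my %seen;   # number of frames seen, per kind

# one handler per frame command we care about
my %handler = (
    MESSAGE => sub {
        my($frame) = @_;
        $seen{MESSAGE}++;
        return("got message: " . $frame->body());
    },
    RECEIPT => sub {
        $seen{RECEIPT}++;
        return("got receipt");
    },
);

# call the handler matching the frame command, with a fallback for the rest
sub handle_frame {
    my($frame) = @_;
    my $code = $handler{$frame->command()};
    return($code->($frame)) if $code;
    $seen{OTHER}++;
    return("ignored " . $frame->command() . " frame");
}

my @frames = (
    My::Frame->new(command => "RECEIPT"),
    My::Frame->new(command => "MESSAGE", body => "Hello world!"),
    My::Frame->new(command => "ERROR",   body => "oops"),
);
my @report = map { handle_frame($_) } @frames;
print("$_\n") foreach @report;
```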

WRAP UP

Putting all this together, here is a complete program that receives ten messages from /queue/test:

  use Net::STOMP::Client;
  $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
  # the next line will be explained in the next part of the tutorial ;-)
  $stomp->message_callback(sub { return(1) });
  $stomp->connect();
  $sid = $stomp->uuid();
  $stomp->subscribe(
      destination => "/queue/test",
      # we use the generated subscription id
      id          => $sid,
      # we want a receipt on our SUBSCRIBE frame
      receipt     => $stomp->uuid(),
  );
  $count = 0;
  while ($count < 10) {
      $frame = $stomp->wait_for_frames(timeout => 1);
      if ($frame) {
          if ($frame->command() eq "MESSAGE") {
              $count++;
              printf("received message %d with id %s\n",
                     $count, $frame->header("message-id"));
          } else {
              # this will catch the RECEIPT frame
              printf("%s frame received\n", $frame->command());
          }
      } else {
          print("waiting for messages...\n");
      }
  }
  $stomp->unsubscribe(id => $sid);
  $stomp->disconnect();

PART 4: USING CALLBACKS

As seen in part 3, because of the asynchronous nature of STOMP, it is a bit tricky to properly handle all the different types of frames that can be received.

In order to simplify this, Net::STOMP::Client supports the use of callbacks. They are pieces of code called in well-defined situations. In fact, there are two levels of callbacks: global and local.

GLOBAL CALLBACKS

Global (per command) callbacks are called each time a frame is received. Net::STOMP::Client has default callbacks that should be sufficient for all types of frames, except for MESSAGE frames. For these, it is really up to you to define what to do with the received messages.

Here is an example with a message callback counting the messages received:

  $stomp->message_callback(sub {
      my($self, $frame) = @_;
      $count++;
      return($self);
  });

These callbacks are somewhat global and it is good practice not to change them during a session. If you do not need a global message callback, you can supply a dummy one:

  $stomp->message_callback(sub { return(1) });

Here is how to re-write a simplified version of the inner part of the receiving program of part 3 with a global callback:

  $count = 0;
  sub msg_cb ($$) {
      my($self, $frame) = @_;
      $count++;
      printf("received message %d with id %s\n",
             $count, $frame->header("message-id"));
      return($self);
  }
  $stomp->message_callback(\&msg_cb);
  $stomp->wait_for_frames() while $count < 10;

LOCAL CALLBACKS

Local (per invocation) callbacks are called by the wait_for_frames method. Their return value controls what wait_for_frames does:

  • false: wait_for_frames should wait for more frames

  • true: wait_for_frames can stop and return this value

Here is how to use wait_for_frames with a local callback to wait until we receive a MESSAGE frame that contains "quit" in the body:

  sub msg_cb ($$) {
      my($self, $frame) = @_;
      return(0) unless $frame->command() eq "MESSAGE";
      return(0) unless $frame->body() =~ /quit/;
      return($frame);
  }
  $frame = $stomp->wait_for_frames(callback => \&msg_cb);

As you see, you can put the logic either in the global callbacks or in the local callbacks. The best practice is to have a single global message callback that does not change throughout the execution of the program and to optionally put in local callbacks what may change from one place of the program to another.
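Since local callbacks are what changes from one place of the program to another, it can be convenient to build them on demand with a closure. The following is only a sketch: body_matcher is a hypothetical factory that generalizes the "quit" callback shown earlier, and My::Frame is a stand-in class exposing the command and body methods so that the code runs without a broker.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a received frame, exposing command() and body()
package My::Frame;
sub new     { my($class, %opt) = @_; return(bless({ %opt }, $class)) }
sub command { return($_[0]{command}) }
sub body    { return($_[0]{body}) }

package main;

# Hypothetical factory: build a local callback that returns the frame (true)
# for a MESSAGE frame whose body matches, and 0 (false) otherwise.
sub body_matcher {
    my($regexp) = @_;
    return(sub {
        my($self, $frame) = @_;
        return(0) unless $frame->command() eq "MESSAGE";
        return(0) unless $frame->body() =~ $regexp;
        return($frame);
    });
}

# with a real connection this would be used as:
#   $frame = $stomp->wait_for_frames(callback => body_matcher(qr/quit/));
my $callback = body_matcher(qr/quit/);
my @frames = (
    My::Frame->new(command => "RECEIPT", body => ""),
    My::Frame->new(command => "MESSAGE", body => "keep going"),
    My::Frame->new(command => "MESSAGE", body => "please quit now"),
);
# the first argument is normally the Net::STOMP::Client object, unused here
my @results = map { $callback->(undef, $_) ? "stop" : "wait" } @frames;
print("@results\n");
```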

WRAP UP

Here is how to re-write the receiving program of part 3 with a global callback only counting the number of messages and a local callback printing information:

  use Net::STOMP::Client;
  $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
  $stomp->connect();
  sub msg_cb ($$) {
      my($self, $frame) = @_;
      my $cmd = $frame->command();
      if ($cmd eq "MESSAGE") {
          printf("received message %d with id %s\n",
                 $count, $frame->header("message-id"));
      } else {
          printf("%s frame received\n", $cmd);
      }
      return($frame);
  }
  $stomp->message_callback(sub { $count++ });
  $sid = $stomp->uuid();
  $stomp->subscribe(
      destination => "/queue/test",
      id          => $sid,
      receipt     => $stomp->uuid(),
  );
  $count = 0;
  while ($count < 10) {
      $stomp->wait_for_frames(
          callback => \&msg_cb,
          timeout => 1,
      ) or print("waiting for messages...\n");
  }
  $stomp->unsubscribe(id => $sid);
  $stomp->disconnect();

PART 5: ADVANCED FEATURES

ACKNOWLEDGMENT MODES

Unless specified otherwise, subscriptions are made in auto mode, meaning that a message is considered to be delivered by the broker as soon as it sends the corresponding MESSAGE frame. The client may not receive the frame or it could exit before processing it. This could result in message loss.

In order to avoid message loss, one can change the subscription acknowledgment mode to be client instead of auto. This is an option of the SUBSCRIBE frame:

  $stomp->subscribe(
      destination => "/queue/test",
      id          => $sid,
      ack         => "client",
  );

In client mode, the client must explicitly acknowledge the messages it has successfully processed. This is achieved by sending an ACK frame with a message-id header matching that of the received message:

  $stomp->ack("message-id" => $frame->header("message-id"));

In practice, this is more complex since the exact way to acknowledge messages changed between STOMP 1.0 (only message-id required), STOMP 1.1 (both message-id and subscription required) and STOMP 1.2 (only id required).
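To make these differences concrete, here is a minimal sketch of the header selection you would otherwise have to write yourself. The ack_headers function is a hypothetical helper, not part of the module; it assumes that, in STOMP 1.2, the id of the ACK frame is taken from the ack header of the received MESSAGE frame, as described in the protocol specification.

```perl
use strict;
use warnings;

# Hypothetical helper: given the negotiated STOMP version and the headers of
# a received MESSAGE frame, return the headers the ACK frame should carry.
sub ack_headers {
    my($version, $message) = @_;
    if ($version eq "1.0") {
        return("message-id" => $message->{"message-id"});
    } elsif ($version eq "1.1") {
        return(
            "message-id" => $message->{"message-id"},
            subscription => $message->{subscription},
        );
    } elsif ($version eq "1.2") {
        # in STOMP 1.2 the ACK id is the value of the MESSAGE ack header
        return(id => $message->{ack});
    }
    die("unsupported STOMP version: $version\n");
}

# sample MESSAGE headers (the ack header only exists in STOMP 1.2)
my %message = (
    "message-id" => "msg-42",
    subscription => "testsub",
    ack          => "ack-7",
);
foreach my $version (qw(1.0 1.1 1.2)) {
    my %ack = ack_headers($version, \%message);
    printf("%s: %s\n", $version,
        join(", ", map { "$_=$ack{$_}" } sort(keys(%ack))));
}
```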

Net::STOMP::Client has a unique feature to ease acknowledgments: you can directly pass the frame that holds the received message and the module will properly acknowledge it, regardless of the STOMP protocol version used:

  $stomp->ack(frame => $frame);

Messages that have not been acknowledged by the end of the session will be resent by the broker.

Note that, starting with STOMP 1.1, there is also a client-individual mode. Please consult the protocol specification for more details.

EFFICIENT I/O

The high-level methods handle one frame at a time. This can be inefficient for small frames. For instance, the send method will build a frame object, encode it and send it on the wire with at least one call to syswrite, maybe for very few bytes.

The low-level methods allow you to better control this and queue messages in memory before sending them. This way, you group data and use I/O more efficiently.

Here is how to queue ten messages and send them in one go:

  foreach my $n (1 .. 10) {
      $frame = Net::STOMP::Client::Frame->new(
          command => "SEND",
          headers => { destination => "/topic/test" },
          body    => "message $n",
      );
      # simply add the frame to the outgoing queue
      $stomp->queue_frame($frame);
  }
  # no timeout given: block until all data has been sent
  $stomp->send_data();

SEE ALSO

IO::Socket::SSL, Net::STOMP::Client, Net::STOMP::Client::Connection, Net::STOMP::Client::Frame, Net::STOMP::Client::HeartBeat, Net::STOMP::Client::Peer, Net::STOMP::Client::Receipt, Net::STOMP::Client::Version.

AUTHOR

Lionel Cons http://cern.ch/lionel.cons

Copyright (C) CERN 2010-2013