The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

NAME

POE::Component::MessageQueue::Manual::Clustering - Instructions for setting up clustered MQ's with fail-over

DESCRIPTION

PoCo::MQ now supports running multiple MQs (possibly on different machines) which share the same DBI back-store. If you put these MQs behind a reverse-proxy/load-balancer and write your clients to reconnect when a connection is broken, you will have automatic fail-over if one of the MQs goes down!

EXAMPLE

If you are using mq.pl, this is one possible way to start your MQs.

  # Start the first MQ (ex. on machine mq1.example.com)
  mq.pl --storage dbi \
        --front-store none \
        --dbi-dsn DBI:mysql:database=pocomq;hostname=db.example.com \
        --dbi-username pocomq \
        --dbi-password pocomq \
        --pump-freq 1 \
        --mq-id mq1 \
        --data-dir /tmp/perl_mq

  # Start the second MQ (ex. on machine mq2.example.com)
  mq.pl --storage dbi \
        --front-store none \
        --dbi-dsn DBI:mysql:database=pocomq;hostname=db.example.com \
        --dbi-username pocomq \
        --dbi-password pocomq \
        --pump-freq 1 \
        --mq-id mq2 \
        --data-dir /tmp/perl_mq

There are a couple important things to note above:

  • --front-store none

    The memory based front-stores Memory and BigMemory are local to each MQ. That means that messages delivered to one, won't be received by clients on the other, until after messages are moved into the back-store.

    You don't have to disable the front-store (and, in fact, it might improve performance when clients are connected directly to the queue receiving the messages) but its affects might not be expected.

    There is also POE::Component::MessageQueue::Storage::Remote front-store that the MQs can share but it isn't currently accessible from mq.pl.

  • --pump-freq 1

    Periodically, an internal queue (ie. /queue/example) is "pumped", which means that the MQ goes through the messaes on that queue and distributes them to available subscribers.

    Normally, a "pump" is triggered on an internal queue only after messages are received on it. But in the case of a clustered MQ, messages might have been recieved on one of its sister MQs and so it doesn't know when to pump.

    Setting --pump-freq tells the MQ to pump all queues regardless of whether anything has changed on them every X seconds. In our example, it pumps all queues every 1 second. If you want the messages to arrive quickly, small values like 1 or 2 are recommended.

  • --mq-id mq1 / mq2

    This sets a unique string id for each MQ. Every MQ sharing the same DBI database, MUST have a different --mq-id.

    Since they are sharing the database, they need to prefix some data in it, so that the other MQ doesn't mess up its state.

REVERSE PROXY / LOAD BALANCER

balance

balance is a very simple reverse-proxy and load-balancer that has packages for many distributions.

http://www.inlab.de/balance.html

Executing it on the load balancing machine (per our EXAMPLE above):

   balance -f 61613 mq1.example.com:61613 mq2.example.com:61613

And that's it!

LVS (Linux Virtual Server)

LVS is a load-balancing solution built into the Linux Kernel. I recommend the HOWTO for learning more about LVS:

http://www.austintek.com/LVS/LVS-HOWTO/

You will have to do a number of things (described that the documentation above) to get your system setup to do LVS, but once its ready, run the following commands as root to configure the load-balancer (per our EXAMPLE above):

  ipvsadm -A -t external.example.com:61613 -s wlc
  ipvsadm -a -t external.example.com:61613 -r mq1.example.com:61613 -m
  ipvsadm -a -t external.example.com:61613 -r mq2.example.com:61613 -m

Here is a link to the LVS project homepage:

http://www.linuxvirtualserver.org/

CLIENTS

Writting clients that are robust to MQ failure and simply reconnect and retry, is a little more challenging than it seems at first.

The main problem, is detecting failure correctly. Different systems (with different OSs, Perl versions) and also over different network configurations (through firewalls, routers, etc) will act differently when the connection to the MQ is broken.

Here are some example snippets for scripts that correctly handle failure when all programs involved (MQs, load-balancer, clients) are running on the same machine. These were tested on Debian Lenny with Linux 2.6.26 and Perl 5.10.

Your mileage may vary when running on your systems on your network. Please test appropriately!

CONSUMER

  use Net::Stomp;
  use strict;

  my $QUEUE_NAME = "/queue/example";
  my $HOSTNAME   = "localhost";
  my $PORT       = 61613;
  my $USERNAME   = 'producer';
  my $PASSWORD   = 'test';

  sub main
  {
    my $stomp;
 
    my $connect = sub {
      while (1) {
        eval {
          $stomp = Net::Stomp->new({
            hostname => $HOSTNAME,
            port     => $PORT,
          });
          $stomp->connect({
            login    => $USERNAME,
            passcode => $PASSWORD,
          });
          $stomp->subscribe({
            destination => $QUEUE_NAME,
            ack         => 'client',
          });
        };
        if (!$@) {
          # If no Perl exception was thrown, return!  We are connected. 
          return;
        }

        # Otherwise, wait a second and try again.
        print STDERR "Unable to connect to MQ!  Sleeping 1 second then trying again...\n";
        sleep 1;
      }
    };
    $connect->();

    my $receive = sub {
      my $frame;
      while (1) {
        eval {
          $frame = $stomp->receive_frame;
        };
        if (!$@) {
          # If no Perl exception was thrown, return our $frame!
          return $frame;
        }

        # Otherwise, wait a second, reconnect and then try again.
        print STDERR "Connection to MQ broken!  Sleeping 1 second then attempting to reconnect...\n";
        sleep 1;
        $connect->();
      }
    };
 
    while (1) {
      my $frame = $receive->();

      # TODO: Do something useful with this frame!!
      print "received:". $frame->body ."\n";

      $stomp->ack({ frame => $frame });
    }
 
    $stomp->disconnect();
  }
  main;

PRODUCER

  use Net::Stomp;
  use Data::Random qw/rand_chars/;
  use strict;
 
  my $QUEUE_NAME = "/queue/example";
  my $HOSTNAME   = "localhost";
  my $PORT       = 61613;
  my $USERNAME   = 'producer';
  my $PASSWORD   = 'test';
 
  # If we try to send a message on a connection after the MQ on the other side has died,
  # we received SIGPIPE.  It would be awesome if instead of receiving a signal, we got
  # an exception from $stomp->send(), but this is unfortunately how it is...
  my $sigpipe = 0;
  $SIG{PIPE} = sub {
    print STDERR "Caught signal SIGPIPE\n";
    $sigpipe = 1;
  };
  
  sub main
  {
    my $stomp;
  
    my $connect = sub {
      while (1) {
        eval {
          $stomp = Net::Stomp->new({
            hostname => $HOSTNAME,
            port     => $PORT,
          });
          $stomp->connect({
            login    => $USERNAME,
            passcode => $PASSWORD,
          });
        };
        if (!$@) {
          # If no Perl exception was thrown, return!  We are connected. 
          return;
        }

        # Otherwise, wait a second and try again.
        print STDERR "Unable to connect to MQ!  Sleeping 1 second then trying again...\n";
        sleep 1;
      }
    };
    $connect->();
 
    my $send = sub {
      my ($message) = @_;

      # This requests a reciept from the server.  This is very important!  Without this, the
      # server won't let us know if the message was received or not.
      $message->{receipt} = join('', rand_chars(set => 'alpha', size => 10)),

      while (1) {
        $sigpipe = 0;
        eval {
          $stomp->send({ %$message });
        };
        if (!$@ && !$sigpipe) {
          # No Perl exceptions and no SIGPIPE, but we can't advance until we get a receipt!
          # Wait 5 seconds for one, if it doesn't come, then we need to reconnect and try again.

          my $receipt;
          eval {
            $receipt = $stomp->receive_frame(5);
          };
          if (!$@ && $receipt && $receipt->headers->{'receipt-id'} eq $message->{'receipt'}) {
            # No Perl exception, we got a receipt and it matchs the message sent!
            # "return" triumphantly!
            return;
          }
        }

        # Otherwise, wait a second, reconnect and then try again.
        print STDERR "Connection to MQ broken!  Sleeping 1 second then attempting to reconnect...\n";
        sleep 1;
        $connect->();
      };
    };
 
    # Send messages forever...
    while (1) {
      # TODO: construct a real message!
      my $data = "test";

      $send->({
        destination => $QUEUE_NAME,
        body        => $data,
        persistent  => 'true',
      });
    }
 
    $stomp->disconnect();
  }
  main;

AUTHORS

Copyright 2010 David Snopek (http://www.hackyourlife.org)

LICENSE

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 2 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>.