The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

NAME

Mojo::RabbitMQ::Client - Mojo::IOLoop based RabbitMQ client

SYNOPSIS

  use Mojo::RabbitMQ::Client;

  # Supply URL according to (https://www.rabbitmq.com/uri-spec.html)
  my $client = Mojo::RabbitMQ::Client->new(
    url => 'amqp://guest:guest@127.0.0.1:5672/');

  # Catch all client related errors
  $client->catch(sub { warn "Some error caught in client"; });

  # When connection is in Open state, open new channel
  $client->on(
    open => sub {
      my ($client) = @_;

      # Create a new channel with auto-assigned id
      my $channel = Mojo::RabbitMQ::Client::Channel->new();

      $channel->catch(sub { warn "Error on channel received"; });

      $channel->on(
        open => sub {
          my ($channel) = @_;
          $channel->qos(prefetch_count => 1)->deliver;

          # Publish some example message to test_queue
          my $publish = $channel->publish(
            exchange    => 'test',
            routing_key => 'test_queue',
            body        => 'Test message',
            mandatory   => 0,
            immediate   => 0,
            header      => {}
          );
          # Deliver this message to server
          $publish->deliver;

          # Start consuming messages from test_queue
          my $consumer = $channel->consume(queue => 'test_queue');
          $consumer->on(message => sub { say "Got a message" });
          $consumer->deliver;
        }
      );
      $channel->on(close => sub { $log->error('Channel closed') });

      $client->open_channel($channel);
    }
  );

  # Start connection
  $client->connect();

  # Start Mojo::IOLoop if not running already
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

CONSUMER

  use Mojo::RabbitMQ::Client;
  my $consumer = Mojo::RabbitMQ::Client->consumer(
    url      => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo',
    defaults => {
      qos      => {prefetch_count => 1},
      queue    => {durable        => 1},
      consumer => {no_ack         => 0},
    }
  );

  $consumer->catch(sub { die "Some error caught in Consumer" } );
  $consumer->on('success' => sub { say "Consumer ready" });
  $consumer->on(
    'message' => sub {
      my ($consumer, $message) = @_;

      $consumer->channel->ack($message)->deliver;
    }
  );

  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

PUBLISHER

  use Mojo::RabbitMQ::Client;
  my $publisher = Mojo::RabbitMQ::Client->publisher(
    url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo'
  );

  $publisher->catch(sub { die "Some error caught in Publisher" } );
  $publisher->on('success' => sub { say "Publisher ready" });

  $publisher->publish('plain text');
  $publisher->publish({encode => { to => 'json'}});

  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

DESCRIPTION

Mojo::RabbitMQ::Client is a rewrite of AnyEvent::RabbitMQ to work on top of Mojo::IOLoop.

EVENTS

Mojo::RabbitMQ::Client inherits all events from Mojo::EventEmitter and can emit the following new ones.

connect

  $client->on(connect => sub {
    my ($client, $stream) = @_;
    ...
  });

Emitted when TCP/IP connection with RabbitMQ server is established.

open

  $client->on(open => sub {
    my ($client) = @_;
    ...
  });

Emitted AMQP protocol Connection.Open-Ok method is received.

close

  $client->on(close => sub {
    my ($client) = @_;
    ...
  });

Emitted on reception of Connection.Close-Ok method.

disconnect

  $client->on(close => sub {
    my ($client) = @_;
    ...
  });

Emitted when TCP/IP connection gets disconnected.

ATTRIBUTES

Mojo::RabbitMQ::Client has following attributes.

url

  my $url = $client->url;
  $client->url('rabbitmq://...');

heartbeat_timeout

  my $timeout = $client->heartbeat_timeout;
  $client->heartbeat_timeout(180);

METHODS

Mojo::RabbitMQ::Client inherits all methods from Mojo::EventEmitter and implements the following new ones.

connect

  $client->connect();

Tries to connect to RabbitMQ server and negotiate AMQP protocol.

close

  $client->close();

open_channel

  my $channel = Mojo::RabbitMQ::Client::Channel->new();
  ...
  $client->open_channel($channel);

delete_channel

  my $removed = $client->delete_channel($channel->id);

SEE ALSO

Mojo::RabbitMQ::Client::Channel, Mojo::RabbitMQ::Client::Consumer, Mojo::RabbitMQ::Client::Publisher

COPYRIGHT AND LICENSE

Copyright (C) 2015-2016, Sebastian Podjasek and others

Based on AnyEvent::RabbitMQ - Copyright (C) 2010 Masahito Ikuta, maintained by bobtfish@bobtfish.net

This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0.