NAME

Net::Async::AMQP - provides client interface to AMQP using IO::Async

VERSION

version 0.002

SYNOPSIS

 use IO::Async::Loop;
 use Net::Async::AMQP;
 my $amqp = Net::Async::AMQP->new(loop => my $loop = IO::Async::Loop->new);
 $amqp->connect(
   host => 'localhost',
   user => 'guest',
   pass => 'guest',
   on_connected => sub { ... }
 );
 $loop->run;

DESCRIPTION

Provides an asynchronous client for the AMQP protocol, built on IO::Async. Note that the API may change before the stable 1.000 release - "SEE ALSO" has some alternative modules if you're looking for something that has been around for longer.

CONSTANTS

AUTH_MECH

Defines the mechanism used for authentication. Currently only AMQPLAIN is supported.

PAYLOAD_HEADER_LENGTH

Length of header used in payload messages. Defined by the AMQP standard.

MAX_FRAME_SIZE

Largest amount of data we'll attempt to send in a single frame. Actual frame limit will be negotiated with the remote server.

DEBUG

Debugging flag - set PERL_AMQP_DEBUG to 1 in the environment to enable informational messages to STDERR.

HEARTBEAT_INTERVAL

Interval in seconds between heartbeat frames, or zero to disable heartbeats. Can be overridden by setting PERL_AMQP_HEARTBEAT_INTERVAL in the environment; the default is 60 seconds.
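
As a rough illustration of the environment override, the sketch below picks the interval from a hash of environment variables and falls back to 60 seconds. The helper name heartbeat_interval_from_env is hypothetical and is not part of this module; the module's own lookup may be written differently.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Net::Async::AMQP.
# Takes a hashref of environment variables (pass \%ENV in real use).
sub heartbeat_interval_from_env {
    my ($env) = @_;
    # Defined-or keeps an explicit 0, which means "heartbeats disabled".
    return $env->{PERL_AMQP_HEARTBEAT_INTERVAL} // 60;
}

my $interval = heartbeat_interval_from_env(\%ENV);
```

Using defined-or rather than a plain || matters here, since a value of 0 is meaningful and should not be replaced by the default.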

PACKAGE VARIABLES

$XML_SPEC

This defines the path to the AMQP XML spec, which Net::AMQP uses to create methods and handlers for the appropriate version of the MQ protocol.

Defaults to an extended version of the 0.9.1 protocol as used by RabbitMQ; this is found as amqp0-9-1.extended.xml in the distribution sharedir (see File::ShareDir).

METHODS

new

Constructor. Takes the following parameters:

  • loop - the IO::Async::Loop which we should add ourselves to

  • heartbeat_interval - (optional) interval between heartbeat messages, default is set by the "HEARTBEAT_INTERVAL" constant

Returns $self.

connect

Takes the following parameters:

  • port - the AMQP port, defaults to 5672, can be a service name if preferred

  • host - host to connect to, defaults to localhost

  • local_host - our local IP to connect from

  • user - which user to connect as, defaults to guest

  • pass - the password for this user, defaults to guest

  • on_connected - callback for when we establish a connection

  • on_error - callback for any errors encountered during connection

Returns $self.

handle_heartbeat_failure

Called when heartbeats are enabled and we've had no response from the server for 3 heartbeat intervals. We'd expect some frame from the remote - even if just a heartbeat frame - at least once every heartbeat interval so if this triggers then we're likely dealing with a dead or heavily loaded server.

This will invoke the "heartbeat_failure event" then close the connection.
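
The timeout rule described above can be sketched as a small predicate. This is only an illustration of the "more than 3 intervals" check; heartbeat_expired is a hypothetical name, and the module's internal timer handling is not shown here.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Net::Async::AMQP.
# Returns 1 when no frame has been seen for more than three heartbeat
# intervals, 0 otherwise.
sub heartbeat_expired {
    my ($last_frame_time, $interval, $now) = @_;
    return 0 unless $interval;    # an interval of zero means heartbeats are disabled
    return ($now - $last_frame_time) > 3 * $interval ? 1 : 0;
}

# Example: last frame seen 200 seconds ago with a 60 second interval.
my $expired = heartbeat_expired(time - 200, 60, time);
```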

send_heartbeat

Sends the heartbeat frame.

post_connect

Sends initial startup header and applies listener for the Connection::Start message.

Returns $self.

setup_tuning

Applies listener for the Connection::Tune message, used for determining max frame size and heartbeat settings.

Returns $self.

open_connection

Establish a new connection to a vhost - this is called after tuning is complete, and must happen before any channel connections are attempted.

Returns $self.

setup_connection

Applies listener for the Connection::OpenOk message, which triggers the connected event.

Returns $self.

next_channel

Returns the next available channel ready for "open_channel". Note that whatever it reports will be completely wrong if you've manually specified a channel anywhere, so don't do that.

open_channel

Opens a new channel.

Returns the new Net::Async::AMQP::Channel instance.

close

Close the connection.

Returns a Future which will resolve with $self when the connection is closed.

next_pending

Retrieves the next pending handler for the given incoming frame type (see "get_frame_type"), and calls it.

Takes the following parameters:

  • $type - the frame type, such as 'Basic::ConnectOk'

  • $frame - the frame itself

Returns $self.

METHODS - Accessors

loop

IO::Async::Loop container.

frame_max

Maximum number of bytes allowed in any given frame.

last_frame_time

Timestamp of the last frame we received from the remote. Used for handling heartbeats.

stream

Returns the current IO::Async::Stream for the AMQP connection.

incoming_message

Future for the current incoming message (received in two or more parts: the header then all body chunks).

METHODS - Internal

The following methods are intended for internal use. They are documented for completeness but should not normally be needed outside this library.

push_pending

Adds the given handler(s) to the pending handler list for the given type(s).

Takes one or more of the following parameter pairs:

  • $type - the frame type, see "get_frame_type"

  • $code - the coderef to call, will be invoked once as follows when a matching frame is received:

     $code->($self, $frame, @_)

Returns $self.
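
To make the relationship between "push_pending" and "next_pending" more concrete, here is a minimal sketch of a per-type handler queue. PendingSketch is a hypothetical class written for illustration; it assumes handlers are called in the order they were added, and it silently ignores frames with no pending handler, neither of which is confirmed by this documentation.

```perl
use strict;
use warnings;

package PendingSketch;    # hypothetical class, for illustration only

sub new { my ($class) = @_; return bless { pending => {} }, $class }

# Accepts ($type => $code) pairs and queues each handler under its type.
sub push_pending {
    my ($self, @pairs) = @_;
    while (my ($type, $code) = splice @pairs, 0, 2) {
        push @{ $self->{pending}{$type} }, $code;
    }
    return $self;
}

# Removes the oldest handler for $type, if there is one, and calls it once.
sub next_pending {
    my ($self, $type, $frame, @args) = @_;
    if (my $code = shift @{ $self->{pending}{$type} || [] }) {
        $code->($self, $frame, @args);
    }
    return $self;
}

package main;

my @seen;
PendingSketch->new
    ->push_pending('Connection::OpenOk' => sub { push @seen, $_[1] })
    ->next_pending('Connection::OpenOk', 'frame-1')
    ->next_pending('Connection::OpenOk', 'frame-2');    # no handler left, so ignored
```

Because each method returns $self, calls can be chained as shown.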

remove_pending

Removes a coderef from the pending event handler.

Returns $self.

write

Writes data to the server.

get_frame_type

Takes the following parameters:

Returns string representing type, typically the base class with Net::AMQP::Protocol prefix removed.
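
The prefix removal mentioned above might look like the following. This is a sketch that works on a class name string only; frame_type_from_class is a hypothetical helper, and how the real method obtains the class from a frame is not covered.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Net::Async::AMQP.
# Strips the Net::AMQP::Protocol:: prefix from a class name.
sub frame_type_from_class {
    my ($class) = @_;
    (my $type = $class) =~ s/^Net::AMQP::Protocol:://;
    return $type;
}

my $type = frame_type_from_class('Net::AMQP::Protocol::Connection::OpenOk');
```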

process_frame

Process a single incoming frame.

Takes the following parameters:

Returns $self.

split_payload

Splits a message into separate frames.

Takes the $payload as a scalar containing byte data, and the following parameters:

  • exchange - where we're sending the message

  • routing_key - other part of message destination

Returns list of frames suitable for passing to "send_frame".
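
As an illustration of the splitting step, the sketch below divides a byte string into chunks no larger than a given size. It covers only the division of the body bytes; chunk_payload and its $max_chunk parameter are hypothetical, and the construction of actual frame objects from exchange and routing_key is not shown.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Net::Async::AMQP.
# Splits $payload into pieces of at most $max_chunk bytes each.
sub chunk_payload {
    my ($payload, $max_chunk) = @_;
    die "chunk size must be positive\n" unless $max_chunk > 0;
    my @chunks;
    for (my $offset = 0; $offset < length($payload); $offset += $max_chunk) {
        push @chunks, substr($payload, $offset, $max_chunk);
    }
    return @chunks;
}

my @chunks = chunk_payload('abcdefg', 3);
```

In real use the chunk size would need to stay below the negotiated "frame_max", leaving room for whatever per-frame overhead the protocol requires.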

send_frame

Send a single frame.

Takes the $frame instance followed by these optional named parameters:

  • channel - which channel we should send on

Returns $self.

reset_heartbeat

Resets our side of the heartbeat timer.

This is used to ensure we send data at least once every "heartbeat_interval" seconds.

EVENTS

The following events may be raised by this class - use "subscribe_to_event" in Mixin::Event::Dispatch to watch for them:

 $mq->subscribe_to_event(
   heartbeat_failure => sub {
     my ($ev, $last) = @_;
     print "Heartbeat failure detected\n";
   }
 );

connected event

Called after the connection has been opened.

close event

Called after the remote has closed the connection.

heartbeat_failure event

Raised if we receive no data from the remote for more than 3 heartbeat intervals and heartbeats are enabled.

unexpected_frame event

If we receive an unsolicited frame from the server this event will be raised:

 $mq->subscribe_to_event(
   unexpected_frame => sub {
     my ($ev, $type, $frame) = @_;
     warn "Frame type $type received: $frame\n";
   }
 );

SEE ALSO

INHERITED METHODS

Mixin::Event::Dispatch

add_handler_for_event, clear_event_handlers, event_handlers, invoke_event, subscribe_to_event, unsubscribe_from_event

AUTHOR

Tom Molesworth <cpan@entitymodel.com>

LICENSE

Licensed under the same terms as Perl itself, with additional licensing terms for the MQ spec to be found in share/amqp0-9-1.extended.xml ('a worldwide, perpetual, royalty-free, nontransferable, nonexclusive license to (i) copy, display, distribute and implement the Advanced Messaging Queue Protocol ("AMQP") Specification').