Net::Async::AMQP::ConnectionManager - handle MQ connections
version 2.000
    use IO::Async::Loop;
    use Net::Async::AMQP;

    my $loop = IO::Async::Loop->new;
    $loop->add(
        my $cm = Net::Async::AMQP::ConnectionManager->new
    );
    $cm->add(
        host  => 'localhost',
        user  => 'guest',
        pass  => 'guest',
        vhost => 'vhost',
    );
    $cm->request_channel->then(sub {
        my $ch = shift;
        Future->needs_all(
            $ch->declare_exchange( 'exchange_name' ),
            $ch->declare_queue( 'queue_name' ),
        )->transform(done => sub { $ch })
    })->then(sub {
        my $ch = shift;
        $ch->bind_queue( 'exchange_name', 'queue_name', '*' )
    })->get;
Each connection has N total available channels, recorded in a hash. The total number of channels per connection is negotiated via the initial AMQP Tune/TuneOk sequence on connection.
We also maintain lists:
Unassigned channel - these are channels which were in use and have now been released.
Closed channel - any time a channel is closed, its ID is pushed onto this list so that we can reopen it later without needing to scan the hash. Entries are arrayrefs of [$mq_conn, $id].
Highest-assigned ID is also recorded per connection.
    if(have unassigned) {
        return shift unassigned
    } elsif(have closed) {
        my $closed = shift closed;
        return $closed->{mq}->open_channel($closed->{id})
    } elsif(my $next_id = $mq->next_id) {
        return $mq->open_channel($next_id)
    } else {
    }
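The allocation order in the pseudocode above can be tried out with plain Perl data structures. This is only a minimal sketch: Sketch::Conn, pick_channel and the layout of the pool hash are hypothetical names invented for illustration, not part of Net::Async::AMQP, and the real module works with Futures rather than returning values directly. Closed entries are stored as [$mq_conn, $id] arrayrefs, as described for the closed channel list.

```perl
use strict;
use warnings;

# Hypothetical stand-in for one AMQP connection: hands out channel IDs
# up to the negotiated maximum and records which IDs were opened.
package Sketch::Conn {
    sub new {
        my ($class, %args) = @_;
        return bless { max => $args{max}, highest => 0, opened => [] }, $class;
    }
    # Next never-used channel ID, or undef once the connection is full.
    sub next_id {
        my ($self) = @_;
        return undef if $self->{highest} >= $self->{max};
        return ++$self->{highest};
    }
    sub open_channel {
        my ($self, $id) = @_;
        push @{ $self->{opened} }, $id;
        return { conn => $self, id => $id };
    }
}

# Pick a channel in the order described above: released channels first,
# then previously closed IDs, then a fresh ID on this connection.
sub pick_channel {
    my ($pool, $conn) = @_;
    if (@{ $pool->{unassigned} }) {
        return shift @{ $pool->{unassigned} };
    }
    elsif (@{ $pool->{closed} }) {
        my ($mq, $id) = @{ shift @{ $pool->{closed} } };
        return $mq->open_channel($id);
    }
    elsif (defined(my $next_id = $conn->next_id)) {
        return $conn->open_channel($next_id);
    }
    return undef;    # nothing available on this connection
}

my $conn = Sketch::Conn->new(max => 2);
my $pool = { unassigned => [], closed => [] };
my $ch1  = pick_channel($pool, $conn);               # fresh channel, ID 1
push @{ $pool->{closed} }, [ $conn, $ch1->{id} ];    # channel 1 gets closed
my $ch2  = pick_channel($pool, $conn);               # reopens ID 1, ID 2 stays unused
print "reopened channel $ch2->{id}\n";
```

Checking the released and closed lists before asking for a fresh ID keeps the highest-assigned ID low, so a connection only counts as full when every ID is genuinely in use.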
Calling methods on the channel proxy will establish a reference cycle for the duration of the pending request. This cycle is only broken once all the callbacks for that request have completed.
The channel object does not expose any methods that allow altering QoS or other channel state settings. These must be requested on channel assignment. This does not necessarily mean that any QoS change will require allocation of a new channel.
Bypassing the proxy object to change QoS flags is not recommended.
Connections are established on demand.
Attempts to assign a channel with the given QoS settings.
Available QoS settings are:
prefetch_count - number of messages that can be delivered at a time
prefetch_size - total size of messages allowed before acknowledging
confirm_mode - explicit publish ack
Confirm mode isn't really QoS but it fits in with the others since it modifies the channel state (and once enabled, cannot be disabled without closing and reopening the channel).
Will resolve to a Net::Async::AMQP::ConnectionManager::Channel instance on success.
A constant which indicates whether we can reopen channels. The AMQP 0.9.1 spec doesn't seem to explicitly allow this, but it works with RabbitMQ 3.4.3 (and probably older versions), so it's enabled by default.
Returns the channel retry count. The default is 10. Call "configure" with undef to retry indefinitely, or with 0 to avoid retrying at all:

    # Keep trying until it works
    $mq->configure(channel_retry_count => undef);
    # Don't retry at all
    $mq->configure(channel_retry_count => 0);
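To make the three cases (undef, 0, and a positive count) concrete, here is a small synchronous sketch of a retry budget. It is not the module's implementation, which is Future-based. run_with_retries and $attempt are hypothetical names, and the sketch assumes that a count of N means up to N retries after the initial attempt.

```perl
use strict;
use warnings;

# Run $attempt (a coderef returning true on success) under a retry budget.
#   undef => keep retrying until it succeeds
#   0     => a single attempt, no retries
#   N     => the initial attempt plus up to N retries (an assumption)
# Returns the number of attempts used on success, 0 on failure.
sub run_with_retries {
    my ($attempt, $retry_count) = @_;
    my $tries = 0;
    while (1) {
        ++$tries;
        return $tries if $attempt->();
        return 0 if defined $retry_count && $tries > $retry_count;
    }
}

# Succeeds on the third attempt, so an unlimited budget gets there.
my $calls = 0;
my $used  = run_with_retries(sub { ++$calls >= 3 }, undef);
print "succeeded after $used attempts\n";
```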
Returns the current connection timeout. undef/zero means "no timeout".
Set QoS on the given channel.
Expects the Net::Async::AMQP::Channel object as the first parameter, followed by the key/value pairs corresponding to the desired QoS settings:
prefetch_count - number of messages that can be delivered before ACK is required
Returns a Future which will resolve to the original Net::Async::AMQP::Channel instance.
Attempts to connect to one of the known AMQP servers.
Returns the next AMQP host.
Attempts a connection to an AMQP host.
Indicate that this connection has already allocated all available channels.
Returns a key that represents the given arguments.
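One way such a key could be derived is to serialise the arguments in sorted order, so that the same settings always produce the same string regardless of the order they were passed in. This is a guess at the general idea, not the module's actual implementation, and args_key is a hypothetical name.

```perl
use strict;
use warnings;

# Build a stable string key from key/value arguments.
# Keys are sorted so that argument order does not affect the result;
# undefined values are rendered as an empty string.
sub args_key {
    my (%args) = @_;
    return join ',', map { "$_=" . ($args{$_} // '') } sort keys %args;
}

my $key = args_key(prefetch_count => 10, confirm_mode => 1);
print "$key\n";    # confirm_mode=1,prefetch_count=10
```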
Called when one of our channels has been closed.
Releases the given channel back to our channel pool.
Returns true if this connection is one we know about, false if it's closed or otherwise not usable.
Adds connection details for an AMQP server to the pool.
Releases a connection.
Doesn't really do anything.
Methods inherited from IO::Async::Notifier: add_child, adopt_future, can_event, children, configure_unknown, debug_printf, get_loop, invoke_error, invoke_event, loop, make_event_cb, maybe_invoke_event, maybe_make_event_cb, new, notifier_name, parent, remove_child, remove_from_parent
Tom Molesworth <TEAM@cpan.org>
Licensed under the same terms as Perl itself, with additional licensing terms for the MQ spec to be found in share/amqp0-9-1.extended.xml ('a worldwide, perpetual, royalty-free, nontransferable, nonexclusive license to (i) copy, display, distribute and implement the Advanced Messaging Queue Protocol ("AMQP") Specification').