NAME

POEx::ZMQ::Socket - A POE-enabled ZeroMQ socket

SYNOPSIS

  use POE;
  # Imports POEx::ZMQ::Socket and POEx::ZMQ::Constants ->
  use POEx::ZMQ;

  POE::Session->create(
    inline_states => +{
      _start => sub {
        # Set up a Context and save it for creating sockets later:
        $_[HEAP]->{ctx} = POEx::ZMQ->context;

        # Create a ZMQ_ROUTER socket associated with our Context:
        $_[HEAP]->{rtr} = POEx::ZMQ::Socket->new(
          context => $_[HEAP]->{ctx},
          type    => ZMQ_ROUTER,
        );

        # Set up the backend socket and start accepting/emitting events:
        $_[HEAP]->{rtr}->start;

        # Bind to a local TCP endpoint:
        $_[HEAP]->{rtr}->bind( 'tcp://127.0.0.1:1234' );
      },

      zmq_recv_multipart => sub {
        # ROUTER got message from REQ / DEALER
        # parts are available as a List::Objects::WithUtils::Array ->
        my $parts = $_[ARG0];

        # ROUTER receives [ IDENTITY, NULL, MSG .. ]:
        my $route = $parts->items_before(sub { $_ eq '' });
        my $body  = $parts->items_after(sub { $_ eq '' });

        my $response;
        # ... do work ...
        # Send a response back:
        $_[KERNEL]->post( $_[SENDER], send_multipart =>
          [ $route->all, '', $response ]
        );
      },
    },
  );

  POE::Kernel->run;

DESCRIPTION

An asynchronous POE-powered ZeroMQ socket.

These objects are event emitters powered by MooX::Role::POE::Emitter. That means they come with flexible event processing / dispatch / multiplexing options. See the MooX::Role::Pluggable and MooX::Role::POE::Emitter documentation for details.

ATTRIBUTES

type

Required; the socket type, as a constant.

See zmq_socket(3) for details on socket types.

See POEx::ZMQ::Constants for a ZeroMQ constant exporter.

ipv6

If set to true, IPv6 support is enabled via the appropriate socket option (ZMQ_IPV4ONLY or ZMQ_IPV6 depending on your ZeroMQ version) when the emitter is started.

Defaults to false.

max_queue_size

Sockets that would normally block or return EFSM (for example, during out-of-order REQ/REP communication) queue messages application-side instead, so the event loop is never blocked. max_queue_size is the maximum number of messages held in that queue before "max_queue_action" is invoked.

This is not related to messages queued on the ZeroMQ side; see zmq_socket(3) for details on socket behavior.

Defaults to 0 (unlimited).

max_queue_action

The action to take during "send" invocation when the application-side outgoing message queue reaches "max_queue_size".

If set to drop, new messages will be dropped.

If set to warn, a warning will be issued and new messages will be dropped.

If set to die, an exception is thrown with a stack trace.

If set to a coderef:

  max_queue_action => sub {
    my ($buf_item, $queue) = @_;
    # Drop old and try again, for example:
    $queue->shift;
    1
  },

... the subroutine is invoked with two arguments: the POEx::ZMQ::Buffered object for the message, and the current application-side outgoing message queue as a List::Objects::WithUtils::Array. It can be used to modify the outgoing queue or perform some other action. If the subroutine returns a true value, another socket write is attempted after it returns.

Defaults to die.
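As a rough sketch, the two queue attributes can be set together at construction time. The limit of 100 and the choice of ZMQ_REQ are illustrative assumptions, and the action is assumed to be passed as a plain string:

```perl
use strict;
use warnings;
use POEx::ZMQ;

my $ctx = POEx::ZMQ->context;

# Hypothetical limits for illustration; tune them for your workload.
my $req = POEx::ZMQ::Socket->new(
  context          => $ctx,
  type             => ZMQ_REQ,
  max_queue_size   => 100,      # hold at most 100 messages application-side
  max_queue_action => 'warn',   # warn and drop new messages once full
);
```

The socket still needs "start" to be called before it can send or receive.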

context

The POEx::ZMQ::FFI::Context backend context object.

zsock

The POEx::ZMQ::FFI::Socket backend socket object.

METHODS

start

Start the emitter and set up the associated socket.

This method must be called to create the backend ZeroMQ socket and start the emitter's POE::Session.

Returns the object.

stop

Stop the emitter; a zmq_close(3) will be issued for the socket and "zsock" will be cleared.

Buffered items are not removed; "get_buffered_items" can be used to retrieve them for feeding to a new socket object's "send" method. See POEx::ZMQ::Buffered.

zmq_version

Returns the ZeroMQ version as a struct-like object; see "get_version" in POEx::ZMQ::FFI.

get_buffered_items

Returns (a shallow copy of) the List::Objects::WithUtils::Array containing messages currently buffered on the POE component (due to a backend ZeroMQ socket's blocking behavior; see zmq_socket(3)).

This will not return messages queued on the ZeroMQ side.

Each item is a POEx::ZMQ::Buffered object; look there for attribute documentation. These can also be fed back to "send" after retrieval from a dead socket, for example:

  $old_socket->stop;  # Shut down this socket
  my $pending = $old_socket->get_buffered_items;
  $new_socket->send($_) for $pending->all;

get_context_opt

Retrieve context option values.

See "get_ctx_opt" in POEx::ZMQ::FFI::Context & zmq_ctx_get(3).

set_context_opt

Set context option values.

See "set_ctx_opt" in POEx::ZMQ::FFI::Context & zmq_ctx_set(3).

Returns the invocant.
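A minimal sketch of reading and writing a context option through the socket object. It assumes that ZMQ_IO_THREADS is among the constants exported by POEx::ZMQ::Constants and that arguments are passed as (option, value); check zmq_ctx_set(3) for the options your ZeroMQ version supports:

```perl
use strict;
use warnings;
use POEx::ZMQ;

my $sock = POEx::ZMQ::Socket->new(
  context => POEx::ZMQ->context,
  type    => ZMQ_DEALER,
);

# Assumption: ZMQ_IO_THREADS is exported by POEx::ZMQ::Constants and
# options are passed as (option, value).
$sock->set_context_opt( ZMQ_IO_THREADS, 2 );

my $threads = $sock->get_context_opt( ZMQ_IO_THREADS );
print "I/O threads: $threads\n";
```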

get_socket_opt

  my $last_endpt = $sock->get_socket_opt( ZMQ_LAST_ENDPOINT );

Get socket option values.

See "get_sock_opt" in POEx::ZMQ::FFI::Socket & zmq_getsockopt(3).

set_socket_opt

  $sock->set_socket_opt( ZMQ_LINGER, 0 );

Set socket option values.

See "set_sock_opt" in POEx::ZMQ::FFI::Socket & zmq_setsockopt(3).

Returns the invocant.

bind

  $sock->bind( @endpoints );

Call a zmq_bind(3) for one or more specified endpoints.

A "bind_added" event is emitted for each added endpoint.

Returns the invocant.

unbind

  $sock->unbind( @endpoints );

Call a zmq_unbind(3) for one or more specified endpoints.

A "bind_removed" event is emitted for each removed endpoint.

Returns the invocant.

connect

  $sock->connect( @endpoints );

Call a zmq_connect(3) for one or more specified endpoints.

A "connect_added" event is emitted for each added endpoint.

Returns the invocant.

disconnect

  $sock->disconnect( @endpoints );

Call a zmq_disconnect(3) for one or more specified endpoints.

A "disconnect_issued" event is emitted for each removed endpoint.

Returns the invocant.

send

  $sock->send( $msg, $flags );

Send a single-part message.

Sending never blocks, regardless of the usual behavior of the underlying ZeroMQ socket type. See "max_queue_size" for details on queuing behavior.

Returns the invocant.

send_multipart

  $sock->send_multipart( [ @parts ], $flags );
  # A ROUTER sending to $id ->
  $rtr->send_multipart( [ $id, '', $msg ], $flags );

Send a multi-part message.

Applies the same application-side queuing behavior as "send"; see "max_queue_size".

Returns the invocant.

ACCEPTED EVENTS

These POE events take the same arguments as their object-oriented counterparts documented in "METHODS":

bind
unbind
connect
disconnect
send
send_multipart

EMITTED EVENTS

Emitted events are prefixed with the value of the "event_prefix" attribute (see MooX::Role::POE::Emitter); by default, this is zmq_.
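To make the naming rule concrete, this core-Perl sketch lists the state names a listening session would define for the events documented in this section. It assumes the default prefix is in effect; the variable names are illustrative only:

```perl
use strict;
use warnings;

# Assumption: the default "event_prefix" of 'zmq_' is in effect.
my $prefix  = 'zmq_';
my @emitted = qw/
  bind_added bind_removed connect_added disconnect_issued
  recv recv_multipart
/;

# Map each emitted event to the POE state name a listening session defines:
my %state_for = map { $_ => $prefix . $_ } @emitted;

printf "%-18s => %s\n", $_, $state_for{$_} for @emitted;
```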

bind_added

Emitted when a "bind" is issued for an endpoint; $_[ARG0] is the bound endpoint.

bind_removed

Emitted when an "unbind" is issued for an endpoint; $_[ARG0] is the unbound endpoint.

connect_added

Emitted when a "connect" is issued for an endpoint; $_[ARG0] is the target endpoint.

disconnect_issued

Emitted when a "disconnect" is issued for an endpoint; $_[ARG0] is the disconnecting endpoint.

recv

  sub zmq_recv {
    my $msg = $_[ARG0];
    $_[KERNEL]->post( $_[SENDER], send => 'bar' ) if $msg eq 'foo';
  }

Emitted when a single-part message is received; $_[ARG0] is the message item.

recv_multipart

  # A ROUTER receiving from REQ, for example:
  sub zmq_recv_multipart {
    my $parts = $_[ARG0];
    my ($id, undef, $content) = @$parts;

    my $response = $content eq 'foo' ? 'bar' : '';

    $_[KERNEL]->post( $_[SENDER], send_multipart =>
      [ $id, '', $response ]
    );
  }

  # ... or with more complex routing envelopes:
  sub zmq_recv_multipart {
    my $parts = $_[ARG0];
    # pop() the application-relevant body:
    my $body = $parts->pop;
    my $response;
    # ... do work on $body and set $response ...
    # Then include the envelope (including empty delimiter msg) later:
    $_[KERNEL]->post( $_[SENDER], send_multipart =>
      [ $parts->all, $response ]
    );
  }

Emitted when a multipart message is received.

$_[ARG0] is a List::Objects::WithUtils::Array array-type object containing the message parts. This makes basic handling tasks easy, such as splitting multipart bodies and the routing envelope on an empty part delimiter:

  my $envelope = $parts->items_before(sub { $_ eq '' });
  my $content  = $parts->items_after(sub { $_ eq '' });
  # ... returning a reply (@reply_parts holds the response body) later:
  $zsock->send_multipart(
    [ $envelope->all, '', @reply_parts ]
  );
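For readers who prefer plain Perl lists, the same envelope/body split can be sketched without List::Objects::WithUtils. The helper name split_envelope is hypothetical and not part of POEx::ZMQ; it assumes the routing envelope ends at the first empty part:

```perl
use strict;
use warnings;

# Split a ROUTER-style multipart message on its first empty delimiter part.
# Returns (\@envelope, \@body); plain arrays are used for illustration.
sub split_envelope {
  my @parts = @_;
  my @envelope;
  while (@parts) {
    my $part = shift @parts;
    last if $part eq '';       # the first empty part ends the envelope
    push @envelope, $part;
  }
  return (\@envelope, \@parts);
}

my ($envelope, $body) = split_envelope('client-1', '', 'hello', 'world');

# Build a reply: original envelope, empty delimiter, then the new body.
my @reply = (@$envelope, '', map { uc $_ } @$body);
print join('|', @reply), "\n";   # prints "client-1||HELLO|WORLD"
```

The resulting list has the same shape as the array reference passed to "send_multipart".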

CONSUMES

MooX::Role::POE::Emitter, which in turn consumes MooX::Role::Pluggable.

SEE ALSO

zmq(7)

zmq_socket(3)

POEx::ZMQ::FFI::Context for details on the ZeroMQ context backend.

POEx::ZMQ::FFI::Socket for details on the ZeroMQ socket backend.

ZMQ::FFI for a loop-agnostic ZeroMQ implementation.

AUTHOR

Jon Portnoy <avenj@cobaltirc.org>

Licensed under the same terms as Perl.