NAME

Reflexive::ZmqSocket - Provides a reflexy way to talk over ZeroMQ sockets

VERSION

version 1.130710

SYNOPSIS

    package App::Test;
    use Moose;
    extends 'Reflex::Base';
    use Reflex::Trait::Watched qw/ watches /;
    use Reflexive::ZmqSocket::RequestSocket;
    use ZeroMQ::Constants(':all');

    watches request => (
        isa => 'Reflexive::ZmqSocket::RequestSocket',
        clearer => 'clear_request',
        predicate => 'has_request',
    );

    sub init {
        my ($self) = @_;

        my $req = Reflexive::ZmqSocket::RequestSocket->new(
            endpoints => [ 'tcp://127.0.0.1:54321' ],
            endpoint_action => 'bind',
            socket_options => {
                ZMQ_LINGER() => 1,
            },
        );

        $self->request($req);
    }

    sub BUILD {
        my ($self) = @_;
        
        $self->init();
    }

    sub on_request_message {
        my ($self, $msg) = @_;
    }

    sub on_request_multipart_message {
        my ($self, $msg) = @_;
        my @parts = map { $_->data } $msg->all_parts;
    }

    sub on_request_socket_flushed {
        my ($self) = @_;
    }
    
    sub on_request_socket_error {
        my ($self, $msg) = @_;
    }

    sub on_request_connect_error {
        my ($self, $msg) = @_;
    }

    sub on_request_bind_error {
        my ($self, $msg) = @_;
    }

    __PACKAGE__->meta->make_immutable();

DESCRIPTION

Reflexive::ZmqSocket provides a reflexy way to participate in ZeroMQ-driven applications. Objects of this class and its subclasses emit a number of events. On successful reads, either "message" or "multipart_message" is emitted. For errors, "socket_error" is emitted. See "EMITTED_EVENTS" for more information.

PUBLIC_ATTRIBUTES

socket_type

    is: ro, isa: enum, lazy: 1

This attribute holds what type of ZeroMQ socket should be built. It must be one of the constants exported by the ZeroMQ::Constants package. The attribute is populated by default in the various subclasses.

endpoints

    is: ro, isa: ArrayRef[Str], traits: Array, predicate: has_endpoints

This attribute holds an array reference of all of the endpoints to which the socket should either bind or connect.

The following methods are delegated to this attribute:

    endpoints_count
    all_endpoints

endpoint_action

    is: ro, isa: enum(bind, connect), predicate: has_endpoint_action

This attribute determines the socket action to take against the provided endpoints. While ZeroMQ allows sockets to both connect and bind, this module limits it to either/or. Patches welcome :)

socket_options

    is: ro, isa: HashRef

This attribute holds the options for the socket. Options are applied at BUILD time, before any action is taken on the endpoints. This allows for things like setting the ZMQ_IDENTITY.

context

    is: ro, isa: ZeroMQ::Context

This attribute holds the context that is required for building sockets.

socket

    is: ro, isa: ZeroMQ::Socket

This attribute holds the actual ZeroMQ socket created. The following methods are delegated to this attribute:

    recv
    getsockopt
    setsockopt
    close
    connect
    bind

NOTE: close() is wrapped with advice that stops polling the zmq_fd /before/ calling the underlying zmq_close. As a result, items still in the "buffer" may never be sent or handed over to zmq, and you are responsible for managing them.

PROTECTED_ATTRIBUTES

active

    is: ro, isa: Bool, default: true

This attribute controls whether or not Reflex observes the socket for reads and writes.

filehandle

    is: ro, isa: FileHandle

This attribute contains a file handle built from the cloned file descriptor from inside the ZeroMQ::Socket. This is where the magic happens in how we poll for non-blocking IO.

buffer

    is: ro, isa: ArrayRef, traits: Array

This attribute is an internal buffer used for non-blocking writes.

The following methods are delegated to this attribute:

    buffer_count
    dequeue_item
    enqueue_item
    putback_item

PUBLIC_METHODS

send

This method is for sending messages through the "socket". It is non-blocking and will return the current buffer count.
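The interplay between send and the "buffer" can be pictured with a rough model in plain Perl that has no ZeroMQ dependency. This is only a sketch under stated assumptions: the FakeSender package, its would_block flag and its flush method are hypothetical names invented for illustration, and the mapping of enqueue_item, dequeue_item and putback_item onto push, shift and unshift is an assumption that this document does not confirm.

```perl
use strict;
use warnings;

package FakeSender;

# Hypothetical stand-in for a socket object; not part of Reflexive::ZmqSocket.
sub new {
    my ($class) = @_;
    return bless { buffer => [], sent => [], would_block => 0 }, $class;
}

# Assumed queue semantics: FIFO, with the option to put an item back at the head.
sub enqueue_item { my $self = shift; push @{ $self->{buffer} }, @_ }
sub dequeue_item { my $self = shift; return shift @{ $self->{buffer} } }
sub putback_item { my $self = shift; unshift @{ $self->{buffer} }, @_ }
sub buffer_count { my $self = shift; return scalar @{ $self->{buffer} } }

# send() never blocks: it only queues the item and reports the backlog.
sub send {
    my ($self, $item) = @_;
    $self->enqueue_item($item);
    return $self->buffer_count;
}

# Called when the socket is writable: drain until a write would block.
sub flush {
    my ($self) = @_;
    while (defined(my $item = $self->dequeue_item)) {
        if ($self->{would_block}) {
            $self->putback_item($item);    # keep ordering for the next attempt
            last;
        }
        push @{ $self->{sent} }, $item;
    }
    return $self->buffer_count;
}

package main;

my $sock   = FakeSender->new;
my $first  = $sock->send('ping');    # backlog after the first send
my $second = $sock->send('pong');    # backlog after the second send
my $left   = $sock->flush;           # backlog after draining
```

In this model the return value of send tells the caller how many items are still waiting, which is one way to apply back-pressure before queueing more.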

PROTECTED_METHODS

initialize_endpoints

This method attempts the defined "endpoint_action" against the provided "endpoints". This method is called at BUILD if "active" is true. To defer initialization, simply set "active" to false.

If the provided action against a particular endpoint fails, a connect_error event will be emitted.
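The behaviour described above can be sketched in plain Perl as a loop that applies one action to every endpoint and records an event when an attempt fails. The FakeSocket package and the @events list are hypothetical and exist only for this illustration. Deriving the event name from the action (so that a failed bind yields bind_error) is an assumption suggested by the on_request_bind_error handler in the SYNOPSIS, not something this section states.

```perl
use strict;
use warnings;

my @events;    # collected [ event name, endpoint, error ] triples

package FakeSocket;

# Hypothetical socket whose bind/connect die on anything but a tcp:// endpoint.
sub new { my ($class) = @_; return bless {}, $class }

sub bind {
    my ($self, $endpoint) = @_;
    die "cannot bind $endpoint\n" unless $endpoint =~ m{^tcp://};
    return 1;
}

sub connect {
    my ($self, $endpoint) = @_;
    die "cannot connect $endpoint\n" unless $endpoint =~ m{^tcp://};
    return 1;
}

package main;

sub initialize_endpoints {
    my ($socket, $action, @endpoints) = @_;
    for my $endpoint (@endpoints) {
        # Attempt the action; on failure record an event instead of dying.
        unless (eval { $socket->$action($endpoint); 1 }) {
            my $error = $@;
            chomp $error;
            push @events, [ "${action}_error", $endpoint, $error ];
        }
    }
    return;
}

initialize_endpoints(
    FakeSocket->new, 'bind',
    'tcp://127.0.0.1:54321', 'bogus://nowhere',
);
```

Recording an event per failing endpoint, rather than aborting on the first failure, lets the remaining endpoints still be attempted.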

PRIVATE_METHODS

zmq_writable

This method is used internally to handle when the ZeroMQ socket is ready for writing. This method can emit socket_error for various issues.

zmq_readable

This method is used internally by Reflex to actually read from the ZeroMQ socket when it is readable. This method can emit socket_error when problems occur. For successful reads, either message or multipart_message will be emitted.

do_read

This private method does the actual reading from the socket.

EMITTED_EVENTS

message

message is emitted when a successful read occurs on the socket. When this event is emitted, the payload is a single message (in ZeroMQ terms, the result of the other end sending a message without using SNDMORE). See Reflexive::ZmqSocket::ZmqMessage for more information.

multipart_message

multipart_message is emitted when a multipart message is read from the socket. See Reflexive::ZmqSocket::ZmqMultiPartMessage for more information.
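The distinction between the two events can be illustrated with a small plain-Perl model of a read loop. It is a sketch, not the module's implementation: the @wire data, the emit helper and the per-frame "more" flag are hypothetical, with the flag standing in for whether the sender used SNDMORE on that frame.

```perl
use strict;
use warnings;

# Each incoming frame is [ payload, more_flag ]; more_flag mirrors SNDMORE on the sender.
my @wire = (
    [ 'hello',  0 ],                     # sent without SNDMORE
    [ 'header', 1 ], [ 'body', 0 ],      # two frames sent as one multipart message
);

my @emitted;    # collected [ event name, [ parts ] ] pairs

# Hypothetical helper that records which event would be emitted.
sub emit {
    my ($name, @parts) = @_;
    push @emitted, [ $name, [@parts] ];
    return;
}

my @parts;
for my $frame (@wire) {
    my ($payload, $more) = @$frame;
    push @parts, $payload;
    next if $more;                       # keep collecting until the last frame
    emit(@parts == 1 ? 'message' : 'multipart_message', @parts);
    @parts = ();
}
```

A handler for multipart_message then sees all frames together, which matches the SYNOPSIS handler that maps over $msg->all_parts.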

ACKNOWLEDGEMENTS

This module was originally developed for Booking.com and through their gracious approval, we've released this module to CPAN.

AUTHORS

  • Nicholas R. Perez <nperez@cpan.org>

  • Steffen Mueller <smueller@cpan.org>

COPYRIGHT AND LICENSE

This software is copyright (c) 2012 by Nicholas R. Perez <nperez@cpan.org>.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.