# Version string of this module, stored in the fully-qualified package variable.
$POEx::ZMQ::FFI::Socket::VERSION = '0.002002';
# Size in bytes (256) of the scratch buffer handed to zmq_getsockopt when a
# 'binary' or 'string' option is read. The empty prototype keeps this a
# compile-time constant.
sub OPTVAL_MAXLEN () { 0x100 }
# Required, read-only attribute holding the context object this socket is
# created from; validated against the ZMQContext type constraint. Other
# attributes call ->soname and ->get_raw_context on it.
has context => (
  is       => 'ro',
  isa      => ZMQContext,
  required => 1,
);
# Required, read-only socket type, validated against ZMQSocketType. It is
# passed to zmq_socket as the type argument when the native socket is built.
has type => (
  is       => 'ro',
  isa      => ZMQSocketType,
  required => 1,
);
# Name of the shared library the FFI bindings are resolved from. Built lazily;
# when not supplied it is taken from the context's own ->soname.
has soname => (
  is      => 'ro',
  isa     => Str,
  lazy    => 1,
  builder => sub {
    my ($self) = @_;
    $self->context->soname
  },
);
# Lazily-built table of native function bindings, wrapped in a
# POEx::ZMQ::FFI::Callable so that each key can be invoked as a method.
#
# Every zmq_* entry is resolved from the library named by ->soname; memcpy is
# requested with an undef library argument. zmq_setsockopt is bound twice under
# different keys so the option value can be handed over either as a raw
# pointer (int_zmq_setsockopt) or as a Perl string (str_zmq_setsockopt).
has _ffi => (
  is      => 'ro',
  isa     => InstanceOf['POEx::ZMQ::FFI::Callable'],
  lazy    => 1,
  builder => sub {
    my ($self) = @_;
    my $lib = $self->soname;

    # FFI::Raw type codes, fetched once and reused in the table below.
    my $int = FFI::Raw::int();
    my $ptr = FFI::Raw::ptr();
    my $str = FFI::Raw::str();

    # One row per binding:
    #   [ Callable key => library, C symbol, return type, argument types... ]
    my @bindings = (
      [ zmq_socket         => $lib,  'zmq_socket',        $ptr, $ptr, $int ],
      [ zmq_getsockopt     => $lib,  'zmq_getsockopt',    $int, $ptr, $int, $ptr, $ptr ],
      [ int_zmq_setsockopt => $lib,  'zmq_setsockopt',    $int, $ptr, $int, $ptr, $int ],
      [ str_zmq_setsockopt => $lib,  'zmq_setsockopt',    $int, $ptr, $int, $str, $int ],
      [ zmq_connect        => $lib,  'zmq_connect',       $int, $ptr, $str ],
      [ zmq_disconnect     => $lib,  'zmq_disconnect',    $int, $ptr, $str ],
      [ zmq_bind           => $lib,  'zmq_bind',          $int, $ptr, $str ],
      [ zmq_unbind         => $lib,  'zmq_unbind',        $int, $ptr, $str ],
      [ zmq_msg_init       => $lib,  'zmq_msg_init',      $int, $ptr ],
      [ zmq_msg_init_size  => $lib,  'zmq_msg_init_size', $int, $ptr, $int ],
      [ zmq_msg_size       => $lib,  'zmq_msg_size',      $int, $ptr ],
      [ zmq_msg_data       => $lib,  'zmq_msg_data',      $ptr, $ptr ],
      [ zmq_msg_close      => $lib,  'zmq_msg_close',     $int, $ptr ],
      [ zmq_msg_recv       => $lib,  'zmq_msg_recv',      $int, $ptr, $ptr, $int ],
      [ zmq_send           => $lib,  'zmq_send',          $int, $ptr, $str, $int, $int ],
      [ zmq_close          => $lib,  'zmq_close',         $int, $ptr ],
      [ memcpy             => undef, 'memcpy',            $ptr, $ptr, $ptr, $int ],
    );

    # Expand the rows into the key => FFI::Raw pairs the wrapper expects,
    # preserving the order of the table.
    POEx::ZMQ::FFI::Callable->new(
      map {
        my ($key, $library, $symbol, @signature) = @$_;
        ( $key => FFI::Raw->new($library, $symbol, @signature) )
      } @bindings
    )
  },
);
# The native socket handle, created on first access by calling zmq_socket
# with the context's raw pointer and this object's ->type.
# _has_socket_ptr reports whether the socket has been created yet;
# _set_socket_ptr replaces the stored value.
#
# NOTE(review): the result of zmq_socket is only checked by the Defined
# constraint; whether a NULL return surfaces as undef depends on the FFI
# layer -- confirm that a failed zmq_socket call cannot slip through here.
has _socket_ptr => (
  is        => 'ro',
  isa       => Defined,
  lazy      => 1,
  writer    => '_set_socket_ptr',
  predicate => '_has_socket_ptr',
  builder   => sub {
    my $self = shift;
    my $ffi  = $self->_ffi;
    $ffi->zmq_socket( $self->context->get_raw_context, $self->type )
  },
);
# Public accessor for the native socket handle; creates the socket on first
# use because _socket_ptr is lazy.
sub get_raw_socket {
  my ($self) = @_;
  return $self->_socket_ptr;
}
# Perl-level filehandle wrapped around the descriptor reported by the ZMQ_FD
# socket option. Built lazily: the descriptor number is read with
# get_sock_opt and opened read-only via IO::Handle->new_from_fd.
# _set_stored_handle replaces the handle and _clear_stored_handle drops it.
has _stored_handle => (
  is      => 'ro',
  isa     => FileHandle,
  lazy    => 1,
  writer  => '_set_stored_handle',
  clearer => '_clear_stored_handle',
  builder => sub {
    my ($self) = @_;
    my $fd = $self->get_sock_opt( ZMQ_FD );
    return IO::Handle->new_from_fd( $fd, 'r' );
  },
);
# Public accessor for the filehandle built around the socket's ZMQ_FD
# descriptor (created on first call).
sub get_handle {
  my ($self) = @_;
  return $self->_stored_handle;
}
# Post-construction hook: keep calling ->recv (discarding the result) for as
# long as ->has_event_pollin reports readable data. Because has_event_pollin
# queries a socket option, this also forces the lazy native socket to be
# created during construction.
sub BUILD {
  my $self = shift;
  while ( $self->has_event_pollin ) {
    $self->recv;
  }
}
# Destructor hook.
#
#   $in_global_destruction - true when called during global destruction, in
#                            which case nothing is done.
#
# Otherwise: close the native socket if one was ever created (a failure is
# only reported through ->warn_if_error), pause for 10ms, then drop the
# stored filehandle.
# NOTE(review): the short sleep presumably gives libzmq time to finish with
# the descriptor before the Perl handle goes away -- confirm the intent.
sub DEMOLISH {
  my ($self, $in_global_destruction) = @_;
  return if $in_global_destruction;

  if ( $self->_has_socket_ptr ) {
    $self->warn_if_error( zmq_close =>
      $self->_ffi->zmq_close( $self->_socket_ptr )
    );
  }

  Time::HiRes::sleep(0.01);
  $self->_clear_stored_handle;
}
# Maps socket option constants to the value type used when packing and
# unpacking them ('int', 'uint64', 'binary' or 'string'). Consulted by
# known_type_for_opt when a caller does not name the type explicitly.
#
# NOTE(review): libzmq documents ZMQ_MAXMSGSIZE as int64_t (default -1), but
# it is registered as 'uint64' here -- confirm how the pack/unpack helpers
# treat that before changing it.
our $KnownTypes = hash;
{
  # Flat list of ( type name => [ options of that type ] ) pairs, registered
  # in the order written.
  my @types_and_opts = (
    'int' => [
      ZMQ_BACKLOG,
      ZMQ_CONFLATE,
      ZMQ_DELAY_ATTACH_ON_CONNECT,
      ZMQ_EVENTS,
      ZMQ_FD,
      ZMQ_IMMEDIATE,
      ZMQ_IPV4ONLY,
      ZMQ_IPV6,
      ZMQ_LINGER,
      ZMQ_MULTICAST_HOPS,
      ZMQ_PLAIN_SERVER,
      ZMQ_CURVE_SERVER,
      ZMQ_PROBE_ROUTER,
      ZMQ_RATE,
      ZMQ_RECOVERY_IVL,
      ZMQ_RECONNECT_IVL,
      ZMQ_RECONNECT_IVL_MAX,
      ZMQ_REQ_CORRELATE,
      ZMQ_REQ_RELAXED,
      ZMQ_ROUTER_MANDATORY,
      ZMQ_ROUTER_RAW,
      ZMQ_RCVBUF,
      ZMQ_RCVMORE,
      ZMQ_RCVHWM,
      ZMQ_RCVTIMEO,
      ZMQ_SNDHWM,
      ZMQ_SNDTIMEO,
      ZMQ_SNDBUF,
      ZMQ_XPUB_VERBOSE,
    ],
    'uint64' => [
      ZMQ_AFFINITY,
      ZMQ_MAXMSGSIZE,
    ],
    'binary' => [
      ZMQ_IDENTITY,
      ZMQ_SUBSCRIBE,
      ZMQ_UNSUBSCRIBE,
      ZMQ_CURVE_PUBLICKEY,
      ZMQ_CURVE_SECRETKEY,
      ZMQ_CURVE_SERVERKEY,
      ZMQ_TCP_ACCEPT_FILTER,
    ],
    'string' => [
      ZMQ_LAST_ENDPOINT,
      ZMQ_PLAIN_USERNAME,
      ZMQ_PLAIN_PASSWORD,
      ZMQ_ZAP_DOMAIN,
    ],
  );

  while ( my ($type, $opts) = splice @types_and_opts, 0, 2 ) {
    $KnownTypes->set( $_ => $type ) for @$opts;
  }
}
# Look up the registered value type for a socket option constant.
#
#   $opt - option constant (second argument; the invocant is ignored).
#
# Returns whatever $KnownTypes->get yields for that option.
sub known_type_for_opt {
  my (undef, $opt) = @_;
  return $KnownTypes->get($opt);
}
# Read a socket option via zmq_getsockopt.
#
#   $opt  - option constant.
#   $type - optional value type; when omitted it is looked up with
#           known_type_for_opt, and the call confesses if none is registered.
#
# Returns the result of POEx::ZMQ::FFI->zunpack for the fetched value.
# Failures of zmq_getsockopt are reported through ->throw_if_error.
sub get_sock_opt {
  my ($self, $opt, $type) = @_;

  if ( not defined $type ) {
    $type = $self->known_type_for_opt($opt)
      // confess "No return type specified and none known to us (opt $opt)";
  }

  my ($val, $ptr, $len);
  my $wants_buffer = $type eq 'binary' || $type eq 'string';

  if ($wants_buffer) {
    # Variable-length values land in a dedicated native buffer.
    $ptr = FFI::Raw::memptr( OPTVAL_MAXLEN );
    $len = pack 'L!', OPTVAL_MAXLEN;
  }
  else {
    # Fixed-size values are written by the library straight into the packed
    # scalar: pack 'P' yields the address of $val's buffer, and unpack 'L!'
    # turns that address into an integer usable as a pointer argument.
    # NOTE(review): this relies on a native long being pointer-sized.
    $val = POEx::ZMQ::FFI->zpack($type, 0);
    $ptr = unpack 'L!', pack 'P', $val;
    $len = pack 'L!', length $val;
  }

  # The length is passed by address as well, using the same trick on $len.
  my $len_ptr = unpack 'L!', pack 'P', $len;

  $self->throw_if_error( zmq_getsockopt =>
    $self->_ffi->zmq_getsockopt(
      $self->_socket_ptr, $opt, $ptr, $len_ptr
    )
  );

  return POEx::ZMQ::FFI->zunpack($type, $val, $ptr, $len);
}
# Set a socket option via zmq_setsockopt.
#
#   $opt  - option constant.
#   $val  - value to set.
#   $type - optional value type; when omitted it is looked up with
#           known_type_for_opt, and the call confesses if none is registered.
#
# 'binary' and 'string' values are handed to the library as a Perl string
# together with their length in BYTES (matching what ->send does), so values
# containing wide or UTF-8-flagged characters are not cut short. Any other
# type is packed with POEx::ZMQ::FFI->zpack and passed by address.
#
# Failures are reported through ->throw_if_error. Returns the invocant.
sub set_sock_opt {
  my ($self, $opt, $val, $type) = @_;

  unless (defined $type) {
    $type = $self->known_type_for_opt($opt)
      // confess "No opt type specified and none known to us (opt $opt)"
  }

  if ($type eq 'binary' || $type eq 'string') {
    # Make sure bytes::length is available even if the pragma has not been
    # loaded yet; a no-op when it already is.
    require bytes;
    $self->throw_if_error( zmq_setsockopt =>
      $self->_ffi->str_zmq_setsockopt(
        $self->_socket_ptr, $opt, $val, bytes::length($val)
      )
    );
  }
  else {
    my $packed = POEx::ZMQ::FFI->zpack($type, $val);
    # pack 'P' gives the address of $packed's buffer; unpack 'L!' converts it
    # to an integer usable as a pointer argument.
    my $ptr = unpack 'L!', pack 'P', $packed;
    $self->throw_if_error( zmq_setsockopt =>
      $self->_ffi->int_zmq_setsockopt(
        $self->_socket_ptr, $opt, $ptr, length $packed
      )
    )
  }

  $self
}
# Connect the socket to $endpoint via zmq_connect.
# Confesses when no endpoint is given; library failures are reported through
# ->throw_if_error. Returns the invocant.
sub connect {
  my ($self, $endpoint) = @_;
  defined $endpoint or confess "Expected an endpoint";

  $self->throw_if_error( zmq_connect =>
    $self->_ffi->zmq_connect( $self->_socket_ptr, $endpoint )
  );

  return $self;
}
# Disconnect the socket from $endpoint via zmq_disconnect.
# Confesses when no endpoint is given; library failures are reported through
# ->throw_if_error. Returns the invocant.
sub disconnect {
  my $self     = shift;
  my $endpoint = shift;

  if ( !defined $endpoint ) {
    confess "Expected an endpoint";
  }

  $self->throw_if_error( zmq_disconnect =>
    $self->_ffi->zmq_disconnect( $self->_socket_ptr, $endpoint )
  );

  return $self;
}
# Bind the socket to $endpoint via zmq_bind.
# Confesses when no endpoint is given; library failures are reported through
# ->throw_if_error. Returns the invocant.
sub bind {
  my ($self, $endpoint) = @_;
  defined $endpoint or confess "Expected an endpoint";

  $self->throw_if_error( zmq_bind =>
    $self->_ffi->zmq_bind( $self->_socket_ptr, $endpoint )
  );

  return $self;
}
# Unbind the socket from $endpoint via zmq_unbind.
# Confesses when no endpoint is given; library failures are reported through
# ->throw_if_error. Returns the invocant.
sub unbind {
  my $self     = shift;
  my $endpoint = shift;

  if ( !defined $endpoint ) {
    confess "Expected an endpoint";
  }

  $self->throw_if_error( zmq_unbind =>
    $self->_ffi->zmq_unbind( $self->_socket_ptr, $endpoint )
  );

  return $self;
}
# Send a single message part via zmq_send.
#
#   $msg   - payload; its size is measured in bytes, not characters.
#   $flags - optional send flags; defaults to 0.
#
# Library failures are reported through ->throw_if_error.
# Returns the invocant.
sub send {
  my ($self, $msg, $flags) = @_;
  $flags = 0 unless defined $flags;

  my $nbytes = bytes::length($msg);

  $self->throw_if_error( zmq_send =>
    $self->_ffi->zmq_send( $self->_socket_ptr, $msg, $nbytes, $flags )
  );

  return $self;
}
# Send a multipart message.
#
#   $parts - non-empty ARRAY reference of message parts.
#   $flags - optional flags applied to the final part only.
#
# Every part but the last is sent with ZMQ_SNDMORE; the last part is sent
# with $flags. Confesses when $parts is not a non-empty ARRAY reference.
# Returns the result of the final ->send call.
sub send_multipart {
  my ($self, $parts, $flags) = @_;

  # reftype() returns undef for undef and for non-references; default it to
  # '' so bad input reaches the confess below without first raising an
  # "uninitialized value" warning.
  confess "Expected an ARRAY of message parts"
    unless (Scalar::Util::reftype($parts) // '') eq 'ARRAY' and @$parts;

  $self->send( $parts->[$_], ZMQ_SNDMORE ) for 0 .. ($#$parts - 1);
  $self->send( $parts->[-1], $flags );
}
# Receive a single message part and return its payload as a Perl string
# ('' for a zero-length part).
#
#   $flags - optional zmq_msg_recv flags; defaults to 0.
#
# Failures of zmq_msg_init / zmq_msg_recv are reported through
# ->throw_if_error. When zmq_msg_recv fails, the initialised message is
# closed before the error is re-thrown.
sub recv {
  my ($self, $flags) = @_;
  $flags //= 0;

  # Scratch storage for the zmq_msg_t. libzmq >= 4.1 declares the struct as
  # 64 bytes (earlier releases used 32), so a smaller allocation would let
  # the library write past the end of the buffer.
  my $zmsg_ptr = FFI::Raw::memptr(64);
  $self->throw_if_error( zmq_msg_init =>
    $self->_ffi->zmq_msg_init($zmsg_ptr)
  );

  my $zmsg_len;
  my $recv_ok = eval {
    $self->throw_if_error( zmq_msg_recv =>
      ( $zmsg_len = $self->_ffi->zmq_msg_recv(
          $zmsg_ptr, $self->_socket_ptr, $flags
        ) )
    );
    1
  };
  unless ($recv_ok) {
    # Release the message before propagating the original error unchanged.
    my $err = $@;
    $self->_ffi->zmq_msg_close($zmsg_ptr);
    die $err;
  }

  my $ret;
  if ($zmsg_len) {
    # Copy the payload out of the library-owned message into our own buffer
    # before the message is closed, then turn it into a Perl string.
    my $data_ptr    = $self->_ffi->zmq_msg_data($zmsg_ptr);
    my $content_ptr = FFI::Raw::memptr($zmsg_len);
    $self->_ffi->memcpy($content_ptr, $data_ptr, $zmsg_len);
    $ret = $content_ptr->tostr($zmsg_len);
  }
  else {
    $ret = ''
  }

  $self->_ffi->zmq_msg_close($zmsg_ptr);

  $ret
}
# Receive a complete multipart message.
#
#   $flags - optional flags passed to every ->recv call.
#
# Reads one part, then keeps reading for as long as the ZMQ_RCVMORE socket
# option is true. Returns the collected parts wrapped by array().
sub recv_multipart {
  my ($self, $flags) = @_;

  my @parts;
  do {
    push @parts, $self->recv($flags);
  } while $self->get_sock_opt(ZMQ_RCVMORE);

  return array(@parts);
}
# True when the ZMQ_EVENTS socket option has the ZMQ_POLLIN bit set.
# Returns a normalised boolean.
sub has_event_pollin {
  my $self   = shift;
  my $events = $self->get_sock_opt(ZMQ_EVENTS);
  return !!( $events & ZMQ_POLLIN );
}
# True when the ZMQ_EVENTS socket option has the ZMQ_POLLOUT bit set.
# Returns a normalised boolean.
sub has_event_pollout {
  my $self   = shift;
  my $events = $self->get_sock_opt(ZMQ_EVENTS);
  return !!( $events & ZMQ_POLLOUT );
}
1;