# Assign the package version inside BEGIN so that it is already set at
# compile time.
BEGIN {
    $AnyEvent::ZeroMQ::Handle::VERSION = '0.01';
}
use
POSIX
qw(EAGAIN EWOULDBLOCK)
;
# The wrapped ZeroMQ::Raw::Socket. It has to be passed to the constructor and
# cannot be replaced afterwards; bind and connect are delegated to it.
has 'socket' => (
    is       => 'ro',
    isa      => 'ZeroMQ::Raw::Socket',
    required => 1,
    handles  => [ 'bind', 'connect' ],
);
# Ahead of bind/connect, call the identity accessor. The attribute is declared
# lazy_build, so this runs its builder if no identity has been set yet.
before 'bind', 'connect' => sub {
    my ($self) = @_;
    $self->identity;
};
# Once bind/connect has returned, run one read pass and one write pass so that
# anything already queued on this handle gets a chance to be processed.
after 'bind', 'connect' => sub {
    my ($self) = @_;

    $self->read;
    $self->write;
};
# Socket identity (an IdentityStr). The value is built lazily by
# _build_identity, and every assignment forwards the trigger's arguments to
# _change_identity.
has 'identity' => (
    is         => 'rw',
    isa        => IdentityStr,
    lazy_build => 1,
    trigger    => sub {
        my ($self, @values) = @_;
        $self->_change_identity(@values);
    },
);
# Optional persistent read callback. Assigning it triggers a read pass right
# away; has_on_read / clear_on_read test for and remove it.
has 'on_read' => (
    is        => 'rw',
    isa       => 'CodeRef',
    predicate => 'has_on_read',
    clearer   => 'clear_on_read',
    trigger   => sub {
        my ($self) = @_;
        $self->read;
    },
);
# Optional error callback consulted by handle_error; has_on_error /
# clear_on_error test for and remove it.
has 'on_error' => (
    is        => 'rw',
    isa       => 'CodeRef',
    clearer   => 'clear_on_error',
    predicate => 'has_on_error',
);
# Report an error raised while servicing the socket.
#
# Arguments:
#   $str - the error (message or exception) to report.
#
# When an on_error callback is installed it is called with $str and its result
# is returned. Otherwise the error is only emitted as a warning.
sub handle_error {
    my ($self, $str) = @_;

    if ($self->has_on_error) {
        return $self->on_error->($str);
    }

    return warn "AnyEvent::ZeroMQ::Handle: error in callback (ignoring): $str";
}
# Optional callback invoked by write after a pass that processed queued
# entries; has_on_drain / clear_on_drain test for and remove it.
has 'on_drain' => (
    is        => 'rw',
    isa       => 'CodeRef',
    clearer   => 'clear_on_drain',
    predicate => 'has_on_drain',
);
# Event-loop watchers for the socket. They cannot be passed to the constructor
# (init_arg is undef) and are created on demand by _build_read_watcher and
# _build_write_watcher.
has [ 'read_watcher', 'write_watcher' ] => (
    is         => 'ro',
    init_arg   => undef,
    lazy_build => 1,
);
# Pending work queues. Each instance starts with its own empty array
# reference; neither queue can be passed to the constructor.
has [ 'read_buffer', 'write_buffer' ] => (
    is       => 'ro',
    init_arg => undef,
    default  => sub { [] },
);
# Builder for read_watcher: registers a readability watcher on the socket
# whose callback runs a read pass.
#
# Returns whatever AnyEvent::ZeroMQ->io returns.
sub _build_read_watcher {
    my ($self) = @_;

    # The callback closes over $self; weaken it so the watcher does not keep
    # the handle alive.
    weaken($self);

    my $on_readable = sub { $self->read };

    return AnyEvent::ZeroMQ->io(
        poll   => 'r',
        socket => $self->socket,
        cb     => $on_readable,
    );
}
# Builder for write_watcher: registers a writability watcher on the socket
# whose callback runs a write pass.
#
# Returns whatever AnyEvent::ZeroMQ->io returns.
sub _build_write_watcher {
    my ($self) = @_;

    # The callback closes over $self; weaken it so the watcher does not keep
    # the handle alive.
    weaken($self);

    my $on_writable = sub { $self->write };

    return AnyEvent::ZeroMQ->io(
        poll   => 'w',
        socket => $self->socket,
        cb     => $on_writable,
    );
}
# Builder for identity: reads the ZMQ_IDENTITY option from the socket and
# returns it.
sub _build_identity {
    my $self = shift;

    my $socket = $self->socket;
    return $socket->getsockopt(ZMQ_IDENTITY);
}
# Trigger target for identity: writes the new value to the socket's
# ZMQ_IDENTITY option and returns the result of setsockopt.
#
# Arguments:
#   $new - the identity to apply.
# Any further arguments (the identity trigger forwards all of its arguments
# here) are ignored.
sub _change_identity {
    my ($self, $new) = @_;

    my $socket = $self->socket;
    return $socket->setsockopt(ZMQ_IDENTITY, $new);
}
# True when at least one one-shot read callback is waiting in read_buffer.
sub has_read_todo {
    my ($self) = @_;

    my $queue = $self->read_buffer;
    return exists $queue->[0];
}
# Ask AnyEvent::ZeroMQ->probe whether the socket is currently readable and
# return its answer.
sub readable {
    my ($self) = @_;

    my %query = (
        poll   => 'r',
        socket => $self->socket,
    );
    return AnyEvent::ZeroMQ->probe(%query);
}
# Try to receive a single message without blocking and hand it to $cb.
#
# Arguments:
#   $cb - code reference invoked as $cb->($self, $data), where $data is the
#         result of calling ->data on the received ZeroMQ::Raw::Message.
#
# Returns false only when recv itself failed with EWOULDBLOCK/EAGAIN, which
# means $cb was never invoked and can safely be retried later. Returns true in
# every other case.
#
# Any other failure, including an exception thrown by $cb, is passed to
# handle_error.
sub _read_once {
    my ($self, $cb) = @_;

    my $received    = 0;
    my $would_block = 0;

    local $! = 0;
    try {
        my $msg = ZeroMQ::Raw::Message->new;
        $self->socket->recv($msg, ZMQ_NOBLOCK);

        # From here on a failure belongs to the callback, not to recv, so an
        # EAGAIN left behind in $! must not be read as "nothing to receive".
        $received = 1;
        $cb->($self, $msg->data);
    }
    catch {
        if (!$received && ($! == EWOULDBLOCK || $! == EAGAIN)) {
            $would_block = 1;
        }
        else {
            $self->handle_error($_);
        }
    };

    return !$would_block;
}

# Run one read pass.
#
# One-shot callbacks queued with push_read are served first, in order, while
# the socket is readable. After that the persistent on_read callback (if any)
# is served for as long as the socket stays readable. Finally the read watcher
# is kept (built on demand) when there is still someone interested in
# incoming messages, and cleared otherwise.
sub read {
    my $self = shift;

    while ($self->readable && $self->has_read_todo) {
        my $cb = shift @{ $self->read_buffer };

        # If recv would have blocked, the callback never ran. Put it back at
        # the head of the queue instead of dropping it, as write does for
        # unsent messages.
        unshift @{ $self->read_buffer }, $cb
            unless $self->_read_once($cb);
    }

    while ($self->readable && $self->has_on_read) {
        $self->_read_once($self->on_read);
    }

    if ($self->has_read_todo || $self->has_on_read) {
        $self->read_watcher;
    }
    else {
        $self->clear_read_watcher;
    }
}
# Queue a one-shot read callback and immediately run a read pass.
#
# Arguments:
#   $cb - callback appended to the end of read_buffer.
#
# Returns whatever the read pass returns.
sub push_read {
    my ($self, $cb) = @_;

    my $queue = $self->read_buffer;
    push @{$queue}, $cb;

    return $self->read;
}
# True when at least one entry is waiting in write_buffer.
sub has_write_todo {
    my ($self) = @_;

    my $queue = $self->write_buffer;
    return exists $queue->[0];
}
# Ask AnyEvent::ZeroMQ->probe whether the socket is currently writable and
# return its answer.
sub writable {
    my ($self) = @_;

    my %query = (
        poll   => 'w',
        socket => $self->socket,
    );
    return AnyEvent::ZeroMQ->probe(%query);
}
# Turn a queued write entry into something that can be sent.
#
# Arguments:
#   $cb_or_msg - a code-like value (called with $self to produce the
#                payload), a ZeroMQ::Raw::Message, or a plain scalar.
#
# Returns:
#   - the ZeroMQ::Raw::Message itself when the payload already is one;
#   - a new message built with new_from_scalar for any other defined payload;
#   - nothing (bare return) when the payload is undefined.
sub build_message {
    my ($self, $cb_or_msg) = @_;

    my $cb  = _CODELIKE($cb_or_msg);
    my $msg = $cb ? $cb->($self) : $cb_or_msg;

    return unless defined $msg;

    my $is_message
        = ref($msg) && blessed($msg) && $msg->isa('ZeroMQ::Raw::Message');

    return $is_message
        ? $msg
        : ZeroMQ::Raw::Message->new_from_scalar($msg);
}
# Run one write pass.
#
# The write watcher is cleared first. Then, while the socket is writable and
# write_buffer has entries, the head entry is turned into a message with
# build_message and sent without blocking. An entry for which build_message
# yields nothing is simply dropped.
#
# If the send itself fails with EWOULDBLOCK/EAGAIN the entry is put back at
# the head of the queue. Every other failure, including an exception thrown
# while building the message, is passed to handle_error and the entry is
# dropped.
#
# After the loop, on_drain (if set) is called with ($self, $self->writable),
# but only when at least one entry actually left the queue during this pass.
# The write watcher is re-armed when entries remain.
sub write {
    my $self = shift;

    $self->clear_write_watcher;

    my $wrote_something = 0;
    while ($self->writable && $self->has_write_todo) {
        my $buf;
        my $sending  = 0;
        my $requeued = 0;

        local $! = 0;
        try {
            $buf = shift @{ $self->write_buffer };
            my $msg = $self->build_message($buf);
            if ($msg) {
                # Only a failure raised from here on may be treated as a
                # would-block condition of the socket.
                $sending = 1;
                $self->socket->send($msg, ZMQ_NOBLOCK);
            }
        }
        catch {
            if ($sending && ($! == EWOULDBLOCK || $! == EAGAIN)) {
                if (defined $buf) {
                    unshift @{ $self->write_buffer }, $buf;
                    $requeued = 1;
                }
            }
            else {
                $self->handle_error($_);
            }
        };

        # A re-queued entry is not progress; do not report it to on_drain.
        $wrote_something++ unless $requeued;
    }

    $self->on_drain->($self, $self->writable)
        if $wrote_something && $self->has_on_drain;

    $self->write_watcher if $self->has_write_todo;
}
# Queue something to send and immediately run a write pass.
#
# Arguments:
#   $msg - a code-like value or a ZeroMQ::Raw::Message, queued as-is; anything
#          else is wrapped with ZeroMQ::Raw::Message->new_from_scalar first.
#
# Returns whatever the write pass returns.
sub push_write {
    my ($self, $msg) = @_;

    my $queue_as_is = _CODELIKE($msg)
        || (blessed($msg) && $msg->isa('ZeroMQ::Raw::Message'));

    push @{ $self->write_buffer },
        $queue_as_is ? $msg : ZeroMQ::Raw::Message->new_from_scalar($msg);

    return $self->write;
}
# NOTE(review): the two role names below stand alone as a string list in void
# context. This looks like the tail of a role-composition statement whose
# leading keyword (presumably `with`, possibly with further role names) is not
# visible here -- confirm against the complete file.
'AnyEvent::ZeroMQ::Handle::Role::Writable'
,
'AnyEvent::ZeroMQ::Handle::Role::Generic'
;
# Calls make_immutable on this package's metaclass once all declarations above
# have been made.
__PACKAGE__->meta->make_immutable;