# Underlying AnyEvent::Handle. Created on first use by _build__handle;
# _has_handle tells whether it has been created yet.
has '_handle' => (
    isa        => 'AnyEvent::Handle',
    is         => 'ro',
    lazy_build => 1,
    predicate  => '_has_handle',
);

# Requests (argument lists for _send) waiting for a free slot.
has '_queue' => (
    isa        => 'ArrayRef',
    is         => 'ro',
    lazy_build => 1,
);

# Number of requests currently occupying a slot (compared to max_parallel).
has '_in_progress' => (
    isa     => 'Int',
    is      => 'rw',
    default => 0,
);

# Pending completion callbacks, keyed by request sync value.
has '_callbacks' => (
    isa        => 'HashRef',
    is         => 'ro',
    lazy_build => 1,
);

# Read callback that consumes a reply header (see _build__read_reply).
has '_read_reply' => (
    isa        => 'CodeRef',
    is         => 'ro',
    lazy_build => 1,
);

# Sync values of requests sent with the no_reply flag that are still
# waiting for the write buffer to drain.
has '_no_reply' => (
    isa        => 'ArrayRef',
    is         => 'ro',
    lazy_build => 1,
);

# Drain callback that completes no-reply requests (see _build__on_drain).
has '_on_drain' => (
    isa        => 'CodeRef',
    is         => 'ro',
    lazy_build => 1,
);
# Returns the connection handle if one has already been created, or a
# false value otherwise. Never triggers the lazy construction of the handle.
sub fh {
    my ($self) = @_;
    return $self->_has_handle && $self->_handle;
}
# Submits a request. If fewer than max_parallel requests are in progress
# the request takes a slot and is passed to _send immediately; otherwise a
# copy of the argument list is appended to the queue.
# Arguments after $self are forwarded to _send untouched.
sub send {
    my $self = shift;

    my $has_free_slot = $self->_in_progress < $self->max_parallel;
    if ( !$has_free_slot ) {
        push @{ $self->_queue }, [@_];
        return;
    }

    $self->_in_progress( $self->_in_progress + 1 );
    # @_ is forwarded as-is so _send sees the caller's original scalars.
    $self->_send(@_);
    return;
}
# Applies a new timeout to the connection handle, but only if the handle
# already exists; the handle is never created just to set a timeout.
sub set_timeout {
    my ( $self, $timeout ) = @_;

    return unless $self->_has_handle();

    $self->_handle->timeout($timeout);
    return;
}
# Writes one request to the connection.
#
# Arguments: $msg, payload, $callback, $no_reply flag, optional $sync.
# The payload (third argument) is skipped when unpacking and always read
# through $_[2] - presumably to avoid copying it; keep it that way.
#
# The callback is stored under the sync value. With $no_reply set the sync
# is remembered and a drain handler is installed (unless one is installed
# already); otherwise a 12-byte header read is queued.
sub _send {
    my ( $self, $msg, undef, $callback, $no_reply, $sync ) = @_;

    if ( !defined $sync ) {
        $sync = $self->_choose_sync();
    }
    my $header = $self->_pack_header( $msg, length( $_[2] ), $sync );

    my $server = $self->server;
    $self->_callbacks->{$sync} = $callback;
    $server->_send_started( $sync, $msg, $_[2] );

    my $handle = $self->_handle;
    my $dump_request = $server->debug >= 5;
    if ($dump_request) {
        $server->_debug_dump( 'send header: ',  $header );
        $server->_debug_dump( 'send payload: ', $_[2] );
    }

    $handle->push_write($header);
    $handle->push_write( $_[2] );

    if ( !$no_reply ) {
        # A reply is expected: wait for its fixed-size header.
        $handle->push_read( chunk => 12, $self->_read_reply );
        return;
    }

    # No reply will come: the request is completed once the data is drained.
    push @{ $self->_no_reply }, $sync;
    if ( !defined $handle->{on_drain} ) {
        $handle->on_drain( $self->_on_drain );
    }
    return;
}
# Builder for _read_reply.
#
# Returns a callback that takes a reply header, unpacks it into
# (message, payload length, sync) and schedules a read of the payload at the
# front of the handle's read queue. When the payload arrives the server is
# notified, the slot is released or reused via _finish_and_start, and the
# callback registered for that sync is removed and invoked with
# ($msg, $payload).
#
# $self and $server are weakened so the returned closure does not keep
# them alive.
sub _build__read_reply {
    my ($self) = @_;
    my $server = $self->server;
    weaken($self);
    weaken($server);

    return sub {
        my ( $handle, $header ) = @_;

        my $dump_resp = $server->debug >= 6;
        if ($dump_resp) {
            $server->_debug_dump( 'recv header: ', $header );
        }

        my ( $msg, $payload_length, $sync ) = $self->_unpack_header($header);

        my $on_payload = sub {
            my ( $handle, $payload ) = @_;

            if ($dump_resp) {
                $server->_debug_dump( 'recv payload: ', $payload );
            }
            $server->_recv_finished( $sync, $msg, $payload );
            $self->_finish_and_start();

            # The callback is removed from the table before it is called.
            my $callback = delete $self->_callbacks->{$sync};
            $callback->( $msg, $payload );
            return;
        };

        $handle->unshift_read( chunk => $payload_length, $on_payload );
        return;
    };
}
# Moves queued requests onto the wire for as long as a slot is free and the
# queue yields a task. The queue is only consulted when a slot is available.
sub _try_to_send {
    my ($self) = @_;

    while ( $self->_in_progress < $self->max_parallel ) {
        my $task = shift @{ $self->_queue };
        last unless $task;

        $self->_in_progress( $self->_in_progress + 1 );
        $self->_send( @{$task} );
    }
    return;
}
# Called when a request has completed. If a request is waiting in the queue
# it is sent right away and inherits the finished request's slot (the
# in-progress counter is left unchanged); otherwise the slot is released.
sub _finish_and_start {
    my ($self) = @_;

    my $next = shift @{ $self->_queue };
    if ( !$next ) {
        $self->_in_progress( $self->_in_progress - 1 );
        return;
    }

    $self->_send( @{$next} );
    return;
}
# Builder for _handle: creates the AnyEvent::Handle for this connection.
#
# Connection parameters (host, port, tcp_nodelay, tcp_keepalive, timeout,
# connect_timeout) are read from $self. $self and $server are weakened so
# the handle's callbacks do not keep them alive.
#
# Callbacks installed on the handle:
#   on_prepare - returns connect_timeout.
#   on_connect - logs "connected" at debug level 1 or higher.
#   on_error   - fails every pending request with ($message, $errno),
#                marks the server inactive, drops handle/callback/no-reply
#                state, destroys the socket, restarts queued requests and
#                only then invokes the failed requests' callbacks.
#   on_timeout - raises ETIMEDOUT on the handle, but only while requests
#                are pending; timeouts on an idle connection are ignored.
sub _build__handle {
    my ($self) = @_;
    my $server = $self->server;
    $server->_debug("connecting") if $server->debug >= 4;
    weaken($self);
    weaken($server);
    return AnyEvent::Handle->new(
        connect    => [ $self->host, $self->port ],
        no_delay   => $self->tcp_nodelay,
        keepalive  => $self->tcp_keepalive,
        timeout    => $self->timeout,
        on_prepare => sub {
            return $self->connect_timeout;
        },
        on_connect => sub {
            $server->_debug("connected") if $server->debug >= 1;
            return;
        },
        on_error => sub {
            my ( $handle, $fatal, $message ) = @_;
            # Capture errno first, before any other call can overwrite it.
            my $errno = $!;
            $server->_debug( ( $fatal ? 'fatal ' : '' ) . 'error: ' . $message );

            # Collect the callbacks now; the table is cleared below and the
            # callbacks must run only after the connection state is reset.
            my @callbacks;
            foreach my $sync ( keys %{ $self->_callbacks } ) {
                $server->_recv_finished( $sync, undef, undef, $message, $errno );
                $self->_in_progress( $self->_in_progress - 1 );
                push @callbacks, $self->_callbacks->{$sync};
            }

            $server->active(0);
            $self->_clear_handle();
            $self->_clear_callbacks();
            $self->_clear_no_reply();
            $server->_debug('closing socket') if $server->debug >= 1;
            $handle->destroy();

            # Queued requests are restarted before the failures are reported.
            $self->_try_to_send();
            $_->( undef, undef, $message, $errno ) foreach @callbacks;
            return;
        },
        on_timeout => sub {
            my ($handle) = @_;
            # A single check is enough: nothing pending means nothing timed out.
            $handle->_error( Errno::ETIMEDOUT ) if keys %{ $self->_callbacks };
            return;
        },
    );
}
# Builder for _on_drain.
#
# Returns the callback installed as the handle's on_drain handler while
# no-reply requests are outstanding. When it runs, every remembered no-reply
# request is completed: the server is notified, the request's slot is
# released and its callback is removed and invoked with (undef, undef).
# Afterwards queued requests are started.
#
# The list of no-reply syncs is snapshotted and cleared, and the on_drain
# handler is unregistered, BEFORE any callback or _try_to_send runs. A
# no-reply request sent from inside this callback therefore starts a fresh
# list and re-installs the handler (because _send only installs it when
# none is set). Previously the handler was reset to undef after such a send,
# so that request was never completed, and a sync pushed by a callback was
# completed by the still-running loop before its data had drained.
#
# $self and $server are weakened so the closure does not keep them alive.
sub _build__on_drain {
    my ($self) = @_;
    my $server = $self->server;
    weaken($self);
    weaken($server);
    return sub {
        my ($handle) = @_;
        return unless $self->_has_no_reply();

        # Take ownership of the current batch and reset state up front so
        # re-entrant sends cannot interfere with this run.
        my @drained = @{ $self->_no_reply };
        $self->_clear_no_reply();
        $handle->on_drain(undef);

        foreach my $sync (@drained) {
            $server->_recv_finished( $sync, undef, undef );
            $self->_in_progress( $self->_in_progress - 1 );
            delete( $self->_callbacks->{$sync} )->( undef, undef );
        }

        $self->_try_to_send();
        return;
    };
}
# Builder for _queue: a new, empty array reference.
sub _build__queue {
    my @empty;
    return \@empty;
}
# Builder for _callbacks: a new, empty hash reference.
sub _build__callbacks {
    my %empty;
    return \%empty;
}
# Builder for _no_reply: a new, empty array reference.
sub _build__no_reply {
    my @empty;
    return \@empty;
}
# Wraps _choose_sync so that the returned sync value never collides with a
# request that is still pending on this connection. The wrapped method is
# asked for a value up to 50 times; the first value without an entry in
# _callbacks is returned. Dies if all 50 attempts collide.
around _choose_sync => sub {
    my ( $orig, $self ) = @_;

    my $pending       = $self->_callbacks;
    my $attempts_left = 50;
    while ( $attempts_left-- > 0 ) {
        my $candidate = $self->$orig();
        return $candidate if !exists $pending->{$candidate};
    }

    die "Can't choose sync value after 50 iterations";
};
# Not supported on this connection type: always dies.
sub Close {
    die "This is not what should be done";
}
# Remove Mouse's declarative keywords from this package now that
# the class definition is complete.
no
Mouse;
# Finalize the class metadata.
__PACKAGE__->meta->make_immutable();
# The module must end with a true value.
1;