has
prefix
=> (
is
=>
'ro'
,
isa
=>
'Str'
,
default
=>
sub
{
ref
shift
},
);
has
cluster
=> (
is
=>
'ro'
,
isa
=>
'MR::IProto::Cluster'
,
required
=> 1,
coerce
=> 1,
handles
=> [
qw( timeout )
],
);
has
max_parallel
=> (
is
=>
'ro'
,
isa
=>
'Int'
,
default
=> 1000,
);
has
max_request_retries
=> (
is
=>
'ro'
,
isa
=>
'Int'
,
default
=> 2,
);
has
retry_delay
=> (
is
=>
'ro'
,
isa
=>
'Num'
,
default
=> 0,
);
has
_reply_class
=> (
is
=>
'ro'
,
isa
=>
'HashRef[ClassName]'
,
lazy_build
=> 1,
);
has
_queue
=> (
is
=>
'ro'
,
isa
=>
'ArrayRef'
,
lazy_build
=> 1,
);
has
_in_progress
=> (
is
=>
'rw'
,
isa
=>
'Int'
,
default
=> 0,
);
sub
send
{
my
(
$self
,
$message
,
$callback
) =
@_
;
if
(
$callback
) {
die
"Method must be called in void context if you want to use async"
if
defined
wantarray
;
$self
->_send(
$message
,
$callback
);
return
;
}
else
{
die
"Method must be called in scalar context if you want to use sync"
unless
defined
wantarray
;
my
$olddie
=
ref
$SIG
{__DIE__} eq
'CODE'
?
$SIG
{__DIE__} :
ref
$SIG
{__DIE__} eq
'GLOB'
? *{
$SIG
{__DIE__}}{CODE} :
undef
;
local
$SIG
{__DIE__} =
sub
{
local
$! = 0;
$olddie
->(
@_
); }
if
$olddie
;
my
%servers
;
my
(
$data
,
$error
,
$errno
);
my
$conn
=
$self
->_send_now(
$message
,
sub
{
(
$data
,
$error
) =
@_
;
$errno
= $!;
return
;
}, \
%servers
);
return
if
$message
->{
continue
} && !
$conn
;
my
$cont
=
sub
{
$self
->_recv_now(\
%servers
,
max
=>
$message
->{
continue
}?1:0);
$! =
$errno
;
return
$message
->{
continue
}->(
$data
,
$error
,
$errno
)
if
$message
->{
continue
};
die
$error
if
$error
;
return
$data
;
};
return
{
fh
=>
$conn
->fh,
connection
=>
$conn
,
continue
=>
$cont
,
}
if
$message
->{
continue
};
return
&$cont
();
}
}
sub
send_bulk {
my
(
$self
,
$messages
,
$callback
) =
@_
;
my
@result
;
if
(
$callback
) {
die
"Method must be called in void context if you want to use async"
if
defined
wantarray
;
my
$cv
= AnyEvent->condvar();
$cv
->begin(
sub
{
$callback
->(\
@result
) } );
foreach
my
$message
(
@$messages
) {
$cv
->begin();
$self
->_send(
$message
,
sub
{
my
(
$data
,
$error
) =
@_
;
push
@result
, blessed(
$data
) ?
$data
: {
data
=>
$data
,
error
=>
$error
};
$cv
->end();
return
;
});
}
$cv
->end();
return
;
}
else
{
die
"Method must be called in scalar context if you want to use sync"
unless
defined
wantarray
;
my
$olddie
=
ref
$SIG
{__DIE__} eq
'CODE'
?
$SIG
{__DIE__} :
ref
$SIG
{__DIE__} eq
'GLOB'
? *{
$SIG
{__DIE__}}{CODE} :
undef
;
local
$SIG
{__DIE__} =
sub
{
local
$! = 0;
$olddie
->(
@_
); }
if
$olddie
;
my
%servers
;
foreach
my
$message
(
@$messages
) {
$self
->_send_now(
$message
,
sub
{
my
(
$data
,
$error
) =
@_
;
push
@result
, blessed(
$data
) ?
$data
: {
data
=>
$data
,
error
=>
$error
};
return
;
}, \
%servers
);
}
$self
->_recv_now(\
%servers
);
return
\
@result
;
}
}
sub
Chat {
my
$self
=
shift
;
my
$message
=
@_
== 1 ?
shift
: {
@_
};
$message
->{retry} = 1
if
ref
$message
eq
'HASH'
;
my
$data
;
eval
{
$data
=
$self
->
send
(
$message
); 1 } or
return
;
return
wantarray
?
@$data
:
$data
->[0];
}
sub
Chat1 {
my
$self
=
shift
;
my
$message
=
@_
== 1 ?
shift
: {
@_
};
my
$data
;
return
eval
{
$data
=
$self
->
send
(
$message
); 1 } ? {
ok
=>
$data
}
: {
fail
=> $@ =~ /^(.*?) at \S+ line \d+/s ? $1 : $@,
timeout
=> $! == Errno::ETIMEDOUT };
}
sub
SetTimeout {
my
(
$self
,
$timeout
) =
@_
;
$self
->timeout(
$timeout
);
return
;
}
sub
disconnect_all {
my
(
$class
) =
@_
;
MR::IProto::Cluster::Server->disconnect_all();
return
;
}
my
%servers
;
around
BUILDARGS
=>
sub
{
my
$orig
=
shift
;
my
$class
=
shift
;
my
%args
=
@_
== 1 ? %{
shift
()} :
@_
;
$args
{prefix} =
$args
{name}
if
exists
$args
{name};
if
(
$args
{servers} ) {
my
$cluster_class
=
$args
{cluster_class} ||
'MR::IProto::Cluster'
;
my
$server_class
=
$args
{server_class} ||
'MR::IProto::Cluster::Server'
;
my
%srvargs
;
$srvargs
{debug} =
$args
{debug}
if
exists
$args
{debug};
$srvargs
{timeout} =
delete
$args
{timeout}
if
exists
$args
{timeout};
$srvargs
{tcp_nodelay} =
delete
$args
{tcp_nodelay}
if
exists
$args
{tcp_nodelay};
$srvargs
{tcp_keepalive} =
delete
$args
{tcp_keepalive}
if
exists
$args
{tcp_keepalive};
$srvargs
{dump_no_ints} =
delete
$args
{dump_no_ints}
if
exists
$args
{dump_no_ints};
$srvargs
{prefix} =
$args
{name}
if
exists
$args
{name} and
defined
$args
{name};
my
%clusterargs
;
$clusterargs
{balance} =
delete
$args
{balance}
if
exists
$args
{balance};
$clusterargs
{servers} = [
map
{
my
(
$host
,
$port
,
$weight
) =
split
/:/,
$_
;
$args
{no_pool} ?
my
$server
:
$servers
{
"$host:$port"
} ||=
$server_class
->new(
%srvargs
,
host
=>
$host
,
port
=>
$port
,
defined
$weight
? (
weight
=>
$weight
) : (),
);
}
split
/,/,
delete
$args
{servers}
];
$args
{cluster} =
$cluster_class
->new(
%clusterargs
);
}
return
$class
->
$orig
(
%args
);
};
sub
_build_debug_cb {
my
(
$self
) =
@_
;
my
$prefix
=
$self
->prefix;
return
sub
{
my
(
$msg
) =
@_
;
chomp
$msg
;
warn
sprintf
"%s: %s\n"
,
$prefix
,
$msg
;
return
;
};
}
sub
_build__callbacks {
my
(
$self
) =
@_
;
return
{};
}
sub
_build__reply_class {
my
(
$self
) =
@_
;
my
$re
=
sprintf
'^%s::'
,
$self
->prefix;
my
%reply
=
map
{
$_
->
msg
=>
$_
}
grep
$_
->can(
'msg'
),
grep
/
$re
/,
@{ mro::get_isarev(
'MR::IProto::Response'
) };
return
\
%reply
;
}
sub
_build__queue {
my
(
$self
) =
@_
;
return
[];
}
sub
_send {
my
(
$self
,
$message
,
$callback
) =
@_
;
if
(
$self
->_in_progress <
$self
->max_parallel ) {
$self
->_in_progress(
$self
->_in_progress + 1 );
eval
{
$self
->_send_now(
$message
,
$callback
); 1 }
or
$self
->_report_error(
$message
,
$callback
, $@);
}
else
{
push
@{
$self
->_queue}, [
$message
,
$callback
];
}
return
;
}
sub
_finish_and_start {
my
(
$self
) =
@_
;
if
(
my
$task
=
shift
@{
$self
->_queue} ) {
eval
{
$self
->_send_now(
@$task
); 1 }
or
$self
->_report_error(
@$task
, $@);
}
else
{
$self
->_in_progress(
$self
->_in_progress - 1 );
}
return
;
}
sub
_send_now {
my
(
$self
,
$message
,
$callback
,
$sync
) =
@_
;
my
$args
;
if
(
ref
$message
ne
'HASH'
) {
my
$msg
=
$message
->msg;
my
$response_class
=
$self
->_reply_class->{
$msg
};
die
sprintf
"Cannot find response class for message code %d\n"
,
$msg
unless
$response_class
;
$args
= {
request
=>
$message
,
msg
=>
$msg
,
key
=>
$message
->key,
body
=>
$message
->data,
response_class
=>
$response_class
,
no_reply
=>
$response_class
->isa(
'MR::IProto::NoResponse'
),
};
}
else
{
die
"unpack or no_reply must be specified"
unless
$message
->{
unpack
} ||
$message
->{no_reply};
$args
=
$message
;
$args
->{body} =
exists
$args
->{payload} ?
delete
$args
->{payload}
:
ref
$message
->{data} ?
pack
delete
$message
->{
pack
} ||
'L*'
, @{
delete
$message
->{data} }
:
delete
$message
->{data};
}
my
$try
= 1;
weaken(
$self
);
my
$handler
;
$handler
=
sub
{
$self
->_server_callback(
[\
$handler
,
$args
,
$callback
,
$sync
, \
$try
],
[
@_
],
);
return
;
};
return
$self
->_send_try(
$sync
,
$args
,
$handler
,
$try
);
}
sub
_send_try {
my
(
$self
,
$sync
,
$args
,
$handler
,
$try
) =
@_
;
my
$xsync
=
$sync
?
'sync'
:
'async'
;
$args
->{max_request_retries} ||=
$self
->max_request_retries;
$self
->_debug(
sprintf
"send msg=%d try %d of %d total"
,
$args
->{msg},
$try
,
$args
->{max_request_retries} )
if
$self
->debug >= 2;
my
$server
=
$self
->cluster->server(
$args
->{key} );
my
$connection
=
$server
->
$xsync
();
return
unless
$connection
->
send
(
$args
->{msg},
$args
->{body},
$handler
,
$args
->{no_reply},
$args
->{sync});
$sync
->{
$connection
} ||=
$connection
if
$sync
;
return
$connection
;
}
sub
_send_retry {
my
(
$self
,
@in
) =
@_
;
my
(
$sync
) =
@in
;
if
(
$sync
) {
Time::HiRes::
sleep
(
$self
->retry_delay);
$self
->_send_try(
@in
);
}
else
{
my
$timer
;
$timer
= AnyEvent->timer(
after
=>
$self
->retry_delay,
cb
=>
sub
{
undef
$timer
;
$self
->_send_try(
@in
);
return
;
},
);
}
return
;
}
sub
_server_callback {
my
(
$self
,
$req_args
,
$resp_args
) =
@_
;
my
(
$handler
,
$args
,
$callback
,
$sync
,
$try
) =
@$req_args
;
my
(
$resp_msg
,
$data
,
$error
,
$errno
) =
@$resp_args
;
eval
{
if
(
$error
) {
$! =
$errno
;
$@ =
$error
;
my
$retry
=
defined
$args
->{request} ?
$args
->{request}->retry()
:
ref
$args
->{retry} eq
'CODE'
?
$args
->{retry}->()
:
$args
->{retry};
$self
->_debug(
"send: failed[@{[$retry, $$try+1, $args->{max_request_retries}]}]"
)
if
$self
->debug >= 2;
if
(
$retry
&&
$$try
++ <
$args
->{max_request_retries} ) {
$self
->_send_retry(
$sync
,
$args
,
$$handler
,
$$try
);
}
else
{
undef
$$handler
;
$self
->_report_error(
$args
->{request},
$callback
,
$error
,
$sync
,
$errno
);
}
}
else
{
my
$ok
=
eval
{
die
"Request and reply message code is different: $resp_msg != $args->{msg}\n"
unless
$args
->{no_reply} ||
$resp_msg
==
$args
->{msg};
if
(
defined
$args
->{request} ) {
$data
=
$args
->{response_class}->new(
data
=>
$data
,
request
=>
$args
->{request} );
}
else
{
$data
=
$args
->{no_reply} ? [ 0 ] : [
ref
$args
->{
unpack
} eq
'CODE'
?
$args
->{
unpack
}->(
$data
) :
unpack
$args
->{
unpack
},
$data
];
}
1;
};
if
(
$ok
) {
if
(
defined
$args
->{request} &&
$data
->retry &&
$$try
++ <
$args
->{max_request_retries} ) {
$self
->_send_retry(
$sync
,
$args
,
$$handler
,
$$try
);
}
elsif
(
defined
$args
->{is_retry} &&
$args
->{is_retry}->(
$data
) &&
$$try
++ <
$args
->{max_request_retries} ) {
$self
->_send_retry(
$sync
,
$args
,
$$handler
,
$$try
);
}
else
{
undef
$$handler
;
$self
->_finish_and_start()
unless
$sync
;
$callback
->(
$data
);
}
}
else
{
undef
$$handler
;
$self
->_report_error(
$args
->{request},
$callback
, $@,
$sync
);
}
}
1;
} or
do
{
undef
$$handler
;
$self
->_debug(
"unhandled fatal error: $@"
);
};
return
;
}
sub
_recv_now {
my
(
$self
,
$servers
,
%opts
) =
@_
;
while
(
my
@servers
=
values
%$servers
) {
%$servers
= ();
$_
->recv_all(
%opts
)
foreach
@servers
;
}
return
;
}
sub
_report_error {
my
(
$self
,
$request
,
$callback
,
$error
,
$sync
,
$errno
) =
@_
;
my
$errobj
=
defined
$request
&&
ref
$request
ne
'HASH'
? MR::IProto::Error->new(
request
=>
$request
,
error
=>
$error
,
errno
=>
defined
$errno
? 0 +
$errno
: 0,
)
:
undef
;
$self
->_finish_and_start()
unless
$sync
;
$! =
$errno
;
$@ =
$error
;
$callback
->(
$errobj
,
$error
,
$errno
);
return
;
}
no
Mouse;
__PACKAGE__->meta->make_immutable();
1;