our
$VERSION
=
'6.004'
;
sub
new {
my
(
$class
,
%args
) =
@_
;
$args
{queued_requests} //= [];
weaken(
$args
{redis} //
die
'Must be provided a Net::Async::Redis instance'
);
bless
\
%args
,
$class
;
}
async
sub
exec
{
my
(
$self
,
$code
) =
@_
;
try
{
my
$f
=
$self
->
$code
;
$f
->retain
if
blessed(
$f
) and
$f
->isa(
'Future'
);
$log
->tracef(
'MULTI exec'
);
dynamically
$self
->redis->{_is_multi} =
$self
->redis->{multi_queue};
my
(
$exec_result
) = await
$self
->redis->
exec
;
$self
->redis->{multi_queue}->finish;
my
@reply
=
$exec_result
->@*;
my
$success
= 0;
my
$failure
= 0;
while
(
@reply
) {
try
{
my
$reply
=
shift
@reply
;
my
$queued
=
shift
@{
$self
->{queued_requests}} or
die
'invalid queued request'
;
$queued
->done(
$reply
)
unless
$queued
->is_ready;
++
$success
}
catch
{
$log
->warnf(
"Failure during transaction: %s"
, $@);
++
$failure
}
}
return
$success
,
$failure
;
}
catch
{
my
$err
= $@;
$log
->errorf(
'Failed to complete multi - %s'
,
$err
);
for
my
$queued
(
splice
@{
$self
->{queued_requests}}) {
try
{
$queued
->fail(
"Transaction failed"
,
redis
=>
'transaction_failure'
)
unless
$queued
->is_ready;
}
catch
{
$log
->warnf(
"Failure during transaction: %s"
, $@);
}
}
die
$@;
}
}
sub
redis {
shift
->{redis} }
sub
AUTOLOAD {
my
(
$method
) =
our
$AUTOLOAD
=~ m{::([^:]+)$};
die
"Unknown method $method"
unless
Net::Async::Redis::Commands->can(
$method
) or Net::Async::Redis->can(
$method
);
my
$code
= async
sub
{
my
(
$self
,
@args
) =
@_
;
my
$f
=
$self
->redis->future->set_label(
$method
);
push
@{
$self
->{queued_requests}},
$f
;
my
$ff
=
do
{
dynamically
$self
->redis->{_is_multi} =
$self
->redis->{multi_queue};
$self
->redis->
$method
(
@args
);
};
my
(
$resp
) = await
$ff
;
return
await
$f
if
$resp
eq
'QUEUED'
;
$f
->fail(
$resp
);
die
$resp
;
};
set_subname
$method
=>
$code
;
{
no
strict
'refs'
;
*$method
=
sub
{
$code
->(
@_
)->retain } }
$code
->(
@_
)->retain;
}
sub
DESTROY {
my
(
$self
) =
@_
;
return
if
${^GLOBAL_PHASE} eq
'DESTRUCT'
or not
$self
->{queued_requests};
for
my
$queued
(
splice
@{
$self
->{queued_requests}}) {
try
{
$queued
->cancel;
}
catch
{
$log
->warnf(
"Failure during cleanup: %s"
, $@);
}
}
}
1;