use
5.008_001;
our
$VERSION
=
'0.12'
;
use
constant
DEBUG
=>
$ENV
{ANYEVENT_REDIS_DEBUG};
our
$AUTOLOAD
;
my
%bulk_command
=
map
{
$_
=> 1 }
qw( set setnx rpush lpush lset lrem sadd srem sismember echo getset smove zadd zrem zscore zincrby append hexists hset hsetnx hget hmget hmset hdel)
;
sub
new {
my
(
$class
,
%args
) =
@_
;
my
$host
=
delete
$args
{host} ||
'127.0.0.1'
;
my
$port
=
delete
$args
{port} || 6379;
bless
{
host
=>
$host
,
port
=>
$port
,
%args
,
},
$class
;
}
sub
run_cmd {
my
$self
=
shift
;
my
$cmd
=
shift
;
$self
->{cmd_cb} or
return
$self
->
connect
(
$cmd
,
@_
);
$self
->{cmd_cb}->(
$cmd
,
@_
);
}
sub
DESTROY { }
sub
AUTOLOAD {
my
$self
=
shift
;
(
my
$method
=
$AUTOLOAD
) =~ s/.*:://;
$self
->run_cmd(
$method
,
@_
);
}
sub
all_cv {
my
$self
=
shift
;
$self
->{all_cv} =
shift
if
@_
;
unless
(
$self
->{all_cv}) {
$self
->{all_cv} = AE::cv;
}
$self
->{all_cv};
}
sub
cleanup {
my
$self
=
shift
;
delete
$self
->{cmd_cb};
delete
$self
->{sock};
$self
->{on_error}->(
@_
);
}
sub
connect
{
my
$self
=
shift
;
my
$cv
;
if
(
@_
) {
$cv
= AE::cv;
push
@{
$self
->{connect_queue}}, [
$cv
,
@_
];
}
return
$cv
if
$self
->{sock};
$self
->{sock} = tcp_connect
$self
->{host},
$self
->{port},
sub
{
my
$fh
=
shift
or
die
"Can't connect Redis server: $!"
;
my
$hd
= AnyEvent::Handle->new(
fh
=>
$fh
,
on_error
=>
sub
{
$_
[0]->destroy;
if
(
$_
[1]) {
$self
->cleanup(
$_
[2]);
}
},
on_eof
=>
sub
{
$_
[0]->destroy;
$self
->cleanup(
'connection closed'
);
},
);
$self
->{cmd_cb} =
sub
{
$self
->all_cv->begin;
my
$command
=
shift
;
my
(
$cv
,
$cb
);
if
(
@_
) {
$cv
=
pop
if
UNIVERSAL::isa(
$_
[-1],
'AnyEvent::CondVar'
);
$cb
=
pop
if
ref
$_
[-1] eq
'CODE'
;
}
my
$cv_send
=
sub
{
my
$cv
=
shift
;
my
(
$res
,
$err
) =
@_
;
$self
->all_cv->end;
$err
?
$cv
->croak(
$res
) :
$cv
->
send
(
$res
);
};
my
$send
;
if
(
defined
$bulk_command
{
$command
} ) {
my
$value
=
pop
;
$value
=
''
if
!
defined
$value
;
$send
=
uc
(
$command
)
.
' '
.
join
(
' '
,
@_
)
.
' '
.
length
(
$value
)
.
"\r\n$value\r\n"
;
}
else
{
$send
=
uc
(
$command
)
.
' '
.
join
(
' '
,
@_
)
.
"\r\n"
;
}
warn
$send
if
DEBUG;
$cv
||= AE::cv;
$cv
->cb(
sub
{
my
$cv
=
shift
;
try
{
my
$res
=
$cv
->
recv
;
$cb
->(
$res
);
}
catch
{
(
$self
->{on_error} ||
sub
{
die
@_
})->(
$_
);
}
})
if
$cb
;
$hd
->push_write(
$send
);
$hd
->push_read(
line
=> _decoder(
$send
,
$cv
,
$cv_send
,
$command
));
return
$cv
;
};
for
my
$queue
(@{
$self
->{connect_queue} || []}) {
my
(
$cv
,
@args
) =
@$queue
;
$self
->{cmd_cb}->(
@args
,
$cv
);
}
};
return
$cv
;
}
sub
_decoder {
my
(
$send
,
$cv
,
$cv_send
,
$command
) =
@_
;
sub
{
my
(
$hd
,
$result
) =
@_
;
warn
"got line <$result> for command [$send]"
if
DEBUG;
my
$type
=
substr
$result
, 0, 1;
$result
=~ s/^.//;
if
(
$type
eq
'-'
) {
$cv_send
->(
$cv
,
$result
, 1);
}
elsif
(
$type
eq
'+'
) {
$cv_send
->(
$cv
,
$result
);
}
elsif
(
$type
eq
'$'
) {
if
(
$result
< 0) {
return
$cv_send
->(
$cv
,
undef
);
}
$hd
->unshift_read(
chunk
=>
$result
+ 2,
sub
{
my
(
$hd
,
$chunk
) =
@_
;
$chunk
=~ s/\r\n$//;
warn
"chunk <$chunk>"
if
DEBUG;
if
(
$command
eq
'info'
) {
my
%info
=
map
{
split
/:/,
$_
, 2 }
split
/\r\n/,
$chunk
;
$cv_send
->(
$cv
, \
%info
);
}
elsif
(
$command
eq
'keys'
) {
my
@keys
=
split
/\s+/,
$chunk
;
$cv_send
->(
$cv
, \
@keys
);
}
else
{
$cv_send
->(
$cv
,
$chunk
);
}
});
}
elsif
(
$type
eq
'*'
) {
my
$size
=
$result
;
warn
"size is $size"
if
DEBUG;
if
(
$result
< 0) {
return
$cv_send
->(
$cv
,
undef
);
}
elsif
(
$result
== 0) {
return
$cv_send
->(
$cv
, []);
}
my
@lines
;
for
(1..
$size
) {
$hd
->unshift_read(
line
=> _decoder
(
"recursion $_ of $send"
,
undef
,
sub
{
my
(
undef
,
$result
) =
@_
;
push
@lines
,
$result
;
if
(
@lines
>=
$size
) {
$cv_send
->(
$cv
, \
@lines
);
}
},
$command
));
}
}
elsif
(
$type
eq
':'
) {
$cv_send
->(
$cv
,
$result
);
}
else
{
$cv_send
->(
$cv
,
"Unknown type $type"
, 1);
}
}
}
1;