no
warnings
'recursion'
;
# --- Public connection settings -------------------------------------------
# host/port are handed to IO::Socket::INET as PeerHost/PeerPort in connect().
has host => 'localhost';
has port => 28015;

# Database name stored by use(); not otherwise read in this part of the file.
has default_db => 'test';

# Sent (length-prefixed) to the server during the connect() handshake.
has auth_key => q{};

# Passed as the Timeout option of IO::Socket::INET in connect().
has timeout => 20;

# --- Internal state -------------------------------------------------------
# _rdb       : object providing ->true / ->false (used by _real_cleaner)
# _handle    : the open socket, or undef when disconnected
# _callbacks : hashref of query token => CODE ref (see _start / _send)
has [qw( _rdb _handle _callbacks )];

# Default value is produced by this builder; it yields the protocol
# constant tables (versionDummy, query, term) used throughout this class.
has _protocol => sub { return Rethinkdb::Protocol->new };
# Open the TCP connection and perform the handshake.
#
# Steps, in order:
#   1. connect to host:port (croaks if the socket cannot be created);
#   2. send the protocol version constant (v0_3) as a 32-bit LE integer;
#   3. send the auth key, prefixed with its length as a 32-bit LE integer;
#   4. send the wire-protocol constant (json) as a 32-bit LE integer;
#   5. read the server's NUL-terminated reply, which must be "SUCCESS".
#
# Returns $self on success.  Croaks if the socket cannot be opened, if the
# peer hangs up before finishing its reply, or if the reply is anything
# other than "SUCCESS".  On a failed handshake the socket is closed and
# _handle is reset to undef so no half-initialised connection is left behind.
sub connect {
  my $self = shift;

  $self->{_handle} = IO::Socket::INET->new(
    PeerHost => $self->host,
    PeerPort => $self->port,
    Reuse    => 1,
    Timeout  => $self->timeout,
    )
    or croak 'ERROR: Could not connect to ' . $self->host . ':' . $self->port;

  # Tear the socket down before reporting a handshake failure.
  my $abort = sub {
    my $message = shift;
    $self->_handle->close;
    $self->_handle(undef);
    croak $message;
  };

  $self->_handle->send( pack 'L<',
    $self->_protocol->versionDummy->version->v0_3 );
  $self->_handle->send(
    ( pack 'L<', length $self->auth_key ) . $self->auth_key );
  $self->_handle->send( pack 'L<',
    $self->_protocol->versionDummy->protocol->json );

  # Read the reply one byte at a time up to (but not including) the NUL
  # terminator.  An empty read means the peer closed the connection; without
  # this check the loop would never terminate.
  my $response = q{};
  while (1) {
    my $char     = q{};
    my $received = $self->_handle->recv( $char, 1 );
    if ( !defined $received || $char eq q{} ) {
      $abort->('ERROR: Connection closed during handshake');
    }
    last if $char eq "\0";
    $response .= $char;
  }

  # trim surrounding whitespace
  $response =~ s/\A\s+//;
  $response =~ s/\s+\z//;

  # Anything other than SUCCESS is a refused handshake.
  if ( $response ne 'SUCCESS' ) {
    $abort->("ERROR: Unable to connect to the database");
  }

  $self->_callbacks( {} );

  return $self;
}
# Close the connection.
#
# Arguments: either a hashref or a flat key/value list.
#   noreply_wait - when true (the default, also used when the key is absent
#                  or undef) call noreply_wait() before closing the socket;
#                  pass a false value to close immediately.
#
# Does nothing to the socket when no handle is set.  Always resets the
# callback table to an empty hash.  Returns $self so calls can be chained.
sub close {
  my $self = shift;
  my $args = ref $_[0] ? $_[0] : {@_};

  if ( $self->_handle ) {

    # Default to waiting; only an explicit false value skips the wait.
    if ( $args->{noreply_wait} // 1 ) {
      $self->noreply_wait;
    }

    $self->_handle->close;
    $self->_handle(undef);
  }

  $self->_callbacks( {} );

  return $self;
}
# Close the current connection and open a new one.
#
# Accepts the same options as close() (hashref or flat key/value list) and
# forwards them unchanged.  Returns whatever connect() returns.
sub reconnect {
  my ( $self, @options ) = @_;

  # Normalise to a hashref: reuse the caller's reference if one was given.
  my $args;
  if ( ref $options[0] ) {
    $args = $options[0];
  }
  else {
    $args = {@options};
  }

  my $closed = $self->close($args);
  return $closed->connect;
}
# Register this connection as the shared default connection.
#
# The object is stored in the package variable $package::_rdb_io.  Note that
# this is a single fixed global (a variable named _rdb_io in the package
# literally called "package"); it is NOT keyed by the calling package, so the
# most recent call wins process-wide.
# NOTE(review): readers of this global live outside this block; the name is
# kept as-is so they continue to work - confirm before renaming.
#
# Returns $self.
sub repl {
  my $self = shift;

  $package::_rdb_io = $self;

  return $self;
}
# Select the default database for this connection.
#
#   $db - database name, stored via the default_db accessor.
#
# Returns $self.
sub use {
  my ( $self, $db ) = @_;

  $self->default_db($db);

  return $self;
}
# Send a NOREPLY_WAIT query with a fresh token and return the result of
# _send() for it.
sub noreply_wait {
  my ($self) = @_;

  my %request = (
    type  => $self->_protocol->query->queryType->noreply_wait,
    token => Rethinkdb::Util::_token(),
  );

  return $self->_send( \%request );
}
# Build and send a START query.
#
#   $query    - object whose ->_build result becomes the query body
#   (2nd arg) - accepted for call compatibility but not used here
#   $callback - optional; when it is a CODE ref it is registered under the
#               query's token so _send() can hand responses to it
#
# Returns the result of _send().
sub _start {
  my ( $self, $query, undef, $callback ) = @_;

  my %request = (
    type  => $self->_protocol->query->queryType->start,
    token => Rethinkdb::Util::_token(),
    query => $query->_build,
  );

  $self->_callbacks->{ $request{token} } = $callback
    if ref $callback eq 'CODE';

  return $self->_send( \%request );
}
# Serialise a query hash to its JSON wire form.
#
# A query whose type is 1 is flattened with _encode_recurse() and gets an
# empty options hash appended; every other query type is sent as a
# one-element array holding just the type.
#
# Returns the JSON text.
sub _encode {
  my ( $self, $data ) = @_;

  # Anything that is not type 1 carries no body.
  if ( $data->{type} != 1 ) {
    return encode_json( [ $data->{type} ] );
  }

  my $payload = $self->_encode_recurse($data);
  push @{$payload}, {};

  return encode_json($payload);
}
# Recursively convert a term/query hash into the nested array form that is
# later JSON-encoded.
#
# Rules, applied in this order:
#   * datum with a defined r_bool  -> JSON::PP true/false
#   * datum otherwise              -> value of the first key that is not
#                                     'type' (falls through if there is none)
#   * type    (if true)            -> pushed onto the result array
#   * query   (if true)            -> encoded recursively and pushed
#   * args    (if true)            -> array of recursively encoded elements
#   * optargs (if true)            -> hash of key => encoded val; returned
#                                     directly when type is make_obj,
#                                     otherwise pushed
#
# Returns a scalar value, a hashref, or an arrayref accordingly.
sub _encode_recurse {
  my ( $self, $data ) = @_;

  if ( my $datum = $data->{datum} ) {
    if ( defined $datum->{r_bool} ) {
      return $datum->{r_bool} ? JSON::PP::true() : JSON::PP::false();
    }

    for my $field ( keys %{$datum} ) {
      next if $field eq 'type';
      return $datum->{$field};
    }
  }

  my @term;

  push @term, $data->{type} if $data->{type};

  push @term, $self->_encode_recurse( $data->{query} ) if $data->{query};

  if ( $data->{args} ) {
    push @term, [ map { $self->_encode_recurse($_) } @{ $data->{args} } ];
  }

  if ( $data->{optargs} ) {
    my %optarg_for;
    for my $pair ( @{ $data->{optargs} } ) {
      $optarg_for{ $pair->{key} } = $self->_encode_recurse( $pair->{val} );
    }

    # An object literal is represented by the bare hash itself.
    if ( $data->{type} == $self->_protocol->term->termType->make_obj ) {
      return \%optarg_for;
    }

    push @term, \%optarg_for;
  }

  return \@term;
}
# Parse a JSON response and normalise its "r" member with _clean().
#
#   $data - JSON text
#
# Returns the decoded structure.
sub _decode {
  my ( $self, $data ) = @_;

  my $message = decode_json($data);
  $message->{r} = $self->_clean( $message->{r} );

  return $message;
}
# Run _real_cleaner() over every element of a container.
#
#   ARRAY ref - returns a NEW array ref holding the cleaned elements
#   HASH ref  - cleans the values in place and returns the same ref
#   other     - returned untouched
sub _clean {
  my ( $self, $data ) = @_;

  my $kind = ref $data;

  if ( $kind eq 'ARRAY' ) {
    return [ map { $self->_real_cleaner($_) } @{$data} ];
  }

  if ( $kind eq 'HASH' ) {
    $data->{$_} = $self->_real_cleaner( $data->{$_} ) for keys %{$data};
  }

  return $data;
}
# Normalise a single decoded JSON value.
#
#   JSON::PP::Boolean - replaced by $self->_rdb->true or ->false
#   ARRAY / HASH ref  - handed back to _clean() for recursive processing
#   anything else     - returned as-is
sub _real_cleaner {
  my ( $self, $data ) = @_;

  my $kind = ref $data;

  my $cleaned
    = $kind eq 'JSON::PP::Boolean'
    ? ( $data ? $self->_rdb->true : $self->_rdb->false )
    : ( $kind eq 'ARRAY' || $kind eq 'HASH' ) ? $self->_clean($data)
    :                                           $data;

  return $cleaned;
}
# Read exactly $wanted bytes from the socket.
#
# A single recv() on a stream socket may return fewer bytes than requested,
# so keep reading until the full amount has arrived.  Croaks if recv()
# fails or the peer closes the connection first.
sub _recv_exact {
  my ( $self, $wanted ) = @_;

  my $buffer = q{};
  while ( length($buffer) < $wanted ) {
    my $chunk    = q{};
    my $received = $self->_handle->recv( $chunk, $wanted - length($buffer) );
    if ( !defined $received || $chunk eq q{} ) {
      croak 'ERROR: Connection closed while reading response';
    }
    $buffer .= $chunk;
  }

  return $buffer;
}

# Send one query to the server and return its response.
#
#   $query - hashref with at least {type} and {token}; serialised by
#            _encode()
#
# Wire format written: 8-byte token + 4-byte payload length (both
# little-endian) followed by the JSON payload.  The reply is read in the same
# layout and decoded with _decode().
#
# When the reply type {t} is 3 or 5 (presumably "partial" / "feed" replies -
# confirm against the protocol definition) more data is requested with a
# CONTINUE query for the same token:
#   * if a callback is registered for the token, it is invoked with the
#     current Rethinkdb::Response and the result of the follow-up _send() is
#     returned;
#   * otherwise the follow-up rows are appended to this reply and its type is
#     replaced by the follow-up's type.
#
# Set RDB_DEBUG in the environment to dump traffic to STDERR.
#
# Returns a Rethinkdb::Response.  Croaks if the connection drops mid-reply.
sub _send {
  my $self  = shift;
  my $query = shift;

  if ( $ENV{RDB_DEBUG} ) {
    $Data::Dumper::Indent = 1;
    say {*STDERR} 'SENDING:';
    say {*STDERR} Dumper $query;
  }

  my $serial = $self->_encode($query);

  # Both fields are explicitly little-endian so the header matches the
  # 'Q<' / 'L<' layout used when unpacking the reply, regardless of the
  # host's native byte order.
  my $header = pack 'Q<L<', $query->{token}, length $serial;

  if ( $ENV{RDB_DEBUG} ) {
    say {*STDERR} Dumper $serial;
  }

  $self->_handle->send( $header . $serial );

  # Reply header, then exactly $length bytes of JSON.
  my $token  = unpack 'Q<', $self->_recv_exact(8);
  my $length = unpack 'L<', $self->_recv_exact(4);
  my $data   = $self->_recv_exact($length);

  my $res_data = $self->_decode($data);
  $res_data->{token} = $token;

  # More data is available for this token.
  if ( $res_data->{t} == 3 or $res_data->{t} == 5 ) {
    if ( $self->_callbacks->{$token} ) {
      my $res = Rethinkdb::Response->_init($res_data);

      if ( $ENV{RDB_DEBUG} ) {
        say {*STDERR} 'RECEIVED:';
        say {*STDERR} Dumper $res;
      }

      # Hand this batch to the caller, then fetch the next one.
      $self->_callbacks->{$token}->($res);

      return $self->_send(
        {
          type  => $self->_protocol->query->queryType->continue,
          token => $token
        }
      );
    }
    else {
      if ( $ENV{RDB_DEBUG} ) {
        say {*STDERR} 'RECEIVED:';
        say {*STDERR} Dumper $res_data;
      }

      # No callback: accumulate the remaining rows into this reply.
      my $more = $self->_send(
        {
          type  => $self->_protocol->query->queryType->continue,
          token => $token
        }
      );

      push @{ $res_data->{r} }, @{ $more->response };
      $res_data->{t} = $more->type;
    }
  }

  my $res = Rethinkdb::Response->_init($res_data);

  if ( $ENV{RDB_DEBUG} ) {
    say {*STDERR} 'RECEIVED:';
    say {*STDERR} Dumper $res;
  }

  return $res;
}
1;