BEGIN {
$AnyMongo::Connection::VERSION
=
'0.03'
;
}
DEBUG
=>
$ENV
{ANYMONGO_DEBUG},
BSON_INT32
=> 4,
BSON_INT64
=> 8,
STANDARD_HEADER_SIZE
=> 16,
RESPONSE_HEADER_SIZE
=> 20,
OP_REPLY
=> 1,
OP_MSG
=> 1000,
OP_UPDATE
=> 2001,
OP_INSERT
=> 2002,
RESERVED
=> 2003,
OP_QUERY
=> 2004,
OP_GET_MORE
=> 2005,
OP_DELETE
=> 2006,
OP_KILL_CURSORS
=> 2007,
REPLY_CURSOR_NOT_FOUND
=> 1,
REPLY_QUERY_FAILURE
=> 2,
REPLY_SHARD_CONFIG_STALE
=> 4,
REPLY_AWAIT_CAPABLE
=> 8,
};
has
find_master
=> (
is
=>
'ro'
,
isa
=>
'Bool'
,
required
=> 1,
default
=> 0,
);
has
master_handle
=> (
isa
=>
'Maybe[AnyEvent::Handle]'
,
is
=>
'rw'
,
clearer
=>
'clear_master_handle'
,
);
has
secondary_nodes
=> (
isa
=>
'ArrayRef'
,
is
=>
'rw'
,
);
has
secondary_nodes
=> (
isa
=>
'ArrayRef'
,
is
=>
'rw'
,
lazy_build
=> 1,
clearer
=>
'clear_secondary_nodes'
,
);
sub
_build_secondary_nodes {
my
(
$self
) =
@_
;
[];
}
has
arbitor_nodes
=> (
isa
=>
'ArrayRef'
,
is
=>
'rw'
,
lazy_build
=> 1,
clearer
=>
'clear_arbitor_nodes'
,
);
sub
_build_arbitor_nodes {
my
(
$self
) =
@_
;
[];
}
has
ts
=> (
is
=>
'rw'
,
isa
=>
'Int'
,
default
=> 0
);
has
db_name
=> (
is
=>
'rw'
,
isa
=>
'Str'
,
required
=> 1,
default
=>
'admin'
,
);
has
query_timeout
=> (
is
=>
'rw'
,
isa
=>
'Int'
,
required
=> 1,
default
=>
sub
{
return
$AnyMongo::Cursor::timeout
; },
);
has
auto_connect
=> (
is
=>
'ro'
,
isa
=>
'Bool'
,
required
=> 1,
default
=> 1,
);
has
auto_reconnect
=> (
is
=>
'ro'
,
isa
=>
'Bool'
,
required
=> 1,
default
=> 1,
);
has
host
=> (
is
=>
'ro'
,
isa
=>
'Str'
,
required
=> 1,
);
has
w
=> (
is
=>
'rw'
,
isa
=>
'Int'
,
default
=> 1,
);
has
wtimeout
=> (
is
=>
'rw'
,
isa
=>
'Int'
,
default
=> 1000,
);
has
timeout
=> (
is
=>
'ro'
,
isa
=>
'Int'
,
required
=> 1,
default
=> 20000,
);
has
username
=> (
is
=>
'rw'
,
isa
=>
'Str'
,
required
=> 0,
);
has
password
=> (
is
=>
'rw'
,
isa
=>
'Str'
,
required
=> 0,
);
has
connected
=> (
isa
=>
'Bool'
,
is
=>
'rw'
,
default
=> 0,
);
has
cv
=> (
isa
=>
'AnyEvent::CondVar'
,
is
=>
'ro'
,
lazy_build
=> 1,
clearer
=>
'clear_cv'
,
);
sub
_build_cv {
my
(
$self
) =
@_
;
AE::cv;
}
has
_connection_error
=> (
isa
=>
'Bool'
,
is
=>
'rw'
,
default
=> 0,
);
sub
CLONE_SKIP { 1 }
sub
BUILD {
shift
->_init }
sub
_init {
my
(
$self
) =
@_
;
eval
"use ${_}"
for
qw/AnyMongo::Database AnyMongo::Cursor AnyMongo::BSON::OID/
;
$self
->_parse_servers();
if
(
$self
->auto_connect) {
$self
->
connect
;
}
}
sub
connect
{
my
(
$self
,
%args
) =
@_
;
return
if
$self
->connected ||
$self
->{_trying_connect};
$self
->{_trying_connect} = 1;
my
$timer
;
$timer
= AnyEvent->timer(
after
=> 5 ,
cb
=>
sub
{
undef
$timer
;
unless
(
$self
->connected ) {
$self
->{_trying_connect} = 0;
$self
->cv->croak(
'Failed to connect to any mongodb servers,timeout'
);
}
});
$self
->cv->begin(
sub
{
undef
$timer
;
if
(
$self
->connected) {
$self
->_connection_error(0);
shift
->
send
;
}
else
{
shift
->croak(
"Failed to connect to any mongodb servers"
);
}
});
my
$servers
=
$self
->{mongo_servers};
my
$seed_queue
=
$self
->{_seed_queue} = [
keys
%{
$servers
} ];
my
$seed_tried
=
$self
->{_seed_tried} = {};
while
(!
$self
->connected && @{
$seed_queue
} ) {
while
(
my
$h
=
shift
@{
$seed_queue
}) {
$seed_tried
->{
$h
} = 1;
$self
->_check_master(
$h
);
}
}
$self
->cv->
recv
;
$self
->{_trying_connect} = 0;
}
sub
_set_master {
my
(
$self
,
$server_id
,
$h
) =
@_
;
$self
->master_handle(
$h
);
$self
->{mongo_servers}->{
$server_id
}->{is_master} = 1;
$self
->{master_id} =
$server_id
;
$self
->connected(1);
}
sub
_is_master {
my
(
$config
) =
@_
;
$config
&& (
ref
(
$config
) ne
'SCALAR'
) && (
$config
->{ismaster});
}
sub
_check_master {
my
(
$self
,
$server_id
,
$only_self
) =
@_
;
my
$guards
=
$self
->{_guards};
return
if
$guards
->{
$server_id
} or
$self
->connected;
my
$connect_cv
= AE::cv;
$self
->_connect_to_host(
$server_id
,
sub
{
$connect_cv
->
send
(
shift
) });
my
$handle
=
$connect_cv
->
recv
;
if
(
$handle
) {
my
$config
=
$self
->admin->run_command({
ismaster
=> 1},
$handle
);
if
(
ref
$config
ne
'SCALAR'
) {
my
$servers
=
$self
->{mongo_servers};
if
(
$config
->{hosts} &&
time
() >
$self
->ts) {
foreach
my
$h
(@{
$config
->{hosts}}) {
$self
->_add_host(
$h
);
}
$self
->ts(
time
);
}
}
else
{
warn
$config
;
delete
$self
->{_handles}->{
$server_id
} ||
delete
$guards
->{
$server_id
};
$handle
->destroy();
undef
$handle
;
}
if
(
$handle
) {
if
(_is_master(
$config
)) {
$self
->_set_master(
$server_id
,
$handle
);
$self
->cv->end;
}
elsif
(
$self
->find_master &&
exists
$config
->{primary} && !
$only_self
) {
my
$primary_id
=
$self
->_add_host(
$config
->{primary});
$self
->_check_master(
$primary_id
,1)
if
$server_id
ne
$primary_id
;
}
}
}
}
sub
_add_host {
my
(
$self
,
$host
) =
@_
;
my
$servers
=
$self
->{mongo_servers};
my
$seed_queue
=
$self
->{_seed_queue};
my
$seed_tried
=
$self
->{_seed_tried};
my
(
$h
,
$p
) =
split
':'
,
$host
;
my
$server_id
=
$h
.
':'
.(
$p
|| 27017);
unless
(
exists
$servers
->{
$server_id
}) {
$servers
->{
$server_id
} = {
host
=>
$h
,
port
=>
$p
,
primary
=> 0,
};
push
@{
$seed_queue
},
$server_id
unless
exists
$seed_tried
->{
$server_id
};
}
$server_id
;
}
sub
_connect_to_host {
my
(
$self
,
$server_id
,
$cb
) =
@_
;
my
$guards
=
$self
->{_guards};
return
if
$guards
->{
$server_id
};
my
(
$host
,
$port
) = @{
$self
->{mongo_servers}->{
$server_id
}}{
'host'
,
'port'
};
$guards
->{
$server_id
} = tcp_connect
$host
,
$port
,
sub
{
my
(
$fh
,
$host
,
$port
) =
@_
;
my
(
$h
,
$config
);
if
(!
$fh
) {
warn
"failed to connect to $server_id\n"
;
delete
$guards
->{
$server_id
};
}
else
{
$h
= AnyEvent::Handle->new(
fh
=>
$fh
,
on_eof
=>
sub
{
my
$h
=
delete
$self
->{_handles}->{
$server_id
};
delete
$guards
->{
$server_id
};
warn
"Eof of connection to $server_id\n"
;
if
(
$self
->{master_id} eq
$server_id
) {
delete
$self
->{master_id};
$self
->clear_master_handle;
$self
->connected(0);
}
$h
->{cv}->
send
if
$h
->{cv};
$h
->destroy();
},
on_error
=>
sub
{
my
(
$hdl
,
$fatal
,
$msg
) =
@_
;
warn
"on_error:$server_id\n"
;
warn
"got error $msg\n"
;
my
$h
=
delete
$self
->{_handles}->{
$server_id
};
delete
$guards
->{
$server_id
};
if
(
$self
->{master_id} eq
$server_id
) {
delete
$self
->{master_id};
$self
->clear_master_handle;
$self
->connected(0);
}
$h
->{cv}->croak(
$msg
)
if
$h
->{cv};
$h
->destroy();
},
);
$self
->{_handles}->{
$server_id
} =
$h
;
}
if
(
ref
$cb
eq
'CODE'
) {
$cb
->(
$h
);
}
else
{
$self
->cv->end;
}
};
}
sub
_parse_servers {
my
(
$self
) =
@_
;
my
$str
=
$self
->host;
$str
=
substr
$self
->host, 10
if
$str
=~ /^mongodb:\/\//;
my
@pairs
=
split
","
,
$str
;
my
$servers
= {};
my
$server_seeds_cnt
= 0;
for
my
$h
(
@pairs
) {
my
(
$host
,
$port
) =
split
':'
,
$h
;
$port
||= 27017;
$servers
->{
$host
.
':'
.
$port
} = {
connected
=> 0,
handle
=>
undef
,
host
=>
$host
,
port
=>
$port
,
is_master
=> 0,
};
}
$self
->{mongo_servers} =
$servers
;
}
sub
send_message {
my
(
$self
,
$data
,
$hd
) =
@_
;
croak
'connection lost'
unless
$hd
or
$self
->_check_connection;
$hd
||=
$self
->master_handle;
$hd
->push_write(
$data
);
}
sub
_check_connection {
my
(
$self
) =
@_
;
$self
->connected or (
$self
->auto_reconnect and
$self
->
connect
);
$self
->connected;
}
sub
recv_message {
my
(
$self
,
$hd
) =
@_
;
my
(
$message_length
,
$request_id
,
$response_to
,
$op
) =
$self
->_receive_header(
$hd
);
my
(
$response_flags
,
$cursor_id
,
$starting_from
,
$number_returned
) =
$self
->_receive_response_header(
$hd
);
$self
->_check_respone_flags(
$response_flags
);
my
$results
=
$self
->_read_documents(
$message_length
-36,
$cursor_id
,
$hd
);
return
(
$number_returned
,
$cursor_id
,
$results
);
}
sub
_check_respone_flags {
my
(
$self
,
$flags
) =
@_
;
if
((
$flags
& REPLY_CURSOR_NOT_FOUND) != 0) {
croak(
"cursor not found"
);
}
}
sub
receive_data {
my
(
$self
,
$size
,
$hd
) =
@_
;
$hd
||=
$self
->master_handle;
croak
'connection lost'
unless
$hd
or
$self
->_check_connection;
my
$cv
= AE::cv;
my
$timer
;
$timer
= AnyEvent->timer(
after
=>
$self
->query_timeout ,
cb
=>
sub
{
undef
$timer
;
$cv
->croak(
'receive_data timeout'
);
});
$hd
->push_read(
chunk
=>
$size
,
sub
{
my
(
$hdl
,
$bytes
) =
@_
;
$cv
->
send
(
$_
[1]);
});
$hd
->{cv} =
$cv
;
my
$data
=
$cv
->
recv
;
delete
$hd
->{cv};
$data
;
}
sub
_receive_header {
my
(
$self
,
$hd
) =
@_
;
my
$header_buf
=
$self
->receive_data(STANDARD_HEADER_SIZE,
$hd
);
croak
'Short read for DB response header; length:'
.
length
(
$header_buf
)
unless
length
$header_buf
== STANDARD_HEADER_SIZE;
return
unpack
(
'V4'
,
$header_buf
);
}
sub
_receive_response_header {
my
(
$self
,
$hd
) =
@_
;
my
$header_buf
=
$self
->receive_data(RESPONSE_HEADER_SIZE,
$hd
);
croak
'Short read for DB response header'
unless
length
$header_buf
== RESPONSE_HEADER_SIZE;
my
(
$response_flags
) =
unpack
'V'
,
substr
(
$header_buf
,0,BSON_INT32);
my
(
$cursor_id
) =
unpack
'j'
,
substr
(
$header_buf
,BSON_INT32,BSON_INT64);
my
(
$starting_from
,
$number_returned
) =
unpack
'V2'
,
substr
(
$header_buf
,BSON_INT32+BSON_INT64);
return
(
$response_flags
,
$cursor_id
,
$starting_from
,
$number_returned
);
}
sub
_read_documents {
my
(
$self
,
$doc_message_length
,
$cursor_id
,
$hd
) =
@_
;
my
$remaining
=
$doc_message_length
;
my
$bson_buf
;
$bson_buf
=
$self
->receive_data(
$doc_message_length
,
$hd
);
return
unless
$bson_buf
;
my
$docs
= decode_bson_documents(
$bson_buf
);
return
$docs
;
}
sub
database_names {
my
(
$self
) =
@_
;
my
$ret
=
$self
->admin->run_command({
listDatabases
=> 1 });
return
map
{
$_
->{name} } @{
$ret
->{databases} };
}
sub
get_database {
my
(
$self
,
$database_name
) =
@_
;
return
AnyMongo::Database->new(
_connection
=>
$self
,
name
=>
$database_name
,
);
}
sub
authenticate {
my
(
$self
,
$dbname
,
$username
,
$password
,
$is_digest
) =
@_
;
my
$hash
=
$password
;
if
(!
$is_digest
) {
$hash
= Digest::MD5::md5_hex(
"${username}:mongo:${password}"
);
}
my
$db
=
$self
->get_database(
$dbname
);
my
$result
=
$db
->run_command({
getnonce
=> 1});
if
(!
$result
->{
'ok'
}) {
return
$result
;
}
my
$nonce
=
$result
->{
'nonce'
};
my
$digest
= Digest::MD5::md5_hex(
$nonce
.
$username
.
$hash
);
my
$login
=
tie
(
my
%hash
,
'Tie::IxHash'
);
%hash
= (
authenticate
=> 1,
user
=>
$username
,
nonce
=>
$nonce
,
key
=>
$digest
);
$result
=
$db
->run_command(
$login
);
return
$result
;
}
sub
admin {
shift
->get_database(
'admin'
) }
sub
disconnect {
my
(
$self
) =
@_
;
$self
->clear_master_handle;
my
$guards
=
$self
->{_guards};
my
$handles
=
$self
->{_handles};
map
{
delete
$guards
->{
$_
} }
keys
%{
$guards
};
map
{ (
delete
$handles
->{
$_
})->destroy }
keys
%{
$handles
};
$self
->{mongo_servers} = {};
$self
->{_is_connected} = 0;
$self
->connected(0);
}
sub
DEMOLISH {
shift
->disconnect;
}
__PACKAGE__->meta->make_immutable;
1;