our
$VERSION
=
'v1.3.4'
;
use
Errno
qw[EINTR EPIPE]
;
use
Socket
qw/SOL_SOCKET SO_KEEPALIVE SO_RCVBUF IPPROTO_TCP TCP_NODELAY/
;
HostAddress
NonNegNum
ServerDesc
)
;
Bool
HashRef
Maybe
Num
Str
Undef
)
;
# Socket implementation used by connect(): IO::Socket::IP when it can be
# loaded, otherwise IO::Socket::INET.
# NOTE(review): the probe condition was missing from this statement (the
# ternary had no test expression). A plain load check is restored here;
# confirm whether a minimum IO::Socket::IP version should also be required.
my $SOCKET_CLASS =
  eval { require IO::Socket::IP; 1 }
  ? 'IO::Socket::IP'
  : 'IO::Socket::INET';
# Server address. connect() and _build_host() split it on ':' to obtain the
# host and port parts.
has address => (
    is       => 'ro',
    required => 1,
    isa      => HostAddress,
);

# Handed to the socket constructor as its Timeout in connect(); a value that
# is not >= 0 is passed as undef instead.
has connect_timeout => (
    is      => 'ro',
    default => 20,
    isa     => Num,
);

# Timeout given to select() while read() and write() wait on the socket.
has socket_timeout => (
    is      => 'ro',
    default => 30,
    isa     => Num|Undef,
);

# When true, connect() checks SSL prerequisites and upgrades the socket
# through start_ssl().
has with_ssl => (
    is  => 'ro',
    isa => Bool,
);

# Extra SSL settings; _ssl_args() copies every key starting with "SSL_" over
# its defaults.
has SSL_options => (
    is      => 'ro',
    default => sub { +{} },
    isa     => HashRef,
);

# Server description recorded by set_metadata(); not settable via the
# constructor.
has server => (
    is       => 'rwp',
    init_arg => undef,
    isa      => Maybe[ServerDesc],
);

# Host part of the address, computed on demand by _build_host().
has host => (
    is       => 'lazy',
    init_arg => undef,
    isa      => Str,
);
# Lazy builder for the 'host' attribute: returns the part of 'address' that
# precedes the first ':'.
sub _build_host {
    my $self = shift;

    # Only the leading field is needed; whatever follows is dropped.
    my ($hostname) = split /:/, $self->address;

    return $hostname;
}
# Per-connection limits and wire versions. They cannot be passed to the
# constructor; connect() and set_metadata() fill them in through the private
# _set_* writers.
my @is_master_fields = qw(
  min_wire_version max_wire_version
  max_message_size_bytes max_write_batch_size max_bson_object_size
);

for my $field_name (@is_master_fields) {
    has $field_name => (
        is       => 'rwp',
        init_arg => undef,
        isa      => Maybe[NonNegNum],
    );
}

# Set by set_metadata() from accepts_wire_version(2).
has does_write_commands => (
    is       => 'rwp',
    init_arg => undef,
    isa      => Bool,
);
# Mutable connection state. Each attribute gets a private writer (via 'rwp')
# and a "_clear_<name>" clearer, and none can be set from the constructor.
my @connection_state_fields = qw(
  fh connected rcvbuf last_used fdset is_ssl
);

for my $field_name (@connection_state_fields) {
    has $field_name => (
        is       => 'rwp',
        clearer  => "_clear_$field_name",
        init_arg => undef,
    );
}
# Wraps the default BUILDARGS: when an 'address' argument is present, its
# ':'-separated parts are also stored under the 'host' and 'port' keys of the
# argument hash.
around BUILDARGS => sub {
    my ( $orig, $class, @args ) = @_;

    my $hr = $class->$orig(@args);

    if ( exists $hr->{address} ) {

        # The explicit limit of 3 matches what a two-element list assignment
        # from split() applies implicitly.
        @{$hr}{qw(host port)} = split /:/, $hr->{address}, 3;
    }

    return $hr;
};
# Open a TCP connection to 'address' and initialise the connection state.
#
# Takes no arguments besides the invocant; throws MongoDB::UsageError
# otherwise. Throws MongoDB::NetworkError when the socket cannot be opened and
# MongoDB::InternalError when the socket cannot be configured. When 'with_ssl'
# is true the socket is upgraded through start_ssl(). Returns $self.
sub connect {
    @_ == 1 || MongoDB::UsageError->throw( q/Usage: $handle->connect()/ . "\n" );

    my ($self) = @_;

    $self->_assert_ssl if $self->with_ssl;

    my ( $host, $port ) = split /:/, $self->address;

    # A connect_timeout that is not >= 0 is passed on as "no timeout".
    my $timeout = $self->connect_timeout;
    $timeout = undef unless $timeout >= 0;

    my $fh = $SOCKET_CLASS->new(
        PeerHost => $host,
        PeerPort => $port,
        Proto    => 'tcp',
        Type     => SOCK_STREAM,
        Timeout  => $timeout,
    );
    MongoDB::NetworkError->throw(qq/Could not connect to '@{[$self->address]}': $@\n/)
      unless $fh;

    if ( !binmode($fh) ) {
        undef $fh;
        MongoDB::InternalError->throw(qq/Could not binmode() socket: '$!'\n/);
    }

    # Socket options that must be switched on, in this order.
    for my $opt (
        [ IPPROTO_TCP, TCP_NODELAY,  'TCP_NODELAY' ],
        [ SOL_SOCKET,  SO_KEEPALIVE, 'SO_KEEPALIVE' ],
      )
    {
        my ( $level, $option, $label ) = @$opt;
        next if defined $fh->setsockopt( $level, $option, 1 );

        undef $fh;
        MongoDB::InternalError->throw(qq/Could not set $label on socket: '$!'\n/);
    }

    $self->_set_fh($fh);
    $self->_set_connected(1);

    my $fd = fileno $fh;
    if ( !defined $fd || $fd < 0 ) {
        $self->_close;
        MongoDB::InternalError->throw(qq/select(2): 'Bad file descriptor'\n/);
    }

    # Bit vector naming this descriptor, for the select() calls in
    # read() and write().
    my $fdset = '';
    vec( $fdset, $fd, 1 ) = 1;
    $self->_set_fdset($fdset);

    $self->start_ssl($host) if $self->with_ssl;

    $self->_set_last_used(time);
    $self->_set_rcvbuf( $fh->sockopt(SO_RCVBUF) );

    # Initial message-size limit; set_metadata() replaces it later.
    $self->_set_max_message_size_bytes( 2 * MAX_BSON_OBJECT_SIZE );

    return $self;
}
# Record the server description and derive the wire-version and size limits
# from its is_master document, falling back to defaults for missing or false
# values. Returns nothing.
sub set_metadata {
    my ( $self, $server ) = @_;

    $self->_set_server($server);

    my $is_master = $server->is_master;

    $self->_set_min_wire_version( $is_master->{minWireVersion} || "0" );
    $self->_set_max_wire_version( $is_master->{maxWireVersion} || "0" );

    $self->_set_max_bson_object_size( $is_master->{maxBsonObjectSize}
          || MAX_BSON_OBJECT_SIZE );
    $self->_set_max_write_batch_size( $is_master->{maxWriteBatchSize}
          || MAX_WRITE_BATCH_SIZE );

    # This fallback depends on max_bson_object_size, so it has to come after
    # that attribute has been set.
    $self->_set_max_message_size_bytes( $is_master->{maxMessageSizeBytes}
          || 2 * $self->max_bson_object_size );

    # Needs both wire versions to be in place.
    $self->_set_does_write_commands( $self->accepts_wire_version(2) );

    return;
}
# True when $version lies within [min_wire_version, max_wire_version].
# An unset or false bound is treated as 0.
sub accepts_wire_version {
    my ( $self, $version ) = @_;

    my $lowest  = $self->min_wire_version || 0;
    my $highest = $self->max_wire_version || 0;

    return $version >= $lowest && $version <= $highest;
}
# Upgrade the current socket to SSL using the arguments from _ssl_args($host).
# If the handle is not an IO::Socket::SSL afterwards, the connection is closed
# and MongoDB::HandshakeError is thrown with the SSL error string.
sub start_ssl {
    my ( $self, $host ) = @_;

    my %ssl_opts = %{ $self->_ssl_args($host) };
    my $fh       = $self->fh;

    # Invoked with the SSL context while it is being created.
    my $enable_auto_retry = sub {
        my ($ctx) = @_;
        Net::SSLeay::CTX_set_mode( $ctx, Net::SSLeay::MODE_AUTO_RETRY() );
    };

    # The callback is listed last so it takes precedence over any entry of
    # the same name coming from %ssl_opts.
    IO::Socket::SSL->start_SSL( $fh, %ssl_opts,
        SSL_create_ctx_callback => $enable_auto_retry );

    unless ( ref($fh) eq 'IO::Socket::SSL' ) {
        my $ssl_err = IO::Socket::SSL->errstr;
        $self->_close;
        MongoDB::HandshakeError->throw(qq/SSL connection failed for $host: $ssl_err\n/);
    }
}
# Close the connection; throws MongoDB::NetworkError if the underlying close
# reports failure. Returns the true result of _close() otherwise.
sub close {
    my $self = shift;

    return $self->_close
      || MongoDB::NetworkError->throw(qq/Error closing socket: '$!'\n/);
}
# Non-throwing close: clears the 'connected' flag, closes and clears the file
# handle if there is one, and returns the result of the close (1 when there
# was nothing to close).
sub _close {
    my $self = shift;

    $self->_clear_connected;

    my $fh = $self->fh
      or return 1;

    # CORE:: is needed because this package defines its own close().
    my $closed = CORE::close($fh);
    $self->_clear_fh;

    return $closed;
}
# Returns the file handle when the link is flagged as connected, otherwise the
# (false) value of the 'connected' flag.
sub is_connected {
    my $self = shift;

    my $flag = $self->connected;
    return $flag unless $flag;

    return $self->fh;
}
# Write all of $buf to the socket.
#
# Before each syswrite the socket is polled with select() using
# 'socket_timeout'. Throws MongoDB::ProtocolError when the message is larger
# than max_message_size_bytes, MongoDB::NetworkTimeout when the socket does
# not become writable in time, and MongoDB::NetworkError on select/write
# failures (the connection is closed first in the latter two cases).
# Updates 'last_used' on success and returns nothing.
sub write {
    my ( $self, $buf ) = @_;

    my ( $len, $off, $pending, $nfound, $r ) = ( length($buf), 0 );

    MongoDB::ProtocolError->throw(
        qq/Message of size $len exceeds maximum of / . $self->max_message_size_bytes )
      if $len > $self->max_message_size_bytes;

    # With SIGPIPE ignored, a write to a closed peer is reported by syswrite
    # as EPIPE, which is handled below.
    local $SIG{PIPE} = 'IGNORE';

    while () {
        ( $pending, $nfound ) = ( $self->socket_timeout, 0 );

        TIMEOUT: while () {
            if ( -1 == ( $nfound = select( undef, $self->fdset, undef, $pending ) ) ) {
                unless ( $! == EINTR ) {
                    $self->_close;
                    MongoDB::NetworkError->throw(qq/select(2): '$!'\n/);
                }

                # Interrupted by a signal: retry with half the remaining
                # budget. An undef timeout means "wait indefinitely" and must
                # stay undef; halving it would yield 0 and turn the next
                # select() into a poll that reports a spurious timeout.
                $pending = int( $pending / 2 ) if defined $pending;
                redo TIMEOUT;
            }
            last TIMEOUT;
        }

        unless ($nfound) {
            $self->_close;
            MongoDB::NetworkTimeout->throw(
                qq/Timed out while waiting for socket to become ready for writing\n/);
        }

        if ( defined( $r = syswrite( $self->fh, $buf, $len, $off ) ) ) {

            # Partial writes are possible: advance and go around again.
            ( $len -= $r ), ( $off += $r );
            last unless $len > 0;
        }
        elsif ( $! == EPIPE ) {
            $self->_close;
            MongoDB::NetworkError->throw(qq/Socket closed by remote server: $!\n/);
        }
        elsif ( $! != EINTR ) {

            # A handle that provides errstr() is reported as an SSL socket.
            if ( $self->fh->can('errstr') ) {
                my $err = $self->fh->errstr();
                $self->_close;
                MongoDB::NetworkError->throw(qq/Could not write to SSL socket: '$err'\n /);
            }
            else {
                $self->_close;
                MongoDB::NetworkError->throw(qq/Could not write to socket: '$!'\n/);
            }
        }
    }

    $self->_set_last_used(time);

    return;
}
# Read one complete server reply from the socket and return it as a string.
#
# The reply length is decoded from the start of the message with P_INT32
# (presumably the 4-byte length prefix of the wire protocol -- confirm
# against the constants module), and reading continues until that many bytes
# have arrived. Throws MongoDB::NetworkTimeout when the socket does not become
# readable within 'socket_timeout', MongoDB::NetworkError on select/read
# failures or end of stream, and MongoDB::ProtocolError when the announced
# length exceeds max_message_size_bytes. Updates 'last_used' on success.
sub read {
    my ($self) = @_;

    my ( $msg, $len, $pending, $nfound, $r ) = ( '', undef );

    READ: while () {
        ( $pending, $nfound ) = ( $self->socket_timeout, 0 );

        TIMEOUT: while () {

            # Skip select() when the SSL handle already reports pending data.
            if ( $self->with_ssl && $self->fh->pending ) {
                $nfound = 1;
                last TIMEOUT;
            }

            if ( -1 == ( $nfound = select( $self->fdset, undef, undef, $pending ) ) ) {
                unless ( $! == EINTR ) {
                    $self->_close;
                    MongoDB::NetworkError->throw(qq/select(2): '$!'\n/);
                }

                # Interrupted by a signal: retry with half the remaining
                # budget. An undef timeout means "wait indefinitely" and must
                # stay undef; halving it would yield 0 and turn the next
                # select() into a poll that reports a spurious timeout.
                $pending = int( $pending / 2 ) if defined $pending;
                redo TIMEOUT;
            }
            last TIMEOUT;
        }

        unless ($nfound) {
            $self->_close;
            MongoDB::NetworkTimeout->throw(
                q/Timed out while waiting for socket to become ready for reading/ . "\n" );
        }

        # Append to what has been received so far.
        if ( defined( $r = sysread( $self->fh, $msg, $self->rcvbuf, length $msg ) ) ) {
            if ( !$r ) {
                $self->_close;
                MongoDB::NetworkError->throw(qq/Unexpected end of stream\n/);
            }
        }
        elsif ( $! != EINTR ) {

            # A handle that provides errstr() is reported as an SSL socket.
            if ( $self->fh->can('errstr') ) {
                my $err = $self->fh->errstr();
                $self->_close;
                MongoDB::NetworkError->throw(qq/Could not read from SSL socket: '$err'\n /);
            }
            else {
                $self->_close;
                MongoDB::NetworkError->throw(qq/Could not read from socket: '$!'\n/);
            }
        }

        if ( !defined $len ) {

            # The length prefix cannot be decoded until four bytes are in.
            # Without this guard an interrupted or very short first read left
            # $len undefined, and the loop below ended with an empty or
            # truncated message.
            next READ if length($msg) < 4;

            $len = unpack( P_INT32, $msg );
            MongoDB::ProtocolError->throw(
                qq/Server reply of size $len exceeds maximum of /
                  . $self->max_message_size_bytes )
              if $len > $self->max_message_size_bytes;
        }

        last READ unless length($msg) < $len;
    }

    $self->_set_last_used(time);

    return $msg;
}
# Verify that the modules needed for SSL support can be loaded at the
# required versions; throws MongoDB::UsageError naming the first one that
# cannot.
sub _assert_ssl {

    # The guard on this first check was missing, leaving an unterminated
    # statement. It is restored to mirror the Net::SSLeay check below, using
    # the version named in the error message.
    MongoDB::UsageError->throw(qq/IO::Socket::SSL 1.42 must be installed for SSL support\n/)
      unless eval { require IO::Socket::SSL; IO::Socket::SSL->VERSION(1.42) };

    MongoDB::UsageError->throw(qq/Net::SSLeay 1.49 must be installed for SSL support\n/)
      unless eval { require Net::SSLeay; Net::SSLeay->VERSION(1.49) };
}
# Locate a CA bundle for certificate verification.
#
# Order of preference: an existing file named by SSL_options->{SSL_ca_file},
# the bundle shipped with Mozilla::CA when that module can be loaded, then
# the first existing file from a list of well-known system paths.
# Throws MongoDB::UsageError when none is found.
sub _find_CA_file {
    my $self = shift();

    my $configured = $self->SSL_options->{SSL_ca_file};
    return $configured
      if $configured and -e $configured;

    # Mozilla::CA is optional, so it is loaded on demand. The guard was
    # missing here, which made the statement run into the loop below.
    return Mozilla::CA::SSL_ca_file()
      if eval { require Mozilla::CA; 1 };

    foreach my $ca_bundle (
        "/etc/ssl/certs/ca-certificates.crt",
        "/etc/pki/tls/certs/ca-bundle.crt",
        "/etc/ssl/ca-bundle.pem",
        "/etc/openssl/certs/ca-certificates.crt",
        "/etc/ssl/cert.pem",
        "/usr/local/share/certs/ca-root-nss.crt",
        "/etc/pki/tls/cacert.pem",
        "/etc/certs/ca-certificates.crt",
      )
    {
        return $ca_bundle if -e $ca_bundle;
    }

    MongoDB::UsageError->throw(
        qq/Couldn't find a CA bundle with which to verify the SSL certificate.\n/
          . qq/Try installing Mozilla::CA from CPAN\n/ );
}
# Build the argument hash for the SSL upgrade of a connection to $host.
# Defaults are established first; every key of SSL_options that starts with
# "SSL_" then overrides them. Returns a hash reference.
sub _ssl_args {
    my ( $self, $host ) = @_;

    my %ssl_args = (

        # SSL_hostname is only supplied when the OpenSSL version number is at
        # least 0x01000000.
        (
            Net::SSLeay::OPENSSL_VERSION_NUMBER() >= 0x01000000
            ? ( SSL_hostname => $host )
            : ()
        ),
        SSL_verifycn_scheme => 'http',
        SSL_verifycn_name   => $host,

        # NOTE(review): 0x01 presumably corresponds to SSL_VERIFY_PEER --
        # confirm against the IO::Socket::SSL documentation.
        SSL_verify_mode => 0x01,
        SSL_ca_file     => scalar $self->_find_CA_file,
    );

    my $overrides = $self->SSL_options;
    for my $key ( grep { m/^SSL_/ } keys %{$overrides} ) {
        $ssl_args{$key} = $overrides->{$key};
    }

    return \%ssl_args;
}
1;