our
$VERSION
=
'v0.999.999.2'
;
use
Errno
qw[EINTR EPIPE]
;
use
Socket
qw/SOL_SOCKET SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY/
;
HAS_GETTIME
=> Time::HiRes::d_clock_gettime(),
HAS_THREADS
=>
$Config
{usethreads},
P_INT32
=> $] lt
'5.010'
?
'l'
:
'l<'
,
MAX_BSON_OBJECT_SIZE
=> 4_194_304,
MAX_WRITE_BATCH_SIZE
=> 1000,
};
use
if
HAS_THREADS,
'threads'
;
*_get_tid
= HAS_THREADS() ?
sub
{ threads->tid } :
sub
() { 0 };
my
$SOCKET_CLASS
=
?
'IO::Socket::IP'
:
'IO::Socket::INET'
;
sub
new {
@_
== 2
||
@_
== 3
|| MongoDB::UsageError->throw(
q/Usage: MongoDB::_Link->new(address, [arg hashref])/
.
"\n"
);
my
(
$class
,
$address
,
$args
) =
@_
;
my
(
$host
,
$port
) =
split
/:/,
$address
;
MongoDB::UsageError->throw(
"new requires 'host:port' address argument"
)
unless
defined
(
$host
) &&
length
(
$host
) &&
defined
(
$port
) &&
length
(
$port
);
my
$self
=
bless
{
host
=>
$host
,
port
=>
$port
,
address
=>
"$host:$port"
,
connect_timeout
=> 20,
socket_timeout
=> 30,
with_ssl
=> 0,
SSL_options
=> {},
(
$args
? (
%$args
) : () ),
},
$class
;
return
$self
;
}
sub
connect
{
@_
== 1 || MongoDB::UsageError->throw(
q/Usage: $handle->connect()/
.
"\n"
);
my
(
$self
) =
@_
;
if
(
$self
->{with_ssl} ) {
$self
->_assert_ssl;
}
my
(
$host
,
$port
) = @{
$self
}{
qw/host port/
};
$self
->{fh} =
$SOCKET_CLASS
->new(
PeerHost
=>
$host
,
PeerPort
=>
$port
,
$self
->{local_address} ? (
LocalAddr
=>
$self
->{local_address} ) : (),
Proto
=>
'tcp'
,
Type
=> SOCK_STREAM,
Timeout
=>
$self
->{connect_timeout} >= 0 ?
$self
->{connect_timeout} :
undef
,
) or MongoDB::NetworkError->throw(
qq/Could not connect to '$host:$port': $@\n/
);
binmode
(
$self
->{fh} )
or MongoDB::InternalError->throw(
qq/Could not binmode() socket: '$!'\n/
);
defined
(
$self
->{fh}->
setsockopt
(IPPROTO_TCP, TCP_NODELAY, 1) )
or MongoDB::InternalError->throw(
qq/Could not set TCP_NODELAY on socket: '$!'\n/
);
defined
(
$self
->{fh}->
setsockopt
(SOL_SOCKET, SO_KEEPALIVE, 1) )
or MongoDB::InternalError->throw(
qq/Could not set SO_KEEPALIVE on socket: '$!'\n/
);
$self
->start_ssl(
$host
)
if
$self
->{with_ssl};
$self
->{pid} = $$;
$self
->{tid} = _get_tid();
$self
->{last_used} = [gettimeofday];
return
$self
;
}
my
@accessors
=
qw(
address server min_wire_version max_wire_version
max_message_size_bytes max_write_batch_size max_bson_object_size
)
;
for
my
$attr
(
@accessors
) {
no
strict
'refs'
;
*{
$attr
} =
eval
"sub { \$_[0]->{$attr} }"
;
}
sub
set_metadata {
my
(
$self
,
$server
) =
@_
;
$self
->{server} =
$server
;
$self
->{min_wire_version} =
$server
->is_master->{minWireVersion} ||
"0"
;
$self
->{max_wire_version} =
$server
->is_master->{maxWireVersion} ||
"0"
;
$self
->{max_bson_object_size} =
$server
->is_master->{maxBsonObjectSize} || MAX_BSON_OBJECT_SIZE;
$self
->{max_write_batch_size} =
$server
->is_master->{maxWriteBatchSize} || MAX_WRITE_BATCH_SIZE;
$self
->{max_message_size_bytes} =
$server
->is_master->{maxMessageSizeBytes} || 2 *
$self
->{max_bson_object_size};
return
;
}
sub
accepts_wire_version {
my
(
$self
,
$version
) =
@_
;
my
$min
=
$self
->{min_wire_version} || 0;
my
$max
=
$self
->{max_wire_version} || 0;
return
$version
>=
$min
&&
$version
<=
$max
;
}
sub
start_ssl {
my
(
$self
,
$host
) =
@_
;
my
$ssl_args
=
$self
->_ssl_args(
$host
);
IO::Socket::SSL->start_SSL(
$self
->{fh},
%$ssl_args
,
SSL_create_ctx_callback
=>
sub
{
my
$ctx
=
shift
;
Net::SSLeay::CTX_set_mode(
$ctx
, Net::SSLeay::MODE_AUTO_RETRY() );
},
);
unless
(
ref
(
$self
->{fh} ) eq
'IO::Socket::SSL'
) {
my
$ssl_err
= IO::Socket::SSL->errstr;
MongoDB::HandshakeError->throw(
qq/SSL connection failed for $host: $ssl_err\n/
);
}
}
sub
close
{
@_
== 1 || MongoDB::UsageError->throw(
q/Usage: $handle->close()/
.
"\n"
);
my
(
$self
) =
@_
;
if
(
$self
->connected ) {
CORE::
close
(
$self
->{fh} )
or MongoDB::NetworkError->throw(
qq/Error closing socket: '$!'\n/
);
delete
$self
->{fh};
}
}
sub
connection_valid {
my
(
$self
) =
@_
;
return
unless
$self
->{fh};
if
( !
$self
->{fh}->connected
||
$self
->{pid} != $$
||
$self
->{tid} != _get_tid() )
{
$self
->{fh}->
close
;
delete
$self
->{fh};
return
;
}
return
1;
}
sub
idle_time_ms {
my
(
$self
) =
@_
;
return
1000 * tv_interval(
$self
->{last_used} );
}
sub
remote_connected {
my
(
$self
) =
@_
;
return
unless
$self
->connection_valid;
return
if
$self
->can_read(0) &&
$self
->{fh}->
eof
;
return
1;
}
sub
assert_valid_connection {
my
(
$self
) =
@_
;
MongoDB::NetworkError->throw(
"connection lost to "
.
$self
->address )
unless
$self
->connection_valid;
return
1;
}
sub
write
{
@_
== 2 || MongoDB::UsageError->throw(
q/Usage: $handle->write(buf)/
.
"\n"
);
my
(
$self
,
$buf
) =
@_
;
$self
->assert_valid_connection;
if
( $] ge
'5.008'
) {
utf8::downgrade(
$buf
, 1 )
or MongoDB::InternalError->throw(
qq/Wide character in write()\n/
);
}
my
$len
=
length
$buf
;
my
$off
= 0;
if
(
exists
$self
->{max_message_size_bytes}
&&
$len
>
$self
->{max_message_size_bytes} )
{
MongoDB::ProtocolError->throw(
qq/Message of size $len exceeds maximum of /
.
$self
->{max_message_size_bytes} );
}
local
$SIG
{PIPE} =
'IGNORE'
;
while
() {
$self
->can_write
or MongoDB::NetworkTimeout->throw(
qq/Timed out while waiting for socket to become ready for writing\n/
);
my
$r
=
syswrite
(
$self
->{fh},
$buf
,
$len
,
$off
);
if
(
defined
$r
) {
$len
-=
$r
;
$off
+=
$r
;
last
unless
$len
> 0;
}
elsif
( $! == EPIPE ) {
MongoDB::NetworkError->throw(
qq/Socket closed by remote server: $!\n/
);
}
elsif
( $! != EINTR ) {
if
(
$self
->{fh}->can(
'errstr'
) ) {
my
$err
=
$self
->{fh}->errstr();
MongoDB::NetworkError->throw(
qq/Could not write to SSL socket: '$err'\n /
);
}
else
{
MongoDB::NetworkError->throw(
qq/Could not write to socket: '$!'\n/
);
}
}
}
$self
->{last_used} = [gettimeofday];
return
$off
;
}
sub
read
{
@_
== 1 || MongoDB::UsageError->throw(
q/Usage: $handle->read()/
.
"\n"
);
my
(
$self
) =
@_
;
my
$msg
=
''
;
$self
->assert_valid_connection;
$self
->_read_bytes( 4, \
$msg
);
my
$len
=
unpack
( P_INT32,
$msg
);
$self
->_read_bytes(
$len
- 4, \
$msg
);
$self
->{last_used} = [gettimeofday];
return
$msg
;
}
sub
_read_bytes {
@_
== 3 || MongoDB::UsageError->throw(
q/Usage: $handle->read(len, bufref)/
.
"\n"
);
my
(
$self
,
$len
,
$bufref
) =
@_
;
while
(
$len
> 0 ) {
$self
->can_read
or MongoDB::NetworkTimeout->throw(
q/Timed out while waiting for socket to become ready for reading/
.
"\n"
);
my
$r
=
sysread
(
$self
->{fh},
$$bufref
,
$len
,
length
$$bufref
);
if
(
defined
$r
) {
last
unless
$r
;
$len
-=
$r
;
}
elsif
( $! != EINTR ) {
if
(
$self
->{fh}->can(
'errstr'
) ) {
my
$err
=
$self
->{fh}->errstr();
MongoDB::NetworkError->throw(
qq/Could not read from SSL socket: '$err'\n /
);
}
else
{
MongoDB::NetworkError->throw(
qq/Could not read from socket: '$!'\n/
);
}
}
}
if
(
$len
) {
MongoDB::NetworkError->throw(
qq/Unexpected end of stream\n/
);
}
return
;
}
sub
_do_timeout {
my
(
$self
,
$type
,
$timeout
) =
@_
;
$timeout
=
$self
->{socket_timeout}
unless
defined
$timeout
;
my
$fd
=
fileno
$self
->{fh};
defined
$fd
&&
$fd
>= 0
or MongoDB::InternalError->throw(
qq/select(2): 'Bad file descriptor'\n/
);
my
$initial
= HAS_GETTIME ? Time::HiRes::clock_gettime(Time::HiRes::CLOCK_MONOTONIC()) :
time
;
my
$pending
=
$timeout
>= 0 ?
$timeout
:
undef
;
my
$nfound
;
vec
(
my
$fdset
=
''
,
$fd
, 1 ) = 1;
while
() {
$nfound
=
(
$type
eq
'read'
)
?
select
(
$fdset
,
undef
,
undef
,
$pending
)
:
select
(
undef
,
$fdset
,
undef
,
$pending
);
if
(
$nfound
== -1 ) {
$! == EINTR
or MongoDB::NetworkError->throw(
qq/select(2): '$!'\n/
);
redo
if
!
defined
(
$pending
);
my
$now
= HAS_GETTIME ? Time::HiRes::clock_gettime(Time::HiRes::CLOCK_MONOTONIC()) :
time
;
redo
if
(
$pending
=
$timeout
- (
$now
-
$initial
) ) > 0;
$nfound
= 0;
}
last
;
}
$! = 0;
return
$nfound
;
}
sub
can_read {
@_
== 1 ||
@_
== 2 || MongoDB::UsageError->throw(
q/Usage: $handle->can_read([timeout])/
.
"\n"
);
my
$self
=
shift
;
if
(
ref
(
$self
->{fh} ) eq
'IO::Socket::SSL'
) {
return
1
if
$self
->{fh}->pending;
}
return
$self
->_do_timeout(
'read'
,
@_
);
}
sub
can_write {
@_
== 1
||
@_
== 2
|| MongoDB::UsageError->throw(
q/Usage: $handle->can_write([timeout])/
.
"\n"
);
my
$self
=
shift
;
return
$self
->_do_timeout(
'write'
,
@_
);
}
sub
_assert_ssl {
MongoDB::UsageError->throw(
qq/IO::Socket::SSL 1.42 must be installed for SSL support\n/
)
MongoDB::UsageError->throw(
qq/Net::SSLeay 1.49 must be installed for SSL support\n/
)
unless
eval
{
require
Net::SSLeay; Net::SSLeay->VERSION(1.49) };
}
sub
_find_CA_file {
my
$self
=
shift
();
return
$self
->{SSL_options}->{SSL_ca_file}
if
$self
->{SSL_options}->{SSL_ca_file} and -e
$self
->{SSL_options}->{SSL_ca_file};
return
Mozilla::CA::SSL_ca_file()
foreach
my
$ca_bundle
(
"/etc/ssl/certs/ca-certificates.crt"
,
"/etc/pki/tls/certs/ca-bundle.crt"
,
"/etc/ssl/ca-bundle.pem"
,
"/etc/openssl/certs/ca-certificates.crt"
,
"/etc/ssl/cert.pem"
,
"/usr/local/share/certs/ca-root-nss.crt"
,
"/etc/pki/tls/cacert.pem"
,
"/etc/certs/ca-certificates.crt"
,
) {
return
$ca_bundle
if
-e
$ca_bundle
;
}
MongoDB::UsageError->throw(
qq/Couldn't find a CA bundle with which to verify the SSL certificate.\n/
.
qq/Try installing Mozilla::CA from CPAN\n/
);
}
sub
_ssl_args {
my
(
$self
,
$host
) =
@_
;
my
%ssl_args
;
if
( Net::SSLeay::OPENSSL_VERSION_NUMBER() >= 0x01000000 ) {
$ssl_args
{SSL_hostname} =
$host
,
}
$ssl_args
{SSL_verifycn_scheme} =
'http'
;
$ssl_args
{SSL_verifycn_name} =
$host
;
$ssl_args
{SSL_verify_mode} = 0x01;
$ssl_args
{SSL_ca_file} =
$self
->_find_CA_file;
for
my
$k
(
keys
%{
$self
->{SSL_options} } ) {
$ssl_args
{
$k
} =
$self
->{SSL_options}{
$k
}
if
$k
=~ m/^SSL_/;
}
return
\
%ssl_args
;
}
1;