From Code to Community: Sponsoring The Perl and Raku Conference 2025 Learn more

#
# Copyright 2014 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Some portions of this code were copied and adapted from the Perl module
# HTTP::Tiny, which is copyright Christian Hansen, David Golden and other
# contributors and used with permission under the terms of the Artistic License
use v5.8.0;
use strict;
use version;
our $VERSION = 'v1.3.4'; # TRIAL
use Moo;
use Errno qw[EINTR EPIPE];
use IO::Socket qw[SOCK_STREAM];
use Scalar::Util qw/refaddr/;
use Socket qw/SOL_SOCKET SO_KEEPALIVE SO_RCVBUF IPPROTO_TCP TCP_NODELAY/;
use Time::HiRes qw/time/;
HostAddress
NonNegNum
ServerDesc
);
Bool
HashRef
Maybe
Num
Str
Undef
);
my $SOCKET_CLASS =
eval { require IO::Socket::IP; IO::Socket::IP->VERSION(0.25) }
? 'IO::Socket::IP'
: 'IO::Socket::INET';
has address => (
is => 'ro',
required => 1,
isa => HostAddress,
);
has connect_timeout => (
is => 'ro',
default => 20,
isa => Num,
);
has socket_timeout => (
is => 'ro',
default => 30,
isa => Num|Undef,
);
has with_ssl => (
is => 'ro',
isa => Bool,
);
has SSL_options => (
is => 'ro',
default => sub { {} },
isa => HashRef,
);
has server => (
is => 'rwp',
init_arg => undef,
isa => Maybe[ServerDesc],
);
has host => (
is => 'lazy',
init_arg => undef,
isa => Str,
);
sub _build_host {
my ($self) = @_;
my ($host, $port) = split /:/, $self->address;
return $host;
}
my @is_master_fields= qw(
min_wire_version max_wire_version
max_message_size_bytes max_write_batch_size max_bson_object_size
);
for my $f ( @is_master_fields ) {
has $f => (
is => 'rwp',
init_arg => undef,
isa => Maybe[NonNegNum],
);
}
# for caching wire version >= 2
has does_write_commands => (
is => 'rwp',
init_arg => undef,
isa => Bool,
);
my @connection_state_fields = qw(
fh connected rcvbuf last_used fdset is_ssl
);
for my $f ( @connection_state_fields ) {
has $f => (
is => 'rwp',
clearer => "_clear_$f",
init_arg => undef,
);
}
around BUILDARGS => sub {
my $orig = shift;
my $class = shift;
my $hr = $class->$orig(@_);
# shortcut on missing required field
return $hr unless exists $hr->{address};
($hr->{host}, $hr->{port}) = split /:/, $hr->{address};
return $hr;
};
sub connect {
@_ == 1 || MongoDB::UsageError->throw( q/Usage: $handle->connect()/ . "\n" );
my ($self) = @_;
if ( $self->with_ssl ) {
$self->_assert_ssl;
# XXX possibly make SOCKET_CLASS an instance variable and set it here to IO::Socket::SSL
}
my ($host, $port) = split /:/, $self->address;
my $fh = $SOCKET_CLASS->new(
PeerHost => $host,
PeerPort => $port,
Proto => 'tcp',
Type => SOCK_STREAM,
Timeout => $self->connect_timeout >= 0 ? $self->connect_timeout : undef,
)
or
MongoDB::NetworkError->throw(qq/Could not connect to '@{[$self->address]}': $@\n/);
unless ( binmode($fh) ) {
undef $fh;
MongoDB::InternalError->throw(qq/Could not binmode() socket: '$!'\n/);
}
unless ( defined( $fh->setsockopt( IPPROTO_TCP, TCP_NODELAY, 1 ) ) ) {
undef $fh;
MongoDB::InternalError->throw(qq/Could not set TCP_NODELAY on socket: '$!'\n/);
}
unless ( defined( $fh->setsockopt( SOL_SOCKET, SO_KEEPALIVE, 1 ) ) ) {
undef $fh;
MongoDB::InternalError->throw(qq/Could not set SO_KEEPALIVE on socket: '$!'\n/);
}
$self->_set_fh($fh);
$self->_set_connected(1);
my $fd = fileno $fh;
unless ( defined $fd && $fd >= 0 ) {
$self->_close;
MongoDB::InternalError->throw(qq/select(2): 'Bad file descriptor'\n/);
}
vec( my $fdset = '', $fd, 1 ) = 1;
$self->_set_fdset( $fdset );
$self->start_ssl($host) if $self->with_ssl;
$self->_set_last_used( time );
$self->_set_rcvbuf( $fh->sockopt(SO_RCVBUF) );
# Default max msg size is 2 * max BSON object size (DRIVERS-1)
$self->_set_max_message_size_bytes( 2 * MAX_BSON_OBJECT_SIZE );
return $self;
}
sub set_metadata {
my ( $self, $server ) = @_;
$self->_set_server($server);
$self->_set_min_wire_version( $server->is_master->{minWireVersion} || "0" );
$self->_set_max_wire_version( $server->is_master->{maxWireVersion} || "0" );
$self->_set_max_bson_object_size( $server->is_master->{maxBsonObjectSize}
|| MAX_BSON_OBJECT_SIZE );
$self->_set_max_write_batch_size( $server->is_master->{maxWriteBatchSize}
|| MAX_WRITE_BATCH_SIZE );
# Default is 2 * max BSON object size (DRIVERS-1)
$self->_set_max_message_size_bytes( $server->is_master->{maxMessageSizeBytes}
|| 2 * $self->max_bson_object_size );
$self->_set_does_write_commands( $self->accepts_wire_version(2) );
return;
}
sub accepts_wire_version {
my ( $self, $version ) = @_;
my $min = $self->min_wire_version || 0;
my $max = $self->max_wire_version || 0;
return $version >= $min && $version <= $max;
}
sub start_ssl {
my ( $self, $host ) = @_;
my $ssl_args = $self->_ssl_args($host);
IO::Socket::SSL->start_SSL(
$self->fh,
%$ssl_args,
SSL_create_ctx_callback => sub {
my $ctx = shift;
Net::SSLeay::CTX_set_mode( $ctx, Net::SSLeay::MODE_AUTO_RETRY() );
},
);
unless ( ref( $self->fh ) eq 'IO::Socket::SSL' ) {
my $ssl_err = IO::Socket::SSL->errstr;
$self->_close;
MongoDB::HandshakeError->throw(qq/SSL connection failed for $host: $ssl_err\n/);
}
}
sub close {
my ($self) = @_;
$self->_close
or MongoDB::NetworkError->throw(qq/Error closing socket: '$!'\n/);
}
# this is a quiet close so preexisting network errors can be thrown
sub _close {
my ($self) = @_;
$self->_clear_connected;
my $ok = 1;
if ( $self->fh ) {
$ok = CORE::close( $self->fh );
$self->_clear_fh;
}
return $ok;
}
sub is_connected {
my ($self) = @_;
return $self->connected && $self->fh;
}
sub write {
my ( $self, $buf ) = @_;
my ( $len, $off, $pending, $nfound, $r ) = ( length($buf), 0 );
MongoDB::ProtocolError->throw(
qq/Message of size $len exceeds maximum of / . $self->{max_message_size_bytes} )
if $len > $self->max_message_size_bytes;
local $SIG{PIPE} = 'IGNORE';
while () {
# do timeout
( $pending, $nfound ) = ( $self->socket_timeout, 0 );
TIMEOUT: while () {
if ( -1 == ( $nfound = select( undef, $self->fdset, undef, $pending ) ) ) {
unless ( $! == EINTR ) {
$self->_close;
MongoDB::NetworkError->throw(qq/select(2): '$!'\n/);
}
# to avoid overhead tracking monotonic clock times; assume
# interrupts occur on average halfway through the timeout period
# and restart with half the original time
$pending = int( $pending / 2 );
redo TIMEOUT;
}
last TIMEOUT;
}
unless ($nfound) {
$self->_close;
MongoDB::NetworkTimeout->throw(
qq/Timed out while waiting for socket to become ready for writing\n/);
}
# do write
if ( defined( $r = syswrite( $self->fh, $buf, $len, $off ) ) ) {
( $len -= $r ), ( $off += $r );
last unless $len > 0;
}
elsif ( $! == EPIPE ) {
$self->_close;
MongoDB::NetworkError->throw(qq/Socket closed by remote server: $!\n/);
}
elsif ( $! != EINTR ) {
if ( $self->fh->can('errstr') ) {
my $err = $self->fh->errstr();
$self->_close;
MongoDB::NetworkError->throw(qq/Could not write to SSL socket: '$err'\n /);
}
else {
$self->_close;
MongoDB::NetworkError->throw(qq/Could not write to socket: '$!'\n/);
}
}
}
$self->_set_last_used(time);
return;
}
sub read {
my ($self) = @_;
# len of undef triggers first pass through loop
my ( $msg, $len, $pending, $nfound, $r ) = ( '', undef );
while () {
# do timeout
( $pending, $nfound ) = ( $self->socket_timeout, 0 );
TIMEOUT: while () {
# no need to select if SSL and has pending data from a frame
if ( $self->with_ssl ) {
( $nfound = 1 ), last TIMEOUT
if $self->fh->pending;
}
if ( -1 == ( $nfound = select( $self->fdset, undef, undef, $pending ) ) ) {
unless ( $! == EINTR ) {
$self->_close;
MongoDB::NetworkError->throw(qq/select(2): '$!'\n/);
}
# to avoid overhead tracking monotonic clock times; assume
# interrupts occur on average halfway through the timeout period
# and restart with half the original time
$pending = int( $pending / 2 );
redo TIMEOUT;
}
last TIMEOUT;
}
unless ($nfound) {
$self->_close;
MongoDB::NetworkTimeout->throw(
q/Timed out while waiting for socket to become ready for reading/ . "\n" );
}
# read up to SO_RCVBUF if we can
if ( defined( $r = sysread( $self->fh, $msg, $self->rcvbuf, length $msg ) ) ) {
# because select said we're ready to read, if we read 0 then
# we got EOF before the full message
if ( !$r ) {
$self->_close;
MongoDB::NetworkError->throw(qq/Unexpected end of stream\n/);
}
}
elsif ( $! != EINTR ) {
if ( $self->fh->can('errstr') ) {
my $err = $self->fh->errstr();
$self->_close;
MongoDB::NetworkError->throw(qq/Could not read from SSL socket: '$err'\n /);
}
else {
$self->_close;
MongoDB::NetworkError->throw(qq/Could not read from socket: '$!'\n/);
}
}
if ( !defined $len ) {
$len = unpack( P_INT32, $msg );
MongoDB::ProtocolError->throw(
qq/Server reply of size $len exceeds maximum of / . $self->{max_message_size_bytes} )
if $len > $self->max_message_size_bytes;
}
last unless length($msg) < $len;
}
$self->_set_last_used(time);
return $msg;
}
sub _assert_ssl {
# Need IO::Socket::SSL 1.42 for SSL_create_ctx_callback
MongoDB::UsageError->throw(qq/IO::Socket::SSL 1.42 must be installed for SSL support\n/)
unless eval { require IO::Socket::SSL; IO::Socket::SSL->VERSION(1.42) };
# Need Net::SSLeay 1.49 for MODE_AUTO_RETRY
MongoDB::UsageError->throw(qq/Net::SSLeay 1.49 must be installed for SSL support\n/)
unless eval { require Net::SSLeay; Net::SSLeay->VERSION(1.49) };
}
# Try to find a CA bundle to validate the SSL cert,
# prefer Mozilla::CA or fallback to a system file
sub _find_CA_file {
my $self = shift();
return $self->SSL_options->{SSL_ca_file}
if $self->SSL_options->{SSL_ca_file} and -e $self->SSL_options->{SSL_ca_file};
return Mozilla::CA::SSL_ca_file()
if eval { require Mozilla::CA };
# cert list copied from golang src/crypto/x509/root_unix.go
foreach my $ca_bundle (
"/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu/Gentoo etc.
"/etc/pki/tls/certs/ca-bundle.crt", # Fedora/RHEL
"/etc/ssl/ca-bundle.pem", # OpenSUSE
"/etc/openssl/certs/ca-certificates.crt", # NetBSD
"/etc/ssl/cert.pem", # OpenBSD
"/usr/local/share/certs/ca-root-nss.crt", # FreeBSD/DragonFly
"/etc/pki/tls/cacert.pem", # OpenELEC
"/etc/certs/ca-certificates.crt", # Solaris 11.2+
) {
return $ca_bundle if -e $ca_bundle;
}
MongoDB::UsageError->throw(
qq/Couldn't find a CA bundle with which to verify the SSL certificate.\n/
. qq/Try installing Mozilla::CA from CPAN\n/);
}
sub _ssl_args {
my ( $self, $host ) = @_;
my %ssl_args;
# This test reimplements IO::Socket::SSL::can_client_sni(), which wasn't
# added until IO::Socket::SSL 1.84
if ( Net::SSLeay::OPENSSL_VERSION_NUMBER() >= 0x01000000 ) {
$ssl_args{SSL_hostname} = $host, # Sane SNI support
}
$ssl_args{SSL_verifycn_scheme} = 'http'; # enable CN validation
$ssl_args{SSL_verifycn_name} = $host; # set validation hostname
$ssl_args{SSL_verify_mode} = 0x01; # enable cert validation
$ssl_args{SSL_ca_file} = $self->_find_CA_file;
# user options override default settings
for my $k ( keys %{ $self->SSL_options } ) {
$ssl_args{$k} = $self->SSL_options->{$k} if $k =~ m/^SSL_/;
}
return \%ssl_args;
}
1;
# vim: ts=4 sts=4 sw=4 et: