package MR::IProto::Cluster::Server;

=head1 NAME

MR::IProto::Cluster::Server - server

=head1 DESCRIPTION

This class is used to implement all communication with one server.

=cut

use Mouse;
use Mouse::Util::TypeConstraints;
use Scalar::Util;
use MR::IProto::Connection::Async;
use MR::IProto::Connection::Sync;
use MR::IProto::Message;

with 'MR::IProto::Role::Debuggable';

# Allow a server to be specified as a plain string of the form
# "host:port" or "host:port:weight". A missing weight falls back to the
# attribute default; a missing port is rejected by the constructor because
# "port" is a required Int.
coerce 'MR::IProto::Cluster::Server'
    => from 'Str'
    => via {
        my ($host, $port, $weight) = split /:/, $_;
        __PACKAGE__->new(
            host => $host,
            port => $port,
            defined $weight ? ( weight => $weight ) : (),
        );
    };

# Label prepended to every message printed by the default debug callback
# (see _build_debug_cb).
has prefix => (
    is      => 'ro',
    isa     => 'Str',
    default => 'MR::IProto',
);

=head1 ATTRIBUTES

=over

=item host

Host name or IP address.

=cut

has host => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

=item port

Port number.

=cut

has port => (
    is       => 'ro',
    isa      => 'Int',
    required => 1,
);

=item weight

Server weight.

=cut

has weight => (
    is      => 'ro',
    isa     => 'Int',
    default => 1,
);

=item connect_timeout

Timeout of connect operation.

=cut

has connect_timeout => (
    is      => 'rw',
    isa     => 'Num',
    default => 2,
);

=item timeout

Timeout of read and write operations.

=cut

has timeout => (
    is      => 'rw',
    isa     => 'Num',
    default => 2,
    trigger => sub {
        my ($self, $new) = @_;
        # Propagate the new value only to connections that have already
        # been built; the predicates avoid forcing the lazy builders.
        $self->async->set_timeout($new) if $self->has_async();
        $self->sync->set_timeout($new)  if $self->has_sync();
        return;
    },
);

=item tcp_nodelay

Enable TCP_NODELAY.

=cut

has tcp_nodelay => (
    is      => 'ro',
    isa     => 'Int',
    default => 1,
);

=item tcp_keepalive

Enable SO_KEEPALIVE.

=cut

has tcp_keepalive => (
    is      => 'ro',
    isa     => 'Int',
    default => 0,
);

=item max_parallel

Maximum number of simultaneous requests.

=cut

has max_parallel => (
    is      => 'ro',
    isa     => 'Int',
    default => 10,
);

=item active

Whether the server is used in balancing.

=cut

has active => (
    is      => 'rw',
    isa     => 'Bool',
    default => 1,
);

# Optional callback. It is not invoked anywhere in this class.
# NOTE(review): presumably called by the connection objects when a
# connection is closed -- confirm against MR::IProto::Connection.
has on_close => (
    is  => 'rw',
    isa => 'CodeRef',
);

# Connection objects, created on first access by _build_async/_build_sync.
# lazy_build also generates the has_*/clear_* methods used in this class.
has async => (
    is         => 'ro',
    isa        => 'MR::IProto::Connection::Async',
    lazy_build => 1,
);

has sync => (
    is         => 'ro',
    isa        => 'MR::IProto::Connection::Sync',
    lazy_build => 1,
);

# Registry of live instances keyed by reference address. Values are weakened
# in BUILD so that the registry itself never keeps an object alive.
my %servers;

=back

=head1 PUBLIC METHODS

=over

=item disconnect_all

Class method used to disconnect all iproto-connections.
Very useful in case of fork().

=cut

sub disconnect_all {
    my ($class) = @_;
    # Registry values are weak references, so a slot may be undef if its
    # object is already gone. Take defined, strong copies first: this skips
    # dead slots and keeps every object alive (and the hash untouched by
    # DEMOLISH) for the duration of the loop.
    my @live = grep { defined } values %servers;
    foreach my $server (@live) {
        $server->clear_async();
        $server->clear_sync();
    }
    return;
}

=back

=head1 PROTECTED METHODS

=over

=cut

# Register the new instance in the class-wide registry (weakly).
sub BUILD {
    my ($self) = @_;
    my $addr = Scalar::Util::refaddr($self);
    $servers{$addr} = $self;
    Scalar::Util::weaken($servers{$addr});
    return;
}

# Remove the instance from the registry when it is destroyed.
sub DEMOLISH {
    my ($self) = @_;
    delete $servers{Scalar::Util::refaddr($self)};
    return;
}

# Builder for the lazy "async" attribute.
sub _build_async {
    my ($self) = @_;
    return MR::IProto::Connection::Async->new( server => $self );
}

# Builder for the lazy "sync" attribute.
sub _build_sync {
    my ($self) = @_;
    return MR::IProto::Connection::Sync->new( server => $self );
}

# Default debug callback: warn the message prefixed with "prefix".
# The prefix is captured once, when the callback is built.
sub _build_debug_cb {
    my ($self) = @_;
    my $prefix = $self->prefix;
    return sub {
        my ($msg) = @_;
        # Normalise the trailing newline so exactly one is printed.
        chomp $msg;
        warn "$prefix: $msg\n";
        return;
    };
}

=item _send_started( $sync, $message, $data )

This method is called when message is started to send.
The default implementation does nothing.

=cut

sub _send_started {
    return;
}

=item _recv_finished( $sync, $message, $data, $error )

This method is called when message is received.
The default implementation does nothing.

=cut

sub _recv_finished {
    return;
}

# Pass $msg to the debug callback, prefixed with this server's "host:port".
sub _debug {
    my ($self, $msg) = @_;
    $self->debug_cb->( sprintf "%s:%d: %s", $self->host, $self->port, $msg );
    return;
}

=back

=head1 SEE ALSO

L<MR::IProto>, L<MR::IProto::Cluster>.

=cut

# Remove the imported sugar so it cannot be called as methods on the class.
no Mouse::Util::TypeConstraints;
no Mouse;
__PACKAGE__->meta->make_immutable();

1;