package MR::IProto;

=head1 NAME

MR::IProto - iproto network protocol client

=head1 SYNOPSIS

IProto client can be created with full control of its behaviour:

    my $client = MR::IProto->new(
        cluster => MR::IProto::Cluster->new(
            servers => [
                MR::IProto::Cluster::Server->new(
                    host => 'xxx.xxx.xxx.xxx',
                    port => xxxx,
                ),
                ...
            ],
        ),
    );

Or without it:

    my $client = MR::IProto->new(
        servers => 'xxx.xxx.xxx.xxx:xxxx,xxx.xxx.xxx.xxx:xxxx',
    );

Messages can be prepared and processed using objects (recommended,
but requires some more CPU):

    my $request = MyProject::Message::MyOperation::Request->new(
        arg1 => 1,
        arg2 => 2,
    );
    my $response = $client->send($request);
    # $response isa MyProject::Message::MyOperation::Response.
    # Of course, both message classes (request and reply) must
    # be implemented by the user.

Or without them (not recommended because of unclean architecture):

    my $response = $client->send({
        msg    => x,
        data   => [...],
        pack   => 'xxx',
        unpack => sub {
            my ($data) = @_;
            return (...);
        },
    });

Messages can be sent synchronously:

    my $response = $client->send($request);
    # an exception is raised if an error occurs
    # besides $@ you can check $! to identify the reason of the error

Or asynchronously:

    use AnyEvent;
    my $callback = sub {
        my ($reply, $error) = @_;
        # on error $error is defined and $! can be set
        return;
    };
    $client->send($request, $callback);
    # the callback is called when a reply is received or an error occurs

=head1 DESCRIPTION

This client is used to communicate with a cluster of balanced servers using
the iproto network protocol.

To use it nicely you should implement two subclasses of
L<MR::IProto::Message> for each message type, one for the request message and
another for the reply. These classes must be named C<prefix::*::suffix>,
where I<prefix> must be passed to the constructor of L<MR::IProto> as the value
of the L</prefix> attribute and I<suffix> is either C<Request> or C<Response>.
These classes must be loaded before the first message is sent through the
client object.
To send messages asynchronously you should implement the event loop yourself.
L<AnyEvent> is recommended.

=cut

use Mouse;
use Errno;
use MRO::Compat;
use Scalar::Util qw(weaken);
use Time::HiRes;
use MR::IProto::Cluster;
use MR::IProto::Error;

with 'MR::IProto::Role::Debuggable';

=head1 ATTRIBUTES

=over

=item prefix

Namespace prefix under which the subclasses of L<MR::IProto::Message> are
located. It is used to find reply message classes. Defaults to the class
name of the client object.

=cut

has prefix => (
    isa     => 'Str',
    is      => 'ro',
    default => sub {
        my ($self) = @_;
        return ref $self;
    },
);

=item cluster

Instance of L<MR::IProto::Cluster>. Contains all servers between which
requests can be balanced. It can also be specified through the I<servers>
parameter of the constructor as a comma-separated list of C<host:port> pairs.

=cut

has cluster => (
    isa      => 'MR::IProto::Cluster',
    is       => 'ro',
    required => 1,
    coerce   => 1,
    handles  => ['timeout'],
);

=item max_parallel

Maximum number of simultaneous requests to all servers.

=cut

has max_parallel => (
    isa     => 'Int',
    is      => 'ro',
    default => 1000,
);

=item max_request_retries

Maximum number of request retries which must be sent to different servers
before an error is returned.

=cut

has max_request_retries => (
    isa     => 'Int',
    is      => 'ro',
    default => 2,
);

=item retry_delay

Delay between request retries.

=cut

has retry_delay => (
    isa     => 'Num',
    is      => 'ro',
    default => 0,
);

=back

=cut

# Map of message code => reply class, built on first use
# by _build__reply_class.
has _reply_class => (
    isa        => 'HashRef[ClassName]',
    is         => 'ro',
    lazy_build => 1,
);

# Requests waiting for a free slot; _send pushes [ $message, $callback ]
# pairs here once max_parallel requests are in progress.
has _queue => (
    isa        => 'ArrayRef',
    is         => 'ro',
    lazy_build => 1,
);

# Number of asynchronous requests currently in progress.
has _in_progress => (
    isa     => 'Int',
    is      => 'rw',
    default => 0,
);

=head1 PUBLIC METHODS

=over

=item new( [ %args | \%args ] )

Constructor. See L</ATTRIBUTES> and L</BUILDARGS> for more information about
the allowed arguments.

=item send( [ $message | \%args ], $callback? )

Send C<$message> to a server and receive the reply.

If C<$callback> is passed then the request is done asynchronously and the
reply is passed to the callback as its first argument. The method B<must> be
called in void context to prevent possible errors.
Only client errors can be raised in async mode. All communication errors are
passed to the callback as its second argument. Additional information can be
extracted from the C<$!> variable.

In sync mode (when the C<$callback> argument is omitted) all errors are raised
and C<$!> is also set. The response is returned from the method, so the method
B<must> be called in scalar context.

The request C<$message> can be an instance of an L<MR::IProto::Message>
subclass. In this case the reply will also be an instance of an
L<MR::IProto::Message> subclass. Alternatively it can be passed as a C<\%args>
hash reference with the keys described in L</_send>.

=cut

sub send {
    my ($self, $message, $callback) = @_;
    my $context = wantarray;

    # Asynchronous mode: hand the request over and return nothing.
    if ($callback) {
        die "Method must be called in void context if you want to use async"
            if defined $context;
        $self->_send($message, $callback);
        return;
    }

    # Synchronous mode: the caller has to consume the reply.
    die "Method must be called in scalar context if you want to use sync"
        unless defined $context;

    my %connections;
    my ($reply, $failure, $saved_errno);
    my $on_reply = sub {
        ($reply, $failure) = @_;
        # Remember errno right away, later calls may overwrite it.
        $saved_errno = $!;
        return;
    };

    $self->_send_now($message, $on_reply, \%connections);
    $self->_recv_now(\%connections);

    $! = $saved_errno;
    die $failure if $failure;
    return $reply;
}

=item send_bulk( \@messages, $callback? )

Send all messages in C<\@messages> and return the result (sync mode) or
call the callback (async mode) after all replies have been received.
The result is an array reference whose values are instances of
L<MR::IProto::Response> or L<MR::IProto::Error> if the request was passed as
an object, or hashes with the keys C<data> and C<error> if the message was
passed as C<\%args>. Replies in the result can be returned in an order
different from the order of the requests.

See L</_send> for more information about message data. Either C<$message>
or C<\%args> is allowed as an element of C<\@messages>.
=cut

sub send_bulk {
    my ($self, $messages, $callback) = @_;
    my $context = wantarray;
    my @result;

    # Stores one reply: response/error objects are kept as they are,
    # everything else is wrapped into a { data, error } hash.
    my $collect = sub {
        my ($data, $error) = @_;
        push @result, blessed($data) ? $data : { data => $data, error => $error };
        return;
    };

    if ($callback) {
        die "Method must be called in void context if you want to use async"
            if defined $context;

        # The outer begin/end pair fires the user callback once every
        # per-message begin() has been matched by an end().
        my $cv = AnyEvent->condvar();
        $cv->begin( sub { $callback->(\@result) } );
        foreach my $message ( @$messages ) {
            $cv->begin();
            $self->_send($message, sub {
                $collect->(@_);
                $cv->end();
                return;
            });
        }
        $cv->end();
        return;
    }

    die "Method must be called in scalar context if you want to use sync"
        unless defined $context;

    my %connections;
    foreach my $message ( @$messages ) {
        $self->_send_now($message, $collect, \%connections);
    }
    $self->_recv_now(\%connections);
    return \@result;
}

# Compatibility wrapper: sends a message given as a hash reference or as a
# key/value list with retries enabled. Returns nothing on error; otherwise
# the reply data as a list (list context) or its first element.
sub Chat {
    my ($self, @params) = @_;
    my $message = @params == 1 ? $params[0] : +{ @params };
    if ( ref $message eq 'HASH' ) {
        $message->{retry} = 1;
    }

    my $data;
    my $sent = eval { $data = $self->send($message); 1 };
    return unless $sent;

    return @$data if wantarray;
    return $data->[0];
}

# Compatibility wrapper: like Chat, but never returns an empty result.
# Returns { ok => $data } on success, or { fail => $message, timeout => $bool }
# where the " at FILE line N" suffix is stripped from the error text.
sub Chat1 {
    my ($self, @params) = @_;
    my $message = @params == 1 ? $params[0] : +{ @params };

    my $data;
    if ( eval { $data = $self->send($message); 1 } ) {
        return { ok => $data };
    }

    return {
        fail    => $@ =~ /^(.*?) at \S+ line \d+/s ? $1 : $@,
        timeout => $! == Errno::ETIMEDOUT(),
    };
}

# Compatibility wrapper around the timeout accessor delegated to the cluster.
sub SetTimeout {
    my $self = shift;
    my ($timeout) = @_;
    $self->timeout($timeout);
    return;
}

=back

=head1 PROTECTED METHODS

=over

=item BUILDARGS( [ %args | \%args ] )

For compatibility with the previous version of the client, and for simplicity,
some additional constructor arguments are allowed:

=over

=item servers

C<host:port> pairs separated by commas, used to create
L<MR::IProto::Cluster::Server> objects.

=item timeout, tcp_nodelay, tcp_keepalive, dump_no_ints

Are passed directly to the constructor of L<MR::IProto::Cluster::Server>.

=item balance

Is passed directly to the constructor of L<MR::IProto::Cluster>.

=back

See L<Mouse::Manual::Construction/BUILDARGS> for more information.
=cut

# Pool of server objects keyed by "host:port". It is shared by all clients
# constructed from a "servers" string unless "no_pool" is passed.
my %servers;

around BUILDARGS => sub {
    my ($orig, $class, @rest) = @_;
    my %args = @rest == 1 ? %{ $rest[0] } : @rest;

    # "name" is accepted as an alias of "prefix".
    $args{prefix} = $args{name} if exists $args{name};

    return $class->$orig(%args) unless $args{servers};

    my $cluster_class = $args{cluster_class} || 'MR::IProto::Cluster';
    my $server_class  = $args{server_class}  || 'MR::IProto::Cluster::Server';

    # Options forwarded to every server object; "debug" is copied,
    # the others are moved out of the client arguments.
    my %srvargs;
    $srvargs{debug} = $args{debug} if exists $args{debug};
    foreach my $option (qw( timeout tcp_nodelay tcp_keepalive dump_no_ints )) {
        $srvargs{$option} = delete $args{$option} if exists $args{$option};
    }

    my %clusterargs;
    $clusterargs{balance} = delete $args{balance} if exists $args{balance};

    my @cluster_servers;
    foreach my $spec ( split /,/, delete $args{servers} ) {
        my ($host, $port, $weight) = split /:/, $spec;
        my @server_args = (
            %srvargs,
            host => $host,
            port => $port,
            defined $weight ? ( weight => $weight ) : (),
        );
        if ( $args{no_pool} ) {
            # Always a fresh object, the pool is not touched.
            push @cluster_servers, $server_class->new(@server_args);
        }
        else {
            # Reuse the pooled object for this address, creating it on demand.
            push @cluster_servers,
                $servers{"$host:$port"} ||= $server_class->new(@server_args);
        }
    }
    $clusterargs{servers} = \@cluster_servers;

    $args{cluster} = $cluster_class->new(%clusterargs);
    return $class->$orig(%args);
};

# Builds the debug callback: prints the message through warn(),
# prefixed with the value of the "prefix" attribute.
sub _build_debug_cb {
    my ($self) = @_;
    my $prefix = $self->prefix;
    my $printer = sub {
        my ($msg) = @_;
        chomp $msg;
        warn sprintf "%s: %s\n", $prefix, $msg;
        return;
    };
    return $printer;
}

sub _build__callbacks {
    my ($self) = @_;
    my %empty;
    return \%empty;
}

# Builds the message code => reply class map from all known subclasses of
# MR::IProto::Response whose name starts with "<prefix>::" and which can("msg").
# (MR::IProto::Response->meta->subclasses() was the previous class source.)
sub _build__reply_class {
    my ($self) = @_;
    my $re = sprintf '^%s::', $self->prefix;

    my %reply;
    foreach my $class ( @{ mro::get_isarev('MR::IProto::Response') } ) {
        next unless $class =~ /$re/;
        next unless $class->can('msg');
        my ($code) = $class->msg;
        $reply{$code} = $class;
    }
    return \%reply;
}

sub _build__queue {
    my ($self) = @_;
    my @empty;
    return \@empty;
}

=item _send( [ $message | \%args ], $callback? )

Purely asynchronous internal implementation of send. C<$message> is an
instance of L<MR::IProto::Message>. If a C<\%args> hash reference is passed
instead of C<$message> then it can contain the following keys:

=over

=item msg

Message code.
=item key

Depending on this value balancing between servers is implemented.

=item data

Message data, already packed or unpacked. Unpacked data must be passed as
an array reference, and the additional parameter I<pack> should be passed
(C<'L*'> is used when it is omitted).

=item payload

Already prepared message body. If this key exists it is used as the body
and I<data> is not examined.

=item pack

First argument of L<pack|perlfunc/pack> function.

=item unpack

Code reference which is used to unpack the reply, or a template for the
L<unpack|perlfunc/unpack> function.

=item no_reply

Message has no reply.

=item retry

Whether retry is allowed. Can be a code reference, which is then called to
obtain the value. Values of the attributes L</max_request_retries> and
L</retry_delay> are used if retry is allowed.

=item is_retry

Code reference which is called with the unpacked reply; if it returns a true
value the request is sent again (limited by L</max_request_retries>).

=item sync

Passed through unchanged as the last argument of the connection's C<send>
method.

=back

The passed C<\%args> hash is not modified.

=cut

# Starts the request right away while fewer than max_parallel requests are in
# progress; otherwise stores it in the queue. Exceptions raised while starting
# the request are reported through the callback.
sub _send {
    my ($self, $message, $callback) = @_;
    if( $self->_in_progress < $self->max_parallel ) {
        $self->_in_progress( $self->_in_progress + 1 );
        eval { $self->_send_now($message, $callback); 1 }
            or $self->_report_error($message, $callback, $@);
    }
    else {
        push @{$self->_queue}, [ $message, $callback ];
    }
    return;
}

# Called when an asynchronous request is finished. The freed slot is reused
# for the next queued request; if the queue is empty the in-progress counter
# is decremented instead.
sub _finish_and_start {
    my ($self) = @_;
    if( my $task = shift @{$self->_queue} ) {
        eval { $self->_send_now(@$task); 1 }
            or $self->_report_error(@$task, $@);
    }
    else {
        $self->_in_progress( $self->_in_progress - 1 );
    }
    return;
}

# Prepares the request arguments and performs the first send attempt.
#   $message  - message object or old-style \%args hash (see _send)
#   $callback - called as $callback->($data) or $callback->($errobj, $error)
#   $sync     - false for async mode; in sync mode a hash reference in which
#               the used connections are registered (see _send_try)
# Dies if the request cannot be prepared.
sub _send_now {
    my ($self, $message, $callback, $sync) = @_;
    my $args;
    # MR::IProto::Message OO-API
    if( ref $message ne 'HASH' ) {
        my $msg = $message->msg;
        my $response_class = $self->_reply_class->{$msg};
        die sprintf "Cannot find response class for message code %d\n", $msg unless $response_class;
        $args = {
            request        => $message,
            msg            => $msg,
            key            => $message->key,
            body           => $message->data,
            response_class => $response_class,
            no_reply       => $response_class->isa('MR::IProto::NoResponse'),
        };
    }
    # Old-style compatible API
    else {
        die "unpack or no_reply must be specified" unless $message->{unpack} || $message->{no_reply};
        # Work on a shallow copy: the keys consumed below must stay in the
        # caller's hash, otherwise sending the same \%args again would
        # transmit an undefined body.
        $args = { %$message };
        $args->{body} = exists $args->{payload} ? delete $args->{payload}
                      : ref $args->{data}       ? pack( delete( $args->{pack} ) || 'L*', @{ delete $args->{data} } )
                      :                           delete $args->{data};
    }

    my $try = 1;
    # The handler must not keep the client object alive.
    weaken($self);
    # $handler refers to itself through \$handler, which lets retries reuse it;
    # _server_callback breaks this cycle with "undef $$handler" when the
    # request is finished.
    my $handler;
    $handler = sub {
        $self->_server_callback(
            [\$handler, $args, $callback, $sync, \$try],
            [@_],
        );
        return;
    };
    $self->_send_try($sync, $args, $handler, $try);
    return;
}

# Performs one send attempt: asks the cluster for a server using the balancing
# key, takes its sync or async connection and sends the message through it.
# In sync mode the connection is registered in %$sync for _recv_now.
sub _send_try {
    my ($self, $sync, $args, $handler, $try) = @_;
    my $xsync = $sync ? 'sync' : 'async';
    $self->_debug(sprintf "send msg=%d try %d of %d total", $args->{msg}, $try, $self->max_request_retries ) if $self->debug >= 2;
    my $server = $self->cluster->server( $args->{key} );
    my $connection = $server->$xsync();
    $connection->send($args->{msg}, $args->{body}, $handler, $args->{no_reply}, $args->{sync});
    $sync->{$connection} ||= $connection if $sync;
    return;
}

# Repeats _send_try(@in) after retry_delay: by sleeping in sync mode, by an
# AnyEvent timer in async mode. The first element of @in is $sync.
sub _send_retry {
    my ($self, @in) = @_;
    my ($sync) = @in;
    if( $sync ) {
        Time::HiRes::sleep($self->retry_delay);
        $self->_send_try(@in);
    }
    else {
        # The timer is kept alive by its own callback and released when fired.
        my $timer;
        $timer = AnyEvent->timer(
            after => $self->retry_delay,
            cb    => sub {
                undef $timer;
                $self->_send_try(@in);
                return;
            },
        );
    }
    return;
}

# Handles the result of one send attempt.
#   $req_args  - [ \$handler, $args, $callback, $sync, \$try ] from _send_now
#   $resp_args - [ $resp_msg, $data, $error ] as passed to the handler
# On error or on a reply which asks for a retry the request is sent again
# while the number of tries is below max_request_retries; otherwise the
# result or the error is passed to the callback.
sub _server_callback {
    my ($self, $req_args, $resp_args) = @_;
    my ($handler, $args, $callback, $sync, $try) = @$req_args;
    my ($resp_msg, $data, $error) = @$resp_args;
    eval {
        if ($error) {
            # Retry permission comes from the request object or from the
            # "retry" key, which may be a code reference.
            my $retry = defined $args->{request}      ? $args->{request}->retry()
                      : ref $args->{retry} eq 'CODE'  ? $args->{retry}->()
                      :                                 $args->{retry};
            $self->_debug("send: failed[@{[$retry, $$try+1, $self->max_request_retries]}]") if $self->debug >= 2;
            if( $retry && $$try++ < $self->max_request_retries ) {
                $self->_send_retry($sync, $args, $$handler, $$try);
            }
            else {
                undef $$handler;
                $self->_report_error($args->{request}, $callback, $error, $sync);
            }
        }
        else {
            # Convert the raw reply into a response object or an unpacked list.
            my $ok = eval {
                die "Request and reply message code is different: $resp_msg != $args->{msg}\n"
                    unless $args->{no_reply} || $resp_msg == $args->{msg};
                if( defined $args->{request} ) {
                    $data = $args->{response_class}->new( data => $data, request => $args->{request} );
                }
                else {
                    $data = $args->{no_reply}
                        ? [ 0 ]
                        : [ ref $args->{unpack} eq 'CODE' ? $args->{unpack}->($data) : unpack $args->{unpack}, $data ];
                }
                1;
            };
            if($ok) {
                if( defined $args->{request} && $data->retry && $$try++ < $self->max_request_retries ) {
                    $self->_send_retry($sync, $args, $$handler, $$try);
                }
                elsif( defined $args->{is_retry} && $args->{is_retry}->($data) && $$try++ < $self->max_request_retries ) {
                    $self->_send_retry($sync, $args, $$handler, $$try);
                }
                else {
                    undef $$handler;
                    $self->_finish_and_start() unless $sync;
                    $callback->($data);
                }
            }
            else {
                undef $$handler;
                $self->_report_error($args->{request}, $callback, $@, $sync);
            }
        }
        1;
    } or do {
        # Anything that escaped the handling above (including exceptions from
        # the callback itself) is only logged; the callback is not called again.
        undef $$handler;
        $self->_debug("unhandled fatal error: $@");
    };
    return;
}

# Sync mode: reads the replies from all connections registered in %$servers.
# The hash is emptied before reading and checked again afterwards, so
# connections registered while reading (e.g. by retries) are processed too.
sub _recv_now {
    my ($self, $servers) = @_;
    while(my @servers = values %$servers) {
        %$servers = ();
        $_->recv_all() foreach @servers;
    }
    return;
}

# Passes an error to the callback as ($errobj, $error). $errobj is an
# MR::IProto::Error (carrying the current errno) when the request is a message
# object and undef otherwise. In async mode the request slot is released first.
sub _report_error {
    my ($self, $request, $callback, $error, $sync) = @_;
    my $errobj = defined $request && ref $request ne 'HASH'
        ? MR::IProto::Error->new(
            request => $request,
            error   => $error,
            errno   => 0+$!,
        )
        : undef;
    $self->_finish_and_start() unless $sync;
    $callback->($errobj, $error);
    return;
}

=back

=head1 SEE ALSO

L<MR::IProto::Cluster>, L<MR::IProto::Cluster::Server>, L<MR::IProto::Message>.

=cut

no Mouse;
__PACKAGE__->meta->make_immutable();

1;