package MR::IProto::Connection::Async;

=head1 NAME

MR::IProto::Connection::Async - async communication

=head1 DESCRIPTION

Used to perform asynchronous communication.

=cut

use Mouse;
extends 'MR::IProto::Connection';

use AnyEvent::Handle;
use Scalar::Util qw(weaken);

has _handle => (
    is  => 'ro',
    isa => 'AnyEvent::Handle',
    predicate => '_has_handle',
    lazy_build => 1,
);

has _queue => (
    is  => 'ro',
    isa => 'ArrayRef',
    lazy_build => 1,
);

has _in_progress => (
    is  => 'rw',
    isa => 'Int',
    default => 0,
);

has _callbacks => (
    is  => 'ro',
    isa => 'HashRef',
    lazy_build => 1,
);

has _read_reply => (
    is  => 'ro',
    isa => 'CodeRef',
    lazy_build => 1,
);

has _no_reply => (
    is  => 'ro',
    isa => 'ArrayRef',
    lazy_build => 1,
);

has _on_drain => (
    is  => 'ro',
    isa => 'CodeRef',
    lazy_build => 1,
);

=head1 PUBLIC METHODS

=over

=item send

Enqueue message send.
For list of arguments see L</_send>.

=cut

sub fh { return $_[0]->_has_handle && $_[0]->_handle }

sub send {
    my $self = shift;
    if( $self->_in_progress < $self->max_parallel ) {
        $self->_in_progress( $self->_in_progress + 1 );
        $self->_send(@_);
    }
    else {
        push @{$self->_queue}, [@_];
    }
    return;
}

=item set_timeout( $timeout )

Set timeout value for existing connection.

=cut

sub set_timeout {
    my ($self, $timeout) = @_;
    $self->_handle->timeout($timeout) if $self->_has_handle();
    return;
}

=back

=head1 PROTECTED METHODS

=over

=item _send( $msg, $payload, $callback, $no_reply )

Send message to server.

=cut

sub _send {
    my ($self, $msg, undef, $callback, $no_reply, $sync) = @_;
    $sync = $self->_choose_sync() unless defined $sync;
    my $header = $self->_pack_header($msg, length $_[2], $sync);
    my $server = $self->server;
    $self->_callbacks->{$sync} = $callback;
    $server->_send_started($sync, $msg, $_[2]);
    my $handle = $self->_handle;
    if( $server->debug >= 5 ) {
        $server->_debug_dump('send header: ', $header);
        $server->_debug_dump('send payload: ', $_[2]);
    }
    $handle->push_write($header);
    $handle->push_write($_[2]);
    if( $no_reply ) {
        push @{$self->_no_reply}, $sync;
        $handle->on_drain( $self->_on_drain ) unless defined $handle->{on_drain};
    }
    else {
        $handle->push_read( chunk => 12, $self->_read_reply );
    }
    return;
}

sub _build__read_reply {
    my ($self) = @_;
    my $server = $self->server;
    weaken($self);
    weaken($server);
    return sub {
        my ($handle, $data) = @_;
        my $dump_resp = $server->debug >= 6;
        $server->_debug_dump('recv header: ', $data) if $dump_resp;
        my ($msg, $payload_length, $sync) = $self->_unpack_header($data);
        $handle->unshift_read( chunk => $payload_length, sub {
            my ($handle, $data) = @_;
            $server->_debug_dump('recv payload: ', $data) if $dump_resp;
            $server->_recv_finished($sync, $msg, $data);
            $self->_finish_and_start();
            delete($self->_callbacks->{$sync})->($msg, $data);
            return;
        });
        return;
    };
}

sub _try_to_send {
    my ($self) = @_;
    while( $self->_in_progress < $self->max_parallel && (my $task = shift @{ $self->_queue }) ) {
        $self->_in_progress( $self->_in_progress + 1 );
        $self->_send(@$task);
    }
    return;
}

sub _finish_and_start {
    my ($self) = @_;
    if( my $task = shift @{$self->_queue} ) {
        $self->_send(@$task);
    }
    else {
        $self->_in_progress( $self->_in_progress - 1 );
    }
    return;
}

sub _build__handle {
    my ($self) = @_;
    my $server = $self->server;
    $server->_debug("connecting") if $server->debug >= 4;
    weaken($self);
    weaken($server);
    return AnyEvent::Handle->new(
        connect    => [ $self->host, $self->port ],
        no_delay   => $self->tcp_nodelay,
        keepalive  => $self->tcp_keepalive,
        timeout    => $self->timeout,
        on_prepare => sub {
            return $self->connect_timeout;
        },
        on_connect => sub {
            my ($handle) = @_;
            $server->_debug("connected") if $server->debug >= 1;
            return;
        },
        on_error   => sub {
            my ($handle, $fatal, $message) = @_;
            my $errno = $!;
            $server->_debug(($fatal ? 'fatal ' : '') . 'error: ' . $message);
            my @callbacks;
            foreach my $sync ( keys %{$self->_callbacks} ) {
                $server->_recv_finished($sync, undef, undef, $message, $errno);
                $self->_in_progress( $self->_in_progress - 1 );
                push @callbacks, $self->_callbacks->{$sync};
            }
            $server->active(0);
            $self->_clear_handle();
            $self->_clear_callbacks();
            $self->_clear_no_reply();
            $server->_debug('closing socket') if $server->debug >= 1;
            $handle->destroy();
            $self->_try_to_send();
            $_->(undef, undef, $message, $errno) foreach @callbacks;
            return;
        },
        on_timeout => sub {
            my ($handle) = @_;
            return unless keys %{$self->_callbacks};
            $handle->_error( Errno::ETIMEDOUT ) if keys %{$self->_callbacks};
            return;
        },
    );
}

sub _build__on_drain {
    my ($self) = @_;
    my $server = $self->server;
    weaken($self);
    weaken($server);
    return sub {
        my ($handle) = @_;
        if( $self->_has_no_reply() ) {
            foreach my $sync ( @{$self->_no_reply} ) {
                $server->_recv_finished($sync, undef, undef);
                $self->_in_progress( $self->_in_progress - 1 );
                delete($self->_callbacks->{$sync})->(undef, undef);
            }
            $self->_clear_no_reply();
            $self->_try_to_send();
            $handle->on_drain(undef);
        }
        return;
    };
}

sub _build__queue {
    my ($self) = @_;
    return [];
}

sub _build__callbacks {
    my ($self) = @_;
    return {};
}

sub _build__no_reply {
    my ($self) = @_;
    return [];
}

around _choose_sync => sub {
    my ($orig, $self) = @_;
    my $sync;
    my $callbacks = $self->_callbacks;
    for( 1 .. 50 ) {
        $sync = $self->$orig();
        return $sync unless exists $callbacks->{$sync};
    }
    die "Can't choose sync value after 50 iterations";
};

sub Close { die "This is not what should be done" }

=back

=head1 SEE ALSO

L<MR::IProto::Connection>, L<MR::IProto::Cluster::Server>.

=cut

no Mouse;
__PACKAGE__->meta->make_immutable();

1;