package AnyEvent::PgRecvlogical;
$AnyEvent::PgRecvlogical::VERSION = '1.04';
# ABSTRACT: perl port of pg_recvlogical

=pod

=head1 NAME

AnyEvent::PgRecvlogical - perl port of pg_recvlogical

=for html
<a href="https://travis-ci.org/mydimension/AnyEvent-PgRecvlogical"><img src="https://travis-ci.org/mydimension/AnyEvent-PgRecvlogical.svg?branch=master" /></a>
<a href='https://coveralls.io/github/mydimension/AnyEvent-PgRecvlogical?branch=master'><img src='https://coveralls.io/repos/github/mydimension/AnyEvent-PgRecvlogical/badge.svg?branch=master' alt='Coverage Status' /></a>
<a href="https://badge.fury.io/pl/AnyEvent-PgRecvlogical"><img src="https://badge.fury.io/pl/AnyEvent-PgRecvlogical.svg" alt="CPAN version" height="18"></a>

=head1 SYNOPSIS

    use AnyEvent::PgRecvlogical;

    my $recv = AnyEvent::PgRecvlogical->new(
        dbname     => 'mydb',
        slot       => 'myreplslot',
        on_message => sub {
            my ($record, $guard) = @_;

            process($record);

            undef $guard; # declare done with $record
        }
    );

    $recv->start;

=head1 DESCRIPTION

C<AnyEvent::PgRecvlogical> provides perl bindings of similar functionality to that of
L<pg_recvlogical|https://www.postgresql.org/docs/current/static/app-pgrecvlogical.html>.
The reasoning being that C<pg_recvlogical> does afford the consuming process the opportunity to emit feedback to
PostgreSQL. This results is potentially being sent more data than you can handle in a timely fashion.

=cut

use Moo;
use DBI;
use DBD::Pg 3.7.0 ':async';
use AnyEvent;
use AnyEvent::Util 'guard';
use Promises 0.99 backend => ['AnyEvent'], qw(deferred);
use Types::Standard ':all';
use Try::Tiny;
use Carp 'croak';
use curry;

use constant {
    AWAIT_INTERVAL    => 1,
    USECS             => 1_000_000,
    PG_MIN_VERSION    => 9_04_00,
    PG_MIN_NOEXPORT   => 10_00_00,
    PG_STATE_DUPEOBJ  => '42710',
    PG_EPOCH_DELTA    => 946_684_800,
    XLOGDATA          => 'Aq>3a*',
    PRIMARY_HEARTBEAT => 'Aq>2b',
    STANDBY_HEARTBEAT => 'Aq>4b',
};

use namespace::clean;

my $DBH = (InstanceOf ['DBI::db'])->create_child_type(
    constraint => sub {
        $_->{Driver}->{Name} eq 'Pg'
          and $_->{pg_server_version} >= PG_MIN_VERSION
          and $_->{Name} =~ /replication=/;
    },
    message => sub {
        my $parent_check = (InstanceOf ['DBI::db'])->validate($_);
        return $parent_check if $parent_check;
        return "$_ is not a DBD::Pg handle" unless $_->{Driver}->{Name} eq 'Pg';
        return "$_ is connected to an old postgres version ($_->{pg_server_version} < 9.4.0)"
          unless $_->{pg_server_version} >= PG_MIN_VERSION;
        return "$_ is not a replication connection: $_->{Name}" unless $_->{Name} =~ /replication=/;
    }
);

my $LSNStr = Str->where(sub { m{[0-9A-F]{1,8}/[0-9A-F]{1,8}} })
  ->plus_coercions(Int() => sub { sprintf '%X/%X', (($_ >> 32) & 0xffff_ffff), ($_ & 0xffff_ffff) });

my $LSN = Int->plus_coercions(
    $LSNStr => sub {
        my ($h, $l) = map { hex } split m{/}; ($h << 32) | $l;
    }
);

=head1 ATTRIBUTES

=over

=item C<dbname>

=over

=item L<Str|Types::Standard/Str>, Required

=back

Database name to connect to.

=item C<slot>

=over

=item L<Str|Types::Standard/Str>, Required

=back

Name of the replication slot to use (and/or create, see L</do_create_slot> and L</slot_exists_ok>)

=item C<host>

=over

=item L<Str|Types::Standard/Str>

=back

=item C<port>

=over

=item L<Int|Types::Standard/SInt>

=back

=item C<username>

=over

=item L<Str|Types::Standard/Str>

=back

=item C<password>

=over

=item L<Str|Types::Standard/Str>

=back

Standard PostgreSQL connection parameters, see L<DBD::Pg/connect>.

=item C<do_create_slot>

=over

=item L<Bool|Types::Standard/Bool>, Default: C<0>

=back

If true, the L</slot> will be be created upon connection. Otherwise, it's assumed it already exists. If it does not,
PostgreSQL will raise an exception.

=item C<slot_exists_ok>

=over

=item L<Bool|Types::Standard/Bool>, Default: C<0>

=back

If true, and if L</do_create_slot> is also true, then no exception will be raised if the L</slot> already exists.
Otherwise, one will be raised.

=item C<reconnect>

=over

=item L<Bool|Types::Standard/Bool>, Default: C<1>

=back

If true, will attempt to reconnect to the server and resume logical replication in the event the connection fails.
Otherwise, the connection will gracefully be allowed to close.

=item C<reconnect_delay>

=over

=item L<Int|Types::Standard/Int>, Default: C<5>

=back

Time, in seconds, to wait before reconnecting.

=item C<reconnect_limit>

=over

=item L<Int|Types::Standard/Int>, Default: C<1>

=back

Number of times to attempt reconnecting. If this limit is exceded, an exception will be thrown.

=item C<heartbeat>

=over

=item L<Int|Types::Standard/Int>, Default: C<10>

=back

Interval, in seconds, to report our progress to the PostgreSQL server.

=item C<plugin>

=over

=item L<Str|Types::Standard/Str>, Default: L<test_decoding|https://www.postgresql.org/docs/current/static/test-decoding.html>

=back

The server-sider plugin used to decode the WAL file before being sent to this connection. Only required when
L</create_slot> is true.

=item C<options>

=over

=item L<HashRef|Types::Standard/HashRef>, Default: C<{}>

=back

Key-value pairs sent to the server-side L</plugin>. Keys with a value of C<undef> are sent as the keyword only.

=item C<startpos>

=over

=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>

=back

Start replication from the given LSN. Also accepts the integer form, but that is considered advanced usage.

=item C<received_lsn>

=over

=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only

=back

Holds the last LSN position received from the server.

=item C<flushed_lsn>

=over

=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only

=back

Holds the last LSN signaled to handled by the client (see: L</on_message>)

=item C<on_error>

=over

=item L<CodeRef|Types::Standard/CodeRef>, Default: L<croak|Carp/croak>

=back

Callback in the event of an error.

=item C<on_message>

=over

=item L<CodeRef|Types::Standard/CodeRef>, Required

=back

Callback to receive the replication payload from the server. This is the raw output from the L</plugin>.

The callback is passed the C<$payload> received and a C<$guard> object. Hang onto the C<$guard> until you have handled
the payload. Once it is released, the server will be informed that the WAL position has been "flushed."

=back

=cut

has dbname   => (is => 'ro', isa => Str, required  => 1);
has host     => (is => 'ro', isa => Str, predicate => 1);
has port     => (is => 'ro', isa => Int, predicate => 1);
has username => (is => 'ro', isa => Str, default   => q{});
has password => (is => 'ro', isa => Str, default   => q{});
has slot     => (is => 'ro', isa => Str, required  => 1);

has dbh                => (is => 'lazy', isa => $DBH, clearer => 1, init_arg => undef);
has do_create_slot     => (is => 'ro',   isa => Bool,    default   => 0);
has slot_exists_ok     => (is => 'ro',   isa => Bool,    default   => 0);
has reconnect          => (is => 'ro',   isa => Bool,    default   => 1);
has reconnect_delay    => (is => 'ro',   isa => Int,     default   => 5);
has reconnect_limit    => (is => 'ro',   isa => Int,     predicate => 1);
has _reconnect_counter => (is => 'rw',   isa => Int,     default   => 0);
has heartbeat          => (is => 'ro',   isa => Int,     default   => 10);
has plugin             => (is => 'ro',   isa => Str,     default   => 'test_decoding');
has options            => (is => 'ro',   isa => HashRef, default   => sub { {} });
has startpos           => (is => 'rwp',  isa => $LSN, default => 0, coerce  => 1);
has received_lsn       => (is => 'rwp',  isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);
has flushed_lsn        => (is => 'rwp',  isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);

has on_message => (is => 'ro', isa => CodeRef, required => 1);
has on_error   => (is => 'ro', isa => CodeRef, default  => sub { \&croak });

has _fh_watch => (is => 'lazy', isa => Ref, clearer => 1, predicate => 1);
has _timer => (is => 'lazy', isa => Ref, clearer => 1);

=head1 CONSTRUCTOR

All the L</"ATTRIBUTES"> above are accepted by the constructor, with a few exceptions:

L</"received_lsn"> and L<"flushed_lsn"> are read-only and not accepted by the constructor.

L</"dbname">, L</"slot"> and L</"on_message"> are required.

Note, that logical replication will not automatically commence upon construction. One must call L</"start"> first.

=cut

sub _dsn {
    my $self = shift;

    my %dsn = (replication => 'database', client_encoding => 'sql_ascii');
    foreach (qw(host port dbname)) {
        my $x = "has_$_";
        next if $self->can($x) and not $self->$x;

        $dsn{$_} = $self->$_;
    }

    return 'dbi:Pg:' . join q{;}, map { "$_=$dsn{$_}" } sort keys %dsn;
}

sub _build_dbh {
    my $self = shift;
    my $dbh  = DBI->connect($self->_dsn, $self->username, $self->password, { PrintError => 0 },);

    croak $DBI::errstr unless $dbh;

    return $dbh;
}

sub _build__fh_watch {
    my $self = shift;

    my $w = AE::io $self->dbh->{pg_socket}, 0, $self->curry::weak::_read_copydata;
    if ($AnyEvent::MODEL and $AnyEvent::MODEL eq 'AnyEvent::Impl::EV') {
        $w->priority($w->priority - 1);    # be a little less aggressive
    }

    return $w;
}

sub _build__timer {
    my $self = shift;
    if ($AnyEvent::MODEL and $AnyEvent::MODEL eq 'AnyEvent::Impl::EV') {
        my $w = EV::periodic(0, $self->heartbeat, 0, $self->curry::weak::_heartbeat);
        $w->priority(&EV::MAXPRI);
        return $w;
    } else {
        return AE::timer $self->heartbeat, $self->heartbeat, $self->curry::weak::_heartbeat;
    }
}

=head1 METHODS

All L</"ATTRIBUTES"> are also accesible via methods. They are all read-only.

=over

=item start

Initialize the logical replication process asyncronously and return immediately. This performs the following steps:

=over

=item 1. L</"identify_system">

=item 2. L</"create_slot"> (if requested)

=item 3. L</"start_replication">

=item 4. heartbeat timer

=back

This method wraps the above steps for convenience. Should you desire to modify the
L<replication startup protocol|https://www.postgresql.org/docs/current/static/protocol-replication.html> (which you
shouldn't), the methods are described in detail below.

Returns: L<Promises::Promise>

=cut

sub start {
    my $self = shift;

    $self->_post_init(
        deferred {
            shift->chain($self->curry::identify_system, $self->curry::create_slot, $self->curry::start_replication);
        }
    );
}

sub _post_init {
    my ($self, $d) = @_;

    return $d->then(
        sub {
            $self->_fh_watch;
            $self->_timer;
        },
        $self->on_error,
    );
}

=item identify_system

Issues the C<IDENTIFY_SYSTEM> command to the server to put the connection in repliction mode.

Returns: L<Promises::Promise>

=cut

sub identify_system {
    my $self = shift;
    $self->dbh->do('IDENTIFY_SYSTEM', { pg_async => PG_ASYNC });
    return _async_await($self->dbh)->catch(
        sub {
            my @error = @_;
            unshift @error, $DBI::errstr if $DBI::errstr;

            croak @error;
        }
    );
}

=item create_slot

Issues the appropriate C<CREATE_REPLICATION_SLOT> command to the server, if requested.

Returns: L<Promises::Promise>

=cut

sub create_slot {
    my $self = shift;

    return deferred->resolve unless $self->do_create_slot;

    my $dbh = $self->dbh;
    $dbh->do(
        sprintf(
            'CREATE_REPLICATION_SLOT %s LOGICAL %s%s',
            $dbh->quote_identifier($self->slot),
            $dbh->quote_identifier($self->plugin),
            ($dbh->{pg_server_version} >= PG_MIN_NOEXPORT ? ' NOEXPORT_SNAPSHOT' : '')    # uncoverable branch true
        ),
        { pg_async => PG_ASYNC }
    );

    return _async_await($dbh)->catch(
        sub {
            croak @_ unless $dbh->state eq PG_STATE_DUPEOBJ and $self->slot_exists_ok;
        }
    );
}

sub _option_string {
    my $self = shift;

    my @opts;
    while (my ($k, $v) = each %{ $self->options }) {
        push @opts, $self->dbh->quote_identifier($k);
        defined $v and $opts[-1] .= sprintf ' %s', $self->dbh->quote($v);    # uncoverable branch false
    }

    return @opts ? sprintf('(%s)', join q{, }, @opts) : q{};                 # uncoverable branch false
}

=item start_replication

Issues the C<START_REPLICATION SLOT> command and immediately returns. The connection will then start receiving
logical replication payloads.

=cut

sub start_replication {
    my $self = shift;

    $self->dbh->do(
        sprintf(
            'START_REPLICATION SLOT %s LOGICAL %s%s',
            $self->dbh->quote_identifier($self->slot),
            $LSNStr->coerce($self->startpos),
            $self->_option_string
        )
    );
}

=item pause

Pauses reading from the database. Useful for throttling the inbound flow of data so as to not overwhelm your
application. It is safe, albeit redundant, to call this method multiple time in a row without unpausing.

=cut

sub pause { shift->_clear_fh_watch; return; }

=item unpause

Resume reading from the database. After a successful L</pause>, this will pick right back reciving data and sending it
to the provided L</callback>. It is safe, albeit redundant, to call this method multiple time in a row without pausing.

=cut

sub unpause { shift->_fh_watch; return; }

=item is_paused

Returns the current pause state.

Returns: boolean

=cut

sub is_paused { return !shift->_has_fh_watch }

sub _read_copydata {
    my $self = shift;

    my ($n, $msg);
    my $ok = try {
        $n = $self->dbh->pg_getcopydata_async($msg);
        1;
    } catch {
        # uncoverable statement count:2
        AE::postpone { $self->_handle_disconnect };
        0;
    };

    # exception thrown, going to reconnect
    return unless $ok;    # uncoverable branch true

    # nothing waiting
    # watcher will re-enter until $n == 0
    return if $n == 0;

    if ($n == -1) {
        AE::postpone { $self->_handle_disconnect };
        return;
    }

    # uncoverable branch true
    if ($n == -2) {
        # error reading
        # uncoverable statement
        $self->on_error->('could not read COPY data: ' . $self->dbh->errstr);
    }

    my $type = substr $msg, 0, 1;

    if ('k' eq $type) {
        # server keepalive
        my (undef, $lsnpos, $ts, $reply) = unpack PRIMARY_HEARTBEAT, $msg;

        $self->_set_received_lsn($lsnpos) if $lsnpos > $self->received_lsn;

        # only interested in the request-reply bit
        # uncoverable branch true
        if ($reply) {
            # uncoverable statement
            AE::postpone { $self->_heartbeat };
        }

        # an inbound heartbeat is proof enough of successful reconnect
        $self->_reconnect_counter(0) if $self->_reconnect_counter;

        return;
    }

    # uncoverable branch true
    unless ('w' eq $type) {
        # uncoverable statement
        $self->on_error->("unrecognized streaming header: '$type'");
        return;
    }

    my (undef, $startlsn, $endlsn, $ts, $record) = unpack XLOGDATA, $msg;

    $self->_set_received_lsn($startlsn) if $startlsn > $self->received_lsn;

    my $guard = $self->$curry::weak(
        sub {
            my $self = shift;
            $self->_set_flushed_lsn($startlsn) if $startlsn > $self->flushed_lsn;
        }
    );

    $self->on_message->($record, guard(\&$guard));

    return;
}

=item stop

Stop receiving replication payloads and disconnect from the PostgreSQL server.

=back

=cut

sub stop {
    my $self = shift;

    $self->_clear_fh_watch;
    $self->_clear_timer;
    $self->clear_dbh;
}

sub _handle_disconnect {
    my $self = shift;

    $self->stop;

    return unless $self->reconnect;

    if (    $self->has_reconnect_limit
        and $self->_reconnect_counter($self->_reconnect_counter + 1) > $self->reconnect_limit) {
        $self->on_error->('reconnect limit reached: ' . $self->reconnect_limit);
        return;
    }

    $self->_set_startpos($self->flushed_lsn);
    $self->clear_received_lsn;
    $self->clear_flushed_lsn;

    my $w; $w = AE::timer $self->reconnect_delay, 0, sub {
        undef $w;
        $self->_post_init(deferred { $self->start_replication });
    };
}

sub _heartbeat {
    my ($self, $req_reply) = @_;
    $req_reply = !!$req_reply || 0;    #uncoverable condition right

    my $status = pack STANDBY_HEARTBEAT, 'r',     # receiver status update
      $self->received_lsn,                        # last WAL received
      $self->flushed_lsn,                         # last WAL flushed
      0,                                          # last WAL applied
      int((AE::now - PG_EPOCH_DELTA) * USECS),    # ms since 2000-01-01
      $req_reply;                                 # request heartbeat

    $self->dbh->pg_putcopydata($status);
}

sub _async_await {
    my ($dbh) = @_;

    my $d = deferred;

    # no async operation in progress
    return $d->reject if $dbh->{pg_async_status} == 0;    # uncoverable branch true

    my $w; $w = AE::timer 0, AWAIT_INTERVAL, sub {
        return unless $dbh->pg_ready;
        try {
            $d->resolve($dbh->pg_result);
        } catch {
            $d->reject($_);
        };
        undef $w;
    };

    return $d->promise;
}

=head1 AUTHOR

William Cox (cpan:MYDMNSN) <mydimension@gmail.com>

=head1 COPYRIGHT

Copyright (c) 2017-2018 William Cox

=head1 LICENSE

This library is free software and may be distributed under the same terms as perl itself.
See L<http://dev.perl.org/licenses/>.

=cut

1;