package Database::Async;
# ABSTRACT: database interface for use with IO::Async
use strict;
use warnings;

our $VERSION = '0.019';

use parent qw(Database::Async::DB IO::Async::Notifier);

=head1 NAME

Database::Async - provides a database abstraction layer for L<IO::Async>

=head1 SYNOPSIS

 # Just looking up one thing?
 my ($id) = $db->query(
  q{select id from some_table where name = ?},
  bind => ['some name']
 )->single
  # This is an example, so we want the result immediately - in
  # real async code, you'd rarely call Future->get, but would
  # typically use `->then` or `->on_done` instead
  ->get;
 # or, with Future::AsyncAwait, try:
 my ($id) = await $db->query(
  q{select id from some_table where name = ?},
  bind => ['some name']
 )->single;

 # Simple query
 $db->query(q{select id, some_data from some_table})
    ->row_hashrefs
    ->each(sub {
        printf "ID %d, data %s\n", $_->{id}, $_->{some_data};
    })
    # If you want to complete the full query, don't forget to call
    # ->get or ->retain here!
    ->retain;

 # Transactions
 $db->transaction(sub {
  my ($tx) = @_;
 })->commit
  # This returns a Future, so if you want to wait for it to complete,
  # call `->get` (throws an exception if something goes wrong)
  # or `->await` (just waits for it to succeed or fail, but ignores
  # the result).
 ->get;

=head1 DESCRIPTION

Database support for L<IO::Async>. This is the base API, see L<Database::Async::Engine>
and subclasses for specific database functionality.

B<This is an early preview release>.

L<DBI> provides a basic API for interacting with a database, but this is
very low level and uses a synchronous design. See L<DBIx::Async> if you're
familiar with L<DBI> and want an interface that follows it more closely.

Typically a database only allows a single query to run at a time.
Other queries will be queued.

Set up a pool of connections to provide better parallelism:

    my $dbh = Database::Async->new(
        uri  => 'postgresql://write@maindb/dbname?sslmode=require',
        pool => {
            max => 4,
        },
    );

Queries and transactions will then automatically be distributed
among these connections. However, note that:

=over 4

=item * all queries within a transaction will be made on the same connection

=item * ordering guarantees are weaker: queries will be started in
order on the next available connection

=back

With a single connection, you could expect:

    Future->needs_all(
     $dbh->do(q{insert into x ...}),
     $dbh->do(q{select from x ...})
    );

to insert the rows first, then return them in the C<select> call. B<With a pool of connections, that's not guaranteed>.

=head2 Pool configuration

The following parameters are currently accepted for defining the pool:

=over 4

=item * C<min> - minimum number of total connections to maintain, defaults to 0

=item * C<max> - maximum permitted active connections, default is 1

=item * C<ordering> - how to iterate through the available URIs, options include
C<random> and C<serial> (default, round-robin behaviour).

=item * C<backoff> - algorithm for managing connection timeouts or failures. The default
is an exponential backoff with 10ms initial delay, 30s maximum, resetting on successful
connection.

=back

See L<Database::Async::Pool> for more details.

=head2 DBI

The interface is not the same as L<DBI>, but here are some approximate equivalents for
common patterns:

=head3 selectall_hashref

In L<DBI>:

 print $_->{id} . "\n" for
  $dbh->selectall_hashref(
   q{select * from something where id = ?},
   undef,
   $id
  )->@*;

In L<Database::Async>:

 print $_->{id} . "\n" for
  $db->query(
   q{select * from something where id = ?},
   bind => [
    $id
   ])->row_hashrefs
     ->as_arrayref
     ->@*

In L<DBI>:

 my $sth = $dbh->prepare(q{select * from something where id = ?});
 for my $id (1, 2, 3) {
  $sth->bind(0, $id, 'bigint');
  $sth->execute;
  while(my $row = $sth->fetchrow_hashref) {
   print $row->{name} . "\n";
  }
 }

In L<Database::Async>:

 my $sth = $db->prepare(q{select * from something where id = ?});
 (Future::Utils::fmap_void  {
  my ($id) = @_;
  $sth->bind(0, $id, 'bigint')
   ->then(sub { $sth->execute })
   ->then(sub {
    $sth->row_hashrefs
     ->each(sub {
      print $_->{name} . "\n";
     })->completed
   })
 } foreach => [1, 2, 3 ])->get;

=cut

use mro;
no indirect;

use Future::AsyncAwait qw(:experimental);
use Syntax::Keyword::Try;

use URI;
use URI::db;
use Module::Load ();
use Scalar::Util qw(blessed);

use Database::Async::Engine;
use Database::Async::Pool;
use Database::Async::Query;
use Database::Async::StatementHandle;
use Database::Async::Transaction;

use Log::Any qw($log);

=head1 METHODS

=cut

=head2 transaction

Resolves to a L<Future> which will yield a L<Database::Async::Transaction>
instance once ready.

=cut

async sub transaction {
    my ($self, @args) = @_;
    Scalar::Util::weaken(
        $self->{transactions}[@{$self->{transactions}}] =
            my $txn = Database::Async::Transaction->new(
                database => $self,
                @args
            )
    );
    await $txn->begin;
    return $txn;
}

=head2 txn

Executes code within a transaction. This is meant as a shorter form of
the common idiom

 $db->transaction
    ->then(sub {
     my ($txn) = @_;
     Future->call($code)
      ->then(sub {
       $txn->commit
      })->on_fail(sub {
       $txn->rollback
      });
    })

The code must return a L<Future>, and the transaction will only be committed
if that L<Future> resolves cleanly.

Returns a L<Future> which resolves once the transaction is committed.

=cut

async sub txn {
    my ($self, $code, @args) = @_;
    my $txn = await $self->transaction;
    try {
        my @data = await Future->call(
            $code => ($txn, @args)
        );
        await $txn->commit;
        return @data;
    } catch {
        my $exception = $@;
        try {
            await $txn->rollback;
        } catch {
            $log->warnf("exception %s in rollback", $@);
        }
        die $exception;
    }
}

=head1 METHODS - Internal

You're welcome to call these, but they're mostly intended
for internal usage, and the API B<may> change in future versions.

=cut

=head2 uri

Returns the configured L<URI> for populating database instances.

=cut

sub uri { shift->{uri} }

=head2 pool

Returns the L<Database::Async::Pool> instance.

=cut

sub pool {
    my ($self) = @_;
    $self->{pool} ||= Database::Async::Pool->new(
        $self->pool_args
    );
}

=head2 pool_args

Returns a list of standard pool constructor arguments.

=cut

sub pool_args {
    my ($self) = @_;
    return (
        request_engine => $self->curry::weak::request_engine,
        uri            => $self->uri,
    );
}

=head2 configure

Applies configuration, see L<IO::Async::Notifier> for details.

Supports the following named parameters:

=over 4

=item * C<uri> - the endpoint to use when connecting a new engine instance

=item * C<engine> - the parameters to pass when instantiating a new L<Database::Async::Engine>

=item * C<pool> - parameters for setting up the pool, or a L<Database::Async::Pool> instance

=item * C<encoding> - default encoding to apply to parameters, queries and results, defaults to C<binary>

=back

=cut

my %encoding_map = (
    'utf8'    => 'UTF-8',
    'utf-8'   => 'UTF-8',
    'UTF8'    => 'UTF-8',
    'unicode' => 'UTF-8',
);

sub configure {
    my ($self, %args) = @_;

    if(my $encoding = delete $args{encoding}) {
        $self->{encoding} = $encoding_map{$encoding} // $encoding;
    }

    if(my $uri = delete $args{uri}) {
        # This could be any type of object. We make
        # the assumption here that it safely serialises
        # to a standard URI. Some of the database
        # engines provide such a standard (e.g. PostgreSQL).
        # Others may not...
        $self->{uri} = URI->new("$uri");
    }
    if(exists $args{engine}) {
        $self->{engine_parameters} = delete $args{engine};
    }
    if(exists $args{type}) {
        $self->{type} = delete $args{type};
    }
    if(my $pool = delete $args{pool}) {
        if(blessed $pool) {
            $self->{pool} = $pool;
        } else {
            $self->{pool} = Database::Async::Pool->new(
                $self->pool_args,
                %$pool,
            );
        }
    }
    $self->next::method(%args);
}

sub encoding { shift->{encoding} }

=head2 ryu

A L<Ryu::Async> instance, used for requesting sources, sinks and timers.

=cut

sub ryu {
    my ($self) = @_;
    $self->{ryu} //= do {
        $self->add_child(
            my $ryu = Ryu::Async->new
        );
        $ryu
    }
}

=head2 new_source

Instantiates a new L<Ryu::Source>.

=cut

sub new_source { shift->ryu->source }

=head2 new_sink

Instantiates a new L<Ryu::Sink>.

=cut

sub new_sink { shift->ryu->sink }

=head2 new_future

Instantiates a new L<Future>.

=cut

sub new_future { shift->loop->new_future }

=head1 METHODS - Internal, engine-related

=cut

=head2 request_engine

Attempts to instantiate and connect to a new L<Database::Async::Engine>
subclass. Returns a L<Future> which should resolve to a new
L<Database::Async::Engine> instance when ready to use.

=cut

async sub request_engine {
    my ($self) = @_;
    $log->tracef('Requesting new engine');
    my $engine = $self->engine_instance;
    $log->tracef('Connecting');
    return await $engine->connect;
}

=head2 engine_instance

Loads the appropriate engine class and attaches to the loop.

=cut

sub engine_instance {
    my ($self) = @_;
    my $uri = $self->uri;
    my $type = $self->{type} // $uri->scheme;
    die 'unknown database type ' . $type
        unless my $engine_class = $Database::Async::Engine::ENGINE_MAP{$type};
    Module::Load::load($engine_class) unless $engine_class->can('new');
    $log->tracef('Instantiating new %s', $engine_class);
    my %param = (
        %{$self->{engine_parameters} || {}},
        (defined($uri) ? (uri => $uri) : ()),
        db => $self,
    );

    # Only recent engine versions support this parameter
    if(my $encoding = $self->encoding) {
        if($engine_class->can('encoding')) {
            $param{encoding} = $self->encoding;
        } else {
            # If we're given this parameter, let's not ignore it silently
            die 'Database engine ' . $engine_class . ' does not support encoding parameter, try upgrading that module from CPAN or remove the encoding configuration in Database::Async';
        }
    }

    $self->add_child(
        my $engine = $engine_class->new(%param)
    );
    $engine;
}

=head2 engine_ready

Called by L<Database::Async::Engine> instances when the engine is
ready for queries.

=cut

sub engine_ready {
    my ($self, $engine) = @_;
    $self->pool->queue_ready_engine($engine);
}

sub engine_disconnected {
    my ($self, $engine) = @_;
    $self->pool->unregister_engine($engine);
}

sub db { shift }

=head2 queue_query

Assign the given query to the next available engine instance.

=cut

async sub queue_query {
    my ($self, $query) = @_;
    $log->tracef('Queuing query %s', $query);
    my $f = $self->pool->next_engine;
    CANCEL { $f->cancel; return undef }
    my $engine = await $f;
    $log->tracef('Query %s about to run on %s', $query, $engine);
    my $q = $engine->handle_query($query);
    CANCEL { $q->cancel; return undef }
    return await $q;
}

sub diagnostics {
    my ($self) = @_;
}

sub notification {
    my ($self, $engine, $channel, $data) = @_;
    $log->tracef('Database notifies us via %s of %s', $channel, $data);
    $self->notification_source($channel)->emit($data);
}

sub notification_source {
    my ($self, $name) = @_;
    $self->{notification_source}{$name} //= $self->new_source;
}

sub _add_to_loop {
    my ($self, $loop) = @_;
    $self->add_child(
        $self->pool
    );
    return;
}

sub _remove_from_loop {
    my ($self, $loop) = @_;
    if($self->{ryu}) {
        $self->remove_child(delete $self->{ryu});
    }
    if($self->{pool}) {
        $self->remove_child(delete $self->{pool});
    }
    return $self->next::method($loop);
}

1;

__END__

=head1 SEE ALSO

There's a range of options for interacting with databases - at a low level:

=over 4

=item * L<DBIx::Async> - runs L<DBI> in subprocesses, very inefficient but tries to
make all the methods behave a bit like DBI but deferring results via L<Future>s.

=item * L<DBI> - synchronous database access

=item * L<Mojo::Pg> - attaches a L<DBD::Pg> handle to an event loop

=item * L<Mojo::mysql> - apparently has the ability to make MySQL "fun", an intriguing
prospect indeed

=back

and at higher levels, L<DBIx::Class> or one of the many other ORMs might be
worth a look. Nearly all of those will use L<DBI> in some form or other.
Several years ago I put together a list, the options have doubtless multiplied
since then:

=head2 Asynchronous ORMs

The list here is sadly lacking:

=over 4

=item * L<Async::ORM|https://github.com/vti/async-orm> - asynchronous ORM, see also article in L<http://showmetheco.de/articles/2010/1/mojolicious-async-orm-and-dbslayer.html>

=back

=head2 Synchronous ORMs

If you're happy for the database to tie up your process for an indefinite amount of time, you're in
luck - there's a nice long list of modules to choose from here:

=over 4

=item * L<DBIx::Class> - one of the more popular choices

=item * L<Rose::DB::Object> - written for speed, appears to cover most of the usual requirements, personally
I found the API less intuitive than other options but it appears to be widely deployed

=item * L<Fey::ORM> - "newer" than the other options, also appears to be reasonably flexible

=item * L<DBIx::DataModel> - UML-based Object-Relational Mapping (ORM) framework

=item * L<Alzabo> - another ORM which includes features such as GUI schema editing and SQL diff

=item * L<Class::DBI> - generally considered to be superceded by L<DBIx::Class>, which provides a compatibility
layer for existing applications

=item * L<Class::DBI::Lite> - like L<Class::DBI> but lighter, presumably

=item * L<ORMesque> - lightweight class-based ORM using L<SQL::Abstract>

=item * L<Oryx> - Object persistence framework, meta-model based with support for both DBM and regular RDBMS
backends, uses tied hashes and arrays

=item * L<Tangram> - An object persistence layer

=item * L<KiokuDB> - described as an "Object Graph storage engine" rather than an ORM

=item * L<DBIx::DataModel> - ORM using UML definitions

=item * L<Jifty::DBI> - another ORM

=item * L<ORLite> - minimal SQLite-based ORM

=item * L<Ormlette> - object persistence, "heavily influenced by Adam Kennedy's L<ORLite>". "light and fluffy", apparently!

=item * L<ObjectDB> - another lightweight ORM, currently has only L<DBI> as a dependency

=item * L<ORM> - looks like it has support for MySQL, PostgreSQL and SQLite

=item * L<fytwORM> - described as a "bare minimum ORM used for prototyping / proof of concepts"

=item * L<DBR> - Database Repository ORM

=item * L<SweetPea::Application::Orm> - specific to the L<SweetPea> web framework

=item * L<Jorge> - ORM Made simple

=item * L<Persistence::ORM> - looks like a combination between persistent Perl objects and standard ORM

=item * L<Teng> - lightweight minimal ORM

=item * L<Class::orMapper> - DBI-based "easy O/R Mapper"

=item * L<UR|https://github.com/genome/UR> - class framework and object/relational mapper (ORM) for Perl

=item * L<DBIx::NinjaORM> - "Flexible Perl ORM for easy transitions from inline SQL to objects"

=item * L<DBIx::Oro> - Simple Relational Database Accessor

=item * L<LittleORM> - Moose-based ORM

=item * L<Storm> - another Moose-based ORM

=item * L<DBIx::Mint> - "A mostly class-based ORM for Perl"

=back

=head2 Database interaction

=over 4

=item * L<DBI::Easy> - seems to be a wrapper around L<DBI>

=item * L<AnyData> - interface between L<DBI> and arbitrary data sources such as XML or HTML

=item * L<DBIx::ThinSQL> - helpers for SQL statements

=item * L<DB::Evented> - event-based wrapper for L<DBI>-like behaviour, uses L<AnyEvent::DBI>

=back

=head1 AUTHOR

Tom Molesworth C<< <TEAM@cpan.org> >>

=head1 LICENSE

Copyright Tom Molesworth 2011-2023. Licensed under the same terms as Perl itself.