package DBD::Crate;
use strict;
use DBI;
use HTTP::Tiny;
use JSON::MaybeXS;
use vars qw($VERSION $REVISION);
use vars qw($err $errstr $state $drh);
# Driver version; driver() reports it through the handle's 'Version' attribute.
$VERSION = "0.0.3";

# Package-level error state. driver() passes references to these three
# variables to DBI as the handle's Err / Errstr / State slots.
$err     = 0;
$errstr  = "";
$state   = "";
# Cached driver handle, so driver() only builds it once.
$drh     = undef;
# NOTE(review): never read anywhere in the visible source -- possibly a leftover.
my $methods_are_installed = 0;
# HTTP client and JSON codec shared by every package in this file; they are
# assigned in DBD::Crate::dr::connect and exposed via http() / json() below.
my ($HTTP, $JSON);

# Return the driver handle for this DBD, building it on first use.
#
#   $class - driver class name; the handle is created for "${class}::dr"
#   $attr  - accepted for DBI compatibility, not used here
#
# The handle is cached in $drh, so later calls return the same object.
# Returns undef when DBI could not create the handle.
sub driver {
    my ($class, $attr) = @_;

    unless ($drh) {
        my $drh_attr = {
            'Name'        => 'Crate',
            'Version'     => $VERSION,
            'Err'         => \$err,
            'Errstr'      => \$errstr,
            'State'       => \$state,
            'Attribution' => 'DBD::Crate by Mamod Mehyar',
            'AutoCommit'  => 1
        };
        $drh = DBI::_new_drh($class . "::dr", $drh_attr);
    }

    return $drh ? $drh : undef;
}

# Accessor for the shared HTTP client (assigned in DBD::Crate::dr::connect).
sub http {
    return $HTTP;
}

# Accessor for the shared JSON codec (assigned in DBD::Crate::dr::connect).
sub json {
    return $JSON;
}

#====================================================================
# DBD::Crate::dr
#====================================================================
package DBD::Crate::dr; {
    use strict;
    use DBI qw(:sql_types);
    use vars qw($imp_data_size);
    use Carp qw(carp croak);
    use Data::Dumper;
    use DBI;

    $imp_data_size = 0;

    # Create a database handle for one or more Crate HTTP endpoints.
    #
    #   $drh   - driver handle
    #   $dburl - a single "host:port" / URL, or a bracketed comma separated
    #            list such as "[host1:4200, host2:4200]"; an empty entry
    #            becomes http://localhost:4200
    #   $user, $pass - when either is true, "user:pass@" is embedded right
    #            after the scheme of every URL
    #   $attr  - attribute hash; $attr->{utf8} is handed to the JSON codec
    #            and defaults to 1
    #
    # Side effect: (re)creates the JSON codec and HTTP client shared by the
    # whole driver.
    #
    # Returns the handle obtained from DBI::_new_dbh, whose 'Name' attribute
    # is the array ref of normalised base URLs (tried in order by the
    # statement handle).
    sub connect {
        my ($drh, $dburl, $user, $pass, $attr) = @_;
        my $UTF8 = defined $attr->{utf8} ?
                        $attr->{utf8} : 1;

        $JSON = JSON::MaybeXS->new({ utf8 => $UTF8 });
        $HTTP = HTTP::Tiny->new( keep_alive => 1 );

        my @addresses = ($dburl);
        if ($dburl =~ s/^\[(.*?)\]$/$1/){
            @addresses = split ',', $dburl;
            # "[]" splits into nothing; fall back to a single default entry
            # so the handle never ends up without a host.
            @addresses = ('') unless @addresses;
        }

        my @urls;
        foreach my $addr (@addresses){
            $addr = '' unless defined $addr;

            # Strip *all* whitespace, not just the first run, so entries
            # like " host:4200 " are cleaned on both sides.
            $addr =~ s/\s+//g;
            if (!$addr){
                $addr = 'http://localhost:4200';
            }

            # Only a real scheme counts; a host that merely starts with
            # "http" (e.g. httpd.local) still needs the prefix.
            if ($addr !~ m{^https?://}i){
                $addr = 'http://' . $addr;
            }

            # Request paths always start with "/", so drop trailing slashes
            # to avoid producing "//_sql".
            $addr =~ s{/+$}{};

            if ($user || $pass){
                # NOTE(review): credentials are interpolated verbatim;
                # reserved characters such as "@" or "/" presumably have to
                # be percent-escaped by the caller -- confirm against the
                # HTTP::Tiny documentation.
                my $auth = ($user || '') . ':' . ($pass || '');
                $addr =~ s{^(https?://)}{$1$auth\@}i;
            }
            push @urls, $addr;
        }

        # NOTE(review): DBI::DBD describes the list-context return value as
        # (outer handle, inner handle) and its example returns the outer
        # one. This driver has always returned the second element; kept
        # unchanged here -- confirm before altering.
        my ($t, $dbh) = DBI::_new_dbh($drh, {
            'Name'   => \@urls
        });

        return $dbh;
    }

    # The driver advertises a single, fixed data source name.
    sub data_sources {
        return "Cratedb";
    }

    # Nothing to tear down; always reports success.
    sub disconnect_all {
        return 1;
    }
};

#====================================================================
# DBD::Crate::db
#====================================================================
package DBD::Crate::db; {
    use strict;
    use base qw(DBD::_::db);
    use vars qw($imp_data_size);
    use Data::Dumper;
    use DBI;
    use Digest::SHA1  qw(sha1_hex);

    $imp_data_size = 0;

    # Build a statement handle for $statement.
    #
    # No request is sent here; the handle just carries the SQL text and the
    # connection's host list ($dbh->{Name}) for execute() to use later.
    # Extra attributes (@attr) are accepted but not used.
    sub prepare {
        my ($dbh, $statement, @attr) = @_;

        my $sth_attr = {
            'Statement'      => $statement,
            'ConnectionHOST' => $dbh->{Name}
        };
        my $sth = DBI::_new_sth($dbh, $sth_attr);

        return $sth;
    }

    #=============================================================
    # blob methods
    #=============================================================
    # Store a blob in a Crate blob table.
    #
    # Call forms:
    #   $dbh->crate_blob_insert($table, $digest, $content)
    #   $dbh->crate_blob_insert($table, $content)    # digest computed here
    #
    # Sends a PUT to /_blobs/$table/$digest through a statement handle and
    # returns whatever that handle's execute() returns (the digest when the
    # server answers 201). When no content is supplied at all, an error is
    # set on $dbh and nothing is returned.
    sub crate_blob_insert {
        my ($dbh, $table, $digest, $content) = @_;

        # Decide on definedness rather than truth: a payload of "0" or ""
        # passed together with an explicit digest must not be mistaken for
        # the two-argument form.
        if (!defined $content){
            $content = $digest;
            if (!defined $content){
                $dbh->set_err(-1, "BLOB content required");
                return;
            }
            $digest = sha1_hex($content);
        }

        my $path = "/_blobs/$table/" . $digest;
        my $sth = DBI::_new_sth($dbh, {
            'REQUEST_PATH'   => $path,
            'REQUEST_METHOD' => 'PUT',
            'Statement'      => $content,
            'ConnectionHOST' => $dbh->{Name},
            'BLOB'           => 1,
            'DIGEST'         => $digest
        });

        return $sth->execute();
    }

    # Fetch a blob from a Crate blob table.
    #
    #   $table  - blob table name
    #   $digest - sha1 hex digest the blob was stored under (required)
    #
    # Sends a GET to /_blobs/$table/$digest through a statement handle and
    # returns whatever that handle's execute() returns (the response body
    # when the server answers 200). Without a digest, an error is set on
    # $dbh and nothing is returned.
    sub crate_blob_get {
        my ($dbh, $table, $digest) = @_;

        if (!$digest){
            $dbh->set_err(-1, "BLOB sha1 digest required");
            return;
        }

        my $path = "/_blobs/$table/" . $digest;
        my $sth = DBI::_new_sth($dbh, {
            'REQUEST_PATH'   => $path,
            'REQUEST_METHOD' => 'GET',
            'Statement'      => '',
            'ConnectionHOST' => $dbh->{Name},
            'BLOB'           => 1,
            'DIGEST'         => $digest
        });

        return $sth->execute();
    }

    # Remove a blob from a Crate blob table.
    #
    #   $table  - blob table name
    #   $digest - sha1 hex digest of the blob to delete (required)
    #
    # Sends a DELETE to /_blobs/$table/$digest through a statement handle
    # and returns the result of its execute(). Without a digest, an error
    # is set on $dbh and nothing is returned.
    sub crate_blob_delete {
        my ($dbh, $table, $digest) = @_;

        unless ($digest){
            $dbh->set_err(-1, "BLOB sha1 digest required");
            return;
        }

        my $request = {
            'REQUEST_PATH'   => "/_blobs/$table/$digest",
            'REQUEST_METHOD' => 'DELETE',
            'Statement'      => '',
            'ConnectionHOST' => $dbh->{Name},
            'BLOB'           => 1,
            'DIGEST'         => $digest
        };
        my $sth = DBI::_new_sth($dbh, $request);

        return $sth->execute();
    }

    #=============================================================
    # table info methods
    #=============================================================
    # Return column information for a table.
    #
    #   $table  - table name
    #   $schema - optional schema name, defaults to 'doc' (same default as
    #             crate_tables_list)
    #
    # Returns an array ref of hash refs with the keys column_name,
    # data_type and ordinal_position, as produced by selectall_arrayref
    # with a hash Slice.
    sub crate_table_columns {
        my ($dbh, $table, $schema) = @_;
        $schema ||= 'doc';

        my $sth = $dbh->prepare(qq~
            select column_name, data_type, ordinal_position
            from information_schema.columns
            where schema_name = ?
            AND table_name = ?
        ~);

        return $dbh->selectall_arrayref( $sth,
            { Slice => {} },
            $schema, $table);
    }

    # List every table of a schema together with its settings.
    #
    #   $schema - schema name; a false value selects 'doc'
    #
    # Returns an array ref of hash refs, one per table, as produced by
    # selectall_arrayref with a hash Slice.
    sub crate_tables_list {
        my ($dbh, $schema) = @_;
        $schema ||= 'doc';

        my $query = qq~
            select number_of_replicas, partitioned_by, blobs_path, schema_name,
                   table_name, number_of_shards, clustered_by
                   from information_schema.tables
                   where schema_name = ?
        ~;
        my $sth = $dbh->prepare($query);

        return $dbh->selectall_arrayref($sth, { Slice => {} }, $schema);
    }

    # Return the settings of a single table.
    #
    #   $table  - table name
    #   $schema - optional schema name, defaults to 'doc' (same default as
    #             crate_tables_list)
    #
    # Returns the hash ref produced by selectrow_hashref for the matching
    # information_schema.tables row.
    sub crate_table_info {
        my $dbh    = shift;
        my $table  = shift;
        my $schema = shift || 'doc';

        my $sth = $dbh->prepare(qq~
            select number_of_replicas, partitioned_by, blobs_path, schema_name,
                   table_name, number_of_shards, clustered_by
                   from information_schema.tables
                   where schema_name = ?
                   AND table_name = ?
        ~);

        return $dbh->selectrow_hashref( $sth,
            undef,
            $schema, $table);
    }

    #==================================================
    # Install the crate_* subs above into the DBI::db
    # namespace as well, so they can be called as
    # methods on handles of that class.
    # These should be removed once we get crate name
    # space registered with DBI
    #==================================================
    *DBI::db::crate_blob_insert   = \&crate_blob_insert;
    *DBI::db::crate_blob_get      = \&crate_blob_get;
    *DBI::db::crate_blob_delete   = \&crate_blob_delete;
    *DBI::db::crate_table_columns = \&crate_table_columns;
    *DBI::db::crate_tables_list   = \&crate_tables_list;
    *DBI::db::crate_table_info    = \&crate_table_info;
};

#====================================================================
# DBD::Crate::st
#====================================================================
package DBD::Crate::st; {
    use strict;
    use base qw(DBD::_::st);
    use vars qw($imp_data_size);
    use Data::Dumper;
    use DBI;

    $imp_data_size = 0;

    # Send one HTTP request to Crate, failing over across the known hosts.
    #
    #   $sth       - statement handle; $sth->{ConnectionHOST} is the array
    #                ref of base URLs, tried in order
    #   $statement - request body
    #   $method    - HTTP method, defaults to 'POST'
    #   $path      - request path, defaults to '/_sql'
    #
    # A host answering with status 599 (the status HTTP::Tiny reports for
    # errors raised while executing the request) is moved to the end of the
    # shared host list and the next host is tried.
    #
    # Returns the response hash ref on success. On failure the error is
    # recorded with set_err and nothing is returned.
    sub _fetch_data {
        my $sth       = shift;
        my $statement = shift;
        my $method    = shift || 'POST';
        my $path      = shift || '/_sql';

        my $known_hosts = $sth->{ConnectionHOST};
        my @candidates  = @{ $known_hosts || [] };
        if (!@candidates){
            $sth->set_err(-1, "No Crate host to connect to");
            return;
        }

        my $ret;
        while (@candidates){
            my $host = shift @candidates;
            $ret = DBD::Crate::http()->request($method, $host . $path, {
                content => $statement,
                # headers => {'Content-Type' => 'application/json'}
            });

            # Done unless this host is unreachable and another one is left.
            last if $ret->{status} != 599 || !@candidates;

            # Put the failing address at the end of the shared list (this
            # only pays off in persistent environments). The list is
            # rebuilt and assigned in place so that it is never modified
            # while being iterated and every holder of the array ref sees
            # the new order.
            my @reachable = grep { $_ ne $host } @{ $known_hosts };
            my @failing   = grep { $_ eq $host } @{ $known_hosts };
            @{ $known_hosts } = (@reachable, @failing);
        }

        if (!$ret->{success}){
            # The error body may or may not be JSON; keep the caller's $@.
            my $data = do {
                local $@;
                eval { DBD::Crate::json()->decode($ret->{content}) };
            };
            $data = undef unless ref($data) eq 'HASH';

            my ($code, $message);
            if ($data && ref($data->{error}) eq 'HASH'){
                # Structured error: { error => { code => ..., message => ... } }
                $code    = $data->{error}{code};
                $message = $data->{error}{message};
            } elsif ($data){
                # Flat error: { status => ..., error => "text" }
                $code    = $data->{status};
                $message = $data->{error} unless ref $data->{error};
            }

            # set_err only records an error for a true code, so always
            # fall back to the HTTP status (or -1 as a last resort).
            $code    ||= $ret->{status} || -1;
            $message ||= $ret->{content} || $ret->{reason};

            $sth->set_err($code, $message);
            return;
        }

        return $ret;
    }

    # Run the statement held by this handle.
    #
    # Arguments after the handle are sent as the statement's bind values.
    #
    # Blob handles (created by the crate_blob_* methods) send their stored
    # method/path and return the digest on 201, the response body on 200,
    # or 1 on 204.
    #
    # SQL handles POST {stmt, args} as JSON to /_sql, keep the decoded rows
    # for fetchrow_arrayref and return the row count ('0E0' when it is
    # zero).
    #
    # The raw response body is kept in 'driver_raw_data' in both cases.
    # On any failure an error is set on the handle and nothing is returned.
    sub execute {
        my $sth = shift;
        my $statement = $sth->{Statement};
        my $ret;
        if ($sth->{BLOB}){
            if (!$sth->{DIGEST}){
                $sth->set_err(-1, "BLOB sha1 digest required");
                return;
            }

            $ret = _fetch_data($sth, $statement,
                                $sth->{REQUEST_METHOD},
                                $sth->{REQUEST_PATH}) or return;
        } else {
            my $hash = {stmt => $statement };
            # Copy the bind values instead of aliasing @_.
            if (@_){ $hash->{args} = [ @_ ]; }
            my $json = DBD::Crate::json()->encode($hash);
            $ret = _fetch_data($sth, $json, 'POST', '/_sql') or return;
        }

        $sth->{'driver_raw_data'} = $ret->{content};
        if ($sth->{BLOB}){
            if ($ret->{status} == 201){ # put success
                return $sth->{DIGEST};
            } elsif ($ret->{status} == 200){ #get success
                return $ret->{content};
            } elsif ($ret->{status} == 204){ #delete success
                return 1;
            }
        }

        # Decode without disturbing the caller's $@ on either path.
        my ($data, $decode_error);
        {
            local $@;
            $data = eval { DBD::Crate::json()->decode($ret->{content}) };
            $decode_error = $@;
        }

        # Anything but a JSON object cannot carry cols/rows/rowcount.
        if (ref($data) ne 'HASH'){
            $sth->set_err(-1, $decode_error
                || "Unexpected response: not a JSON object");
            return;
        }

        # Tolerate a missing or malformed cols/rows member instead of
        # dying on an undefined array dereference.
        my $cols = ref($data->{cols}) eq 'ARRAY' ? $data->{cols} : [];
        my $rows = ref($data->{rows}) eq 'ARRAY' ? $data->{rows} : [];

        $sth->{'driver_data'} = $rows;
        $sth->{'driver_rows'} = $data->{rowcount};

        $sth->{'NAME'} = $cols;
        $sth->STORE('NUM_OF_FIELDS', scalar @{ $cols });
        return $data->{rowcount} || '0E0';
    }

    # Hand out the next buffered row.
    #
    # Rows are consumed from the list kept under 'driver_data'; each one is
    # passed through _set_fbav. Returns nothing once the list is empty.
    sub fetchrow_arrayref {
        my ($sth) = @_;

        my $pending  = $sth->FETCH('driver_data');
        my $next_row = shift @{ $pending };
        return unless $next_row;

        return $sth->_set_fbav($next_row);
    }

    # fetch is an alias for fetchrow_arrayref.
    *fetch = \&fetchrow_arrayref;

    # Return the value kept under 'driver_raw_data' for this handle.
    sub raw {
        my ($sth) = @_;
        return scalar $sth->FETCH('driver_raw_data');
    }

    # Also install raw into the DBI::st namespace.
    *DBI::st::raw = \&raw;

    # Crate is stateless, so there is nothing to close; this is a no-op.
    sub close {
        return;
    }
};


1;

__END__

=head1 NAME

DBD::Crate - DBI driver for Crate db

=head1 SYNOPSIS

    use DBI;
    my $dbh = DBI->connect('dbi:Crate:' );
    my $sth = $dbh->prepare( 'SELECT id, content FROM articles WHERE id > 2' );
    my $res = $sth->execute;

    $sth->bind_columns (\my ($id, $content));
    while ($sth->fetch) {
        print "id: $id, content: $content\n";
    }

    print "Total ", $res, "\n";

=head1 DESCRIPTION

DBD::Crate is a DBI driver for L<Crate DB|https://Crate.io>, DBD::Crate is still
in early development so any feedback is much appreciated.

=head1 ABOUT CRATE

If you haven't heard of Crate, I suggest you give it a try. It is a
distributed data store with SQL query support and full-text search based
on Elasticsearch; please read this L<overview|https://crate.io/overview/>

=head1 Methods

=over

=item B<connect>

    use DBI;

    my $dbh = DBI->connect("DBI:Crate:"); #<-- defaults to [localhost:4200]
    my $dbh = DBI->connect("DBI:Crate:localhost:5000"); #<-- localhost port 5000
    my $dbh = DBI->connect("DBI:Crate:", '', '', { utf8 => [0|1] });

IP:PORT address of the Crate server you want to connect to; if it is not provided,
localhost and the default port [4200] will be used.

Crate doesn't have a built-in user permissions or ACL concept, but sometimes you may
use it behind a proxy with user authentication

    my $dbh = DBI->connect("DBI:Crate:123.99.76.3:5000", 'user', 'pass');

This will issue a request with Basic-style user:pass authentication. In the example
folder you can find a plack proxy example with basic authentication; you can also
read the Crate post on how to set up a proxy with nginx L<here|https://crate.io/blog/readonly-crate-with-nginx-and-lua/>

B<DBD::Crate> has a simple failover setup for multiple servers

    my $dbh = DBI->connect("DBI:Crate:[localhost:4200, localhost:42001, ...]");

Since Crate DB will handle distributing the job to the best available server for you, we only
implement a simple failover process: if the connection to the first host fails, we try
the next one and move the failed host to the end of the list.

=item B<raw>

Sometimes you need to get raw json data, maybe to send as jsonp response

    $sth->raw;

The returned data will be of json format as the following

    {"cols" : ["id","content"], "duration" : 0, "rows" : [ [1, "content"], ... ], "rowcount" : 2}

=back

=head1 BLOB Methods

Crate includes support to store binary large objects (L<BLOBS|https://crate.io/docs/stable/blob.html>)

=over

=item B<crate_blob_insert>

    $dbh->crate_blob_insert("blobtable", [sha1-digest], data);

C<crate_blob_insert> accepts three arguments: blob table name, sha1 hex digest of the data, and the data to store.
The sha1 hex digest argument is optional; if it is omitted, DBD::Crate will create the digest for you

    my $digest = $dbh->crate_blob_insert("blobtable", data);

C<crate_blob_insert> returns the sha1 hex digest of the stored data on success, or undef on failure,
and sets C<$dbh-E<gt>errstr> & C<$dbh-E<gt>err>

=item B<crate_blob_get>

    $dbh->crate_blob_get("blobtable", "sha1 digest");

Returns the data stored in C<blobtable> under the C<sha1 digest> that was used to store it; on error
returns undef and sets C<$dbh-E<gt>errstr> & C<$dbh-E<gt>err>

=item B<crate_blob_delete>

    $dbh->crate_blob_delete("blobtable", "sha1 digest");

Deletes the blob; returns true on success, undef on failure, and sets C<$dbh-E<gt>errstr> & C<$dbh-E<gt>err>

=back

=head1 Table Info Method

    my $tables = $dbh->crate_tables_list();
    my $table = $dbh->crate_table_info("tablename");
    my $columns = $dbh->crate_table_columns("tablename");

=over

=item B<crate_tables_list>

    $dbh->crate_tables_list(schema);

Accepts an optional table schema argument; if it is not provided, tables are fetched from the default
C<doc> schema. To get the list of blob tables, use the C<blob> schema

    $dbh->crate_tables_list("blob");

Returns a list of the tables in the schema, with their information

    [
        {
            'number_of_replicas' => '0-all',
            'partitioned_by' => undef,
            'blobs_path' => undef,
            'schema_name' => 'doc',
            'table_name' => 'articles',
            'number_of_shards' => 5,
            'clustered_by' => 'id'
        },
        {
            ...
        }
    ];

=item B<crate_table_info>

    $dbh->crate_table_info("tablename");

Same as C<crate_tables_list> but returns a single hash ref result for the C<tablename>

=item B<crate_table_columns>

    my $columns = $dbh->crate_table_columns("tablename");

returns list of table columns

    [
      {
        'data_type' => 'string',
        'column_name' => 'content',
        'ordinal_position' => 1
      },
      {
        'data_type' => 'string',
        'column_name' => 'description',
        'ordinal_position' => 2
      },
      {
        'data_type' => 'integer',
        'column_name' => 'id',
        'ordinal_position' => 3
      }
    ]

=back

=head1 IMPORTANT NOTES

=head2 Primary keys

Crate doesn't auto-generate primary keys; you need to provide a unique key
yourself, and it must be **ahem** UNIQUE :)

=head2 last inserted id

Well, there is also no way to get the last inserted id, for the obvious
reason mentioned above I guess, but you can query that if you want in a
new statement

=head1 INSTALLATION & TEST

To install this module, run the following commands:

    perl Makefile.PL
    make
    make test
    make install

OR

    cpan install DBD-Crate

If you want to run the complete test suite, you need to have
Crate DB installed and running, then set environment variable
CRATE_HOST to crate "ip:port"

on windows

    $ set CRATE_HOST=127.0.0.1:4200

on linux

    $ export CRATE_HOST=127.0.0.1:4200

=head1 See Also

=over 8

=item L<Crate.io|https://crate.io>

=item L<Crate Docs|https://crate.io/docs/stable/>

=back

=head1 AUTHOR

Mamod A. Mehyar, E<lt>mamod.mehyar@gmail.comE<gt>

=head1 LICENSE

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself