The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
#----------------------------------------------------------------------
package DBIx::DataModel::Statement;
#----------------------------------------------------------------------
# see POD doc at end of file

use warnings;
use strict;
use Carp;
use List::Util       qw/min first/;
use List::MoreUtils  qw/firstval any/;
use Scalar::Util     qw/weaken refaddr reftype dualvar/;
use Params::Validate qw/validate ARRAYREF HASHREF/;
use POSIX            qw/LONG_MAX/;
use Acme::Damn       qw/damn/;
use Clone            qw/clone/;
use Try::Tiny;

use DBIx::DataModel;
use DBIx::DataModel::Meta::Utils;
use namespace::clean;

{no strict 'refs'; *CARP_NOT = \@DBIx::DataModel::CARP_NOT;}

use overload

  # overload the stringification operator so that Devel::StackTrace is happy;
  # also useful to show the SQL (if in sqlized state)
  '""' => sub {
    my $self = shift;
    my $string = try {my ($sql, @bind) = $self->sql;
                       __PACKAGE__ . "($sql // " . join(", ", @bind) . ")"; }
              || overload::StrVal($self);
  }
;


# sequence of states. Stored as dualvars for both ordering and printing
use constant {
  NEW      => dualvar(1, "new"     ),
  REFINED  => dualvar(2, "refined" ),
  SQLIZED  => dualvar(3, "sqlized" ),
  PREPARED => dualvar(4, "prepared"),
  EXECUTED => dualvar(5, "executed"),
};



#----------------------------------------------------------------------
# PUBLIC METHODS
#----------------------------------------------------------------------

sub new {
  my ($class, $connected_source, %other_args) = @_;

  # check $connected_source
  $connected_source 
    && $connected_source->isa('DBIx::DataModel::ConnectedSource')
    or croak "invalid connected_source for DBIx::DataModel::Statement->new()";

  # build the object
  my $self = bless {status           => NEW,
                    args             => {},
                    pre_bound_params => {},
                    bound_params     => [],
                    connected_source => $connected_source}, $class;

  # add placeholder_regex
  my $prefix = $connected_source->schema->{placeholder_prefix};
  $self->{placeholder_regex} = qr/^\Q$prefix\E(.+)/;

  # parse remaining args, if any
  $self->refine(%other_args) if %other_args;

  return $self;
}


# accessors
DBIx::DataModel::Meta::Utils->define_readonly_accessors(
  __PACKAGE__, qw/connected_source status/,
);
sub meta_source {shift->{connected_source}->meta_source}
sub schema      {shift->{connected_source}->schema}



# THINK : not documented yet, is this method useful ?
sub reset {
  my ($self, %other_args) = @_;

  my $new = (ref $self)->new($self->{connected_source}, %other_args);
  %$self = (%$new);

  return $self;
}




#----------------------------------------------------------------------
# PUBLIC METHODS IN RELATION WITH SELECT()
#----------------------------------------------------------------------


sub sql {
  my ($self) = @_;

  $self->{status} >= SQLIZED
    or croak "can't call sql() when in status $self->{status}";

  return wantarray ? ($self->{sql}, @{$self->{bound_params}})
                   : $self->{sql};
}


sub bind {
  my ($self, @args) = @_;

  # arguments can be a list, a hashref or an arrayref
  if (@args == 1) {
    for (reftype($args[0]) || "") {
      /^HASH$/  and do {@args = %{$args[0]}; last;};
      /^ARRAY$/ and do {my $i = 0; @args = map {($i++, $_)} @{$args[0]}; last};
      #otherwise
      croak "unexpected arg type to bind()";
    }
  }
  elsif (@args == 3) { # name => value, \%datatype (see L<DBI/bind_param>)
    # transform into ->bind($name => [$value, \%datatype])
    @args = ($args[0], [$args[1], $args[2]]);
  }
  elsif (@args % 2 == 1) {
    croak "odd number of args to bind()";
  }

  # do bind (different behaviour according to status)
  my %args = @args;
  if ($self->{status} < SQLIZED) {
    while (my ($k, $v) = each %args) {
      $self->{pre_bound_params}{$k} = $v;
    }
  }
  else {
    while (my ($k, $v) = each %args) {
      my $indices = $self->{param_indices}{$k} 
        or next; # silently ignore that binding (named placeholder unused)
      $self->{bound_params}[$_] = $v foreach @$indices;
    }
  }

  return $self;
}


sub refine {
  my ($self, %more_args) = @_;

  $self->{status} <= REFINED
    or croak "can't refine() when in status $self->{status}";
  $self->{status} = REFINED;

  my $args = $self->{args};

  while (my ($k, $v) = each %more_args) {

  SWITCH:
    for ($k) {

      # -where : combine with previous 'where' clauses in same statement
      /^-where$/ and do {
        my $sqla = $self->schema->sql_abstract;
        $args->{-where} = $sqla->merge_conditions($args->{-where}, $v);
        last SWITCH;
      };

      # -fetch : special select() on primary key
      /^-fetch$/ and do {
        # build a -where clause on primary key
        my $primary_key = ref($v) ? $v : [$v];
        my @pk_columns  = $self->meta_source->primary_key;
        @pk_columns
          or croak "fetch: no primary key in source " . $self->meta_source;
        @pk_columns == @$primary_key
          or croak sprintf "fetch from %s: primary key should have %d values",
                           $self->meta_source, scalar(@pk_columns);
        List::MoreUtils::all {defined $_} @$primary_key
          or croak "fetch from " . $self->meta_source . ": "
                 . "undefined val in primary key";

        my %where = ();
        @where{@pk_columns} = @$primary_key;
        my $sqla = $self->schema->sql_abstract;
        $args->{-where} = $sqla->merge_conditions($args->{-where}, \%where);

        # want a single record as result
        $args->{-result_as} = "firstrow";

        last SWITCH;
      };

      # -columns : store in $self->{args}{-columns}; can restrict previous list
      /^-columns$/ and do {
        my @cols = ref $v ? @$v : ($v);
        if (my $old_cols = $args->{-columns}) {
          unless (@$old_cols == 1 && $old_cols->[0] eq '*' ) {
            foreach my $col (@cols) {
              any {$_ eq $col} @$old_cols
                or croak "can't restrict -columns on '$col' (was not in the) "
                       . "previous -columns list";
            }
          }
        }
        $args->{-columns} = \@cols;
        last SWITCH;
      };


      # other args are just stored, will be used later
      /^-( order_by       | group_by  | having    | for
         | union(?:_all)? | intersect | except    | minus
         | result_as      | post_SQL  | pre_exec  | post_exec  | post_bless
         | limit          | offset    | page_size | page_index
         | column_types   | prepare_attrs         | dbi_prepare_method
         | _left_cols     | where_on
         )$/x
         and do {$args->{$k} = $v; last SWITCH};

      # otherwise
      croak "invalid arg : $k";

    } # end SWITCH
  } # end while

  return $self;
}




sub sqlize {
  my ($self, @args) = @_;

  $self->{status} < SQLIZED
    or croak "can't sqlize() when in status $self->{status}";

  # merge new args into $self->{args}
  $self->refine(@args) if @args;

  # shortcuts
  my $args         = $self->{args};
  my $meta_source  = $self->meta_source;
  my $source_where = $meta_source->{where};
  my $sql_abstract = $self->schema->sql_abstract;
  my $result_as    = $args->{-result_as} || "";


  # build arguments for SQL::Abstract::More
  $self->refine(-where => $source_where) if $source_where;
  my @args_to_copy = qw/-columns -where
                        -union -union_all -intersect -except -minus
                        -order_by -group_by -having
                        -limit -offset -page_size -page_index/;
  my %sqla_args = (-from         => clone($meta_source->db_from),
                   -want_details => 1);
  $args->{$_} and $sqla_args{$_} = $args->{$_} for @args_to_copy;
  $sqla_args{-columns} ||= $meta_source->default_columns;
  $sqla_args{-limit}   ||= 1
    if $result_as eq 'firstrow' && $self->schema->autolimit_firstrow;

  # "-for" (e.g. "update", "read only")
  if ($result_as ne 'subquery') {
    if ($args->{-for}) {
      $sqla_args{-for} = $args->{-for};
    }
    elsif (!exists $args->{-for}) {
      $sqla_args{-for} = $self->schema->select_implicitly_for;
    }
  }

  # EXPERIMENTAL: "where_on"
  if (my $where_on = $args->{-where_on}) {
    # retrieve components of the join
    my ($join_op, $first_table, @other_join_args) = @{$sqla_args{-from}};
    $join_op eq '-join'
      or croak "datasource for '-where_on' was not a join";
    my %by_dest_table = reverse @other_join_args;

    # insert additional conditions into appropriate places
    while (my ($table, $additional_cond) = each %$where_on) {
      my $join_cond = $by_dest_table{$table}
        or croak "-where_on => {'$table' => ..}: this table is not in the join";
      $join_cond->{condition}
        = $sql_abstract->merge_conditions($join_cond->{condition},
                                          $additional_cond);
    }

    # TODO: should be able to use paths and aliases as keys, instead of 
    # database table names.
    # TOCHECK: is this stuff still compatible with the bind() method ?
  }

  # generate SQL
  my $sqla_result = $sql_abstract->select(%sqla_args);

  # maybe post-process the SQL
  if ($args->{-post_SQL}) {
    ($sqla_result->{sql}, @{$sqla_result->{bind}})
      = $args->{-post_SQL}->($sqla_result->{sql}, @{$sqla_result->{bind}});
  }

  # keep $sql / @bind / aliases in $self, and set new status
  $self->{bound_params} = $sqla_result->{bind};
  $self->{$_} = $sqla_result->{$_} for qw/sql aliased_tables aliased_columns/;
  $self->{status}       = SQLIZED;

  # analyze placeholders, and replace by pre_bound params if applicable
  if (my $regex = $self->{placeholder_regex}) {
    for (my $i = 0; $i < @{$self->{bound_params}}; $i++) {
      $self->{bound_params}[$i] =~ $regex 
        and push @{$self->{param_indices}{$1}}, $i;
    }
  }
  $self->bind($self->{pre_bound_params}) if $self->{pre_bound_params};

  # compute callback to apply to data rows
  my $callback = $self->{args}{-post_bless};
  weaken(my $weak_self = $self);   # weaken to avoid a circular ref in closure
  $self->{row_callback} = sub {
    my $row = shift;
    $weak_self->bless_from_DB($row);
    $callback->($row) if $callback;
  };

  return $self;
}



sub prepare {
  my ($self, @args) = @_;

  my $meta_source = $self->meta_source;

  $self->sqlize(@args) if @args or $self->{status} < SQLIZED;

  $self->{status} == SQLIZED
    or croak "can't prepare() when in status $self->{status}";

  # log the statement and bind values
  $self->schema->_debug("PREPARE $self->{sql} / @{$self->{bound_params}}");

  # call the database
  my $dbh          = $self->schema->dbh or croak "Schema has no dbh";
  my $method       = $self->{args}{-dbi_prepare_method}
                  || $self->schema->dbi_prepare_method;
  my @prepare_args = ($self->{sql});
  if (my $prepare_attrs = $self->{args}{-prepare_attrs}) {
    push @prepare_args, $prepare_attrs;
  }
  $self->{sth}  = $dbh->$method(@prepare_args);

  # new status and return
  $self->{status} = PREPARED;
  return $self;
}



sub execute {
  my ($self, @bind_args) = @_;

  # if not prepared yet, prepare it
  $self->prepare              if $self->{status} < PREPARED;

  # TODO: DON'T REMEMBER why the line below was here. Keep it around for a while ...
  push @bind_args, offset => $self->{offset}  if $self->{offset};

  $self->bind(@bind_args)      if @bind_args;

  # shortcuts
  my $args = $self->{args};
  my $sth  = $self->{sth};

  # previous row_count, row_num and reuse_row are no longer valid
  delete $self->{reuse_row};
  delete $self->{row_count};
  $self->{row_num} = $self->offset;

  # pre_exec callback
  $args->{-pre_exec}->($sth)   if $args->{-pre_exec};

  # check that all placeholders were properly bound to values
  my @unbound;
  while (my ($k, $indices) = each %{$self->{param_indices} || {}}) {
    exists $self->{bound_params}[$indices->[0]] or push @unbound, $k;
  }
  not @unbound 
    or croak "unbound placeholders (probably a missing foreign key) : "
            . CORE::join(", ", @unbound);

  # bind parameters and execute
  my $sqla = $self->schema->sql_abstract;
  $sqla->bind_params($sth, @{$self->{bound_params}});
  $sth->execute;

  # post_exec callback
  $args->{-post_exec}->($sth)  if $args->{-post_exec};

  $self->{status} = EXECUTED;
  return $self;
}


sub select {
  my $self = shift;

  $self->refine(@_) if @_;

  my $args = $self->{args}; # all combined args

  my $callbacks = CORE::join ", ", grep {exists $args->{$_}} 
                                        qw/-pre_exec -post_exec -post_bless/;

 SWITCH:
  my ($result_as, @key_cols) 
    = ref $args->{-result_as} ? @{$args->{-result_as}}
                              : ($args->{-result_as} || "rows");
  for ($result_as) {

    # CASE statement : the DBIx::DataModel::Statement object 
    /^statement$/i and do {
        delete $self->{args}{-result_as};
        return $self;
      };

    # for all other cases, must first sqlize the statement
    $self->sqlize if $self->{status} < SQLIZED;

    # CASE sql : just return the SQL and bind values
    /^sql$/i        and do {
      not $callbacks 
        or croak "$callbacks incompatible with -result_as=>'sql'";
      return $self->sql;
    };

    # CASE subquery : return a ref to an arrayref with SQL and bind values
    /^subquery$/i        and do {
      not $callbacks 
        or croak "$callbacks incompatible with -result_as=>'subquery'";
      my ($sql, @bind) = $self->sql;
      return \ ["($sql)", @bind];
    };

    # for all other cases, must first execute the statement
    $self->execute;

    # CASE sth : return the DBI statement handle
    /^sth$/i        and do {
        not $args->{-post_bless}
          or croak "-post_bless incompatible with -result_as=>'sth'";
        return $self->{sth};
      };

    # CASE rows : all data rows (this is the default)
    /^(rows|arrayref)$/i  and return $self->all;

    # CASE firstrow : just the first row
    /^firstrow$/i   and do {
      my $row = $self->next;
      $self->{sth}->finish;
      return $row;
    };

    # CASE hashref : all data rows, put into a hashref
    /^hashref$/i   and do {
      @key_cols or @key_cols = $self->meta_source->primary_key
        or croak "-result_as=>'hashref' impossible: no primary key";
      my %hash;
      while (my $row = $self->next) {
        my @key;
        foreach my $col (@key_cols) {
          my $val = $row->{$col};
          $val = '' if not defined $val; # $val might be 0, so no '||'
          push @key, $val;
        }
        my $last_key_item = pop @key;
        my $node          = \%hash;
        $node = $node->{$_} ||= {} foreach @key;
        $node->{$last_key_item} = $row;
      }
      return \%hash;
    };

    # CASE fast_statement : creates a reusable row
    /^fast[-_]statement$/i and do {
        $self->_build_reuse_row;
        return $self;
      };

    # CASE flat_arrayref : flattened columns from each row
    /^flat(?:_array(?:ref)?)?$/ and do {
      $self->_build_reuse_row;
      my @vals;
      my $hash_key_name = $self->{sth}{FetchHashKeyName} || 'NAME';
      my $cols = $self->{sth}{$hash_key_name};
      while (my $row = $self->next) {
        push @vals, @{$row}{@$cols};
      }
      return \@vals;
    };


    # OTHERWISE
    croak "unknown -result_as value: $_"; 
  }
}


sub row_count {
  my ($self) = @_;

  if (! exists $self->{row_count}) {
    $self->sqlize if $self->{status} < SQLIZED;
    my ($sql, @bind) = $self->sql;

    # get syntax used for LIMIT clauses ...
    my $sqla = $self->schema->sql_abstract;
    my ($limit_sql, undef, undef) = $sqla->limit_offset(0, 0);
    $limit_sql =~ s/([()?*])/\\$1/g;

    # ...and use it to remove the LIMIT clause and associated bind vals, if any
    if ($limit_sql =~ /ROWNUM/) { # special case for Oracle syntax, complex ...
                                  # see source code of SQL::Abstract::More
      $limit_sql =~ s/%s/(.*)/;
      if ($sql =~ s/^$limit_sql/$1/) {
        splice @bind, -2;
      }
    }
    elsif ($sql =~ s[\b$limit_sql][]i) { # regular LIMIT/OFFSET syntaxes
      splice @bind, -2;
    }

    # select COUNT(*) instead of initial columns
    if ($sql =~ /\b(UNION|INTERSECT|MINUS|EXCEPT)\b/) {
      $sql = "SELECT COUNT(*) FROM ( $sql )";
    }
    else {
      $sql =~ s[^SELECT\b.*?\bFROM\b][SELECT COUNT(*) FROM]i
        or croak "can't count rows from sql: $sql";
    }

    # log the statement and bind values
    $self->schema->_debug("PREPARE $sql / @bind");

    # call the database
    my $dbh    = $self->schema->dbh or croak "Schema has no dbh";
    my $method = $self->schema->dbi_prepare_method;
    my $sth    = $dbh->$method($sql);
    $sth->execute(@bind);
    ($self->{row_count}) = $sth->fetchrow_array;
  }

  return $self->{row_count};
}


sub row_num {
  my ($self) = @_;
  return $self->{row_num};
}

sub next {
  my ($self, $n_rows) = @_;

  $self->execute if $self->{status} < EXECUTED;

  my $sth      = $self->{sth}          or croak "absent sth in statement";
  my $callback = $self->{row_callback} or croak "absent callback in statement";

  if (not defined $n_rows) {  # if user wants a single row
    # fetch a single record, either into the reusable row, or into a fresh hash
    my $row = $self->{reuse_row} ? ($sth->fetch ? $self->{reuse_row} : undef)
                                 : $sth->fetchrow_hashref;
    if ($row) {
      $callback->($row);
      $self->{row_num} +=1;
    }
    return $row;
  }
  else {                      # if user wants an arrayref of size $n_rows
    $n_rows > 0            or croak "->next() : invalid argument, $n_rows";
    not $self->{reuse_row} or croak "reusable row, cannot retrieve several";
    my @rows;
    while ($n_rows--) {
      my $row = $sth->fetchrow_hashref or last;
      push @rows, $row;
    }
    $callback->($_) foreach @rows;
    $self->{row_num} += @rows;
    return \@rows;
  }
}


sub all {
  my ($self) = @_;

  # just call next() with a huge number
  return $self->next(POSIX::LONG_MAX);
}


sub page_size   { shift->{args}{-page_size}  || POSIX::LONG_MAX  }
sub page_index  { shift->{args}{-page_index} || 1                }
sub offset      { shift->{offset}            || 0                }


sub page_count {
  my ($self) = @_;

  my $row_count = $self->row_count or return 0;
  my $page_size = $self->page_size || 1;

  return int(($row_count - 1) / $page_size) + 1;
}

sub goto_page {
  my ($self, $page_index) = @_;

  # if negative index, count down from last page
  $page_index += $self->page_count + 1    if $page_index < 0;

  $page_index >= 1 or croak "illegal page_index: $page_index";

  $self->{page_index} = $page_index;
  $self->{offset}     = ($page_index - 1) * $self->page_size;
  $self->execute     unless $self->{row_num} == $self->{offset};

  return $self;
}


sub shift_pages {
  my ($self, $delta) = @_;

  my $page_index = $self->page_index + $delta;
  $page_index >= 1 or croak "illegal page index: $page_index";

  $self->goto_page($page_index);
}

sub next_page {
  my ($self) = @_;

  $self->shift_pages(1);
}


sub page_boundaries {
  my ($self) = @_;

  my $first = $self->offset + 1;
  my $last  = min($self->row_count, $first + $self->page_size - 1);
  return ($first, $last);
}


sub page_rows {
  my ($self) = @_;
  return $self->next($self->page_size);
}


sub bless_from_DB {
  my ($self, $row) = @_;

  # inject ref to $schema if in multi-schema mode
  $row->{__schema} = $self->schema unless $self->schema->{is_singleton};

  # bless into appropriate class
  bless $row, $self->meta_source->class;
  # apply handlers
  $self->{from_DB_handlers} or $self->_compute_from_DB_handlers;
  while (my ($column_name, $handler) 
           = each %{$self->{from_DB_handlers}}) {
    exists $row->{$column_name}
      and $handler->($row->{$column_name}, $row, $column_name, 'from_DB');
  }

  return $row;
}


#----------------------------------------------------------------------
# PRIVATE METHODS IN RELATION WITH SELECT()
#----------------------------------------------------------------------

sub _build_reuse_row {
  my ($self) = @_;

  $self->{status} == EXECUTED
    or croak "cannot _build_reuse_row() when in state $self->{status}";

  # create a reusable hash and bind_columns to it (see L<DBI/bind_columns>)
  my %row;
  my $hash_key_name = $self->{sth}{FetchHashKeyName} || 'NAME';
  $self->{sth}->bind_columns(\(@row{@{$self->{sth}{$hash_key_name}}}));
  $self->{reuse_row} = \%row; 
}



sub _compute_from_DB_handlers {
  my ($self) = @_;
  my $meta_source    = $self->meta_source;
  my $meta_schema    = $self->schema->metadm;
  my %handlers       = $meta_source->_consolidate_hash('column_handlers');
  my %aliased_tables = $meta_source->aliased_tables;

  # iterate over aliased_columns
  while (my ($alias, $column) = each %{$self->{aliased_columns} || {}}) {
    my $table_name;
    $column =~ s/^(.+)\.// and $table_name = $1;
    if (!$table_name) {
      $handlers{$alias} = $handlers{$column};
    }
    else {
      $table_name = $aliased_tables{$table_name} || $table_name;

      my $table   = $meta_schema->table($table_name)
                 || firstval {($_->{db_name} || '') eq $table_name}
                             ($meta_source, $meta_source->ancestors)
                 # THINK: might perform a case-insensitive search 
                 # (as second pass)
        or croak "unknown table name: $table_name";

      $handlers{$alias} = $table->{column_handlers}->{$column};
    }
  }

  # handlers may be overridden from args{-column_types}
  if (my $col_types = $self->{args}{-column_types}) {
    while (my ($type_name, $columns) = each %$col_types) {
      ref $columns or $columns = [$columns];
      my $type = $self->schema->metadm->type($type_name)
        or croak "no such column type: $type_name";
      $handlers{$_} = $type->{handlers} foreach @$columns;
    }
  }

  # just keep the "from_DB" handlers
  my $from_DB_handlers = {};
  while (my ($column, $col_handlers) = each %handlers) {
    my $from_DB_handler = $col_handlers->{from_DB} or next;
    $from_DB_handlers->{$column} = $from_DB_handler;
  }
  $self->{from_DB_handlers} = $from_DB_handlers;

  return $self;
}



1; # End of DBIx::DataModel::Statement

__END__

=head1 NAME

DBIx::DataModel::Statement - DBIx::DataModel statement objects

=head1 DESCRIPTION

The purpose of a I<statement> object is to retrieve rows from the
database and bless them as objects of appropriate classes.

Internally the statement builds and then encapsulates a C<DBI>
statement handle (sth).

The design principles for statements are described in the
L<DESIGN|DBIx::DataModel::Doc::Design/"STATEMENT OBJECTS"> section of
the manual (purpose, lifecycle, etc.).

=head1 METHODS

=head2 new

  my $statement 
    = DBIx::DataModel::Statement->new($connected_source, %options);

This is the statement constructor; C<$connected_source> is an
instance of L<DBIx::DataModel::ConnectedSource>. 
If present, C<%options> are delegated
to the L<refine()|DBIx::DataModel::Doc::Reference/refine()> method.

Explicit calls to the statement constructor are exceptional;
the usual way to create a statement is through 
L<ConnectedSource::select()|DBIx::DataModel::Doc::Reference/ConnectedSource::select()>.


=head1 PRIVATE METHOD NAMES

The following methods or functions are used
internally by this module and 
should be considered as reserved names, not to be
redefined in subclasses :

=over

=item _bless_from_DB

=item _compute_from_DB_handlers

=back