package Mango::Collection;
use Mojo::Base -base;

use Carp 'croak';
use Mango::BSON qw(bson_code bson_doc bson_oid);
use Mango::Bulk;
use Mango::Cursor;
use Mango::Cursor::Query;

has [qw(db name)];

sub aggregate {
  my ($self, $pipeline) = (shift, shift);
  my $cb = ref $_[-1] eq 'CODE' ? pop : undef;

  my $command = bson_doc(aggregate => $self->name, pipeline => $pipeline,
    %{shift // {}});
  $command->{cursor} //= {} unless $command->{explain};

  # Blocking
  return $self->_aggregate($command, $self->db->command($command)) unless $cb;

  # Non-blocking
  return $self->db->command($command,
    sub { shift; $self->$cb(shift, $self->_aggregate($command, shift)) });
}

sub build_index_name { join '_', keys %{$_[1]} }

sub bulk { Mango::Bulk->new(collection => shift) }

sub create {
  my $self = shift;
  my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
  return $self->_command(bson_doc(create => $self->name, %{shift // {}}), $cb);
}

sub drop { $_[0]->_command(bson_doc(drop => $_[0]->name), $_[1]) }

sub drop_index {
  my ($self, $name) = (shift, shift);
  return $self->_command(bson_doc(dropIndexes => $self->name, index => $name),
    shift);
}

sub ensure_index {
  my ($self, $spec) = (shift, shift);
  my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
  my $doc = shift // {};

  $doc->{name} //= $self->build_index_name($spec);
  $doc->{key} = $spec;

  # Non-blocking
  my $command = bson_doc createIndexes => $self->name, indexes => [$doc];
  return $self->db->command($command => sub { shift; $self->$cb(shift) })
    if $cb;

  # Blocking
  $self->db->command($command);
}

sub find {
  Mango::Cursor::Query->new(
    collection => shift,
    query      => shift // {},
    fields     => shift // {}
  );
}

sub find_and_modify {
  my ($self, $opts, $cb) = @_;
  return $self->_command(bson_doc(findAndModify => $self->name, %$opts),
    $cb, sub { my $doc = shift; $doc ? $doc->{value} : undef });
}

sub find_one {
  my ($self, $query) = (shift, shift);
  $query = {_id => $query} if ref $query eq 'Mango::BSON::ObjectID';
  my $cb = ref $_[-1] eq 'CODE' ? pop : undef;

  # Non-blocking
  my $cursor = $self->find($query, @_)->limit(-1);
  return $cursor->next(sub { shift; $self->$cb(@_) }) if $cb;

  # Blocking
  return $cursor->next;
}

sub full_name { join '.', $_[0]->db->name, $_[0]->name }

sub index_information {
  my $self = shift;
  my $cb = ref $_[-1] eq 'CODE' ? pop : undef;

  my $cmd = bson_doc(listIndexes => $self->name, @_);

  $self->_command($cmd, $cb,
    sub {
      my $doc = shift or return bson_doc;
      bson_doc map { delete $_->{ns}; (delete $_->{name}, $_) }
        @{$doc->{cursor}->{firstBatch}};
    }
  );
}

sub insert {
  my ($self, $orig_docs, $cb) = @_;
  $orig_docs = [$orig_docs] unless ref $orig_docs eq 'ARRAY';

  # Make a shallow copy of the documents and add an id if needed
  my @docs = map { bson_doc %$_ } @$orig_docs;
  my @ids = map { $_->{_id} //= bson_oid } @docs;

  my $command = bson_doc
    insert       => $self->name,
    documents    => \@docs,
    ordered      => \1,
    writeConcern => $self->db->build_write_concern;

  return $self->_command($command, $cb, sub { @ids > 1 ? \@ids : $ids[0] });
}

sub map_reduce {
  my ($self, $map, $reduce) = (shift, shift, shift);
  my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
  my $command = bson_doc
    mapreduce => $self->name,
    map       => ref $map ? $map : bson_code($map),
    reduce    => ref $reduce ? $reduce : bson_code($reduce),
    %{shift // {}};

  # Blocking
  return $self->_map_reduce($self->db->command($command)) unless $cb;

  # Non-blocking
  return $self->db->command(
    $command => sub { shift; $self->$cb(shift, $self->_map_reduce(shift)) });
}

sub options {
  my ($self, $cb) = @_;

  my $cmd = bson_doc(listCollections => 1, filter => { name => $self->name });
  $self->_command($cmd, $cb, sub { shift->{cursor}->{firstBatch}->[0] });
}

sub remove {
  my $self  = shift;
  my $cb    = ref $_[-1] eq 'CODE' ? pop : undef;
  my $query = shift // {};
  my $flags = shift // {};

  ($query, $flags) = ({_id => $query}, {single => 1})
    if ref $query eq 'Mango::BSON::ObjectID';
  my $command = bson_doc
    delete       => $self->name,
    deletes      => [{q => $query, limit => $flags->{single} ? 1 : 0}],
    ordered      => \1,
    writeConcern => $self->db->build_write_concern;

  return $self->_command($command, $cb);
}

sub rename {
  my ($self, $name, $cb) = @_;

  my $admin = $self->db->mango->db('admin');
  my $dbname = $self->db->name;
  my $oldname = join '.', $dbname, $self->name;
  my $newname = join '.', $dbname, $name;

  my $cmd = bson_doc renameCollection => $oldname, to => $newname;

  # Non-blocking
  return $admin->command($cmd, sub {
    my ($admin_db, $err, $doc) = @_;
    my $newcol = $doc->{ok} ? $self->db->collection($name) : undef;
    return $cb->($self, $err, $newcol);
  }) if $cb;

  # Blocking
  my $doc = $admin->command($cmd);
  return $doc->{ok} ? $self->db->collection($name) : undef;
}

sub save {
  my ($self, $doc, $cb) = @_;

  # New document
  return $self->insert($doc, $cb) unless $doc->{_id};

  # Update non-blocking
  my @update = ({_id => $doc->{_id}}, $doc, {upsert => 1});
  return $self->update(@update => sub { shift->$cb(shift, $doc->{_id}) })
    if $cb;

  # Update blocking
  $self->update(@update);
  return $doc->{_id};
}

sub stats { $_[0]->_command(bson_doc(collstats => $_[0]->name), $_[1]) }

sub update {
  my ($self, $query, $update) = (shift, shift, shift);
  my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
  my $flags = shift // {};

  $update = {
    q => ref $query eq 'Mango::BSON::ObjectID' ? {_id => $query} : $query,
    u => $update,
    upsert => $flags->{upsert} ? \1 : \0,
    multi  => $flags->{multi}  ? \1 : \0
  };
  my $command = bson_doc
    update       => $self->name,
    updates      => [$update],
    ordered      => \1,
    writeConcern => $self->db->build_write_concern;

  return $self->_command($command, $cb);
}

sub _aggregate {
  my ($self, $command, $doc) = @_;

  # Document (explain)
  return $doc if $command->{explain};

  # Collection
  my $out = $command->{pipeline}[-1]{'$out'};
  return $self->db->collection($out) if defined $out;

  # Cursor
  my $cursor = $doc->{cursor};
  return Mango::Cursor->new(collection => $self, id => $cursor->{id})
    ->add_batch($cursor->{firstBatch});
}

sub _command {
  my ($self, $command, $cb, $return) = @_;
  $return ||= sub {shift};

  # Non-blocking
  my $db       = $self->db;
  my $protocol = $db->mango->protocol;
  return $db->command(
    $command => sub {
      my ($db, $err, $doc) = @_;
      $err ||= $protocol->write_error($doc);
      $self->$cb($err, $return->($doc));
    }
  ) if $cb;

  # Blocking
  my $doc = $db->command($command);
  if (my $err = $protocol->write_error($doc)) { croak $err }
  return $return->($doc);
}

sub _map_reduce {
  my ($self, $doc) = @_;
  return $doc->{results} unless $doc->{result};
  return $self->db->collection($doc->{result});
}

1;

=encoding utf8

=head1 NAME

Mango::Collection - MongoDB collection

=head1 SYNOPSIS

  use Mango::Collection;

  my $collection = Mango::Collection->new(db => $db);
  my $cursor     = $collection->find({foo => 'bar'});

=head1 DESCRIPTION

L<Mango::Collection> is a container for MongoDB collections used by
L<Mango::Database>.

=head1 ATTRIBUTES

L<Mango::Collection> implements the following attributes.

=head2 db

  my $db      = $collection->db;
  $collection = $collection->db(Mango::Database->new);

L<Mango::Database> object this collection belongs to.

=head2 name

  my $name    = $collection->name;
  $collection = $collection->name('bar');

Name of this collection.

=head1 METHODS

L<Mango::Collection> inherits all methods from L<Mojo::Base> and implements
the following new ones.

=head2 aggregate

  my $cursor = $collection->aggregate(
    [{'$group' => {_id => undef, total => {'$sum' => '$foo'}}}]);
  my $collection = $collection->aggregate(
    [{'$match' => {'$gt' => 23}}, {'$out' => 'some_collection'}]);
  my $doc = $collection->aggregate(
    [{'$match' => {'$gt' => 23}}], {explain => bson_true});

Aggregate collection with aggregation framework, additional options will be
passed along to the server verbatim. You can also append a callback to perform
operation non-blocking.

  my $pipeline = [{'$group' => {_id => undef, total => {'$sum' => '$foo'}}}];
  $collection->aggregate($pipeline => sub {
    my ($collection, $err, $cursor) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 build_index_name

  my $name = $collection->build_index_name(bson_doc(foo => 1, bar => -1));
  my $name = $collection->build_index_name({foo => 1});

Build name for index specification, the order of keys matters for compound
indexes.

=head2 bulk

  my $bulk = $collection->bulk;

Build L<Mango::Bulk> object.

  my $bulk = $collection->bulk;
  $bulk->insert({foo => $_}) for 1 .. 10;
  $bulk->find({foo => 4})->update_one({'$set' => {bar => 'baz'}});
  $bulk->find({foo => 7})->remove_one;
  my $results = $bulk->execute;

=head2 create

  $collection->create;
  $collection->create({capped => bson_true, max => 5, size => 10000});

Create collection. You can also append a callback to perform operation
non-blocking.

  $collection->create({capped => bson_true, max => 5, size => 10000} => sub {
    my ($collection, $err) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 drop

  $collection->drop;

Drop collection. You can also append a callback to perform operation
non-blocking.

  $collection->drop(sub {
    my ($collection, $err) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 drop_index

  $collection->drop_index('foo');

Drop index. You can also append a callback to perform operation non-blocking.

  $collection->drop_index(foo => sub {
    my ($collection, $err) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 ensure_index

  $collection->ensure_index(bson_doc(foo => 1, bar => -1));
  $collection->ensure_index({foo => 1});
  $collection->ensure_index({foo => 1}, {unique => bson_true});

Make sure an index exists, the order of keys matters for compound indexes,
additional options will be passed along to the server verbatim. You can also
append a callback to perform operation non-blocking.

  $collection->ensure_index(({foo => 1}, {unique => bson_true}) => sub {
    my ($collection, $err) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 find

  my $cursor = $collection->find;
  my $cursor = $collection->find({foo => 'bar'});
  my $cursor = $collection->find({foo => 'bar'}, {foo => 1});

Build L<Mango::Cursor::Query> object for query.

  # Exclude "_id" field from results
  my $docs = $collection->find({foo => 'bar'}, {_id => 0})->all;

=head2 find_and_modify

  my $doc = $collection->find_and_modify(
    {query => {foo => 'bar'}, update => {'$set' => {foo => 'baz'}}});

Fetch and update or remove a document atomically. You can also append a callback
to perform operation non-blocking.

  my $opts = {query => {foo => 'bar'}, update => {'$set' => {foo => 'baz'}}};
  $collection->find_and_modify($opts => sub {
    my ($collection, $err, $doc) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

By default this method returns the unmodified version of the document. To
change this behaviour, add the option C<new =E<gt> 1>.

=head2 find_one

  my $doc = $collection->find_one({foo => 'bar'});
  my $doc = $collection->find_one({foo => 'bar'}, {foo => 1});
  my $doc = $collection->find_one($oid, {foo => 1});

Find one document. You can also append a callback to perform operation
non-blocking.

  $collection->find_one({foo => 'bar'} => sub {
    my ($collection, $err, $doc) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 full_name

  my $name = $collection->full_name;

Full name of this collection.

=head2 index_information

  my $info = $collection->index_information;
  # return only the 5 first indexes
  my $info = $collection->index_information(cursor => { batchSize => 5 });

Get index information for collection. You can also append a callback to
perform operation non-blocking.

  $collection->index_information(sub {
    my ($collection, $err, $info) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 insert

  my $oid  = $collection->insert({foo => 'bar'});
  my $oids = $collection->insert([{foo => 'bar'}, {baz => 'yada'}]);

Insert one or more documents into collection. You can also append a callback
to perform operation non-blocking.

  $collection->insert({foo => 'bar'} => sub {
    my ($collection, $err, $oid) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

Note that C<insert> has to ensure each document has an C<_id> before sending
them to MongoDB. To avoid modifying your data, it makes a copy of the
documents. This can be a bit slow if you are sending big objects like
pictures. To avoid that, consider using C<save> instead.

=head2 map_reduce

  my $collection = $collection->map_reduce($map, $reduce, {out => 'foo'});
  my $docs = $collection->map_reduce($map, $reduce, {out => {inline => 1}});
  my $docs = $collection->map_reduce(
    bson_code($map), bson_code($reduce), {out => {inline => 1}});

Perform map/reduce operation on collection, additional options will be passed
along to the server verbatim. You can also append a callback to perform
operation non-blocking.

  $collection->map_reduce(($map, $reduce, {out => {inline => 1}}) => sub {
      my ($collection, $err, $docs) = @_;
      ...
    }
  );
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 options

  my $doc = $collection->options;

Get options for collection. You can also append a callback to perform
operation non-blocking.

  $collection->options(sub {
    my ($collection, $err, $doc) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 remove

  my $result = $collection->remove;
  my $result = $collection->remove($oid);
  my $result = $collection->remove({foo => 'bar'});
  my $result = $collection->remove({foo => 'bar'}, {single => 1});

Remove documents from collection. You can also append a callback to perform
operation non-blocking. Returns a WriteResult document.

  $collection->remove(({foo => 'bar'}, {single => 1}) => sub {
    my ($collection, $err, $doc) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

These options are currently available:

=over 2

=item single

  single => 1

Remove only one document.

=back

=head2 rename

  my $new_collection = $collection->rename('NewName');

Rename a collection, keeping all of its original contents and options. Returns
a new Mango::Collection object pointing to the renamed collection. You can
also append a callback to perform operation non-blocking.

  $collection->rename('NewName' => sub {
    my ($collection, $err, $oid) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 save

  my $oid = $collection->save({foo => 'bar'});

Save document to collection. The document MUST have an C<_id>. You can also
append a callback to perform operation non-blocking.

  $collection->save({foo => 'bar'} => sub {
    my ($collection, $err, $oid) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 stats

  my $stats = $collection->stats;

Get collection statistics. You can also append a callback to perform operation
non-blocking.

  $collection->stats(sub {
    my ($collection, $err, $stats) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 update

  my $result = $collection->update($oid, {foo => 'baz'});
  my $result = $collection->update({foo => 'bar'}, {foo => 'baz'});
  my $result = $collection->update({foo => 'bar'}, {foo => 'baz'}, {multi => 1});

Update document in collection. You can also append a callback to perform
operation non-blocking. Returns a WriteResult document.

  $collection->update(({foo => 'bar'}, {foo => 'baz'}, {multi => 1}) => sub {
    my ($collection, $err, $doc) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

These options are currently available:

=over 2

=item multi

  multi => 1

Update more than one document.

=item upsert

  upsert => 1

Insert document if none could be updated.

=back

=head1 SEE ALSO

L<Mango>, L<Mojolicious::Guides>, L<http://mojolicio.us>.

=cut