The Perl and Raku Conference 2025: Greenville, South Carolina - June 27-29 Learn more

# Version of this module.
$Elastic::Model::Role::Store::VERSION = '0.52';
#===================================
# es: read-only, required attribute constrained to the ES type. Every
# request issued by the subs in this file goes through it.
has es => (
#===================================
    is       => 'ro',
    isa      => ES,
    required => 1,
);
# Names of the search parameters that _tidy_search() lifts out of the
# request and passes alongside (rather than inside) the "body".
my @Top_Level = qw(
    index
    type
    lenient
    preference
    routing
    scroll
    search_type
    timeout
    version
);
#===================================
sub search {
#===================================
    # Normalises the search arguments with _tidy_search() and hands the
    # resulting hashref to es->search. Returns whatever that call returns.
    my ( $self, @spec ) = @_;
    my $request = _tidy_search(@spec);
    return $self->es->search($request);
}
#===================================
sub scrolled_search {
#===================================
    # Normalises the search arguments with _tidy_search() and hands the
    # resulting hashref to es->scroll_helper. Returns whatever that call
    # returns.
    my ( $self, @spec ) = @_;
    my $request = _tidy_search(@spec);
    return $self->es->scroll_helper($request);
}
#===================================
sub _tidy_search {
#===================================
    # Splits search arguments into top-level parameters and a request body.
    #
    # Accepts either a single hashref or a flat key/value list. The input is
    # copied, so a caller's hashref is never modified. Every name listed in
    # @Top_Level is removed from the copy; it is kept as a top-level
    # parameter only when its value is defined. Whatever remains becomes
    # the "body".
    #
    # Returns a hashref: { <top-level params>, body => { ...rest... } }.
    my %body;
    if ( ref $_[0] eq 'HASH' ) {
        %body = %{ $_[0] };
    }
    else {
        %body = @_;
    }

    my %request;
    for my $name (@Top_Level) {
        my $value = delete $body{$name};
        next unless defined $value;
        $request{$name} = $value;
    }

    $request{body} = \%body;
    return \%request;
}
#===================================
sub delete_by_query {
#===================================
    # Normalises the query arguments with _tidy_search() and hands the
    # resulting hashref to es->delete_by_query. Returns whatever that call
    # returns.
    my ( $self, @spec ) = @_;
    my $request = _tidy_search(@spec);
    return $self->es->delete_by_query($request);
}
#===================================
sub get_doc {
#===================================
    # Fetches a document through es->get.
    #
    #   $uid  - object whose read_params() returns a hashref of parameters
    #           identifying the document
    #   %args - extra parameters; they are placed last in the argument list,
    #           after the defaults and the uid parameters
    #
    # By default the _routing and _parent fields and the source are
    # requested. Returns whatever es->get returns.
    my $self = shift;
    my $uid  = shift;
    my %args = @_;

    return $self->es->get(
        fields  => [ '_routing', '_parent' ],
        _source => 1,
        %{ $uid->read_params },
        %args,
    );
}
#===================================
sub doc_exists {
#===================================
    # Asks es->exists about the document described by $uid->read_params
    # (followed by any extra %args) and returns the answer as a plain
    # boolean.
    my $self = shift;
    my $uid  = shift;
    my %args = @_;

    my $found = $self->es->exists( %{ $uid->read_params }, %args );
    return !!$found;
}
#===================================
# Delegates to _write_doc() with the 'create' action; remaining arguments
# are passed through unchanged.
sub create_doc {
    my $self = shift;
    return $self->_write_doc( 'create', @_ );
}
# Delegates to _write_doc() with the 'index' action; remaining arguments
# are passed through unchanged.
sub index_doc {
    my $self = shift;
    return $self->_write_doc( 'index', @_ );
}
#===================================
#===================================
sub _write_doc {
#===================================
    # Shared implementation behind create_doc() and index_doc().
    #
    #   $action - name of the method to invoke on es ('create' or 'index'
    #             for the callers in this file)
    #   $uid    - object whose write_params() returns a hashref of
    #             parameters identifying the document
    #   $data   - document data, sent as the request "body"
    #   %args   - extra parameters, appended last
    #
    # Returns whatever the invoked es method returns.
    my $self   = shift;
    my $action = shift;
    my $uid    = shift;
    my $data   = shift;
    my %args   = @_;

    return $self->es->$action(
        body => $data,
        %{ $uid->write_params },
        %args,
    );
}
#===================================
sub delete_doc {
#===================================
    # Deletes the document described by $uid->write_params through
    # es->delete. The remaining options are passed through _cleanup()
    # first and then appended after the uid parameters.
    my ( $self, $uid, @options ) = @_;
    my %args = _cleanup(@options);
    return $self->es->delete( %{ $uid->write_params }, %args );
}
#===================================
sub bulk {
#===================================
    # Thin pass-through: forwards %args to es->bulk and returns its result.
    my $self = shift;
    my %args = @_;
    return $self->es->bulk(%args);
}
#===================================
sub index_exists {
#===================================
    # Thin pass-through: forwards %args to es->indices->exists and returns
    # its result.
    my $self = shift;
    my %args = @_;
    return $self->es->indices->exists(%args);
}
#===================================
sub create_index {
#===================================
    # Creates an index through es->indices->create.
    #
    # The "settings" and "mappings" arguments are removed from %args and
    # sent together as the request "body"; each falls back to an empty
    # hashref when missing or false. All other arguments are forwarded
    # unchanged.
    my ( $self, %args ) = @_;

    my $settings = delete $args{settings};
    my $mappings = delete $args{mappings};

    $args{body} = {
        settings => $settings || {},
        mappings => $mappings || {},
    };

    return $self->es->indices->create(%args);
}
#===================================
sub delete_index {
#===================================
    # Passes the options through _cleanup() and forwards the result to
    # es->indices->delete.
    my ( $self, @options ) = @_;
    my %args = _cleanup(@options);
    return $self->es->indices->delete(%args);
}
#===================================
sub refresh_index {
#===================================
    # Passes the options through _cleanup() and forwards the result to
    # es->indices->refresh.
    my ( $self, @options ) = @_;
    my %args = _cleanup(@options);
    return $self->es->indices->refresh(%args);
}
#===================================
sub open_index {
#===================================
    # Passes the options through _cleanup() and forwards the result to
    # es->indices->open.
    my ( $self, @options ) = @_;
    my %args = _cleanup(@options);
    return $self->es->indices->open(%args);
}
#===================================
sub close_index {
#===================================
    # Passes the options through _cleanup() and forwards the result to
    # es->indices->close.
    my ( $self, @options ) = @_;
    my %args = _cleanup(@options);
    return $self->es->indices->close(%args);
}
#===================================
sub update_index_settings {
#===================================
    # Moves the "settings" argument into the request body as
    # { settings => ... } and forwards everything to
    # es->indices->put_settings.
    my ( $self, %args ) = @_;
    my $settings = delete $args{settings};
    $args{body} = { settings => $settings };
    return $self->es->indices->put_settings(%args);
}
#===================================
sub get_aliases {
#===================================
    # Fetches aliases through es->indices->get_aliases, passing
    # ignore => 404 ahead of the _cleanup()-processed options. A false
    # result is replaced with an empty hashref.
    my ( $self, @options ) = @_;
    my %args    = _cleanup(@options);
    my $aliases = $self->es->indices->get_aliases( ignore => 404, %args );
    return $aliases || {};
}
#===================================
sub put_aliases {
#===================================
    # Moves the "actions" argument into the request body as
    # { actions => ... } and forwards everything to
    # es->indices->update_aliases.
    my ( $self, %args ) = @_;
    my $actions = delete $args{actions};
    $args{body} = { actions => $actions };
    return $self->es->indices->update_aliases(%args);
}
#===================================
sub get_mapping {
#===================================
    # Passes the options through _cleanup() and forwards the result to
    # es->indices->get_mapping.
    my ( $self, @options ) = @_;
    my %args = _cleanup(@options);
    return $self->es->indices->get_mapping(%args);
}
#===================================
sub put_mapping {
#===================================
    # Uses the "mapping" argument itself as the request body and forwards
    # everything to es->indices->put_mapping.
    my ( $self, %args ) = @_;
    my $mapping = delete $args{mapping};
    $args{body} = $mapping;
    return $self->es->indices->put_mapping(%args);
}
#===================================
sub delete_mapping {
#===================================
    # Passes the options through _cleanup() and forwards the result to
    # es->indices->delete_mapping.
    my ( $self, @options ) = @_;
    my %args = _cleanup(@options);
    return $self->es->indices->delete_mapping(%args);
}
#===================================
sub reindex {
#===================================
    # Reindexes through a bulk helper obtained from es->bulk_helper.
    #
    # Four options are removed from %args and, when defined, given to the
    # helper constructor: bulk_size (passed on as max_count), on_conflict,
    # on_error and verbose. Everything else is passed to the helper's
    # reindex() method, followed by version_type => 'external'.
    #
    # Returns whatever the helper's reindex() returns.
    my ( $self, %args ) = @_;

    # [ option name accepted here => parameter name given to the helper ]
    my @helper_options = (
        [ bulk_size   => 'max_count' ],
        [ on_conflict => 'on_conflict' ],
        [ on_error    => 'on_error' ],
        [ verbose     => 'verbose' ],
    );

    my %params;
    for my $pair (@helper_options) {
        my ( $from, $to ) = @$pair;
        my $value = delete $args{$from};
        $params{$to} = $value if defined $value;
    }

    my $bulk = $self->es->bulk_helper(%params);
    return $bulk->reindex( %args, version_type => 'external' );
}
#===================================
sub bootstrap_uniques {
#===================================
    # Creates the index named by $args{index} unless es->indices->exists
    # reports that it is already there (in which case nothing is returned).
    #
    # The index is created with a single shard and a _default_ mapping in
    # which _all, _source and the mapping itself are disabled and _type is
    # not indexed. Returns the result of es->indices->create.
    my ( $self, %args ) = @_;
    my $es    = $self->es;
    my $index = $args{index};

    if ( $es->indices->exists( index => $index ) ) {
        return;
    }

    my %default_mapping = (
        _all    => { enabled => 0 },
        _source => { enabled => 0 },
        _type   => { index   => 'no' },
        enabled => 0,
    );

    return $es->indices->create(
        index => $index,
        body  => {
            settings => { number_of_shards => 1 },
            mappings => { _default_        => \%default_mapping },
        },
    );
}
#===================================
sub create_unique_keys {
#===================================
    # Tries to create one empty document per entry in $args{keys}
    # (type => id) in the index $args{index}, using a bulk helper.
    #
    # Conflicts reported by the helper are collected rather than thrown.
    # If there were any, the keys that did not conflict are removed again
    # via delete_unique_keys(), so the call leaves nothing behind. Any
    # other error reported by the helper is fatal.
    #
    # Returns the conflicting keys as a hash of type => id; the hash is
    # empty when every key was created.
    my ( $self, %args ) = @_;
    my $index = $args{index};
    my %keys  = %{ $args{keys} };
    my %failed;

    my $note_conflict = sub {
        my ( undef, $doc ) = @_;
        $failed{ $doc->{_type} } = $doc->{_id};
    };
    my $abort = sub {
        die "Error creating multi unique keys: $_[2]";
    };

    my $bulk = $self->es->bulk_helper(
        index       => $index,
        on_conflict => $note_conflict,
        on_error    => $abort,
    );

    my @docs;
    for my $type ( keys %keys ) {
        push @docs, { _type => $type, _id => $keys{$type}, _source => {} };
    }
    $bulk->create(@docs);
    $bulk->flush;

    if (%failed) {
        # Roll back: drop the conflicting entries, delete what remains.
        delete @keys{ keys %failed };
        $self->delete_unique_keys( index => $index, keys => \%keys );
    }
    return %failed;
}
#===================================
sub delete_unique_keys {
#===================================
    # Deletes the document for each entry in $args{keys} (type => id) from
    # the index $args{index}, using a bulk helper. Any error reported by
    # the helper is fatal. Returns 1.
    my ( $self, %args ) = @_;
    my %keys = %{ $args{keys} };

    my $abort = sub {
        die "Error deleting multi unique keys: $_[2]";
    };
    my $bulk = $self->es->bulk_helper(
        index    => $args{index},
        on_error => $abort,
    );

    # The leading "+" marks the braces as an anonymous hash, not a block.
    my @targets = map { +{ type => $_, id => $keys{$_} } } keys %keys;
    $bulk->delete(@targets);
    $bulk->flush;

    return 1;
}
# Deprecation warnings that have already been emitted, so that each is
# shown only once.
our %Warned;
#===================================
sub _cleanup {
#===================================
    # Translates the deprecated "ignore_missing" option.
    #
    # "ignore_missing" is always removed from the arguments. When its value
    # was true, ignore => 404 is set in its place and a deprecation warning
    # is emitted the first time this happens. Returns the resulting
    # key/value list.
    my %args = @_;

    my $ignore_missing = delete $args{ignore_missing};
    if ($ignore_missing) {
        if ( !$Warned{ignore_missing}++ ) {
            warn "(ignore_missing) is deprecated. use { ignore => 404 } instead";
        }
        $args{ignore} = 404;
    }

    return %args;
}
1;
=pod
=encoding UTF-8
=head1 NAME
Elastic::Model::Role::Store - Elasticsearch backend for document read/write requests
=head1 VERSION
version 0.52
=head1 DESCRIPTION
All document-related requests to the Elasticsearch backend are handled
via L<Elastic::Model::Role::Store>.
=head1 ATTRIBUTES
=head2 es
$es = $store->es
Returns the connection to Elasticsearch.
=head1 METHODS
=head2 get_doc()
$result = $store->get_doc($uid, %args);
Retrieves the doc specified by the L<$uid|Elastic::Model::UID> from
Elasticsearch, by calling L<Search::Elasticsearch/"get()">. Throws an exception
if the document does not exist.
=head2 doc_exists()
$bool = $store->doc_exists($uid, %args);
Checks whether the doc exists in Elasticsearch. Any C<%args> are passed through
to L<Search::Elasticsearch/exists()>.
=head2 create_doc()
$result = $store->create_doc($uid => \%data, %args);
Creates a doc in the Elasticsearch backend and returns the raw result.
Throws an exception if a doc with the same L<$uid|Elastic::Model::UID>
already exists. Any C<%args> are passed to L<Search::Elasticsearch::Client::Direct/"create()">.
=head2 index_doc()
$result = $store->index_doc($uid => \%data, %args);
Updates (or creates) a doc in the Elasticsearch backend and returns the raw
result. Any failure throws an exception. If the L<version|Elastic::Model::UID/"version">
number does not match what is stored in Elasticsearch, then a conflict exception
will be thrown. Any C<%args> will be passed to L<Search::Elasticsearch::Client::Direct/"index()">.
For instance, to overwrite a document regardless of version number, you could
do:
$result = $store->index_doc($uid => \%data, version => 0 );
=head2 delete_doc()
$result = $store->delete_doc($uid, %args);
Deletes a doc in the Elasticsearch backend and returns the raw
result. Any failure throws an exception. If the L<version|Elastic::Model::UID/"version">
number does not match what is stored in Elasticsearch, then a conflict exception
will be thrown. Any C<%args> will be passed to L<Search::Elasticsearch::Client::Direct/"delete()">.
=head2 bulk()
$result = $store->bulk(
actions => $actions,
on_conflict => sub {...},
on_error => sub {...},
on_success => sub {...},
%args
);
Performs several actions in a single request. Any C<%args> will be passed to
L<Search::Elasticsearch::Client::Direct/bulk()>.
=head2 search()
$results = $store->search(@args);
Performs a search, passing C<@args> to L<Search::Elasticsearch::Client::Direct/"search()">.
=head2 scrolled_search()
$results = $store->scrolled_search(@args);
Performs a scrolled search, passing C<@args> to L<Search::Elasticsearch::Client::Direct/"scroll_helper()">.
=head2 delete_by_query()
$response = $store->delete_by_query(@args);
Performs a delete-by-query, passing C<@args> to L<Search::Elasticsearch::Client::Direct/delete_by_query()>.
=head2 index_exists()
$bool = $store->index_exists(@args);
Checks whether the specified index exists, passing C<@args> to L<Search::Elasticsearch::Client::Direct::Indices/exists()>.
=head2 create_index()
$response = $store->create_index(@args);
Creates the specified index, passing C<@args> to L<Search::Elasticsearch::Client::Direct::Indices/create()>.
=head2 delete_index()
$response = $store->delete_index(@args);
Deletes the specified index, passing C<@args> to L<Search::Elasticsearch::Client::Direct::Indices/delete()>.
=head2 refresh_index()
$response = $store->refresh_index(@args);
Refreshes the specified index, passing C<@args> to L<Search::Elasticsearch::Client::Direct::Indices/refresh()>.
=head2 open_index()
$response = $store->open_index(@args);
Opens the specified index, passing C<@args> to L<Search::Elasticsearch::Client::Direct::Indices/open()>.
=head2 close_index()
$response = $store->close_index(@args);
Closes the specified index, passing C<@args> to L<Search::Elasticsearch::Client::Direct::Indices/close()>.
=head2 update_index_settings()
$response = $store->update_index_settings(@args);
Updates the settings of the specified index, passing C<@args> to L<Search::Elasticsearch::Client::Direct::Indices/put_settings()>.
=head2 get_aliases()
$response = $store->get_aliases(@args);
Retrieves the aliases for the specified indices, passing C<@args> to L<Search::Elasticsearch::Client::Direct::Indices/get_aliases()>.
=head2 put_aliases()
$response = $store->put_aliases(@args);
Updates the aliases for the specified indices, passing C<@args> to L<Search::Elasticsearch::Client::Direct::Indices/update_aliases()>.
=head2 get_mapping()
$response = $store->get_mapping(@args);
Retrieves the mappings for the specified index, passing C<@args> to L<Search::Elasticsearch::Client::Direct::Indices/get_mapping()>.
=head2 put_mapping()
$response = $store->put_mapping(@args);
Updates the mappings for the specified index, passing C<@args> to L<Search::Elasticsearch::Client::Direct::Indices/put_mapping()>.
=head2 delete_mapping()
$response = $store->delete_mapping(@args);
Deletes the mappings and associated documents for the specified index, passing C<@args> to L<Search::Elasticsearch::Client::Direct::Indices/delete_mapping()>.
=head2 reindex()
$response = $store->reindex(@args);
Passes the C<@args> to L<Search::Elasticsearch::Bulk/reindex()>.
=head2 bootstrap_uniques()
$response = $store->bootstrap_uniques(@args);
Creates the index which will store unique constraints, unless it already exists.
=head2 create_unique_keys()
$response = $store->create_unique_keys(@args);
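Inserts the documents representing unique constraints. If any of the keys already exist, the keys created by this call are deleted again and the conflicting keys are returned as a hash of C<< type => id >>; an empty list is returned on success. Any other error throws an exception.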
=head2 delete_unique_keys()
$response = $store->delete_unique_keys(@args);
Deletes the documents representing the specified unique constraints.
=head1 AUTHOR
Clinton Gormley <drtech@cpan.org>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2015 by Clinton Gormley.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut
__END__
# ABSTRACT: Elasticsearch backend for document read/write requests