package MR::Tarantool::Box;

=pod

=head1 NAME

MR::Tarantool::Box - A driver for an efficient Tarantool/Box NoSQL in-memory storage.

=head1 SYNOPSIS

    my $box = MR::Tarantool::Box->new({
        servers => "127.0.0.1:33013",
        name    => "My Box",              # mostly used for debug purposes
        spaces => [ {
            indexes => [ {
                index_name   => 'idx1',
                keys         => [0],
            }, {
                index_name   => 'idx2',
                keys         => [1,2],
            }, ],
            space         => 1,               # space id, as set in Tarantool/Box config
            name          => "primary",       # self-descriptive space-id
            format        => "QqLlSsCc&",     # pack()-compatible, Qq must be supported by perl itself, & stands for byte-string.
            default_index => 'idx1',
            fields        => [qw/ id f2 field3 f4 f5 f6 f7 f8 misc_string /], # turn each tuple into hash, field names according to format
        }, {
            #...
        } ],
        default_space => "primary",

        timeout   => 1.0,                 # seconds
        retry     => 3,
        debug     => 9,                   # output to STDERR some debugging info
        raise     => 0,                   # don't raise an exception in case of error
    });

    my $bool  = $box->Insert(1, 2,3, 4,5,6,7,8,"asdf")                            or die $box->ErrorStr;
    my $bool  = $box->Insert(2, 2,4, 4,5,6,7,8,"asdf",{space => "primary"})       or die $box->ErrorStr;
    my $tuple = $box->Insert(3, 3,3, 4,5,6,7,8,"asdf",{want_inserted_tuple => 1}) or die $box->ErrorStr;

    # Select by single-field key
    my $tuple  = $box->Select(1);                                                 # scalar context - scalar result: $tuple
    my @tuples = $box->Select(1,2,3);                                             # list context   - list result: ($tuple, $tuple, ...)
    my $tuples = $box->Select([1,2,3],{space => "primary", use_index => "idx1"}); # arrayref result: [$tuple, $tuple, ...]

    # Select by multi-field key
    my $tuples = $box->Select([[2,3]],{use_index => "idx2"}); # by full key
    my $tuples = $box->Select([[2]]  ,{use_index => "idx2"}); # by partial key

    my $bool  = $box->UpdateMulti(1,[ f4 => add => 3 ]);
    my $bool  = $box->UpdateMulti(2,[ f4 => add => 3 ],{space => "primary"});
    my $tuple = $box->UpdateMulti(3,[ f4 => add => 3 ],{want_updated_tuple => 1});

    my $bool  = $box->Delete(1);
    my $tuple = $box->Delete(2, {want_deleted_tuple => 1});

=head1 DESCRIPTION

=head2 METHODS

=cut

use strict;
use warnings;
use Scalar::Util qw/looks_like_number/;
use List::MoreUtils qw/each_arrayref zip/;
use Time::HiRes qw/sleep/;

use MR::IProto ();

# Bits of the request `flags' word sent with insert/update/delete requests.
use constant {
    WANT_RESULT       => 1,
    INSERT_ADD        => 2,
    INSERT_REPLACE    => 4,
};

# Transport class used by new() unless the caller passes `iprotoclass'.
sub IPROTOCLASS () { 'MR::IProto' }

use vars qw/$VERSION %ERRORS/;
$VERSION = 0.0.0;

# confess() is borrowed from MR::IProto; the BEGIN block makes the name known
# to the parser before the paren-less calls further down are compiled.
BEGIN { *confess = \&MR::IProto::confess }

# Human-readable descriptions of the error codes, keyed by the code with its
# lowest byte cleared (_chat() looks them up as `$full_code & 0xFFFFFF00').
%ERRORS = (
    0x00000000  => q{OK},
    0x00000100  => q{Non master connection, but it should be},
    0x00000200  => q{Illegal parametrs},
    0x00000300  => q{Uid not from this storage range},
    0x00000400  => q{Tuple is marked as read-only},
    0x00000500  => q{Tuple isn't locked},
    0x00000600  => q{Tuple is locked},
    0x00000700  => q{Failed to allocate memory},
    0x00000800  => q{Bad integrity},
    0x00000a00  => q{Unsupported command},

    0x00000b00  => q{Can't do select},

    0x00001800  => q{Can't register new user},
    0x00001a00  => q{Can't generate alert id},
    0x00001b00  => q{Can't del node},

    0x00001c00  => q{User isn't registered},
    0x00001d00  => q{Syntax error in query},
    0x00001e00  => q{Unknown field},
    0x00001f00  => q{Number value is out of range},
    0x00002000  => q{Insert already existing object},
    0x00002200  => q{Can not order result},
    0x00002300  => q{Multiple update/delete forbidden},
    0x00002400  => q{Nothing affected},
    0x00002500  => q{Primary key update forbidden},
    0x00002600  => q{Incorrect protocol version},
    0x00002700  => q{WAL failed},
    0x00003000  => q{Procedure return type is not supported in the binary protocol},
    0x00003100  => q{Tuple doesn't exist},
    0x00003200  => q{Procedure is not defined},
    0x00003300  => q{Lua error},
    0x00003400  => q{Space is disabled},
    0x00003500  => q{No such index in space},
    0x00003600  => q{Field was not found in the tuple},
    0x00003700  => q{Tuple already exists},
    0x00003800  => q{Duplicate key exists in a unique index},
    0x00003900  => q{Space does not exists},
);

=pod

=head3 new

    my $box = $class->new(\%args);

%args:

=over

=item B<spaces> => [ \%space, ... ]

%space:

=over

=item B<space> => $space_id_uint32

Space id as set in Tarantool/Box config.

=item B<name> => $space_name_string

Self-descriptive space id, which will be mapped into C<space>.

=item B<format> => $format_string

C<pack()>-compatible tuple format string, allowed formats: C<QqLlSsCc&>,
where C<&> stands for bytestring. C<Qq> usable only if perl supports
int64 itself. Tuples' fields are packed/unpacked according to this C<format>.

=item B<hashify> => B<$coderef>

Specify a callback to turn each tuple into a good-looking hash.
It receives C<space> id and resultset as arguments. No return value needed.

    $coderef = sub {
        my ($space_id, $resultset) = @_;
        $_ = { FieldName1 => $_->[0], FieldName2 => $_->[1], ... } for @$resultset;
    };

=item B<fields> => B<$arrayref>

Specify an arrayref of fields names according to C<format>
to turn each tuple into a good-looking hash. Names must begin with C<< [A-Za-z] >>.

=item B<indexes> => [ \%index, ... ]

%index:

=over

=item B<id> => $index_id_uint32

Index id as set in Tarantool/Box config within current C<space>.
If not set, order position in C<indexes> is treated as C<id>.

=item B<index_name> => $index_name_string

Self-descriptive index id, which will be mapped into C<index_id>.

=item B<keys> => [ $field_no_uint32, ... ]

Properly ordered arrayref of fields' numbers which are indexed.

=back

=item B<default_index> => $default_index_name_string_or_id_uint32

Index C<id> or C<name> to be used by default for the current C<space>.
Must be set if there are more than one C<\%index>es.


=back

=item B<default_space> => $default_space_name_string_or_id_uint32

Space C<space> or C<name> to be used by default. Must be set if there are more than one C<\%space>s.

=item B<timeout> => $timeout_fractional_seconds_float || 23

A common timeout for network operations.

=item B<select_timeout> => $select_timeout_fractional_seconds_float || $timeout

Select queries timeout for network operations. Defaults to C<timeout>. See L</select_retry>.

=item B<retry> => $retry_int || 1

A common retries number for network operations.

=item B<select_retry> => $select_retry_int || 3

Select queries retries number for network operations.

Sometimes we need a short timeout for selects and a long timeout for B<critical> updates,
because in case of timeout we B<don't know if the update has succeeded>. For the same
reason we B<can't retry> an update operation.

So increasing C<timeout> and setting C<< retry => 1 >> for updates lowers the possibility of
such situations (but, of course, does not exclude them entirely), and guarantees that
we don't do the same thing more than once.

=item B<soft_retry> => $soft_retry_int || 3

A common retries number for Tarantool/Box B<temporary errors> (these marked by 1 in the
lowest byte of C<error_code>). In that case we B<know for sure> that the B<request was
declined> by Tarantool/Box for some reason (a tuple was locked for another update, for
example), and we B<can> try it again.

This is also limited by C<retry>/C<select_retry>
(depending on query type).

=item B<retry_delay> => $retry_delay_fractional_seconds_float || 1

Specify a delay between retries for network operations.

=item B<raise> => $raise_bool || 1

Should we raise exceptions? If so, exceptions are raised when no more retries are left and
all tries have failed (with timeout, fatal, or temporary error).

=item B<debug> => $debug_level_int || 0

Debug level, 0 - print nothing, 9 - print everything

=item B<name> => $name

A string used for self-description. Mainly used for debugging purposes.

=back

=cut

# Constructor: copies the settings out of %$arg, validates and indexes the
# space/index descriptions, then opens the transport through _connect().
# Confesses on any configuration error. Returns the blessed object.
sub new {
    my ($class, $arg) = @_;
    my $self;

    # Shallow copy, so the keys added below do not leak into the caller's hash.
    $arg = { %$arg };
    $self->{name}            = $arg->{name}      || ref($class) || $class;
    $self->{timeout}         = $arg->{timeout}   || 23;
    $self->{retry}           = $arg->{retry}     || 1;
    $self->{retry_delay}     = $arg->{retry_delay} || 1;
    $self->{select_retry}    = $arg->{select_retry} || 3;
    $self->{softretry}       = $arg->{soft_retry} || $arg->{softretry} || 3;
    $self->{debug}           = $arg->{'debug'}   || 0;
    $self->{ipdebug}         = $arg->{'ipdebug'} || 0;
    # `raise' defaults to true, but an explicit false value must be honoured.
    $self->{raise}           = 1;
    $self->{raise}           = $arg->{raise} if exists $arg->{raise};
    $self->{select_timeout}  = $arg->{select_timeout} || $self->{timeout};
    $self->{iprotoclass}     = $arg->{iprotoclass} || $class->IPROTOCLASS;
    $self->{_last_error}     = 0;

    $self->{hashify}         = $arg->{'hashify'} if exists $arg->{'hashify'};
    $self->{default_raw}     = $arg->{default_raw};

    # `spaces' and the older spelling `namespaces' are interchangeable.
    $arg->{spaces} = $arg->{namespaces} = [@{ $arg->{spaces} ||= $arg->{namespaces} || confess "no spaces given" }];
    confess "no spaces given" unless @{$arg->{spaces}};

    # Every space is registered under its numeric id and, if given, its name.
    my %namespaces;
    for my $ns (@{$arg->{spaces}}) {
        $ns = { %$ns };
        my $namespace = defined $ns->{space} ? $ns->{space} : $ns->{namespace};
        $ns->{space} = $ns->{namespace} = $namespace;
        confess "space[?] `space' not set" unless defined $namespace;
        confess "space[$namespace] already defined" if $namespaces{$namespace} or $ns->{name}&&$namespaces{$ns->{name}};
        confess "space[$namespace] no indexes defined" unless $ns->{indexes} && @{$ns->{indexes}};
        $namespaces{$namespace} = $ns;
        $namespaces{$ns->{name}} = $ns if $ns->{name};

        confess "space[$namespace] bad format `$ns->{format}'" if $ns->{format} =~ m/[^&lLsScCqQ ]/;
        $ns->{format} =~ s/\s+//g;

        # One format letter per tuple field. Bytestrings (`&') are unpacked
        # as `w/a*'; every other field is unpacked as `x' followed by its
        # own pack letter.
        my @f = split //, $ns->{format};
        $ns->{byfield_unpack_format} = [ map { /&/ ? 'w/a*' : "x$_" } @f ];
        $ns->{field_format}          = [ map { /&/ ? 'a*'   : $_    } @f ];
        $ns->{unpack_format}         = join('', @{$ns->{byfield_unpack_format}});
        $ns->{append_for_unpack}     = '' unless defined $ns->{append_for_unpack};
        $ns->{check_keys}  = {};
        $ns->{string_keys} = { map { $_ => 1 } grep { $f[$_] eq '&' } 0..$#f };

        # Indexes are reachable by their position in `indexes' and by name.
        my $inames = $ns->{index_names} = {};
        my $i = -1;
        for my $index (@{$ns->{indexes}}) {
            ++$i;
            confess "space[$namespace]index[($i)] no name given" unless length $index->{index_name};
            my $index_name = $index->{index_name};
            confess "space[$namespace]index[$index_name($i)] no indexes defined" unless $index->{keys} && @{$index->{keys}};
            confess "space[$namespace]index[$index_name($i)] already defined" if $inames->{$index_name} || $inames->{$i};
            $index->{id} = $i unless defined $index->{id};
            $inames->{$i} = $inames->{$index_name} = $index;

            # Every indexed field number must be a non-negative integer that
            # addresses a field of `format'. The key list belongs to the
            # index (the space itself has no `keys'), so that is the list
            # which has to be checked.
            for my $key (@{$index->{keys}}) {
                my $shown = defined $key ? $key : '';
                confess "space[$namespace]index[$index_name] bad key `$shown'"
                    unless looks_like_number($key) && int($key) == $key && $key >= 0 && $key < @f;
            }

            # check_keys: field number => 1 for a string key, 0 for a numeric one.
            $ns->{check_keys}->{$_} = int !! $ns->{string_keys}->{$_} for @{$index->{keys}};
            $index->{string_keys} ||= $ns->{string_keys};
        }

        if( @{$ns->{indexes}} > 1 ) {
            confess "space[$namespace] default_index not given" unless defined $ns->{default_index};
            confess "space[$namespace] default_index $ns->{default_index} does not exist" unless $inames->{$ns->{default_index}};
        } else {
            $ns->{default_index} ||= 0;
        }

        if($ns->{fields}) {
            confess "space[$namespace] fields must be ARRAYREF" unless ref $ns->{fields} eq 'ARRAY';
            confess "space[$namespace] fields number must match format" if @{$ns->{fields}} != @f;
            m/^[A-Za-z]/ or confess "space[$namespace] fields names must begin with [A-Za-z]: bad name $_" for @{$ns->{fields}};
            # Field name => field number, used to resolve names given to updates.
            $ns->{fields_hash} = { map { $ns->{fields}->[$_] => $_ } 0..$#{$ns->{fields}} };
        }
    }
    $self->{namespaces} = \%namespaces;

    if (@{$arg->{spaces}} > 1) {
        $arg->{default_namespace} = $arg->{default_space} if defined $arg->{default_space};
        confess "default_space not given" unless defined $arg->{default_namespace};
        confess "default_space $arg->{default_namespace} does not exist" unless $namespaces{$arg->{default_namespace}};
        $self->{default_namespace} = $arg->{default_namespace};
    } else {
        $self->{default_namespace} = $arg->{default_space} || $arg->{default_namespace} || $arg->{spaces}->[0]->{space};
        confess "default_space $self->{default_namespace} does not exist" unless $namespaces{$self->{default_namespace}};
    }

    bless $self, $class;
    $self->_connect($arg->{'servers'});
    return $self;
}

# Emit a debug message: hand all arguments (object included) to the `warn'
# callback stored in the object if there is one, otherwise warn() the
# message parts joined by spaces.
sub _debug {
    my $handler = $_[0]->{warn};
    if ($handler) {
        $handler->(@_);
    } else {
        warn "@_[1..$#_]\n";
    }
}

# Create the transport object of class `iprotoclass' for the given servers
# and keep it in $self->{server}.
sub _connect {
    my ($self, $servers) = @_;
    $self->{server} = $self->{iprotoclass}->new({
        servers       => $servers,
        name          => $self->{name},
        debug         => $self->{'ipdebug'},
        dump_no_ints  => 1,
    });
}

=pod

=head3 Error

Last error code, or 'fail' for some network reason, often a timeout.

    $box->Insert(@tuple) or die sprintf "Error %X", $box->Error; # die "Error 202"

=head3 ErrorStr

Last error code and description in a single string.

    $box->Insert(@tuple) or die $box->ErrorStr;                  # die "Error 00000202: Illegal Parameters"

=cut

sub ErrorStr {
    return $_[0]->{_last_error_msg};
}

sub Error {
    return $_[0]->{_last_error};
}

# Send one request and decode the reply, retrying as configured.
#
# %param is passed on to the transport's Chat1(). Its `unpack' callback is
# replaced by a wrapper that strips the 4-byte return code first; the
# original callback then decodes the rest of a successful reply.
# `timeout' and `retry' in %param override the object's defaults.
#
# Returns whatever the original `unpack' callback returns on success and 0
# on failure. When `raise' is set a failure dies through _raise() instead.
sub _chat {
    my ($self, %param) = @_;
    my $orig_unpack = delete $param{unpack};

    $param{unpack} = sub {
        my $data = $_[0];
        confess __LINE__."$self->{name}: [common]: Bad response" if length $data < 4;
        # The same four bytes are read twice: once as a whole 32-bit code and,
        # after stepping back with X[L], as its three components.
        my ($full_code, @err_code) = unpack('LX[L]CSC', substr($data, 0, 4, ''));
        # $err_code[0] = severity: 0 -> ok, 1 -> transient, 2 -> permanent;
        # $err_code[1] = description;
        # $err_code[2] = da box project;
        return (\@err_code, \$data, $full_code);
    };

    my $timeout = $param{timeout} || $self->{timeout};
    my $retry = $param{retry} || $self->{retry};
    my $soft_retry = $self->{softretry};
    my $retry_count = 0;

    while ($retry > 0) {
        $retry_count++;

        # Placeholder code, replaced as soon as a reply has been decoded.
        $self->{_last_error} = 0x77777777;
        $self->{server}->SetTimeout($timeout);
        my $ret = $self->{server}->Chat1(%param);
        my $message;

        if (exists $ret->{ok}) {
            my ($ret_code, $data, $full_code) = @{$ret->{ok}};
            $self->{_last_error} = $full_code;
            if ($ret_code->[0] == 0) {
                my $ret = $orig_unpack->($$data,$ret_code->[2]);
                confess __LINE__."$self->{name}: [common]: Bad response (more data left)" if length $$data > 0;
                return $ret;
            }

            # The error text is the reply body if there is one, else the
            # %ERRORS entry for the code with its lowest byte cleared.
            $self->{_last_error_msg} = $message = $ret_code->[0] == 0 ? "ok" : sprintf "Error %08X: %s", $full_code, $$data || $ERRORS{$full_code & 0xFFFFFF00} || 'Unknown error';
            $self->_debug("$self->{name}: $message") if $self->{debug} >= 1;
            if ($ret_code->[0] == 2) { #fatal error
                $self->_raise($message) if $self->{raise};
                return 0;
            }

            # retry if error is soft even in case of update e.g. ROW_LOCK
            if ($ret_code->[0] == 1 and --$soft_retry > 0) {
                # A soft retry uses up a hard retry too, but never the last one.
                --$retry if $retry > 1;
                sleep $self->{retry_delay};
                next;
            }
        } else { # timeout has caused the failure if $ret->{timeout}
            $self->{_last_error} = 'fail';
            $message ||= $self->{_last_error_msg} = $ret->{fail};
            $self->_debug("$self->{name}: $message") if $self->{debug} >= 1;
        }

        last unless --$retry;

        sleep $self->{retry_delay};
    }

    $self->_raise("no success after $retry_count tries\n") if $self->{raise};

    # Explicit false result when exceptions are switched off, mirroring the
    # fatal-error branch above.
    return 0;
}

# Die with the message prefixed by the object's name.
sub _raise {
    my ($self, $msg) = @_;
    die "$self->{name}: $msg\n";
}

# Extract and validate the trailing options hash of a public method call.
#
# $args is a reference to the caller's argument list; a trailing HASH ref is
# popped off it (so it disappears from the caller's @_) and shallow-copied.
# @pnames lists the option names that method accepts; `space' and
# `namespace' are always accepted. Unknown options, unknown spaces and
# unknown indexes confess.
#
# Returns ($param, $space_description, values of @pnames in order), with
# $param->{namespace}, {use_index} and {index} filled in.
sub _validate_param {
    my ($self, $args, @pnames) = @_;
    my $param = $args && @$args && ref $args->[-1] eq 'HASH' ? {%{pop @$args}} : {};

    my %pnames = map { $_ => 1 } @pnames;
    $pnames{space} = 1;
    $pnames{namespace} = 1;
    foreach my $pname (keys %$param) {
        confess "$self->{name}: unknown param $pname\n" unless exists $pnames{$pname};
    }

    # `space' takes precedence over `namespace'; the default space is the fallback.
    $param->{namespace} = $param->{space} if exists $param->{space} and defined $param->{space};
    $param->{namespace} = $self->{default_namespace} unless defined $param->{namespace};
    confess "$self->{name}: bad space `$param->{namespace}'" unless exists $self->{namespaces}->{$param->{namespace}};

    my $ns = $self->{namespaces}->{$param->{namespace}};

    $param->{use_index} = $ns->{default_index} unless defined $param->{use_index};
    confess "$self->{name}: bad index `$param->{use_index}'" unless exists $ns->{index_names}->{$param->{use_index}};
    $param->{index} = $ns->{index_names}->{$param->{use_index}};

    # `raw' falls back to the space's default, then to the object's default.
    if(exists $pnames{raw}) {
        $param->{raw} = $ns->{default_raw}   unless defined $param->{raw};
        $param->{raw} = $self->{default_raw} unless defined $param->{raw};
    }

    return ($param, $ns, map { $param->{$_} } @pnames);
}

=pod

=head3 Call

Call a stored procedure. Returns an arrayref of the result tuple(s) upon success.

    my $results = $box->Call('stored_procedure_name', \@procedure_params, \%options) or die $box->ErrorStr; # Call failed
    my $result_tuple = @$results && $results->[0] or warn "Call succeeded, but returned nothing";

=over

=item B<@procedure_params>

An array of bytestrings to be passed as is to the procedure.

=item B<%options>

=over

=item B<unpack_format> => \@field_formats

An arrayref of per-field formats used to unpack the result tuple(s), one
element per field, written with the same letters as the C<format> option
of C<new()> (C<&> stands for a bytestring), e.g. C<< [qw/L & &/] >>.

=item B<raise> => $bool

Override the object's C<raise> setting for this call.

=back

=back

=cut

# Call the stored procedure $sp_name with the bytestrings in @$tuple.
# Options: flags, raise, unpack, unpack_format (plus space/namespace).
# Returns the value produced by _chat(): by default an arrayref of tuples.
sub Call {
    my ($param, $namespace) = $_[0]->_validate_param(\@_, qw/flags raise unpack unpack_format/);
    my ($self, $sp_name, $tuple) = @_;

    my $flags = $param->{flags} || 0;
    local $self->{raise} = $param->{raise} if defined $param->{raise};

    # Validate before anything dereferences or unpacks the params. A call
    # without params is an explicit empty list instead of relying on
    # autovivification.
    $tuple = [] unless defined $tuple;
    confess "All fields must be defined" if grep { !defined } @$tuple;
    confess "Bad `unpack_format` option" if exists $param->{unpack_format} and ref $param->{unpack_format} ne 'ARRAY';

    $self->_debug("$self->{name}: CALL($sp_name)[${\join ' ', map {join' ',unpack'(H2)*',$_} @$tuple}]") if $self->{debug} >= 4;

    # Same per-field translation new() applies to `format': bytestrings
    # become `w/a*', every other letter is prefixed with `x'.
    my $unpack_format = join '', map { /&/ ? 'w/a*' : "x$_" } @{ $param->{unpack_format} || [] };

    # Temporarily replace the space's own tuple format for the duration of
    # this call only.
    local $namespace->{unpack_format} = $unpack_format if $unpack_format; # XXX
    local $namespace->{append_for_unpack} = ''         if $unpack_format;

    $self->_chat (
        msg      => 22,
        payload  => pack("L w/a* L(w/a*)*", $flags, $sp_name, scalar(@$tuple), @$tuple),
        unpack   => $param->{unpack} || sub { $self->_unpack_select($namespace, "CALL", @_) },
        callback => $param->{callback},
    );
}

=pod

=head3 Add, Insert, Replace

    $box->Add(@tuple) or die $box->ErrorStr;         # only store a new tuple
    $box->Replace(@tuple, { space => "secondary" }); # only store an existing tuple
    $box->Insert(@tuple, { space => "main" });       # store anyway

Insert a C<< @tuple >> into the storage into C<$options{space}> or C<default_space> space.
All of them return C<true> upon success.

All of them have the same parameters:

=over

=item B<@tuple>

A tuple to insert. All fields must be defined. All fields will be C<pack()>ed according to C<format> (see L</new>)

=item B<%options>

=over

=item B<space> => $space_id_uint32_or_name_string

Specify storage space to work on.

=back

=back

The difference between them is the behaviour concerning tuple with the same primary key:

=over

=item *

B<Add> will succeed if and only if duplicate-key tuple B<does not exist>

=item *

B<Replace> will succeed if and only if a duplicate-key tuple B<exists>

=item *

B<Insert> will succeed B<anyway>. Duplicate-key tuple will be B<overwritten>

=back

=cut

# Add, Set and Replace are Insert() with the `action' option forced. The
# options are merged into a fresh hash so the caller's hash is never
# modified and can safely be reused for other calls.

sub Add { # store tuple if tuple identified by primary key _does_not_ exist
    my $options = @_ && ref $_[-1] eq 'HASH' ? pop @_ : {};
    $_[0]->Insert(@_[1..$#_], { %$options, action => 'add' });
}

sub Set { # store tuple _anyway_
    my $options = @_ && ref $_[-1] eq 'HASH' ? pop @_ : {};
    $_[0]->Insert(@_[1..$#_], { %$options, action => 'set' });
}

sub Replace { # store tuple if tuple identified by primary key _does_ exist
    my $options = @_ && ref $_[-1] eq 'HASH' ? pop @_ : {};
    $_[0]->Insert(@_[1..$#_], { %$options, action => 'replace' });
}

# Store @tuple in the selected space.
# Options: want_result / want_inserted_tuple, _flags, action (add, replace
# or set; default set), plus space/namespace.
# Returns the _chat() result, or the first returned tuple when a result was
# asked for; returns nothing when _chat() yields a false value.
sub Insert {
    my ($param, $namespace) = $_[0]->_validate_param(\@_, qw/want_result want_inserted_tuple _flags action/);
    my ($self, @tuple) = @_;

    $self->_debug("$self->{name}: INSERT(NS:$namespace->{namespace},TUPLE:[@{[map {qq{`$_'}} @tuple]}])") if $self->{debug} >= 3;

    $param->{want_result} = $param->{want_inserted_tuple} if !defined $param->{want_result};

    my $flags = $param->{_flags} || 0;
    $flags |= WANT_RESULT if $param->{want_result};

    $param->{action} ||= 'set';
    if ($param->{action} eq 'add') {
        $flags |= INSERT_ADD;
    } elsif ($param->{action} eq 'replace') {
        $flags |= INSERT_REPLACE;
    } elsif ($param->{action} ne 'set') {
        confess "$self->{name}: Bad insert action `$param->{action}'";
    }

    my $chkkey = $namespace->{check_keys};
    my $fmt = $namespace->{field_format};
    confess "Wrong fields number in tuple" if @tuple != @$fmt;
    for (0..$#tuple) {
        confess "$self->{name}: ref in tuple $_=`$tuple[$_]'" if ref $tuple[$_];
        no warnings 'uninitialized';
        # Only indexed fields are checked: string keys must be defined,
        # numeric keys must be integers.
        if(exists $chkkey->{$_}) {
            if($chkkey->{$_}) {
                confess "$self->{name}: undefined key $_" unless defined $tuple[$_];
            } else {
                confess "$self->{name}: not numeric key $_=`$tuple[$_]'" unless looks_like_number($tuple[$_]) && int($tuple[$_]) == $tuple[$_];
            }
        }
        $tuple[$_] = pack($fmt->[$_], $tuple[$_]);
    }

    $self->_debug("$self->{name}: INSERT[${\join ' ', map {join' ',unpack'(H2)*',$_} @tuple}]") if $self->{debug} >= 4;

    my $r = $self->_chat (
        msg      => 13,
        payload  => pack("LLL (w/a*)*", $namespace->{namespace}, $flags, scalar(@tuple), @tuple),
        unpack   => sub { $self->_unpack_affected($flags, $namespace, @_) },
        callback => $param->{callback},
    ) or return;

    return $r unless $param->{want_result};

    $self->_PostSelect($r, $param, $namespace);

    return $r->[0];
}

# Decode a result set from the reply data in $_[3]. The data is consumed in
# place (through the @_ alias) so the caller can see what is left over.
# Layout read here: a 32-bit tuple count, then for each tuple a 32-bit byte
# length, a 32-bit field count and the packed fields.
# Returns an arrayref of arrayrefs (one per tuple); confesses on short data.
sub _unpack_select {
    my ($self, $ns, $debug_prefix) = @_;
    $debug_prefix ||= "SELECT";
    confess __LINE__."$self->{name}: [$debug_prefix]: Bad response" if length $_[3] < 4;
    my $result_count = unpack('L', substr($_[3], 0, 4, ''));
    $self->_debug("$self->{name}: [$debug_prefix]: COUNT=[$result_count];") if $self->{debug} >= 3;
    my (@res);
    my $appe = $ns->{append_for_unpack};
    my $fmt  = $ns->{unpack_format};
    for(my $i = 0; $i < $result_count; ++$i) {
        confess __LINE__."$self->{name}: [$debug_prefix]: Bad response" if length $_[3] < 8;
        my ($len, $cardinality) = unpack('LL', substr($_[3], 0, 8, ''));
        $self->_debug("$self->{name}: [$debug_prefix]: ROW[$i]: LEN=[$len]; NFIELD=[$cardinality];") if $self->{debug} >= 4;
        confess __LINE__."$self->{name}: [$debug_prefix]: Bad response" if length $_[3] < $len;
        my $packed_tuple = substr($_[3], 0, $len, '');
        $self->_debug("$self->{name}: [$debug_prefix]: ROW[$i]: DATA=[@{[unpack '(H2)*', $packed_tuple]}];") if $self->{debug} >= 6;
        $packed_tuple .= $appe;
        # unpack() dies on malformed data; turn that into a confess with a hex dump.
        my @tuple = eval { unpack($fmt, $packed_tuple) };
        confess "$self->{name}: [$debug_prefix]: ROW[$i]: can't unpack tuple [@{[unpack('(H2)*', $packed_tuple)]}]: $@" if !@tuple || $@;
        $self->_debug("$self->{name}: [$debug_prefix]: ROW[$i]: FIELDS=[@{[map { qq{`$_'} } @tuple]}];") if $self->{debug} >= 5;
        push @res, \@tuple;
    }
    return \@res;
}

# Decode one result set per space description in @$nss from the same reply
# data ($_[3]). Returns an arrayref of result sets.
sub _unpack_select_multi {
    my ($self, $nss, $debug_prefix) = @_;
    $debug_prefix ||= "SMULTI";
    my (@rsets);
    my $i = 0;
    for my $ns (@$nss) {
        push @rsets, $self->_unpack_select($ns, "${debug_prefix}[$i]", $_[3]);
        ++$i;
    }
    return \@rsets;
}

# Decode the reply of a modifying request: a result set when WANT_RESULT was
# requested, otherwise the 32-bit counter at the head of the reply, with
# zero reported as '0E0' so that it is still a true value.
sub _unpack_affected {
    my ($self, $flags, $ns) = @_;
    ($flags & WANT_RESULT) ? $self->_unpack_select($ns, "AFFECTED", $_[3]) : unpack('L', substr($_[3],0,4,''))||'0E0';
}

# Number of leading fixed arguments of _pack_keys(); the keys follow them.
sub NPRM () { 3 }

# Replace every key passed after ($self, $ns, $idx) IN PLACE (through the @_
# aliases) by its packed wire form: a 32-bit field count followed by the
# length-prefixed fields.
#   - single-field index: a key is a scalar (a one-element arrayref is
#     unwrapped first);
#   - multi-field index: a key is an arrayref holding between one and
#     scalar(@{$idx->{keys}}) fields.
# Numeric fields must be integers, otherwise confess. Only the key slots are
# rewritten; the arrays holding a multi-field key are left untouched, so the
# caller can reuse them.
sub _pack_keys {
    my ($self, $ns, $idx) = @_;

    my $keys   = $idx->{keys};
    my $strkey = $ns->{string_keys};
    my $fmt    = $ns->{field_format};

    if (@$keys == 1) {
        $fmt    = $fmt->[$keys->[0]];
        $strkey = $strkey->{$keys->[0]};
        foreach (@_[NPRM..$#_]) {
            ($_) = @$_ if ref $_ eq 'ARRAY';
            unless ($strkey) {
                confess "$self->{name}: not numeric key [$_]" unless looks_like_number($_) && int($_) == $_;
                $_ = pack($fmt, $_);
            }
            $_ = pack('L(w/a*)*', 1, $_);
        }
    } else {
        foreach my $k (@_[NPRM..$#_]) {
            confess "bad key [@$keys][$k][@{[ref $k eq 'ARRAY' ? (@$k) : ()]}]" unless ref $k eq 'ARRAY' and @$k and @$k <= @$keys;
            # Pack into a separate list instead of overwriting the fields of
            # the caller's key array.
            my @packed;
            for my $i (0..$#$k) {
                unless ($strkey->{$keys->[$i]}) {
                    confess "$self->{name}: not numeric key [$i][$k->[$i]]" unless looks_like_number($k->[$i]) && int($k->[$i]) == $k->[$i];
                }
                push @packed, pack($fmt->[$keys->[$i]], $k->[$i]);
            }
            $k = pack('L(w/a*)*', scalar(@packed), @packed);
        }
    }
}

# Build the payload of a select request for @keys; '' when no keys are
# given. With $param->{format} set, a per-field (field, offset, length)
# description is added and $param->{unpack_format} is set to match it.
sub _PackSelect {
    my ($self, $param, $namespace, @keys) = @_;
    return '' unless @keys;
    $self->_pack_keys($namespace, $param->{index}, @keys);
    my $format = "";
    if ($param->{format}) {
        my $f = $namespace->{byfield_unpack_format};
        $param->{unpack_format} = join '', map { $f->[$_->{field}] } @{$param->{format}};
        $format = pack 'l*', scalar @{$param->{format}}, map {
            $_ = { %$_ };
            if($_->{full}) {
                $_->{offset} = 0;
                $_->{length} = 'max';
            }
            $_->{length} = 0x7FFFFFFF if $_->{length} eq 'max';
            @$_{qw/field offset length/}
        } @{$param->{format}};
    }
    return pack("LLLL a* La*", $namespace->{namespace}, $param->{index}->{id}, $param->{offset} || 0, $param->{limit} || scalar(@keys), $format, scalar(@keys), join('',@keys));
}

# Turn the tuples of the result set @$r into hashes in place, unless `raw'
# is set: through the `hashify' callback (per call, per space or per object,
# in that order) or else through the space's `fields' names.
sub _PostSelect {
    my ($self, $r, $param, $namespace) = @_;
    if(!$param->{raw}) {
        my $hashify = $param->{hashify} || $namespace->{hashify} || $self->{hashify};
        if($hashify) {
            $hashify->($namespace->{namespace}, $r);
        } elsif( $namespace->{fields} ) {
            $_ = { zip @{$namespace->{fields}}, @$_ } for @$r;
        }
    }
}

=pod

=head3 Select

Select tuple(s) from storage

    my $key = $id;
    my $key = [ $firstname, $lastname ];
    my @keys = ($key, ...);

    my $tuple  = $box->Select($key)              or $box->Error && die $box->ErrorStr;
    my $tuple  = $box->Select($key, \%options)   or $box->Error && die $box->ErrorStr;

    my @tuples = $box->Select(@keys)             or $box->Error && die $box->ErrorStr;
    my @tuples = $box->Select(@keys, \%options)  or $box->Error && die $box->ErrorStr;

    my $tuples = $box->Select(\@keys)            or die $box->ErrorStr;
    my $tuples = $box->Select(\@keys, \%options) or die $box->ErrorStr;

=over

=item B<$key>, B<@keys>, B<\@keys>

Specify keys to select. All keys must be defined.

Contextual behaviour:

=over

=item *

In scalar context, you can select one C<$key>, and the resulting tuple will be returned.
Check C<< $box->Error >> to see if there was an error or there is just no such key
in the storage

=item *

In list context, you can select several C<@keys>, and the resulting tuples will be returned.
Check C<< $box->Error >> to see if there was an error or there is just no such keys
in the storage

=item *

If you select C<< \@keys >> then C<< \@tuples >> will be returned upon success. C<< @tuples >> will
be empty if there are no such keys, and false will be returned in case of error.

=back

Other notes:

=over

=item *

If you select using index on multiple fields each C<< $key >> should be given as a key-tuple C<< $key = [ $key_field1, $key_field2, ... ] >>.

=back

=item B<%options>

=over

=item B<space> => $space_id_uint32_or_name_string

Specify storage (by id or name) space to select from.

=item B<use_index> => $index_id_uint32_or_name_string

Specify index (by id or name) to use.

=item B<raw> => $bool

Don't C<hashify> (see L</new>).

=item B<hash_by> => $by

Return a hashref of the resultset. If you C<hashify> the result set,
then C<$by> must be a field name of the hash you return,
otherwise it must be a number of field of the tuple.
C<False> will be returned in case of error.

=back

=back

=cut

# Options accepted by Select() (and by each request of SelectUnion()).
my @select_param_ok = qw/use_index raw want next_rows limit offset raise hashify timeout format hash_by/;

# Select tuples by key(s); see the POD above for the calling conventions.
# Must not be called in void context.
sub Select {
    confess q/Select isnt callable in void context/ unless defined wantarray;
    my ($param, $namespace) = $_[0]->_validate_param(\@_, @select_param_ok);
    my ($self, @keys) = @_;
    local $self->{raise} = $param->{raise} if defined $param->{raise};

    # Unwrap a key list passed as a single arrayref: always for a
    # single-field index; for a multi-field index only if its first element
    # is itself an arrayref (otherwise the arrayref is one multi-field key).
    @keys = @{$keys[0]} if @keys && ref $keys[0] eq 'ARRAY' and 1 == @{$param->{index}->{keys}} || (@keys && ref $keys[0]->[0] eq 'ARRAY');

    $self->_debug("$self->{name}: SELECT(NS:$namespace->{namespace},IDX:$param->{use_index})[@{[map{ref$_?qq{[@$_]}:$_}@keys]}]") if $self->{debug} >= 3;

    my ($msg,$payload);
    if(exists $param->{next_rows}) {
        confess "$self->{name}: One and only one key can be used to get N>0 rows after it" if @keys != 1 || !$param->{next_rows};
        $msg = 15;
        $self->_pack_keys($namespace, $param->{index}, @keys);
        $payload = pack("LL a*", $namespace->{namespace}, $param->{next_rows}, join('',@keys));
    } else {
        $payload = $self->_PackSelect($param, $namespace, @keys);
        $msg = $param->{format} ? 21 : 17;
    }

    # _PackSelect() sets unpack_format when a `format' option was given.
    local $namespace->{unpack_format} = $param->{unpack_format} if $param->{unpack_format};

    # Without keys nothing is sent and the result set is simply empty.
    my $r = [];
    if (@keys && $payload) {
        $r = $self->_chat(
            msg      => $msg,
            payload  => $payload,
            unpack   => sub { $self->_unpack_select($namespace, "SELECT", @_) },
            retry    => $self->{select_retry},
            timeout  => $param->{timeout} || $self->{select_timeout},
            callback => $param->{callback},
        ) or return;
    }

    $param->{want} ||= !1;

    $self->_PostSelect($r, $param, $namespace);

    if(defined(my $p = $param->{hash_by})) {
        my %h;
        if(@$r) {
            if (ref $r->[0] eq 'HASH') {
                confess "Bad hash_by `$p' for HASH" unless exists $r->[0]->{$p};
                $h{$_->{$p}} = $_ for @$r;
            } elsif(ref $r->[0] eq 'ARRAY') {
                confess "Bad hash_by `$p' for ARRAY" unless $p =~ m/^\d+$/ && $p >= 0 && $p < @{$r->[0]};
                $h{$_->[$p]} = $_ for @$r;
            } else {
                confess "i dont know how to hash_by ".ref($r->[0]);
            }
        }
        return \%h;
    }

    # NOTE(review): the POD promises an arrayref result for Select(\@keys),
    # but here only an explicit `want => 'arrayref'' produces one - confirm
    # the intended behaviour before relying on either.
    return $r if $param->{want} eq 'arrayref';

    if (wantarray) {
        return @{$r};
    } else {
        confess "$self->{name}: too many keys in scalar context" if @keys > 1;
        return $r->[0];
    }
}

# Not supported yet: confesses immediately, so everything after the first
# statement is currently unreachable.
sub SelectUnion {
    confess "not supported yet";
    my ($param) = $_[0]->_validate_param(\@_, qw/raw raise/);
    my ($self, @reqs) = @_;
    return [] unless @reqs;
    local $self->{raise} = $param->{raise} if defined $param->{raise};
    confess "bad param" if grep { ref $_ ne 'ARRAY' } @reqs;
    $param->{want} ||= 0;
    for my $req (@reqs) {
        my ($param, $namespace) = $self->_validate_param($req, @select_param_ok);
        $req = {
            payload   => $self->_PackSelect($param, $namespace, $req),
            param     => $param,
            namespace => $namespace,
        };
    }
    my $r = $self->_chat(
        msg      => 18,
        payload  => pack("L (a*)*", scalar(@reqs), map { $_->{payload} } @reqs),
        unpack   => sub { $self->_unpack_select_multi([map { $_->{namespace} } @reqs], "SMULTI", @_) },
        retry    => $self->{select_retry},
        timeout  => $param->{select_timeout} || $self->{timeout},
        callback => $param->{callback},
    ) or return;
    confess __LINE__."$self->{name}: something wrong" if @$r != @reqs;
    my $ea = each_arrayref $r, \@reqs;
    while(my ($res, $req) = $ea->()) {
        $self->_PostSelect($res, { %$param, %{$req->{param}} }, $req->{namespace});
    }
    return $r;
}

=pod

=head3 Delete

Delete tuple from storage. Return false upon error.

    my $n_deleted = $box->Delete($key) or die $box->ErrorStr;
    my $n_deleted = $box->Delete($key, \%options) or die $box->ErrorStr;
    warn "Nothing was deleted" unless int $n_deleted;

    my $deleted_tuple = $box->Delete($key, { want_deleted_tuple => 1 });
    warn "Nothing was deleted" unless $deleted_tuple;

=over

=item B<%options>

=over

=item B<space> => $space_id_uint32_or_name_string

Specify storage space (by id or name) to work on.

=item B<want_deleted_tuple> => $bool

if C<$bool> then return deleted tuple.

=back

=back

=cut

# Delete the tuple identified by $key; only single-field indexes are
# supported. Returns the _chat() result, or the first returned tuple when
# want_result / want_deleted_tuple is set.
sub Delete {
    my ($param, $namespace) = $_[0]->_validate_param(\@_, qw/want_deleted_tuple want_result/);
    my ($self, $key) = @_;

    $param->{want_result} = $param->{want_deleted_tuple} if !defined $param->{want_result};

    my $flags = 0;
    $flags |= WANT_RESULT if $param->{want_result};

    $self->_debug("$self->{name}: DELETE(NS:$namespace->{namespace},KEY:$key,F:$flags)") if $self->{debug} >= 3;

    confess "$self->{name}\->Delete: for now key cardinality of 1 is only allowed" unless 1 == @{$param->{index}->{keys}};
    # Packs $key in place.
    $self->_pack_keys($namespace, $param->{index}, $key);

    # The request carries a flags word only when flags are set, and uses a
    # different message code in that case.
    my $r = $self->_chat(
        msg      => $flags ? 21 : 20,
        payload  => $flags
            ? pack("L L a*", $namespace->{namespace}, $flags, $key)
            : pack("L a*", $namespace->{namespace}, $key),
        unpack   => sub { $self->_unpack_affected($flags, $namespace, @_) },
        callback => $param->{callback},
    ) or return;

    return $r unless $param->{want_result};

    $self->_PostSelect($r, $param, $namespace);

    return $r->[0];
}

# Numeric codes of the update operations.
sub OP_SET    () { 0 }
sub OP_ADD    () { 1 }
sub OP_AND    () { 2 }
sub OP_XOR    () { 3 }
sub OP_OR     () { 4 }
sub OP_SPLICE () { 5 }

# Operation name => operation code, or a coderef that rewrites the value and
# returns (operation, new value), where the operation may again be a name
# from this table. A plain string value is an alias for another entry and is
# resolved right after the table.
my %update_ops = (
    set    => OP_SET,
    add    => OP_ADD,
    and    => OP_AND,
    xor    => OP_XOR,
    or     => OP_OR,
    splice => sub {
        confess "value for operation splice must be an ARRAYREF of <int[, int[, string]]>" if ref $_[0] ne 'ARRAY' || @{$_[0]} < 1;
        # Work on a copy: the array belongs to the caller and must not be
        # overwritten with packed bytes.
        my @arg = @{$_[0]};
        $arg[0] = 0x7FFFFFFF unless defined $arg[0];
        $arg[0] = pack 'l', $arg[0];
        $arg[1] = defined $arg[1] ? pack('l', $arg[1]) : '';
        $arg[2] = '' unless defined $arg[2];
        return (OP_SPLICE, [ pack '(w/a*)*', @arg ]);
    },
    append  => sub { splice => [undef,  0,     $_[0]] },
    prepend => sub { splice => [0,      0,     $_[0]] },
    cutbeg  => sub { splice => [0,      $_[0], ''   ] },
    cutend  => sub { splice => [-$_[0], $_[0], ''   ] },
    substr  => 'splice',
);

# Resolve the aliases: a non-reference value that does not start with a
# digit names another entry.
!ref $_ && m/^\D/ and $_ = $update_ops{$_} || die "bad link" for values %update_ops;

# pack() letter of the argument, for the operations that have one listed.
my %update_arg_fmt = (
    (map { $_ => 'l' } OP_ADD),
    (map { $_ => 'L' } OP_AND, OP_XOR, OP_OR),
);

# Kind of field each operation applies to.
my %ops_type = (
    (map { $_ => 'any'    } OP_SET),
    (map { $_ => 'number' } OP_ADD, OP_AND, OP_XOR, OP_OR),
    (map { $_ => 'string' } OP_SPLICE),
);

# Generate the convenience methods Append, Prepend, Cutbeg, Cutend and
# Substr; each is UpdateMulti() with the single operation named after it.
BEGIN {
    for my $op (qw/Append Prepend Cutbeg Cutend Substr/) {
        eval q/
            sub /.$op.q/ {
                my $param = ref $_[-1] eq 'HASH' ? pop : {};
                my ($self, $key, $field_num, $val) = @_;
                $self->UpdateMulti($key, [ $field_num => /.lc($op).q/ => $val ], $param);
            }
            1;
        / or die $@;
    }
}

=pod

=head3 UpdateMulti

Apply several update operations to a tuple.


    my @op = ([ f1 => add => 10 ], [ f1 => and => 0xFF], [ f2 => set => time() ], [ misc_string => cutend => 3 ]);

    my $n_updated = $box->UpdateMulti($key, @op) or die $box->ErrorStr;
    my $n_updated = $box->UpdateMulti($key, @op, \%options) or die $box->ErrorStr;
    warn "Nothing was updated" unless int $n_updated;

    my $updated_tuple_set = $box->UpdateMulti($key, @op, { want_result => 1 }) or die $box->ErrorStr;
    warn "Nothing was updated" unless @$updated_tuple_set;

Different fields can be updated in one shot.
The same field can be updated more than once.
All update operations are done atomically.
Returns false upon error.

=over

=item B<@op> = ([ $field => $op => $value ], ...)

=over

=item B<$field>

Field-to-update number or name (see L</fields>).

=item B<$op>

=over

=item B<set>

Set C<< $field >> to C<< $value >>.

=item B<add>, B<and>, B<xor>, B<or>

Apply an arithmetic operation to C<< $field >> with argument C<< $value >>.
Currently arithmetic operations are supported only for int32 (4-byte length) fields (and C<$value>s too).

=item B<splice>, B<substr>

Apply a perl-like L<splice|perlfunc/splice> operation to C<< $field >>. B<$value> = [$OFFSET, $LENGTH, $REPLACE_WITH].
substr is just an alias.

=item B<append>, B<prepend>

Append or prepend C<< $field >> with C<$value> string.

=item B<cutbeg>, B<cutend>

Cut C<< $value >> bytes from beginning or end of C<< $field >>.

=back

=back

=item B<%options>

=over

=item B<space> => $space_id_uint32_or_name_string

Specify storage space (by id or name) to work on.

=item B<want_updated_tuple> => $bool

if C<$bool> then return updated tuple.

=back

=back

=cut

# UpdateMulti($key, @op [, \%options])
#
# Packs every [ $field => $op => $value ] triple into the wire format and
# sends them as one update request (message 19).
# Returns nothing when _chat() yields a false value; otherwise the _chat()
# result itself, or its first element when want_result is set.
# Confesses on malformed operations, unknown fields or operations,
# operation/field type mismatch and on more than 128 operations.
sub UpdateMulti {
    # NOTE(review): _validate_param() presumably strips a trailing options
    # hash from @_, since @_ is unpacked only afterwards -- confirm.
    my ($param, $namespace) = $_[0]->_validate_param(\@_, qw/want_updated_tuple want_result _flags/);
    my ($self, $key, @op) = @_;

    # The trace runs before the operations are validated, so it must not
    # dereference anything that is not an array ref.
    $self->_debug("$self->{name}: UPDATEMULTI(NS:$namespace->{namespace},KEY:$key)[@{[map { ref($_) eq 'ARRAY' ? qq{[@$_]} : qq{<$_>} } @op]}]") if $self->{debug} >= 3;

    confess "$self->{name}\->UpdateMulti: for now key cardinality of 1 is only allowed" unless 1 == @{$param->{index}->{keys}};
    confess "$self->{name}: too many op" if scalar @op > 128;

    # want_updated_tuple is only a default for want_result.
    $param->{want_result} = $param->{want_updated_tuple} if !defined $param->{want_result};

    my $flags = $param->{_flags} || 0;
    $flags |= WANT_RESULT if $param->{want_result};

    my $fmt         = $namespace->{field_format};
    my $fields_hash = $namespace->{fields_hash};

    # @op is a private copy of the arguments; each element is replaced in
    # place by its packed representation.
    for my $update (@op) {
        confess "$self->{name}: bad op <$update>" if ref($update) ne 'ARRAY' or @$update != 3;
        my ($field_num, $op, $value) = @$update;

        # Anything that is not a plain non-negative integer is treated as a
        # field name. Numifying such a value would silently address field 0.
        confess "$self->{name}: bad op: undefined field" unless defined $field_num;
        if ($field_num !~ m/\A[0-9]+\z/) {
            confess "no such field $field_num in space $namespace->{name}($namespace->{space})" unless exists $fields_hash->{$field_num};
            $field_num = $fields_hash->{$field_num};
        }
        confess "bad fieldnum: $field_num" if $field_num >= @$fmt;

        my $field_type = $namespace->{string_keys}->{$field_num} ? 'string' : 'number';

        # Operation names used by Bit() and Num() are handled here; all
        # other names are looked up in %update_ops.
        if ($op eq 'bit_set') {
            $op = OP_OR;
        }
        elsif ($op eq 'bit_clear') {
            $op    = OP_AND;
            $value = ~$value;
        }
        elsif ($op =~ /^num_(add|sub)$/) {
            $value = -$value if $1 eq 'sub';
            $op    = OP_ADD;
        }
        else {
            confess "$self->{name}: bad op <$op>" unless exists $update_ops{$op};
            $op = $update_ops{$op};
        }

        # A handler may return another operation name (e.g. append returns
        # 'splice'), so keep resolving until a numeric op code is left.
        while (ref $op eq 'CODE') {
            ($op, $value) = $op->($value);
            $op = $update_ops{$op} if exists $update_ops{$op};
        }

        confess "Are you sure you want to apply `$ops_type{$op}' operation to $field_type field?" if $ops_type{$op} ne $field_type && $ops_type{$op} ne 'any';

        $value = [ $value ] unless ref $value;
        confess "dunno what to do with ref `$value'" if ref $value ne 'ARRAY';

        # Operations without their own argument format use the field's format.
        $value  = pack($update_arg_fmt{$op} || $fmt->[$field_num], @$value);
        $update = pack('LCw/a*', $field_num, $op, $value);
    }

    $self->_pack_keys($namespace, $param->{index}, $key);

    my $r = $self->_chat(
        msg      => 19,
        payload  => pack("LL a* L (a*)*" , $namespace->{namespace}, $flags, $key, scalar(@op), @op),
        unpack   => sub { $self->_unpack_affected($flags, $namespace, @_) },
        callback => $param->{callback},
    ) or return;

    return $r unless $param->{want_result};

    $self->_PostSelect($r, $param, $namespace);
    return $r->[0];
}

# Update($key, $field_num, $value [, \%options])
# Shortcut for a single `set' operation.
sub Update {
    my $param = ref $_[-1] eq 'HASH' ? pop : {};
    my ($self, $key, $field_num, $value) = @_;
    $self->UpdateMulti($key, [$field_num => set => $value ], $param);
}

# AndXorAdd($key, $field_num, $and, $xor, $add [, \%options])
# Applies `and', `xor' and `add' to one field, in that order; operations
# whose argument is undefined are skipped.
sub AndXorAdd {
    my $param = ref $_[-1] eq 'HASH' ? pop : {};
    my ($self, $key, $field_num, $and, $xor, $add) = @_;
    my @upd;
    push @upd, [$field_num => and => $and] if defined $and;
    push @upd, [$field_num => xor => $xor] if defined $xor;
    push @upd, [$field_num => add => $add] if defined $add;
    $self->UpdateMulti($key, @upd, $param);
}

# Bit($key, $field_num, %arg [, \%options])
# %arg may contain set, bit_clear and bit_set; any other key confesses.
# Operations are queued in the order set, bit_clear, bit_set; false
# bit_clear/bit_set values are skipped.
sub Bit {
    my $param = ref $_[-1] eq 'HASH' ? pop : {};
    my ($self, $key, $field_num, %arg) = @_;
    confess "$self->{name}: unknown op '@{[keys %arg]}'" if grep { not /^(bit_clear|bit_set|set)$/ } keys(%arg);

    $arg{bit_clear} ||= 0;
    $arg{bit_set}   ||= 0;

    my @op;
    push @op, [$field_num => set       => $arg{set}]       if exists $arg{set};
    push @op, [$field_num => bit_clear => $arg{bit_clear}] if $arg{bit_clear};
    push @op, [$field_num => bit_set   => $arg{bit_set}]   if $arg{bit_set};

    $self->UpdateMulti($key, @op, $param);
}

# Num($key, $field_num, %arg [, \%options])
# %arg may contain set, num_add and num_sub; any other key confesses.
# num_sub is folded into num_add, and the resulting addition is always sent,
# even when it is zero.
sub Num {
    my $param = ref $_[-1] eq 'HASH' ? pop : {};
    my ($self, $key, $field_num, %arg) = @_;
    confess "$self->{name}: unknown op '@{[keys %arg]}'" if grep { not /^(num_add|num_sub|set)$/ } keys(%arg);

    $arg{num_add} ||= 0;
    $arg{num_sub} ||= 0;

    $arg{num_add} -= $arg{num_sub};

    my @op;
    push @op, [$field_num => set     => $arg{set}] if exists $arg{set};
    push @op, [$field_num => num_add => $arg{num_add}]; # if $arg{num_add};

    $self->UpdateMulti($key, @op, $param);
}

=head1 LICENCE AND COPYRIGHT

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.

=head1 SEE ALSO

=over

=item * L<http://tarantool.org>

=item * L<MR::Tarantool::Box::Singleton>

=back

=cut

1;