#include "db/db_iter.h"
#include <stdexcept>
#include <deque>
#include <string>
#include <limits>
#include "db/filename.h"
#include "db/dbformat.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "port/port.h"
#include "util/arena.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
namespace
rocksdb {
#if 0
// Debugging aid, currently compiled out. Walks `iter` from its first entry to
// its end and prints each internal key to stderr; keys that fail to parse are
// reported as corrupt.
static void DumpInternalIter(Iterator* iter) {
  iter->SeekToFirst();
  while (iter->Valid()) {
    ParsedInternalKey parsed;
    if (ParseInternalKey(iter->key(), &parsed)) {
      fprintf(stderr, "@ '%s'\n", parsed.DebugString().c_str());
    } else {
      fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
    }
    iter->Next();
  }
}
#endif
// Iterator over user keys built on top of an iterator over internal keys.
//
// `iter_` yields internal keys (parsed into a user key, a sequence number and
// a value type). DBIter hides entries whose sequence number is greater than
// `sequence_`, hides user keys whose relevant entry is a deletion, and
// combines merge operands through `user_merge_operator_`.
//
// Ownership of `iter_`: when `arena_mode_` is false it is deleted in the
// destructor; when true only its destructor is run (the storage is presumably
// owned by an arena -- confirm against the caller that allocates it).
class DBIter : public Iterator {
 public:
  // Direction in which iter_ is currently being traversed. value() uses it to
  // decide whether the current value lives in iter_ or in saved_value_.
  enum Direction { kForward, kReverse };

  // env:        stored in env_.
  // options:    supplies the info log, merge operator, statistics object,
  //             prefix-extractor presence and the sequential-skip limit.
  // cmp:        comparator applied to user keys.
  // iter:       internal-key iterator; may be nullptr and supplied later via
  //             SetIter().
  // s:          entries with a larger sequence number are ignored.
  // arena_mode: selects how iter_ is released in the destructor.
  DBIter(Env* env, const Options& options, const Comparator* cmp,
         Iterator* iter, SequenceNumber s, bool arena_mode)
      : has_prefix_extractor_(options.prefix_extractor.get() != nullptr),
        arena_mode_(arena_mode),
        env_(env),
        logger_(options.info_log.get()),
        user_comparator_(cmp),
        user_merge_operator_(options.merge_operator.get()),
        iter_(iter),
        sequence_(s),
        direction_(kForward),
        valid_(false),
        current_entry_is_merged_(false),
        statistics_(options.statistics.get()),
        max_skip_(options.max_sequential_skip_in_iterations) {
    RecordTick(statistics_, NO_ITERATORS);
  }

  virtual ~DBIter() {
    RecordTick(statistics_, NO_ITERATORS, -1);
    if (!arena_mode_) {
      delete iter_;
    } else if (iter_ != nullptr) {
      // Only run the destructor; the memory is not freed here. The null check
      // matters because an arena-mode DBIter can be constructed with a null
      // iterator and destroyed before SetIter() is ever called.
      iter_->~Iterator();
    }
  }

  // Installs the internal iterator. Must only be called while no iterator is
  // installed (asserted).
  virtual void SetIter(Iterator* iter) {
    assert(iter_ == nullptr);
    iter_ = iter;
  }

  virtual bool Valid() const { return valid_; }

  // Returns the contents of saved_key_. Requires Valid().
  virtual Slice key() const {
    assert(valid_);
    return saved_key_.GetKey();
  }

  // Requires Valid(). While moving forward over an entry that was not produced
  // by a merge, the value is read straight from iter_; in every other case it
  // is the value stored in saved_value_.
  virtual Slice value() const {
    assert(valid_);
    return (direction_ == kForward && !current_entry_is_merged_)
               ? iter_->value()
               : saved_value_;
  }

  // An error recorded by DBIter itself takes precedence; otherwise the status
  // of the underlying iterator is reported.
  virtual Status status() const {
    if (!status_.ok()) {
      return status_;
    }
    return iter_->status();
  }

  virtual void Next();
  virtual void Prev();
  virtual void Seek(const Slice& target);
  virtual void SeekToFirst();
  virtual void SeekToLast();

 private:
  void PrevInternal();
  void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
  bool FindValueForCurrentKey();
  bool FindValueForCurrentKeyUsingSeek();
  void FindPrevUserKey();
  void FindNextUserKey();
  inline void FindNextUserEntry(bool skipping);
  void FindNextUserEntryInternal(bool skipping);
  bool ParseKey(ParsedInternalKey* key);
  void MergeValuesNewToOld();

  // Empties saved_value_. If its buffer has grown beyond 1048576 bytes the
  // buffer is released by swapping with an empty string; otherwise clear() is
  // used.
  inline void ClearSavedValue() {
    if (saved_value_.capacity() > 1048576) {
      std::string empty;
      saved_value_.swap(empty);
    } else {
      saved_value_.clear();
    }
  }

  // True when options.prefix_extractor was set at construction.
  bool has_prefix_extractor_;
  // Selects the release strategy for iter_ in the destructor.
  bool arena_mode_;
  Env* const env_;
  Logger* logger_;
  const Comparator* const user_comparator_;
  // May be null; MergeValuesNewToOld() throws if a merge entry is met then.
  const MergeOperator* const user_merge_operator_;
  Iterator* iter_;
  // Upper bound on the sequence numbers this iterator exposes.
  SequenceNumber const sequence_;

  // Error recorded by DBIter itself (e.g. a corrupted internal key).
  Status status_;
  // User key of the current entry; Seek() temporarily stores an internal key.
  IterKey saved_key_;
  // Current value when it cannot be read directly from iter_.
  std::string saved_value_;
  Direction direction_;
  bool valid_;
  // Set when the current forward entry started with a merge operand.
  bool current_entry_is_merged_;
  Statistics* statistics_;
  // Number of sequential steps tolerated before a re-seek is issued instead.
  uint64_t max_skip_;

  // No copying allowed.
  DBIter(const DBIter&) = delete;
  void operator=(const DBIter&) = delete;
};
// Parses the internal key at iter_'s current position into *ikey.
// Returns true on success. On failure, stores a Corruption status in status_,
// logs the offending key and returns false.
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  if (ParseInternalKey(iter_->key(), ikey)) {
    return true;
  }
  status_ = Status::Corruption("corrupted internal key in DBIter");
  Log(logger_, "corrupted internal key in DBIter: %s",
      iter_->key().ToString(true).c_str());
  return false;
}
// Advances to the next user key. Requires Valid().
void DBIter::Next() {
  assert(valid_);

  // Switching from reverse to forward: reposition iter_ relative to the
  // current user key first, falling back to the first entry if that walked
  // off the end.
  if (direction_ == kReverse) {
    FindNextUserKey();
    direction_ = kForward;
    if (!iter_->Valid()) {
      iter_->SeekToFirst();
    }
  }

  // iter_ may already be exhausted at this point.
  if (iter_->Valid()) {
    FindNextUserEntry(true);
  } else {
    valid_ = false;
  }
}
// Timed wrapper: runs FindNextUserEntryInternal(skipping) between the
// find_next_user_entry_time perf-timer start and stop macros.
inline void DBIter::FindNextUserEntry(bool skipping) {
  PERF_TIMER_AUTO(find_next_user_entry_time);
  FindNextUserEntryInternal(skipping);
  PERF_TIMER_STOP(find_next_user_entry_time);
}
void
DBIter::FindNextUserEntryInternal(
bool
skipping) {
assert
(iter_->Valid());
assert
(direction_ == kForward);
current_entry_is_merged_ =
false
;
uint64_t num_skipped = 0;
do
{
ParsedInternalKey ikey;
if
(ParseKey(&ikey) && ikey.sequence <= sequence_) {
if
(skipping &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
num_skipped++;
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
}
else
{
skipping =
false
;
switch
(ikey.type) {
case
kTypeDeletion:
saved_key_.SetKey(ikey.user_key);
skipping =
true
;
num_skipped = 0;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break
;
case
kTypeValue:
valid_ =
true
;
saved_key_.SetKey(ikey.user_key);
return
;
case
kTypeMerge:
saved_key_.SetKey(ikey.user_key);
current_entry_is_merged_ =
true
;
valid_ =
true
;
MergeValuesNewToOld();
return
;
default
:
assert
(
false
);
break
;
}
}
}
if
(skipping && num_skipped > max_skip_) {
num_skipped = 0;
std::string last_key;
AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), 0,
kValueTypeForSeek));
iter_->Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
}
else
{
iter_->Next();
}
}
while
(iter_->Valid());
valid_ =
false
;
}
// Computes the value of a user key whose first visible entry is a merge
// operand and stores it in saved_value_.
//
// On entry iter_ is positioned on that merge entry and saved_key_ holds its
// user key (this is how the caller in this file invokes it). The function
// then steps forward through the following entries:
//   - unparseable entry        : skipped;
//   - different user key       : stop, iter_ stays on that entry;
//   - deletion for this key    : step past it and stop;
//   - put for this key         : FullMerge(put value, operands), step past
//                                it and return;
//   - merge for this key       : collected as another operand.
// When the walk stops without meeting a put, FullMerge is invoked with a null
// existing value.
//
// The return value of FullMerge is not checked.
//
// Throws std::logic_error when no merge operator is configured.
void DBIter::MergeValuesNewToOld() {
  if (!user_merge_operator_) {
    Log(logger_, "Options::merge_operator is null.");
    throw std::logic_error(
        "DBIter::MergeValuesNewToOld() with"
        " Options::merge_operator null");
  }

  // Operands are pushed at the front, so the entry visited last ends up first
  // in the deque.
  std::deque<std::string> operands;
  operands.push_front(iter_->value().ToString());

  ParsedInternalKey ikey;
  for (iter_->Next(); iter_->Valid(); iter_->Next()) {
    if (!ParseKey(&ikey)) {
      continue;
    }

    if (user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0) {
      break;
    }

    if (kTypeDeletion == ikey.type) {
      // Leave iter_ positioned after the deletion.
      iter_->Next();
      break;
    }

    if (kTypeValue == ikey.type) {
      const Slice value = iter_->value();
      user_merge_operator_->FullMerge(ikey.user_key, &value, operands,
                                      &saved_value_, logger_);
      // Leave iter_ positioned after the put.
      iter_->Next();
      return;
    }

    if (kTypeMerge == ikey.type) {
      const Slice& value = iter_->value();
      operands.push_front(value.ToString());
    }
  }

  // No base value was found: pass nullptr so the merge operator can tell.
  user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
                                  &saved_value_, logger_);
}
// Moves to the previous user key. Requires Valid().
void DBIter::Prev() {
  assert(valid_);
  const bool was_moving_forward = (direction_ == kForward);
  if (was_moving_forward) {
    // Step iter_ back past the entries of the current user key before
    // reversing direction.
    FindPrevUserKey();
    direction_ = kReverse;
  }
  PrevInternal();
}
// Reverse-direction worker. Starting from iter_'s current position, repeatedly
// takes the user key under iter_, asks FindValueForCurrentKey() for its value,
// and stops at the first key that yields one. If iter_ runs out first, the
// iterator becomes invalid.
void DBIter::PrevInternal() {
  if (!iter_->Valid()) {
    valid_ = false;
    return;
  }

  ParsedInternalKey ikey;
  while (iter_->Valid()) {
    saved_key_.SetKey(ExtractUserKey(iter_->key()));
    const bool found = FindValueForCurrentKey();
    if (found) {
      valid_ = true;
    }

    // Whether or not a value was found, if iter_ still sits on an entry of the
    // same user key, move it back past the remaining entries for that key.
    if (iter_->Valid()) {
      FindParseableKey(&ikey, kReverse);
      if (user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) {
        FindPrevUserKey();
      }
    }

    if (found) {
      return;
    }
  }

  // Nothing was found before iter_ was exhausted.
  assert(!iter_->Valid());
  valid_ = false;
}
bool
DBIter::FindValueForCurrentKey() {
assert
(iter_->Valid());
std::deque<std::string> operands;
ValueType last_not_merge_type = kTypeDeletion;
ValueType last_key_entry_type = kTypeDeletion;
ParsedInternalKey ikey;
FindParseableKey(&ikey, kReverse);
size_t
num_skipped = 0;
while
(iter_->Valid() && ikey.sequence <= sequence_ &&
(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0)) {
if
(num_skipped >= max_skip_) {
return
FindValueForCurrentKeyUsingSeek();
}
last_key_entry_type = ikey.type;
switch
(last_key_entry_type) {
case
kTypeValue:
operands.clear();
saved_value_ = iter_->value().ToString();
last_not_merge_type = kTypeValue;
break
;
case
kTypeDeletion:
operands.clear();
last_not_merge_type = kTypeDeletion;
break
;
case
kTypeMerge:
assert
(user_merge_operator_ !=
nullptr
);
operands.push_back(iter_->value().ToString());
break
;
default
:
assert
(
false
);
}
assert
(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0);
iter_->Prev();
++num_skipped;
FindParseableKey(&ikey, kReverse);
}
switch
(last_key_entry_type) {
case
kTypeDeletion:
valid_ =
false
;
return
false
;
case
kTypeMerge:
if
(last_not_merge_type == kTypeDeletion) {
user_merge_operator_->FullMerge(saved_key_.GetKey(),
nullptr
, operands,
&saved_value_, logger_);
}
else
{
assert
(last_not_merge_type == kTypeValue);
std::string last_put_value = saved_value_;
Slice temp_slice(last_put_value);
user_merge_operator_->FullMerge(saved_key_.GetKey(), &temp_slice,
operands, &saved_value_, logger_);
}
break
;
case
kTypeValue:
break
;
default
:
assert
(
false
);
break
;
}
valid_ =
true
;
return
true
;
}
// Seek-based variant of FindValueForCurrentKey(), used once the step-by-step
// reverse walk has exceeded max_skip_.
//
// Seeks iter_ to (saved_key_, sequence_, kValueTypeForSeek) and reads forward
// from there:
//   - put       : its value becomes saved_value_; returns true;
//   - deletion  : returns false;
//   - merge(s)  : operands are collected while the user key matches, then
//                 combined with the following put of the same key if there is
//                 one, otherwise with a null existing value; returns true.
// valid_ is set to the returned value in every case.
//
// If the seek leaves iter_ without a usable entry (for example because the
// underlying iterator reported an error), the key is treated as having no
// value and false is returned; status() then reports the underlying error.
bool DBIter::FindValueForCurrentKeyUsingSeek() {
  std::string last_key;
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), sequence_,
                                                 kValueTypeForSeek));
  iter_->Seek(last_key);
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

  ParsedInternalKey ikey;
  FindParseableKey(&ikey, kForward);

  // FindParseableKey only fills ikey while iter_ is valid; do not inspect
  // ikey.type if it never got the chance.
  if (!iter_->Valid()) {
    valid_ = false;
    return false;
  }

  if (ikey.type == kTypeValue) {
    saved_value_ = iter_->value().ToString();
    valid_ = true;
    return true;
  }
  if (ikey.type == kTypeDeletion) {
    valid_ = false;
    return false;
  }

  // Merge entry: gather every consecutive merge operand for this user key.
  std::deque<std::string> operands;
  while (iter_->Valid() &&
         (user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) &&
         ikey.type == kTypeMerge) {
    operands.push_front(iter_->value().ToString());
    iter_->Next();
    FindParseableKey(&ikey, kForward);
  }

  // True when iter_ is exhausted or has moved on to another user key.
  const bool left_current_key =
      !iter_->Valid() ||
      (user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0);

  if (left_current_key || ikey.type == kTypeDeletion) {
    // No put to merge onto.
    user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
                                    &saved_value_, logger_);
    if (left_current_key) {
      // Put iter_ back on the entries of saved_key_.
      iter_->Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    }
    valid_ = true;
    return true;
  }

  // The operands are followed by a put for the same key: use it as the base.
  const Slice& value = iter_->value();
  user_merge_operator_->FullMerge(saved_key_.GetKey(), &value, operands,
                                  &saved_value_, logger_);
  valid_ = true;
  return true;
}
// Steps iter_ forward until it is exhausted or rests on a parseable entry
// whose user key equals saved_key_. Does nothing if iter_ is already invalid.
void DBIter::FindNextUserKey() {
  if (!iter_->Valid()) {
    return;
  }
  ParsedInternalKey ikey;
  for (FindParseableKey(&ikey, kForward);
       iter_->Valid() &&
       user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0;
       FindParseableKey(&ikey, kForward)) {
    iter_->Next();
  }
}
void
DBIter::FindPrevUserKey() {
if
(!iter_->Valid()) {
return
;
}
size_t
num_skipped = 0;
ParsedInternalKey ikey;
FindParseableKey(&ikey, kReverse);
while
(iter_->Valid() &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) {
if
(num_skipped >= max_skip_) {
num_skipped = 0;
IterKey last_key;
last_key.SetInternalKey(ParsedInternalKey(
saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek));
iter_->Seek(last_key.GetKey());
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
}
iter_->Prev();
++num_skipped;
FindParseableKey(&ikey, kReverse);
}
}
// Moves iter_ in `direction` until its current key parses successfully into
// *ikey or iter_ becomes invalid. If iter_ is invalid on return, *ikey was not
// filled in by the final step.
void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
  const bool backwards = (direction == kReverse);
  while (iter_->Valid() && !ParseKey(ikey)) {
    if (!backwards) {
      iter_->Next();
    } else {
      iter_->Prev();
    }
  }
}
// Positions the iterator at the first exposed entry found by seeking iter_ to
// (target, sequence_) and scanning forward; invalid if iter_ has no such
// position.
void DBIter::Seek(const Slice& target) {
  // saved_key_ temporarily holds the internal key used as the seek target.
  saved_key_.Clear();
  saved_key_.SetInternalKey(target, sequence_);

  PERF_TIMER_AUTO(seek_internal_seek_time);
  iter_->Seek(saved_key_.GetKey());
  PERF_TIMER_STOP(seek_internal_seek_time);

  if (!iter_->Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kForward;
  ClearSavedValue();
  FindNextUserEntry(false);
}
// Positions the iterator at the first exposed entry, or makes it invalid if
// iter_ is empty.
void DBIter::SeekToFirst() {
  // With a prefix extractor configured, raise the skip limit to its maximum,
  // which effectively disables the re-seek path in the scanning helpers. Note
  // that this change to max_skip_ persists after the call.
  if (has_prefix_extractor_) {
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  direction_ = kForward;
  ClearSavedValue();

  PERF_TIMER_AUTO(seek_internal_seek_time);
  iter_->SeekToFirst();
  PERF_TIMER_STOP(seek_internal_seek_time);

  if (!iter_->Valid()) {
    valid_ = false;
    return;
  }
  FindNextUserEntry(false);
}
// Positions the iterator at the last exposed entry by seeking iter_ to its
// last position and running the reverse-direction worker.
void DBIter::SeekToLast() {
  // With a prefix extractor configured, raise the skip limit to its maximum,
  // which effectively disables the re-seek path in the scanning helpers. Note
  // that this change to max_skip_ persists after the call.
  if (has_prefix_extractor_) {
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  direction_ = kReverse;
  ClearSavedValue();

  PERF_TIMER_AUTO(seek_internal_seek_time);
  iter_->SeekToLast();
  PERF_TIMER_STOP(seek_internal_seek_time);

  PrevInternal();
}
// Creates a heap-allocated DBIter (arena mode off) over `internal_iter`,
// exposing entries with sequence number <= `sequence`. Because arena mode is
// off, the returned iterator deletes `internal_iter` when it is destroyed.
Iterator* NewDBIterator(Env* env, const Options& options,
                        const Comparator* user_key_comparator,
                        Iterator* internal_iter,
                        const SequenceNumber& sequence) {
  const bool arena_mode = false;
  return new DBIter(env, options, user_key_comparator, internal_iter, sequence,
                    arena_mode);
}
// Runs the wrapped DBIter's destructor explicitly; its storage is not freed
// here.
ArenaWrappedDBIter::~ArenaWrappedDBIter() {
  db_iter_->~DBIter();
}
// Stores the DBIter that every forwarding method of this wrapper delegates to.
void ArenaWrappedDBIter::SetDBIter(DBIter* iter) {
  db_iter_ = iter;
}
// Hands the internal iterator to the wrapped DBIter via DBIter::SetIter().
void ArenaWrappedDBIter::SetIterUnderDBIter(Iterator* iter) {
  DBIter* const wrapped = static_cast<DBIter*>(db_iter_);
  wrapped->SetIter(iter);
}
// Forwards to the wrapped DBIter.
inline bool ArenaWrappedDBIter::Valid() const {
  return db_iter_->Valid();
}
// Forwards to the wrapped DBIter.
inline void ArenaWrappedDBIter::SeekToFirst() {
  db_iter_->SeekToFirst();
}
// Forwards to the wrapped DBIter.
inline void ArenaWrappedDBIter::SeekToLast() {
  db_iter_->SeekToLast();
}
// Forwards to the wrapped DBIter.
inline void ArenaWrappedDBIter::Seek(const Slice& target) {
  db_iter_->Seek(target);
}
// Forwards to the wrapped DBIter.
inline void ArenaWrappedDBIter::Next() {
  db_iter_->Next();
}
// Forwards to the wrapped DBIter.
inline void ArenaWrappedDBIter::Prev() {
  db_iter_->Prev();
}
// Forwards to the wrapped DBIter.
inline Slice ArenaWrappedDBIter::key() const {
  return db_iter_->key();
}
// Forwards to the wrapped DBIter.
inline Slice ArenaWrappedDBIter::value() const {
  return db_iter_->value();
}
// Forwards to the wrapped DBIter.
inline Status ArenaWrappedDBIter::status() const {
  return db_iter_->status();
}
// Registers the cleanup callback on the wrapped DBIter, passing `function`,
// `arg1` and `arg2` through unchanged.
void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
                                         void* arg2) {
  db_iter_->RegisterCleanup(function, arg1, arg2);
}
// Creates an ArenaWrappedDBIter whose DBIter is placement-constructed in
// memory obtained from the wrapper's own arena. The DBIter is created in arena
// mode with no internal iterator; the caller is expected to supply one
// afterwards (SetIterUnderDBIter forwards to DBIter::SetIter).
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
    Env* env, const Options& options, const Comparator* user_key_comparator,
    const SequenceNumber& sequence) {
  ArenaWrappedDBIter* wrapper = new ArenaWrappedDBIter();
  auto storage = wrapper->GetArena()->AllocateAligned(sizeof(DBIter));
  DBIter* db_iter = new (storage) DBIter(env, options, user_key_comparator,
                                         nullptr, sequence, true);
  wrapper->SetDBIter(db_iter);
  return wrapper;
}
}