#include "table/block.h"
#include <algorithm>
#include <string>
#include <unordered_map>
#include <vector>
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/comparator.h"
#include "table/block_prefix_index.h"
#include "table/format.h"
#include "util/coding.h"
#include "util/logging.h"
namespace
rocksdb {
// Decode the {shared, non_shared, value_length} header of a block entry.
// Returns a pointer just past the three decoded values (i.e. at the start
// of the non-shared key bytes), or nullptr when the entry is truncated or
// otherwise malformed.
static inline const char* DecodeEntry(const char* p, const char* limit,
                                      uint32_t* shared, uint32_t* non_shared,
                                      uint32_t* value_length) {
  if (limit - p < 3) {
    return nullptr;
  }
  // Fast path: each of the three values fits in a single byte (high bit
  // clear), which is the common case for small keys/values.
  const unsigned char* bytes = reinterpret_cast<const unsigned char*>(p);
  *shared = bytes[0];
  *non_shared = bytes[1];
  *value_length = bytes[2];
  if ((*shared | *non_shared | *value_length) < 128) {
    p += 3;
  } else {
    // Slow path: at least one value needs full varint decoding.
    p = GetVarint32Ptr(p, limit, shared);
    if (p != nullptr) {
      p = GetVarint32Ptr(p, limit, non_shared);
    }
    if (p != nullptr) {
      p = GetVarint32Ptr(p, limit, value_length);
    }
    if (p == nullptr) {
      return nullptr;
    }
  }
  // The non-shared key bytes and the value must fit in the remaining data.
  if (static_cast<uint32_t>(limit - p) < (*non_shared + *value_length)) {
    return nullptr;
  }
  return p;
}
// Advance to the next entry in the data block; requires a valid position.
void DataBlockIter::Next() {
  assert(Valid());
  ParseNextDataKey();
}
// Advance to the next entry in the index block; requires a valid position.
void IndexBlockIter::Next() {
  assert(Valid());
  ParseNextIndexKey();
}
// Step back to the entry preceding the current one.  Entries are
// delta-encoded and can only be decoded forward, so this backs up to the
// nearest restart point strictly before current_ and re-parses forward.
void IndexBlockIter::Prev() {
  assert(Valid());
  // Scan backwards to a restart point before current_.
  const uint32_t original = current_;
  while (GetRestartPoint(restart_index_) >= original) {
    if (restart_index_ == 0) {
      // No entries before the first one: mark the iterator invalid.
      current_ = restarts_;
      restart_index_ = num_restarts_;
      return;
    }
    restart_index_--;
  }
  SeekToRestartPoint(restart_index_);
  do {
    if (!ParseNextIndexKey()) {
      break;
    }
    // Stop when the next entry would be the one we started from.
  } while (NextEntryOffset() < original);
}
// Step back to the previous entry.  Keeps a cache (prev_entries_) of the
// entries parsed while scanning forward from a restart point, so that a run
// of consecutive Prev() calls re-parses each restart interval only once.
void DataBlockIter::Prev() {
  assert(Valid());
  assert(prev_entries_idx_ == -1 ||
         static_cast<size_t>(prev_entries_idx_) < prev_entries_.size());
  // Fast path: the previous entry is already in the cache.
  if (prev_entries_idx_ > 0 &&
      prev_entries_[prev_entries_idx_].offset == current_) {
    prev_entries_idx_--;
    const CachedPrevEntry& current_prev_entry =
        prev_entries_[prev_entries_idx_];
    const char* key_ptr = nullptr;
    if (current_prev_entry.key_ptr != nullptr) {
      // The cached key points directly into the (pinned) block data.
      key_ptr = current_prev_entry.key_ptr;
      key_pinned_ = true;
    } else {
      // The cached key was copied into the shared keys buffer.
      key_ptr = prev_entries_keys_buff_.data() + current_prev_entry.key_offset;
      key_pinned_ = false;
    }
    const Slice current_key(key_ptr, current_prev_entry.key_size);
    current_ = current_prev_entry.offset;
    key_.SetKey(current_key, false /* copy */);
    value_ = current_prev_entry.value;
    return;
  }
  // Slow path: rebuild the cache by scanning forward from the nearest
  // restart point that precedes the current entry.
  prev_entries_idx_ = -1;
  prev_entries_.clear();
  prev_entries_keys_buff_.clear();
  const uint32_t original = current_;
  while (GetRestartPoint(restart_index_) >= original) {
    if (restart_index_ == 0) {
      // No entries before the first one: mark the iterator invalid.
      current_ = restarts_;
      restart_index_ = num_restarts_;
      return;
    }
    restart_index_--;
  }
  SeekToRestartPoint(restart_index_);
  do {
    if (!ParseNextDataKey()) {
      break;
    }
    Slice current_key = key();
    if (key_.IsKeyPinned()) {
      // Key references block memory; cache the raw pointer.
      prev_entries_.emplace_back(current_, current_key.data(), 0,
                                 current_key.size(), value());
    } else {
      // Key lives in key_'s own buffer; copy it into the shared buffer so
      // the cache entry survives the next ParseNextDataKey().
      size_t new_key_offset = prev_entries_keys_buff_.size();
      prev_entries_keys_buff_.append(current_key.data(), current_key.size());
      prev_entries_.emplace_back(current_, nullptr, new_key_offset,
                                 current_key.size(), value());
    }
  } while (NextEntryOffset() < original);
  prev_entries_idx_ = static_cast<int32_t>(prev_entries_.size()) - 1;
}
// Position at the first entry whose key is >= target; the iterator becomes
// invalid when every key in the block is smaller.
void DataBlockIter::Seek(const Slice& target) {
  Slice seek_key = target;
  PERF_TIMER_GUARD(block_seek_nanos);
  if (data_ == nullptr) {
    // Block was invalidated; nothing to seek in.
    return;
  }
  uint32_t restart_idx = 0;
  if (!BinarySeek(seek_key, 0, num_restarts_ - 1, &restart_idx, comparator_)) {
    return;
  }
  SeekToRestartPoint(restart_idx);
  // Linear scan within the restart interval for the first key >= target.
  for (;;) {
    if (!ParseNextDataKey()) {
      return;
    }
    if (Compare(key_, seek_key) >= 0) {
      return;
    }
  }
}
// Position at the first index entry whose key is >= target, using the
// prefix index when one is attached, otherwise a binary search on restarts.
void IndexBlockIter::Seek(const Slice& target) {
  Slice seek_key = target;
  if (!key_includes_seq_) {
    // Index keys carry no sequence/type suffix; strip it from the target.
    seek_key = ExtractUserKey(target);
  }
  PERF_TIMER_GUARD(block_seek_nanos);
  if (data_ == nullptr) {
    // Block was invalidated; nothing to seek in.
    return;
  }
  uint32_t restart_idx = 0;
  // Note: PrefixSeek deliberately receives the full target, not seek_key.
  const bool found =
      prefix_index_
          ? PrefixSeek(target, &restart_idx)
          : BinarySeek(seek_key, 0, num_restarts_ - 1, &restart_idx,
                       active_comparator_);
  if (!found) {
    return;
  }
  SeekToRestartPoint(restart_idx);
  // Linear scan within the restart interval for the first key >= target.
  for (;;) {
    if (!ParseNextIndexKey()) {
      return;
    }
    if (Compare(key_, seek_key) >= 0) {
      return;
    }
  }
}
// Position at the last entry whose key is <= target; invalid only when the
// block is empty or corrupt.
void DataBlockIter::SeekForPrev(const Slice& target) {
  PERF_TIMER_GUARD(block_seek_nanos);
  Slice seek_key = target;
  if (data_ == nullptr) {
    // Block was invalidated; nothing to seek in.
    return;
  }
  uint32_t restart_idx = 0;
  if (!BinarySeek(seek_key, 0, num_restarts_ - 1, &restart_idx, comparator_)) {
    return;
  }
  SeekToRestartPoint(restart_idx);
  // Scan forward to the first entry >= target (may run off the block end).
  while (ParseNextDataKey() && Compare(key_, seek_key) < 0) {
  }
  if (!Valid()) {
    // Every key is smaller than target: the answer is the last entry.
    SeekToLast();
    return;
  }
  // Back up over entries strictly greater than target.
  while (Valid() && Compare(key_, seek_key) > 0) {
    Prev();
  }
}
// Position at the first entry in the block (no-op on an invalidated block).
void DataBlockIter::SeekToFirst() {
  if (data_ == nullptr) {
    return;
  }
  SeekToRestartPoint(0);
  ParseNextDataKey();
}
// Position at the first entry in the block (no-op on an invalidated block).
void IndexBlockIter::SeekToFirst() {
  if (data_ == nullptr) {
    return;
  }
  SeekToRestartPoint(0);
  ParseNextIndexKey();
}
// Position at the final entry by scanning forward from the last restart
// point until the next entry would run into the restart array.
void DataBlockIter::SeekToLast() {
  if (data_ == nullptr) {
    // Block was invalidated; nothing to seek in.
    return;
  }
  SeekToRestartPoint(num_restarts_ - 1);
  bool parsed = ParseNextDataKey();
  while (parsed && NextEntryOffset() < restarts_) {
    parsed = ParseNextDataKey();
  }
}
// Position at the final entry by scanning forward from the last restart
// point until the next entry would run into the restart array.
void IndexBlockIter::SeekToLast() {
  if (data_ == nullptr) {
    // Block was invalidated; nothing to seek in.
    return;
  }
  SeekToRestartPoint(num_restarts_ - 1);
  bool parsed = ParseNextIndexKey();
  while (parsed && NextEntryOffset() < restarts_) {
    parsed = ParseNextIndexKey();
  }
}
// Put the iterator into a permanent invalid state with a Corruption status
// and clear the current key/value so callers cannot read stale data.
void BlockIter::CorruptionError() {
  current_ = restarts_;
  restart_index_ = num_restarts_;
  status_ = Status::Corruption("bad entry in block");
  key_.Clear();
  value_.clear();
}
// Decode the entry at NextEntryOffset() into key_/value_.  Returns false
// and invalidates the iterator at end-of-block, or via CorruptionError()
// when the entry is malformed.
bool DataBlockIter::ParseNextDataKey() {
  current_ = NextEntryOffset();
  const char* p = data_ + current_;
  const char* limit = data_ + restarts_;  // restart array marks end of data
  if (p >= limit) {
    // No more entries; mark iterator invalid.
    current_ = restarts_;
    restart_index_ = num_restarts_;
    return false;
  }
  // Decode the entry header.
  uint32_t shared, non_shared, value_length;
  p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
  if (p == nullptr || key_.Size() < shared) {
    // shared can't exceed the previous key's length.
    CorruptionError();
    return false;
  } else {
    if (shared == 0) {
      // Full key stored inline: reference block memory directly (pinned).
      key_.SetKey(Slice(p, non_shared), false);
      key_pinned_ = true;
    } else {
      // Reuse the shared prefix of the previous key; key_ owns the bytes.
      key_.TrimAppend(shared, p, non_shared);
      key_pinned_ = false;
    }
    if (global_seqno_ != kDisableGlobalSequenceNumber) {
      // A block-wide sequence number overrides the stored one, which must
      // be zero in that case.
      assert(GetInternalKeySeqno(key_.GetInternalKey()) == 0);
      ValueType value_type = ExtractValueType(key_.GetKey());
      assert(value_type == ValueType::kTypeValue ||
             value_type == ValueType::kTypeMerge ||
             value_type == ValueType::kTypeDeletion ||
             value_type == ValueType::kTypeRangeDeletion);
      if (key_pinned_) {
        // UpdateInternalKey() mutates the key, so it cannot stay pinned
        // to the immutable block contents.
        key_.OwnKey();
        key_pinned_ = false;
      }
      key_.UpdateInternalKey(global_seqno_, value_type);
    }
    value_ = Slice(p + non_shared, value_length);
    // Advance restart_index_ to the restart interval containing current_.
    while (restart_index_ + 1 < num_restarts_ &&
           GetRestartPoint(restart_index_ + 1) < current_) {
      ++restart_index_;
    }
    return true;
  }
}
// Decode the entry at NextEntryOffset() into key_/value_.  Returns false
// and invalidates the iterator at end-of-block, or via CorruptionError()
// when the entry is malformed.
bool IndexBlockIter::ParseNextIndexKey() {
  current_ = NextEntryOffset();
  const char* p = data_ + current_;
  const char* limit = data_ + restarts_;  // restart array marks end of data
  if (p >= limit) {
    // No more entries; mark iterator invalid.
    current_ = restarts_;
    restart_index_ = num_restarts_;
    return false;
  }
  // Decode the entry header.
  uint32_t shared, non_shared, value_length;
  p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
  if (p == nullptr || key_.Size() < shared) {
    // shared can't exceed the previous key's length.
    CorruptionError();
    return false;
  }
  if (shared == 0) {
    // Full key stored inline: reference block memory directly (pinned).
    key_.SetKey(Slice(p, non_shared), false);
    key_pinned_ = true;
  } else {
    // Reuse the shared prefix of the previous key; key_ owns the bytes.
    key_.TrimAppend(shared, p, non_shared);
    key_pinned_ = false;
  }
  value_ = Slice(p + non_shared, value_length);
  // Advance restart_index_ to the restart interval containing current_.
  while (restart_index_ + 1 < num_restarts_ &&
         GetRestartPoint(restart_index_ + 1) < current_) {
    ++restart_index_;
  }
  return true;
}
// Binary search over restart points [left, right] for the last restart
// whose key compares < target (or the exact match), so a forward scan from
// it reaches the first key >= target.  Stores the result in *index;
// returns false after CorruptionError() on a malformed restart entry.
bool BlockIter::BinarySeek(const Slice& target, uint32_t left, uint32_t right,
                           uint32_t* index, const Comparator* comp) {
  assert(left <= right);
  while (left < right) {
    // Round up so that "left = mid" always makes progress.
    uint32_t mid = (left + right + 1) / 2;
    uint32_t region_offset = GetRestartPoint(mid);
    uint32_t shared, non_shared, value_length;
    const char* key_ptr = DecodeEntry(data_ + region_offset, data_ + restarts_,
                                      &shared, &non_shared, &value_length);
    if (key_ptr == nullptr || (shared != 0)) {
      // A restart-point entry must store its full key (shared == 0).
      CorruptionError();
      return false;
    }
    Slice mid_key(key_ptr, non_shared);
    int cmp = comp->Compare(mid_key, target);
    if (cmp < 0) {
      // Key at "mid" is smaller; the answer is at mid or later.
      left = mid;
    } else if (cmp > 0) {
      // Key at "mid" is larger; the answer is strictly before mid.
      right = mid - 1;
    } else {
      // Exact match at a restart point.
      left = right = mid;
    }
  }
  *index = left;
  return true;
}
// Compare the full (restart-point) key of entry block_index with target.
// On a malformed entry, sets the corruption status and returns 1 so the
// caller treats the block key as "greater".
int IndexBlockIter::CompareBlockKey(uint32_t block_index, const Slice& target) {
  const uint32_t entry_offset = GetRestartPoint(block_index);
  uint32_t shared = 0;
  uint32_t non_shared = 0;
  uint32_t value_length = 0;
  const char* key_start = DecodeEntry(data_ + entry_offset, data_ + restarts_,
                                      &shared, &non_shared, &value_length);
  // Restart-point entries must store their full key (shared == 0).
  const bool malformed = (key_start == nullptr) || (shared != 0);
  if (malformed) {
    CorruptionError();
    return 1;
  }
  return Compare(Slice(key_start, non_shared), target);
}
// Binary search over the candidate block ids produced by the prefix index
// for the first candidate whose separator key is >= target.  On success
// stores that restart index in *index and returns true.  Returns false —
// with current_ set to restarts_ (invalid) — when the target cannot belong
// to any candidate block, or when CompareBlockKey reported corruption.
bool IndexBlockIter::BinaryBlockIndexSeek(const Slice& target,
                                          uint32_t* block_ids, uint32_t left,
                                          uint32_t right, uint32_t* index) {
  assert(left <= right);
  uint32_t left_bound = left;
  while (left <= right) {
    uint32_t mid = (right + left) / 2;
    int cmp = CompareBlockKey(block_ids[mid], target);
    if (!status_.ok()) {
      // CompareBlockKey hit a corrupt entry.
      return false;
    }
    if (cmp < 0) {
      // Key at "mid" is smaller; candidates up to mid are out.
      left = mid + 1;
    } else {
      // Key at "mid" is >= target; mid may still be the answer.
      if (left == right) break;
      right = mid;
    }
  }
  if (left == right) {
    // Candidate found.  But if the block just before it is not itself a
    // candidate and its separator key is already > target, the target's
    // prefix has no match in the index at all.
    if (block_ids[left] > 0 &&
        (left == left_bound || block_ids[left - 1] != block_ids[left] - 1) &&
        CompareBlockKey(block_ids[left] - 1, target) > 0) {
      current_ = restarts_;
      return false;
    }
    *index = block_ids[left];
    return true;
  } else {
    assert(left > right);
    // All candidate keys compare < target: mark iterator invalid.
    current_ = restarts_;
    return false;
  }
}
// Seek via the prefix index: look up the candidate data blocks for the
// target's prefix, then binary-search among them.  Returns false (with the
// iterator invalidated) when the prefix has no candidate blocks.
bool IndexBlockIter::PrefixSeek(const Slice& target, uint32_t* index) {
  assert(prefix_index_);
  Slice seek_key = target;
  if (!key_includes_seq_) {
    // Index keys carry no sequence/type suffix; strip it from the target.
    seek_key = ExtractUserKey(target);
  }
  uint32_t* block_ids = nullptr;
  // Note: the prefix lookup uses the full target, not seek_key.
  const uint32_t num_blocks = prefix_index_->GetBlocks(target, &block_ids);
  if (num_blocks == 0) {
    // Prefix not present: mark iterator invalid.
    current_ = restarts_;
    return false;
  }
  return BinaryBlockIndexSeek(seek_key, block_ids, 0, num_blocks - 1, index);
}
// Read the restart-point count stored in the last 4 bytes of the block.
uint32_t Block::NumRestarts() const {
  assert(size_ >= 2 * sizeof(uint32_t));
  return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
}
// Takes ownership of the block contents and, for uncompressed blocks,
// parses the restart-array footer.  Any malformed size is recorded by
// setting size_ to 0 so subsequent reads see an empty/corrupt block
// instead of reading out of bounds.
Block::Block(BlockContents&& contents, SequenceNumber _global_seqno,
             size_t read_amp_bytes_per_bit, Statistics* statistics)
    : contents_(std::move(contents)),
      data_(contents_.data.data()),
      size_(contents_.data.size()),
      restart_offset_(0),
      num_restarts_(0),
      global_seqno_(_global_seqno) {
  if (size_ < sizeof(uint32_t)) {
    size_ = 0;  // too small even for the restart count: error marker
  } else {
    // Restart points are only meaningful for uncompressed blocks.
    if (compression_type() == kNoCompression) {
      num_restarts_ = NumRestarts();
      restart_offset_ =
          static_cast<uint32_t>(size_) - (1 + num_restarts_) * sizeof(uint32_t);
      if (restart_offset_ > size_ - sizeof(uint32_t)) {
        // The block is too small to hold num_restarts_ restart points, so
        // the count itself must be corrupt (the subtraction above wrapped,
        // making restart_offset_ huge).
        size_ = 0;
      }
    }
  }
  if (read_amp_bytes_per_bit != 0 && statistics && size_ != 0) {
    read_amp_bitmap_.reset(new BlockReadAmpBitmap(
        restart_offset_, read_amp_bytes_per_bit, statistics));
  }
}
// Create (or reuse) a DataBlockIter over this block.  A malformed block
// yields an iterator invalidated with a Corruption status; an empty block
// yields an iterator invalidated with OK.
template <>
DataBlockIter* Block::NewIterator(const Comparator* cmp, const Comparator* ucmp,
                                  DataBlockIter* iter, Statistics* stats,
                                  bool /*total_order_seek*/,
                                  bool /*key_includes_seq*/,
                                  BlockPrefixIndex* /*prefix_index*/) {
  DataBlockIter* ret_iter = (iter != nullptr) ? iter : new DataBlockIter;
  if (size_ < 2 * sizeof(uint32_t)) {
    // Not even room for one restart point plus the restart count.
    ret_iter->Invalidate(Status::Corruption("bad block contents"));
    return ret_iter;
  }
  if (num_restarts_ == 0) {
    // Empty block: valid but has nothing to iterate.
    ret_iter->Invalidate(Status::OK());
    return ret_iter;
  }
  ret_iter->Initialize(cmp, ucmp, data_, restart_offset_, num_restarts_,
                       global_seqno_, read_amp_bitmap_.get(), cachable());
  // Rebind the read-amp bitmap's statistics if the caller's differ.
  if (read_amp_bitmap_ && read_amp_bitmap_->GetStatistics() != stats) {
    read_amp_bitmap_->SetStatistics(stats);
  }
  return ret_iter;
}
// Create (or reuse) an IndexBlockIter over this block.  A malformed block
// yields an iterator invalidated with a Corruption status; an empty block
// yields an iterator invalidated with OK.  total_order_seek disables the
// prefix index.
template <>
IndexBlockIter* Block::NewIterator(const Comparator* cmp,
                                   const Comparator* ucmp, IndexBlockIter* iter,
                                   Statistics* /*stats*/, bool total_order_seek,
                                   bool key_includes_seq,
                                   BlockPrefixIndex* prefix_index) {
  IndexBlockIter* ret_iter = (iter != nullptr) ? iter : new IndexBlockIter;
  if (size_ < 2 * sizeof(uint32_t)) {
    // Not even room for one restart point plus the restart count.
    ret_iter->Invalidate(Status::Corruption("bad block contents"));
    return ret_iter;
  }
  if (num_restarts_ == 0) {
    // Empty block: valid but has nothing to iterate.
    ret_iter->Invalidate(Status::OK());
  } else {
    BlockPrefixIndex* prefix_index_ptr =
        total_order_seek ? nullptr : prefix_index;
    ret_iter->Initialize(cmp, ucmp, data_, restart_offset_, num_restarts_,
                         prefix_index_ptr, key_includes_seq, cachable());
  }
  return ret_iter;
}
// Approximate heap footprint: the block data buffer plus this Block object
// (allocator-reported size when available) plus the read-amp bitmap.
size_t Block::ApproximateMemoryUsage() const {
  size_t usage = usable_size();
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
  // Ask the allocator for the true size of this object's allocation.
  usage += malloc_usable_size((void*)this);
#else
  usage += sizeof(*this);
#endif  // ROCKSDB_MALLOC_USABLE_SIZE
  if (read_amp_bitmap_) {
    usage += read_amp_bitmap_->ApproximateMemoryUsage();
  }
  return usage;
}
}