#include <stdio.h>
#include <algorithm>
#include <set>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include "db/dbformat.h"
#include "db/write_batch_internal.h"
#include "db/memtable.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/table.h"
#include "rocksdb/slice_transform.h"
#include "table/block.h"
#include "table/block_builder.h"
#include "table/format.h"
#include "util/random.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace
rocksdb {
// Convenience wrapper around test::RandomString() that returns the generated
// string by value instead of filling an out-parameter.
//   rnd - random source forwarded to test::RandomString()
//   len - length argument forwarded to test::RandomString()
static std::string RandomString(Random* rnd, int len) {
  std::string generated;
  test::RandomString(rnd, len, &generated);
  return generated;
}
// Builds a key of the form "<primary_key><secondary_key>[<random padding>]".
// The primary key is printed with "%6d" and the secondary key with "%4d", so
// both are right-aligned and space-padded. When padding_size is non-zero,
// RandomString(rnd, padding_size) is appended to the formatted part.
// `rnd` is only touched when padding_size is non-zero.
std::string GenerateKey(int primary_key, int secondary_key, int padding_size,
                        Random *rnd) {
  char buf[50];
  snprintf(buf, sizeof(buf), "%6d%4d", primary_key, secondary_key);

  std::string key(buf);
  if (padding_size != 0) {
    key.append(RandomString(rnd, padding_size));
  }
  return key;
}
// Appends generated key/value pairs to *keys and *values.
//
// Primary key ids run over [from, from + len) in increments of `step`. For each
// primary id, `keys_share_prefix` keys are produced with secondary ids
// 0 .. keys_share_prefix - 1. Each key is built by GenerateKey() with
// `padding_size` forwarded to it; each value is a RandomString of length
// argument 100. The random source is seeded with a fixed value (302), so the
// output is the same on every call with the same arguments.
void GenerateRandomKVs(std::vector<std::string> *keys,
                       std::vector<std::string> *values, const int from,
                       const int len, const int step = 1,
                       const int padding_size = 0,
                       const int keys_share_prefix = 1) {
  Random rnd(302);

  for (int primary = from; primary < from + len; primary += step) {
    int secondary = 0;
    while (secondary < keys_share_prefix) {
      // Key first, then value: both draw from the same random stream, so the
      // order of these two calls determines the generated data.
      keys->push_back(GenerateKey(primary, secondary, padding_size, &rnd));
      values->push_back(RandomString(&rnd, 100));
      ++secondary;
    }
  }
}
// Test fixture shared by every TEST_F in this file. It adds no state or
// set-up of its own on top of testing::Test.
class BlockTest : public testing::Test {};
// Builds a single block out of 100000 generated key/value pairs and checks
// that the block can be read back:
//   1. a forward scan must return every pair, in insertion order;
//   2. random point seeks must land on an entry holding the expected value.
//
// Iterators are held in std::unique_ptr so they are released even when an
// ASSERT_* macro returns from the test body early.
TEST_F(BlockTest, SimpleTest) {
  Random rnd(301);
  Options options = Options();

  std::vector<std::string> keys;
  std::vector<std::string> values;
  BlockBuilder builder(16);
  int num_records = 100000;

  GenerateRandomKVs(&keys, &values, 0, num_records);
  // Add all generated records to the block.
  for (int i = 0; i < num_records; i++) {
    builder.Add(keys[i], values[i]);
  }

  // Serialize the block and wrap the bytes in a reader.
  Slice rawblock = builder.Finish();
  BlockContents contents;
  contents.data = rawblock;
  contents.cachable = false;
  Block reader(std::move(contents), kDisableGlobalSequenceNumber);

  // Sequential read.
  {
    int count = 0;
    std::unique_ptr<InternalIterator> iter(
        reader.NewIterator<DataBlockIter>(options.comparator,
                                          options.comparator));
    for (iter->SeekToFirst(); iter->Valid(); count++, iter->Next()) {
      // Guard the keys[count] / values[count] lookups below.
      ASSERT_LT(count, num_records);

      Slice k = iter->key();
      Slice v = iter->value();

      ASSERT_EQ(k.ToString().compare(keys[count]), 0);
      ASSERT_EQ(v.ToString().compare(values[count]), 0);
    }
    // Without this check an iterator that stops early (or yields nothing)
    // would let the scan pass vacuously.
    ASSERT_EQ(count, num_records);
  }

  // Random point lookups.
  {
    std::unique_ptr<InternalIterator> iter(
        reader.NewIterator<DataBlockIter>(options.comparator,
                                          options.comparator));
    for (int i = 0; i < num_records; i++) {
      // Pick a random key that is known to be in the block.
      int index = rnd.Uniform(num_records);
      Slice k(keys[index]);

      iter->Seek(k);
      ASSERT_TRUE(iter->Valid());
      Slice v = iter->value();
      ASSERT_EQ(v.ToString().compare(values[index]), 0);
    }
  }
}
// Serializes keys[i] -> values[i] into a freshly allocated BlockBuilder and
// returns BlockContents whose `data` is the Slice returned by Finish(), with
// `cachable` set to false.
//
//   builder - out-parameter; reset to a new BlockBuilder constructed with the
//             argument 1. NOTE(review): presumably handed back so the buffer
//             behind the returned Slice stays alive with the caller - confirm.
//   keys / values - parallel vectors; entries are added in index order.
//   (unnamed int) - accepted for call-site compatibility, not used.
BlockContents GetBlockContents(std::unique_ptr<BlockBuilder> *builder,
                               const std::vector<std::string> &keys,
                               const std::vector<std::string> &values,
                               const int = 1) {
  builder->reset(new BlockBuilder(1));
  BlockBuilder &block_builder = **builder;

  const size_t num_entries = keys.size();
  for (size_t idx = 0; idx != num_entries; ++idx) {
    block_builder.Add(keys[idx], values[idx]);
  }
  Slice serialized = block_builder.Finish();

  BlockContents result;
  result.data = serialized;
  result.cachable = false;
  return result;
}
// Verifies a block built from (keys, values) through a DataBlockIter using
// BytewiseComparator():
//   1. seeking to every key in `keys` must succeed and expose the matching
//      value;
//   2. seeking to GenerateKey(id, 0, 0, nullptr) for every odd id in
//      [1, max_key - 1) must still leave the iterator valid. The callers in
//      this file generate primary ids with step 2 starting at 0, so these
//      probe keys were never inserted.
void CheckBlockContents(BlockContents contents, const int max_key,
                        const std::vector<std::string> &keys,
                        const std::vector<std::string> &values) {
  const size_t kPrefixLen = 6;

  // A second BlockContents over the same data. It has to be created before
  // `contents` is moved into the first reader below.
  BlockContents contents_copy(contents.data, contents.cachable,
                              contents.compression_type);
  Block first_reader(std::move(contents), kDisableGlobalSequenceNumber);
  Block second_reader(std::move(contents_copy), kDisableGlobalSequenceNumber);

  std::unique_ptr<const SliceTransform> prefix_extractor(
      NewFixedPrefixTransform(kPrefixLen));

  std::unique_ptr<InternalIterator> regular_iter(
      second_reader.NewIterator<DataBlockIter>(BytewiseComparator(),
                                               BytewiseComparator()));

  // Every stored key must be found with its value.
  const size_t num_keys = keys.size();
  for (size_t idx = 0; idx != num_keys; ++idx) {
    regular_iter->Seek(keys[idx]);
    ASSERT_OK(regular_iter->status());
    ASSERT_TRUE(regular_iter->Valid());

    Slice found = regular_iter->value();
    ASSERT_EQ(found.ToString().compare(values[idx]), 0);
  }

  // Probe keys built from odd primary ids.
  for (int odd_id = 1; odd_id < max_key - 1; odd_id += 2) {
    const std::string probe = GenerateKey(odd_id, 0, 0, nullptr);
    regular_iter->Seek(probe);
    ASSERT_TRUE(regular_iter->Valid());
  }
}
// Builds a block from keys with primary ids 0, 2, 4, ... below kMaxKey (one
// key per id, padding_size 8) and runs CheckBlockContents() over it.
TEST_F(BlockTest, SimpleIndexHash) {
  const int kMaxKey = 100000;
  const int kFirstKeyId = 0;
  const int kStep = 2;
  const int kPaddingSize = 8;

  std::vector<std::string> keys;
  std::vector<std::string> values;
  GenerateRandomKVs(&keys, &values, kFirstKeyId, kMaxKey, kStep, kPaddingSize);

  std::unique_ptr<BlockBuilder> builder;
  BlockContents contents = GetBlockContents(&builder, keys, values);

  CheckBlockContents(std::move(contents), kMaxKey, keys, values);
}
// Same as SimpleIndexHash, except that every primary id contributes
// kPrefixGroup keys (secondary ids 0 .. kPrefixGroup - 1) and padding_size
// is 10.
TEST_F(BlockTest, IndexHashWithSharedPrefix) {
  const int kMaxKey = 100000;
  const int kPrefixGroup = 5;
  const int kFirstKeyId = 0;
  const int kStep = 2;
  const int kPaddingSize = 10;

  std::vector<std::string> keys;
  std::vector<std::string> values;
  GenerateRandomKVs(&keys, &values, kFirstKeyId, kMaxKey, kStep, kPaddingSize,
                    kPrefixGroup);

  std::unique_ptr<BlockBuilder> builder;
  BlockContents contents =
      GetBlockContents(&builder, keys, values, kPrefixGroup);

  CheckBlockContents(std::move(contents), kMaxKey, keys, values);
}
// Reference bookkeeping of marked byte ranges, kept as an ordered set of
// (end_offset, start_offset) pairs so that ranges sort by their end offset.
//
// IsPinMarked() remembers the range it last examined. Follow-up queries first
// walk forward from that range (at most 64 steps) and only then fall back to
// an ordered lookup. The forward walk never moves backwards, so callers that
// query a smaller offset than before, or that query after a new Mark(), should
// call ResetCheckSequence() first.
class BlockReadAmpBitmapSlowAndAccurate {
 public:
  // Records the inclusive byte range [start_offset, end_offset].
  void Mark(size_t start_offset, size_t end_offset) {
    assert(end_offset >= start_offset);
    marked_ranges_.emplace(end_offset, start_offset);
  }

  // Forgets the remembered range; the next IsPinMarked() starts with an
  // ordered lookup.
  void ResetCheckSequence() { iter_valid_ = false; }

  // Returns true when `offset` lies inside the range under examination
  // (see the class comment for how that range is chosen).
  bool IsPinMarked(size_t offset) {
    if (iter_valid_) {
      // Continue from the remembered range, bounded to 64 steps.
      const int kMaxLinearSteps = 64;
      int steps_taken = 0;
      while (steps_taken < kMaxLinearSteps) {
        const size_t range_start = iter_->second;
        const size_t range_end = iter_->first;
        if (offset < range_start) {
          return false;
        }
        if (offset <= range_end) {
          return true;
        }

        ++iter_;
        if (iter_ == marked_ranges_.end()) {
          iter_valid_ = false;
          return false;
        }
        ++steps_taken;
      }
    }

    // First query after a reset, or the forward walk ran out of steps:
    // locate the first range whose end offset is not below `offset`.
    iter_ = marked_ranges_.lower_bound(RangeKey(offset, 0));
    iter_valid_ = (iter_ != marked_ranges_.end());
    if (!iter_valid_) {
      return false;
    }
    return iter_->second <= offset && offset <= iter_->first;
  }

 private:
  // first = end offset, second = start offset.
  using RangeKey = std::pair<size_t, size_t>;
  using RangeSet = std::set<RangeKey>;

  RangeSet marked_ranges_;
  RangeSet::iterator iter_;
  bool iter_valid_ = false;
};
// Cross-checks BlockReadAmpBitmap against BlockReadAmpBitmapSlowAndAccurate.
//
// For each block size, the block is split into contiguous pseudo-random
// "entries". Entries are then marked in random order in both trackers. After
// each mark, the READ_AMP_ESTIMATE_USEFUL_BYTES ticker must equal
// kBytesPerBit times the number of bits whose probe byte
// (bit_idx * kBytesPerBit + pin_offset) lies in a range known to the
// reference tracker.
//
// pin_offset is captured through the "BlockReadAmpBitmap:rnd" sync point.
// NOTE(review): presumably BlockReadAmpBitmap publishes its random in-bit
// offset through that sync point when constructed - confirm in table/block.h.
TEST_F(BlockTest, BlockReadAmpBitmap) {
  uint32_t pin_offset = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "BlockReadAmpBitmap:rnd", [&pin_offset](void* arg) {
        pin_offset = *(static_cast<uint32_t*>(arg));
      });
  SyncPoint::GetInstance()->EnableProcessing();

  std::vector<size_t> block_sizes = {
      1,
      32,
      61,
      64,
      512,
      1024,
      1024 * 4,
      1024 * 10,
      1024 * 50,
      1024 * 1024 * 4,
      777,
      124653,
  };
  const size_t kBytesPerBit = 64;

  Random rnd(301);
  for (size_t block_size : block_sizes) {
    std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
    BlockReadAmpBitmap read_amp_bitmap(block_size, kBytesPerBit, stats.get());
    BlockReadAmpBitmapSlowAndAccurate read_amp_slow_and_accurate;

    // Number of bits needed to cover the block: ceil(block_size / 64).
    size_t needed_bits = (block_size / kBytesPerBit);
    if (block_size % kBytesPerBit != 0) {
      needed_bits++;
    }

    ASSERT_EQ(stats->getTickerCount(READ_AMP_TOTAL_READ_BYTES), block_size);

    // Pick sorted, de-duplicated random start offsets inside the block.
    std::vector<size_t> random_entry_offsets;
    for (int i = 0; i < 1000; i++) {
      random_entry_offsets.push_back(rnd.Next() % block_size);
    }
    std::sort(random_entry_offsets.begin(), random_entry_offsets.end());
    auto it =
        std::unique(random_entry_offsets.begin(), random_entry_offsets.end());
    random_entry_offsets.resize(
        std::distance(random_entry_offsets.begin(), it));

    // Turn the offsets into inclusive [start, end] entries: each entry ends
    // one byte before the next start, and the last one ends at the block end.
    std::vector<std::pair<size_t, size_t>> random_entries;
    for (size_t i = 0; i < random_entry_offsets.size(); i++) {
      size_t entry_start = random_entry_offsets[i];
      size_t entry_end;
      if (i + 1 < random_entry_offsets.size()) {
        entry_end = random_entry_offsets[i + 1] - 1;
      } else {
        entry_end = block_size - 1;
      }
      random_entries.emplace_back(entry_start, entry_end);
    }

    // Mark entries in random order (repeats are possible) and compare the
    // two trackers after every mark.
    for (size_t i = 0; i < random_entries.size(); i++) {
      read_amp_slow_and_accurate.ResetCheckSequence();
      auto &current_entry = random_entries[rnd.Next() % random_entries.size()];

      read_amp_bitmap.Mark(static_cast<uint32_t>(current_entry.first),
                           static_cast<uint32_t>(current_entry.second));
      read_amp_slow_and_accurate.Mark(current_entry.first,
                                      current_entry.second);

      // Count the bits whose probe byte is marked in the reference tracker.
      size_t total_bits = 0;
      for (size_t bit_idx = 0; bit_idx < needed_bits; bit_idx++) {
        total_bits += read_amp_slow_and_accurate.IsPinMarked(
            bit_idx * kBytesPerBit + pin_offset);
      }
      size_t expected_estimate_useful = total_bits * kBytesPerBit;
      size_t got_estimate_useful =
          stats->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
      ASSERT_EQ(expected_estimate_useful, got_estimate_useful);
    }
  }
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(BlockTest, BlockWithReadAmpBitmap) {
Random rnd(301);
Options options = Options();
std::unique_ptr<InternalKeyComparator> ic;
ic.reset(
new
test::PlainInternalKeyComparator(options.comparator));
std::vector<std::string> keys;
std::vector<std::string> values;
BlockBuilder builder(16);
int
num_records = 10000;
GenerateRandomKVs(&keys, &values, 0, num_records, 1);
for
(
int
i = 0; i < num_records; i++) {
builder.Add(keys[i], values[i]);
}
Slice rawblock = builder.Finish();
const
size_t
kBytesPerBit = 8;
{
std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
BlockContents contents;
contents.data = rawblock;
contents.cachable =
true
;
Block reader(std::move(contents), kDisableGlobalSequenceNumber,
kBytesPerBit, stats.get());
size_t
read_bytes = 0;
DataBlockIter *iter =
static_cast
<DataBlockIter *>(reader.NewIterator<DataBlockIter>(
options.comparator, options.comparator,
nullptr
, stats.get()));
for
(iter->SeekToFirst(); iter->Valid(); iter->Next()) {
iter->value();
read_bytes += iter->TEST_CurrentEntrySize();
double
semi_acc_read_amp =
static_cast
<
double
>(read_bytes) / rawblock.size();
double
read_amp =
static_cast
<
double
>(stats->getTickerCount(
READ_AMP_ESTIMATE_USEFUL_BYTES)) /
stats->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
double
error_pct =
fabs
(semi_acc_read_amp - read_amp) * 100;
EXPECT_LT(error_pct, 1);
}
delete
iter;
}
{
std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
BlockContents contents;
contents.data = rawblock;
contents.cachable =
true
;
Block reader(std::move(contents), kDisableGlobalSequenceNumber,
kBytesPerBit, stats.get());
size_t
read_bytes = 0;
DataBlockIter *iter =
static_cast
<DataBlockIter *>(reader.NewIterator<DataBlockIter>(
options.comparator, options.comparator,
nullptr
, stats.get()));
for
(
int
i = 0; i < num_records; i++) {
Slice k(keys[i]);
iter->Seek(k);
iter->value();
read_bytes += iter->TEST_CurrentEntrySize();
double
semi_acc_read_amp =
static_cast
<
double
>(read_bytes) / rawblock.size();
double
read_amp =
static_cast
<
double
>(stats->getTickerCount(
READ_AMP_ESTIMATE_USEFUL_BYTES)) /
stats->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
double
error_pct =
fabs
(semi_acc_read_amp - read_amp) * 100;
EXPECT_LT(error_pct, 1);
}
delete
iter;
}
{
std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
BlockContents contents;
contents.data = rawblock;
contents.cachable =
true
;
Block reader(std::move(contents), kDisableGlobalSequenceNumber,
kBytesPerBit, stats.get());
size_t
read_bytes = 0;
DataBlockIter *iter =
static_cast
<DataBlockIter *>(reader.NewIterator<DataBlockIter>(
options.comparator, options.comparator,
nullptr
, stats.get()));
std::unordered_set<
int
> read_keys;
for
(
int
i = 0; i < num_records; i++) {
int
index = rnd.Uniform(num_records);
Slice k(keys[index]);
iter->Seek(k);
iter->value();
if
(read_keys.find(index) == read_keys.end()) {
read_keys.insert(index);
read_bytes += iter->TEST_CurrentEntrySize();
}
double
semi_acc_read_amp =
static_cast
<
double
>(read_bytes) / rawblock.size();
double
read_amp =
static_cast
<
double
>(stats->getTickerCount(
READ_AMP_ESTIMATE_USEFUL_BYTES)) /
stats->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
double
error_pct =
fabs
(semi_acc_read_amp - read_amp) * 100;
EXPECT_LT(error_pct, 2);
}
delete
iter;
}
}
// Checks the value reported by BlockReadAmpBitmap::GetBytesPerBit() for a
// 100-byte block and various requested bytes-per-bit settings. The expected
// values below show powers of two being reported unchanged and other requests
// being reported as a smaller power of two.
TEST_F(BlockTest, ReadAmpBitmapPow2) {
  std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();

  // Builds a temporary bitmap with the requested setting and returns what it
  // reports back.
  auto reported_bytes_per_bit = [&stats](size_t requested) {
    return BlockReadAmpBitmap(100, requested, stats.get()).GetBytesPerBit();
  };

  // Requests that are already powers of two.
  ASSERT_EQ(reported_bytes_per_bit(1), 1);
  ASSERT_EQ(reported_bytes_per_bit(2), 2);
  ASSERT_EQ(reported_bytes_per_bit(4), 4);
  ASSERT_EQ(reported_bytes_per_bit(8), 8);
  ASSERT_EQ(reported_bytes_per_bit(16), 16);
  ASSERT_EQ(reported_bytes_per_bit(32), 32);

  // Requests that are not powers of two.
  ASSERT_EQ(reported_bytes_per_bit(3), 2);
  ASSERT_EQ(reported_bytes_per_bit(7), 4);
  ASSERT_EQ(reported_bytes_per_bit(11), 8);
  ASSERT_EQ(reported_bytes_per_bit(17), 16);
  ASSERT_EQ(reported_bytes_per_bit(33), 32);
  ASSERT_EQ(reported_bytes_per_bit(35), 32);
}
}
// Test binary entry point: hands the command line to googletest and returns
// the result of running every registered test.
int main(int argc, char **argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}