#ifndef ROCKSDB_LITE
#include "rocksdb/db.h"
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <sys/stat.h>
#include <sys/types.h>
#include "db/db_impl.h"
#include "db/log_format.h"
#include "db/version_set.h"
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/table.h"
#include "rocksdb/write_batch.h"
#include "util/filename.h"
#include "util/string_util.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace
rocksdb {
// Length, in bytes, of every value the tests store in the database.
static constexpr int kValueSize = 1000;
// Fixture shared by the corruption tests.
//
// It owns a database opened on `env_`, and offers helpers to fill the
// database with numbered records (Build), damage files on disk (Corrupt,
// CorruptFile, CorruptTableFileAtLevel), and count how many records are still
// readable afterwards (Check).
class CorruptionTest : public testing::Test {
 public:
  test::ErrorEnv env_;            // Env installed into every set of options used to open the DB.
  std::string dbname_;            // Directory holding the database under test.
  shared_ptr<Cache> tiny_cache_;  // Block cache installed by TryReopen().
  Options options_;               // Options used whenever a helper is not handed its own.
  DB* db_;                        // Currently open database, or nullptr while closed.

  // Removes any database left at dbname_, then creates and opens a fresh one.
  CorruptionTest() {
    tiny_cache_ = NewLRUCache(100, 4);
    options_.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
    options_.env = &env_;
    dbname_ = test::PerThreadDBPath("corruption_test");
    DestroyDB(dbname_, options_);

    db_ = nullptr;
    // create_if_missing is only enabled for the initial open below.
    options_.create_if_missing = true;
    BlockBasedTableOptions table_options;
    table_options.block_size_deviation = 0;
    options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
    Reopen();
    options_.create_if_missing = false;
  }

  // Closes the database and destroys its files.
  ~CorruptionTest() {
    delete db_;
    DestroyDB(dbname_, Options());
  }

  // Closes the database if it is open; db_ is nullptr afterwards.
  void CloseDb() {
    delete db_;
    db_ = nullptr;
  }

  // Closes the database and tries to open it again.
  //
  // `options` selects the options to open with; when it is nullptr, options_
  // is used. In both cases a copy is adjusted to use env_, a 4096-byte arena
  // block size, and a block-based table factory backed by tiny_cache_.
  // Returns the status reported by DB::Open.
  Status TryReopen(Options* options = nullptr) {
    CloseDb();
    Options opt = (options ? *options : options_);
    opt.env = &env_;
    opt.arena_block_size = 4096;
    BlockBasedTableOptions table_options;
    table_options.block_cache = tiny_cache_;
    table_options.block_size_deviation = 0;
    opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
    return DB::Open(opt, dbname_, &db_);
  }

  // Same as TryReopen(), but fails the current test if the open fails.
  void Reopen(Options* options = nullptr) { ASSERT_OK(TryReopen(options)); }

  // Closes the database and runs ::rocksdb::RepairDB on it with options_,
  // failing the current test if the repair reports an error. The database is
  // left closed.
  void RepairDB() {
    CloseDb();
    ASSERT_OK(::rocksdb::RepairDB(dbname_, options_));
  }

  // Writes records Key(i) -> Value(i) for i in [0, n), one write batch each.
  //
  // When `flush_every` is non-zero, the memtable is flushed before writing
  // every record whose index is a positive multiple of `flush_every`.
  void Build(int n, int flush_every = 0) {
    std::string key_space, value_space;
    WriteBatch batch;
    for (int i = 0; i < n; i++) {
      if (flush_every != 0 && i != 0 && i % flush_every == 0) {
        DBImpl* dbi = static_cast<DBImpl*>(db_);
        // A flush that fails here would silently change what the test
        // exercises, so its status is checked rather than dropped.
        ASSERT_OK(dbi->TEST_FlushMemTable());
      }
      Slice key = Key(i, &key_space);
      batch.Clear();
      batch.Put(key, Value(i, &value_space));
      ASSERT_OK(db_->Write(WriteOptions(), &batch));
    }
  }

  // Scans the whole database and asserts that the number of intact records
  // lies in [min_expected, max_expected].
  //
  // A record counts as intact when its key is a decimal number not smaller
  // than the next expected key and its value equals Value(key). A summary of
  // the scan is printed to stderr before the assertions run.
  void Check(int min_expected, int max_expected) {
    uint64_t next_expected = 0;
    uint64_t missed = 0;
    int bad_keys = 0;
    int bad_values = 0;
    int correct = 0;
    std::string value_space;
    // NOTE(review): the two ReadOptions arguments are presumably
    // (verify_checksums, fill_cache) -- confirm against ReadOptions.
    Iterator* iter = db_->NewIterator(ReadOptions(false, true));
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      uint64_t key;
      Slice in(iter->key());
      // Reject keys that are not purely numeric or that go backwards.
      if (!ConsumeDecimalNumber(&in, &key) || !in.empty() ||
          key < next_expected) {
        bad_keys++;
        continue;
      }
      // Every key skipped between the expected one and this one is missing.
      missed += (key - next_expected);
      next_expected = key + 1;
      if (iter->value() != Value(static_cast<int>(key), &value_space)) {
        bad_values++;
      } else {
        correct++;
      }
    }
    delete iter;

    fprintf(stderr,
            "expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%llu\n",
            min_expected, max_expected, correct, bad_keys, bad_values,
            static_cast<unsigned long long>(missed));
    ASSERT_LE(min_expected, correct);
    ASSERT_GE(max_expected, correct);
  }

  // Flips the top bit (XOR 0x80) of `bytes_to_corrupt` bytes of `fname`,
  // starting at `offset`.
  //
  // A negative `offset` is taken relative to the end of the file. The
  // resulting range is clamped so that it stays inside the file. After
  // rewriting the file, asserts that VerifySstFileChecksum reports an error
  // for it.
  void CorruptFile(const std::string& fname, int offset, int bytes_to_corrupt) {
    struct stat sbuf;
    if (stat(fname.c_str(), &sbuf) != 0) {
      const char* msg = strerror(errno);
      FAIL() << fname << ": " << msg;
    }

    if (offset < 0) {
      // Relative to end of file; clamp to the start if it reaches too far.
      if (-offset > sbuf.st_size) {
        offset = 0;
      } else {
        offset = static_cast<int>(sbuf.st_size + offset);
      }
    }
    if (offset > sbuf.st_size) {
      offset = static_cast<int>(sbuf.st_size);
    }
    if (offset + bytes_to_corrupt > sbuf.st_size) {
      bytes_to_corrupt = static_cast<int>(sbuf.st_size - offset);
    }

    // Read the whole file, damage the selected bytes, and write it back.
    std::string contents;
    Status s = ReadFileToString(Env::Default(), fname, &contents);
    ASSERT_TRUE(s.ok()) << s.ToString();
    for (int i = 0; i < bytes_to_corrupt; i++) {
      contents[i + offset] ^= 0x80;
    }
    s = WriteStringToFile(Env::Default(), contents, fname);
    ASSERT_TRUE(s.ok()) << s.ToString();

    Options options;
    EnvOptions env_options;
    ASSERT_NOK(VerifySstFileChecksum(options, env_options, fname));
  }

  // Corrupts the file of type `filetype` in dbname_ that has the largest file
  // number; fails the test if no file of that type exists. `offset` and
  // `bytes_to_corrupt` are forwarded to CorruptFile().
  void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) {
    std::vector<std::string> filenames;
    ASSERT_OK(env_.GetChildren(dbname_, &filenames));

    uint64_t number;
    FileType type;
    std::string fname;
    int picked_number = -1;
    for (size_t i = 0; i < filenames.size(); i++) {
      if (ParseFileName(filenames[i], &number, &type) && type == filetype &&
          static_cast<int>(number) > picked_number) {
        fname = dbname_ + "/" + filenames[i];
        picked_number = static_cast<int>(number);
      }
    }
    ASSERT_TRUE(!fname.empty()) << filetype;

    CorruptFile(fname, offset, bytes_to_corrupt);
  }

  // Corrupts the first live table file reported at `level`; fails the test if
  // the live-file metadata lists no file at that level.
  void CorruptTableFileAtLevel(int level, int offset, int bytes_to_corrupt) {
    std::vector<LiveFileMetaData> metadata;
    db_->GetLiveFilesMetaData(&metadata);
    for (const auto& m : metadata) {
      if (m.level == level) {
        CorruptFile(dbname_ + "/" + m.name, offset, bytes_to_corrupt);
        return;
      }
    }
    FAIL() << "no file found at level";
  }

  // Returns the database property `name` parsed as an int, or -1 when the
  // property is unavailable or does not start with a decimal integer.
  int Property(const std::string& name) {
    std::string property;
    int result;
    if (db_->GetProperty(name, &property) &&
        sscanf(property.c_str(), "%d", &result) == 1) {
      return result;
    } else {
      return -1;
    }
  }

  // Formats `i` as a zero-padded, 16-character decimal key into `*storage`
  // and returns a Slice referring to `*storage`.
  Slice Key(int i, std::string* storage) {
    char buf[100];
    snprintf(buf, sizeof(buf), "%016d", i);
    storage->assign(buf, strlen(buf));
    return Slice(*storage);
  }

  // Produces the value stored for key `k`: kValueSize spaces when k is 0,
  // otherwise a kValueSize-long string from test::RandomString driven by a
  // Random constructed from `k`. The bytes are placed in `*storage`.
  Slice Value(int k, std::string* storage) {
    if (k == 0) {
      *storage = std::string(kValueSize, ' ');
      return Slice(*storage);
    } else {
      Random r(k);
      return test::RandomString(&r, kValueSize, storage);
    }
  }
};
// Damages two places in the newest log file and verifies that the database
// refuses to open with default checks, but opens with paranoid checks off
// and then exposes exactly 36 of the 100 records written.
TEST_F(CorruptionTest, Recovery) {
  const int kRecordsWritten = 100;
  const int kRecordsSurviving = 36;

  Build(kRecordsWritten);
  Check(kRecordsWritten, kRecordsWritten);
#ifdef OS_WIN
  // On Windows builds the database is closed before its log is modified.
  CloseDb();
#endif
  // One damaged byte at offset 19, and one 1000 bytes past the first
  // log::kBlockSize bytes of the file.
  Corrupt(kLogFile, 19, 1);
  Corrupt(kLogFile, log::kBlockSize + 1000, 1);
  ASSERT_NOK(TryReopen());

  // Retry with paranoid checks disabled; this time the open must succeed.
  options_.paranoid_checks = false;
  Reopen(&options_);

  Check(kRecordsSurviving, kRecordsSurviving);
}
// Reopening the database must fail while env_.writable_file_error_ is set.
TEST_F(CorruptionTest, RecoverWriteError) {
  env_.writable_file_error_ = true;
  ASSERT_NOK(TryReopen());
}
// With env_.writable_file_error_ set, keeps writing until the total payload
// exceeds the default write buffer size and expects the writes to start
// failing -- and to keep failing once they have failed.
TEST_F(CorruptionTest, NewFileErrorDuringWrite) {
  env_.writable_file_error_ = true;

  // Three more kValueSize-byte values than fit in the default write buffer.
  const int num =
      static_cast<int>(3 + (Options().write_buffer_size / kValueSize));
  std::string value_storage;
  Status s;
  bool seen_failure = false;
  for (int i = 0; i < num; i++) {
    WriteBatch batch;
    batch.Put("a", Value(100, &value_storage));
    s = db_->Write(WriteOptions(), &batch);
    seen_failure = seen_failure || !s.ok();
    // After the first failure no later write may succeed.
    ASSERT_TRUE(!seen_failure || !s.ok());
  }
  ASSERT_TRUE(!s.ok());
  ASSERT_GE(env_.num_writable_file_errors_, 1);

  // With the error flag cleared the database must open again.
  env_.writable_file_error_ = false;
  Reopen();
}
// Flushes and compacts 100 records into table files, damages one byte of the
// newest table file, and expects exactly 99 records to remain readable and
// VerifyChecksum() to report the damage.
TEST_F(CorruptionTest, TableFile) {
  Build(100);
  DBImpl* dbi = static_cast<DBImpl*>(db_);
  // The setup steps are checked so that a failed flush or compaction is
  // reported directly instead of surfacing as a confusing later failure.
  ASSERT_OK(dbi->TEST_FlushMemTable());
  ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
  ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr));

  Corrupt(kTableFile, 100, 1);
  Check(99, 99);
  ASSERT_NOK(dbi->VerifyChecksum());
}
// Damages 500 bytes starting 2000 bytes before the end of the newest table
// file, then expects at most 5000 of the 10000 records to be readable after
// a reopen and VerifyChecksum() to fail.
TEST_F(CorruptionTest, TableFileIndexData) {
  Options options;
  // A 100 MB write buffer, so flushes happen only where the test asks.
  options.write_buffer_size = 100 * 1024 * 1024;
  Reopen(&options);
  // Build() flushes once before record 5000; the explicit flush below
  // writes out the remaining records.
  Build(10000, 5000);
  DBImpl* dbi = static_cast<DBImpl*>(db_);
  // Checked: the corruption below targets the file this flush produces.
  ASSERT_OK(dbi->TEST_FlushMemTable());

  Corrupt(kTableFile, -2000, 500);
  Reopen();
  // Reopen() replaced db_, so the DBImpl pointer has to be refreshed.
  dbi = static_cast<DBImpl*>(db_);
  Check(0, 5000);
  ASSERT_NOK(dbi->VerifyChecksum());
}
// Writes 1000 records, runs the repair helper, and expects all of them to be
// readable once the database is opened again.
TEST_F(CorruptionTest, MissingDescriptor) {
  const int kNumRecords = 1000;
  Build(kNumRecords);
  RepairDB();
  Reopen();
  Check(kNumRecords, kNumRecords);
}
// Overwrites one key five times, repairs the database, and verifies that the
// latest value is visible after the repair and that a write made after the
// repair is visible both immediately and after another reopen.
TEST_F(CorruptionTest, SequenceNumberRecovery) {
  const char* const kHistory[] = {"v1", "v2", "v3", "v4", "v5"};
  for (const char* version : kHistory) {
    ASSERT_OK(db_->Put(WriteOptions(), "foo", version));
  }

  RepairDB();
  Reopen();
  std::string found;
  ASSERT_OK(db_->Get(ReadOptions(), "foo", &found));
  ASSERT_EQ("v5", found);

  // A write issued after the repair must replace the recovered value.
  ASSERT_OK(db_->Put(WriteOptions(), "foo", "v6"));
  ASSERT_OK(db_->Get(ReadOptions(), "foo", &found));
  ASSERT_EQ("v6", found);

  Reopen();
  ASSERT_OK(db_->Get(ReadOptions(), "foo", &found));
  ASSERT_EQ("v6", found);
}
// Damages the first 1000 bytes of the newest descriptor file, expects the
// open to fail, and then expects the stored record to be readable again
// after a repair.
TEST_F(CorruptionTest, CorruptedDescriptor) {
  ASSERT_OK(db_->Put(WriteOptions(), "foo", "hello"));
  DBImpl* dbi = static_cast<DBImpl*>(db_);
  // Setup must succeed for the rest of the test to be meaningful, so the
  // statuses are checked rather than discarded.
  ASSERT_OK(dbi->TEST_FlushMemTable());
  ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));

  Corrupt(kDescriptorFile, 0, 1000);
  ASSERT_NOK(TryReopen());

  RepairDB();
  Reopen();
  std::string v;
  ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
  ASSERT_EQ("hello", v);
}
// Moves 10 records into a single level-2 file, damages one byte of it, and
// expects 9 records to remain readable. Then rewrites 10000 records and
// expects all of them to be readable while VerifyChecksum() still fails.
TEST_F(CorruptionTest, CompactionInputError) {
  Options options;
  Reopen(&options);
  Build(10);
  DBImpl* dbi = static_cast<DBImpl*>(db_);
  // Checked so that a failure to arrange the files is reported here, not as
  // a mismatch in the level-2 file count below.
  ASSERT_OK(dbi->TEST_FlushMemTable());
  ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
  ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr));
  ASSERT_EQ(1, Property("rocksdb.num-files-at-level2"));

  Corrupt(kTableFile, 100, 1);
  Check(9, 9);
  ASSERT_NOK(dbi->VerifyChecksum());

  // Write enough new data to replace every previously stored record.
  Build(10000);
  Check(10000, 10000);
  ASSERT_NOK(dbi->VerifyChecksum());
}
// With paranoid checks enabled, damages the only level-0 file and expects
// subsequent writes to begin failing within 10000 puts -- and to keep
// failing once the first one has failed.
TEST_F(CorruptionTest, CompactionInputErrorParanoid) {
  Options options;
  options.paranoid_checks = true;
  options.write_buffer_size = 131072;
  options.max_write_buffer_number = 2;
  Reopen(&options);
  DBImpl* dbi = static_cast<DBImpl*>(db_);

  // For each pass, write a file spanning the key range ["", "~"] and compact
  // it down through levels [0, NumberLevels() - level).
  // NOTE(review): presumably this leaves one file at every level >= 1 --
  // confirm against the compaction behavior.
  for (int level = 1; level < dbi->NumberLevels(); level++) {
    ASSERT_OK(dbi->Put(WriteOptions(), "", "begin"));
    ASSERT_OK(dbi->Put(WriteOptions(), "~", "end"));
    ASSERT_OK(dbi->TEST_FlushMemTable());
    for (int comp_level = 0; comp_level < dbi->NumberLevels() - level;
         ++comp_level) {
      ASSERT_OK(dbi->TEST_CompactRange(comp_level, nullptr, nullptr));
    }
  }

  Reopen(&options);

  // Reopen() replaced db_, so the DBImpl pointer has to be refreshed.
  dbi = static_cast<DBImpl*>(db_);
  Build(10);
  ASSERT_OK(dbi->TEST_FlushMemTable());
  ASSERT_OK(dbi->TEST_WaitForCompact());
  ASSERT_EQ(1, Property("rocksdb.num-files-at-level0"));

  CorruptTableFileAtLevel(0, 100, 1);
  Check(9, 9);
  ASSERT_NOK(dbi->VerifyChecksum());

  // Keep writing; at some point a write has to report the corruption.
  Status s;
  std::string tmp1, tmp2;
  bool failed = false;
  for (int i = 0; i < 10000; i++) {
    s = db_->Put(WriteOptions(), Key(i, &tmp1), Value(i, &tmp2));
    if (!s.ok()) {
      failed = true;
    }
    // After the first failure no later write may succeed.
    ASSERT_TRUE(!failed || !s.ok()) << "write did not fail in a corrupted db";
  }
  ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
}
// After damaging the table file that holds keys 0..9, a key outside that
// range (1000) must still be writable and readable, both before and after
// it is flushed out of the memtable.
TEST_F(CorruptionTest, UnrelatedKeys) {
  Build(10);
  DBImpl* dbi = static_cast<DBImpl*>(db_);
  // Checked: Corrupt() below needs the table file this flush creates.
  ASSERT_OK(dbi->TEST_FlushMemTable());
  Corrupt(kTableFile, 100, 1);
  ASSERT_NOK(dbi->VerifyChecksum());

  std::string tmp1, tmp2;
  ASSERT_OK(db_->Put(WriteOptions(), Key(1000, &tmp1), Value(1000, &tmp2)));
  std::string v;
  ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
  ASSERT_EQ(Value(1000, &tmp2).ToString(), v);

  // Checked: if this flush failed silently, the read below would still be
  // served from the memtable and the second half of the test would not
  // exercise a table-file read.
  ASSERT_OK(dbi->TEST_FlushMemTable());
  ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
  ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
}
// Runs two scenarios against a freshly built database: in the first, a live
// table file is overwritten with unrelated bytes; in the second, it is
// deleted. In both, reopening with paranoid checks must report Corruption.
TEST_F(CorruptionTest, FileSystemStateCorrupted) {
  for (int iter = 0; iter < 2; ++iter) {
    Options options;
    options.paranoid_checks = true;
    options.create_if_missing = true;
    Reopen(&options);
    Build(10);
    ASSERT_OK(db_->Flush(FlushOptions()));
    DBImpl* dbi = static_cast<DBImpl*>(db_);
    std::vector<LiveFileMetaData> metadata;
    dbi->GetLiveFilesMetaData(&metadata);
    ASSERT_GT(metadata.size(), size_t(0));
    // NOTE(review): no separator is inserted here, so this relies on
    // metadata[0].name already starting with one -- confirm.
    std::string filename = dbname_ + metadata[0].name;

    CloseDb();

    if (iter == 0) {
      // Replace the table file's contents with a short, invalid payload.
      unique_ptr<WritableFile> file;
      // Checked: on failure `file` would be left null and the Append below
      // would dereference it.
      ASSERT_OK(env_.NewWritableFile(filename, &file, EnvOptions()));
      ASSERT_OK(file->Append(Slice("corrupted sst")));
      file.reset();
    } else {
      // Remove the table file altogether.
      ASSERT_OK(env_.DeleteFile(filename));
    }

    Status x = TryReopen(&options);
    ASSERT_TRUE(x.IsCorruption());
    // Start the next scenario (or leave the fixture) with a fresh database.
    DestroyDB(dbname_, options_);
    Reopen(&options);
  }
}
}
// Test binary entry point: passes the command line to Google Test and
// returns the result of running every registered test.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}
#else
#include <stdio.h>
// Stand-in entry point for ROCKSDB_LITE builds: reports on stderr that the
// tests were skipped and exits successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as RepairDB() is not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
#endif // !ROCKSDB_LITE