#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "util/fault_injection_test_env.h"
#include "util/sync_point.h"
namespace
rocksdb {
// Fixture shared by the flush tests in this file. It only forwards the
// per-suite path "/db_flush_test" to DBTestBase; all helpers used by the
// tests (Put, Flush, Reopen, ...) come from the base class.
class DBFlushTest : public DBTestBase {
 public:
  DBFlushTest() : DBTestBase("/db_flush_test") {}
};
// Value-parameterized flavour of DBFlushTest. The bool parameter is read via
// GetParam() by the tests declared with TEST_P on this fixture.
class DBFlushDirectIOTest : public DBFlushTest,
                            public ::testing::WithParamInterface<bool> {
 public:
  // DBFlushTest's default constructor is invoked implicitly.
  DBFlushDirectIOTest() {}
};
// Issues a second non-blocking flush while the first flush is at the
// "VersionSet::LogAndApply:WriteManifest" sync point, then verifies that both
// flushes complete and each produced a table file.
TEST_F(DBFlushTest, FlushWhileWritingManifest) {
  Options options;
  options.disable_auto_compactions = true;
  options.max_background_flushes = 2;
  options.env = env_;
  Reopen(options);

  FlushOptions no_wait;
  no_wait.wait = false;

  // Each pair is {predecessor, successor}: the successor sync point is held
  // until the predecessor has been reached.
  SyncPoint::GetInstance()->LoadDependency(
      {{"VersionSet::LogAndApply:WriteManifest",
        "DBFlushTest::FlushWhileWritingManifest:1"},
       {"MemTableList::InstallMemtableFlushResults:InProgress",
        "VersionSet::LogAndApply:WriteManifestDone"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("foo", "v"));
  ASSERT_OK(dbfull()->Flush(no_wait));
  // Blocks until the first flush has reached the manifest-write sync point.
  TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
  ASSERT_OK(Put("bar", "v"));
  ASSERT_OK(dbfull()->Flush(no_wait));
  // Check the wait's status instead of discarding it, so a failed background
  // flush is reported here rather than only via the file count below.
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
#ifndef ROCKSDB_LITE
  ASSERT_EQ(2, TotalTableFiles());
#endif  // ROCKSDB_LITE
}
// Forces a background flush to run while the fault-injection filesystem is
// inactive, then checks that no table file was produced and that the current
// version and its reference count are unchanged afterwards.
TEST_F(DBFlushTest, SyncFail) {
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  Options options;
  options.disable_auto_compactions = true;
  options.env = fault_injection_env.get();

  // Each pair is {predecessor, successor}: the successor sync point is held
  // until the predecessor has been reached.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
       {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
  SyncPoint::GetInstance()->EnableProcessing();

  Reopen(options);
  // The write must succeed for the rest of the test to be meaningful, so its
  // status is asserted like every other Put in this file.
  ASSERT_OK(Put("key", "value"));
  // Downcast within the handle class hierarchy; static_cast is the correct
  // named cast for this (reinterpret_cast only works by layout coincidence).
  auto* cfd =
      static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
  FlushOptions flush_options;
  flush_options.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_options));

  // Snapshot the version state that must be unchanged after the failed flush.
  auto current_before = cfd->current();
  int refs_before = cfd->current()->TEST_refs();

  fault_injection_env->SetFilesystemActive(false);
  // :1 releases DBImpl::SyncClosedLogs:Start; :2 waits for
  // DBImpl::SyncClosedLogs:Failed, so the filesystem stays inactive until the
  // failure point has been hit.
  TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
  TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
  fault_injection_env->SetFilesystemActive(true);

  // The flush is expected to fail here, so the wait's result is intentionally
  // not asserted.
  dbfull()->TEST_WaitForFlushMemTable();
#ifndef ROCKSDB_LITE
  ASSERT_EQ("", FilesPerLevel());
#endif  // ROCKSDB_LITE
  ASSERT_EQ(current_before, cfd->current());
  ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
  Destroy(options);
}
// With the HIGH priority pool sized to zero threads, checks that every
// background flush and compaction callback fires on one and the same thread,
// and counts how many of each ran.
TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
  const int kL0Trigger = 4;

  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kL0Trigger;
  options.memtable_factory.reset(new SpecialSkipListFactory(1));
  Reopen(options);
  env_->SetBackgroundThreads(0, Env::HIGH);

  // Thread that ran the first flush callback; default-constructed id means
  // "not recorded yet".
  std::thread::id bg_tid;
  int flush_count = 0;
  int compaction_count = 0;

  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BGWorkFlush", [&](void* /*arg*/) {
        const std::thread::id this_tid = std::this_thread::get_id();
        if (bg_tid != std::thread::id()) {
          ASSERT_EQ(bg_tid, this_tid);
        } else {
          bg_tid = this_tid;
        }
        ++flush_count;
      });
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BGWorkCompaction", [&](void* /*arg*/) {
        ASSERT_EQ(bg_tid, std::this_thread::get_id());
        ++compaction_count;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  // One initial write, then one write + flush wait per expected flush.
  ASSERT_OK(Put("key", "val"));
  for (int round = 0; round < kL0Trigger; ++round) {
    ASSERT_OK(Put("key", "val"));
    dbfull()->TEST_WaitForFlushMemTable();
  }
  dbfull()->TEST_WaitForCompact();

  ASSERT_EQ(kL0Trigger, flush_count);
  ASSERT_EQ(1, compaction_count);
}
// Runs a manual Flush() on a helper thread while the main thread writes
// another key and switches the memtable, with
// min_write_buffer_number_to_merge set to 3. The manual flush must return OK.
TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) {
  Options options = CurrentOptions();
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  Reopen(options);

  // Each pair is {predecessor, successor}: the successor sync point is held
  // until the predecessor has been reached.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BGWorkFlush",
        "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"},
       {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2",
        "FlushJob::WriteLevel0Table"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("key1", "value1"));

  port::Thread flush_thread([&]() { ASSERT_OK(Flush()); });

  // Proceed only once the background flush work has started.
  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1");
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  // Lets FlushJob::WriteLevel0Table continue.
  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2");

  flush_thread.join();
}
// For each bool parameter value, checks that the flag delivered to the
// "BuildTable:create_file" sync point during a flush equals
// options.use_direct_io_for_flush_and_compaction.
TEST_P(DBFlushDirectIOTest, DirectIO) {
  // Owned by this test; released manually after the DB has been destroyed.
  Env* const mock_env = new MockEnv(Env::Default());

  Options options;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.max_background_flushes = 2;
  options.use_direct_io_for_flush_and_compaction = GetParam();
  options.env = mock_env;

  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:create_file", [&](void* arg) {
        // The sync point passes a pointer to a bool.
        const bool direct_writes_used = *static_cast<bool*>(arg);
        ASSERT_EQ(direct_writes_used,
                  options.use_direct_io_for_flush_and_compaction);
      });
  SyncPoint::GetInstance()->EnableProcessing();

  Reopen(options);
  ASSERT_OK(Put("foo", "v"));

  FlushOptions blocking_flush;
  blocking_flush.wait = true;
  ASSERT_OK(dbfull()->Flush(blocking_flush));

  Destroy(options);
  delete mock_env;
}
// Switches the memtable while the fault-injection filesystem is inactive and
// expects the returned status to differ from Status::OK().
TEST_F(DBFlushTest, FlushError) {
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));

  Options options;
  options.env = fault_injection_env.get();
  options.disable_auto_compactions = true;
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  Reopen(options);

  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));

  // Only the memtable switch runs with the filesystem deactivated.
  fault_injection_env->SetFilesystemActive(false);
  Status switch_status = dbfull()->TEST_SwitchMemtable();
  fault_injection_env->SetFilesystemActive(true);

  // Clean up before asserting, as the result is checked last.
  Destroy(options);
  ASSERT_NE(switch_status, Status::OK());
}
// Instantiate the TEST_P cases of DBFlushDirectIOTest with the bool values
// produced by testing::Bool().
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
testing::Bool());
}
// Test binary entry point: installs the stack trace handler, initializes
// GoogleTest with the command-line arguments, and returns the result of
// running all registered tests.
int main(int argc, char** argv) {
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}