#include <atomic>
#include <memory>
#include <thread>
#include <vector>
#include "db/db_test_util.h"
#include "db/write_batch_internal.h"
#include "db/write_thread.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "util/fault_injection_test_env.h"
#include "util/string_util.h"
#include "util/sync_point.h"
namespace
rocksdb {
class
DBWriteTest :
public
DBTestBase,
public
testing::WithParamInterface<
int
> {
public
:
DBWriteTest() : DBTestBase(
"/db_write_test"
) {}
Options GetOptions() {
return
DBTestBase::GetOptions(GetParam()); }
void
Open() { DBTestBase::Reopen(GetOptions()); }
};
TEST_P(DBWriteTest, SyncAndDisableWAL) {
WriteOptions write_options;
write_options.sync =
true
;
write_options.disableWAL =
true
;
ASSERT_TRUE(dbfull()->Put(write_options,
"foo"
,
"bar"
).IsInvalidArgument());
WriteBatch batch;
ASSERT_OK(batch.Put(
"foo"
,
"bar"
));
ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
}
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
constexpr
int
kNumThreads = 5;
std::unique_ptr<FaultInjectionTestEnv> mock_env(
new
FaultInjectionTestEnv(Env::Default()));
Options options = GetOptions();
options.env = mock_env.get();
Reopen(options);
std::atomic<
int
> ready_count{0};
std::atomic<
int
> leader_count{0};
std::vector<port::Thread> threads;
mock_env->SetFilesystemActive(
false
);
SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Wait"
, [&](
void
* arg) {
ready_count++;
auto
* w =
reinterpret_cast
<WriteThread::Writer*>(arg);
if
(w->state == WriteThread::STATE_GROUP_LEADER) {
leader_count++;
while
(ready_count < kNumThreads) {
}
}
});
SyncPoint::GetInstance()->EnableProcessing();
for
(
int
i = 0; i < kNumThreads; i++) {
threads.push_back(port::Thread(
[&](
int
index) {
auto
res = Put(
"key"
+ ToString(index),
"value"
);
if
(options.manual_wal_flush) {
ASSERT_TRUE(res.ok());
}
else
{
ASSERT_FALSE(res.ok());
}
},
i));
}
for
(
int
i = 0; i < kNumThreads; i++) {
threads[i].join();
}
ASSERT_EQ(1, leader_count);
Close();
}
TEST_P(DBWriteTest, ManualWalFlushInEffect) {
Options options = GetOptions();
Reopen(options);
ASSERT_TRUE(Put(
"key"
+ ToString(0),
"value"
).ok());
ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
ASSERT_TRUE(dbfull()->FlushWAL(
false
).ok());
ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
dbfull()->TEST_SwitchWAL();
ASSERT_TRUE(Put(
"key"
+ ToString(0),
"value"
).ok());
ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
ASSERT_TRUE(dbfull()->FlushWAL(
false
).ok());
ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
}
TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
std::unique_ptr<FaultInjectionTestEnv> mock_env(
new
FaultInjectionTestEnv(Env::Default()));
Options options = GetOptions();
options.env = mock_env.get();
Reopen(options);
for
(
int
i = 0; i < 2; i++) {
mock_env->SetFilesystemActive(i != 0);
auto
res = Put(
"key"
+ ToString(i),
"value"
);
if
(!options.manual_wal_flush) {
ASSERT_FALSE(res.ok());
}
}
Close();
}
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
testing::Values(DBTestBase::kDefault,
DBTestBase::kConcurrentWALWrites,
DBTestBase::kPipelinedWrite));
}
int
main(
int
argc,
char
** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return
RUN_ALL_TESTS();
}