#ifndef ROCKSDB_LITE
#include "utilities/date_tiered/date_tiered_db_impl.h"
#include <limits>
#include "db/db_impl.h"
#include "db/db_iter.h"
#include "db/write_batch_internal.h"
#include "monitoring/instrumented_mutex.h"
#include "options/options_helper.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/utilities/date_tiered_db.h"
#include "table/merging_iterator.h"
#include "util/coding.h"
#include "util/filename.h"
#include "util/string_util.h"
namespace rocksdb {

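// Takes ownership of `db` and of the column family handles in `handles`.
// Handles whose names parse as a timestamp become time-window column
// families; any other handle (e.g. the default column family) is released
// immediately.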
DateTieredDBImpl::DateTieredDBImpl(
    DB* db, Options options,
    const std::vector<ColumnFamilyDescriptor>& descriptors,
    const std::vector<ColumnFamilyHandle*>& handles, int64_t ttl,
    int64_t column_family_interval)
    : db_(db),
      cf_options_(ColumnFamilyOptions(options)),
      ioptions_(ImmutableCFOptions(options)),
      moptions_(MutableCFOptions(options)),
      icomp_(cf_options_.comparator),
      ttl_(ttl),
      column_family_interval_(column_family_interval),
      mutex_(options.statistics.get(), db->GetEnv(), DB_MUTEX_WAIT_MICROS,
             options.use_adaptive_mutex) {
  latest_timebound_ = std::numeric_limits<int64_t>::min();
  for (size_t i = 0; i < handles.size(); ++i) {
    const auto& name = descriptors[i].name;
    int64_t timestamp = 0;
    try {
      timestamp = ParseUint64(name);
    } catch (const std::invalid_argument&) {
      db_->DestroyColumnFamilyHandle(handles[i]);
      continue;
    }
    if (timestamp > latest_timebound_) {
      latest_timebound_ = timestamp;
    }
    handle_map_.insert(std::make_pair(timestamp, handles[i]));
  }
}

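// Releases all column family handles before deleting the underlying DB.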
DateTieredDBImpl::~DateTieredDBImpl() {
  for (auto handle : handle_map_) {
    db_->DestroyColumnFamilyHandle(handle.second);
  }
  delete db_;
  db_ = nullptr;
}

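// Lists the existing column families (one per time window), opens them all,
// and wraps the DB in a DateTieredDBImpl. Falls back to a plain DB::Open when
// no column family metadata is found (fresh database).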
Status DateTieredDB::Open(const Options& options, const std::string& dbname,
                          DateTieredDB** dbptr, int64_t ttl,
                          int64_t column_family_interval, bool read_only) {
  DBOptions db_options(options);
  ColumnFamilyOptions cf_options(options);
  std::vector<ColumnFamilyDescriptor> descriptors;
  std::vector<ColumnFamilyHandle*> handles;
  DB* db;
  Status s;

  std::vector<std::string> column_family_names;
  s = DB::ListColumnFamilies(db_options, dbname, &column_family_names);
  if (!s.ok()) {
    s = DB::Open(options, dbname, &db);
    if (!s.ok()) {
      return s;
    }
  } else {
    for (auto name : column_family_names) {
      descriptors.emplace_back(ColumnFamilyDescriptor(name, cf_options));
    }
    if (read_only) {
      s = DB::OpenForReadOnly(db_options, dbname, descriptors, &handles, &db);
    } else {
      s = DB::Open(db_options, dbname, descriptors, &handles, &db);
    }
  }

  if (s.ok()) {
    *dbptr = new DateTieredDBImpl(db, options, descriptors, handles, ttl,
                                  column_family_interval);
  }
  return s;
}

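// A key is stale when its timestamp plus the TTL is at or before the current
// time. A non-positive TTL, or a failure to read the clock, is treated as
// "never stale".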
bool DateTieredDBImpl::IsStale(int64_t keytime, int64_t ttl, Env* env) {
  if (ttl <= 0) {
    return false;
  }
  int64_t curtime;
  if (!env->GetCurrentTime(&curtime).ok()) {
    return false;
  }
  return curtime >= keytime + ttl;
}

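// Drops every column family whose upper time bound has already expired.
// handle_map_ is ordered by time bound, so the scan stops at the first
// column family that is still live.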
Status DateTieredDBImpl::DropObsoleteColumnFamilies() {
  int64_t curtime;
  Status s;
  s = db_->GetEnv()->GetCurrentTime(&curtime);
  if (!s.ok()) {
    return s;
  }
  {
    InstrumentedMutexLock l(&mutex_);
    auto iter = handle_map_.begin();
    while (iter != handle_map_.end()) {
      if (iter->first <= curtime - ttl_) {
        s = db_->DropColumnFamily(iter->second);
        if (!s.ok()) {
          return s;
        }
        delete iter->second;
        iter = handle_map_.erase(iter);
      } else {
        break;
      }
    }
  }
  return Status::OK();
}

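// Extracts the timestamp stored big-endian in the last 8 bytes of the user
// key.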
Status DateTieredDBImpl::GetTimestamp(const Slice& key, int64_t* result) {
  if (key.size() < kTSLength) {
    return Status::Corruption("Bad timestamp in key");
  }
  const char* pos = key.data() + key.size() - 8;
  int64_t timestamp = 0;
  if (port::kLittleEndian) {
    int bytes_to_fill = 8;
    for (int i = 0; i < bytes_to_fill; ++i) {
      timestamp |= (static_cast<uint64_t>(static_cast<unsigned char>(pos[i]))
                    << ((bytes_to_fill - i - 1) << 3));
    }
  } else {
    memcpy(&timestamp, pos, sizeof(timestamp));
  }
  *result = timestamp;
  return Status::OK();
}

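// Creates a column family for a new time window and registers it in
// handle_map_. The new upper time bound is advanced from latest_timebound_ in
// steps of column_family_interval_ until it passes the current time, and the
// column family is named after that bound. Caller must hold mutex_.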
Status DateTieredDBImpl::CreateColumnFamily(
    ColumnFamilyHandle** column_family) {
  int64_t curtime;
  Status s;
  mutex_.AssertHeld();
  s = db_->GetEnv()->GetCurrentTime(&curtime);
  if (!s.ok()) {
    return s;
  }
  int64_t new_timebound;
  if (handle_map_.empty()) {
    new_timebound = curtime + column_family_interval_;
  } else {
    new_timebound =
        latest_timebound_ +
        ((curtime - latest_timebound_) / column_family_interval_ + 1) *
            column_family_interval_;
  }
  std::string cf_name = ToString(new_timebound);
  latest_timebound_ = new_timebound;
  s = db_->CreateColumnFamily(cf_options_, cf_name, column_family);
  if (s.ok()) {
    handle_map_.insert(std::make_pair(new_timebound, *column_family));
  }
  return s;
}

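// Finds the column family whose time window covers `keytime`, i.e. the first
// entry in handle_map_ with an upper bound greater than keytime. If no such
// window exists, either creates one (create_if_missing) or reports NotFound.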
Status DateTieredDBImpl::FindColumnFamily(int64_t keytime,
                                          ColumnFamilyHandle** column_family,
                                          bool create_if_missing) {
  *column_family = nullptr;
  {
    InstrumentedMutexLock l(&mutex_);
    auto iter = handle_map_.upper_bound(keytime);
    if (iter == handle_map_.end()) {
      if (!create_if_missing) {
        return Status::NotFound();
      } else {
        return CreateColumnFamily(column_family);
      }
    }
    *column_family = iter->second;
  }
  return Status::OK();
}

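// Routes a Put to the column family that owns the key's time window, creating
// the column family if necessary. Expired column families are dropped first,
// and writes to already-stale keys are rejected.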
Status DateTieredDBImpl::Put(const WriteOptions& options, const Slice& key,
                             const Slice& val) {
  int64_t timestamp = 0;
  Status s;
  s = GetTimestamp(key, &timestamp);
  if (!s.ok()) {
    return s;
  }
  DropObsoleteColumnFamilies();

  if (IsStale(timestamp, ttl_, db_->GetEnv())) {
    return Status::InvalidArgument();
  }

  ColumnFamilyHandle* column_family;
  s = FindColumnFamily(timestamp, &column_family, true /*create_if_missing*/);
  if (!s.ok()) {
    return s;
  }

  WriteBatch batch;
  batch.Put(column_family, key, val);
  return Write(options, &batch);
}

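// Routes a Get to the column family that owns the key's time window. Stale
// keys and keys whose window has no column family report NotFound.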
Status DateTieredDBImpl::Get(const ReadOptions& options, const Slice& key,
                             std::string* value) {
  int64_t timestamp = 0;
  Status s;
  s = GetTimestamp(key, &timestamp);
  if (!s.ok()) {
    return s;
  }
  if (IsStale(timestamp, ttl_, db_->GetEnv())) {
    return Status::NotFound();
  }

  ColumnFamilyHandle* column_family;
  s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/);
  if (!s.ok()) {
    return s;
  }
  if (column_family == nullptr) {
    return Status::NotFound();
  }
  return db_->Get(options, column_family, key, value);
}

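// Returns false when the key's time window is unknown or already expired;
// otherwise defers to the underlying DB's KeyMayExist.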
bool DateTieredDBImpl::KeyMayExist(const ReadOptions& options,
                                   const Slice& key, std::string* value,
                                   bool* value_found) {
  int64_t timestamp = 0;
  Status s;
  s = GetTimestamp(key, &timestamp);
  if (!s.ok()) {
    return false;
  }
  ColumnFamilyHandle* column_family;
  s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/);
  if (!s.ok() || column_family == nullptr) {
    return false;
  }
  if (IsStale(timestamp, ttl_, db_->GetEnv())) {
    return false;
  }
  return db_->KeyMayExist(options, column_family, key, value, value_found);
}

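// Routes a Delete to the column family that owns the key's time window after
// dropping expired column families; stale or unmapped keys report NotFound.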
Status DateTieredDBImpl::Delete(const WriteOptions& options,
                                const Slice& key) {
  int64_t timestamp = 0;
  Status s;
  s = GetTimestamp(key, &timestamp);
  if (!s.ok()) {
    return s;
  }
  DropObsoleteColumnFamilies();

  if (IsStale(timestamp, ttl_, db_->GetEnv())) {
    return Status::NotFound();
  }

  ColumnFamilyHandle* column_family;
  s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/);
  if (!s.ok()) {
    return s;
  }
  if (column_family == nullptr) {
    return Status::NotFound();
  }
  return db_->Delete(options, column_family, key);
}

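// Routes a Merge to the column family that owns the key's time window,
// creating the column family if necessary.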
Status DateTieredDBImpl::Merge(const WriteOptions& options, const Slice& key,
                               const Slice& value) {
  int64_t timestamp = 0;
  Status s;
  s = GetTimestamp(key, &timestamp);
  if (!s.ok()) {
    return s;
  }
  ColumnFamilyHandle* column_family;
  s = FindColumnFamily(timestamp, &column_family, true /*create_if_missing*/);
  if (!s.ok()) {
    return s;
  }
  WriteBatch batch;
  batch.Merge(column_family, key, value);
  return Write(options, &batch);
}

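// Rebuilds the caller's batch through a WriteBatch::Handler, preserving the
// column family id each operation already carries, and forwards it to the
// underlying DB as a single atomic write.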
Status DateTieredDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
  class Handler : public WriteBatch::Handler {
   public:
    explicit Handler() {}
    WriteBatch updates_ttl;
    Status batch_rewrite_status;
    virtual Status PutCF(uint32_t column_family_id, const Slice& key,
                         const Slice& value) override {
      WriteBatchInternal::Put(&updates_ttl, column_family_id, key, value);
      return Status::OK();
    }
    virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
                           const Slice& value) override {
      WriteBatchInternal::Merge(&updates_ttl, column_family_id, key, value);
      return Status::OK();
    }
    virtual Status DeleteCF(uint32_t column_family_id,
                            const Slice& key) override {
      WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
      return Status::OK();
    }
    virtual void LogData(const Slice& blob) override {
      updates_ttl.PutLogData(blob);
    }
  };
  Handler handler;
  updates->Iterate(&handler);
  if (!handler.batch_rewrite_status.ok()) {
    return handler.batch_rewrite_status;
  } else {
    return db_->Write(opts, &(handler.updates_ttl));
  }
}

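// Builds a single iterator that merges the internal iterators of every
// time-window column family, so callers see one ordered view across windows.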
Iterator* DateTieredDBImpl::NewIterator(const ReadOptions& opts) {
  if (handle_map_.empty()) {
    return NewEmptyIterator();
  }

  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db_);

  auto db_iter = NewArenaWrappedDbIterator(
      db_impl->GetEnv(), opts, ioptions_, moptions_, kMaxSequenceNumber,
      cf_options_.max_sequential_skip_in_iterations, 0, nullptr);

  auto arena = db_iter->GetArena();
  MergeIteratorBuilder builder(&icomp_, arena);
  for (auto& item : handle_map_) {
    auto handle = item.second;
    builder.AddIterator(db_impl->NewInternalIterator(
        arena, db_iter->GetRangeDelAggregator(), handle));
  }
  auto internal_iter = builder.Finish();
  db_iter->SetIterUnderDBIter(internal_iter);
  return db_iter;
}

}  // namespace rocksdb

#endif // ROCKSDB_LITE