#ifndef ROCKSDB_LITE
#include "utilities/ttl/db_ttl_impl.h"
#include "db/write_batch_internal.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/utilities/db_ttl.h"
#include "util/coding.h"
#include "util/filename.h"
namespace
rocksdb {
// Rewrites `options` in place so that compaction filtering and merging go
// through the TTL-aware wrappers.
//
// ttl:     time-to-live handed to the TTL compaction filter / factory.
// options: column family options to modify; must not be null.
// env:     environment handed to the wrappers.
void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
                                    Env* env) {
  if (options->compaction_filter == nullptr) {
    // No explicit filter: wrap the factory instead. The wrapped factory may
    // itself be empty. The new wrapper is fully constructed (holding the old
    // factory) before the member is reassigned.
    options->compaction_filter_factory.reset(new TtlCompactionFilterFactory(
        ttl, env, options->compaction_filter_factory));
  } else {
    // An explicit filter was supplied: wrap it in a heap-allocated TTL filter.
    // The options struct only stores a raw pointer to it.
    options->compaction_filter =
        new TtlCompactionFilter(ttl, env, options->compaction_filter);
  }

  // A merge operator, when present, is wrapped as well; absent operators are
  // left untouched.
  if (options->merge_operator != nullptr) {
    options->merge_operator.reset(
        new TtlMergeOperator(options->merge_operator, env));
  }
}
// Wraps an already opened `db`; the pointer is simply forwarded to the
// DBWithTTL base class.
DBWithTTLImpl::DBWithTTLImpl(DB* db)
    : DBWithTTL(db) {
}
// Cancels background work on the wrapped DB and then deletes the compaction
// filter reported by GetOptions().
DBWithTTLImpl::~DBWithTTLImpl() {
  // Background work is cancelled first, before the filter is deleted.
  CancelAllBackgroundWork(db_, true);

  // Only the filter returned by the argument-less GetOptions() is released
  // here.
  const auto* installed_filter = GetOptions().compaction_filter;
  delete installed_filter;
}
// Thin adapter over DBWithTTL::Open that hands the result back through a
// StackableDB pointer.
//
// On success `*dbptr` receives the opened database; on failure it is set to
// nullptr. The status of the underlying Open call is returned either way.
Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname,
                            StackableDB** dbptr, int32_t ttl, bool read_only) {
  DBWithTTL* ttl_db = nullptr;
  const Status open_status =
      DBWithTTL::Open(options, dbname, &ttl_db, ttl, read_only);
  *dbptr = open_status.ok() ? ttl_db : nullptr;
  return open_status;
}
// Opens a TTL database that has only the default column family.
//
// `options` is split into DB-wide and column-family options, and the work is
// delegated to the multi-column-family overload with a single descriptor and
// a single ttl.
Status DBWithTTL::Open(const Options& options, const std::string& dbname,
                       DBWithTTL** dbptr, int32_t ttl, bool read_only) {
  const DBOptions db_options(options);
  const ColumnFamilyOptions cf_options(options);

  std::vector<ColumnFamilyDescriptor> descriptors;
  descriptors.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));

  std::vector<ColumnFamilyHandle*> cf_handles;
  Status s = DBWithTTL::Open(db_options, dbname, descriptors, &cf_handles,
                             dbptr, {ttl}, read_only);
  if (!s.ok()) {
    return s;
  }

  // This overload has no way to hand the handle to the caller, so the single
  // handle produced by the delegated call is released here.
  assert(cf_handles.size() == 1);
  delete cf_handles[0];
  return s;
}
// Opens a TTL database with an explicit set of column families.
//
// `ttls` must contain exactly one entry per element of `column_families`;
// otherwise InvalidArgument is returned and nothing is opened. Each
// descriptor is copied and its options are passed through SanitizeOptions
// before the underlying DB is opened (read-only or read-write, depending on
// `read_only`).
//
// On success `*dbptr` points to a newly allocated DBWithTTLImpl wrapping the
// opened DB; on failure it is set to nullptr. The open status is returned.
Status DBWithTTL::Open(
    const DBOptions& db_options, const std::string& dbname,
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
    std::vector<int32_t> ttls, bool read_only) {
  if (ttls.size() != column_families.size()) {
    return Status::InvalidArgument(
        "ttls size has to be the same as number of column families");
  }

  // Work on a private copy so the caller's descriptors are not modified.
  std::vector<ColumnFamilyDescriptor> sanitized_cfs(column_families);
  for (size_t idx = 0; idx < sanitized_cfs.size(); ++idx) {
    // Fall back to the default environment when none was configured.
    Env* env = db_options.env != nullptr ? db_options.env : Env::Default();
    DBWithTTLImpl::SanitizeOptions(ttls[idx], &sanitized_cfs[idx].options,
                                   env);
  }

  DB* raw_db = nullptr;
  const Status st =
      read_only
          ? DB::OpenForReadOnly(db_options, dbname, sanitized_cfs, handles,
                                &raw_db)
          : DB::Open(db_options, dbname, sanitized_cfs, handles, &raw_db);

  *dbptr = st.ok() ? new DBWithTTLImpl(raw_db) : nullptr;
  return st;
}
// Creates a column family whose options have been passed through
// SanitizeOptions with the given `ttl` and this database's environment.
//
// The caller's `options` object is not modified; a sanitized copy is used.
Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
    const ColumnFamilyOptions& options, const std::string& column_family_name,
    ColumnFamilyHandle** handle, int ttl) {
  ColumnFamilyOptions ttl_options(options);
  DBWithTTLImpl::SanitizeOptions(ttl, &ttl_options, GetEnv());

  // Explicitly qualified so the base-class implementation is invoked rather
  // than DBWithTTLImpl::CreateColumnFamily, which calls back into this method.
  Status create_status =
      DBWithTTL::CreateColumnFamily(ttl_options, column_family_name, handle);
  return create_status;
}
// Creates a column family with a ttl of 0 by delegating to
// CreateColumnFamilyWithTtl.
Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
                                         const std::string& column_family_name,
                                         ColumnFamilyHandle** handle) {
  const int default_ttl = 0;
  return CreateColumnFamilyWithTtl(options, column_family_name, handle,
                                   default_ttl);
}
// Appends `val` followed by a kTSLength-byte encoding of the current time to
// `*val_with_ts`.
//
// The time is obtained from `env`. If that fails, the failing status is
// returned and nothing is appended (capacity may already have been reserved).
Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
                               Env* env) {
  val_with_ts->reserve(kTSLength + val.size());

  int64_t now;
  Status st = env->GetCurrentTime(&now);
  if (st.ok()) {
    // The current time is narrowed to 32 bits before being encoded.
    char encoded_ts[kTSLength];
    EncodeFixed32(encoded_ts, static_cast<int32_t>(now));
    val_with_ts->append(val.data(), val.size());
    val_with_ts->append(encoded_ts, kTSLength);
  }
  return st;
}
// Validates the timestamp stored in the last kTSLength bytes of `str`.
//
// Returns Corruption when `str` is shorter than kTSLength or when the decoded
// timestamp is below kMinTimestamp; returns OK otherwise.
Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
  if (str.size() < kTSLength) {
    return Status::Corruption("Error: value's length less than timestamp's\n");
  }

  // The timestamp occupies the trailing kTSLength bytes.
  const char* ts_bytes = str.data() + str.size() - kTSLength;
  const int32_t decoded_ts = DecodeFixed32(ts_bytes);
  if (decoded_ts >= kMinTimestamp) {
    return Status::OK();
  }
  return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
}
// Reports whether `value` has outlived `ttl`, judged by the timestamp stored
// in its last kTSLength bytes.
//
// value: timestamp-suffixed value. No length check is performed here, so the
//        caller must ensure it holds at least kTSLength bytes.
// ttl:   time-to-live; a non-positive ttl means the data is never stale.
// env:   source of the current time.
//
// Returns true only when (timestamp + ttl) is strictly less than the current
// time. If the current time cannot be obtained the value is treated as fresh.
bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) {
  if (ttl <= 0) {  // Data is fresh if TTL is non-positive
    return false;
  }
  int64_t curtime;
  if (!env->GetCurrentTime(&curtime).ok()) {
    return false;  // Treat the data as fresh if could not get current time
  }
  const int32_t timestamp_value =
      DecodeFixed32(value.data() + value.size() - kTSLength);
  // Add in 64 bits: `timestamp_value + ttl` evaluated as int32_t overflows
  // for large ttls (e.g. 86400 * 365 * 15), which is undefined behaviour and
  // in practice made fresh data look stale.
  const int64_t expiry_time = static_cast<int64_t>(timestamp_value) + ttl;
  return expiry_time < curtime;
}
// Drops the trailing kTSLength timestamp bytes from `*pinnable_val`.
//
// Returns Corruption, leaving the slice untouched, when it is shorter than
// kTSLength; returns OK otherwise.
Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
  if (pinnable_val->size() >= kTSLength) {
    pinnable_val->remove_suffix(kTSLength);
    return Status::OK();
  }
  return Status::Corruption("Bad timestamp in key-value");
}
// Drops the trailing kTSLength timestamp bytes from `*str`.
//
// Returns Corruption, leaving the string untouched, when it is shorter than
// kTSLength; returns OK otherwise.
Status DBWithTTLImpl::StripTS(std::string* str) {
  const size_t total_len = str->length();
  if (total_len >= kTSLength) {
    // Truncating to the payload length removes exactly the timestamp suffix.
    str->resize(total_len - kTSLength);
    return Status::OK();
  }
  return Status::Corruption("Bad timestamp in key-value");
}
// Stores `val` under `key` by building a one-entry batch and sending it
// through Write().
Status DBWithTTLImpl::Put(const WriteOptions& options,
                          ColumnFamilyHandle* column_family, const Slice& key,
                          const Slice& val) {
  WriteBatch single_put;
  single_put.Put(column_family, key, val);
  Status write_status = Write(options, &single_put);
  return write_status;
}
// Reads `key` from the wrapped DB, validates the stored timestamp and removes
// it, so that `*value` holds only the user payload on success.
//
// The first failing step (lookup, sanity check, strip) determines the
// returned status; later steps are skipped.
Status DBWithTTLImpl::Get(const ReadOptions& options,
                          ColumnFamilyHandle* column_family, const Slice& key,
                          PinnableSlice* value) {
  Status st = db_->Get(options, column_family, key, value);
  if (st.ok()) {
    st = SanityCheckTimestamp(*value);
  }
  if (st.ok()) {
    st = StripTS(value);
  }
  return st;
}
// Batched lookup through the wrapped DB. Each value that was read
// successfully is sanity-checked and has its timestamp stripped; the
// per-key status reflects the first step that failed for that key.
std::vector<Status> DBWithTTLImpl::MultiGet(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  std::vector<Status> statuses =
      db_->MultiGet(options, column_family, keys, values);

  for (size_t idx = 0; idx < keys.size(); ++idx) {
    Status& entry_status = statuses[idx];
    if (entry_status.ok()) {
      entry_status = SanityCheckTimestamp((*values)[idx]);
      if (entry_status.ok()) {
        entry_status = StripTS(&(*values)[idx]);
      }
    }
  }
  return statuses;
}
// Forwards to the wrapped DB's KeyMayExist. When that call reports that a
// value was actually retrieved, the value is sanity-checked and its
// timestamp is stripped; if either step fails the function returns false.
// Otherwise the wrapped DB's answer is returned unchanged.
bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options,
                                ColumnFamilyHandle* column_family,
                                const Slice& key, std::string* value,
                                bool* value_found) {
  const bool may_exist =
      db_->KeyMayExist(options, column_family, key, value, value_found);

  // Only post-process when a value was really written to `*value`.
  const bool value_available = may_exist && value != nullptr &&
                               value_found != nullptr && *value_found;
  if (value_available) {
    if (!SanityCheckTimestamp(*value).ok()) {
      return false;
    }
    if (!StripTS(value).ok()) {
      return false;
    }
  }
  return may_exist;
}
// Records a merge operand for `key` by building a one-entry batch and sending
// it through Write().
Status DBWithTTLImpl::Merge(const WriteOptions& options,
                            ColumnFamilyHandle* column_family,
                            const Slice& key, const Slice& value) {
  WriteBatch single_merge;
  single_merge.Merge(column_family, key, value);
  Status write_status = Write(options, &single_merge);
  return write_status;
}
// Applies `updates` to the wrapped DB after rewriting it so that every Put
// and Merge value carries a trailing timestamp (see AppendTS).
//
// The batch is replayed through a local handler that builds a second batch:
//   - Put / Merge: value is re-emitted with the timestamp appended.
//   - Delete and log-data blobs: copied through unchanged.
//
// Returns the first timestamping failure, or the failure reported by
// WriteBatch::Iterate, without writing anything; otherwise returns the status
// of the wrapped DB's Write on the rewritten batch.
//
// NOTE(review): record types without an override below (for example single
// deletes or range deletes) are handled by whatever WriteBatch::Handler does
// by default — confirm they are not silently dropped from the rewritten batch.
Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
  class Handler : public WriteBatch::Handler {
   public:
    explicit Handler(Env* env) : env_(env) {}

    // Batch being assembled with timestamped values.
    WriteBatch updates_ttl;
    // First failure seen while timestamping a value; OK if none occurred.
    Status batch_rewrite_status;

    Status PutCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override {
      std::string value_with_ts;
      Status st = AppendTS(value, &value_with_ts, env_);
      if (!st.ok()) {
        batch_rewrite_status = st;
        // Report the failure so the replay does not carry on with a batch
        // that can no longer be written.
        return st;
      }
      WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
                              value_with_ts);
      return Status::OK();
    }

    Status MergeCF(uint32_t column_family_id, const Slice& key,
                   const Slice& value) override {
      std::string value_with_ts;
      Status st = AppendTS(value, &value_with_ts, env_);
      if (!st.ok()) {
        batch_rewrite_status = st;
        return st;
      }
      WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
                                value_with_ts);
      return Status::OK();
    }

    Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
      WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
      return Status::OK();
    }

    void LogData(const Slice& blob) override { updates_ttl.PutLogData(blob); }

   private:
    Env* env_;
  };

  Handler handler(GetEnv());
  // Previously the result of Iterate was discarded, so a failed replay still
  // led to a (partial) rewritten batch being written.
  const Status iterate_status = updates->Iterate(&handler);
  if (!handler.batch_rewrite_status.ok()) {
    return handler.batch_rewrite_status;
  }
  if (!iterate_status.ok()) {
    return iterate_status;
  }
  return db_->Write(opts, &(handler.updates_ttl));
}
// Returns a newly allocated TtlIterator wrapping the iterator obtained from
// the underlying DB for `column_family`.
Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts,
                                     ColumnFamilyHandle* column_family) {
  Iterator* base_iter = db_->NewIterator(opts, column_family);
  return new TtlIterator(base_iter);
}
// Updates the ttl on the compaction filter factory configured for column
// family `h`. Does nothing when no factory is configured.
//
// NOTE(review): the factory is downcast with an unchecked static cast, so it
// is presumably always a TtlCompactionFilterFactory here — confirm for column
// families that were opened with a user-supplied compaction filter.
void DBWithTTLImpl::SetTtl(ColumnFamilyHandle *h, int32_t ttl) {
  Options cf_opts;
  cf_opts = GetOptions(h);

  std::shared_ptr<TtlCompactionFilterFactory> ttl_factory =
      std::static_pointer_cast<TtlCompactionFilterFactory>(
          cf_opts.compaction_filter_factory);
  if (ttl_factory) {
    ttl_factory->SetTtl(ttl);
  }
}
}
#endif // ROCKSDB_LITE