#include "db/version_set.h"
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <algorithm>
#include <map>
#include <set>
#include <climits>
#include <unordered_map>
#include <vector>
#include <stdio.h>
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/table_cache.h"
#include "db/compaction.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "table/table_reader.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "table/format.h"
#include "table/plain_table_factory.h"
#include "table/meta_blocks.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/stop_watch.h"
namespace
rocksdb {
namespace
{
// Binary-searches file_level.files over the index range [left, right) for
// the first file whose largest_key does not compare below `key` under icmp.
//
// Returns the index of that file, or `right` when every file in the range
// has a largest_key that compares below `key`.
// NOTE(review): the search presumes the files in the range are ordered by
// largest_key -- confirm against callers.
int FindFileInRange(const InternalKeyComparator& icmp,
                    const FileLevel& file_level,
                    const Slice& key,
                    uint32_t left,
                    uint32_t right) {
  while (left < right) {
    // Written as left + (right - left) / 2 so the sum cannot wrap around,
    // which (left + right) / 2 could do for very large indexes.
    const uint32_t mid = left + (right - left) / 2;
    const FdWithKeyRange& f = file_level.files[mid];
    // The class-qualified call is never dispatched virtually.
    if (icmp.InternalKeyComparator::Compare(f.largest_key, key) < 0) {
      // files[mid] ends before `key`: nothing at or before mid qualifies.
      left = mid + 1;
    } else {
      // files[mid] ends at or after `key`: nothing after mid is needed.
      right = mid;
    }
  }
  return right;
}
// Ordering predicate: returns true when `a` should be placed before `b`.
// Files are ranked by descending smallest_seqno, then descending
// largest_seqno, and finally by descending file number.
bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
  if (a->smallest_seqno == b->smallest_seqno) {
    if (a->largest_seqno == b->largest_seqno) {
      // Both sequence numbers tie: the file number decides.
      return a->fd.GetNumber() > b->fd.GetNumber();
    }
    return a->largest_seqno > b->largest_seqno;
  }
  return a->smallest_seqno > b->smallest_seqno;
}
// Ordering predicate: returns true when `a` should be placed before `b`.
// Files are ranked by ascending smallest key under `cmp`; files whose
// smallest keys compare equal are ranked by ascending file number.
bool BySmallestKey(FileMetaData* a, FileMetaData* b,
                   const InternalKeyComparator* cmp) {
  const int order = cmp->Compare(a->smallest, b->smallest);
  return (order != 0) ? (order < 0)
                      : (a->fd.GetNumber() < b->fd.GetNumber());
}
class
FilePicker {
public
:
FilePicker(
std::vector<FileMetaData*>* files,
const
Slice& user_key,
const
Slice& ikey,
autovector<FileLevel>* file_levels,
unsigned
int
num_levels,
FileIndexer* file_indexer,
const
Comparator* user_comparator,
const
InternalKeyComparator* internal_comparator)
: num_levels_(num_levels),
curr_level_(-1),
search_left_bound_(0),
search_right_bound_(FileIndexer::kLevelMaxIndex),
#ifndef NDEBUG
files_(files),
#endif
file_levels_(file_levels),
user_key_(user_key),
ikey_(ikey),
file_indexer_(file_indexer),
user_comparator_(user_comparator),
internal_comparator_(internal_comparator) {
search_ended_ = !PrepareNextLevel();
if
(!search_ended_) {
for
(unsigned
int
i = 0; i < (*file_levels_)[0].num_files; ++i) {
auto
* r = (*file_levels_)[0].files[i].fd.table_reader;
if
(r) {
r->Prepare(ikey);
}
}
}
}
FdWithKeyRange* GetNextFile() {
while
(!search_ended_) {
while
(curr_index_in_curr_level_ < curr_file_level_->num_files) {
FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
int
cmp_largest = -1;
if
(num_levels_ > 1 || curr_file_level_->num_files > 3) {
assert
(
curr_level_ == 0 ||
curr_index_in_curr_level_ == start_index_in_curr_level_ ||
user_comparator_->Compare(user_key_,
ExtractUserKey(f->smallest_key)) <= 0);
int
cmp_smallest = user_comparator_->Compare(user_key_,
ExtractUserKey(f->smallest_key));
if
(cmp_smallest >= 0) {
cmp_largest = user_comparator_->Compare(user_key_,
ExtractUserKey(f->largest_key));
}
if
(curr_level_ > 0) {
file_indexer_->GetNextLevelIndex(curr_level_,
curr_index_in_curr_level_,
cmp_smallest, cmp_largest,
&search_left_bound_,
&search_right_bound_);
}
if
(cmp_smallest < 0 || cmp_largest > 0) {
if
(curr_level_ == 0) {
++curr_index_in_curr_level_;
continue
;
}
else
{
break
;
}
}
}
#ifndef NDEBUG
if
(prev_file_) {
if
(curr_level_ != 0) {
int
comp_sign = internal_comparator_->Compare(
prev_file_->largest_key, f->smallest_key);
assert
(comp_sign < 0);
}
else
{
assert
(curr_index_in_curr_level_ > 0);
assert
(!NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_],
files_[0][curr_index_in_curr_level_-1]));
}
}
prev_file_ = f;
#endif
if
(curr_level_ > 0 && cmp_largest < 0) {
search_ended_ = !PrepareNextLevel();
}
else
{
++curr_index_in_curr_level_;
}
return
f;
}
search_ended_ = !PrepareNextLevel();
}
return
nullptr
;
}
private
:
unsigned
int
num_levels_;
unsigned
int
curr_level_;
int
search_left_bound_;
int
search_right_bound_;
#ifndef NDEBUG
std::vector<FileMetaData*>* files_;
#endif
autovector<FileLevel>* file_levels_;
bool
search_ended_;
FileLevel* curr_file_level_;
unsigned
int
curr_index_in_curr_level_;
unsigned
int
start_index_in_curr_level_;
Slice user_key_;
Slice ikey_;
FileIndexer* file_indexer_;
const
Comparator* user_comparator_;
const
InternalKeyComparator* internal_comparator_;
#ifndef NDEBUG
FdWithKeyRange* prev_file_;
#endif
bool
PrepareNextLevel() {
curr_level_++;
while
(curr_level_ < num_levels_) {
curr_file_level_ = &(*file_levels_)[curr_level_];
if
(curr_file_level_->num_files == 0) {
assert
(search_left_bound_ == 0);
assert
(search_right_bound_ == -1 ||
search_right_bound_ == FileIndexer::kLevelMaxIndex);
search_left_bound_ = 0;
search_right_bound_ = FileIndexer::kLevelMaxIndex;
curr_level_++;
continue
;
}
int32_t start_index;
if
(curr_level_ == 0) {
start_index = 0;
}
else
{
if
(search_left_bound_ == search_right_bound_) {
start_index = search_left_bound_;
}
else
if
(search_left_bound_ < search_right_bound_) {
if
(search_right_bound_ == FileIndexer::kLevelMaxIndex) {
search_right_bound_ = curr_file_level_->num_files - 1;
}
start_index = FindFileInRange(*internal_comparator_,
*curr_file_level_, ikey_,
search_left_bound_, search_right_bound_);
}
else
{
search_left_bound_ = 0;
search_right_bound_ = FileIndexer::kLevelMaxIndex;
curr_level_++;
continue
;
}
}
start_index_in_curr_level_ = start_index;
curr_index_in_curr_level_ = start_index;
#ifndef NDEBUG
prev_file_ =
nullptr
;
#endif
return
true
;
}
return
false
;
}
};
}
// Destroys a version that nobody references any more: unlinks it from the
// linked list of versions and drops one reference from every file it holds.
// Files whose reference count reaches zero have their table reader handle
// released (if any) and are appended to vset_->obsolete_files_.
Version::~Version() {
  assert(refs_ == 0);

  // Remove this node from the doubly linked list.
  prev_->next_ = next_;
  next_->prev_ = prev_;

  for (int level = 0; level < num_levels_; level++) {
    for (FileMetaData* meta : files_[level]) {
      assert(meta->refs > 0);
      meta->refs--;
      if (meta->refs > 0) {
        // Still referenced elsewhere.
        continue;
      }
      if (meta->table_reader_handle) {
        cfd_->table_cache()->ReleaseHandle(meta->table_reader_handle);
        meta->table_reader_handle = nullptr;
      }
      vset_->obsolete_files_.push_back(meta);
    }
  }
  delete[] files_;
}
// Convenience wrapper around FindFileInRange() that searches every file of
// `file_level`, i.e. the index range [0, file_level.num_files).
// Returns whatever FindFileInRange() returns for that range.
int FindFile(const InternalKeyComparator& icmp, const FileLevel& file_level,
             const Slice& key) {
  return FindFileInRange(icmp, file_level, key,
                         /* left = */ 0,
                         /* right = */ file_level.num_files);
}
// Fills `file_level` with one FdWithKeyRange per entry of `files`.
//
// The FdWithKeyRange array and the bytes of each file's encoded smallest and
// largest keys are allocated from `arena`; the Slices stored in each entry
// point into those arena copies. `file_level` and `arena` must be non-null.
void DoGenerateFileLevel(FileLevel* file_level,
                         const std::vector<FileMetaData*>& files,
                         Arena* arena) {
  assert(file_level);
  assert(arena);
  // (A former `assert(files.size() >= 0)` was dropped: size() is unsigned,
  // so the condition could never be false.)

  size_t num = files.size();
  file_level->num_files = num;
  char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
  file_level->files = new (mem) FdWithKeyRange[num];

  for (size_t i = 0; i < num; i++) {
    Slice smallest_key = files[i]->smallest.Encode();
    Slice largest_key = files[i]->largest.Encode();

    // Copy both keys back to back into a single arena allocation.
    size_t smallest_size = smallest_key.size();
    size_t largest_size = largest_key.size();
    mem = arena->AllocateAligned(smallest_size + largest_size);
    memcpy(mem, smallest_key.data(), smallest_size);
    memcpy(mem + smallest_size, largest_key.data(), largest_size);

    FdWithKeyRange& f = file_level->files[i];
    f.fd = files[i]->fd;
    f.smallest_key = Slice(mem, smallest_size);
    f.largest_key = Slice(mem + smallest_size, largest_size);
  }
}
// Returns true when *user_key sorts strictly after the user-key part of
// f->largest_key under `ucmp`. A null `user_key` always yields false.
static bool AfterFile(const Comparator* ucmp, const Slice* user_key,
                      const FdWithKeyRange* f) {
  if (user_key == nullptr) {
    return false;
  }
  return ucmp->Compare(*user_key, ExtractUserKey(f->largest_key)) > 0;
}
// Returns true when *user_key sorts strictly before the user-key part of
// f->smallest_key under `ucmp`. A null `user_key` always yields false.
static bool BeforeFile(const Comparator* ucmp, const Slice* user_key,
                       const FdWithKeyRange* f) {
  if (user_key == nullptr) {
    return false;
  }
  return ucmp->Compare(*user_key, ExtractUserKey(f->smallest_key)) < 0;
}
// Returns true when at least one file of `file_level` overlaps the user-key
// range [*smallest_user_key, *largest_user_key]. A null bound is never
// "after" or "before" a file (see AfterFile / BeforeFile), so it leaves that
// side of the range open.
//
// disjoint_sorted_files == false: every file is tested individually.
// disjoint_sorted_files == true:  a FindFile() search locates the single
//                                 candidate file.
bool SomeFileOverlapsRange(const InternalKeyComparator& icmp,
                           bool disjoint_sorted_files,
                           const FileLevel& file_level,
                           const Slice* smallest_user_key,
                           const Slice* largest_user_key) {
  const Comparator* ucmp = icmp.user_comparator();

  if (disjoint_sorted_files) {
    // Locate the first file that may contain a key >= smallest_user_key.
    uint32_t index = 0;
    if (smallest_user_key != nullptr) {
      InternalKey small(*smallest_user_key, kMaxSequenceNumber,
                        kValueTypeForSeek);
      index = FindFile(icmp, file_level, small.Encode());
    }
    if (index >= file_level.num_files) {
      // Start of the range lies past every file.
      return false;
    }
    // Overlap unless the whole range ends before the candidate starts.
    return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
  }

  // Files may overlap each other: check all of them.
  for (size_t i = 0; i < file_level.num_files; i++) {
    const FdWithKeyRange* f = &(file_level.files[i]);
    const bool disjoint = AfterFile(ucmp, smallest_user_key, f) ||
                          BeforeFile(ucmp, largest_user_key, f);
    if (!disjoint) {
      return true;
    }
  }
  return false;
}
// Iterator over the files of one FileLevel.
//
// key() is the largest key of the current file. value() is a Slice that
// views the raw bytes of a FileDescriptor copy of the current file's `fd`;
// that copy lives in the iterator (current_value_) and is overwritten by the
// next value() call.
class Version::LevelFileNumIterator : public Iterator {
 public:
  LevelFileNumIterator(const InternalKeyComparator& icmp,
                       const FileLevel* flevel)
      : icmp_(icmp),
        flevel_(flevel),
        index_(flevel->num_files),  // == num_files marks "invalid"
        current_value_(0, 0, 0) {
  }
  virtual bool Valid() const {
    return index_ < flevel_->num_files;
  }
  // Positions on the file index returned by FindFile() for `target`.
  virtual void Seek(const Slice& target) {
    index_ = FindFile(icmp_, *flevel_, target);
  }
  virtual void SeekToFirst() { index_ = 0; }
  virtual void SeekToLast() {
    // With no files, index 0 == num_files, i.e. the iterator stays invalid.
    index_ = (flevel_->num_files == 0) ? 0 : flevel_->num_files - 1;
  }
  virtual void Next() {
    assert(Valid());
    index_++;
  }
  virtual void Prev() {
    assert(Valid());
    if (index_ == 0) {
      // Stepping back from the first file invalidates the iterator.
      index_ = flevel_->num_files;
    } else {
      index_--;
    }
  }
  Slice key() const {
    assert(Valid());
    return flevel_->files[index_].largest_key;
  }
  Slice value() const {
    assert(Valid());
    // Bind by reference: only the descriptor needs to be copied.
    const auto& file_meta = flevel_->files[index_];
    current_value_ = file_meta.fd;
    return Slice(reinterpret_cast<const char*>(&current_value_),
                 sizeof(FileDescriptor));
  }
  virtual Status status() const { return Status::OK(); }

 private:
  const InternalKeyComparator icmp_;
  const FileLevel* flevel_;
  uint32_t index_;
  // Backing storage for value(); mutable because value() is const.
  mutable FileDescriptor current_value_;
};
// TwoLevelIteratorState whose secondary iterators are table iterators
// obtained from a TableCache. The handle passed to NewSecondaryIterator()
// must be exactly the raw bytes of a FileDescriptor.
class Version::LevelFileIteratorState : public TwoLevelIteratorState {
 public:
  LevelFileIteratorState(TableCache* table_cache,
                         const ReadOptions& read_options,
                         const EnvOptions& env_options,
                         const InternalKeyComparator& icomparator,
                         bool for_compaction, bool prefix_enabled)
      : TwoLevelIteratorState(prefix_enabled),
        table_cache_(table_cache),
        read_options_(read_options),
        env_options_(env_options),
        icomparator_(icomparator),
        for_compaction_(for_compaction) {}

  // Returns a table iterator for the file described by `meta_handle`, or an
  // error iterator carrying a Corruption status when the handle does not
  // have the size of a FileDescriptor.
  Iterator* NewSecondaryIterator(const Slice& meta_handle) override {
    if (meta_handle.size() == sizeof(FileDescriptor)) {
      const FileDescriptor* descriptor =
          reinterpret_cast<const FileDescriptor*>(meta_handle.data());
      return table_cache_->NewIterator(read_options_, env_options_,
                                       icomparator_, *descriptor, nullptr,
                                       for_compaction_);
    }
    return NewErrorIterator(
        Status::Corruption("FileReader invoked with unexpected value"));
  }

  // Never filters anything out: every key is reported as a possible match.
  bool PrefixMayMatch(const Slice& internal_key) override { return true; }

 private:
  TableCache* table_cache_;
  const ReadOptions read_options_;           // stored by value
  const EnvOptions& env_options_;            // stored by reference
  const InternalKeyComparator& icomparator_; // stored by reference
  bool for_compaction_;
};
// Retrieves the table properties of `file_meta` into *tp.
//
// The table cache is asked first; its answer is returned unless the status
// is Incomplete. In that case the file is opened directly -- under *fname
// when given, otherwise under the name built by TableFileName() -- and the
// properties are read with ReadTableProperties(). On that path *tp takes
// ownership of the properties object and the
// NUMBER_DIRECT_LOAD_TABLE_PROPERTIES ticker is recorded.
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
                                   const FileMetaData* file_meta,
                                   const std::string* fname) {
  auto table_cache = cfd_->table_cache();
  auto options = cfd_->options();

  // NOTE(review): the trailing `true` presumably asks the cache not to do
  // any I/O -- confirm against TableCache::GetTableProperties.
  Status s = table_cache->GetTableProperties(
      vset_->storage_options_, cfd_->internal_comparator(), file_meta->fd, tp,
      true);
  if (s.ok() || !s.IsIncomplete()) {
    // Either a hit, or a failure other than "incomplete".
    return s;
  }

  // Incomplete: fall back to reading the properties from the file itself.
  std::unique_ptr<RandomAccessFile> file;
  if (fname == nullptr) {
    s = options->env->NewRandomAccessFile(
        TableFileName(vset_->options_->db_paths, file_meta->fd.GetNumber(),
                      file_meta->fd.GetPathId()),
        &file, vset_->storage_options_);
  } else {
    s = options->env->NewRandomAccessFile(*fname, &file,
                                          vset_->storage_options_);
  }
  if (!s.ok()) {
    return s;
  }

  TableProperties* raw_table_properties;
  // NOTE(review): Footer::kInvalidTableMagicNumber is passed as the magic
  // number; presumably this relaxes the footer check -- confirm.
  s = ReadTableProperties(file.get(), file_meta->fd.GetFileSize(),
                          Footer::kInvalidTableMagicNumber, vset_->env_,
                          options->info_log.get(), &raw_table_properties);
  if (!s.ok()) {
    return s;
  }

  RecordTick(options->statistics.get(), NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
  tp->reset(raw_table_properties);
  return s;
}
// Collects the table properties of every file on every level into *props,
// keyed by the file name produced by TableFileName(). Stops at the first
// file whose properties cannot be obtained and returns that status;
// entries inserted before the failure remain in *props.
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
  for (int level = 0; level < num_levels_; level++) {
    for (const auto& file_meta : files_[level]) {
      auto fname =
          TableFileName(vset_->options_->db_paths, file_meta->fd.GetNumber(),
                        file_meta->fd.GetPathId());
      std::shared_ptr<const TableProperties> table_properties;
      Status s = GetTableProperties(&table_properties, file_meta, &fname);
      if (!s.ok()) {
        return s;
      }
      props->insert({fname, table_properties});
    }
  }
  return Status::OK();
}
// Returns the sum of TableCache::GetMemoryUsageByTableReader() over every
// file of every entry in file_levels_.
size_t Version::GetMemoryUsageByTableReaders() {
  size_t total_usage = 0;
  for (auto& file_level : file_levels_) {
    const size_t file_count = file_level.num_files;
    for (size_t idx = 0; idx != file_count; ++idx) {
      const size_t usage = cfd_->table_cache()->GetMemoryUsageByTableReader(
          vset_->storage_options_, cfd_->internal_comparator(),
          file_level.files[idx].fd);
      total_usage += usage;
    }
  }
  return total_usage;
}
// Returns num_non_deletions_ - num_deletions_, clamped at zero.
//
// Without the clamp the subtraction would wrap around to a huge value
// whenever the recorded deletions are at least as many as the recorded
// non-deletion entries.
uint64_t Version::GetEstimatedActiveKeys() {
  if (num_non_deletions_ <= num_deletions_) {
    return 0;
  }
  return num_non_deletions_ - num_deletions_;
}
// Appends to *iters the iterators needed to read this version's files:
// one table iterator per level-0 file, and one two-level iterator
// (LevelFileNumIterator + LevelFileIteratorState) per non-empty level > 0.
//
// file_levels_ is resized to num_non_empty_levels_ in GenerateFileLevels(),
// which may be smaller than num_levels_. The loops are therefore bounded by
// file_levels_.size() rather than num_levels_, so entries that do not exist
// are never indexed; levels past that bound hold no files and would not
// have contributed an iterator anyway.
void Version::AddIterators(const ReadOptions& read_options,
                           const EnvOptions& soptions,
                           std::vector<Iterator*>* iters) {
  const size_t available_levels = file_levels_.size();

  // Level 0: one iterator per file.
  if (available_levels > 0) {
    for (size_t i = 0; i < file_levels_[0].num_files; i++) {
      const auto& file = file_levels_[0].files[i];
      iters->push_back(cfd_->table_cache()->NewIterator(
          read_options, soptions, cfd_->internal_comparator(), file.fd));
    }
  }

  // Levels > 0: a single two-level iterator per non-empty level.
  for (size_t level = 1;
       level < available_levels && level < static_cast<size_t>(num_levels_);
       level++) {
    if (file_levels_[level].num_files != 0) {
      iters->push_back(NewTwoLevelIterator(
          new LevelFileIteratorState(
              cfd_->table_cache(), read_options, soptions,
              cfd_->internal_comparator(), false /* for_compaction */,
              cfd_->options()->prefix_extractor != nullptr),
          new LevelFileNumIterator(cfd_->internal_comparator(),
                                   &file_levels_[level])));
    }
  }
}
// Adds to *merge_iter_builder the iterators needed to read this version's
// files: one table iterator per level-0 file, and one two-level iterator
// (LevelFileNumIterator + LevelFileIteratorState) per non-empty level > 0.
// The builder's arena is passed along to the iterator factories.
//
// file_levels_ is resized to num_non_empty_levels_ in GenerateFileLevels(),
// which may be smaller than num_levels_. The loops are therefore bounded by
// file_levels_.size() rather than num_levels_, so entries that do not exist
// are never indexed; levels past that bound hold no files and would not
// have contributed an iterator anyway.
void Version::AddIterators(const ReadOptions& read_options,
                           const EnvOptions& soptions,
                           MergeIteratorBuilder* merge_iter_builder) {
  const size_t available_levels = file_levels_.size();

  // Level 0: one iterator per file.
  if (available_levels > 0) {
    for (size_t i = 0; i < file_levels_[0].num_files; i++) {
      const auto& file = file_levels_[0].files[i];
      merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
          read_options, soptions, cfd_->internal_comparator(), file.fd,
          nullptr, false, merge_iter_builder->GetArena()));
    }
  }

  // Levels > 0: a single two-level iterator per non-empty level.
  for (size_t level = 1;
       level < available_levels && level < static_cast<size_t>(num_levels_);
       level++) {
    if (file_levels_[level].num_files != 0) {
      merge_iter_builder->AddIterator(NewTwoLevelIterator(
          new LevelFileIteratorState(
              cfd_->table_cache(), read_options, soptions,
              cfd_->internal_comparator(), false /* for_compaction */,
              cfd_->options()->prefix_extractor != nullptr),
          new LevelFileNumIterator(cfd_->internal_comparator(),
                                   &file_levels_[level]),
          merge_iter_builder->GetArena()));
    }
  }
}
// Progress of a point lookup, as tracked in version_set::Saver::state.
enum SaverState {
  kNotFound,  // no entry for the key has been seen so far
  kFound,     // a value is available (or the key may exist)
  kDeleted,   // a deletion entry was the first entry seen
  kCorrupt,   // a full merge failed
  kMerge      // merge operands have been collected; lookup continues
};
namespace version_set {
// Lookup state passed as the opaque `arg` to the SaveValue() and
// MarkKeyMayExist() callbacks.
struct Saver {
  SaverState state;                     // current lookup progress
  const Comparator* ucmp;               // compares user keys
  Slice user_key;                       // key being looked up
  bool* value_found;                    // optional out-flag; may be null
  std::string* value;                   // receives the resulting value
  const MergeOperator* merge_operator;  // used for full merges
  MergeContext* merge_context;          // accumulates merge operands
  Logger* logger;                       // handed to FullMerge()
  Statistics* statistics;               // receives merge-failure ticks
};
}  // namespace version_set
static
void
MarkKeyMayExist(
void
* arg) {
version_set::Saver* s =
reinterpret_cast
<version_set::Saver*>(arg);
s->state = kFound;
if
(s->value_found !=
nullptr
) {
*(s->value_found) =
false
;
}
}
static
bool
SaveValue(
void
* arg,
const
ParsedInternalKey& parsed_key,
const
Slice& v) {
version_set::Saver* s =
reinterpret_cast
<version_set::Saver*>(arg);
MergeContext* merge_contex = s->merge_context;
std::string merge_result;
assert
(s !=
nullptr
&& merge_contex !=
nullptr
);
if
(s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
switch
(parsed_key.type) {
case
kTypeValue:
if
(kNotFound == s->state) {
s->state = kFound;
s->value->assign(v.data(), v.size());
}
else
if
(kMerge == s->state) {
assert
(s->merge_operator !=
nullptr
);
s->state = kFound;
if
(!s->merge_operator->FullMerge(s->user_key, &v,
merge_contex->GetOperands(),
s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = kCorrupt;
}
}
else
{
assert
(
false
);
}
return
false
;
case
kTypeDeletion:
if
(kNotFound == s->state) {
s->state = kDeleted;
}
else
if
(kMerge == s->state) {
s->state = kFound;
if
(!s->merge_operator->FullMerge(s->user_key,
nullptr
,
merge_contex->GetOperands(),
s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = kCorrupt;
}
}
else
{
assert
(
false
);
}
return
false
;
case
kTypeMerge:
assert
(s->state == kNotFound || s->state == kMerge);
s->state = kMerge;
merge_contex->PushOperand(v);
return
true
;
default
:
assert
(
false
);
break
;
}
}
return
false
;
}
// Builds an empty version for column family `cfd` (which may be null) that
// belongs to `vset`.
//
// With a null `cfd` every pointer derived from it is null and the version
// has zero levels. The new version starts as a one-element circular list
// (next_ == prev_ == this) with no references. When `cfd` already has a
// current version, the accumulated size and entry statistics are copied
// from it.
Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
                 uint64_t version_number)
    : cfd_(cfd),
      internal_comparator_(cfd != nullptr ? &cfd->internal_comparator()
                                          : nullptr),
      // Reads internal_comparator_, which is initialized just above.
      user_comparator_(cfd != nullptr
                           ? internal_comparator_->user_comparator()
                           : nullptr),
      table_cache_(cfd != nullptr ? cfd->table_cache() : nullptr),
      merge_operator_(cfd != nullptr ? cfd->options()->merge_operator.get()
                                     : nullptr),
      info_log_(cfd != nullptr ? cfd->options()->info_log.get() : nullptr),
      db_statistics_(cfd != nullptr ? cfd->options()->statistics.get()
                                    : nullptr),
      num_levels_(cfd != nullptr ? cfd->NumberLevels() : 0),
      num_non_empty_levels_(num_levels_),
      file_indexer_(cfd != nullptr
                        ? cfd->internal_comparator().user_comparator()
                        : nullptr),
      vset_(vset),
      next_(this),
      prev_(this),
      refs_(0),
      files_(new std::vector<FileMetaData*>[num_levels_]),
      files_by_size_(num_levels_),
      next_file_to_compact_by_size_(num_levels_),
      compaction_score_(num_levels_),
      compaction_level_(num_levels_),
      version_number_(version_number),
      total_file_size_(0),
      total_raw_key_size_(0),
      total_raw_value_size_(0),
      num_non_deletions_(0),
      num_deletions_(0) {
  if (cfd == nullptr || cfd->current() == nullptr) {
    // Nothing to inherit statistics from.
    return;
  }
  // Carry over the statistics accumulated by the current version.
  total_file_size_ = cfd->current()->total_file_size_;
  total_raw_key_size_ = cfd->current()->total_raw_key_size_;
  total_raw_value_size_ = cfd->current()->total_raw_value_size_;
  num_non_deletions_ = cfd->current()->num_non_deletions_;
  num_deletions_ = cfd->current()->num_deletions_;
}
// Looks up key `k` in the files of this version.
//
// On entry *status must be OK or MergeInProgress; the latter starts the
// lookup in the kMerge state. Files are visited in the order produced by
// FilePicker, and the first non-OK status from the table cache aborts the
// lookup. Outcome in *status:
//   - OK:              a value was found, or a final merge succeeded, and
//                      is stored in *value.
//   - NotFound:        a deletion was hit, or no file had the key.
//   - Corruption:      a merge failed.
//   - InvalidArgument: operands were collected but there is no merge
//                      operator.
void Version::Get(const ReadOptions& options,
                  const LookupKey& k,
                  std::string* value,
                  Status* status,
                  MergeContext* merge_context,
                  bool* value_found) {
  Slice ikey = k.internal_key();
  Slice user_key = k.user_key();

  assert(status->ok() || status->IsMergeInProgress());

  // State shared with the SaveValue / MarkKeyMayExist callbacks.
  version_set::Saver saver;
  saver.state = status->ok() ? kNotFound : kMerge;
  saver.ucmp = user_comparator_;
  saver.user_key = user_key;
  saver.value_found = value_found;
  saver.value = value;
  saver.merge_operator = merge_operator_;
  saver.merge_context = merge_context;
  saver.logger = info_log_;
  saver.statistics = db_statistics_;

  FilePicker fp(files_, user_key, ikey, &file_levels_, num_non_empty_levels_,
                &file_indexer_, user_comparator_, internal_comparator_);

  for (FdWithKeyRange* f = fp.GetNextFile(); f != nullptr;
       f = fp.GetNextFile()) {
    *status = table_cache_->Get(options, *internal_comparator_, f->fd, ikey,
                                &saver, SaveValue, MarkKeyMayExist);
    if (!status->ok()) {
      return;
    }

    if (saver.state == kFound) {
      return;
    }
    if (saver.state == kDeleted) {
      *status = Status::NotFound();
      return;
    }
    if (saver.state == kCorrupt) {
      *status = Status::Corruption("corrupted key for ", user_key);
      return;
    }
    // kNotFound or kMerge: keep searching the remaining files.
  }

  if (saver.state != kMerge) {
    // Every candidate file was consulted without a hit.
    *status = Status::NotFound();
    return;
  }

  // Only merge operands were found: merge them without a base value.
  if (!merge_operator_) {
    *status = Status::InvalidArgument(
        "merge_operator is not properly initialized.");
    return;
  }
  if (merge_operator_->FullMerge(user_key, nullptr,
                                 saver.merge_context->GetOperands(), value,
                                 info_log_)) {
    *status = Status::OK();
  } else {
    RecordTick(db_statistics_, NUMBER_MERGE_FAILURES);
    *status = Status::Corruption("could not perform end-of-key merge for ",
                                 user_key);
  }
}
// Rebuilds file_levels_: the container is resized to num_non_empty_levels_
// entries and each entry is filled from files_[level] by
// DoGenerateFileLevel(), allocating from arena_.
void Version::GenerateFileLevels() {
  file_levels_.resize(num_non_empty_levels_);
  int level = 0;
  while (level < num_non_empty_levels_) {
    DoGenerateFileLevel(&file_levels_[level], files_[level], &arena_);
    ++level;
  }
}
// Recomputes the derived state of this version, in this order: temporary
// statistics, compaction scores, the by-size file ordering, the number of
// non-empty levels, the file index and finally file_levels_.
// The last two steps read num_non_empty_levels_, so they must run after
// UpdateNumNonEmptyLevels().
void Version::PrepareApply(std::vector<uint64_t>& size_being_compacted) {
  UpdateTemporaryStats();
  ComputeCompactionScore(size_being_compacted);
  UpdateFilesBySize();
  UpdateNumNonEmptyLevels();

  // Both consumers below depend on the level count computed just above.
  file_indexer_.UpdateIndex(&arena_, num_non_empty_levels_, files_);
  GenerateFileLevels();
}
// Fills the entry statistics of `file_meta` (num_entries, num_deletions,
// raw_value_size, raw_key_size) from the file's table properties.
//
// Returns true when the fields were filled in. Returns false -- leaving
// `file_meta` untouched -- when num_entries is already positive, when the
// table properties cannot be obtained, or when they come back null.
bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
  if (file_meta->num_entries > 0) {
    // Already initialized.
    return false;
  }

  std::shared_ptr<const TableProperties> tp;
  const Status s = GetTableProperties(&tp, file_meta);
  if (!s.ok() || tp.get() == nullptr) {
    return false;
  }

  file_meta->num_entries = tp->num_entries;
  file_meta->num_deletions = GetDeletedKeys(tp->user_collected_properties);
  file_meta->raw_value_size = tp->raw_value_size;
  file_meta->raw_key_size = tp->raw_key_size;
  return true;
}
// Updates the accumulated statistics of this version and the compensated
// size of its files.
//
// Pass 1: every file whose metadata gets initialized by
//         MaybeInitializeFileMetaData() is added to the running totals
//         (file size, raw key/value size, deletion and non-deletion counts).
// Pass 2: every file whose compensated_file_size is still 0 gets
//           file size + num_deletions * average value size
//                     * kDeletionWeightOnCompaction.
void Version::UpdateTemporaryStats() {
  static const int kDeletionWeightOnCompaction = 2;

  // (Two counters that were incremented here but never read have been
  // removed.)
  for (int level = 0; level < num_levels_; level++) {
    for (auto* file_meta : files_[level]) {
      if (MaybeInitializeFileMetaData(file_meta)) {
        // Newly initialized file: fold it into the totals.
        total_file_size_ += file_meta->fd.GetFileSize();
        total_raw_key_size_ += file_meta->raw_key_size;
        total_raw_value_size_ += file_meta->raw_value_size;
        num_non_deletions_ +=
            file_meta->num_entries - file_meta->num_deletions;
        num_deletions_ += file_meta->num_deletions;
      }
    }
  }

  // Computed after pass 1 so it reflects the freshly updated totals.
  uint64_t average_value_size = GetAverageValueSize();

  for (int level = 0; level < num_levels_; level++) {
    for (auto* file_meta : files_[level]) {
      // Only files that have no compensated size yet are touched.
      if (file_meta->compensated_file_size == 0) {
        file_meta->compensated_file_size =
            file_meta->fd.GetFileSize() +
            file_meta->num_deletions * average_value_size *
                kDeletionWeightOnCompaction;
      }
    }
  }
}
void
Version::ComputeCompactionScore(
std::vector<uint64_t>& size_being_compacted) {
double
max_score = 0;
int
max_score_level = 0;
int
max_input_level =
cfd_->compaction_picker()->MaxInputLevel(NumberLevels());
for
(
int
level = 0; level <= max_input_level; level++) {
double
score;
if
(level == 0) {
int
numfiles = 0;
uint64_t total_size = 0;
for
(unsigned
int
i = 0; i < files_[level].size(); i++) {
if
(!files_[level][i]->being_compacted) {
total_size += files_[level][i]->compensated_file_size;
numfiles++;
}
}
if
(cfd_->options()->compaction_style == kCompactionStyleFIFO) {
score =
static_cast
<
double
>(total_size) /
cfd_->options()->compaction_options_fifo.max_table_files_size;
}
else
if
(numfiles >= cfd_->options()->level0_stop_writes_trigger) {
score = 1000000;
}
else
if
(numfiles >= cfd_->options()->level0_slowdown_writes_trigger) {
score = 10000;
}
else
{
score =
static_cast
<
double
>(numfiles) /
cfd_->options()->level0_file_num_compaction_trigger;
}
}
else
{
const
uint64_t level_bytes =
TotalCompensatedFileSize(files_[level]) - size_being_compacted[level];
score =
static_cast
<
double
>(level_bytes) /
cfd_->compaction_picker()->MaxBytesForLevel(level);
if
(max_score < score) {
max_score = score;
max_score_level = level;
}
}
compaction_level_[level] = level;
compaction_score_[level] = score;
}
max_compaction_score_ = max_score;
max_compaction_score_level_ = max_score_level;
for
(
int
i = 0; i < NumberLevels() - 2; i++) {
for
(
int
j = i + 1; j < NumberLevels() - 1; j++) {
if
(compaction_score_[i] < compaction_score_[j]) {
double
score = compaction_score_[i];
int
level = compaction_level_[i];
compaction_score_[i] = compaction_score_[j];
compaction_level_[i] = compaction_level_[j];
compaction_score_[j] = score;
compaction_level_[j] = level;
}
}
}
}
namespace {
// Ordering predicate for sorting by descending compensated file size:
// returns true when `first` refers to a strictly larger file than `second`.
bool CompareCompensatedSizeDescending(const Version::Fsize& first,
                                      const Version::Fsize& second) {
  const auto first_size = first.file->compensated_file_size;
  const auto second_size = second.file->compensated_file_size;
  return first_size > second_size;
}
}  // anonymous namespace
// Sets num_non_empty_levels_ to one past the highest level that holds at
// least one file, or to 0 when every level is empty.
void Version::UpdateNumNonEmptyLevels() {
  int level_count = num_levels_;
  // Peel off trailing empty levels.
  while (level_count > 0 && files_[level_count - 1].size() == 0) {
    --level_count;
  }
  num_non_empty_levels_ = level_count;
}
void
Version::UpdateFilesBySize() {
if
(cfd_->options()->compaction_style == kCompactionStyleFIFO ||
cfd_->options()->compaction_style == kCompactionStyleUniversal) {
return
;
}
for
(
int
level = 0; level < NumberLevels() - 1; level++) {
const
std::vector<FileMetaData*>& files = files_[level];
auto
& files_by_size = files_by_size_[level];
assert
(files_by_size.size() == 0);
std::vector<Fsize> temp(files.size());
for
(unsigned
int
i = 0; i < files.size(); i++) {
temp[i].index = i;
temp[i].file = files[i];
}
size_t
num = Version::number_of_files_to_sort_;
if
(num > temp.size()) {
num = temp.size();
}
std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
CompareCompensatedSizeDescending);
assert
(temp.size() == files.size());
for
(unsigned
int
i = 0; i < temp.size(); i++) {
files_by_size.push_back(temp[i].index);
}
next_file_to_compact_by_size_[level] = 0;
assert
(files_[level].size() == files_by_size_[level].size());
}
}
// Takes one reference on this version; balanced by Unref().
void Version::Ref() {
  ++refs_;
}
// Drops one reference. When it was the last one, the version deletes itself
// and true is returned; the object must not be touched afterwards.
// Returns false while other references remain.
bool Version::Unref() {
  assert(refs_ >= 1);
  --refs_;
  if (refs_ != 0) {
    return false;
  }
  delete this;
  return true;
}
// Returns true when any of the first (max input level + 1) entries of
// compaction_score_ is at least 1.
bool Version::NeedsCompaction() const {
  const int max_input_level =
      cfd_->compaction_picker()->MaxInputLevel(NumberLevels());

  int i = 0;
  // Advance past entries that do not reach the threshold.
  while (i <= max_input_level && !(compaction_score_[i] >= 1)) {
    ++i;
  }
  // Stopping inside the range means a qualifying score was found.
  return i <= max_input_level;
}
// Returns true when some file on `level` overlaps the user-key range
// [*smallest_user_key, *largest_user_key] (see SomeFileOverlapsRange()).
// Levels above 0 are searched as disjoint, sorted files.
//
// file_levels_ is resized to num_non_empty_levels_ in GenerateFileLevels(),
// so it may have fewer entries than there are levels. A level without an
// entry holds no files and therefore cannot overlap anything; answering
// directly avoids indexing past the end of file_levels_.
bool Version::OverlapInLevel(int level,
                             const Slice* smallest_user_key,
                             const Slice* largest_user_key) {
  if (level < 0 || static_cast<size_t>(level) >= file_levels_.size()) {
    // Empty level, no overlap.
    return false;
  }
  return SomeFileOverlapsRange(cfd_->internal_comparator(), (level > 0),
                               file_levels_[level], smallest_user_key,
                               largest_user_key);
}
// Chooses the level for output covering the user-key range
// [smallest_user_key, largest_user_key].
//
// Returns 0 when the range overlaps level 0. Otherwise the level is raised
// one step at a time, up to options()->max_mem_compaction_level, for as
// long as
//   - the next level has no overlap with the range, and
//   - the bytes overlapping two levels further down do not exceed
//     MaxGrandParentOverlapBytes(level).
// When there is no level two steps down, one final step is taken and the
// search ends.
int Version::PickLevelForMemTableOutput(const Slice& smallest_user_key,
                                        const Slice& largest_user_key) {
  if (OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
    return 0;
  }

  int level = 0;
  InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
  InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
  std::vector<FileMetaData*> overlaps;
  const int max_mem_compact_level = cfd_->options()->max_mem_compaction_level;

  while (max_mem_compact_level > 0 && level < max_mem_compact_level) {
    if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
      // Next level overlaps: stay where we are.
      break;
    }
    if (level + 2 >= num_levels_) {
      // No level two steps down to check against.
      level++;
      break;
    }
    GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
    const uint64_t overlap_bytes = TotalFileSize(overlaps);
    if (overlap_bytes >
        cfd_->compaction_picker()->MaxGrandParentOverlapBytes(level)) {
      break;
    }
    level++;
  }
  return level;
}
// Stores in *inputs all files of `level` that overlap the user-key range
// [begin, end]. A null `begin` / `end` leaves that side of the range open.
// *inputs is cleared first; *file_index (when given) starts out as -1.
//
// - level > 0 with both bounds given: delegated to
//   GetOverlappingInputsBinarySearch().
// - otherwise: linear scan. On level 0, whenever an overlapping file reaches
//   beyond a given bound, that bound is widened to the file's key and the
//   scan restarts from the first file. On other levels *file_index (when
//   given) is set to the index of the last overlapping file found.
//
// file_levels_ is resized to num_non_empty_levels_ in GenerateFileLevels(),
// so it may have no entry for `level`. Such a level holds no files; the scan
// is skipped instead of indexing past the end of file_levels_.
void Version::GetOverlappingInputs(int level,
                                   const InternalKey* begin,
                                   const InternalKey* end,
                                   std::vector<FileMetaData*>* inputs,
                                   int hint_index,
                                   int* file_index) {
  inputs->clear();

  Slice user_begin;
  Slice user_end;
  if (begin != nullptr) {
    user_begin = begin->user_key();
  }
  if (end != nullptr) {
    user_end = end->user_key();
  }
  if (file_index) {
    *file_index = -1;
  }

  const Comparator* user_cmp = cfd_->internal_comparator().user_comparator();
  if (begin != nullptr && end != nullptr && level > 0) {
    GetOverlappingInputsBinarySearch(level, user_begin, user_end, inputs,
                                     hint_index, file_index);
    return;
  }

  if (level < 0 || static_cast<size_t>(level) >= file_levels_.size()) {
    // This level is empty, no overlapping inputs.
    return;
  }

  size_t next = 0;
  while (next < file_levels_[level].num_files) {
    const size_t current = next++;
    FdWithKeyRange* f = &(file_levels_[level].files[current]);
    const Slice file_start = ExtractUserKey(f->smallest_key);
    const Slice file_limit = ExtractUserKey(f->largest_key);

    if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
      // File lies completely before the range: skip it.
      continue;
    }
    if (end != nullptr && user_cmp->Compare(file_start, user_end) > 0) {
      // File lies completely after the range: skip it.
      continue;
    }

    inputs->push_back(files_[level][current]);

    if (level != 0) {
      if (file_index) {
        *file_index = static_cast<int>(current);
      }
      continue;
    }

    // Level 0: if the file reaches beyond a given bound, widen the range to
    // that file's key and start over.
    if (begin != nullptr && user_cmp->Compare(file_start, user_begin) < 0) {
      user_begin = file_start;
      inputs->clear();
      next = 0;
    } else if (end != nullptr &&
               user_cmp->Compare(file_limit, user_end) > 0) {
      user_end = file_limit;
      inputs->clear();
      next = 0;
    }
  }
}
// Finds the files of `level` (> 0) that overlap the user-key range
// [user_begin, user_end] and appends them to *inputs.
//
// A binary search over the level locates one overlapping file; when
// `hint_index` is not -1 it is taken as that file without searching.
// ExtendOverlappingInputs() then collects the neighbouring overlapping
// files. When no file overlaps, *inputs and *file_index are left untouched;
// otherwise *file_index (when given) receives the index of the located file.
void Version::GetOverlappingInputsBinarySearch(
    int level,
    const Slice& user_begin,
    const Slice& user_end,
    std::vector<FileMetaData*>* inputs,
    int hint_index,
    int* file_index) {
  assert(level > 0);

  const Comparator* user_cmp = cfd_->internal_comparator().user_comparator();

  int pivot = hint_index;
  bool have_pivot = (hint_index != -1);

  if (!have_pivot) {
    int lo = 0;
    int hi = static_cast<int>(files_[level].size() - 1);
    while (lo <= hi) {
      const int probe = (lo + hi) / 2;
      FdWithKeyRange* f = &(file_levels_[level].files[probe]);
      const Slice file_start = ExtractUserKey(f->smallest_key);
      const Slice file_limit = ExtractUserKey(f->largest_key);
      if (user_cmp->Compare(file_limit, user_begin) < 0) {
        // File ends before the range: look to the right.
        lo = probe + 1;
      } else if (user_cmp->Compare(user_end, file_start) < 0) {
        // File starts after the range: look to the left.
        hi = probe - 1;
      } else {
        pivot = probe;
        have_pivot = true;
        break;
      }
    }
  }

  if (!have_pivot) {
    // No overlapping file on this level.
    return;
  }
  if (file_index) {
    *file_index = pivot;
  }
  ExtendOverlappingInputs(level, user_begin, user_end, inputs, pivot);
}
// Starting from the file at `midIndex` of `level`, collects the contiguous
// run of files around it that overlap the user-key range
// [user_begin, user_end] and appends them, in index order, to *inputs.
//
// The run is grown to the left while a file's largest user key is not below
// user_begin, and to the right while a file's smallest user key is not
// above user_end. Debug builds additionally check that the file at
// `midIndex` is consistent with the range and that the number of accepted
// files matches the resulting index span.
void Version::ExtendOverlappingInputs(
    int level,
    const Slice& user_begin,
    const Slice& user_end,
    std::vector<FileMetaData*>* inputs,
    unsigned int midIndex) {
  const Comparator* user_cmp = cfd_->internal_comparator().user_comparator();
  const FdWithKeyRange* files = file_levels_[level].files;
#ifndef NDEBUG
  {
    // Sanity check on the starting file.
    assert(midIndex < file_levels_[level].num_files);
    const FdWithKeyRange* mid_file = &files[midIndex];
    const Slice fstart = ExtractUserKey(mid_file->smallest_key);
    const Slice flimit = ExtractUserKey(mid_file->largest_key);
    if (user_cmp->Compare(fstart, user_begin) >= 0) {
      assert(user_cmp->Compare(fstart, user_end) <= 0);
    } else {
      assert(user_cmp->Compare(flimit, user_begin) >= 0);
    }
  }
#endif
  // [first, last] is empty until the left scan accepts a file.
  int first = midIndex + 1;
  int last = midIndex;
  // Only ever incremented inside assert(), hence possibly unused.
  int count __attribute__((unused)) = 0;

  // Grow to the left, starting with the file at midIndex itself.
  int left = midIndex;
  while (left >= 0) {
    const Slice file_limit = ExtractUserKey(files[left].largest_key);
    if (user_cmp->Compare(file_limit, user_begin) < 0) {
      break;
    }
    first = left;
    assert((count++, true));
    left--;
  }

  // Grow to the right.
  unsigned int right = midIndex + 1;
  while (right < file_levels_[level].num_files) {
    const Slice file_start = ExtractUserKey(files[right].smallest_key);
    if (user_cmp->Compare(file_start, user_end) > 0) {
      break;
    }
    assert((count++, true));
    last = right;
    right++;
  }
  assert(count == last - first + 1);

  // Hand out the FileMetaData of every file in the run.
  for (int pos = first; pos <= last; pos++) {
    inputs->push_back(files_[level][pos]);
  }
}
// Reports whether the files in *inputs share a boundary user key with a
// file adjacent to them in `level`:
//   - the largest user key of the file found for inputs->back() equals the
//     smallest user key of the next file in the level, or
//   - the smallest user key of the file found for inputs->front() equals the
//     largest user key of the previous file in the level.
// Always returns false for an empty input list or for level 0.
bool Version::HasOverlappingUserKey(const std::vector<FileMetaData*>* inputs,
                                    int level) {
  if (inputs->empty() || level == 0) {
    return false;
  }

  const Comparator* user_cmp = cfd_->internal_comparator().user_comparator();
  const FileLevel& file_level = file_levels_[level];
  const FdWithKeyRange* files = file_levels_[level].files;
  const size_t kNumFiles = file_level.num_files;

  // Right boundary: compare against the file that follows the last input.
  const size_t tail = FindFile(cfd_->internal_comparator(), file_level,
                               inputs->back()->largest.Encode());
  assert(0 <= tail && tail < kNumFiles);
  if (tail < kNumFiles - 1) {
    const Slice tail_largest = ExtractUserKey(files[tail].largest_key);
    const Slice next_smallest = ExtractUserKey(files[tail + 1].smallest_key);
    if (user_cmp->Compare(tail_largest, next_smallest) == 0) {
      return true;
    }
  }

  // Left boundary: compare against the file that precedes the first input.
  const size_t head = FindFile(cfd_->internal_comparator(), file_level,
                               inputs->front()->smallest.Encode());
  assert(0 <= head && head <= tail);
  if (head > 0) {
    const Slice& head_smallest = ExtractUserKey(files[head].smallest_key);
    const Slice& prev_largest = ExtractUserKey(files[head - 1].largest_key);
    if (user_cmp->Compare(head_smallest, prev_largest) == 0) {
      return true;
    }
  }

  return false;
}
int64_t Version::NumLevelBytes(
int
level)
const
{
assert
(level >= 0);
assert
(level < NumberLevels());
return
TotalFileSize(files_[level]);
}
// Formats the per-level file counts into scratch->buffer as
// "files[c0 c1 ... cN]" and returns a pointer to that buffer.
// Formatting stops early if the buffer cannot hold the next count.
const char* Version::LevelSummary(LevelSummaryStorage* scratch) const {
  char* const out = scratch->buffer;
  int used = snprintf(out, sizeof(scratch->buffer), "files[");

  for (int lvl = 0; lvl < NumberLevels(); ++lvl) {
    const int room = sizeof(scratch->buffer) - used;
    const int wrote = snprintf(out + used, room, "%d ", int(files_[lvl].size()));
    if (wrote < 0 || wrote >= room) {
      break;
    }
    used += wrote;
  }

  // Step back one character so the closing bracket overwrites it.
  if (used > 0) {
    --used;
  }
  snprintf(out + used, sizeof(scratch->buffer) - used, "]");
  return scratch->buffer;
}
// Formats one entry per file of `level` into scratch->buffer, producing
// "files_size[#<number>(seq=<smallest_seqno>,sz=<size>,<being_compacted>) ...]"
// and returns a pointer to that buffer. Formatting stops early if the buffer
// cannot hold the next entry.
const char* Version::LevelFileSummary(FileSummaryStorage* scratch,
                                      int level) const {
  char* const out = scratch->buffer;
  int used = snprintf(out, sizeof(scratch->buffer), "files_size[");

  const std::vector<FileMetaData*>& level_files = files_[level];
  for (size_t idx = 0; idx < level_files.size(); ++idx) {
    FileMetaData* const meta = level_files[idx];
    const int room = sizeof(scratch->buffer) - used;
    char human_size[16];
    AppendHumanBytes(meta->fd.GetFileSize(), human_size, 16);
    const int wrote =
        snprintf(out + used, room, "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
                 meta->fd.GetNumber(), meta->smallest_seqno, human_size,
                 static_cast<int>(meta->being_compacted));
    if (wrote < 0 || wrote >= room) {
      break;
    }
    used += wrote;
  }

  // Step back one character only when at least one file exists in the level.
  if (files_[level].size() && used > 0) {
    --used;
  }
  snprintf(out + used, sizeof(scratch->buffer) - used, "]");
  return scratch->buffer;
}
int64_t Version::MaxNextLevelOverlappingBytes() {
uint64_t result = 0;
std::vector<FileMetaData*> overlaps;
for
(
int
level = 1; level < NumberLevels() - 1; level++) {
for
(
const
auto
& f : files_[level]) {
GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
const
uint64_t sum = TotalFileSize(overlaps);
if
(sum > result) {
result = sum;
}
}
}
return
result;
}
void
Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
for
(
int
level = 0; level < NumberLevels(); level++) {
const
std::vector<FileMetaData*>& files = files_[level];
for
(
const
auto
& file : files) {
live->push_back(file->fd);
}
}
}
std::string Version::DebugString(
bool
hex)
const
{
std::string r;
for
(
int
level = 0; level < num_levels_; level++) {
r.append(
"--- level "
);
AppendNumberTo(&r, level);
r.append(
" --- version# "
);
AppendNumberTo(&r, version_number_);
r.append(
" ---\n"
);
const
std::vector<FileMetaData*>& files = files_[level];
for
(
size_t
i = 0; i < files.size(); i++) {
r.push_back(
' '
);
AppendNumberTo(&r, files[i]->fd.GetNumber());
r.push_back(
':'
);
AppendNumberTo(&r, files[i]->fd.GetFileSize());
r.append(
"["
);
r.append(files[i]->smallest.DebugString(hex));
r.append(
" .. "
);
r.append(files[i]->largest.DebugString(hex));
r.append(
"]\n"
);
}
}
return
r;
}
// One queued request handled by VersionSet::LogAndApply(). Each caller places
// a ManifestWriter on manifest_writers_ and waits on `cv` until either it
// reaches the front of the queue or another caller marks it `done`.
struct VersionSet::ManifestWriter {
  Status status;            // result stored by the caller that processed us
  bool done = false;        // set once another caller has handled this edit
  port::CondVar cv;         // bound to the mutex passed to the constructor
  ColumnFamilyData* cfd;    // column family the edit targets
  VersionEdit* edit;        // the edit to be logged and applied

  explicit ManifestWriter(port::Mutex* mu, ColumnFamilyData* cfd,
                          VersionEdit* e)
      : cv(mu), cfd(cfd), edit(e) {}
};
// Accumulates a sequence of VersionEdits on top of a base Version (the column
// family's current version at construction time) and materialises the result
// into a new Version via SaveTo(). The builder holds a reference on the base
// version for its whole lifetime and one reference on every file it adds.
class VersionSet::Builder {
 private:
  // Ordering functor for files within a level. Level 0 is ordered by
  // NewestFirstBySeqNo(); all other levels by BySmallestKey().
  struct FileComparator {
    enum SortMethod {
      kLevel0 = 0,
      kLevelNon0 = 1,
    } sort_method;
    // Only consulted when sort_method == kLevelNon0.
    const InternalKeyComparator* internal_comparator;

    bool operator()(FileMetaData* f1, FileMetaData* f2) const {
      switch (sort_method) {
        case kLevel0:
          return NewestFirstBySeqNo(f1, f2);
        case kLevelNon0:
          return BySmallestKey(f1, f2, internal_comparator);
      }
      assert(false);
      return false;
    }
  };

  typedef std::set<FileMetaData*, FileComparator> FileSet;

  // Pending changes for a single level.
  struct LevelState {
    std::set<uint64_t> deleted_files;  // file numbers removed by edits
    FileSet* added_files;              // files added by edits (owned)
  };

  ColumnFamilyData* cfd_;
  Version* base_;
  LevelState* levels_;  // array of base_->NumberLevels() entries
  FileComparator level_zero_cmp_;
  FileComparator level_nonzero_cmp_;

 public:
  // Takes a reference on cfd->current() and allocates empty per-level state.
  explicit Builder(ColumnFamilyData* cfd) : cfd_(cfd), base_(cfd->current()) {
    base_->Ref();
    levels_ = new LevelState[base_->NumberLevels()];
    level_zero_cmp_.sort_method = FileComparator::kLevel0;
    // The level-0 ordering never reads this pointer, but the comparator is
    // copied into a std::set below, so give it a determinate value.
    level_zero_cmp_.internal_comparator = nullptr;
    level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
    level_nonzero_cmp_.internal_comparator = &cfd->internal_comparator();

    levels_[0].added_files = new FileSet(level_zero_cmp_);
    for (int level = 1; level < base_->NumberLevels(); level++) {
      levels_[level].added_files = new FileSet(level_nonzero_cmp_);
    }
  }

  // Drops the builder's reference on every added file (deleting files whose
  // reference count reaches zero, after releasing any cached table-reader
  // handle) and releases the reference on the base version.
  ~Builder() {
    for (int level = 0; level < base_->NumberLevels(); level++) {
      const FileSet* added = levels_[level].added_files;
      // Copy the pointers out first so the set can be destroyed before any
      // of the files it orders are deleted.
      std::vector<FileMetaData*> to_unref;
      to_unref.reserve(added->size());
      for (FileSet::const_iterator it = added->begin(); it != added->end();
           ++it) {
        to_unref.push_back(*it);
      }
      delete added;
      for (uint32_t i = 0; i < to_unref.size(); i++) {
        FileMetaData* f = to_unref[i];
        f->refs--;
        if (f->refs <= 0) {
          if (f->table_reader_handle) {
            cfd_->table_cache()->ReleaseHandle(f->table_reader_handle);
            f->table_reader_handle = nullptr;
          }
          delete f;
        }
      }
    }

    delete[] levels_;
    base_->Unref();
  }

  // Debug-only: verifies that the files of every level of `v` are in the
  // expected order. For levels > 0 it additionally aborts the process if two
  // adjacent files have overlapping key ranges. No-op when NDEBUG is defined.
  void CheckConsistency(Version* v) {
#ifndef NDEBUG
    for (int level = 0; level < v->NumberLevels(); level++) {
      for (size_t i = 1; i < v->files_[level].size(); i++) {
        auto f1 = v->files_[level][i - 1];
        auto f2 = v->files_[level][i];
        if (level == 0) {
          assert(level_zero_cmp_(f1, f2));
          assert(f1->largest_seqno > f2->largest_seqno);
        } else {
          assert(level_nonzero_cmp_(f1, f2));
          if (cfd_->internal_comparator().Compare(f1->largest, f2->smallest) >=
              0) {
            fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
                    (f1->largest).DebugString().c_str(),
                    (f2->smallest).DebugString().c_str());
            abort();
          }
        }
      }
    }
#endif
  }

  // Debug-only: asserts that the file `number` being deleted from `level` is
  // known, i.e. present in some level of the base version, or among the
  // files added to a higher level, or among the files added to `level`
  // itself. `edit` is currently unused. No-op when NDEBUG is defined.
  void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number,
                                  int level) {
#ifndef NDEBUG
    bool found = false;
    // 1) Any level of the base version.
    for (int l = 0; !found && l < base_->NumberLevels(); l++) {
      const std::vector<FileMetaData*>& base_files = base_->files_[l];
      for (unsigned int i = 0; i < base_files.size(); i++) {
        FileMetaData* f = base_files[i];
        if (f->fd.GetNumber() == number) {
          found = true;
          break;
        }
      }
    }
    // 2) Files added to levels above `level`.
    for (int l = level + 1; !found && l < base_->NumberLevels(); l++) {
      const FileSet* added = levels_[l].added_files;
      for (FileSet::const_iterator added_iter = added->begin();
           added_iter != added->end(); ++added_iter) {
        FileMetaData* f = *added_iter;
        if (f->fd.GetNumber() == number) {
          found = true;
          break;
        }
      }
    }
    // 3) Files added to `level` itself.
    if (!found) {
      const FileSet* added = levels_[level].added_files;
      for (FileSet::const_iterator added_iter = added->begin();
           added_iter != added->end(); ++added_iter) {
        FileMetaData* f = *added_iter;
        if (f->fd.GetNumber() == number) {
          found = true;
          break;
        }
      }
    }
    if (!found) {
      fprintf(stderr, "not found %" PRIu64 "\n", number);
    }
    assert(found);
#endif
  }

  // Records the deletions and additions described by `edit` in the per-level
  // pending state. Added files are copied into new FileMetaData objects that
  // start with a reference count of 1 (owned by this builder).
  void Apply(VersionEdit* edit) {
    CheckConsistency(base_);

    // Deletions.
    const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
    for (const auto& del_file : del) {
      const auto level = del_file.first;
      const auto number = del_file.second;
      levels_[level].deleted_files.insert(number);
      CheckConsistencyForDeletes(edit, number, level);
    }

    // Additions. Adding a file cancels any pending deletion of the same
    // file number at that level.
    for (const auto& new_file : edit->new_files_) {
      const int level = new_file.first;
      FileMetaData* f = new FileMetaData(new_file.second);
      f->refs = 1;

      levels_[level].deleted_files.erase(f->fd.GetNumber());
      levels_[level].added_files->insert(f);
    }
  }

  // Writes the merged state (base files plus added files, minus deleted
  // files) into `v`, level by level, preserving the level's sort order.
  void SaveTo(Version* v) {
    CheckConsistency(base_);
    CheckConsistency(v);

    for (int level = 0; level < base_->NumberLevels(); level++) {
      const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
      // Merge the sorted base files with the sorted added files.
      const auto& base_files = base_->files_[level];
      auto base_iter = base_files.begin();
      auto base_end = base_files.end();
      const auto& added_files = *levels_[level].added_files;
      v->files_[level].reserve(base_files.size() + added_files.size());

      for (const auto& added : added_files) {
        // Emit every base file that sorts before (or equal to) `added`.
        for (auto bpos = std::upper_bound(base_iter, base_end, added, cmp);
             base_iter != bpos; ++base_iter) {
          MaybeAddFile(v, level, *base_iter);
        }
        MaybeAddFile(v, level, added);
      }

      // Emit the remaining base files.
      for (; base_iter != base_end; ++base_iter) {
        MaybeAddFile(v, level, *base_iter);
      }
    }

    CheckConsistency(v);
  }

  // For every added file, asks the table cache to find its table and, when a
  // handle is obtained, stores the corresponding table reader on the file's
  // descriptor. Each file must not already have a table-reader handle.
  void LoadTableHandlers() {
    for (int level = 0; level < cfd_->NumberLevels(); level++) {
      for (auto& file_meta : *(levels_[level].added_files)) {
        assert(!file_meta->table_reader_handle);
        cfd_->table_cache()->FindTable(
            base_->vset_->storage_options_, cfd_->internal_comparator(),
            file_meta->fd, &file_meta->table_reader_handle, false);
        if (file_meta->table_reader_handle != nullptr) {
          file_meta->fd.table_reader =
              cfd_->table_cache()->GetTableReaderFromHandle(
                  file_meta->table_reader_handle);
        }
      }
    }
  }

  // Appends `f` to level `level` of `v`, taking a reference on it, unless
  // the file number is pending deletion at that level.
  void MaybeAddFile(Version* v, int level, FileMetaData* f) {
    if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) {
      // File is deleted: do nothing.
    } else {
      auto* files = &v->files_[level];
      if (level > 0 && !files->empty()) {
        // Must not overlap the previously appended file.
        assert(cfd_->internal_comparator().Compare(
                   (*files)[files->size() - 1]->largest, f->smallest) < 0);
      }
      f->refs++;
      files->push_back(f);
    }
  }
};
// Constructs an empty VersionSet for the database at `dbname`.
// A ColumnFamilySet is created from the same arguments; file numbering
// starts at 2 and all other counters start at 0. The compaction storage
// options are initialised as a copy of `storage_options`.
VersionSet::VersionSet(const std::string& dbname, const DBOptions* options,
                       const EnvOptions& storage_options, Cache* table_cache)
    : column_family_set_(new ColumnFamilySet(dbname, options, storage_options,
                                             table_cache)),
      env_(options->env),
      dbname_(dbname),
      options_(options),
      next_file_number_(2),
      manifest_file_number_(0),
      pending_manifest_file_number_(0),
      last_sequence_(0),
      prev_log_number_(0),
      current_version_number_(0),
      manifest_file_size_(0),
      storage_options_(storage_options),
      storage_options_compactions_(storage_options_) {
}
// Destroys the column family set first, then deletes every entry still
// queued in obsolete_files_.
VersionSet::~VersionSet() {
  column_family_set_.reset();
  for (auto it = obsolete_files_.begin(); it != obsolete_files_.end(); ++it) {
    delete *it;
  }
  obsolete_files_.clear();
}
// Makes `v` the current version of `column_family_data`: releases the
// reference held on the previous current version (if any), takes a reference
// on `v`, and links `v` into the column family's circular version list just
// before the dummy head. `v` must be unreferenced on entry.
void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
                               Version* v) {
  assert(v->refs_ == 0);
  Version* const previous = column_family_data->current();
  assert(v != previous);
  if (previous != nullptr) {
    assert(previous->refs_ > 0);
    previous->Unref();
  }
  column_family_data->SetCurrent(v);
  v->Ref();

  // Splice `v` between the current tail and the dummy head.
  Version* const head = column_family_data->dummy_versions();
  Version* const tail = head->prev_;
  v->prev_ = tail;
  v->next_ = head;
  tail->next_ = v;
  head->prev_ = v;
}
// Appends `edit` (and any compatible edits queued behind it) to the MANIFEST
// and, on success, installs the resulting state.
//
// Callers queue up on manifest_writers_; the caller at the front processes a
// batch consisting of its own edit plus the consecutive queued edits that
// target the same column family and are not column-family add/drop edits.
// The other callers in the batch are woken with the batch's status.
//
// `mu` must be held on entry; it is released while the MANIFEST is written
// and re-acquired before returning. `column_family_data` may be nullptr only
// for a column-family-add edit, in which case `options` must be non-null.
// Setting `new_descriptor_log` forces a fresh MANIFEST file; one is also
// created when none is open or the current one exceeds
// options_->max_manifest_file_size.
//
// Returns OK without writing anything if the column family has already been
// dropped.
Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
                               VersionEdit* edit, port::Mutex* mu,
                               Directory* db_directory,
                               bool new_descriptor_log,
                               const ColumnFamilyOptions* options) {
  mu->AssertHeld();

  if (column_family_data == nullptr) {
    // Only a column-family add may arrive without a column family.
    assert(edit->is_column_family_add_);
    assert(options != nullptr);
  }

  // Queue our request and wait until we are at the front or already served.
  ManifestWriter w(mu, column_family_data, edit);
  manifest_writers_.push_back(&w);
  while (!w.done && &w != manifest_writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }
  if (column_family_data != nullptr && column_family_data->IsDropped()) {
    // Nothing to write for a dropped column family; hand over to the next
    // queued writer.
    manifest_writers_.pop_front();
    if (!manifest_writers_.empty()) {
      manifest_writers_.front()->cv.Signal();
    }
    return Status::OK();
  }

  std::vector<VersionEdit*> batch_edits;
  Version* v = nullptr;
  std::unique_ptr<Builder> builder(nullptr);

  // Process all requests in the queue that can be batched with ours.
  ManifestWriter* last_writer = &w;
  assert(!manifest_writers_.empty());
  assert(manifest_writers_.front() == &w);
  if (edit->IsColumnFamilyManipulation()) {
    // Column-family add/drop edits are never batched.
    LogAndApplyCFHelper(edit);
    batch_edits.push_back(edit);
  } else {
    v = new Version(column_family_data, this, current_version_number_++);
    builder.reset(new Builder(column_family_data));
    for (const auto& writer : manifest_writers_) {
      if (writer->edit->IsColumnFamilyManipulation() ||
          writer->cfd->GetID() != column_family_data->GetID()) {
        // Stop at the first edit that cannot join this batch.
        break;
      }
      last_writer = writer;
      LogAndApplyHelper(column_family_data, builder.get(), v,
                        last_writer->edit, mu);
      batch_edits.push_back(last_writer->edit);
    }
    builder->SaveTo(v);
  }

  // Decide whether a new MANIFEST file is needed.
  uint64_t new_manifest_file_size = 0;
  Status s;

  assert(pending_manifest_file_number_ == 0);
  if (!descriptor_log_ ||
      manifest_file_size_ > options_->max_manifest_file_size) {
    pending_manifest_file_number_ = NewFileNumber();
    batch_edits.back()->SetNextFile(next_file_number_);
    new_descriptor_log = true;
  } else {
    pending_manifest_file_number_ = manifest_file_number_;
  }

  if (new_descriptor_log) {
    // A fresh MANIFEST also records the maximum column family ID, if any.
    if (column_family_set_->GetMaxColumnFamily() > 0) {
      edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
    }
  }

  // The MANIFEST is written with the mutex released.
  {
    std::vector<uint64_t> size_being_compacted;
    if (!edit->IsColumnFamilyManipulation()) {
      size_being_compacted.resize(v->NumberLevels() - 1);
      // Gathered while still holding the mutex.
      column_family_data->compaction_picker()->SizeBeingCompacted(
          size_being_compacted);
    }

    mu->Unlock();

    if (!edit->IsColumnFamilyManipulation() && options_->max_open_files == -1) {
      builder->LoadTableHandlers();
    }

    if (new_descriptor_log) {
      // Create the new MANIFEST and seed it with a snapshot of current state.
      Log(options_->info_log, "Creating manifest %" PRIu64 "\n",
          pending_manifest_file_number_);
      unique_ptr<WritableFile> descriptor_file;
      s = env_->NewWritableFile(
          DescriptorFileName(dbname_, pending_manifest_file_number_),
          &descriptor_file, env_->OptimizeForManifestWrite(storage_options_));
      if (s.ok()) {
        descriptor_file->SetPreallocationBlockSize(
            options_->manifest_preallocation_size);
        descriptor_log_.reset(new log::Writer(std::move(descriptor_file)));
        s = WriteSnapshot(descriptor_log_.get());
      }
    }

    if (!edit->IsColumnFamilyManipulation()) {
      v->PrepareApply(size_being_compacted);
    }

    // Append the batch to the MANIFEST and sync it.
    if (s.ok()) {
      for (auto& e : batch_edits) {
        std::string record;
        e->EncodeTo(&record);
        s = descriptor_log_->AddRecord(record);
        if (!s.ok()) {
          break;
        }
      }
      if (s.ok()) {
        if (options_->use_fsync) {
          StopWatch sw(env_, options_->statistics.get(),
                       MANIFEST_FILE_SYNC_MICROS);
          s = descriptor_log_->file()->Fsync();
        } else {
          StopWatch sw(env_, options_->statistics.get(),
                       MANIFEST_FILE_SYNC_MICROS);
          s = descriptor_log_->file()->Sync();
        }
      }
      if (!s.ok()) {
        Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
        // The write may have reached the file despite the reported error;
        // if every record is present, treat the batch as committed.
        bool all_records_in = true;
        for (auto& e : batch_edits) {
          std::string record;
          e->EncodeTo(&record);
          if (!ManifestContains(pending_manifest_file_number_, record)) {
            all_records_in = false;
            break;
          }
        }
        if (all_records_in) {
          Log(options_->info_log,
              "MANIFEST contains log record despite error; advancing to new "
              "version to prevent mismatch between in-memory and logged state"
              " If paranoid is set, then the db is now in readonly mode.");
          s = Status::OK();
        }
      }
    }

    // For a new MANIFEST, point CURRENT at it and drop the old file.
    if (s.ok() && new_descriptor_log) {
      s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
                         db_directory);
      if (s.ok() && pending_manifest_file_number_ > manifest_file_number_) {
        Log(options_->info_log,
            "Deleting manifest %" PRIu64 " current manifest %" PRIu64 "\n",
            manifest_file_number_, pending_manifest_file_number_);
        // Best effort: the result of the delete is not checked.
        env_->DeleteFile(DescriptorFileName(dbname_, manifest_file_number_));
      }
    }

    if (s.ok()) {
      new_manifest_file_size = descriptor_log_->file()->GetFileSize();
    }

    LogFlush(options_->info_log);
    mu->Lock();
  }

  // Install the new state, or clean up after a failure.
  if (s.ok()) {
    if (edit->is_column_family_add_) {
      assert(batch_edits.size() == 1);
      assert(options != nullptr);
      CreateColumnFamily(*options, edit);
    } else if (edit->is_column_family_drop_) {
      assert(batch_edits.size() == 1);
      column_family_data->SetDropped();
      if (column_family_data->Unref()) {
        delete column_family_data;
      }
    } else {
      uint64_t max_log_number_in_batch = 0;
      for (auto& e : batch_edits) {
        if (e->has_log_number_) {
          max_log_number_in_batch =
              std::max(max_log_number_in_batch, e->log_number_);
        }
      }
      if (max_log_number_in_batch != 0) {
        assert(column_family_data->GetLogNumber() <= max_log_number_in_batch);
        column_family_data->SetLogNumber(max_log_number_in_batch);
      }
      AppendVersion(column_family_data, v);
    }

    manifest_file_number_ = pending_manifest_file_number_;
    manifest_file_size_ = new_manifest_file_size;
    prev_log_number_ = edit->prev_log_number_;
  } else {
    if (v != nullptr) {
      // A Version is only built for non-column-family edits, for which
      // column_family_data is guaranteed to be non-null.
      Log(options_->info_log, "Error in committing version %lu to [%s]",
          (unsigned long)v->GetVersionNumber(),
          column_family_data->GetName().c_str());
      delete v;
    } else {
      // Column-family add/drop: there is no Version, and for an add there is
      // no ColumnFamilyData either, so neither may be dereferenced here.
      Log(options_->info_log, "Error in committing column family change: %s",
          s.ToString().c_str());
    }
    if (new_descriptor_log) {
      Log(options_->info_log,
          "Deleting manifest %" PRIu64 " current manifest %" PRIu64 "\n",
          manifest_file_number_, pending_manifest_file_number_);
      descriptor_log_.reset();
      env_->DeleteFile(
          DescriptorFileName(dbname_, pending_manifest_file_number_));
    }
  }
  pending_manifest_file_number_ = 0;

  // Wake every writer whose edit was part of this batch.
  while (true) {
    ManifestWriter* ready = manifest_writers_.front();
    manifest_writers_.pop_front();
    if (ready != &w) {
      ready->status = s;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) {
      break;
    }
  }
  // Let the next queued writer, if any, take over.
  if (!manifest_writers_.empty()) {
    manifest_writers_.front()->cv.Signal();
  }
  return s;
}
// Stamps a column-family add/drop edit with the current next-file number and
// last sequence number. A drop edit additionally records the current maximum
// column family ID.
void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
  assert(edit->IsColumnFamilyManipulation());
  edit->SetNextFile(next_file_number_);
  edit->SetLastSequence(last_sequence_);
  if (!edit->is_column_family_drop_) {
    return;
  }
  edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
}
// Prepares a regular (non column-family) edit for logging and feeds it to
// `builder`: fills in the previous log number when the edit has none, stamps
// the next-file number and last sequence number, then applies the edit.
// `mu` must be held.
void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, Builder* builder,
                                   Version* v, VersionEdit* edit,
                                   port::Mutex* mu) {
  mu->AssertHeld();
  assert(!edit->IsColumnFamilyManipulation());

  // A log number, when present, must not go backwards and must already have
  // been allocated.
  assert(!edit->has_log_number_ || edit->log_number_ >= cfd->GetLogNumber());
  assert(!edit->has_log_number_ || edit->log_number_ < next_file_number_);

  if (!edit->has_prev_log_number_) {
    edit->SetPrevLogNumber(prev_log_number_);
  }
  edit->SetNextFile(next_file_number_);
  edit->SetLastSequence(last_sequence_);

  builder->Apply(edit);
}
// Rebuilds the in-memory state of every column family by replaying the
// MANIFEST file named in CURRENT.
//
// `column_families` lists the column families the caller wants to open and
// must include the default column family. Column families that exist in the
// MANIFEST but were not requested are tracked and, unless `read_only` is
// true, cause an InvalidArgument error.
//
// On success a new current Version is installed for every column family and
// the file-number / sequence-number / log-number counters are updated from
// the values found in the MANIFEST.
Status VersionSet::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    bool read_only) {
  // Iterate by reference so each descriptor (and its options) is copied only
  // once, into the map.
  std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
  for (const auto& cf : column_families) {
    cf_name_to_options.insert({cf.name, cf.options});
  }
  // Column families present in the MANIFEST but not requested by the caller,
  // keyed by column family ID (same key type as `builders`).
  std::unordered_map<uint32_t, std::string> column_families_not_found;

  // Read "CURRENT", which names the active MANIFEST file.
  std::string manifest_filename;
  Status s =
      ReadFileToString(env_, CurrentFileName(dbname_), &manifest_filename);
  if (!s.ok()) {
    return s;
  }
  if (manifest_filename.empty() || manifest_filename.back() != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  // Strip the trailing newline.
  manifest_filename.resize(manifest_filename.size() - 1);
  FileType type;
  bool parse_ok =
      ParseFileName(manifest_filename, &manifest_file_number_, &type);
  if (!parse_ok || type != kDescriptorFile) {
    return Status::Corruption("CURRENT file corrupted");
  }

  Log(options_->info_log, "Recovering from manifest file: %s\n",
      manifest_filename.c_str());

  manifest_filename = dbname_ + "/" + manifest_filename;
  unique_ptr<SequentialFile> manifest_file;
  s = env_->NewSequentialFile(manifest_filename, &manifest_file,
                              storage_options_);
  if (!s.ok()) {
    return s;
  }
  uint64_t manifest_file_size;
  s = env_->GetFileSize(manifest_filename, &manifest_file_size);
  if (!s.ok()) {
    return s;
  }

  bool have_log_number = false;
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  // NOTE(review): log_number is never assigned below, so it stays 0 in the
  // MarkFileNumberUsed() call and in the summary log line.
  uint64_t log_number = 0;
  uint64_t prev_log_number = 0;
  uint32_t max_column_family = 0;
  std::unordered_map<uint32_t, Builder*> builders;

  // The default column family (ID 0) is created up front; the MANIFEST
  // records handled below only add non-default column families.
  auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
  if (default_cf_iter == cf_name_to_options.end()) {
    return Status::InvalidArgument("Default column family not specified");
  }
  VersionEdit default_cf_edit;
  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
  default_cf_edit.SetColumnFamily(0);
  ColumnFamilyData* default_cfd =
      CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
  builders.insert({0, new Builder(default_cfd)});

  {
    VersionSet::LogReporter reporter;
    reporter.status = &s;
    log::Reader reader(std::move(manifest_file), &reporter, true, 0);
    Slice record;
    std::string scratch;
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (!s.ok()) {
        break;
      }

      // A column family is in exactly one of three states: opened (has a
      // builder), known but not opened (in not_found), or unknown.
      bool cf_in_not_found =
          column_families_not_found.find(edit.column_family_) !=
          column_families_not_found.end();
      bool cf_in_builders =
          builders.find(edit.column_family_) != builders.end();

      assert(!(cf_in_not_found && cf_in_builders));

      ColumnFamilyData* cfd = nullptr;

      if (edit.is_column_family_add_) {
        if (cf_in_builders || cf_in_not_found) {
          s = Status::Corruption(
              "Manifest adding the same column family twice");
          break;
        }
        auto cf_options = cf_name_to_options.find(edit.column_family_name_);
        if (cf_options == cf_name_to_options.end()) {
          // The caller did not ask for this column family.
          column_families_not_found.insert(
              {edit.column_family_, edit.column_family_name_});
        } else {
          cfd = CreateColumnFamily(cf_options->second, &edit);
          builders.insert({edit.column_family_, new Builder(cfd)});
        }
      } else if (edit.is_column_family_drop_) {
        if (cf_in_builders) {
          auto builder = builders.find(edit.column_family_);
          assert(builder != builders.end());
          delete builder->second;
          builders.erase(builder);
          cfd = column_family_set_->GetColumnFamily(edit.column_family_);
          if (cfd->Unref()) {
            delete cfd;
            cfd = nullptr;
          } else {
            // The builder's reference is expected to have been the last one.
            assert(false);
          }
        } else if (cf_in_not_found) {
          column_families_not_found.erase(edit.column_family_);
        } else {
          s = Status::Corruption(
              "Manifest - dropping non-existing column family");
          break;
        }
      } else if (!cf_in_not_found) {
        if (!cf_in_builders) {
          s = Status::Corruption(
              "Manifest record referencing unknown column family");
          break;
        }

        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
        assert(cfd != nullptr);
        if (edit.max_level_ >= cfd->current()->NumberLevels()) {
          s = Status::InvalidArgument(
              "db has more levels than options.num_levels");
          break;
        }

        // Regular edit for an opened column family: accumulate it.
        auto builder = builders.find(edit.column_family_);
        assert(builder != builders.end());
        builder->second->Apply(&edit);
      }

      if (cfd != nullptr) {
        if (edit.has_log_number_) {
          if (cfd->GetLogNumber() > edit.log_number_) {
            Log(options_->info_log,
                "MANIFEST corruption detected, but ignored - Log numbers in "
                "records NOT monotonically increasing");
          } else {
            cfd->SetLogNumber(edit.log_number_);
            have_log_number = true;
          }
        }
        if (edit.has_comparator_ &&
            edit.comparator_ != cfd->user_comparator()->Name()) {
          s = Status::InvalidArgument(
              cfd->user_comparator()->Name(),
              "does not match existing comparator " + edit.comparator_);
          break;
        }
      }

      // Database-wide fields: the last value seen in the MANIFEST wins.
      if (edit.has_prev_log_number_) {
        prev_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }

      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }

      if (edit.has_max_column_family_) {
        max_column_family = edit.max_column_family_;
      }

      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }
    }
  }

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
    } else if (!have_log_number) {
      s = Status::Corruption("no meta-lognumber entry in descriptor");
    } else if (!have_last_sequence) {
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }

    if (!have_prev_log_number) {
      prev_log_number = 0;
    }

    column_family_set_->UpdateMaxColumnFamily(max_column_family);

    MarkFileNumberUsed(prev_log_number);
    MarkFileNumberUsed(log_number);
  }

  // Unless opened read-only, every column family in the MANIFEST must have
  // been requested by the caller.
  if (read_only == false && column_families_not_found.size() > 0) {
    std::string list_of_not_found;
    for (const auto& cf : column_families_not_found) {
      list_of_not_found += ", " + cf.second;
    }
    // Drop the leading ", ".
    list_of_not_found = list_of_not_found.substr(2);
    s = Status::InvalidArgument(
        "You have to open all column families. Column families not opened: " +
        list_of_not_found);
  }

  if (s.ok()) {
    // Materialise and install a Version for every opened column family.
    for (auto cfd : *column_family_set_) {
      auto builders_iter = builders.find(cfd->GetID());
      assert(builders_iter != builders.end());
      auto builder = builders_iter->second;

      if (options_->max_open_files == -1) {
        builder->LoadTableHandlers();
      }

      Version* v = new Version(cfd, this, current_version_number_++);
      builder->SaveTo(v);

      std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
      cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted);
      v->PrepareApply(size_being_compacted);
      AppendVersion(cfd, v);
    }

    manifest_file_size_ = manifest_file_size;
    next_file_number_ = next_file + 1;
    last_sequence_ = last_sequence;
    prev_log_number_ = prev_log_number;

    Log(options_->info_log,
        "Recovered from manifest file:%s succeeded,"
        "manifest_file_number is %lu, next_file_number is %lu, "
        "last_sequence is %lu, log_number is %lu,"
        "prev_log_number is %lu,"
        "max_column_family is %u\n",
        manifest_filename.c_str(), (unsigned long)manifest_file_number_,
        (unsigned long)next_file_number_, (unsigned long)last_sequence_,
        (unsigned long)log_number, (unsigned long)prev_log_number_,
        column_family_set_->GetMaxColumnFamily());

    for (auto cfd : *column_family_set_) {
      Log(options_->info_log,
          "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
          cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
    }
  }

  // Builders are no longer needed whether or not recovery succeeded.
  for (auto builder : builders) {
    delete builder.second;
  }

  return s;
}
// Replays the MANIFEST of the database at `dbname` and stores the names of
// the column families that exist at the end of the replay in
// *column_families, ordered by column family ID. The default column family
// (ID 0) is always assumed to exist. *column_families is cleared first and
// is left empty on error.
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
                                      const std::string& dbname, Env* env) {
  // These options are used to open the MANIFEST for reading.
  EnvOptions soptions;
  // Read "CURRENT", which names the active MANIFEST file.
  std::string current;
  Status s = ReadFileToString(env, CurrentFileName(dbname), &current);
  if (!s.ok()) {
    return s;
  }
  if (current.empty() || current[current.size() - 1] != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  // Strip the trailing newline.
  current.resize(current.size() - 1);

  std::string dscname = dbname + "/" + current;
  unique_ptr<SequentialFile> file;
  s = env->NewSequentialFile(dscname, &file, soptions);
  if (!s.ok()) {
    return s;
  }

  // Ordered map so the result is sorted by column family ID.
  std::map<uint32_t, std::string> column_family_names;
  // The default column family always exists implicitly.
  column_family_names.insert({0, kDefaultColumnFamilyName});
  VersionSet::LogReporter reporter;
  reporter.status = &s;
  log::Reader reader(std::move(file), &reporter, true, 0);
  Slice record;
  std::string scratch;
  while (reader.ReadRecord(&record, &scratch) && s.ok()) {
    VersionEdit edit;
    s = edit.DecodeFrom(record);
    if (!s.ok()) {
      break;
    }
    if (edit.is_column_family_add_) {
      if (column_family_names.find(edit.column_family_) !=
          column_family_names.end()) {
        s = Status::Corruption("Manifest adding the same column family twice");
        break;
      }
      column_family_names.insert(
          {edit.column_family_, edit.column_family_name_});
    } else if (edit.is_column_family_drop_) {
      if (column_family_names.find(edit.column_family_) ==
          column_family_names.end()) {
        s = Status::Corruption(
            "Manifest - dropping non-existing column family");
        break;
      }
      column_family_names.erase(edit.column_family_);
    }
  }

  column_families->clear();
  if (s.ok()) {
    for (const auto& iter : column_family_names) {
      column_families->push_back(iter.second);
    }
  }

  return s;
}
#ifndef ROCKSDB_LITE
// Rewrites the MANIFEST of the database at `dbname` so that its default
// column family uses `new_levels` levels instead of its current count.
//
// Returns InvalidArgument if new_levels <= 1, or if more than one of the
// levels from new_levels-1 upwards contains files. Returns OK without
// changing anything if the database already has new_levels or fewer levels.
// Otherwise the files of the single non-empty high level (if any) are moved
// to level new_levels-1 and a new MANIFEST is written.
Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
                                        const Options* options,
                                        const EnvOptions& storage_options,
                                        int new_levels) {
  if (new_levels <= 1) {
    return Status::InvalidArgument(
        "Number of levels needs to be bigger than 1");
  }

  ColumnFamilyOptions cf_options(*options);
  std::shared_ptr<Cache> tc(NewLRUCache(
      options->max_open_files - 10, options->table_cache_numshardbits,
      options->table_cache_remove_scan_count_limit));
  VersionSet versions(dbname, options, storage_options, tc.get());
  Status status;

  // Recover using only the default column family.
  std::vector<ColumnFamilyDescriptor> dummy;
  ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
                                          ColumnFamilyOptions(*options));
  dummy.push_back(dummy_descriptor);
  status = versions.Recover(dummy);
  if (!status.ok()) {
    return status;
  }

  Version* current_version =
      versions.GetColumnFamilySet()->GetDefault()->current();
  const int current_levels = current_version->NumberLevels();
  if (current_levels <= new_levels) {
    return Status::OK();
  }

  // At most one level in [new_levels-1, current_levels) may hold files.
  int source_level = -1;
  int source_level_file_count = 0;
  for (int lvl = new_levels - 1; lvl < current_levels; ++lvl) {
    const int file_num = current_version->NumLevelFiles(lvl);
    if (file_num == 0) {
      continue;
    }
    if (source_level >= 0) {
      char msg[255];
      snprintf(msg, sizeof(msg),
               "Found at least two levels containing files: "
               "[%d:%d],[%d:%d].\n",
               source_level, source_level_file_count, lvl, file_num);
      return Status::InvalidArgument(msg);
    }
    source_level = lvl;
    source_level_file_count = file_num;
  }

  // Build the replacement per-level file lists; the array keeps its original
  // length even though fewer levels will be used.
  std::vector<FileMetaData*>* old_files_list = current_version->files_;
  std::vector<FileMetaData*>* new_files_list =
      new std::vector<FileMetaData*>[current_levels];
  for (int lvl = 0; lvl < new_levels - 1; ++lvl) {
    new_files_list[lvl] = old_files_list[lvl];
  }
  if (source_level > 0) {
    new_files_list[new_levels - 1] = old_files_list[source_level];
  }

  delete[] current_version->files_;
  current_version->files_ = new_files_list;
  current_version->num_levels_ = new_levels;

  // Persist the new layout by forcing a fresh MANIFEST with an empty edit.
  VersionEdit ve;
  port::Mutex dummy_mutex;
  MutexLock l(&dummy_mutex);
  return versions.LogAndApply(versions.GetColumnFamilySet()->GetDefault(), &ve,
                              &dummy_mutex, nullptr, true);
}
// Replays the MANIFEST file `dscname` and prints a human-readable summary of
// every column family to stdout. With `verbose` each decoded edit is printed
// as it is read; `hex` is forwarded to the DebugString() calls used for
// rendering. Returns the first error encountered while opening or parsing
// the file, or a Corruption status if mandatory fields are missing.
Status VersionSet::DumpManifest(Options& options, std::string& dscname,
                                bool verbose, bool hex) {
  // Open the specified manifest file.
  unique_ptr<SequentialFile> file;
  Status s = options.env->NewSequentialFile(dscname, &file, storage_options_);
  if (!s.ok()) {
    return s;
  }

  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t prev_log_number = 0;
  int count = 0;
  std::unordered_map<uint32_t, std::string> comparators;
  std::unordered_map<uint32_t, Builder*> builders;

  // The default column family (ID 0) always exists.
  VersionEdit default_cf_edit;
  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
  default_cf_edit.SetColumnFamily(0);
  ColumnFamilyData* default_cfd =
      CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
  builders.insert({0, new Builder(default_cfd)});

  {
    VersionSet::LogReporter reporter;
    reporter.status = &s;
    log::Reader reader(std::move(file), &reporter, true, 0);
    Slice record;
    std::string scratch;
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (!s.ok()) {
        break;
      }

      // Write out each individual edit.
      if (verbose) {
        printf("*************************Edit[%d] = %s\n", count,
               edit.DebugString(hex).c_str());
      }
      count++;

      bool cf_in_builders =
          builders.find(edit.column_family_) != builders.end();

      if (edit.has_comparator_) {
        comparators.insert({edit.column_family_, edit.comparator_});
      }

      ColumnFamilyData* cfd = nullptr;

      if (edit.is_column_family_add_) {
        if (cf_in_builders) {
          s = Status::Corruption(
              "Manifest adding the same column family twice");
          break;
        }
        cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
        builders.insert({edit.column_family_, new Builder(cfd)});
      } else if (edit.is_column_family_drop_) {
        if (!cf_in_builders) {
          s = Status::Corruption(
              "Manifest - dropping non-existing column family");
          break;
        }
        auto builder_iter = builders.find(edit.column_family_);
        delete builder_iter->second;
        builders.erase(builder_iter);
        comparators.erase(edit.column_family_);
        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
        assert(cfd != nullptr);
        cfd->Unref();
        delete cfd;
        cfd = nullptr;
      } else {
        if (!cf_in_builders) {
          s = Status::Corruption(
              "Manifest record referencing unknown column family");
          break;
        }

        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
        assert(cfd != nullptr);

        // Regular edit: accumulate it for the final per-CF dump.
        auto builder = builders.find(edit.column_family_);
        assert(builder != builders.end());
        builder->second->Apply(&edit);
      }

      if (cfd != nullptr && edit.has_log_number_) {
        cfd->SetLogNumber(edit.log_number_);
      }

      // Database-wide fields: the last value seen in the MANIFEST wins.
      if (edit.has_prev_log_number_) {
        prev_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }

      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }

      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }

      if (edit.has_max_column_family_) {
        column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
      }
    }
  }
  file.reset();

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
      printf("no meta-nextfile entry in descriptor");
    } else if (!have_last_sequence) {
      printf("no last-sequence-number entry in descriptor");
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }

    if (!have_prev_log_number) {
      prev_log_number = 0;
    }
  }

  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      auto builders_iter = builders.find(cfd->GetID());
      assert(builders_iter != builders.end());
      auto builder = builders_iter->second;

      // Build a throw-away Version just to print it.
      Version* v = new Version(cfd, this, current_version_number_++);
      builder->SaveTo(v);
      std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
      cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted);
      v->PrepareApply(size_being_compacted);
      delete builder;
      // Mark the entry as released so the final cleanup below skips it.
      builders_iter->second = nullptr;

      printf("--------------- Column family \"%s\" (ID %u) --------------\n",
             cfd->GetName().c_str(), (unsigned int)cfd->GetID());
      printf("log number: %lu\n", (unsigned long)cfd->GetLogNumber());
      auto comparator = comparators.find(cfd->GetID());
      if (comparator != comparators.end()) {
        printf("comparator: %s\n", comparator->second.c_str());
      } else {
        printf("comparator: <NO COMPARATOR>\n");
      }
      printf("%s \n", v->DebugString(hex).c_str());
      delete v;
    }

    next_file_number_ = next_file + 1;
    last_sequence_ = last_sequence;
    prev_log_number_ = prev_log_number;

    printf(
        "next_file_number %lu last_sequence "
        "%lu prev_log_number %lu max_column_family %u\n",
        (unsigned long)next_file_number_, (unsigned long)last_sequence,
        (unsigned long)prev_log_number,
        column_family_set_->GetMaxColumnFamily());
  }

  // Free any builders not released above; on an error path that is all of
  // them. Deleting an entry already set to nullptr is a no-op.
  for (auto& entry : builders) {
    delete entry.second;
  }
  builders.clear();

  return s;
}
#endif // ROCKSDB_LITE
// Records that file number `number` is in use by making sure the next file
// number to be handed out is strictly greater than it. Numbers already below
// next_file_number_ leave the counter untouched.
void VersionSet::MarkFileNumberUsed(uint64_t number) {
  if (number < next_file_number_) {
    // Counter is already past this number; nothing to do.
    return;
  }
  next_file_number_ = number + 1;
}
// Writes the current state of every column family to `log`.
//
// Two records are appended per column family, in this order:
//   1. An edit carrying the comparator name and, for every family whose ID is
//      not 0, the "add column family" marker with its name and ID.
//   2. An edit listing every file of the family's current version (all levels)
//      together with the family's log number.
//
// Returns the first non-OK status produced by log->AddRecord(), or
// Status::OK() once every record has been appended.
Status VersionSet::WriteSnapshot(log::Writer* log) {
  // Serializes `edit` and appends it to the log as a single record.
  const auto append_edit = [&](VersionEdit& edit) -> Status {
    std::string encoded;
    edit.EncodeTo(&encoded);
    return log->AddRecord(encoded);
  };

  for (auto cfd : *column_family_set_) {
    {
      // Record 1: identity of the column family and its comparator.
      VersionEdit identity_edit;
      const bool is_family_zero = (cfd->GetID() == 0);
      if (!is_family_zero) {
        // Family 0 is never emitted as an explicit "add" here.
        identity_edit.AddColumnFamily(cfd->GetName());
        identity_edit.SetColumnFamily(cfd->GetID());
      }
      identity_edit.SetComparatorName(
          cfd->internal_comparator().user_comparator()->Name());

      Status s = append_edit(identity_edit);
      if (!s.ok()) {
        return s;
      }
    }

    {
      // Record 2: every file in the current version, plus the log number.
      VersionEdit files_edit;
      files_edit.SetColumnFamily(cfd->GetID());

      Version* current_version = cfd->current();
      for (int level = 0; level < cfd->NumberLevels(); ++level) {
        for (const auto& meta : current_version->files_[level]) {
          files_edit.AddFile(level, meta->fd.GetNumber(), meta->fd.GetPathId(),
                             meta->fd.GetFileSize(), meta->smallest,
                             meta->largest, meta->smallest_seqno,
                             meta->largest_seqno);
        }
      }
      files_edit.SetLogNumber(cfd->GetLogNumber());

      Status s = append_edit(files_edit);
      if (!s.ok()) {
        return s;
      }
    }
  }
  return Status::OK();
}
bool
VersionSet::ManifestContains(uint64_t manifest_file_number,
const
std::string& record)
const
{
std::string fname =
DescriptorFileName(dbname_, manifest_file_number);
Log(options_->info_log,
"ManifestContains: checking %s\n"
, fname.c_str());
unique_ptr<SequentialFile> file;
Status s = env_->NewSequentialFile(fname, &file, storage_options_);
if
(!s.ok()) {
Log(options_->info_log,
"ManifestContains: %s\n"
, s.ToString().c_str());
Log(options_->info_log,
"ManifestContains: is unable to reopen the manifest file %s"
,
fname.c_str());
return
false
;
}
log
::Reader reader(std::move(file),
nullptr
,
true
, 0);
Slice r;
std::string scratch;
bool
result =
false
;
while
(reader.ReadRecord(&r, &scratch)) {
if
(r == Slice(record)) {
result =
true
;
break
;
}
}
Log(options_->info_log,
"ManifestContains: result = %d\n"
, result ? 1 : 0);
return
result;
}
// Estimates the byte offset of `ikey` within version `v` by summing, over all
// levels:
//   - the full size of every file whose largest key is <= ikey, and
//   - for a file whose key range straddles ikey, the table reader's
//     approximate offset of ikey inside that file.
// Files whose smallest key is > ikey contribute nothing.
//
// Returns the accumulated estimate in bytes.
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
  const auto& icmp = v->cfd_->internal_comparator();
  uint64_t result = 0;
  for (int level = 0; level < v->NumberLevels(); level++) {
    const std::vector<FileMetaData*>& files = v->files_[level];
    for (size_t i = 0; i < files.size(); i++) {
      if (icmp.Compare(files[i]->largest, ikey) <= 0) {
        // Entire file lies at or before ikey: count all of it.
        result += files[i]->fd.GetFileSize();
      } else if (icmp.Compare(files[i]->smallest, ikey) > 0) {
        // Entire file lies after ikey: ignore it.
        if (level > 0) {
          // Past level 0 the rest of the level is skipped as well --
          // presumably because those files are ordered by smallest key;
          // level 0 keeps scanning every file.
          break;
        }
      } else {
        // ikey falls inside this file's range; ask the table for an
        // approximate offset. The out-pointer starts as nullptr so the check
        // below is well-defined even if NewIterator() leaves it unassigned.
        TableReader* table_reader_ptr = nullptr;
        Iterator* iter = v->cfd_->table_cache()->NewIterator(
            ReadOptions(), storage_options_, icmp, files[i]->fd,
            &table_reader_ptr);
        if (table_reader_ptr != nullptr) {
          result += table_reader_ptr->ApproximateOffsetOf(ikey.Encode());
        }
        delete iter;
      }
    }
  }
  return result;
}
void
VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
int64_t total_files = 0;
for
(
auto
cfd : *column_family_set_) {
Version* dummy_versions = cfd->dummy_versions();
for
(Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) {
for
(
int
level = 0; level < v->NumberLevels(); level++) {
total_files += v->files_[level].size();
}
}
}
live_list->reserve(live_list->size() + total_files);
for
(
auto
cfd : *column_family_set_) {
Version* dummy_versions = cfd->dummy_versions();
for
(Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) {
for
(
int
level = 0; level < v->NumberLevels(); level++) {
for
(
const
auto
& f : v->files_[level]) {
live_list->push_back(f->fd);
}
}
}
}
}
// Builds a single merging iterator over all input files of compaction `c`.
//
// Reads are configured with fill_cache disabled and checksum verification
// taken from the column family's verify_checksums_in_compaction option.
// For each non-empty input level:
//   - if the level is 0, one table iterator is created per file;
//   - otherwise one two-level iterator is created for the whole level.
// The child iterators are then handed to NewMergingIterator() together with
// the column family's internal comparator.
Iterator* VersionSet::MakeInputIterator(Compaction* c) {
  auto cfd = c->column_family_data();
  ReadOptions read_options;
  read_options.verify_checksums =
      cfd->options()->verify_checksums_in_compaction;
  read_options.fill_cache = false;

  // Expected number of child iterators: one per level-0 file plus one per
  // remaining input level when the compaction starts at level 0, otherwise
  // one per input level.
  const int space = (c->level() == 0 ?
      c->input_levels(0)->num_files + c->num_input_levels() - 1:
      c->num_input_levels());

  // A vector (rather than a hand-managed array plus index) makes it
  // impossible to write past the end if `space` were ever an underestimate,
  // and releases the scratch storage automatically.
  std::vector<Iterator*> children;
  if (space > 0) {
    children.reserve(space);
  }

  for (int which = 0; which < c->num_input_levels(); which++) {
    if (c->input_levels(which)->num_files == 0) {
      continue;
    }
    if (c->level(which) == 0) {
      const FileLevel* flevel = c->input_levels(which);
      for (size_t i = 0; i < flevel->num_files; i++) {
        children.push_back(cfd->table_cache()->NewIterator(
            read_options, storage_options_compactions_,
            cfd->internal_comparator(), flevel->files[i].fd, nullptr, true));
      }
    } else {
      // NOTE(review): this path passes storage_options_ while the level-0
      // path above passes storage_options_compactions_ -- confirm whether
      // the difference is intentional.
      children.push_back(NewTwoLevelIterator(
          new Version::LevelFileIteratorState(
              cfd->table_cache(), read_options, storage_options_,
              cfd->internal_comparator(), true, false),
          new Version::LevelFileNumIterator(cfd->internal_comparator(),
                                            c->input_levels(which))));
    }
  }
  assert(static_cast<int>(children.size()) <= space);

  // Only the scratch array is released here (by the vector's destructor),
  // exactly as the previous delete[] did; the child iterators themselves are
  // passed on to the merging iterator.
  return NewMergingIterator(&c->column_family_data()->internal_comparator(),
                            children.data(),
                            static_cast<int>(children.size()));
}
// Debug-only sanity check that the files chosen as inputs of compaction `c`
// are still present in the column family's current version.
//
// Input set 0 is looked up at c->level() and input set 1 at c->level() + 1.
// Returns false as soon as one input file number is missing from its level.
// A mismatch between the compaction's input version and the current version
// is only logged. When NDEBUG is defined the check is compiled out and the
// function always returns true.
bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
#ifndef NDEBUG
  Version* version = c->column_family_data()->current();
  if (c->input_version() != version) {
    Log(options_->info_log,
        "[%s] VerifyCompactionFileConsistency version mismatch",
        c->column_family_data()->GetName().c_str());
  }

  const int base_level = c->level();
  for (int which = 0; which < 2; ++which) {
    const int level = base_level + which;
    for (int i = 0; i < c->num_input_files(which); i++) {
      const uint64_t wanted = c->input(which, i)->fd.GetNumber();
      // The level's file list is only touched when there is at least one
      // input file to verify for this input set.
      const auto& level_files = version->files_[level];
      const bool present =
          std::any_of(level_files.begin(), level_files.end(),
                      [wanted](FileMetaData* f) {
                        return f->fd.GetNumber() == wanted;
                      });
      if (!present) {
        return false;  // input file not found in the current version
      }
    }
  }
#endif
  return true;
}
// Searches the current version of every column family for the file whose
// number equals `number`.
//
// On success writes the file's metadata to *meta, its level to *filelevel and
// the owning column family to *cfd, and returns Status::OK(). If no level of
// any column family holds the file, returns Status::NotFound and leaves the
// output parameters untouched.
Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
                                      FileMetaData** meta,
                                      ColumnFamilyData** cfd) {
  for (auto family : *column_family_set_) {
    Version* current_version = family->current();
    const int level_count = current_version->NumberLevels();
    for (int level = 0; level < level_count; ++level) {
      for (const auto& candidate : current_version->files_[level]) {
        if (candidate->fd.GetNumber() != number) {
          continue;
        }
        // First match wins.
        *meta = candidate;
        *filelevel = level;
        *cfd = family;
        return Status::OK();
      }
    }
  }
  return Status::NotFound("File not present in any level");
}
void
VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
for
(
auto
cfd : *column_family_set_) {
for
(
int
level = 0; level < cfd->NumberLevels(); level++) {
for
(
const
auto
& file : cfd->current()->files_[level]) {
LiveFileMetaData filemetadata;
filemetadata.column_family_name = cfd->GetName();
uint32_t path_id = file->fd.GetPathId();
if
(path_id < options_->db_paths.size()) {
filemetadata.db_path = options_->db_paths[path_id].path;
}
else
{
assert
(!options_->db_paths.empty());
filemetadata.db_path = options_->db_paths.back().path;
}
filemetadata.name = MakeTableFileName(
""
, file->fd.GetNumber());
filemetadata.level = level;
filemetadata.size = file->fd.GetFileSize();
filemetadata.smallestkey = file->smallest.user_key().ToString();
filemetadata.largestkey = file->largest.user_key().ToString();
filemetadata.smallest_seqno = file->smallest_seqno;
filemetadata.largest_seqno = file->largest_seqno;
metadata->push_back(filemetadata);
}
}
}
}
// Appends every entry of obsolete_files_ to the end of `files` (preserving
// order) and then empties obsolete_files_.
void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files) {
  for (const auto& obsolete : obsolete_files_) {
    files->push_back(obsolete);
  }
  obsolete_files_.clear();
}
// Creates the column family described by `edit` (which must be an
// "add column family" edit) with the given `options`.
//
// Steps, in order:
//   1. Allocate a Version with no column family to serve as the head of the
//      new family's version list.
//   2. Register the family (name and ID taken from `edit`) in
//      column_family_set_.
//   3. Allocate the family's first real Version, consuming one version
//      number, and append it via AppendVersion().
//   4. Create the family's memtable and set its log number from `edit`.
//
// Returns the newly created ColumnFamilyData.
ColumnFamilyData* VersionSet::CreateColumnFamily(
    const ColumnFamilyOptions& options, VersionEdit* edit) {
  assert(edit->is_column_family_add_);

  Version* list_head = new Version(nullptr, this);
  auto created_cfd = column_family_set_->CreateColumnFamily(
      edit->column_family_name_, edit->column_family_, list_head, options);

  Version* first_version =
      new Version(created_cfd, this, current_version_number_++);
  AppendVersion(created_cfd, first_version);

  created_cfd->CreateNewMemtable();
  created_cfd->SetLogNumber(edit->log_number_);
  return created_cfd;
}
}