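LevelDB Source Code Analysis: 8. Version Management

Part 8. Version Management

The previous article in this series covered how a Sorted Table is built and read. As Sorted Tables keep being produced, the resulting .ldb files need to be organized and managed in a suitable way. LevelDB also supports snapshots, which requires version management. This article walks through the version-management code in LevelDB.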

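1. Version Overview

Version management in LevelDB tracks changes to the set of Sorted Table files. A version update is triggered whenever an in-memory database (memtable) is converted into a Sorted Table, or a compaction adds or removes Sorted Tables. It helps to think of it in terms of git:

  1. Initially there is an empty repo (no Sorted Table files);
  2. When files are added or removed, an incremental commit (VersionEdit) is recorded on top of the previous version;
  3. The current version (Version) can be derived from the previous version plus the commit;
  4. All versions (VersionSet) can be derived from the initial state plus all commits;
  5. HEAD points to the version currently in use (the CURRENT file).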
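
To make the analogy concrete, here is a minimal sketch. It is not LevelDB code: ToyVersion, ToyEdit, ApplyEdit and ReplayAll are hypothetical names, and a file is reduced to a bare number in a single level. It only illustrates that a version is the previous version plus an edit, and that replaying all edits from the empty state reproduces every version.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

// Hypothetical, simplified stand-in for Version:
// the set of file numbers alive in that version.
using ToyVersion = std::set<uint64_t>;

// Hypothetical, simplified stand-in for VersionEdit.
struct ToyEdit {
  std::vector<uint64_t> added;    // files created by a flush or compaction
  std::vector<uint64_t> deleted;  // files made obsolete by a compaction
};

// New version = old version - deleted files + added files.
ToyVersion ApplyEdit(const ToyVersion& base, const ToyEdit& edit) {
  ToyVersion next = base;
  for (uint64_t f : edit.deleted) next.erase(f);
  for (uint64_t f : edit.added) next.insert(f);
  return next;
}

// Replaying every edit from the empty state yields every version;
// the last element plays the role of HEAD.
std::vector<ToyVersion> ReplayAll(const std::vector<ToyEdit>& edits) {
  std::vector<ToyVersion> versions(1);  // versions[0] is the empty repo
  for (const ToyEdit& e : edits) {
    versions.push_back(ApplyEdit(versions.back(), e));
  }
  return versions;
}
```

For example, an edit that adds files 3 and 4 followed by an edit that replaces them with file 5 produces three versions: empty, {3, 4}, and {5}.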

2. Version

Start with db/version_edit.h, which has no dependencies that have not been covered yet:

struct FileMetaData {
  FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}

  int refs;
  int allowed_seeks;     // Seeks allowed until compaction
  uint64_t number;
  uint64_t file_size;    // File size in bytes
  InternalKey smallest;  // Smallest internal key served by table
  InternalKey largest;   // Largest internal key served by table
};

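FileMetaData holds the metadata of a .ldb file: the reference count refs, the number of seeks allowed before a compaction is triggered (allowed_seeks), the file number (number), the file size (file_size), and the smallest and largest keys. Next comes the definition of VersionEdit: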

class VersionEdit {
public:
VersionEdit() { Clear(); }
~VersionEdit() = default;

void Clear();

void SetComparatorName(const Slice& name) {
has_comparator_ = true;
comparator_ = name.ToString();
}
void SetLogNumber(uint64_t num) {
has_log_number_ = true;
log_number_ = num;
}
void SetPrevLogNumber(uint64_t num) {
has_prev_log_number_ = true;
prev_log_number_ = num;
}
void SetNextFile(uint64_t num) {
has_next_file_number_ = true;
next_file_number_ = num;
}
void SetLastSequence(SequenceNumber seq) {
has_last_sequence_ = true;
last_sequence_ = seq;
}
void SetCompactPointer(int level, const InternalKey& key) {
compact_pointers_.push_back(std::make_pair(level, key));
}

// Add the specified file at the specified number.
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
// REQUIRES: "smallest" and "largest" are smallest and largest keys in file
void AddFile(int level, uint64_t file, uint64_t file_size,
const InternalKey& smallest, const InternalKey& largest) {
FileMetaData f;
f.number = file;
f.file_size = file_size;
f.smallest = smallest;
f.largest = largest;
new_files_.push_back(std::make_pair(level, f));
}

// Delete the specified "file" from the specified "level".
void DeleteFile(int level, uint64_t file) {
deleted_files_.insert(std::make_pair(level, file));
}

void EncodeTo(std::string* dst) const;
Status DecodeFrom(const Slice& src);

std::string DebugString() const;

private:
friend class VersionSet;

typedef std::set<std::pair<int, uint64_t>> DeletedFileSet;

std::string comparator_;
uint64_t log_number_;
uint64_t prev_log_number_;
uint64_t next_file_number_;
SequenceNumber last_sequence_;
bool has_comparator_;
bool has_log_number_;
bool has_prev_log_number_;
bool has_next_file_number_;
bool has_last_sequence_;

std::vector<std::pair<int, InternalKey>> compact_pointers_;
DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_;
};

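VersionEdit contains the following editable fields:

  1. comparator_: the name of the comparator;
  2. log_number_: the log file number;
  3. prev_log_number_: the previous log file number;
  4. next_file_number_: the next file number;
  5. last_sequence_: the last sequence number;
  6. compact_pointers_: (level, InternalKey) pairs whose purpose is not obvious yet (the VersionSet definition later describes compact_pointer_ as the per-level key at which the next compaction at that level should start);
  7. deleted_files_: the deleted files, recorded as (level, file number) pairs;
  8. new_files_: the added files, recorded as (level, FileMetaData) pairs.

The purpose of some of these fields is not clear yet; set them aside for now. The two other important interfaces, EncodeTo and DecodeFrom, handle serialization and deserialization. They are implemented in the corresponding .cc file and are not covered here. Next, look at the definition of Version in db/version_set.h: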

class Version {
public:
// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status. Fills *stats.
// REQUIRES: lock is not held
struct GetStats {
FileMetaData* seek_file;
int seek_file_level;
};

// Append to *iters a sequence of iterators that will
// yield the contents of this Version when merged together.
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
void AddIterators(const ReadOptions&, std::vector<Iterator*>* iters);

Status Get(const ReadOptions&, const LookupKey& key, std::string* val,
GetStats* stats);

// Adds "stats" into the current state. Returns true if a new
// compaction may need to be triggered, false otherwise.
// REQUIRES: lock is held
bool UpdateStats(const GetStats& stats);

// Record a sample of bytes read at the specified internal key.
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes. Returns true if a new compaction may need to be triggered.
// REQUIRES: lock is held
bool RecordReadSample(Slice key);

// Reference count management (so Versions do not disappear out from
// under live iterators)
void Ref();
void Unref();

void GetOverlappingInputs(
int level,
const InternalKey* begin, // nullptr means before all keys
const InternalKey* end, // nullptr means after all keys
std::vector<FileMetaData*>* inputs);

// Returns true iff some file in the specified level overlaps
// some part of [*smallest_user_key,*largest_user_key].
// smallest_user_key==nullptr represents a key smaller than all the DB's keys.
// largest_user_key==nullptr represents a key largest than all the DB's keys.
bool OverlapInLevel(int level, const Slice* smallest_user_key,
const Slice* largest_user_key);

// Return the level at which we should place a new memtable compaction
// result that covers the range [smallest_user_key,largest_user_key].
int PickLevelForMemTableOutput(const Slice& smallest_user_key,
const Slice& largest_user_key);

int NumFiles(int level) const { return files_[level].size(); }

// Return a human readable string that describes this version's contents.
std::string DebugString() const;

private:
friend class Compaction;
friend class VersionSet;

class LevelFileNumIterator;

explicit Version(VersionSet* vset)
: vset_(vset),
next_(this),
prev_(this),
refs_(0),
file_to_compact_(nullptr),
file_to_compact_level_(-1),
compaction_score_(-1),
compaction_level_(-1) {}

Version(const Version&) = delete;
Version& operator=(const Version&) = delete;

~Version();

Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;

// Call func(arg, level, f) for every file that overlaps user_key in
// order from newest to oldest. If an invocation of func returns
// false, makes no more calls.
//
// REQUIRES: user portion of internal_key == user_key.
void ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
bool (*func)(void*, int, FileMetaData*));

VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list
int refs_; // Number of live refs to this version

// List of files per level
std::vector<FileMetaData*> files_[config::kNumLevels];

// Next file to compact based on seek stats.
FileMetaData* file_to_compact_;
int file_to_compact_level_;

// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize().
double compaction_score_;
int compaction_level_;
};

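First, list the member variables:

  1. vset_: pointer to the owning VersionSet, a class introduced later; set aside for now;
  2. next_ and prev_: they come as a pair, so this is clearly a doubly linked list;
  3. refs_: the reference count;
  4. files_: the list of file metadata for each level;
  5. file_to_compact_ and file_to_compact_level_: the next file to compact (based on seek statistics) and its level; set aside for now;
  6. compaction_score_ and compaction_level_: the level that should be compacted next and its score; set aside for now.

A rough impression of these members is enough. Next, go through the member function implementations in db/version_set.cc piece by piece, starting with the iterators: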

// Return the smallest index i such that files[i]->largest >= key.
// Return files.size() if there is no such file.
// REQUIRES: "files" contains a sorted list of non-overlapping files.
int FindFile(const InternalKeyComparator& icmp,
const std::vector<FileMetaData*>& files, const Slice& key) {
uint32_t left = 0;
uint32_t right = files.size();
while (left < right) {
uint32_t mid = (left + right) / 2;
const FileMetaData* f = files[mid];
if (icmp.InternalKeyComparator::Compare(f->largest.Encode(), key) < 0) {
// Key at "mid.largest" is < "target". Therefore all
// files at or before "mid" are uninteresting.
left = mid + 1;
} else {
// Key at "mid.largest" is >= "target". Therefore all files
// after "mid" are uninteresting.
right = mid;
}
}
return right;
}

// An internal iterator. For a given version/level pair, yields
// information about the files in the level. For a given entry, key()
// is the largest key that occurs in the file, and value() is an
// 16-byte value containing the file number and file size, both
// encoded using EncodeFixed64.
class Version::LevelFileNumIterator : public Iterator {
public:
LevelFileNumIterator(const InternalKeyComparator& icmp,
const std::vector<FileMetaData*>* flist)
: icmp_(icmp), flist_(flist), index_(flist->size()) { // Marks as invalid
}
bool Valid() const override { return index_ < flist_->size(); }
void Seek(const Slice& target) override {
index_ = FindFile(icmp_, *flist_, target);
}
void SeekToFirst() override { index_ = 0; }
void SeekToLast() override {
index_ = flist_->empty() ? 0 : flist_->size() - 1;
}
void Next() override {
assert(Valid());
index_++;
}
void Prev() override {
assert(Valid());
if (index_ == 0) {
index_ = flist_->size(); // Marks as invalid
} else {
index_--;
}
}
Slice key() const override {
assert(Valid());
return (*flist_)[index_]->largest.Encode();
}
Slice value() const override {
assert(Valid());
EncodeFixed64(value_buf_, (*flist_)[index_]->number);
EncodeFixed64(value_buf_ + 8, (*flist_)[index_]->file_size);
return Slice(value_buf_, sizeof(value_buf_));
}
Status status() const override { return Status::OK(); }

private:
const InternalKeyComparator icmp_;
const std::vector<FileMetaData*>* const flist_;
uint32_t index_;

// Backing store for value(). Holds the file number and size.
mutable char value_buf_[16];
};

static Iterator* GetFileIterator(void* arg, const ReadOptions& options,
const Slice& file_value) {
TableCache* cache = reinterpret_cast<TableCache*>(arg);
if (file_value.size() != 16) {
return NewErrorIterator(
Status::Corruption("FileReader invoked with unexpected value"));
} else {
return cache->NewIterator(options, DecodeFixed64(file_value.data()),
DecodeFixed64(file_value.data() + 8));
}
}

Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
int level) const {
return NewTwoLevelIterator(
new LevelFileNumIterator(vset_->icmp_, &files_[level]), &GetFileIterator,
vset_->table_cache_, options);
}

void Version::AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iters) {
// Merge all level zero files together since they may overlap
for (size_t i = 0; i < files_[0].size(); i++) {
iters->push_back(vset_->table_cache_->NewIterator(
options, files_[0][i]->number, files_[0][i]->file_size));
}

// For levels > 0, we can use a concatenating iterator that sequentially
// walks through the non-overlapping files in the level, opening them
// lazily.
for (int level = 1; level < config::kNumLevels; level++) {
if (!files_[level].empty()) {
iters->push_back(NewConcatenatingIterator(options, level));
}
}
}
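
The binary search in FindFile above can be tried out in isolation. The following is a minimal sketch under simplifying assumptions: keys are plain ints instead of internal keys, and ToyFile, ToyFindFile and ToyMayContain are hypothetical names that do not exist in LevelDB.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical simplification: a file is just its key range, keys are ints.
struct ToyFile {
  int smallest;
  int largest;
};

// Returns the smallest index i such that files[i].largest >= key,
// or files.size() if there is no such file.
// Requires: files are sorted and do not overlap (as in levels > 0).
std::size_t ToyFindFile(const std::vector<ToyFile>& files, int key) {
  std::size_t left = 0;
  std::size_t right = files.size();
  while (left < right) {
    std::size_t mid = left + (right - left) / 2;
    if (files[mid].largest < key) {
      left = mid + 1;  // files[0..mid] all end before key
    } else {
      right = mid;     // files[mid] is still a candidate
    }
  }
  return right;
}

// The candidate returned by ToyFindFile only covers key if key is not
// smaller than that file's smallest key (the key may fall into a gap).
bool ToyMayContain(const std::vector<ToyFile>& files, int key) {
  std::size_t i = ToyFindFile(files, key);
  return i < files.size() && files[i].smallest <= key;
}
```

With files covering [10, 20], [30, 40] and [50, 60], a search for 25 returns index 1, but the key lies in the gap before that file, which is why callers still compare against the file's smallest key.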

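FindFile is a simple binary search that quickly locates, in a file list, the index of the first file whose largest key is >= key. Next comes Version::LevelFileNumIterator, an iterator over a file list: it exposes each file's largest key as the key and the file number and size (16 bytes, two fixed64 values) as the value, and supports traversing and seeking over the list. In Version::NewConcatenatingIterator this iterator serves as the first level; the second level is the iterator over the Sorted Table file that the value refers to, created by GetFileIterator. This way, the file list of a level yields a single iterator over all Sorted Table data in that level. It does not stop there: Version::AddIterators collects the iterators of all levels into a list (one per Level 0 file, since those may overlap, plus one concatenating iterator per non-empty higher level), which is then used to build a MergingIterator over the data of all levels (see DBImpl::NewInternalIterator). Take a moment to appreciate this elegant design, then move on to the implementation of Version::Get: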

// Callback from TableCache::Get()
namespace {
enum SaverState {
kNotFound,
kFound,
kDeleted,
kCorrupt,
};
struct Saver {
SaverState state;
const Comparator* ucmp;
Slice user_key;
std::string* value;
};
} // namespace
static void SaveValue(void* arg, const Slice& ikey, const Slice& v) {
Saver* s = reinterpret_cast<Saver*>(arg);
ParsedInternalKey parsed_key;
if (!ParseInternalKey(ikey, &parsed_key)) {
s->state = kCorrupt;
} else {
if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted;
if (s->state == kFound) {
s->value->assign(v.data(), v.size());
}
}
}
}

static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
return a->number > b->number;
}

void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
bool (*func)(void*, int, FileMetaData*)) {
const Comparator* ucmp = vset_->icmp_.user_comparator();

// Search level-0 in order from newest to oldest.
std::vector<FileMetaData*> tmp;
tmp.reserve(files_[0].size());
for (uint32_t i = 0; i < files_[0].size(); i++) {
FileMetaData* f = files_[0][i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
tmp.push_back(f);
}
}
if (!tmp.empty()) {
std::sort(tmp.begin(), tmp.end(), NewestFirst);
for (uint32_t i = 0; i < tmp.size(); i++) {
if (!(*func)(arg, 0, tmp[i])) {
return;
}
}
}

// Search other levels.
for (int level = 1; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;

// Binary search to find earliest index whose largest key >= internal_key.
uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
if (index < num_files) {
FileMetaData* f = files_[level][index];
if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
// All of "f" is past any data for user_key
} else {
if (!(*func)(arg, level, f)) {
return;
}
}
}
}
}

Status Version::Get(const ReadOptions& options, const LookupKey& k,
std::string* value, GetStats* stats) {
stats->seek_file = nullptr;
stats->seek_file_level = -1;

struct State {
Saver saver;
GetStats* stats;
const ReadOptions* options;
Slice ikey;
FileMetaData* last_file_read;
int last_file_read_level;

VersionSet* vset;
Status s;
bool found;

static bool Match(void* arg, int level, FileMetaData* f) {
State* state = reinterpret_cast<State*>(arg);

if (state->stats->seek_file == nullptr &&
state->last_file_read != nullptr) {
// We have had more than one seek for this read. Charge the 1st file.
state->stats->seek_file = state->last_file_read;
state->stats->seek_file_level = state->last_file_read_level;
}

state->last_file_read = f;
state->last_file_read_level = level;

state->s = state->vset->table_cache_->Get(*state->options, f->number,
f->file_size, state->ikey,
&state->saver, SaveValue);
if (!state->s.ok()) {
state->found = true;
return false;
}
switch (state->saver.state) {
case kNotFound:
return true; // Keep searching in other files
case kFound:
state->found = true;
return false;
case kDeleted:
return false;
case kCorrupt:
state->s =
Status::Corruption("corrupted key for ", state->saver.user_key);
state->found = true;
return false;
}
}
};

State state;
state.found = false;
state.stats = stats;
state.last_file_read = nullptr;
state.last_file_read_level = -1;

state.options = &options;
state.ikey = k.internal_key();
state.vset = vset_;

state.saver.state = kNotFound;
state.saver.ucmp = vset_->icmp_.user_comparator();
state.saver.user_key = k.user_key();
state.saver.value = value;

ForEachOverlapping(state.saver.user_key, state.ikey, &state, &State::Match);

return state.found ? state.s : Status::NotFound(Slice());
}

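The anonymous namespace declares the enum SaverState, the four possible outcomes of a lookup: not found, found, deleted, and corrupt. Saver carries the inputs (the user comparator and user_key) as well as the outputs (the SaverState and the value that was found). SaveValue is the lookup callback passed to TableCache::Get; it is invoked with the Saver and the key/value pair that the seek landed on. It parses the internal key and updates the SaverState only if the user key matches the one being looked up.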
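
The callback pattern can be sketched with simplified types. This is an illustration only: ToySaver, ToySaveValue and the enums are hypothetical, keys are plain strings rather than internal keys, and the corrupt-key case is left out. The user-key comparison matters because a seek positions on the first entry at or after the target, which may belong to a different user key.

```cpp
#include <cassert>
#include <string>

// Hypothetical, simplified counterparts of SaverState and the value type tag.
enum ToyState { kToyNotFound, kToyFound, kToyDeleted };
enum ToyType { kToyDeletion, kToyValue };

struct ToySaver {
  ToyState state;        // output: result of the lookup
  std::string user_key;  // input: the key being looked up
  std::string* value;    // output: filled only when state == kToyFound
};

// Called with the entry the seek landed on. The state changes only when
// that entry belongs to the requested user key.
void ToySaveValue(void* arg, const std::string& found_key, ToyType type,
                  const std::string& v) {
  ToySaver* s = static_cast<ToySaver*>(arg);
  if (found_key == s->user_key) {
    s->state = (type == kToyValue) ? kToyFound : kToyDeleted;
    if (s->state == kToyFound) {
      s->value->assign(v);
    }
  }
}
```

Passing the saver through a void* mirrors the C-style callback signature used in the listing above, which lets the table code stay unaware of what the caller does with the entry.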

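Now look at Version::Get. It calls Version::ForEachOverlapping, which selects the candidate files by their smallest and largest keys; for each candidate, the State::Match callback calls table_cache_->Get to search the file, and SaveValue is invoked when an entry is found. If the result is kFound, the search returns early. kDeleted also stops the search, and because found is not set in that case, Get returns NotFound. Level 0 files may overlap, so every one of them has to be checked, from newest to oldest; for Level 1 and above, FindFile can binary-search the single candidate file.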
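
The order in which files are probed can be sketched as follows. This is a simplified model, not the LevelDB implementation: keys are ints, ToyFile and ToyProbeOrder are hypothetical names, and the function only returns the file numbers that would be visited if no probe stopped the search early.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical simplification of FileMetaData with int keys.
struct ToyFile {
  uint64_t number;  // a larger number means a newer file
  int smallest;
  int largest;
};

// Returns the file numbers to probe for key, in probe order.
// levels[0] may contain overlapping files; levels[1..] are sorted, disjoint.
std::vector<uint64_t> ToyProbeOrder(
    const std::vector<std::vector<ToyFile>>& levels, int key) {
  std::vector<uint64_t> order;
  if (levels.empty()) return order;

  // Level 0: every file whose range covers key, newest first.
  std::vector<ToyFile> l0;
  for (const ToyFile& f : levels[0]) {
    if (f.smallest <= key && key <= f.largest) l0.push_back(f);
  }
  std::sort(l0.begin(), l0.end(), [](const ToyFile& a, const ToyFile& b) {
    return a.number > b.number;
  });
  for (const ToyFile& f : l0) order.push_back(f.number);

  // Levels >= 1: at most one candidate per level, found by binary search
  // for the first file whose largest key is >= key.
  for (std::size_t level = 1; level < levels.size(); ++level) {
    const std::vector<ToyFile>& files = levels[level];
    auto it = std::lower_bound(
        files.begin(), files.end(), key,
        [](const ToyFile& f, int k) { return f.largest < k; });
    if (it != files.end() && it->smallest <= key) {
      order.push_back(it->number);
    }
  }
  return order;
}
```

In the real code the walk stops as soon as a callback returns false, so the files listed later are only read when the earlier ones did not contain the key.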

3. VersionSet

Start with the implementation of VersionSet::Builder:

// A helper class so we can efficiently apply a whole sequence
// of edits to a particular state without creating intermediate
// Versions that contain full copies of the intermediate state.
class VersionSet::Builder {
private:
// Helper to sort by v->files_[file_number].smallest
struct BySmallestKey {
const InternalKeyComparator* internal_comparator;

bool operator()(FileMetaData* f1, FileMetaData* f2) const {
int r = internal_comparator->Compare(f1->smallest, f2->smallest);
if (r != 0) {
return (r < 0);
} else {
// Break ties by file number
return (f1->number < f2->number);
}
}
};

typedef std::set<FileMetaData*, BySmallestKey> FileSet;
struct LevelState {
std::set<uint64_t> deleted_files;
FileSet* added_files;
};

VersionSet* vset_;
Version* base_;
LevelState levels_[config::kNumLevels];

public:
// Initialize a builder with the files from *base and other info from *vset
Builder(VersionSet* vset, Version* base) : vset_(vset), base_(base) {
base_->Ref();
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
levels_[level].added_files = new FileSet(cmp);
}
}

~Builder() {
for (int level = 0; level < config::kNumLevels; level++) {
const FileSet* added = levels_[level].added_files;
std::vector<FileMetaData*> to_unref;
to_unref.reserve(added->size());
for (FileSet::const_iterator it = added->begin(); it != added->end();
++it) {
to_unref.push_back(*it);
}
delete added;
for (uint32_t i = 0; i < to_unref.size(); i++) {
FileMetaData* f = to_unref[i];
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
base_->Unref();
}

// Apply all of the edits in *edit to the current state.
void Apply(VersionEdit* edit) {
// Update compaction pointers
for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
const int level = edit->compact_pointers_[i].first;
vset_->compact_pointer_[level] =
edit->compact_pointers_[i].second.Encode().ToString();
}

// Delete files
for (const auto& deleted_file_set_kvp : edit->deleted_files_) {
const int level = deleted_file_set_kvp.first;
const uint64_t number = deleted_file_set_kvp.second;
levels_[level].deleted_files.insert(number);
}

// Add new files
for (size_t i = 0; i < edit->new_files_.size(); i++) {
const int level = edit->new_files_[i].first;
FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
f->refs = 1;

// We arrange to automatically compact this file after
// a certain number of seeks. Let's assume:
// (1) One seek costs 10ms
// (2) Writing or reading 1MB costs 10ms (100MB/s)
// (3) A compaction of 1MB does 25MB of IO:
// 1MB read from this level
// 10-12MB read from next level (boundaries may be misaligned)
// 10-12MB written to next level
// This implies that 25 seeks cost the same as the compaction
// of 1MB of data. I.e., one seek costs approximately the
// same as the compaction of 40KB of data. We are a little
// conservative and allow approximately one seek for every 16KB
// of data before triggering a compaction.
f->allowed_seeks = static_cast<int>((f->file_size / 16384U));
if (f->allowed_seeks < 100) f->allowed_seeks = 100;

levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
}

// Save the current state in *v.
void SaveTo(Version* v) {
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
const std::vector<FileMetaData*>& base_files = base_->files_[level];
std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
const FileSet* added_files = levels_[level].added_files;
v->files_[level].reserve(base_files.size() + added_files->size());
for (const auto& added_file : *added_files) {
// Add all smaller files listed in base_
for (std::vector<FileMetaData*>::const_iterator bpos =
std::upper_bound(base_iter, base_end, added_file, cmp);
base_iter != bpos; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}

MaybeAddFile(v, level, added_file);
}

// Add remaining base files
for (; base_iter != base_end; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}

#ifndef NDEBUG
// Make sure there is no overlap in levels > 0
if (level > 0) {
for (uint32_t i = 1; i < v->files_[level].size(); i++) {
const InternalKey& prev_end = v->files_[level][i - 1]->largest;
const InternalKey& this_begin = v->files_[level][i]->smallest;
if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
prev_end.DebugString().c_str(),
this_begin.DebugString().c_str());
abort();
}
}
}
#endif
}
}

void MaybeAddFile(Version* v, int level, FileMetaData* f) {
if (levels_[level].deleted_files.count(f->number) > 0) {
// File is deleted: do nothing
} else {
std::vector<FileMetaData*>* files = &v->files_[level];
if (level > 0 && !files->empty()) {
// Must not overlap
assert(vset_->icmp_.Compare((*files)[files->size() - 1]->largest,
f->smallest) < 0);
}
f->refs++;
files->push_back(f);
}
}
};
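
The comment inside Apply above derives the seek budget of a new file. Under its assumptions, one seek costs 10 ms and compacting 1 MB causes about 25 MB of I/O (250 ms), so 25 seeks cost as much as compacting 1 MB, or one seek per 40 KB; the code is more conservative and allows one seek per 16 KB, with a floor of 100. A minimal sketch of that rule (ToyAllowedSeeks is a hypothetical helper name):

```cpp
#include <cassert>
#include <cstdint>

// Mirrors the rule in Builder::Apply: one seek per 16 KiB of file size,
// but never fewer than 100 seeks.
int ToyAllowedSeeks(uint64_t file_size) {
  int allowed = static_cast<int>(file_size / 16384U);
  if (allowed < 100) allowed = 100;
  return allowed;
}
```

For instance, a 2 MiB file gets 2097152 / 16384 = 128 seeks, while any file smaller than about 1.6 MB falls back to the floor of 100.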

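VersionSet::Builder first defines a comparator, BySmallestKey, which orders the FileMetaData entries stored in a FileSet by their smallest key and breaks ties by file number. The struct LevelState holds the set of deleted file numbers, deleted_files, and the set of added files, added_files. The Builder member levels_ stores the LevelState of every level. The constructor and destructor of Builder take care of the necessary allocation and cleanup; the other members are the version set vset_ and the base version base_. The core interfaces are Apply and SaveTo. In Apply, ignore the compact_pointer_ handling for now; the rest records the added and deleted files from edit in Builder::levels_. SaveTo merges the file metadata of the base version base_ with the accumulated additions and deletions and appends the result, in order, to the new version v.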
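
The merge performed by SaveTo for a single level can be sketched as below. This is a simplified model rather than the actual code: ToyFile, BySmallest and ToySaveTo are hypothetical names, keys are ints, and files are stored by value with no reference counting.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

// Hypothetical simplification of FileMetaData.
struct ToyFile {
  uint64_t number;
  int smallest;
};

// Orders files by smallest key and breaks ties by file number.
struct BySmallest {
  bool operator()(const ToyFile& a, const ToyFile& b) const {
    if (a.smallest != b.smallest) return a.smallest < b.smallest;
    return a.number < b.number;
  }
};

// Merges the sorted base files with the sorted added files and drops
// every file whose number appears in deleted.
std::vector<ToyFile> ToySaveTo(const std::vector<ToyFile>& base,
                               const std::set<ToyFile, BySmallest>& added,
                               const std::set<uint64_t>& deleted) {
  std::vector<ToyFile> out;
  out.reserve(base.size() + added.size());
  auto maybe_add = [&](const ToyFile& f) {
    if (deleted.count(f.number) == 0) out.push_back(f);
  };

  auto base_iter = base.begin();
  for (const ToyFile& a : added) {
    // Emit all base files that sort before the added file.
    auto bpos = std::upper_bound(base_iter, base.end(), a, BySmallest());
    for (; base_iter != bpos; ++base_iter) maybe_add(*base_iter);
    maybe_add(a);
  }
  // Emit the remaining base files.
  for (; base_iter != base.end(); ++base_iter) maybe_add(*base_iter);
  return out;
}
```

Because both inputs are already sorted, the merge visits each file once, and the output stays sorted without a separate sort step.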

Next, the definition of VersionSet in db/version_set.h:

class VersionSet {
public:
VersionSet(const std::string& dbname, const Options* options,
TableCache* table_cache, const InternalKeyComparator*);
VersionSet(const VersionSet&) = delete;
VersionSet& operator=(const VersionSet&) = delete;

~VersionSet();

// Apply *edit to the current version to form a new descriptor that
// is both saved to persistent state and installed as the new
// current version. Will release *mu while actually writing to the file.
// REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(VersionEdit* edit, port::Mutex* mu)
EXCLUSIVE_LOCKS_REQUIRED(mu);

// Recover the last saved descriptor from persistent storage.
Status Recover(bool* save_manifest);

// Return the current version.
Version* current() const { return current_; }

// Return the current manifest file number
uint64_t ManifestFileNumber() const { return manifest_file_number_; }

// Allocate and return a new file number
uint64_t NewFileNumber() { return next_file_number_++; }

// Arrange to reuse "file_number" unless a newer file number has
// already been allocated.
// REQUIRES: "file_number" was returned by a call to NewFileNumber().
void ReuseFileNumber(uint64_t file_number) {
if (next_file_number_ == file_number + 1) {
next_file_number_ = file_number;
}
}

// Return the number of Table files at the specified level.
int NumLevelFiles(int level) const;

// Return the combined file size of all files at the specified level.
int64_t NumLevelBytes(int level) const;

// Return the last sequence number.
uint64_t LastSequence() const { return last_sequence_; }

// Set the last sequence number to s.
void SetLastSequence(uint64_t s) {
assert(s >= last_sequence_);
last_sequence_ = s;
}

// Mark the specified file number as used.
void MarkFileNumberUsed(uint64_t number);

// Return the current log file number.
uint64_t LogNumber() const { return log_number_; }

// Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file.
uint64_t PrevLogNumber() const { return prev_log_number_; }

// Pick level and inputs for a new compaction.
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
Compaction* PickCompaction();

// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
// level that overlaps the specified range. Caller should delete
// the result.
Compaction* CompactRange(int level, const InternalKey* begin,
const InternalKey* end);

// Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1.
int64_t MaxNextLevelOverlappingBytes();

// Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed.
Iterator* MakeInputIterator(Compaction* c);

// Returns true iff some level needs a compaction.
bool NeedsCompaction() const {
Version* v = current_;
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr);
}

// Add all files listed in any live version to *live.
// May also mutate some internal state.
void AddLiveFiles(std::set<uint64_t>* live);

// Return the approximate offset in the database of the data for
// "key" as of version "v".
uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key);

// Return a human-readable short (single-line) summary of the number
// of files per level. Uses *scratch as backing store.
struct LevelSummaryStorage {
char buffer[100];
};
const char* LevelSummary(LevelSummaryStorage* scratch) const;

private:
class Builder;

friend class Compaction;
friend class Version;

bool ReuseManifest(const std::string& dscname, const std::string& dscbase);

void Finalize(Version* v);

void GetRange(const std::vector<FileMetaData*>& inputs, InternalKey* smallest,
InternalKey* largest);

void GetRange2(const std::vector<FileMetaData*>& inputs1,
const std::vector<FileMetaData*>& inputs2,
InternalKey* smallest, InternalKey* largest);

void SetupOtherInputs(Compaction* c);

// Save current contents to *log
Status WriteSnapshot(log::Writer* log);

void AppendVersion(Version* v);

Env* const env_;
const std::string dbname_;
const Options* const options_;
TableCache* const table_cache_;
const InternalKeyComparator icmp_;
uint64_t next_file_number_;
uint64_t manifest_file_number_;
uint64_t last_sequence_;
uint64_t log_number_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted

// Opened lazily
WritableFile* descriptor_file_;
log::Writer* descriptor_log_;
Version dummy_versions_; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions_.prev_

// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
std::string compact_pointer_[config::kNumLevels];
};

The definition is long; set it aside for now and continue with the implementation (the order has been rearranged):

void Version::Ref() { ++refs_; }

void Version::Unref() {
assert(this != &vset_->dummy_versions_);
assert(refs_ >= 1);
--refs_;
if (refs_ == 0) {
delete this;
}
}

Version::~Version() {
assert(refs_ == 0);

// Remove from linked list
prev_->next_ = next_;
next_->prev_ = prev_;

// Drop references to files
for (int level = 0; level < config::kNumLevels; level++) {
for (size_t i = 0; i < files_[level].size(); i++) {
FileMetaData* f = files_[level][i];
assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
}

VersionSet::VersionSet(const std::string& dbname, const Options* options,
TableCache* table_cache,
const InternalKeyComparator* cmp)
: env_(options->env),
dbname_(dbname),
options_(options),
table_cache_(table_cache),
icmp_(*cmp),
next_file_number_(2),
manifest_file_number_(0), // Filled by Recover()
last_sequence_(0),
log_number_(0),
prev_log_number_(0),
descriptor_file_(nullptr),
descriptor_log_(nullptr),
dummy_versions_(this),
current_(nullptr) {
AppendVersion(new Version(this));
}

VersionSet::~VersionSet() {
current_->Unref();
assert(dummy_versions_.next_ == &dummy_versions_); // List must be empty
delete descriptor_log_;
delete descriptor_file_;
}

void VersionSet::AppendVersion(Version* v) {
// Make "v" current
assert(v->refs_ == 0);
assert(v != current_);
if (current_ != nullptr) {
current_->Unref();
}
current_ = v;
v->Ref();

// Append to linked list
v->prev_ = dummy_versions_.prev_;
v->next_ = &dummy_versions_;
v->prev_->next_ = v;
v->next_->prev_ = v;
}
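
The interplay of Ref, Unref and AppendVersion above can be modeled with a small circular doubly linked list that has a dummy head. The sketch below is an assumption-laden toy, not LevelDB code: ToyNode and ToyList are hypothetical names, and a node carries only an id instead of file lists.

```cpp
#include <cassert>

// Hypothetical stand-in for Version: a ref-counted list node.
struct ToyNode {
  explicit ToyNode(int node_id)
      : id(node_id), refs(0), next(this), prev(this) {}
  int id;
  int refs;
  ToyNode* next;
  ToyNode* prev;
};

// Hypothetical stand-in for VersionSet: owns the dummy head and "current".
class ToyList {
 public:
  ToyList() : head_(-1), current_(nullptr) {}
  ToyList(const ToyList&) = delete;
  ToyList& operator=(const ToyList&) = delete;
  ~ToyList() {
    if (current_ != nullptr) Unref(current_);
  }

  // Makes n current and links it just before the dummy head (the tail).
  void Append(ToyNode* n) {
    if (current_ != nullptr) Unref(current_);
    current_ = n;
    Ref(n);
    n->prev = head_.prev;
    n->next = &head_;
    n->prev->next = n;
    n->next->prev = n;
  }

  static void Ref(ToyNode* n) { ++n->refs; }

  // Drops one reference; the last reference unlinks and deletes the node.
  static void Unref(ToyNode* n) {
    assert(n->refs >= 1);
    if (--n->refs == 0) {
      n->prev->next = n->next;
      n->next->prev = n->prev;
      delete n;
    }
  }

  ToyNode* current() const { return current_; }

  int size() const {
    int count = 0;
    for (const ToyNode* p = head_.next; p != &head_; p = p->next) ++count;
    return count;
  }

 private:
  ToyNode head_;      // dummy head of the circular list
  ToyNode* current_;  // most recently appended node
};
```

The point of the design is that an old version pinned by a reader stays in the list after a newer version becomes current, and removes itself once the last reference is dropped.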

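The VersionSet constructor takes the database name and options, the table cache table_cache, and the internal comparator cmp. Most members are initialized to 0 or nullptr; worth noting are next_file_number_ = 2 and dummy_versions_(this). According to its comment, dummy_versions_ is the head of the circular doubly linked list of versions and serves no other purpose. The constructor calls AppendVersion to add a new version, which inserts v at the tail of the list and points current_ at this newest version. The destructor drops the reference on current_ and then asserts that, after any resulting destruction, the list anchored at dummy_versions_ is empty. Moving on: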

Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
} else {
edit->SetLogNumber(log_number_);
}

if (!edit->has_prev_log_number_) {
edit->SetPrevLogNumber(prev_log_number_);
}

edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);

Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v);

// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
std::string new_manifest_file;
Status s;
if (descriptor_log_ == nullptr) {
// No reason to unlock *mu here since we only hit this path in the
// first call to LogAndApply (when opening the database).
assert(descriptor_file_ == nullptr);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_);
}
}

// Unlock during expensive MANIFEST log write
{
mu->Unlock();

// Write new record to MANIFEST log
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
}
if (!s.ok()) {
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
}
}

// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}

mu->Lock();
}

// Install the new version
if (s.ok()) {
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
delete v;
if (!new_manifest_file.empty()) {
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = nullptr;
descriptor_file_ = nullptr;
env_->DeleteFile(new_manifest_file);
}
}

return s;
}

void VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction
int best_level = -1;
double best_score = -1;

for (int level = 0; level < config::kNumLevels - 1; level++) {
double score;
if (level == 0) {
// We treat level-0 specially by bounding the number of files
// instead of number of bytes for two reasons:
//
// (1) With larger write-buffer sizes, it is nice not to do too
// many level-0 compactions.
//
// (2) The files in level-0 are merged on every read and
// therefore we wish to avoid too many files when the individual
// file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
score = v->files_[level].size() /
static_cast<double>(config::kL0_CompactionTrigger);
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score =
static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
}

if (score > best_score) {
best_level = level;
best_score = score;
}
}

v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
}
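
The scoring rule in Finalize above can be reproduced on plain numbers. The following is a minimal sketch that takes a list of file sizes per level instead of real FileMetaData objects. The constants (7 levels, a level-0 trigger of 4 files, a 10 MiB budget for level 1 that grows tenfold per level) are the values I recall from the LevelDB sources and should be treated as assumptions; CompactionPick and PickLevel are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Assumed values of config::kNumLevels and config::kL0_CompactionTrigger.
constexpr int kNumLevels = 7;
constexpr int kL0CompactionTrigger = 4;

// Assumed byte budget: 10 MiB for level 1, ten times more per further level.
double MaxBytesForLevel(int level) {
  assert(level >= 0);
  double result = 10.0 * 1048576.0;
  while (level > 1) {
    result *= 10;
    level--;
  }
  return result;
}

// Hypothetical result type: the level with the highest score.
struct CompactionPick {
  int level;
  double score;
};

// files[level] holds the sizes in bytes of the files on that level.
CompactionPick PickLevel(const std::vector<std::vector<uint64_t>>& files) {
  CompactionPick best = {-1, -1.0};
  for (int level = 0; level < kNumLevels - 1; level++) {
    if (static_cast<std::size_t>(level) >= files.size()) break;
    double score;
    if (level == 0) {
      // Level 0 is scored by file count, not by bytes.
      score = files[level].size() / static_cast<double>(kL0CompactionTrigger);
    } else {
      uint64_t level_bytes = 0;
      for (uint64_t size : files[level]) level_bytes += size;
      score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
    }
    // Strict comparison: on a tie the lower level wins.
    if (score > best.score) {
      best.level = level;
      best.score = score;
    }
  }
  return best;
}
```

For example, with two files on level 0 and 20 MiB on level 1, level 0 scores 0.5 and level 1 scores 2.0, so level 1 is picked. A score of 1 or more means the level has reached its trigger or budget.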

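The core interface LogAndApply combines the current version current_ with the changes in edit to build a new version v, then writes the edit record to the Manifest file, and finally calls AppendVersion to add the new version to the version set as the new current_. This completes the construction of a new version. Recover is the counterpart that restores a version from the Manifest. Because the Manifest records every version change, applying the records one by one with VersionSet::Builder yields the latest stored version: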

Status VersionSet::Recover(bool* save_manifest) {
struct LogReporter : public log::Reader::Reporter {
Status* status;
void Corruption(size_t bytes, const Status& s) override {
if (this->status->ok()) *this->status = s;
}
};

// Read "CURRENT" file, which contains a pointer to the current manifest file
std::string current;
Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
if (!s.ok()) {
return s;
}
if (current.empty() || current[current.size() - 1] != '\n') {
return Status::Corruption("CURRENT file does not end with newline");
}
current.resize(current.size() - 1);

std::string dscname = dbname_ + "/" + current;
SequentialFile* file;
s = env_->NewSequentialFile(dscname, &file);
if (!s.ok()) {
if (s.IsNotFound()) {
return Status::Corruption("CURRENT points to a non-existent file",
s.ToString());
}
return s;
}

bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
Builder builder(this, current_);

{
LogReporter reporter;
reporter.status = &s;
log::Reader reader(file, &reporter, true /*checksum*/,
0 /*initial_offset*/);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (s.ok()) {
if (edit.has_comparator_ &&
edit.comparator_ != icmp_.user_comparator()->Name()) {
s = Status::InvalidArgument(
edit.comparator_ + " does not match existing comparator ",
icmp_.user_comparator()->Name());
}
}

if (s.ok()) {
builder.Apply(&edit);
}

if (edit.has_log_number_) {
log_number = edit.log_number_;
have_log_number = true;
}

if (edit.has_prev_log_number_) {
prev_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}

if (edit.has_next_file_number_) {
next_file = edit.next_file_number_;
have_next_file = true;
}

if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
}
}
delete file;
file = nullptr;

if (s.ok()) {
if (!have_next_file) {
s = Status::Corruption("no meta-nextfile entry in descriptor");
} else if (!have_log_number) {
s = Status::Corruption("no meta-lognumber entry in descriptor");
} else if (!have_last_sequence) {
s = Status::Corruption("no last-sequence-number entry in descriptor");
}

if (!have_prev_log_number) {
prev_log_number = 0;
}

MarkFileNumberUsed(prev_log_number);
MarkFileNumberUsed(log_number);
}

if (s.ok()) {
Version* v = new Version(this);
builder.SaveTo(v);
// Install recovered version
Finalize(v);
AppendVersion(v);
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;

// See if we can reuse the existing MANIFEST file.
if (ReuseManifest(dscname, current)) {
// No need to save new manifest
} else {
*save_manifest = true;
}
}

return s;
}

bool VersionSet::ReuseManifest(const std::string& dscname,
const std::string& dscbase) {
if (!options_->reuse_logs) {
return false;
}
FileType manifest_type;
uint64_t manifest_number;
uint64_t manifest_size;
if (!ParseFileName(dscbase, &manifest_number, &manifest_type) ||
manifest_type != kDescriptorFile ||
!env_->GetFileSize(dscname, &manifest_size).ok() ||
// Make new compacted MANIFEST if old one is too big
manifest_size >= TargetFileSize(options_)) {
return false;
}

assert(descriptor_file_ == nullptr);
assert(descriptor_log_ == nullptr);
Status r = env_->NewAppendableFile(dscname, &descriptor_file_);
if (!r.ok()) {
Log(options_->info_log, "Reuse MANIFEST: %s\n", r.ToString().c_str());
assert(descriptor_file_ == nullptr);
return false;
}

Log(options_->info_log, "Reusing MANIFEST %s\n", dscname.c_str());
descriptor_log_ = new log::Writer(descriptor_file_, manifest_size);
manifest_file_number_ = manifest_number;
return true;
}

void VersionSet::MarkFileNumberUsed(uint64_t number) {
if (next_file_number_ <= number) {
next_file_number_ = number + 1;
}
}
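
The replay loop in Recover can be illustrated with a much smaller model. The sketch below is a simplification under stated assumptions: ToyEdit, ToyState, FileId, and Replay are hypothetical names, a file is reduced to a (level, number) pair, and within one edit deletions are applied before additions. It keeps two properties of the real code: for the scalar fields the last record that sets them wins, and recovery fails if a required field never appears.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <string>
#include <utility>
#include <vector>

using FileId = std::pair<int, uint64_t>;  // (level, file number)

// Hypothetical, heavily simplified stand-in for VersionEdit.
struct ToyEdit {
  bool has_log_number = false;
  uint64_t log_number = 0;
  bool has_next_file = false;
  uint64_t next_file = 0;
  bool has_last_sequence = false;
  uint64_t last_sequence = 0;
  std::vector<FileId> added;
  std::vector<FileId> deleted;
};

// Hypothetical stand-in for the state that Recover installs.
struct ToyState {
  std::set<FileId> files;
  uint64_t log_number = 0;
  uint64_t manifest_file_number = 0;
  uint64_t next_file_number = 2;
  uint64_t last_sequence = 0;
};

// Replays all edits in order. Returns an empty string on success, otherwise
// a message naming the missing field; *out is only written on success.
std::string Replay(const std::vector<ToyEdit>& edits, ToyState* out) {
  assert(out != nullptr);
  bool have_log = false, have_next = false, have_seq = false;
  uint64_t log_number = 0, next_file = 0, last_sequence = 0;
  std::set<FileId> files;

  for (const ToyEdit& e : edits) {
    for (const FileId& f : e.deleted) files.erase(f);
    for (const FileId& f : e.added) files.insert(f);
    if (e.has_log_number) { log_number = e.log_number; have_log = true; }
    if (e.has_next_file) { next_file = e.next_file; have_next = true; }
    if (e.has_last_sequence) {
      last_sequence = e.last_sequence;
      have_seq = true;
    }
  }

  if (!have_next) return "no meta-nextfile entry in descriptor";
  if (!have_log) return "no meta-lognumber entry in descriptor";
  if (!have_seq) return "no last-sequence-number entry in descriptor";

  out->files = files;
  out->log_number = log_number;
  // As in Recover: next_file is reserved for a possible new manifest.
  out->manifest_file_number = next_file;
  out->next_file_number = next_file + 1;
  out->last_sequence = last_sequence;
  return "";
}
```

Replaying an edit that adds file 4 on level 0, followed by an edit that deletes it and adds file 7 on level 1, leaves exactly one file in the resulting state.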

Summary

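This post analyzed the version management code, including the implementations of VersionEdit, Version, and VersionSet. A large part of VersionSet deals with Compaction, which will be covered in the next post.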

  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.
  • Copyrights © 2021-2022 Xufei Pan