LevelDB Source Code Analysis 7: Sorted String Table (Continued)

VII. Sorted String Table (Continued)

The previous article in this series covered how a Sorted Table is built; this one continues with how a Sorted Table is read and parsed.

4. Block

Following the dependency order, let's start with the implementation of the ReadBlock function in table/format.cc:

struct BlockContents {
Slice data; // Actual contents of data
bool cachable; // True iff data can be cached
bool heap_allocated; // True iff caller should delete[] data.data()
};

// Read the block identified by "handle" from "file". On failure
// return non-OK. On success fill *result and return OK.
Status ReadBlock(RandomAccessFile* file, const ReadOptions& options,
const BlockHandle& handle, BlockContents* result) {
result->data = Slice();
result->cachable = false;
result->heap_allocated = false;

// Read the block contents as well as the type/crc footer.
// See table_builder.cc for the code that built this structure.
size_t n = static_cast<size_t>(handle.size());
char* buf = new char[n + kBlockTrailerSize];
Slice contents;
Status s = file->Read(handle.offset(), n + kBlockTrailerSize, &contents, buf);
if (!s.ok()) {
delete[] buf;
return s;
}
if (contents.size() != n + kBlockTrailerSize) {
delete[] buf;
return Status::Corruption("truncated block read");
}

// Check the crc of the type and the block contents
const char* data = contents.data(); // Pointer to where Read put the data
if (options.verify_checksums) {
const uint32_t crc = crc32c::Unmask(DecodeFixed32(data + n + 1));
const uint32_t actual = crc32c::Value(data, n + 1);
if (actual != crc) {
delete[] buf;
s = Status::Corruption("block checksum mismatch");
return s;
}
}

switch (data[n]) {
case kNoCompression:
if (data != buf) {
// File implementation gave us pointer to some other data.
// Use it directly under the assumption that it will be live
// while the file is open.
delete[] buf;
result->data = Slice(data, n);
result->heap_allocated = false;
result->cachable = false; // Do not double-cache
} else {
result->data = Slice(buf, n);
result->heap_allocated = true;
result->cachable = true;
}

// Ok
break;
case kSnappyCompression: {
size_t ulength = 0;
if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) {
delete[] buf;
return Status::Corruption("corrupted compressed block contents");
}
char* ubuf = new char[ulength];
if (!port::Snappy_Uncompress(data, n, ubuf)) {
delete[] buf;
delete[] ubuf;
return Status::Corruption("corrupted compressed block contents");
}
delete[] buf;
result->data = Slice(ubuf, ulength);
result->heap_allocated = true;
result->cachable = true;
break;
}
default:
delete[] buf;
return Status::Corruption("bad block type");
}

return Status::OK();
}

For a Sorted Table file already on disk, given a ReadOptions and a BlockHandle, the block the handle identifies can be read into a BlockContents. This struct holds the block's byte stream plus flags recording whether the data may be cached and whether the caller must free it. The implementation of ReadBlock is very direct: read the bytes at the corresponding file offset, then perform the necessary checksum verification and decompression. Next, the Block parsing side in table/block.h:

struct BlockContents;
class Comparator;

class Block {
public:
// Initialize the block with the specified contents.
explicit Block(const BlockContents& contents);

Block(const Block&) = delete;
Block& operator=(const Block&) = delete;

~Block();

size_t size() const { return size_; }
Iterator* NewIterator(const Comparator* comparator);

private:
class Iter;

uint32_t NumRestarts() const;

const char* data_;
size_t size_;
uint32_t restart_offset_; // Offset in data_ of restart array
bool owned_; // Block owns data_[]
};

The core of the interface is, naturally, the iterator, which provides traversal and lookup over the key-value entries inside a Block. Let's continue with the implementation of the corresponding functions:

inline uint32_t Block::NumRestarts() const {
assert(size_ >= sizeof(uint32_t));
return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
}

Block::Block(const BlockContents& contents)
: data_(contents.data.data()),
size_(contents.data.size()),
owned_(contents.heap_allocated) {
if (size_ < sizeof(uint32_t)) {
size_ = 0; // Error marker
} else {
size_t max_restarts_allowed = (size_ - sizeof(uint32_t)) / sizeof(uint32_t);
if (NumRestarts() > max_restarts_allowed) {
// The size is too small for NumRestarts()
size_ = 0;
} else {
restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t);
}
}
}

Block::~Block() {
if (owned_) {
delete[] data_;
}
}

// Helper routine: decode the next block entry starting at "p",
// storing the number of shared key bytes, non_shared key bytes,
// and the length of the value in "*shared", "*non_shared", and
// "*value_length", respectively. Will not dereference past "limit".
//
// If any errors are detected, returns nullptr. Otherwise, returns a
// pointer to the key delta (just past the three decoded values).
static inline const char* DecodeEntry(const char* p, const char* limit,
uint32_t* shared, uint32_t* non_shared,
uint32_t* value_length) {
if (limit - p < 3) return nullptr;
*shared = reinterpret_cast<const uint8_t*>(p)[0];
*non_shared = reinterpret_cast<const uint8_t*>(p)[1];
*value_length = reinterpret_cast<const uint8_t*>(p)[2];
if ((*shared | *non_shared | *value_length) < 128) {
// Fast path: all three values are encoded in one byte each
p += 3;
} else {
if ((p = GetVarint32Ptr(p, limit, shared)) == nullptr) return nullptr;
if ((p = GetVarint32Ptr(p, limit, non_shared)) == nullptr) return nullptr;
if ((p = GetVarint32Ptr(p, limit, value_length)) == nullptr) return nullptr;
}

if (static_cast<uint32_t>(limit - p) < (*non_shared + *value_length)) {
return nullptr;
}
return p;
}

As described earlier for the Block storage format, the last 4 bytes hold the number of restart points, which is exactly what NumRestarts reads. The constructor performs the necessary sanity checks and records the offset of the restart-point array in restart_offset_. When decoding an entry, DecodeEntry first assumes that shared, non_shared, and value_length are all below 128 and tries to read each length as a single byte, which speeds up parsing. This holds in the vast majority of cases; in the rare cases where a length needs more than one byte, it falls back to the ordinary GetVarint32Ptr. Let's continue with the iterator implementation:

class Block::Iter : public Iterator {
private:
const Comparator* const comparator_;
const char* const data_; // underlying block contents
uint32_t const restarts_; // Offset of restart array (list of fixed32)
uint32_t const num_restarts_; // Number of uint32_t entries in restart array

// current_ is offset in data_ of current entry. >= restarts_ if !Valid
uint32_t current_;
uint32_t restart_index_; // Index of restart block in which current_ falls
std::string key_;
Slice value_;
Status status_;

inline int Compare(const Slice& a, const Slice& b) const {
return comparator_->Compare(a, b);
}

// Return the offset in data_ just past the end of the current entry.
inline uint32_t NextEntryOffset() const {
return (value_.data() + value_.size()) - data_;
}

uint32_t GetRestartPoint(uint32_t index) {
assert(index < num_restarts_);
return DecodeFixed32(data_ + restarts_ + index * sizeof(uint32_t));
}

void SeekToRestartPoint(uint32_t index) {
key_.clear();
restart_index_ = index;
// current_ will be fixed by ParseNextKey();

// ParseNextKey() starts at the end of value_, so set value_ accordingly
uint32_t offset = GetRestartPoint(index);
value_ = Slice(data_ + offset, 0);
}

public:
Iter(const Comparator* comparator, const char* data, uint32_t restarts,
uint32_t num_restarts)
: comparator_(comparator),
data_(data),
restarts_(restarts),
num_restarts_(num_restarts),
current_(restarts_),
restart_index_(num_restarts_) {
assert(num_restarts_ > 0);
}

bool Valid() const override { return current_ < restarts_; }
Status status() const override { return status_; }
Slice key() const override {
assert(Valid());
return key_;
}
Slice value() const override {
assert(Valid());
return value_;
}

void Next() override {
assert(Valid());
ParseNextKey();
}

void Prev() override {
assert(Valid());

// Scan backwards to a restart point before current_
const uint32_t original = current_;
while (GetRestartPoint(restart_index_) >= original) {
if (restart_index_ == 0) {
// No more entries
current_ = restarts_;
restart_index_ = num_restarts_;
return;
}
restart_index_--;
}

SeekToRestartPoint(restart_index_);
do {
// Loop until end of current entry hits the start of original entry
} while (ParseNextKey() && NextEntryOffset() < original);
}

void SeekToFirst() override {
SeekToRestartPoint(0);
ParseNextKey();
}

void SeekToLast() override {
SeekToRestartPoint(num_restarts_ - 1);
while (ParseNextKey() && NextEntryOffset() < restarts_) {
// Keep skipping
}
}

private:
void CorruptionError() {
current_ = restarts_;
restart_index_ = num_restarts_;
status_ = Status::Corruption("bad entry in block");
key_.clear();
value_.clear();
}

bool ParseNextKey() {
current_ = NextEntryOffset();
const char* p = data_ + current_;
const char* limit = data_ + restarts_; // Restarts come right after data
if (p >= limit) {
// No more entries to return. Mark as invalid.
current_ = restarts_;
restart_index_ = num_restarts_;
return false;
}

// Decode next entry
uint32_t shared, non_shared, value_length;
p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
if (p == nullptr || key_.size() < shared) {
CorruptionError();
return false;
} else {
key_.resize(shared);
key_.append(p, non_shared);
value_ = Slice(p + non_shared, value_length);
while (restart_index_ + 1 < num_restarts_ &&
GetRestartPoint(restart_index_ + 1) < current_) {
++restart_index_;
}
return true;
}
}
};

Iterator* Block::NewIterator(const Comparator* comparator) {
if (size_ < sizeof(uint32_t)) {
return NewErrorIterator(Status::Corruption("bad block contents"));
}
const uint32_t num_restarts = NumRestarts();
if (num_restarts == 0) {
return NewEmptyIterator();
} else {
return new Iter(comparator, data_, restart_offset_, num_restarts);
}
}

Let's look at the member variables first:

  1. The raw bytes of the Block are stored in data_;
  2. restarts_ and num_restarts_ store the offset and count of the restart-point array;
  3. current_ stores the iterator's current offset, and restart_index_ stores the index of the nearest restart point at or before current_;
  4. key_ and value_ hold the current key-value pair. Note that key_ is a std::string: because of shared prefixes, the reconstructed key must be stored separately, whereas value_ can be sliced directly out of data_.

NextEntryOffset computes the start of the next entry from the position and size of the current value, since the value is stored last in each entry. GetRestartPoint reads the offset of the index-th restart point. SeekToRestartPoint clears key_, sets restart_index_, and points value_ at an empty slice located right at the restart point, so that a subsequent NextEntryOffset yields exactly that restart offset. Jumping to ParseNextKey: it sets current_ to the start of the next entry, uses DecodeEntry to obtain the length fields, reconstructs key_, and reads value_.
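
To make the key reconstruction concrete, here is a small standalone sketch (it does not use LevelDB's helpers, and all three length fields are assumed to be below 128 so each fits in a single varint byte) that encodes two entries the way BlockBuilder would and then decodes them the way ParseNextKey does:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>

// Append one entry in the Block entry format:
// shared | non_shared | value_length | key delta | value.
static void AppendEntry(std::string* dst, const std::string& prev_key,
                        const std::string& key, const std::string& value) {
  std::size_t shared = 0;
  while (shared < prev_key.size() && shared < key.size() &&
         prev_key[shared] == key[shared]) {
    shared++;
  }
  dst->push_back(static_cast<char>(shared));              // varint32, < 128 here
  dst->push_back(static_cast<char>(key.size() - shared));  // varint32, < 128 here
  dst->push_back(static_cast<char>(value.size()));         // varint32, < 128 here
  dst->append(key, shared, key.size() - shared);           // key delta
  dst->append(value);                                      // value
}

int main() {
  std::string block;
  AppendEntry(&block, "", "apple", "1");          // restart point: shared == 0
  AppendEntry(&block, "apple", "applepie", "2");  // shares the prefix "apple"

  // Decode, mirroring ParseNextKey: keep the shared prefix, append the delta.
  std::string key;
  const char* p = block.data();
  for (int i = 0; i < 2; i++) {
    uint32_t shared = static_cast<uint8_t>(p[0]);
    uint32_t non_shared = static_cast<uint8_t>(p[1]);
    uint32_t value_length = static_cast<uint8_t>(p[2]);
    p += 3;
    key.resize(shared);         // keep the shared prefix of the previous key
    key.append(p, non_shared);  // append the non-shared suffix
    std::string value(p + non_shared, value_length);
    p += non_shared + value_length;
    std::cout << key << " -> " << value << "\n";  // apple -> 1, applepie -> 2
  }
  return 0;
}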

The state the iterator carries is the shared prefix held in key_ and the start of the next entry implied by value_. When switching restart points, SeekToRestartPoint must be called first to reset this state, and only then does ParseNextKey parse the next entry. With this process in mind, reading Prev, SeekToFirst, and SeekToLast is easy. Finally, let's look at Seek:

void Seek(const Slice& target) override {
// Binary search in restart array to find the last restart point
// with a key < target
uint32_t left = 0;
uint32_t right = num_restarts_ - 1;
while (left < right) {
uint32_t mid = (left + right + 1) / 2;
uint32_t region_offset = GetRestartPoint(mid);
uint32_t shared, non_shared, value_length;
const char* key_ptr =
DecodeEntry(data_ + region_offset, data_ + restarts_, &shared,
&non_shared, &value_length);
if (key_ptr == nullptr || (shared != 0)) {
CorruptionError();
return;
}
Slice mid_key(key_ptr, non_shared);
if (Compare(mid_key, target) < 0) {
// Key at "mid" is smaller than "target". Therefore all
// blocks before "mid" are uninteresting.
left = mid;
} else {
// Key at "mid" is >= "target". Therefore all blocks at or
// after "mid" are uninteresting.
right = mid - 1;
}
}

// Linear search (within restart block) for first key >= target
SeekToRestartPoint(left);
while (true) {
if (!ParseNextKey()) {
return;
}
if (Compare(key_, target) >= 0) {
return;
}
}
}

Seek first binary-searches over the restart points; the result of this particular binary search is that left is the restart point whose key is the largest key strictly less than target. After the binary search it jumps to that restart point and reconstructs each entry's key in order, comparing as it goes and returning at the first key that is not less than target.
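
For reference, here is a standalone sketch of the same binary-search variant over a plain sorted vector: find the last index whose element is strictly less than target. Like Seek, it degrades gracefully to index 0 when even the first element is not less than target, so the following linear scan still works.

#include <cstddef>
#include <string>
#include <vector>

// Last index i with v[i] < target (v must be non-empty, mirroring the
// assert(num_restarts_ > 0) in Block::Iter). Returns 0 if no element is
// smaller, which the caller's linear scan handles anyway.
std::size_t LastLess(const std::vector<std::string>& v,
                     const std::string& target) {
  std::size_t left = 0;
  std::size_t right = v.size() - 1;
  while (left < right) {
    std::size_t mid = (left + right + 1) / 2;  // round up so left can advance
    if (v[mid] < target) {
      left = mid;       // answer is mid or later
    } else {
      right = mid - 1;  // answer is strictly before mid
    }
  }
  return left;
}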

Finally, one small detail: restart_index_ is only ever read inside Prev, to locate the previous restart point. Look closely at how ParseNextKey updates restart_index_:

while (restart_index_ + 1 < num_restarts_ &&
GetRestartPoint(restart_index_ + 1) < current_) {
++restart_index_;
}

If current_ lands exactly on restart point i, restart_index_ still stays at i - 1, which saves one iteration of the while loop in Prev. A glimpse of the skill of the Google folks. Semantically, though, this is rather confusing, and I probably would not write it this way (which is why I will never be one of those Google folks).

5. Iterator Chain

The previous section analyzed the Block iterator. For a whole Sorted Table, several other kinds of iterators are chained together to traverse and search the table efficiently. Let's start with the IteratorWrapper implementation in table/iterator_wrapper.h:

// A internal wrapper class with an interface similar to Iterator that
// caches the valid() and key() results for an underlying iterator.
// This can help avoid virtual function calls and also gives better
// cache locality.
class IteratorWrapper {
public:
IteratorWrapper() : iter_(nullptr), valid_(false) {}
explicit IteratorWrapper(Iterator* iter) : iter_(nullptr) { Set(iter); }
~IteratorWrapper() { delete iter_; }
Iterator* iter() const { return iter_; }

// Takes ownership of "iter" and will delete it when destroyed, or
// when Set() is invoked again.
void Set(Iterator* iter) {
delete iter_;
iter_ = iter;
if (iter_ == nullptr) {
valid_ = false;
} else {
Update();
}
}

// Iterator interface methods
bool Valid() const { return valid_; }
Slice key() const {
assert(Valid());
return key_;
}
Slice value() const {
assert(Valid());
return iter_->value();
}
// Methods below require iter() != nullptr
Status status() const {
assert(iter_);
return iter_->status();
}
void Next() {
assert(iter_);
iter_->Next();
Update();
}
void Prev() {
assert(iter_);
iter_->Prev();
Update();
}
void Seek(const Slice& k) {
assert(iter_);
iter_->Seek(k);
Update();
}
void SeekToFirst() {
assert(iter_);
iter_->SeekToFirst();
Update();
}
void SeekToLast() {
assert(iter_);
iter_->SeekToLast();
Update();
}

private:
void Update() {
valid_ = iter_->Valid();
if (valid_) {
key_ = iter_->key();
}
}

Iterator* iter_;
bool valid_;
Slice key_;
};

This is a thin wrapper around an iterator that caches the key_ and valid_ properties. As the comment says, this avoids virtual function calls and also gives better cache locality. The former is easy to see; the latter is less obvious, presumably because the cached copies live inside the wrapper itself, so code such as MergingIterator, which repeatedly compares the keys of many children, reads from a contiguous array of wrappers instead of dereferencing each child iterator. Next, the two-level iterator TwoLevelIterator in table/two_level_iterator.cc:

#include "leveldb/table.h"
#include "table/block.h"
#include "table/format.h"
#include "table/iterator_wrapper.h"

namespace leveldb {

namespace {

typedef Iterator* (*BlockFunction)(void*, const ReadOptions&, const Slice&);

class TwoLevelIterator : public Iterator {
public:
TwoLevelIterator(Iterator* index_iter, BlockFunction block_function,
void* arg, const ReadOptions& options);

~TwoLevelIterator() override;

void Seek(const Slice& target) override;
void SeekToFirst() override;
void SeekToLast() override;
void Next() override;
void Prev() override;

bool Valid() const override { return data_iter_.Valid(); }
Slice key() const override {
assert(Valid());
return data_iter_.key();
}
Slice value() const override {
assert(Valid());
return data_iter_.value();
}
Status status() const override {
// It'd be nice if status() returned a const Status& instead of a Status
if (!index_iter_.status().ok()) {
return index_iter_.status();
} else if (data_iter_.iter() != nullptr && !data_iter_.status().ok()) {
return data_iter_.status();
} else {
return status_;
}
}

private:
void SaveError(const Status& s) {
if (status_.ok() && !s.ok()) status_ = s;
}
void SkipEmptyDataBlocksForward();
void SkipEmptyDataBlocksBackward();
void SetDataIterator(Iterator* data_iter);
void InitDataBlock();

BlockFunction block_function_;
void* arg_;
const ReadOptions options_;
Status status_;
IteratorWrapper index_iter_;
IteratorWrapper data_iter_; // May be nullptr
// If data_iter_ is non-null, then "data_block_handle_" holds the
// "index_value" passed to block_function_ to create the data_iter_.
std::string data_block_handle_;
};

TwoLevelIterator::TwoLevelIterator(Iterator* index_iter,
BlockFunction block_function, void* arg,
const ReadOptions& options)
: block_function_(block_function),
arg_(arg),
options_(options),
index_iter_(index_iter),
data_iter_(nullptr) {}

TwoLevelIterator::~TwoLevelIterator() = default;

void TwoLevelIterator::Seek(const Slice& target) {
index_iter_.Seek(target);
InitDataBlock();
if (data_iter_.iter() != nullptr) data_iter_.Seek(target);
SkipEmptyDataBlocksForward();
}

void TwoLevelIterator::SeekToFirst() {
index_iter_.SeekToFirst();
InitDataBlock();
if (data_iter_.iter() != nullptr) data_iter_.SeekToFirst();
SkipEmptyDataBlocksForward();
}

void TwoLevelIterator::SeekToLast() {
index_iter_.SeekToLast();
InitDataBlock();
if (data_iter_.iter() != nullptr) data_iter_.SeekToLast();
SkipEmptyDataBlocksBackward();
}

void TwoLevelIterator::Next() {
assert(Valid());
data_iter_.Next();
SkipEmptyDataBlocksForward();
}

void TwoLevelIterator::Prev() {
assert(Valid());
data_iter_.Prev();
SkipEmptyDataBlocksBackward();
}

void TwoLevelIterator::SkipEmptyDataBlocksForward() {
while (data_iter_.iter() == nullptr || !data_iter_.Valid()) {
// Move to next block
if (!index_iter_.Valid()) {
SetDataIterator(nullptr);
return;
}
index_iter_.Next();
InitDataBlock();
if (data_iter_.iter() != nullptr) data_iter_.SeekToFirst();
}
}

void TwoLevelIterator::SkipEmptyDataBlocksBackward() {
while (data_iter_.iter() == nullptr || !data_iter_.Valid()) {
// Move to next block
if (!index_iter_.Valid()) {
SetDataIterator(nullptr);
return;
}
index_iter_.Prev();
InitDataBlock();
if (data_iter_.iter() != nullptr) data_iter_.SeekToLast();
}
}

void TwoLevelIterator::SetDataIterator(Iterator* data_iter) {
if (data_iter_.iter() != nullptr) SaveError(data_iter_.status());
data_iter_.Set(data_iter);
}

void TwoLevelIterator::InitDataBlock() {
if (!index_iter_.Valid()) {
SetDataIterator(nullptr);
} else {
Slice handle = index_iter_.value();
if (data_iter_.iter() != nullptr &&
handle.compare(data_block_handle_) == 0) {
// data_iter_ is already constructed with this iterator, so
// no need to change anything
} else {
Iterator* iter = (*block_function_)(arg_, options_, handle);
data_block_handle_.assign(handle.data(), handle.size());
SetDataIterator(iter);
}
}
}

} // namespace

Iterator* NewTwoLevelIterator(Iterator* index_iter,
BlockFunction block_function, void* arg,
const ReadOptions& options) {
return new TwoLevelIterator(index_iter, block_function, arg, options);
}

A Sorted Table stores multiple Data Blocks and uses the Index Block to index them. The first level of the two-level iterator, index_iter_, iterates over the Index Block; the second level, data_iter_, iterates over a Data Block. During traversal data_iter_ is advanced, and when it hits the boundary of a Data Block, SkipEmptyDataBlocksForward and SkipEmptyDataBlocksBackward switch to the adjacent Data Block. Lookups likewise start on index_iter_: each Index Block entry stores a Data Block's max_key together with its position and size, so it can be binary-searched; once index_iter_ is positioned, the corresponding Data Block is read and searched further.
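
The lookup path can be sketched in isolation. The following standalone snippet is not LevelDB's actual code; it mimics the two-level Seek with std::lower_bound, using an index of (max_key, block id) entries in place of the Index Block and in-memory vectors in place of Data Blocks:

#include <algorithm>
#include <string>
#include <utility>
#include <vector>

using Entry = std::pair<std::string, std::string>;         // key -> value
struct IndexEntry { std::string max_key; int block_id; };  // plays index_iter_'s role

const Entry* TwoLevelSeek(const std::vector<IndexEntry>& index,
                          const std::vector<std::vector<Entry>>& blocks,
                          const std::string& target) {
  // Level 1: first index entry whose max_key is >= target.
  auto it = std::lower_bound(index.begin(), index.end(), target,
                             [](const IndexEntry& e, const std::string& t) {
                               return e.max_key < t;
                             });
  if (it == index.end()) return nullptr;  // target is past the last block
  // Level 2: first entry with key >= target inside the chosen block.
  const std::vector<Entry>& block = blocks[it->block_id];
  auto jt = std::lower_bound(block.begin(), block.end(), target,
                             [](const Entry& e, const std::string& t) {
                               return e.first < t;
                             });
  return jt == block.end() ? nullptr : &*jt;
}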

To summarize, the iterators used by a Sorted Table form the following chain:

TwoLevelIterator -> IteratorWrapper(index_iter_) -> Block::Iter
-> IteratorWrapper(data_iter_) -> Block::Iter

There is also a merging iterator, MergingIterator, which is used when traversing multiple Sorted Table files. It manages n child iterators; on Next and Seek all children advance together and the one with the smallest key is chosen. The implementation is as follows:

class MergingIterator : public Iterator {
public:
MergingIterator(const Comparator* comparator, Iterator** children, int n)
: comparator_(comparator),
children_(new IteratorWrapper[n]),
n_(n),
current_(nullptr),
direction_(kForward) {
for (int i = 0; i < n; i++) {
children_[i].Set(children[i]);
}
}

~MergingIterator() override { delete[] children_; }

bool Valid() const override { return (current_ != nullptr); }

void SeekToFirst() override {
for (int i = 0; i < n_; i++) {
children_[i].SeekToFirst();
}
FindSmallest();
direction_ = kForward;
}

void SeekToLast() override {
for (int i = 0; i < n_; i++) {
children_[i].SeekToLast();
}
FindLargest();
direction_ = kReverse;
}

void Seek(const Slice& target) override {
for (int i = 0; i < n_; i++) {
children_[i].Seek(target);
}
FindSmallest();
direction_ = kForward;
}

void Next() override {
assert(Valid());

// Ensure that all children are positioned after key().
// If we are moving in the forward direction, it is already
// true for all of the non-current_ children since current_ is
// the smallest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (direction_ != kForward) {
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child != current_) {
child->Seek(key());
if (child->Valid() &&
comparator_->Compare(key(), child->key()) == 0) {
child->Next();
}
}
}
direction_ = kForward;
}

current_->Next();
FindSmallest();
}

void Prev() override {
assert(Valid());

// Ensure that all children are positioned before key().
// If we are moving in the reverse direction, it is already
// true for all of the non-current_ children since current_ is
// the largest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (direction_ != kReverse) {
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child != current_) {
child->Seek(key());
if (child->Valid()) {
// Child is at first entry >= key(). Step back one to be < key()
child->Prev();
} else {
// Child has no entries >= key(). Position at last entry.
child->SeekToLast();
}
}
}
direction_ = kReverse;
}

current_->Prev();
FindLargest();
}

Slice key() const override {
assert(Valid());
return current_->key();
}

Slice value() const override {
assert(Valid());
return current_->value();
}

Status status() const override {
Status status;
for (int i = 0; i < n_; i++) {
status = children_[i].status();
if (!status.ok()) {
break;
}
}
return status;
}

private:
// Which direction is the iterator moving?
enum Direction { kForward, kReverse };

void FindSmallest();
void FindLargest();

// We might want to use a heap in case there are lots of children.
// For now we use a simple array since we expect a very small number
// of children in leveldb.
const Comparator* comparator_;
IteratorWrapper* children_;
int n_;
IteratorWrapper* current_;
Direction direction_;
};

void MergingIterator::FindSmallest() {
IteratorWrapper* smallest = nullptr;
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child->Valid()) {
if (smallest == nullptr) {
smallest = child;
} else if (comparator_->Compare(child->key(), smallest->key()) < 0) {
smallest = child;
}
}
}
current_ = smallest;
}

void MergingIterator::FindLargest() {
IteratorWrapper* largest = nullptr;
for (int i = n_ - 1; i >= 0; i--) {
IteratorWrapper* child = &children_[i];
if (child->Valid()) {
if (largest == nullptr) {
largest = child;
} else if (comparator_->Compare(child->key(), largest->key()) > 0) {
largest = child;
}
}
}
current_ = largest;
}
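
As a hedged usage sketch: table/merger.h is an internal header that declares NewMergingIterator, MergedScan is a hypothetical helper, and t1/t2 stand for two already-opened Table objects built with the default BytewiseComparator. Merging their iterators yields one globally sorted stream.

#include "leveldb/comparator.h"
#include "leveldb/options.h"
#include "leveldb/table.h"
#include "table/merger.h"  // internal header: NewMergingIterator

// The merging iterator takes ownership of the children: each is wrapped in
// an IteratorWrapper, whose destructor deletes the underlying iterator.
leveldb::Iterator* MergedScan(leveldb::Table* t1, leveldb::Table* t2,
                              const leveldb::ReadOptions& options) {
  leveldb::Iterator* children[2] = {t1->NewIterator(options),
                                    t2->NewIterator(options)};
  return leveldb::NewMergingIterator(leveldb::BytewiseComparator(), children, 2);
}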

6. Table

Finally, let's look at the Sorted Table implementation in table/table.cc:

struct Table::Rep {
~Rep() {
delete filter;
delete[] filter_data;
delete index_block;
}

Options options;
Status status;
RandomAccessFile* file;
uint64_t cache_id;
FilterBlockReader* filter;
const char* filter_data;

BlockHandle metaindex_handle; // Handle to metaindex_block: saved from footer
Block* index_block;
};

Status Table::Open(const Options& options, RandomAccessFile* file,
uint64_t size, Table** table) {
*table = nullptr;
if (size < Footer::kEncodedLength) {
return Status::Corruption("file is too short to be an sstable");
}

char footer_space[Footer::kEncodedLength];
Slice footer_input;
Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
&footer_input, footer_space);
if (!s.ok()) return s;

Footer footer;
s = footer.DecodeFrom(&footer_input);
if (!s.ok()) return s;

// Read the index block
BlockContents index_block_contents;
if (s.ok()) {
ReadOptions opt;
if (options.paranoid_checks) {
opt.verify_checksums = true;
}
s = ReadBlock(file, opt, footer.index_handle(), &index_block_contents);
}

if (s.ok()) {
// We've successfully read the footer and the index block: we're
// ready to serve requests.
Block* index_block = new Block(index_block_contents);
Rep* rep = new Table::Rep;
rep->options = options;
rep->file = file;
rep->metaindex_handle = footer.metaindex_handle();
rep->index_block = index_block;
rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
rep->filter_data = nullptr;
rep->filter = nullptr;
*table = new Table(rep);
(*table)->ReadMeta(footer);
}

return s;
}

void Table::ReadMeta(const Footer& footer) {
if (rep_->options.filter_policy == nullptr) {
return; // Do not need any metadata
}

// TODO(sanjay): Skip this if footer.metaindex_handle() size indicates
// it is an empty block.
ReadOptions opt;
if (rep_->options.paranoid_checks) {
opt.verify_checksums = true;
}
BlockContents contents;
if (!ReadBlock(rep_->file, opt, footer.metaindex_handle(), &contents).ok()) {
// Do not propagate errors since meta info is not needed for operation
return;
}
Block* meta = new Block(contents);

Iterator* iter = meta->NewIterator(BytewiseComparator());
std::string key = "filter.";
key.append(rep_->options.filter_policy->Name());
iter->Seek(key);
if (iter->Valid() && iter->key() == Slice(key)) {
ReadFilter(iter->value());
}
delete iter;
delete meta;
}

void Table::ReadFilter(const Slice& filter_handle_value) {
Slice v = filter_handle_value;
BlockHandle filter_handle;
if (!filter_handle.DecodeFrom(&v).ok()) {
return;
}

// We might want to unify with ReadBlock() if we start
// requiring checksum verification in Table::Open.
ReadOptions opt;
if (rep_->options.paranoid_checks) {
opt.verify_checksums = true;
}
BlockContents block;
if (!ReadBlock(rep_->file, opt, filter_handle, &block).ok()) {
return;
}
if (block.heap_allocated) {
rep_->filter_data = block.data.data(); // Will need to delete later
}
rep_->filter = new FilterBlockReader(rep_->options.filter_policy, block.data);
}

Table::~Table() { delete rep_; }

The opening part is the familiar pImpl idiom again; after all, Table is a public interface and needs to stay stable. Table::Open first reads the Footer at the end of the file, uses Footer::index_handle() to load index_block into memory, and reads the meta_block and filter as needed; the destructor deletes them accordingly. Next, the iterator part of the implementation:

static void DeleteCachedBlock(const Slice& key, void* value) {
Block* block = reinterpret_cast<Block*>(value);
delete block;
}

static void ReleaseBlock(void* arg, void* h) {
Cache* cache = reinterpret_cast<Cache*>(arg);
Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
cache->Release(handle);
}

// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
Iterator* Table::BlockReader(void* arg, const ReadOptions& options,
const Slice& index_value) {
Table* table = reinterpret_cast<Table*>(arg);
Cache* block_cache = table->rep_->options.block_cache;
Block* block = nullptr;
Cache::Handle* cache_handle = nullptr;

BlockHandle handle;
Slice input = index_value;
Status s = handle.DecodeFrom(&input);
// We intentionally allow extra stuff in index_value so that we
// can add more features in the future.

if (s.ok()) {
BlockContents contents;
if (block_cache != nullptr) {
char cache_key_buffer[16];
EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
EncodeFixed64(cache_key_buffer + 8, handle.offset());
Slice key(cache_key_buffer, sizeof(cache_key_buffer));
cache_handle = block_cache->Lookup(key);
if (cache_handle != nullptr) {
block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
} else {
s = ReadBlock(table->rep_->file, options, handle, &contents);
if (s.ok()) {
block = new Block(contents);
if (contents.cachable && options.fill_cache) {
cache_handle = block_cache->Insert(key, block, block->size(),
&DeleteCachedBlock);
}
}
}
} else {
s = ReadBlock(table->rep_->file, options, handle, &contents);
if (s.ok()) {
block = new Block(contents);
}
}
}

Iterator* iter;
if (block != nullptr) {
iter = block->NewIterator(table->rep_->options.comparator);
if (cache_handle == nullptr) {
iter->RegisterCleanup(&DeleteBlock, block, nullptr);
} else {
iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
}
} else {
iter = NewErrorIterator(s);
}
return iter;
}

Iterator* Table::NewIterator(const ReadOptions& options) const {
return NewTwoLevelIterator(
rep_->index_block->NewIterator(rep_->options.comparator),
&Table::BlockReader, const_cast<Table*>(this), options);
}

Table::NewIterator constructs a two-level iterator. The first level is of course the index_block iterator, and Table::BlockReader is supplied as the factory for second-level iterators. Its first argument is in fact a pointer to the Table object, and its third argument is the value of an index_block entry, i.e. the handle of the corresponding Data Block. Ignoring the caching part, the code is easy to follow: decode the BlockHandle, read the block it describes, create an iterator over it, and register the cleanup function DeleteBlock so that the block is deleted together with the iterator. For the cached path, recall the LRUCache introduced in the first article of this series: a cache key is built from cache_id and handle.offset(), the block serves as the cache value with DeleteCachedBlock as its deleter, and since the iterator created from a cached block holds a reference on that cache entry, ReleaseBlock is registered as the cleanup so the reference is dropped when the iterator is destroyed. Quite reasonable and efficient.
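
Putting the pieces together, here is a minimal sketch (DumpTable is a hypothetical helper; error handling is reduced to printing the status) of opening a single table file through the public API and scanning it with the two-level iterator:

#include <cstdint>
#include <iostream>
#include <string>

#include "leveldb/env.h"
#include "leveldb/options.h"
#include "leveldb/table.h"

// Open one table file directly and dump its entries.
void DumpTable(const std::string& fname) {
  leveldb::Env* env = leveldb::Env::Default();
  uint64_t size = 0;
  leveldb::RandomAccessFile* file = nullptr;
  leveldb::Table* table = nullptr;

  leveldb::Status s = env->GetFileSize(fname, &size);
  if (s.ok()) s = env->NewRandomAccessFile(fname, &file);
  if (s.ok()) s = leveldb::Table::Open(leveldb::Options(), file, size, &table);
  if (!s.ok()) {
    std::cerr << s.ToString() << "\n";
    delete file;
    return;
  }

  leveldb::Iterator* iter = table->NewIterator(leveldb::ReadOptions());
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    // For tables written by the DB these are internal keys
    // (user key + sequence number + type), not raw user keys.
    std::cout << iter->key().ToString() << " -> " << iter->value().ToString()
              << "\n";
  }
  delete iter;
  delete table;  // Table does not own the file
  delete file;
}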

7. Table Cache

LevelDB numbers Sorted Tables with a file_number. To improve read performance and simplify usage, LevelDB provides a TableCache that caches Table objects together with their open .ldb files; it is defined in db/table_cache.h:

class TableCache {
public:
TableCache(const std::string& dbname, const Options& options, int entries);
~TableCache();

// Return an iterator for the specified file number (the corresponding
// file length must be exactly "file_size" bytes). If "tableptr" is
// non-null, also sets "*tableptr" to point to the Table object
// underlying the returned iterator, or to nullptr if no Table object
// underlies the returned iterator. The returned "*tableptr" object is owned
// by the cache and should not be deleted, and is valid for as long as the
// returned iterator is live.
Iterator* NewIterator(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, Table** tableptr = nullptr);

// If a seek to internal key "k" in specified file finds an entry,
// call (*handle_result)(arg, found_key, found_value).
Status Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const Slice& k, void* arg,
void (*handle_result)(void*, const Slice&, const Slice&));

// Evict any entry for the specified file number
void Evict(uint64_t file_number);

private:
Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**);

Env* const env_;
const std::string dbname_;
const Options& options_;
Cache* cache_;
};

The core interface is TableCache::NewIterator: given only a file_number and file_size, it returns the corresponding Sorted Table object and an iterator over it. TableCache encapsulates the caching and cleanup logic, and its implementation is also very simple:

struct TableAndFile {
RandomAccessFile* file;
Table* table;
};

static void DeleteEntry(const Slice& key, void* value) {
TableAndFile* tf = reinterpret_cast<TableAndFile*>(value);
delete tf->table;
delete tf->file;
delete tf;
}

static void UnrefEntry(void* arg1, void* arg2) {
Cache* cache = reinterpret_cast<Cache*>(arg1);
Cache::Handle* h = reinterpret_cast<Cache::Handle*>(arg2);
cache->Release(h);
}

TableCache::TableCache(const std::string& dbname, const Options& options,
int entries)
: env_(options.env),
dbname_(dbname),
options_(options),
cache_(NewLRUCache(entries)) {}

TableCache::~TableCache() { delete cache_; }

Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
Cache::Handle** handle) {
Status s;
char buf[sizeof(file_number)];
EncodeFixed64(buf, file_number);
Slice key(buf, sizeof(buf));
*handle = cache_->Lookup(key);
if (*handle == nullptr) {
std::string fname = TableFileName(dbname_, file_number);
RandomAccessFile* file = nullptr;
Table* table = nullptr;
s = env_->NewRandomAccessFile(fname, &file);
if (!s.ok()) {
std::string old_fname = SSTTableFileName(dbname_, file_number);
if (env_->NewRandomAccessFile(old_fname, &file).ok()) {
s = Status::OK();
}
}
if (s.ok()) {
s = Table::Open(options_, file, file_size, &table);
}

if (!s.ok()) {
assert(table == nullptr);
delete file;
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
} else {
TableAndFile* tf = new TableAndFile;
tf->file = file;
tf->table = table;
*handle = cache_->Insert(key, tf, 1, &DeleteEntry);
}
}
return s;
}

Iterator* TableCache::NewIterator(const ReadOptions& options,
uint64_t file_number, uint64_t file_size,
Table** tableptr) {
if (tableptr != nullptr) {
*tableptr = nullptr;
}

Cache::Handle* handle = nullptr;
Status s = FindTable(file_number, file_size, &handle);
if (!s.ok()) {
return NewErrorIterator(s);
}

Table* table = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
Iterator* result = table->NewIterator(options);
result->RegisterCleanup(&UnrefEntry, cache_, handle);
if (tableptr != nullptr) {
*tableptr = table;
}
return result;
}

Status TableCache::Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const Slice& k, void* arg,
void (*handle_result)(void*, const Slice&,
const Slice&)) {
Cache::Handle* handle = nullptr;
Status s = FindTable(file_number, file_size, &handle);
if (s.ok()) {
Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
s = t->InternalGet(options, k, arg, handle_result);
cache_->Release(handle);
}
return s;
}

void TableCache::Evict(uint64_t file_number) {
char buf[sizeof(file_number)];
EncodeFixed64(buf, file_number);
cache_->Erase(Slice(buf, sizeof(buf)));
}

TableCache::FindTable builds the cache key from file_number and looks it up in the cache first; on a miss it opens the file and constructs the Table by hand. The corresponding iterator is also simple: once the cleanup functions DeleteEntry and UnrefEntry are registered appropriately, it can be used with confidence. Every extra layer of encapsulation hides one more layer of low-level detail and makes things that much easier for the caller.
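
A hedged sketch of the same scan going through TableCache (db/table_cache.h is an internal header; ScanFile is a hypothetical helper; file_number and file_size are assumed to come from the version metadata):

#include <cstdint>

#include "db/table_cache.h"  // internal header
#include "leveldb/options.h"

// The returned iterator keeps the cache entry pinned (via UnrefEntry)
// until the iterator itself is deleted.
leveldb::Iterator* ScanFile(leveldb::TableCache* table_cache,
                            uint64_t file_number, uint64_t file_size) {
  return table_cache->NewIterator(leveldb::ReadOptions(), file_number,
                                  file_size);
}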

Summary

These two articles complete the reading and analysis of the Sorted Table code. When the data sits in memory, the cost of random access during a lookup is negligible; once the data has been persisted to disk, however, the IO time to bring a Sorted Table back into memory becomes very noticeable. The Sorted Table eases this with its layered iterators: the Index Block is read into memory in full up front; a lookup first binary-searches the Index Block, and only after the target Data Block has been located does the necessary IO happen, with Data Block caching further improving read performance.

  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.
  • Copyrights © 2021-2022 Xufei Pan