LevelDB源码分析——2.基本的数据结构续

二.基本的数据结构续

本系列的上一篇介绍了 LevelDB 中的 SliceHashLRUCache 的实现,这一篇将继续分析布隆过滤器、内存池和跳表。

4. 布隆过滤器 BloomFilter

在介绍布隆过滤器之前,先介绍下 LevelDB 中的过滤器策略 FilterPolicy。考虑一个场景:在 LevelDB 中查询某个指定 key = query 对应的 value,如果我们事先知道了所有的 key 里都找不到这个 query,那也就不需要进一步的读取磁盘、精确查找了,可以有效地减少磁盘访问数量。

FilterPolicy 就负责这件事情:它可以根据一组 key 创建一个小的过滤器 filter,并且可以将该过滤器和键值对存储在磁盘中,在查询时快速判断 query 是否在 filter 中。默认使用的 FilterPolicy 即为布隆过滤器。FilterPolicy 定义于 include/leveldb/filter_policy.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include <string>

#include "leveldb/export.h"

namespace leveldb {

class Slice;

/**过滤器,用于快速判断 查找的Key是否存在。默认采用布隆过滤器
*
* 暴露的接口除了 FilterPolicy 接口类,还有 NewBloomFilterPolicy 函数,
* 其他代码中均使用 FilterPolicy。
* 这样的设计可以保证使用者可以自行定义策略类、方便地替换原有的布隆过滤器。
* 这种设计也称之为策略模式,将策略单独设计为一个类或接口,
* 不同的子类对应不同的策略方法。
*
*/


class LEVELDB_EXPORT FilterPolicy {
public:
virtual ~FilterPolicy();

// Return the name of this policy. Note that if the filter encoding
// changes in an incompatible way, the name returned by this method
// must be changed. Otherwise, old incompatible filters may be
// passed to methods of this type.
virtual const char* Name() const = 0;

// keys[0,n-1] contains a list of keys (potentially with duplicates)
// that are ordered according to the user supplied comparator.
// Append a filter that summarizes keys[0,n-1] to *dst.
//
// Warning: do not change the initial contents of *dst. Instead,
// append the newly constructed filter to *dst.
// 根据给定的keys创建Filter
virtual void CreateFilter(const Slice* keys, int n,
std::string* dst) const = 0;

// "filter" contains the data appended by a preceding call to
// CreateFilter() on this class. This method must return true if
// the key was in the list of keys passed to CreateFilter().
// This method may return true or false if the key was not on the
// list, but it should aim to return false with a high probability.
// 注意函数名称的May,即存在 false positive的可能。
virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const = 0;
};

// Return a new filter policy that uses a bloom filter with approximately
// the specified number of bits per key. A good value for bits_per_key
// is 10, which yields a filter with ~ 1% false positive rate.
//
// Callers must delete the result after any database that is using the
// result has been closed.
//
// Note: if you are using a custom comparator that ignores some parts
// of the keys being compared, you must not use NewBloomFilterPolicy()
// and must provide your own FilterPolicy that also ignores the
// corresponding parts of the keys. For example, if the comparator
// ignores trailing spaces, it would be incorrect to use a
// FilterPolicy (like NewBloomFilterPolicy) that does not ignore
// trailing spaces in keys.
// 默认采用bloom过滤器,推荐的bits_per_key参数为10,此时false positive rate约等于1%
LEVELDB_EXPORT const FilterPolicy* NewBloomFilterPolicy(int bits_per_key);

} // namespace leveldb

#endif // STORAGE_LEVELDB_INCLUDE_FILTER_POLICY_H_

FilterPolicy 中,CreateFilter 负责创建 filterKeyMayMatch 负责判断 key 是否在 filter 中。注意这个 May,即这里 Match 判断可能会出错,也允许会出错。对于布隆过滤器,如果 Keyfilter 里,那么一定会 Match 上;反之如果不在,那么有小概率也会 Match 上,进而会多做一些磁盘访问,只要这个概率足够小也无伤大雅。这也刚好符合 KeyMayMatch 函数的需求。

暴露的接口除了 FilterPolicy 接口类,还有 NewBloomFilterPolicy 函数,其他代码中均使用 FilterPolicy。这样的设计可以保证使用者可以自行定义策略类、方便地替换原有的布隆过滤器。这种设计也称之为策略模式,将策略单独设计为一个类或接口,不同的子类对应不同的策略方法。继续看布隆过滤器的实现 util/bloom.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include "leveldb/filter_policy.h"

#include "leveldb/slice.h"
#include "util/hash.h"

namespace leveldb {

namespace {
//对key进行哈希操作
static uint32_t BloomHash(const Slice& key) {
return Hash(key.data(), key.size(), 0xbc9f1d34);
}

class BloomFilterPolicy : public FilterPolicy {
public:
explicit BloomFilterPolicy(int bits_per_key) : bits_per_key_(bits_per_key) {
// We intentionally round down to reduce probing cost a little bit
// 哈希函数的数目为 bits_per_key * ln2 ,上下限分别为30与1
k_ = static_cast<size_t>(bits_per_key * 0.69); // 0.69 =~ ln(2)
if (k_ < 1) k_ = 1;
if (k_ > 30) k_ = 30;
}

//返回过滤策略的名称
const char* Name() const override { return "leveldb.BuiltinBloomFilter2"; }

void CreateFilter(const Slice* keys, int n, std::string* dst) const override {
// Compute bloom filter size (in both bits and bytes)
// 根据 keys的数量n 与 每个key需要的比特位数,计算bloom filter的总体大小
size_t bits = n * bits_per_key_;

// For small n, we can see a very high false positive rate. Fix it
// by enforcing a minimum bloom filter length.
// Bloom filter的容量过小时,很容易false positive,因此最小容量设置为64
if (bits < 64) bits = 64;

//Bloom filter容量大小按字节大小向上取整,即为8bits的倍数
size_t bytes = (bits + 7) / 8;
bits = bytes * 8;

const size_t init_size = dst->size();
dst->resize(init_size + bytes, 0);
dst->push_back(static_cast<char>(k_)); // Remember # of probes in filter
char* array = &(*dst)[init_size];
for (int i = 0; i < n; i++) {
// Use double-hashing to generate a sequence of hash values.
// See analysis in [Kirsch,Mitzenmacher 2006].
uint32_t h = BloomHash(keys[i]);
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
for (size_t j = 0; j < k_; j++) {
const uint32_t bitpos = h % bits;
array[bitpos / 8] |= (1 << (bitpos % 8));
h += delta;
}
}
}

bool KeyMayMatch(const Slice& key, const Slice& bloom_filter) const override {
const size_t len = bloom_filter.size();
if (len < 2) return false;

const char* array = bloom_filter.data();
const size_t bits = (len - 1) * 8;

// Use the encoded k so that we can read filters generated by
// bloom filters created using different parameters.
const size_t k = array[len - 1];
if (k > 30) {
// Reserved for potentially new encodings for short bloom filters.
// Consider it a match.
return true;
}

uint32_t h = BloomHash(key);
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
for (size_t j = 0; j < k; j++) {
const uint32_t bitpos = h % bits;
if ((array[bitpos / 8] & (1 << (bitpos % 8))) == 0) return false;
h += delta;
}
return true;
}

private:
size_t bits_per_key_; //每一个key需要的bits位数,用于计算过滤器的容量
size_t k_; //表示Bloom Filter中哈希函数的数目
};
} // namespace

const FilterPolicy* NewBloomFilterPolicy(int bits_per_key) {
return new BloomFilterPolicy(bits_per_key);
}

} // namespace leveldb

BloomFilterPolicy 构造时需要提供 bits_per_key,后再根据 key 的数量 n一起计算出所需要的 bits 数 m。而代码中的 k_ 即为布隆过滤器中哈希函数的数目 k,这里 k=m*/nln2,详细介绍可以参考维基百科

而后,依次计算 n个 key 的 k个哈希结果。这里使用了 Double Hash

Double Hash 一般用于开放寻址哈希的优化。这里直接取连续的 k个哈希结果作为布隆过滤器需要的 k个哈希函数结果,一切为了速度。

5. 内存池 Arena

LevelDB 中实现了一个简单的内存池组建 Arena,位于 util/arena.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

namespace leveldb {

/**
* LevelDB的内存池
* 申请内存时,将申请到的内存块放入std::vector blocks_中,
* 在Arena的生命周期结束后,统一释放掉所有申请到的内存
*
* 每次会申请一个大的 block,默认大小为 4KB。
* 而后申请 bytes 长度的空间时,
* 如果当前 block 的剩余大小足够分配,则返回分配的内存地址并更新余下的起始位置和大小;
* 否则将会直接申请新的 block。析构时会删除所有 block。
*
* 当前空间不足时有一个优化,如果申请的空间大于 kBlockSize / 4 也就是 1KB 时,
* 会直接申请对应长度的 block 返回,不更新当前剩余 block 的起始位置和大小,
* 这样下次申请小空间时依然可以使用当前余下的空间;
* 否则将放弃当前剩余空间,重新申请一块 4KB 的 block 再分配
*/
class Arena {
public:
Arena();

Arena(const Arena&) = delete;
Arena& operator=(const Arena&) = delete;

~Arena();

// 直接分配内存
// Return a pointer to a newly allocated memory block of "bytes" bytes.
char* Allocate(size_t bytes);

// 申请对齐的内存空间
// Allocate memory with the normal alignment guarantees provided by malloc.
char* AllocateAligned(size_t bytes);

// Returns an estimate of the total memory usage of data allocated
// by the arena.
size_t MemoryUsage() const {
return memory_usage_.load(std::memory_order_relaxed);
}

private:
char* AllocateFallback(size_t bytes);
char* AllocateNewBlock(size_t block_bytes);

// Allocation state
// alloc_ptr_标记1个4KB block内部分配内存的起始地址
char* alloc_ptr_;
//alloc_bytes_remaining_记录1个4KB block内部剩余可用的内存字节数
size_t alloc_bytes_remaining_;

// Array of new[] allocated memory blocks
// blocks_存储多个4KB block
std::vector<char*> blocks_;

// Total memory usage of the arena.
//
// TODO(costan): This member is accessed via atomics, but the others are
// accessed without any locking. Is this OK?
std::atomic<size_t> memory_usage_;
};

inline char* Arena::Allocate(size_t bytes) {
// The semantics of what to return are a bit messy if we allow
// 0-byte allocations, so we disallow them here (we don't need
// them for our internal use).
assert(bytes > 0);
if (bytes <= alloc_bytes_remaining_) {
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}
return AllocateFallback(bytes);
}

} // namespace leveldb

Arena 提供三个接口,AllocateAllocateAlignedMemoryUsage,分别实现申请指定大小内存、申请对齐的指定大小内存和查询内存使用。具体的函数实现在 util/arena.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include "util/arena.h"

namespace leveldb {

static const int kBlockSize = 4096;

Arena::Arena()
: alloc_ptr_(nullptr), alloc_bytes_remaining_(0), memory_usage_(0) {}

Arena::~Arena() {
for (size_t i = 0; i < blocks_.size(); i++) {
delete[] blocks_[i];
}
}

char* Arena::AllocateFallback(size_t bytes) {
// 当申请的内存大小 大于 kBlockSize/4,即1KB时,会直接申请对应需要长度的block返回
if (bytes > kBlockSize / 4) {
// Object is more than a quarter of our block size. Allocate it separately
// to avoid wasting too much space in leftover bytes.
char* result = AllocateNewBlock(bytes);
return result;
}
//当申请的内存大小小于1KB时,直接分配4KB block,并更新剩余block的起始位置和大小
// We waste the remaining space in the current block.
alloc_ptr_ = AllocateNewBlock(kBlockSize);
alloc_bytes_remaining_ = kBlockSize;

char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}

char* Arena::AllocateAligned(size_t bytes) {
const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;
static_assert((align & (align - 1)) == 0,
"Pointer size should be a power of 2");
size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align - 1);
size_t slop = (current_mod == 0 ? 0 : align - current_mod);
size_t needed = bytes + slop;
char* result;
if (needed <= alloc_bytes_remaining_) {
result = alloc_ptr_ + slop;
alloc_ptr_ += needed;
alloc_bytes_remaining_ -= needed;
} else {
// AllocateFallback always returned aligned memory
result = AllocateFallback(bytes);
}
assert((reinterpret_cast<uintptr_t>(result) & (align - 1)) == 0);
return result;
}

char* Arena::AllocateNewBlock(size_t block_bytes) {
char* result = new char[block_bytes];
blocks_.push_back(result);
memory_usage_.fetch_add(block_bytes + sizeof(char*),
std::memory_order_relaxed);
return result;
}

} // namespace leveldb

代码也很容易看懂。每次会申请一个大的 block,默认大小为 4KB。而后申请 bytes 长度的空间时,如果当前 block 的剩余大小足够分配,则返回分配的内存地址并更新余下的起始位置和大小;否则将会直接申请新的 block。析构时会删除所有 block。

当前空间不足时有一个优化,如果申请的空间大于 kBlockSize / 4 也就是 1KB 时,会直接申请对应长度的 block 返回,不更新当前剩余 block 的起始位置和大小,这样下次申请小空间时依然可以使用当前余下的空间;否则将放弃当前剩余空间,重新申请一块 4KB 的 block 再分配。

当频繁申请小内存时,内存池可以规避掉大部分的系统级申请。接下来的介绍的跳表、以及后续文章介绍的 MemTable 中均会使用到该内存池。

6. 跳表 SkipList

跳表是在传统链表上加入跳跃连接的有序链表。因为有序,所以可以根据顺序关系,快速跳过无关元素。查询和插入的平均复杂度均为 \mathcal O(\log n)O(logn)。维基百科上有一副经典图

img

LevelDB 中实现的跳表位于 db/skiplist.h,所有实现均在该头文件里。仔细看图,对照代码,就很容易理解了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
#include <atomic>
#include <cassert>
#include <cstdlib>

#include "util/arena.h"
#include "util/random.h"

namespace leveldb {

//跳表的实现

template <typename Key, class Comparator>
class SkipList {
private:
struct Node;

public:
// Create a new SkipList object that will use "cmp" for comparing keys,
// and will allocate memory using "*arena". Objects allocated in the arena
// must remain allocated for the lifetime of the skiplist object.
explicit SkipList(Comparator cmp, Arena* arena);
// 不允许拷贝构造
SkipList(const SkipList&) = delete;
SkipList& operator=(const SkipList&) = delete;

// Insert key into the list.
// REQUIRES: nothing that compares equal to key is currently in the list.
void Insert(const Key& key);

// Returns true iff an entry that compares equal to key is in the list.
bool Contains(const Key& key) const;

// Iteration over the contents of a skip list
// 定义迭代器类,用于对跳表进行操作,包含SkipList*与Node*,2个成员变量
class Iterator {
public:
// Initialize an iterator over the specified list.
// The returned iterator is not valid.
explicit Iterator(const SkipList* list);

// Returns true iff the iterator is positioned at a valid node.
bool Valid() const;

// Returns the key at the current position.
// REQUIRES: Valid()
const Key& key() const;

// Advances to the next position.
// REQUIRES: Valid()
void Next();

// Advances to the previous position.
// REQUIRES: Valid()
void Prev();

// Advance to the first entry with a key >= target
void Seek(const Key& target);

// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToFirst();

// Position at the last entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToLast();

private:
const SkipList* list_;
Node* node_;
// Intentionally copyable
};

private:
//记录SkipList最大的高度
enum { kMaxHeight = 12 };

inline int GetMaxHeight() const {
return max_height_.load(std::memory_order_relaxed);
}

Node* NewNode(const Key& key, int height);
int RandomHeight();
bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }

// Return true if key is greater than the data stored in "n"
bool KeyIsAfterNode(const Key& key, Node* n) const;

// Return the earliest node that comes at or after key.
// Return nullptr if there is no such node.
//
// If prev is non-null, fills prev[level] with pointer to previous
// node at "level" for every level in [0..max_height_-1].
Node* FindGreaterOrEqual(const Key& key, Node** prev) const;

// Return the latest node with a key < key.
// Return head_ if there is no such node.
Node* FindLessThan(const Key& key) const;

// Return the last node in the list.
// Return head_ if list is empty.
Node* FindLast() const;

// Immutable after construction
// 比较器
Comparator const compare_;
// 跳表的内存池
Arena* const arena_; // Arena used for allocations of nodes
// SkipList 的前置哨兵节点
Node* const head_;

// Modified only by Insert(). Read racily by readers, but stale
// values are ok.
// 最大高度
std::atomic<int> max_height_; // Height of the entire list

// Read/written only by Insert().
// 随机数生成器
Random rnd_;
};

// Implementation details follow
template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) {}

// 存储主键Key
Key const key;

// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
// 获取当前节点在指定level的下一个节点
Node* Next(int n) {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return next_[n].load(std::memory_order_acquire);
}
// 将当前节点在指定level的下一个节点设置为x
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].store(x, std::memory_order_release);
}

// 无内存障碍版本的实现
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].store(x, std::memory_order_relaxed);
}

private:
// Array of length equal to the node height. next_[0] is lowest level link.
//1.这里提前使用声明分配1个对象的内存,是因为,第0层数据肯定是都有的,而且,是全部数据
//2.使用数组方式,那么后续分配的内存就是连续的,cache-friend
std::atomic<Node*> next_[1];
};

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
const Key& key, int height) {
//因为Node中的成员变量已定义1个Node*,所以此处只需height-1
char* const node_memory = arena_->AllocateAligned(
sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
// 定位new ,空间换时间,从LevelDB的内存池Arena中开辟的node_memory中,new Node(key)
// 与常规new相比,不需要申请空间,仅需调用构造函数,优化CPU缓存
return new (node_memory) Node(key);
}

template <typename Key, class Comparator>
inline SkipList<Key, Comparator>::Iterator::Iterator(const SkipList* list) {
list_ = list;
node_ = nullptr;
}

template <typename Key, class Comparator>
inline bool SkipList<Key, Comparator>::Iterator::Valid() const {
return node_ != nullptr;
}

template <typename Key, class Comparator>
inline const Key& SkipList<Key, Comparator>::Iterator::key() const {
assert(Valid());
return node_->key;
}

template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Next() {
assert(Valid());
node_ = node_->Next(0);
}

template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Prev() {
// Instead of using explicit "prev" links, we just search for the
// last node that falls before key.
assert(Valid());
node_ = list_->FindLessThan(node_->key);
if (node_ == list_->head_) {
node_ = nullptr;
}
}

template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
node_ = list_->FindGreaterOrEqual(target, nullptr);
}

template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::SeekToFirst() {
node_ = list_->head_->Next(0);
}

template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::SeekToLast() {
node_ = list_->FindLast();
if (node_ == list_->head_) {
node_ = nullptr;
}
}

template <typename Key, class Comparator>
int SkipList<Key, Comparator>::RandomHeight() {
// Increase height with probability 1 in kBranching
static const unsigned int kBranching = 4;
int height = 1;
while (height < kMaxHeight && rnd_.OneIn(kBranching)) {
height++;
}
assert(height > 0);
assert(height <= kMaxHeight);
return height;
}

template <typename Key, class Comparator>
bool SkipList<Key, Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
// null n is considered infinite
return (n != nullptr) && (compare_(n->key, key) < 0);
}

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
Node** prev) const {
Node* x = head_;
// level从0开始编码,0 —— MaxHeight-1
int level = GetMaxHeight() - 1;
// 从最高层level开始查找
while (true) {
// 定位到当前level的下一个节点
Node* next = x->Next(level);
// key 大于next的key,说明不在当前区间
if (KeyIsAfterNode(key, next)) {
// Keep searching in this list
x = next;
} else {
// key在当前区间段内,则向更低的level中继续查找
// 在查找的同时,对传入的参数prev进行设置,设置prev节点
if (prev != nullptr) prev[level] = x;
// 在最底层的level中找到相应位置
if (level == 0) {
return next;
} else {
// Switch to next list
level--;
}
}
}
}

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
assert(x == head_ || compare_(x->key, key) < 0);
Node* next = x->Next(level);
if (next == nullptr || compare_(next->key, key) >= 0) {
if (level == 0) {
return x;
} else {
// Switch to next list
level--;
}
} else {
x = next;
}
}
}

template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::FindLast()
const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
Node* next = x->Next(level);
if (next == nullptr) {
if (level == 0) {
return x;
} else {
// Switch to next list
level--;
}
} else {
x = next;
}
}
}

template <typename Key, class Comparator>
SkipList<Key, Comparator>::SkipList(Comparator cmp, Arena* arena)
: compare_(cmp),
arena_(arena),
head_(NewNode(0 /* any key will do */, kMaxHeight)),
max_height_(1),
rnd_(0xdeadbeef) {
for (int i = 0; i < kMaxHeight; i++) {
head_->SetNext(i, nullptr);
}
}

template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
// 声明prev节点数组,代表插入位置的前一个节点
Node* prev[kMaxHeight];
//使用FindGreaterOrEqual函数,找到第一个大于等于插入key的位置
Node* x = FindGreaterOrEqual(key, prev);

// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));
// 使用随机数获取该节点的插入高度
int height = RandomHeight();
if (height > GetMaxHeight()) {
// 大于当前SkipList的最高高度的话,将多出来的高度的prev设置为哨兵节点
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
// 更新max_height_
max_height_.store(height, std::memory_order_relaxed);
}
// 创建要插入的节点对象x
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
// 首先将x的next指向prev的下一个节点
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
// 将prev指向x
prev[i]->SetNext(i, x);
}
}

template <typename Key, class Comparator>
bool SkipList<Key, Comparator>::Contains(const Key& key) const {
Node* x = FindGreaterOrEqual(key, nullptr);
if (x != nullptr && Equal(key, x->key)) {
return true;
} else {
return false;
}
}

} // namespace leveldb

总结

基本的数据结构主要介绍的是局部细节,没有过多的代码依赖,容易看懂。而真正困难的部分,是梳理清楚整体的流程和结构。这时自上而下就要容易很多,比如看看文档、架构图、流程图等,不至于在庞杂的代码细节中迷失。

下一篇会开始介绍 LevelDB 中的核心组件。

  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.
  • Copyrights © 2021-2022 Xufei Pan
  • Visitors: | Views:

请我喝杯奶茶吧~

支付宝
微信