0%

levelDB源码剖析(14)--数据库功能部分

数据库功能部分

前面已经对levelDB中的各个组件进行了详细的源码剖析,下面我们就开始把这些左键全部串起来,在数据库的层面对其进行理解。

我们人为将LevelDB分为了数据库层和存储引擎层,在前面部分介绍了存储引擎相关的组件,包括:

  • SSTable存储结构实现,提供了在一个磁盘文件里读取一个键的功能,并且可以迭代一个SSTable的所有键
  • MemTable内存结构的实现,提供了读取一个键和写入一个键的功能,迭代MemTable里所有的键,以及可以将一个MemTable转换为一个SSTable
  • Log将写入持久化到磁盘上面,将随机写入转换为顺序写入
  • 迭代器对各种存储组件迭代,以及定位某一个键

利用这些组件提供的功能,就可以实现数据库层。可以将数据库层的功能分为两个部分:

  • Get、Put和Delete接口实现,这里是LevelDB对外提供的操作接口,分别实现键值对的查找、插入和删除
  • 版本管理和Compaction,随着数据的写入,不断的有MemTable转换为SSTable,当有些键不断的更新删除,有些Level的文件太多时,影响了读性能,需要进行Compaction,将低Level的SSTable Compaction到高Level的SSTable里去,提高读的效率。而Compaction是通过版本来管理的,当一次Compaction完成时,就会生成一个新版本

本文主要针对数据库的接口部分进行阐述。

接口管理

源码位置与说明

include/leveldb/db.h db/db_impl.h db/db_impl.cc: 实现了GET/PUT/DELETE
db/version_set.h db/version_set.cc: 实现从一个version的SSTable里读取一个键
db/memtable.h db/memtable.cc: 实现从MemTable读取一个键
inlclude/leveldb/table.h db/table_cache.h db/table_cache.cc db/table.cc: 定义从一个SSTable读取一个键
include/leveldb/write_batch.h db/write_batch.cc :writebatch相关函数
db/write_batch_internal.h : WriteBatchInternal类

我们先来看一下数据库的接口部分,很明显,作为一个库,增删查是必不可少的,这也是数据库的核心功能。

DB接口和接口相关的类

DB类

首先先来看一下DB类的头文件,这个DB类也是一个纯虚基类,是负责定义数据库的接口的。一个DB类(及其派生类)就可以看成是永久有序的kv存储的数据库,且其需要保证线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98

// A DB is a persistent ordered map from keys to values.
// A DB is safe for concurrent access from multiple threads without
// any external synchronization.
class LEVELDB_EXPORT DB
{
public:
// 打开一个数据库,把其保存在一个堆分配的指针上,并返回状态
// 这个指针需要用户手动管理
static Status Open(const Options &options, const std::string &name,
DB **dbptr);

// 构造析构函数等
DB() = default;
DB(const DB &) = delete;
DB &operator=(const DB &) = delete;
virtual ~DB();

// 插入一条数据,这个options中会定义是否同步(即每次都是否写入磁盘)
virtual Status Put(const WriteOptions &options, const Slice &key,
const Slice &value) = 0;

// 删除一条数据,如果key不存在不会报错
// 因为删除其实也是'插入'
// 同样可以定义是否同步到磁盘中
virtual Status Delete(const WriteOptions &options, const Slice &key) = 0;

// 真正的写入函数,会被插入删除调用
virtual Status Write(const WriteOptions &options, WriteBatch *updates) = 0;

// 从数据库中获取一个key
virtual Status Get(const ReadOptions &options, const Slice &key,
std::string *value) = 0;

// 返回一个堆分配的迭代器,用于迭代整个数据库
// 同样需要用户手动管理内存
virtual Iterator *NewIterator(const ReadOptions &options) = 0;

// 返回当前数据库状态的句柄
// 使用此句柄创建的迭代器都将观察当前数据库状态的稳定快照
virtual const Snapshot *GetSnapshot() = 0;

// 释放快照
virtual void ReleaseSnapshot(const Snapshot *snapshot) = 0;

// DB implementations can export properties about their state
// via this method. If "property" is a valid property understood by this
// DB implementation, fills "*value" with its current value and returns
// true. Otherwise returns false.
// 通过这个函数,数据库能向外传递他们自身的状态信息
// 前提是给定的属性(propertiy)是合法的
// 有以下合法的property
// "leveldb.num-files-at-level<N>" - return the number of files at level <N>,
// where <N> is an ASCII representation of a level number (e.g. "0").
// "leveldb.stats" - returns a multi-line string that describes statistics
// about the internal operation of the DB.
// "leveldb.sstables" - returns a multi-line string that describes all
// of the sstables that make up the db contents.
// "leveldb.approximate-memory-usage" - returns the approximate number of
// bytes of memory in use by the DB.
virtual bool GetProperty(const Slice &property, std::string *value) = 0;

// 获取大小估计,其中range是key的范围
// 即获取某个范围内的key-value占用系统的空间
virtual void GetApproximateSizes(const Range *range, int n,
uint64_t *sizes) = 0;

// Compact the underlying storage for the key range [*begin,*end].
// In particular, deleted and overwritten versions are discarded,
// and the data is rearranged to reduce the cost of operations
// needed to access the data. This operation should typically only
// be invoked by users who understand the underlying implementation.
//
// begin==nullptr is treated as a key before all keys in the database.
// end==nullptr is treated as a key after all keys in the database.
// Therefore the following call will compact the entire database:
// db->CompactRange(nullptr, nullptr);
// 将某个范围内的数据压缩,如果不是很清楚底层是怎么实现的,就不要管它
virtual void CompactRange(const Slice *begin, const Slice *end) = 0;
};

// Destroy the contents of the specified database.
// Be very careful using this method.
//
// Note: For backwards compatibility, if DestroyDB is unable to list the
// database files, Status::OK() will still be returned masking this failure.
// 删除一个db
LEVELDB_EXPORT Status DestroyDB(const std::string &name,
const Options &options);

// If a DB cannot be opened, you may attempt to call this method to
// resurrect as much of the contents of the database as possible.
// Some data may be lost, so be careful when calling this function
// on a database that contains important information.
// 尝试修复一个db,有些数据可能会丢失
LEVELDB_EXPORT Status RepairDB(const std::string &dbname,
const Options &options);

我们可以看,DB基类除了提供了基本的数据接口外,还提供了一些跟底层结构相关的接口。在DB类中,插入和删除其实都是"插入",需要通过插入数据的标志来区分。此外,我们还能创建快照,即将数据库"锁定"在某个时刻,当然不是真正的锁定,锁定之后所有的操作均不会对查询产生影响,(这个也是比较好理解的,因为levelDB中没有删除的概念,所以只需要过滤掉所有在快照之后的操作即可)

实际上,DB这个类中,有几个函数是有定义的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112

Status DB::Put(const WriteOptions &opt, const Slice &key, const Slice &value)
{
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions &opt, const Slice &key)
{
WriteBatch batch;
batch.Delete(key);
return Write(opt, &batch);
}

DB::~DB() = default;

Status DB::Open(const Options &options, const std::string &dbname, DB **dbptr)
{
*dbptr = nullptr;

// DBImpl是真正的数据库类,继承自DB
DBImpl *impl = new DBImpl(options, dbname);
// 先获取锁
impl->mutex_.Lock();
VersionEdit edit;
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false;
// 调用DBImpl::Recover完成MANIFEST的加载和故障恢复
Status s = impl->Recover(&edit, &save_manifest);
// 创建日志和相应的MemTable
if (s.ok() && impl->mem_ == nullptr)
{
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile *lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok())
{
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
}
// 如果需要重写MANIFEST文件,那么做一个版本变更,这里面会创建一个新的MANIFEST
// 将当前的版本信息写入,然后将edit的内容写入。
if (s.ok() && save_manifest)
{
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok())
{
impl->RemoveObsoleteFiles();
impl->MaybeScheduleCompaction();
}
// 解锁
impl->mutex_.Unlock();
if (s.ok())
{
assert(impl->mem_ != nullptr);
*dbptr = impl;
}
else
{
delete impl;
}
return s;
}

Status DestroyDB(const std::string &dbname, const Options &options)
{
Env *env = options.env;
std::vector<std::string> filenames;
Status result = env->GetChildren(dbname, &filenames);
if (!result.ok())
{
// Ignore error in case directory does not exist
return Status::OK();
}

FileLock *lock;
const std::string lockname = LockFileName(dbname);
result = env->LockFile(lockname, &lock);
if (result.ok())
{
uint64_t number;
FileType type;
for (size_t i = 0; i < filenames.size(); i++)
{
if (ParseFileName(filenames[i], &number, &type) &&
type != kDBLockFile)
{ // Lock file will be deleted at end
Status del = env->RemoveFile(dbname + "/" + filenames[i]);
if (result.ok() && !del.ok())
{
result = del;
}
}
}
env->UnlockFile(lock); // Ignore error since state is already gone
env->RemoveFile(lockname);
env->RemoveDir(dbname); // Ignore error in case dir contains other files
}
return result;
}

我们通过这几个函数可以看到,在打开数据库时,调用DBImpl::Recover来完成主要的工作,如果调用成功,则创建MemTable和WAL相关的数据结构,重写MANIFEST文件。关于打开数据库的操作,由于涉及到了版本管理之类的东西,具体内容留到后面再讲。而插入和删除的时候都是先定义了一个WriteBatch,然后调用Write函数来把这个WriteBatch写入。

WriteBatch类头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

class LEVELDB_EXPORT WriteBatch
{
public:
// 一个handler
class LEVELDB_EXPORT Handler
{
public:
virtual ~Handler();
virtual void Put(const Slice &key, const Slice &value) = 0;
virtual void Delete(const Slice &key) = 0;
};

// 构造析构函数
WriteBatch();
~WriteBatch();
WriteBatch(const WriteBatch &) = default;
WriteBatch &operator=(const WriteBatch &) = default;

// 插入kv
void Put(const Slice &key, const Slice &value);

// 移除kv
void Delete(const Slice &key);

// 清空缓存
void Clear();

// 估计大小
size_t ApproximateSize() const;

// Copies the operations in "source" to this batch.
//
// This runs in O(source size) time. However, the constant factor is better
// than calling Iterate() over the source batch with a Handler that replicates
// the operations into this batch.
// 把另一个WriteBatch添加到自身中
void Append(const WriteBatch &source);

// Support for iterating over the contents of a batch.
// 对每个元素进行解码然后执行对应的删除与插入操作
// 这个是用于批量执行时的接口
Status Iterate(Handler *handler) const;

private:
friend class WriteBatchInternal; // 友元类

// 所有的记录都会存储在这里
std::string rep_; // See comment in write_batch.cc for the format of rep_
};

WriteBatch只是一个辅助结构,可以将多个Kv的写入按顺序累积起来,然后一次性写入提高效率(单次删除和插入调用delete/put,但是并没有真正执行,而是存储起来;而将一批数据完成相关操作时调用Iterate)。其所有的kv都会被存储在其中的字符串中,这个字符串的格式是这样的。

开头是12字节的头部,由fixed64的序列号和fixed32的总数组成。然后下面是变长的record,每个record由类型和前缀码编码的字符串组成(即一个变长的长度在数据之前)

其中sequence序列号是整个WriteBatch写入时当前的SequenceNumber,按顺序逐个赋予record,比如第一个record的SequenceNumber是sequence,第二个是sequence + 1,以此类推。

实际上,WriteBatch类本身也有两个辅助类,分别是WriteBatchInternal和MemTableInserter。其中

  • WriteBatchInternal:定义了一些静态方法,主要用于操作WriteBatch中的那个字符串,涉及到序列号和数目,因为这两个东西被编码在一个字符串中。同时其中的InsertInto函数也负责调用WriteBatch的Iterate把当前batch的更新记录插入到memtable中。
  • MemTableInserter:这个辅助类继承自WriteBatch::Handler(内部只有put和delete方法),负责通过WriteBatchInternal生成序列号并在操作时管理序列号,并保存memtable的指针用于插入。

我们来看一下这两个辅助类

WriteBatchInternal和MemTableInserter类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

// WriteBatchInternal类只定义了静态函数
// 主要用于操作序列号相关
class WriteBatchInternal
{
public:
// 返回count数目
static int Count(const WriteBatch *batch);

// 设置count
static void SetCount(WriteBatch *batch, int n);

// 返回WriteBatch中序列号
static SequenceNumber Sequence(const WriteBatch *batch);

// 设置WriteBatch的序列号
static void SetSequence(WriteBatch *batch, SequenceNumber seq);

// 返回编码后的字符串
static Slice Contents(const WriteBatch *batch) { return Slice(batch->rep_); }

// 返回编码后字符串的大小
static size_t ByteSize(const WriteBatch *batch) { return batch->rep_.size(); }

// 设置字符串为指定的值
static void SetContents(WriteBatch *batch, const Slice &contents);

// 调用batch的Iterate,从而把所有的record插入到memtable中
static Status InsertInto(const WriteBatch *batch, MemTable *memtable);

// 把一个添加到另一个
static void Append(WriteBatch *dst, const WriteBatch *src);
};

namespace // 又是匿名空间
{
// 继承自WriteBatch::Handler,重写它的put和delete方法
class MemTableInserter : public WriteBatch::Handler
{
public:
SequenceNumber sequence_; // 管理序列号
MemTable *mem_; // memtable指针

void Put(const Slice &key, const Slice &value) override
{
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++;
}
void Delete(const Slice &key) override
{
mem_->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
}
};
} // namespace

这两个辅助类还是很简单的,WriteBatchInternal是WriteBatch和其内部的rep_这个编码字符串的桥梁(其实感觉有点怪,明明rep_已经在其内部了,还额外需要一个外部的类来管理...),MemTableInserter是memtable与WriteBatch的桥梁

理解了这两个类,我们就能愉快地看一下WriteBatch的实现了。

WriteBatch的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100

// writebatch的头部,由8字节序列号和4字节的计数组成
static const size_t kHeader = 12;

// 一些构造析构函数
WriteBatch::WriteBatch() { Clear(); }
WriteBatch::~WriteBatch() = default;
WriteBatch::Handler::~Handler() = default;
void WriteBatch::Clear()
{
rep_.clear();
rep_.resize(kHeader);
}

// 估计大小的函数
size_t WriteBatch::ApproximateSize() const { return rep_.size(); }

// 把编码后的条目中一个一个,并通过handler的put/delete(这个handler是MemTableInserter)
// 来把存储再条目中的项一个一个真正地插入到memtable中
Status WriteBatch::Iterate(Handler *handler) const
{
Slice input(rep_);
if (input.size() < kHeader)
{
// 至少都需要12字节
return Status::Corruption("malformed WriteBatch (too small)");
}

// 把头部去掉
input.remove_prefix(kHeader);
Slice key, value;
int found = 0;
while (!input.empty())
{
found++;
char tag = input[0];
input.remove_prefix(1);
switch (tag)
{
case kTypeValue:
// put方法
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value))
{
handler->Put(key, value);
}
else
{
return Status::Corruption("bad WriteBatch Put");
}
break;
case kTypeDeletion:
// delete方法
if (GetLengthPrefixedSlice(&input, &key))
{
handler->Delete(key);
}
else
{
return Status::Corruption("bad WriteBatch Delete");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
// 插入到memtable中的条目数目是否跟记录的条目数目相同
if (found != WriteBatchInternal::Count(this))
{
return Status::Corruption("WriteBatch has wrong count");
}
else
{
return Status::OK();
}
}

// put函数,把kv编码成前缀码再插入
void WriteBatch::Put(const Slice &key, const Slice &value)
{
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}

// delete函数,类似put,只是插入的类型不一样
void WriteBatch::Delete(const Slice &key)
{
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeDeletion));
PutLengthPrefixedSlice(&rep_, key);
}

// 把另一个writebatch弄到这个batch里来
void WriteBatch::Append(const WriteBatch &source)
{
WriteBatchInternal::Append(this, &source);
}

WriteBatch可能比较令人困惑的点就是其中的Handler类和Iterate方法了,弄懂Iterate方法中的参数就是MemTableInserter其实就不难理解了。

注意这里面涉及了很多的put、delete函数,我们来好好捋一捋。

什么类的put/delete(或指定函数) 作用
DB及其派生类 将一个kv插入到writebatch中,并返回writebatch
writebatch 将一个kv编码成前缀码作为一条record插入到其条目中储存起来
MemTableInserter(继承自WriteBatch::Handler) 把一kv插入到memtable中(同时负责管理序列号相关)
MemTableInserter::InsertInto 把一个writebatch中所有数据插入到memtable中
writebatch::Iterate 调用WriteBatch::Handler的put/delete来把writebatch的条目中的所有数据插入到WriteBatch中

到现在,我们弄清楚了怎么把一个WriteBatch插入到MemTable中,但是如何通过DB类构建出一个WriteBatch,以及,多线程之间是怎么插入的、如何调度管理WriteBatch,我们还没有弄清

其实,这些与另外一个类有密切关系,这个类也是非常重量级的一个类,代码足足有上千行,这个类就是DBImpl,继承自DB,是数据库的真正实现。

DBImpl类

这个类继承自DB类,所以可以通过DB指针来操作DBImpl类,它实现了数据库的所有功能,我们真正使用数据库时实际上也是通过这个类来实现的。它的内容非常多,主要可以分为这几类:

  • 接口管理,如PUT、DELETE等等用于提供数据库接口
  • 多线程写入转换成单线程写入
  • 对多个sstable进行压缩操作
  • 从之前的旧版本中进行数据恢复
  • 测试功能,包括对数据库进行测试的函数,一般我们不管他

其中接口部分相对比较简单,但是将用于写入的数据持久化这部分的代码涉及到多线程写入,为了效能提升,需要将多线程写入转换成单线程写入。

我们来看一下其头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228

class DBImpl : public DB
{
public:
// 构造析构函数相关
DBImpl(const Options &options, const std::string &dbname);
DBImpl(const DBImpl &) = delete;
DBImpl &operator=(const DBImpl &) = delete;
~DBImpl() override;

// DB类的接口
Status Put(const WriteOptions &, const Slice &key,
const Slice &value) override;
Status Delete(const WriteOptions &, const Slice &key) override;
Status Write(const WriteOptions &options, WriteBatch *updates) override;
Status Get(const ReadOptions &options, const Slice &key,
std::string *value) override;
Iterator *NewIterator(const ReadOptions &) override;
const Snapshot *GetSnapshot() override;
void ReleaseSnapshot(const Snapshot *snapshot) override;
bool GetProperty(const Slice &property, std::string *value) override;
void GetApproximateSizes(const Range *range, int n, uint64_t *sizes) override;
void CompactRange(const Slice *begin, const Slice *end) override;

// 其他的功能,用于测试,我们就不管这些了
// Compact any files in the named level that overlap [*begin,*end]
void TEST_CompactRange(int level, const Slice *begin, const Slice *end);
// Force current memtable contents to be compacted.
Status TEST_CompactMemTable();
// Return an internal iterator over the current state of the database.
// The keys of this iterator are internal keys (see format.h).
// The returned iterator should be deleted when no longer needed.
Iterator *TEST_NewInternalIterator();
// Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1.
int64_t TEST_MaxNextLevelOverlappingBytes();
// Record a sample of bytes read at the specified internal key.
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes.
void RecordReadSample(Slice key);

private:
friend class DB; // 有点怪,继承自DB,但是把DB作为友元
struct CompactionState; // 压缩状态
struct Writer; // Writer结构体

// 提供手动压缩时的信息
struct ManualCompaction
{
int level;
bool done;
const InternalKey *begin; // null means beginning of key range
const InternalKey *end; // null means end of key range
InternalKey tmp_storage; // Used to keep track of compaction progress
};

// Per level compaction stats. stats_[level] stores the stats for
// compactions that produced data for the specified "level".
// 每一层的压缩状态,stats_[level]存储对应层的压缩信息
struct CompactionStats
{
CompactionStats() : micros(0), bytes_read(0), bytes_written(0) {}

void Add(const CompactionStats &c)
{
this->micros += c.micros;
this->bytes_read += c.bytes_read;
this->bytes_written += c.bytes_written;
}

int64_t micros;
int64_t bytes_read;
int64_t bytes_written;
};

// 创建InternalIterator,是一个Merger Iteator
// 包含mem一个迭代器、imm一个迭代器
// 同时返回了快照序列号
// 是完整的db迭代器的一部分
// 被this->NewIterator调用
Iterator *NewInternalIterator(const ReadOptions &,
SequenceNumber *latest_snapshot,
uint32_t *seed);

// 创建一个新DB对象
// 由于这个方法是私有方法,也不是静态方法,它的作用是被this->Recover调用
// 调用的条件是需要恢复的数据库不存在(所以创建一个新的)
Status NewDB();

// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
// 恢复一个数据库,会尝试各种方式
// 实际上这个函数被DB::Open调用
Status Recover(VersionEdit *edit, bool *save_manifest)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

// 根据options中的设置来是否忽略掉一部分错误
// 即s中的错误如果在options中被忽略就把它设置为ok
// 被this->RecoverLogFile调用
void MaybeIgnoreError(Status *s) const;

// 删除不再需要的文件
// 被DB::Open调用
void RemoveObsoleteFiles() EXCLUSIVE_LOCKS_REQUIRED(mutex_);

// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
// Errors are recorded in bg_error_.
// 合并内存中的memtable并持久化到磁盘中,同时创建一个新的memtable
// 被this->DoCompactionWork和this->BackgroundCompaction调用
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_);

// 从log中恢复数据
// 被this->Recover调用
Status RecoverLogFile(uint64_t log_number, bool last_log, bool *save_manifest,
VersionEdit *edit, SequenceNumber *max_sequence)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

// 把imm持久化到第0层
// 被this->CompactMemTable和this->RecoverLogFile调用
Status WriteLevel0Table(MemTable *mem, VersionEdit *edit, Version *base)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

// 确保mem_有空间可以写入(memtable默认4mb就要被持久化了)
// 被this->Write调用,保证在写入之前有足够的空间
Status MakeRoomForWrite(bool force /* compact even if there is room? */)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// 这个函数目的是将多个线程写入转化成单个线程写入时用到的(后面具体讲)
// 会将多个writebatch拼接成一个writebatch
// 被this->Write调用
WriteBatch *BuildBatchGroup(Writer **last_writer)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

// 唤醒所有的线程报错
void RecordBackgroundError(const Status &s);

// 它会进行一些检测,在有background compaction正在进行的过程中,
// db已经关闭时以及immutable MemTable不存在,
// 没有manual触发compaction和当前不需要compaction的情况下不会触发compaction,
// 在不满足上述条件时触发compaction。
// 其中当前是否需要compaction的判断条件是compaction score是否大于1以及有没有需要被compact的SSTable
// 与compaction过程有关,读写时都会触发要不要compaction
// 这几个函数都是压缩相关的,并且都是逐级调用的。
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
static void BGWork(void *db);
void BackgroundCall();
void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void CleanupCompaction(CompactionState *compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status DoCompactionWork(CompactionState *compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

// 也是压缩相关的,涉及到打开压缩文件、完成压缩过程、应用压缩到versionEdit
Status OpenCompactionOutputFile(CompactionState *compact);
Status FinishCompactionOutputFile(CompactionState *compact, Iterator *input);
Status InstallCompactionResults(CompactionState *compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);

// 返回一个internal比较器
const Comparator *user_comparator() const
{
return internal_comparator_.user_comparator();
}

// Constant after construction
// 私有成员

// 环境配置对象
Env *const env_;
// InternalKey比较器和过滤策略
const InternalKeyComparator internal_comparator_;
const InternalFilterPolicy internal_filter_policy_;
const Options options_; // options_.comparator == &internal_comparator_
// 两个标志位,从options_中初始化,用于析构时是否需要释放某些内存
const bool owns_info_log_;
const bool owns_cache_;
// 名字
const std::string dbname_;

// TableCache
TableCache *const table_cache_;

// 锁
FileLock *db_lock_;

// 用mutex_保护的状态信息
// 包含memtable、immtable、log和相关的状态信息
port::Mutex mutex_;
std::atomic<bool> shutting_down_;
port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
MemTable *mem_;
MemTable *imm_ GUARDED_BY(mutex_); // Memtable being compacted
std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_
WritableFile *logfile_;
uint64_t logfile_number_ GUARDED_BY(mutex_);
log::Writer *log_;
uint32_t seed_ GUARDED_BY(mutex_); // For sampling.

// 写入队列,这个涉及到写入相关
// 会将多线程写入转换成单线程写入(后面细锁)
std::deque<Writer *> writers_ GUARDED_BY(mutex_);
WriteBatch *tmp_batch_ GUARDED_BY(mutex_);

// 快照列表
SnapshotList snapshots_ GUARDED_BY(mutex_);

// 待compact的文件列表,保护以防误删
std::set<uint64_t> pending_outputs_ GUARDED_BY(mutex_);

// 后台压缩是否正在被调度或者运行的一个标志位
bool background_compaction_scheduled_ GUARDED_BY(mutex_);

// 手动压缩结构体
// 这个结构体也是位于本类private中
ManualCompaction *manual_compaction_ GUARDED_BY(mutex_);

// 版本集合,是一个双向链表,用于版本管理
VersionSet *const versions_ GUARDED_BY(mutex_);

// 是否后台出错
Status bg_error_ GUARDED_BY(mutex_);

// 压缩信息,这个结构体也是位于本类的private中
// 每一层的压缩状态,stats_[level]存储对应层的压缩信息
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
};

头文件可以说是相当的长,不过从头文件中我们也能看出一个数据库的大致功能了。能够给我们使用的一些接口其实并不算多,主要就是数据的写入和删除,其他的东西levelDB会自动帮我们管理。剩下一些接口主要是迭代、快照、压缩管理之类一些比较高级的操作。

我们先来关注一下写入部分

DB_Impl的写入

向数据的写入在levelDB中包含两个部分,涉及到插入一个kv的写入和删除一个kv的写入。通过查看Put和Delete的源码可知,DB_Impl调用的是DB的Put和Delete函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

Status DB::Put(const WriteOptions &opt, const Slice &key, const Slice &value)
{
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions &opt, const Slice &key)
{
WriteBatch batch;
batch.Delete(key);
return Write(opt, &batch);
}

都是先生成一个WriteBatch,然后再调用this->Write函数,并且这个Write函数也是一个公有接口,所以我们主要来关注一下这个Write函数。

我们回想一下之前讲述的写入过程:先写入wal,然后写入memtable。并且写入过程中,如果MemTable太大会触发MemTable写入到SSTable,当然还涉及到一些具体的实现细节,我们看完源码后再说。

Write函数也是按照这个步骤来实现的。不过,由于levelDB是多线程的,为了实现线程安全,每个线程在写入时,都需要获取锁,但是这样会阻塞其它的线程,降低并发度。针对这个问题LevelDB做了一个优化,写入时当获取锁后,会将WriteBatch放入到一个std::deque<Writer\*> DBImpl::writers_里,然后会检查writers_里的第一个元素是不是自己,如果不是的话,就会释放锁。当一个线程检查到writers_头元素是自己时,会再次获取锁,然后将writers_里的数据尽可能多的写入。一次写入会涉及到写日志,占时间比较长,一个线程的数据可能被其它线程批量写入进去了,减少了等待。

总结来说,一个线程的写入有两种情况:一种是恰好自己是头结点,自己写入,另外一种是别的线程帮助自己写入了,自己会检查到写入,然后就可以返回了。

我们来详细看一下Write函数的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126

// 用来封装一个WriteBatch,用来标识状态
struct DBImpl::Writer
{
explicit Writer(port::Mutex *mu)
: batch(nullptr), sync(false), done(false), cv(mu) {}

Status status;
WriteBatch *batch;
bool sync;
bool done;
port::CondVar cv;
};

Status DBImpl::Write(const WriteOptions &options, WriteBatch *updates)
{
// 构造一个writer结构体,插入到wrtiers_里,注意插入前需要先获取锁
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;

MutexLock l(&mutex_);
writers_.push_back(&w);
// 检查done判断是否完成,或者自己是否是writers_队列里的第一个成员。
// 有可能其它线程写入时把自己的内容也写入了,
// 这样自己就是done,或者当自己是头元素了,表示轮到自己写入了
while (!w.done && &w != writers_.front())
{
w.cv.Wait();
}
if (w.done)
{
return w.status;
}
// 如果是非头结点,则至此,其内容已经被头结点写入了
// 下面的所有函数都是针对头结点而言的

// May temporarily unlock and wait.

// 先检查一下memtable中是否有足够的空间
// 同时获取到最新的序列号
// 注意MakeRoomForWrite函数中如果发现写入太快,会进行休眠限流
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
Writer *last_writer = &w;
// 已经完成写入的准备工作
if (status.ok() && updates != nullptr)
{ // nullptr batch is for compactions
// 调用这个函数来将队列中的多个batch组合成一个batch
// 并为这个batch设置好序列号
WriteBatch *write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);

// 下面是正式的写入,会写入log和memtable
//
{
// 注意这一步解锁,很关键,因为接下来的写入可能是一个费时的过程
// 解锁后,其它线程可以Get,其它线程也可以继续将writer
// 插入到writers_里面,但是插入后,因为不是头元素,会等待,所以不会冲突
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync)
{
status = logfile_->Sync();
if (!status.ok())
{
sync_error = true;
}
}
// 然后写入到memtable中
if (status.ok())
{
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}

// 加锁需要修改全局的SequenceNumber以及writers_
mutex_.Lock();
if (sync_error)
{
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
// RecordBackgroundError会将错误记录到 bg_error_ 字段中
// 并唤醒 background_work_finished_signal_ 用于向后台任务周知此错误
// 想说明的是:刚才写入到log中的数据在下一次db重新打开时可能不会出现
// 所以让数据库所有的后续写入都失败
RecordBackgroundError(status);
}
}
// 正常情况下tmp_batch_就是write_batch
// 因为BuildBatchGroup中tmp_batch_被设置成result,同时返回值也是result(这是指针)
if (write_batch == tmp_batch_)
tmp_batch_->Clear();

// 更新序列号
versions_->SetLastSequence(last_sequence);
}

// 从writers_队列从头开始,将写入完成的writer标识成done,并且弹出,通知这些writer
// 这样这些writer的线程会被唤醒,发现自己的写入已经完成了,就会返回
while (true)
{
Writer *ready = writers_.front();
writers_.pop_front();
if (ready != &w)
{
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer)
break;
}

// 如果writers_里还有元素,就通知头元素,让它可以进来开始写入
if (!writers_.empty())
{
writers_.front()->cv.Signal();
}

return status;
}

Write函数的过程为:先获取锁,同时把batch插入到write_中,检测头结点是不是自己以及队列中的batch是否插入完成了,如果都没有则进入等待中并释放锁。反之,则先将若干个batch打包成一个大的batch,然后按照先写入wal再写入memtable的顺序完成写入。在写入期间锁会被解开,其他线程可以往write_中插入batch。写入完成后重新上锁,更新序列号并从write_中删除写入的部分。随后唤醒新的write_中的头线程(如果有的话)。同样的来看一下这个函数的流程图。

多个线程想要写入的时候,先把需要写入的batch加入到队列中,但是这些线程之间会相互竞争需要注意的是,仅有某个线程正在写入或没有线程获取锁的时候,才能有线程插入到write_中

假设线程是t1-t5,然后write_队列中的竞争结果为t2、t1、t3,那么此时只有t2线程会执行,剩余的线程会被锁住,t2线程会把t2、t1、t3这几个batch一起打包成一个大的batch,然后插入到log和memtable中,在持久化过程中,t4、t5又会把对应的batch插入到write_中,那么此时的write_为t2、t1、t3、t4、t5,等到t2持久化完成后,会把t2、t1、t3标记为已插入并从队列中移除,随后通知t4进行插入。这样就利用了磁盘写的长IO耗时,有效地增加了效率。过程如图。

写入过程中的限流策略

我们上述对write的描述其实忽略了一部分细节。比如:memtable达到了最大的空间限制怎么办?immemtable仍然存在的时候memtable写满了怎么办?level0层块太多怎么办?

这部分问题在Write函数中具体的体现其实就是MakeRoomForWrite和BuildBatchGroup这两个函数。我们来看一下这两个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
// 需要保证锁住,且此函数需要被write_中第一个结点对用的线程调用
// 作用是为写入提供准备,包括限流等策略
Status DBImpl::MakeRoomForWrite(bool force)
{
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
while (true)
{
if (!bg_error_.ok())
{
// Yield previous error
s = bg_error_;
break;
}
else if (allow_delay && versions_->NumLevelFiles(0) >=
config::kL0_SlowdownWritesTrigger)
{
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
// 如果第0层数目超过限制(默认8),就会sleep 1ms,并在此期间解开锁
// 这有助于压缩线程工作
// 并且此限流策略只会执行一次
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
}
else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size))
{
// There is room in current memtable
// 如果memtable有空间,就直接返回,此时保证了memtable有空间可以写入
break;
}
else if (imm_ != nullptr)
{
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
// 如果memtable没有空间,且immemtable存在
// 即之前转换的immemtable存在,就一直等,直到上一次的immemtable被持久化
Log(options_.info_log, "Current memtable full; waiting...\n");
background_work_finished_signal_.Wait();
}
else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger)
{
// There are too many level-0 files.
// 如果第0层数目太多(默认12),就一直等待,直到满足条件
Log(options_.info_log, "Too many L0 files; waiting...\n");
background_work_finished_signal_.Wait();
}
else
{
// 否则就尝试将一个memtable转换成immemtable来写入
// 同时还需要创建一个新的memtable
// 还要触发压缩线程
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile *lfile = nullptr;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok())
{
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
}
return s;
}

// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-null batch
// 需要保证write_队列非空,且队列中第一个batch也是非空的
WriteBatch *DBImpl::BuildBatchGroup(Writer **last_writer)
{
mutex_.AssertHeld();
assert(!writers_.empty());
Writer *first = writers_.front();
WriteBatch *result = first->batch;
assert(result != nullptr);

// 首先计算出一个max_size,表示构造的WriterBatch的最大尺寸
size_t size = WriteBatchInternal::ByteSize(first->batch);

// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
size_t max_size = 1 << 20;
if (size <= (128 << 10))
{
max_size = size + (128 << 10);
}

// 然后开始扫描writers_数组,直到满足WriterBatch超过max_size
// 或者碰到一个与当前batch的sync不匹配的就结束
*last_writer = first;
std::deque<Writer *>::iterator iter = writers_.begin();
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter)
{
Writer *w = *iter;
if (w->sync && !first->sync)
{
// Do not include a sync write into a batch handled by a non-sync write.
break;
}

if (w->batch != nullptr)
{
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size)
{
// Do not make batch too big
break;
}

// Append to *result
if (result == first->batch)
{
// Switch to temporary batch instead of disturbing caller's batch
result = tmp_batch_;
assert(WriteBatchInternal::Count(result) == 0);
WriteBatchInternal::Append(result, first->batch);
}
WriteBatchInternal::Append(result, w->batch);
}
*last_writer = w;
}
return result;
}

先说MakeRoomForWrite,它主要是限流和触发后台线程等工作:

  • 首先判断Level 0的文件是否>=8,是的话就sleep 1ms,这里是限流的作用,Level 0的文件太多,说明写入太快,Compaction跟不上写入的速度,而在读取的时候Level 0的文件之间可能有重叠,所以太多的话,影响读取的效率,这算是比较轻微的限流,最多sleep一次
  • 接下来判断MemTable里是否有空间,有空间的话就可以返回了,写入就可以继续
  • 如果MemTable没有空间,判断Immutable MemTable是否存在,存在的话,说明上一次写满的MemTable还没有完成写入到SSTable中,说明写入太快了,需要等待Immutable MemTable写入完成
  • 再判断Level 0的文件数是否>=12,如果太大,说明写入太快了,需要等待Compaction的完成
  • 到这一步说明可以写入,但是MemTable已经写满了,需要将MemTable变成Immutable MemTable,生成一个新的MemTable,触发后台线程写入到SSTable中

然后来说一下BuildBatchGroup,这个函数的作用是将多个位于write_队列中的batch合并成一个更大的batch用于写入,它会从writers_头部开始扫描,将尽可能多的writer生成一个新的WriterBatch,将这些writer的内容批量写入。步骤如下:

  • 首先计算出一个max_size,表示构造的WriterBatch的最大尺寸;
  • 然后开始扫描writers_数组,直到满足WriterBatch超过max_size
  • 另外,第一个writer的sync决定了这整个write是不是sync的,如果第一个writer不是sync的,碰到一个sync的writer,表面这个writer无法加入到这个批量写入中,所以扫描就结束了

DB_Impl的读取

了解写入后,再来说读取,LSM Tree里面,读取往往比写入更复杂,写入往往只涉及一次磁盘IO,但是读取可能涉及多次磁盘IO。

LevelDB的数据可能存在三个地方:

  • 最新的写入在MemTable里
  • 如果上一次MemTable写满后,转换为Immutable MemTable,如果还没有完成写入到SSTable里,那么就有可能存在于Immutable MemTable里
  • 存在于SSTable里,SSTable里的数据从Level 0开始从新变旧,Level越低,数据越新。Level 0里的SSTable比较特殊,因为是MemTable转换过来的,虽然每个SSTable都是有序的,但是每个SSTable的键的范围可能有重叠,也就是一个键可能存在于多个SSTable里,文件名下标越大的SSTable里的键越新。而非level 0的SSTable,都是后台生成的,保证了SSTable之间的数据无重叠,一个键只有可能存在于一个SSTable里

所以读取一个键时:

  • 先读取MemTable,存在就读到数据了;
  • 不存在的话,看看Immutable MemTable是否存在,存在的话,读取数据;
  • 否则,从SSTable的Level 0开始读取,如果Level 0还没有找到话,读Level 1,以此类推,直到读到最高层。

另外,别忘了,levelDB是有缓存的,在读取的时候还要查找缓存。写入的话可以把memtable理解成缓存

DBImpl::Get

先来看一下DBImpl中的Get实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

Status DBImpl::Get(const ReadOptions &options, const Slice &key,
std::string *value)
{
Status s;
// 加锁,读取元数据,增加引用
MutexLock l(&mutex_);

// 设定snapshot,其实就是一个SequenceNumber,这是实现Snapshot的关键,设置成选项中的snapshot,或者
// 当前最近的SequenceNumber
SequenceNumber snapshot;
if (options.snapshot != nullptr)
{
snapshot =
static_cast<const SnapshotImpl *>(options.snapshot)->sequence_number();
}
else
{
snapshot = versions_->LastSequence();
}

// mem是MemTable,imm是Immutable MemTable,
// SSTable则由当前的version代表,一个version包含了当前版本的SSTable的集合
MemTable *mem = mem_;
MemTable *imm = imm_;
Version *current = versions_->current();
// 这里都调用了Ref,LevelDB里大量使用了这种引用计数的方式管理对象,这里表示读取需要引用这些对象,假设在读取过程
// 中其他线程发生了MemTable写满了,或者Immutable MemTable写入完成了需要删除了,或者做了一次Compaction,生
// 成了新的Version,所引用的SSTable不再有效了,这些都需要对这些对象做一些改变,比如删除等,但是当前线程还引用着
// 这些对象,所以这些对象还不能被删除。采用引用计数,其它线程删除对象时只是简单的Unref,因为当前线程还引用着这些
// 对象,所以计数>=1,这些对象不会被删除,而当读取结束,调用Unref时,如果对象的计数是0,那么对象会被删除。
mem->Ref();
if (imm != nullptr)
imm->Ref();
current->Ref();

bool have_stat_update = false;
Version::GetStats stats;

// 实际读取时解锁
{
mutex_.Unlock();
// 构造一个Lookup Key搜索MemTable
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s))
{
// 找到了
}
// 然后查找imm中
else if (imm != nullptr && imm->Get(lkey, value, &s))
{
// 找到
}
else
{
// 然后需要从当前的sstable集合中查找
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}

if (have_stat_update && current->UpdateStats(stats))
{
// 表示由于某个文件seek次数过多需要合并
// 合并条件是在sstable中找到了key,但是不止在一个sstable中找到(后面细锁)
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr)
imm->Unref();
current->Unref();
return s;
}

DBImpl中Get其实就是简单地从memtable、immemtable和所有sstable的集合中查找key,由于memtable、immemtable均为跳表,所以需要用lookupkey来查找。另外,由于levelDB把所有sstable看作是一个版本(这个后面重点会讲,版本管理也是非常多的代码),所以current->Get这个本质上也是从sstable中查找,这时候要用到internalkey。

还有一个比较值得注意的地方就是调用MaybeScheduleCompaction函数,可能会令人困惑。它的目的是对sstable进行压缩清理,此处不展开。

由于levelDB本质上没有delete key的说法,所以多次对于一个key的修改啥的都会保留,只是通过序列号来判断哪个是最新的。而key在多个sstable中被查到,说明此key存在多次修改,不如将他合并成一个sstable,这样会带来性能上的提升(合并的话就删除掉一些sstable,然后生成一个大的sstable) 此处有错误

启用压缩的条件是:存在一个sstable多次未命中数据。这样做可以把这个sstable压缩至更高层,以减少该sstable的读取次数。

但实际上,读写操作都会调用MaybeScheduleCompaction函数,这个函数可能会启动压缩。其中读在Get中调用,写则是在MakeRoomForWrite中调用

由于memtable的查找本质上就是跳表的查找,比较简单,就不赘述了。

Version::Get

我们来看看SSTable的读取,version保存了SSTable,是一个分层的SSTable的集合,这其实就是LevelDB名字的由来。

version里SSTable的集合有以下特点:

  • 数据从Level 0开始由新变旧,所以最新的数据在最低层,也就是如果一个键有多个值的话,新的肯定在低层,旧的在高层
  • 因为 Level 0里的SSTable是MemTable 写入的,Level 0里的每个SSTable的键之间可能有重叠,所以一个键可能存在于多个SSTable里
  • 而其它Level,SSTable都是Compaction产生的,键没有重叠,一个键只有可能存在于一个SSTable里
  • 对于Level 0的SSTable,文件编号越大的,里面的键越新

如图,红色是需要读取的SSTable,当读取时,Level 0可能有多个SSTable需要读取,而其它Level最多只有一个SSTable需要读取。

关于Version我们后面具体讲这个重量级的东西。先简单看一下,一个SSTable在内存里使用FileMetaData来表示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// db/version_edit.h

struct FileMetaData {
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}

int refs;
int allowed_seeks; // 判断什么时候可以Compaction
uint64_t number; // 文件编号
uint64_t file_size; // 文件大小bytes
InternalKey smallest; // 这个SSTable里最小的Internal Key
InternalKey largest; // 这个SSTable里最大的Internal Key
};

std::vector<FileMetaData*> Version::files_[config::kNumLevels];

可以看到每个FileMetaData都有一个键的范围,所以在读取时可以快速判断键是否可能在这个SSTable里,这样就可以选出相应的键。

而一个Version里的SSTable保存在一个vector数组里,每一个Level对应一个vector,每个vector保存了FileMetaData,对于非Level 0,这些FileMetaData是有序的,也就是第n个SSTable的最大键小于第n + 1个SSTable的最小键,所以可以通过二分搜索找到某个键位于哪个SSTable,但是level 0层的sstable是memtable的副本,所以key上会有重叠。

我们来看一下其查找的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162

Status Version::Get(const ReadOptions &options, const LookupKey &k,
std::string *value, GetStats *stats)
{
stats->seek_file = nullptr;
stats->seek_file_level = -1;

// 描述查询状态的结构体
struct State
{
Saver saver;
GetStats *stats;
const ReadOptions *options;
Slice ikey;
FileMetaData *last_file_read;
int last_file_read_level;

VersionSet *vset;
Status s;
bool found;

// 这个函数的作用是在第level层的f中查找key
static bool Match(void *arg, int level, FileMetaData *f)
{
State *state = reinterpret_cast<State *>(arg);

if (state->stats->seek_file == nullptr &&
state->last_file_read != nullptr)
{
// We have had more than one seek for this read. Charge the 1st file.
state->stats->seek_file = state->last_file_read;
state->stats->seek_file_level = state->last_file_read_level;
}

state->last_file_read = f;
state->last_file_read_level = level;

// 从缓存中获取sstable
state->s = state->vset->table_cache_->Get(*state->options, f->number,
f->file_size, state->ikey,
&state->saver, SaveValue);
// 从sstable中查找
if (!state->s.ok())
{
state->found = true;
return false;
}
switch (state->saver.state)
{
case kNotFound:
return true; // Keep searching in other files
case kFound:
state->found = true;
return false;
case kDeleted:
return false;
case kCorrupt:
state->s =
Status::Corruption("corrupted key for ", state->saver.user_key);
state->found = true;
return false;
}

// Not reached. Added to avoid false compilation warnings of
// "control reaches end of non-void function".
return false;
}
};

State state;
state.found = false;
state.stats = stats;
state.last_file_read = nullptr;
state.last_file_read_level = -1;

state.options = &options;
state.ikey = k.internal_key();
state.vset = vset_;

state.saver.state = kNotFound;
state.saver.ucmp = vset_->icmp_.user_comparator();
state.saver.user_key = k.user_key();
state.saver.value = value;

// 尝试找到对应的key
ForEachOverlapping(state.saver.user_key, state.ikey, &state, &State::Match);

return state.found ? state.s : Status::NotFound(Slice());
}

// 从所有sstable中逐层查找,直到找到
// 其中arg是state结构体
// func是在state中查找sstable的函数
void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void *arg,
bool (*func)(void *, int, FileMetaData *))
{
const Comparator *ucmp = vset_->icmp_.user_comparator();

// 第0层的key存在重叠,所以每个都要查找
std::vector<FileMetaData *> tmp;
// 这里是直接对数组扩容,因为我们已经知道大小了,就不要让他自动扩容了
tmp.reserve(files_[0].size());

// 遍历查找所有可能符合的key范围的sstable
for (uint32_t i = 0; i < files_[0].size(); i++)
{
// 遍历第0层所有files
FileMetaData *f = files_[0][i];
// 如果查找的key就在某一个sstable的范围内,就把这个sstable放入tmp中
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0)
{
tmp.push_back(f);
}
}

if (!tmp.empty())
{
// 由新到旧排序,NewestFirst是一个根据序列号的比较器
std::sort(tmp.begin(), tmp.end(), NewestFirst);
for (uint32_t i = 0; i < tmp.size(); i++)
{
// 找到了的话就直接返回
// func返回true说明是没找到
if (!(*func)(arg, 0, tmp[i]))
{
return;
}
}
}

// 在第0层找不到就继续向下查找
for (int level = 1; level < config::kNumLevels; level++)
{
size_t num_files = files_[level].size();
if (num_files == 0)
continue;

// 由于除了第0层外,剩余层的key范围不重叠,所以每层至多一个sstable包含key
// 此处就是把可能含有key的那个sstable找出来
// 查找返回的结果是比要查找的key大的最小的一个sstable
// 二分法
uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
if (index < num_files)
{
FileMetaData *f = files_[level][index];
if (ucmp->Compare(user_key, f->smallest.user_key()) < 0)
{
// 这一层内唯一个的一个可能含有key的sstable的范围也不包含key
}
else
{
if (!(*func)(arg, level, f))
{
// 找到就返回
return;
}
}
}
}
}

我们可以看到,在Version::Get中,定义了一个结构体,可以理解成这个结构体就是全部sstable的集合,主要是为了对一些数据进行封装,方便编码。这个结构体有一个Match函数,这个函数的作用是在第level层的f中查找key(这三个都是传入的参数,其中f可以理解成sstable的索引)。在Version::Get中,真正开始遍历查询各个层的sstable的函数其实是ForEachOverlapping,当然这里的sstable也指的是索引,只记录了key的范围和其他一些信息,下面这两段也是通过sstable索引查找

在这个函数中,它会从第0层开始,逐层查找每层中可能含有key的sstable,并使用结构体中的Match函数判断是否真的存在,这个Match函数是通过函数指针传递进去的。另外,由于第0层可能有多个sstable含有key,而其他层则只会最多一个sstable含有key,所以这部分逻辑还是稍有区别的,具体体现在:第0层是遍历;其他层是二分查找。

最后来再看一下这个Match函数,这个函数就是利用sstable的索引来从sstable缓存中拿到真正的那个sstable,并从中查找key,并通过key的类型(删除还是插入还是key损坏)来返回结果。

TableCache::Get

虽然之前在缓存中已经阐述过了TableCache::Get方法,但是为了连贯性,此处还是把这一点点代码单独拿出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

// 从缓存中获取key-value
Status TableCache::Get(const ReadOptions &options, uint64_t file_number,
uint64_t file_size, const Slice &k, void *arg,
void (*handle_result)(void *, const Slice &,
const Slice &))
{
Cache::Handle *handle = nullptr;
// 先从缓存中查找对应的table的缓存项
// 如果缓存中没有这个table,就会从文件中打开并插入到缓存中
// 但是TableCache缓存的实际是sstable的索引项
Status s = FindTable(file_number, file_size, &handle);
if (s.ok())
{
Table *t = reinterpret_cast<TableAndFile *>(cache_->Value(handle))->table;
// 然后从sstable中把这个值给拿出来,这个值会被handle_result这个回调函数处理
s = t->InternalGet(options, k, arg, handle_result);
cache_->Release(handle);
}
return s;
}

// 从TableCache中查找Table缓存的函数
Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
Cache::Handle **handle)
{
Status s;
char buf[sizeof(file_number)];
EncodeFixed64(buf, file_number);
Slice key(buf, sizeof(buf)); // 查找时的key为filenumber的编码
*handle = cache_->Lookup(key);
if (*handle == nullptr)
{
std::string fname = TableFileName(dbname_, file_number);
RandomAccessFile *file = nullptr;
Table *table = nullptr;
s = env_->NewRandomAccessFile(fname, &file);
if (!s.ok())
{
std::string old_fname = SSTTableFileName(dbname_, file_number);
if (env_->NewRandomAccessFile(old_fname, &file).ok())
{
s = Status::OK();
}
}
if (s.ok())
{
s = Table::Open(options_, file, file_size, &table);
}

if (!s.ok())
{
assert(table == nullptr);
delete file;
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
}
else
{
// 找不到就新打开它
TableAndFile *tf = new TableAndFile;
tf->file = file;
tf->table = table;
*handle = cache_->Insert(key, tf, 1, &DeleteEntry);
}
}
return s;
}

我们之前也提到过,TableCache中存储的是sstable的索引项,所以我们能通过这些索引来快速一个key是否真正位于此sstable中(布隆过滤器),以及它会在哪里出现(block的索引)。TableCache::Get就是先调用FindTable来查找一下这个sstable的索引,然后调用InternalGet来通过索引获取这个值。其中FindTable作用也很简单,就是先在已有的table索引缓存项中查找,如果找到了就返回这个索引;找不到,就从文件中打开这个sstable,并把缓存插入到索引中。

至于InternalGet这个函数,它就是正经从sstable的block中获取值了。

Table::InternalGet

我们来看一下这个函数的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124

// 从sstable的block中获取值
Status Table::InternalGet(const ReadOptions &options, const Slice &k, void *arg,
void (*handle_result)(void *, const Slice &,
const Slice &))
{
Status s;
// 搜索索引
Iterator *iiter = rep_->index_block->NewIterator(rep_->options.comparator);
iiter->Seek(k);
if (iiter->Valid())
{
Slice handle_value = iiter->value();
// 如果使用了布隆过滤器,则先查找布隆过滤器,如果没有发现,就直接返回了
FilterBlockReader *filter = rep_->filter;
BlockHandle handle;
if (filter != nullptr && handle.DecodeFrom(&handle_value).ok() &&
!filter->KeyMayMatch(handle.offset(), k))
{
// Not found
}
else
{
// 读取一个Data Block
Iterator *block_iter = BlockReader(this, options, iiter->value());
block_iter->Seek(k);
if (block_iter->Valid())
{
// 找到对应的键,调用回调函数
(*handle_result)(arg, block_iter->key(), block_iter->value());
}
s = block_iter->status();
delete block_iter;
}
}
if (s.ok())
{
s = iiter->status();
}
delete iiter;
return s;
}

// 读取一个Block,BlockCache就在这里哦
Iterator *Table::BlockReader(void *arg, const ReadOptions &options,
const Slice &index_value)
{
Table *table = reinterpret_cast<Table *>(arg);
Cache *block_cache = table->rep_->options.block_cache;
Block *block = nullptr;
Cache::Handle *cache_handle = nullptr;

BlockHandle handle;
Slice input = index_value;
Status s = handle.DecodeFrom(&input);
// We intentionally allow extra stuff in index_value so that we
// can add more features in the future.

if (s.ok())
{
BlockContents contents;
// BlockCache在这里
// 根据block handle,首先尝试从blockcache中直接取出block,不在blockcache中则
// 调用ReadBlock从文件读取,读取成功后,根据option尝试将block加入到blockcache中。
// 并在Insert的时候注册了释放函数DeleteCachedBlock。
if (block_cache != nullptr)
{
char cache_key_buffer[16];
EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
EncodeFixed64(cache_key_buffer + 8, handle.offset());
Slice key(cache_key_buffer, sizeof(cache_key_buffer));
// 先查一下缓存,key是由cache_id和handler的偏移值共同组成
cache_handle = block_cache->Lookup(key);
if (cache_handle != nullptr)
{
// 查找到就优先从缓存中返回
block = reinterpret_cast<Block *>(block_cache->Value(cache_handle));
}
else
{
// 否则就添加到缓存
s = ReadBlock(table->rep_->file, options, handle, &contents);
if (s.ok())
{
block = new Block(contents);
if (contents.cachable && options.fill_cache)
{
cache_handle = block_cache->Insert(key, block, block->size(),
&DeleteCachedBlock);
}
}
}
}
else
{
s = ReadBlock(table->rep_->file, options, handle, &contents);
if (s.ok())
{
block = new Block(contents);
}
}
}

// 返回block的迭代器
Iterator *iter;
if (block != nullptr)
{
iter = block->NewIterator(table->rep_->options.comparator);
if (cache_handle == nullptr)
{
iter->RegisterCleanup(&DeleteBlock, block, nullptr);
}
else
{
iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
}
}
else
{
iter = NewErrorIterator(s);
}
return iter;
}

而对于Table::InternalGet,基于前面介绍的SSTable的内存结构是比较好理解的:

  • 先搜索索引块,找到对应的索引,根据索引找到对应的Data Block
  • 查找Data Block。

其中还需要注意在读一个DataBlock的时候是使用到BlockCache的,只不过缓存这个东西有点感觉不到。

因为SSTable里保存的是Internal Key,但是搜索的是User Key,而Iterator Seek的是第一个大于等于待搜索的键的数据项,如果某个User Key不存在,是会定位到下一个User Key上面的,所以找到Internal Key后,还需要比较里的User Key是否相同,这就是回调函数的作用了。

读写的总结

读取

对于读取,如果键刚好在MemTable里,那么就是内存访问会非常快。否则就和需要读取的SSTable数量有关。前面说过SSTable里索引等数据是读入内存的,所以每次读取SSTable时,最多只需要一次磁盘IO,读取一个Data Block。一般Level 0的文件最多为4个,而Level最高为7层,这样就需要10次IO。对于一个不存在的键或者一个很早就写入的键,但是键在多个SSTable的范围内,往往需要花费最大的磁盘IO。然而这是最坏的情景,实际上有优化来减少了磁盘IO:

  • Data Block的缓存,如果在缓存里发现了要读取的Data Block,那么不需要磁盘IO;
  • 布隆过滤器,对于不存在的键有很好的优化,默认情况下99%的正确率,可以宣告一个键不在这个SSTable里,那么也无需缓存,典型的空间换时间。

所以很多读只需要一次磁盘IO,或者不需要磁盘IO。

  • 需要从DB中读取一个key的时候,首先调用DBImpl::Get函数,这个函数首先查找memtable、immemtable这些位于内存中的跳表,调用跳表的get方法来查找,这中间能够查找到的一定是最新的数据,可以直接返回
  • 如果上述查找不到,则会调用Version::Get从所有sstable的集合中查找。并且如果一个sstable多次查不到数据,还会尝试将冷数据堆到更高层上去。
  • 在Version::Get中,会调用Version::ForEachOverlapping来对每一层的sstable记录进行查找,找到范围包括key的sstable。其中要区别第0层和其他层的sstable的不同之处(即第0层key会有重叠,其他层不会)
  • Version::ForEachOverlapping就是简单的遍历一下各个sstable的范围,调用Version::State::Match来对每个可能含有key的sstable进行具体查找。
  • Version::State::Match则首先调用TableCache::Get来从TableCache中获取key对应的值,然后根据值的类型(插入、删除)来调用回调函数处理。
  • TableCache::Get会调用TableCache::FindTable找出sstable的索引,如果索引在缓存不命中,就会打开这个sstable的索引并插入到缓存中。
  • TableCache::Get在拿到sstable的索引后会调用Table::InternalGet来根据索引和BlockCache来正式获取值。
  • 由于user_key和internal_key不同,还需要回调函数来比较一下查找到的key是不是我们要的key,如果是的话,就返回。

写入

对于写入,如果不用sync的方式写入,其实基本就是写内存,而对于sync的方式,每次write都需要做一次磁盘IO,但是磁盘IO是写文件,是顺序IO,所以也是相当快的。而且在写入时,后面来的写请求,都会堆积起来,在后一个写入中批量写入,这样的一次磁盘IO其实是被平摊了。这就是为什么LSM Tree写入快的原因了。

  • 调用DBImpl::Put或DBImpl::Delete进行写入,会生成一个WriteBatch,然后调用DBImpl::Write去处理这个WriteBatch。
  • Write函数的过程为:先获取锁,同时把batch插入到write_中,检测头结点是不是自己以及队列中的batch是否插入完成了,如果都没有则进入等待中并释放锁。反之,则先将若干个batch打包成一个大的batch,然后按照先写入wal再写入memtable的顺序完成写入。在写入期间锁会被解开,其他线程可以往write_中插入batch。写入完成后重新上锁,更新序列号并从write_中删除写入的部分。
  • 当然,在真正写入之前,会调用DBImpl::MakeRoomForWrite,这个函数会做一些限流策略。

总结

  • 使用队列来将线程异步写转化成单线程同步写
  • 设计的时候需要多多分层,按照层级来设置缓存和接口,接口要尽可能简单,但是背后的逻辑可能会有点复杂