0%

levelDB源码剖析(15)--数据库的版本管理

数据库版本管理

源码位置与说明

version_edit.h version_edit.cc: 实现VersionEdit
version_set.h version_set.cc:实现Version、VersionSet和Builder

版本其实就是LevelDB数据库的元数据,之前提到过,在MemTable读取不到键时,需要去SSTable读取。SSTable的文件有哪些,每一个文件分别在哪一个Level上面,每个文件里面包含的键的最小值最大值是什么。需要知道这些信息,才可以快速的从SSTable里读取出相应的键的值。而版本就保存了这些信息,让我们通过版本读取SSTable。

那么为什么要版本管理呢?在数据不断写入后,MemTable写满了,这时候就会转换为Level 0的一个SSTable,或者Level n的一个文件和Level n + 1的多个文件进行Compaction,会转换成Level n + 1的多个文件。这会使SSTable文件数量改变,文件内容改变,也就是版本信息改变了,所以需要管理版本。

那么有一个版本就够了吗?在文件数量和内容改变时,修改当前的版本就行了。但是并不是,假设使用了整个数据库的迭代器,LevelDB在迭代数据库时,是提供了一个一致性的视图,也就是只能看到迭代前的写入,而迭代后的写入是看不到,在实现的时候,迭代器会引用这个版本,以及里面的SSTable,直到迭代结束。如果在迭代过程中,有文件删除了,那么就无法引用到这个文件了,就会出错。所以需要多个版本,有一个版本是当前版本,当新生成版本后,旧的版本如果有被其它线程引用,也需要保留,直到不被引用后,才会被删除,所以一个时刻可能有多个版本存在。

先看看LevelDB的版本管理架构图,可以看到主要用到了4个类:

  • Version标识了一个版本,主要信息是这个版本包含的SSTable信息
  • VersionSet是一个版本集,里面保存了Version的一个双链表,其中有一个Version是当前版本,因为只有一个实例,还保存了其它的一些全局的元数据,Dummy Version是链表头
  • VersionEdit保存了要对Version做的修改,在一个Version上应用一个VersionEdit,可以生成一个新的Version
  • Builder是一个帮助类,帮助Version上应用VersionEdit,生成新版本

版本管理的基本工作流程如下:

  • VersionSet里保存着当前版本,以及被引用的历史版本
  • 当有Compaction发生时,会将更改的内容写入到一个VersionEdit中
  • 利用Builder将VersionEdit应用到当前版本上面生成一个新的版本
  • 将新版本链接到VersionSet的双链表上面
  • 将新的版本设置为当前版本
  • 将旧的当前版本Unref,就是引用计数减1

版本控制中使用了引用计数来管理历史版本:

  • 假设一个没有被访问的数据库,这时候只有一个版本,也就是当前版本v1,它的引用计数是1
  • 假设有一个迭代器开始访问数据库,这时候它会对当前版本v1 Ref,引用计数变成了2
  • 其它线程不断写入,导致了Compaction的生成,这时候生成了一个新的版本v2成为了当前版本,引用计数为1,然后对v1 Unref,这时候v1的引用计数为1
  • 因为v1的引用计数为1,所以不会被删除,迭代器线程还可以继续访问v1
  • 迭代器结束后,对v1进行Unref,这时候v1的引用计数变为0,就会从链表上删除了,这时候又只剩下v2版本了
  • 版本里面的SSTable也是采用引用计数来管理的,每个版本引用一个SSTable会Ref,删除版本时会Unref,如果一个SSTable的计数为0,那么这个SSTable就可以被删除了

Version

我们首先来看一下Version这个类。Version这个类标识了一个版本,读取数据库时都需要使用Version里面的版本信息,主要保存的是SSTable的文件信息。

先来看一下头文件部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120

class Version
{

private:
friend class Compaction; // 压缩线程友元类
friend class VersionSet; // 版本集合友元类

class LevelFileNumIterator; // 用于迭代某一层的迭代器

// 构造函数,不允许直接创建此类,只能被友元创建
explicit Version(VersionSet *vset)
: vset_(vset),
next_(this),
prev_(this),
refs_(0),
file_to_compact_(nullptr),
file_to_compact_level_(-1),
compaction_score_(-1),
compaction_level_(-1)
{
}
Version(const Version &) = delete;
Version &operator=(const Version &) = delete;
~Version();

// 返回一个双层迭代器,第一层用来遍历对应level的所有文件
// 第二层用来遍历某个文件的cache中的k-v,使用此双层迭代器
// 可以方便的遍历一个level中所有的k-v
Iterator *NewConcatenatingIterator(const ReadOptions &, int level) const;

// Call func(arg, level, f) for every file that overlaps user_key in
// order from newest to oldest. If an invocation of func returns
// false, makes no more calls.
//
// REQUIRES: user portion of internal_key == user_key.
// 对于所有包含user-key的file,按照从新到旧的顺序,应用func函数
// 直到func返回为false
void ForEachOverlapping(Slice user_key, Slice internal_key, void *arg,
bool (*func)(void *, int, FileMetaData *));

VersionSet *vset_; // 当前版本所在的版本集合
Version *next_; // 下一个版本实例的指针
Version *prev_; // 前一个版本实例的指针
int refs_; // 引用计数

// SSTable的信息,每一项代表相应Level的SSTable信息
// 除了Level 0外,每个Level里的文件都是按照最小键的顺序排列的,并且没有重叠
// 通过这个数据项,搜索SSTable时,就可以从Level 0开始搜索
std::vector<FileMetaData *> files_[config::kNumLevels];

// 下一个需要压缩的文件
FileMetaData *file_to_compact_;
int file_to_compact_level_;

// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize().
double compaction_score_;
int compaction_level_;

public:
// 获取sstable文件信息的结构体
struct GetStats
{
FileMetaData *seek_file;
int seek_file_level;
};

// Append to *iters a sequence of iterators that will
// yield the contents of this Version when merged together.
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
void AddIterators(const ReadOptions &, std::vector<Iterator *> *iters);

// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status. Fills *stats.
// REQUIRES: lock is not held
// 获取值
Status Get(const ReadOptions &, const LookupKey &key, std::string *val,
GetStats *stats);

// Adds "stats" into the current state. Returns true if a new
// compaction may need to be triggered, false otherwise.
// REQUIRES: lock is held
// 将stats 更新到version,stats记录了seek的结果,包括sts文件信息和seek的level
bool UpdateStats(const GetStats &stats);

// Record a sample of bytes read at the specified internal key.
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes. Returns true if a new compaction may need to be triggered.
// REQUIRES: lock is held
// 取样式get一个key,通过get,来判断当前是否需要触发compact
bool RecordReadSample(Slice key);

// 引用计数相关
void Ref();
void Unref();

// 将所有某层中超过范围的文件保存在input中
void GetOverlappingInputs(
int level,
const InternalKey *begin, // nullptr means before all keys
const InternalKey *end, // nullptr means after all keys
std::vector<FileMetaData *> *inputs);

// 当某层中存在文件的键的范围超过限制,就返回1
bool OverlapInLevel(int level, const Slice *smallest_user_key,
const Slice *largest_user_key);

// 计算当memtable中的key范围是[smallest_user_key,largest_user_key]时,
// 应该将其压缩到那一层中(一般是在第0层,最高会堆到第2层)
int PickLevelForMemTableOutput(const Slice &smallest_user_key,
const Slice &largest_user_key);

int NumFiles(int level) const { return files_[level].size(); }

// 返回一条人类可读的字符串来描述当前版本的信息
std::string DebugString() const;
};

其中,我们仔细观察一下其私有成员部分,就能发现,它含有类似于双向量表中结点的前驱后继。实际上,VersuionSet可以看成是一个双向链表,而Version则是其中的结点。

每个Version中,还会将所有的此版本中的SSTable存放在一个vector数组中,每一层对应一个下标。

可以看到Version是比较简单的,最重要的信息就是这个版本包含的SSTable的信息。通过这些信息,版本提供了接口Status Version::Get(const ReadOptions&, const LookupKey& key, std::string* val, GetStats* stats),可以在这些SSTable里读取出相应的键的值。

我们这里来看一下Version::Get接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145

Status Version::Get(const ReadOptions &options, const LookupKey &k,
std::string *value, GetStats *stats)
{
stats->seek_file = nullptr;
stats->seek_file_level = -1;

// 定义状态结构体
struct State
{
Saver saver;
GetStats *stats;
const ReadOptions *options;
Slice ikey;
FileMetaData *last_file_read;
int last_file_read_level;

VersionSet *vset;
Status s;
bool found;

static bool Match(void *arg, int level, FileMetaData *f)
{
State *state = reinterpret_cast<State *>(arg);

if (state->stats->seek_file == nullptr &&
state->last_file_read != nullptr)
{
// We have had more than one seek for this read. Charge the 1st file.
state->stats->seek_file = state->last_file_read;
state->stats->seek_file_level = state->last_file_read_level;
}

state->last_file_read = f;
state->last_file_read_level = level;

state->s = state->vset->table_cache_->Get(*state->options, f->number,
f->file_size, state->ikey,
&state->saver, SaveValue);
if (!state->s.ok())
{
state->found = true;
return false;
}
switch (state->saver.state)
{
case kNotFound:
return true; // Keep searching in other files
case kFound:
state->found = true;
return false;
case kDeleted:
return false;
case kCorrupt:
state->s =
Status::Corruption("corrupted key for ", state->saver.user_key);
state->found = true;
return false;
}

// Not reached. Added to avoid false compilation warnings of
// "control reaches end of non-void function".
return false;
}
};

State state;
state.found = false;
state.stats = stats;
state.last_file_read = nullptr;
state.last_file_read_level = -1;

state.options = &options;
state.ikey = k.internal_key();
state.vset = vset_;

state.saver.state = kNotFound;
state.saver.ucmp = vset_->icmp_.user_comparator();
state.saver.user_key = k.user_key();
state.saver.value = value;

// 在每层内将每个受当前版本管理的sstable中进行查找
// 就相当于sstable的查找
ForEachOverlapping(state.saver.user_key, state.ikey, &state, &State::Match);

return state.found ? state.s : Status::NotFound(Slice());
}

void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void *arg,
bool (*func)(void *, int, FileMetaData *))
{
const Comparator *ucmp = vset_->icmp_.user_comparator();

// 第0层的查找,每个都查找
std::vector<FileMetaData *> tmp;
tmp.reserve(files_[0].size());
for (uint32_t i = 0; i < files_[0].size(); i++)
{
FileMetaData *f = files_[0][i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0)
{
tmp.push_back(f);
}
}
if (!tmp.empty())
{
// 由新到旧排序,并比对返回数据
std::sort(tmp.begin(), tmp.end(), NewestFirst);
for (uint32_t i = 0; i < tmp.size(); i++)
{
if (!(*func)(arg, 0, tmp[i]))
{
return;
}
}
}

// 查找其他层
for (int level = 1; level < config::kNumLevels; level++)
{
size_t num_files = files_[level].size();
if (num_files == 0)
continue;

// 二分查找
uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
if (index < num_files)
{
FileMetaData *f = files_[level][index];
if (ucmp->Compare(user_key, f->smallest.user_key()) < 0)
{
// All of "f" is past any data for user_key
}
else
{
if (!(*func)(arg, level, f))
{
return;
}
}
}
}
}

由于每个版本负责管理一系列的SSTable,所以对于这一系列的SSTable的查找也是通过Version来实现的。其中还是要考虑到第0层和其他层查找的差异性。

VersionSet

VersionSet就是一个Version的集合,里面包含了一个当前存活的Version的双链表。不过除了Version以外,VersionSet还保存了一些全局的只有一份的元数据。VersionSet只有一个实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177

class VersionSet
{
private:
class Builder; // 辅助构建类。主要用于保存中间变量相关等

friend class Compaction; // 友元类,压缩线程
friend class Version; // 友元类,版本

// 恢复Manifest文件
bool ReuseManifest(const std::string &dscname, const std::string &dscbase);

// 计算得到Size Compaction的最佳level和比率score
// 用于压缩
void Finalize(Version *v);

// 获取所有input中的key最小最大
void GetRange(const std::vector<FileMetaData *> &inputs, InternalKey *smallest,
InternalKey *largest);
void GetRange2(const std::vector<FileMetaData *> &inputs1,
const std::vector<FileMetaData *> &inputs2,
InternalKey *smallest, InternalKey *largest);

// 在指定的level+1中选择那些key range与input _[0]中所有文件key range重合的文件填入input_[1]
void SetupOtherInputs(Compaction *c);

// 保存快照
Status WriteSnapshot(log::Writer *log);

// 让版本v成为当前版本
void AppendVersion(Version *v);

/**一些变量**/
Env *const env_; // 环境
const std::string dbname_; // 数据库名字
const Options *const options_; // 选项
TableCache *const table_cache_; // 缓存
const InternalKeyComparator icmp_; // 比较器
uint64_t next_file_number_; // 下一个序列号
uint64_t manifest_file_number_; // manifest的序列号
uint64_t last_sequence_; // 上一个序列号
uint64_t log_number_; // 日志号
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted

// Opened lazily
WritableFile *descriptor_file_; // 文件写入
log::Writer *descriptor_log_; // 日志写入
Version dummy_versions_; // 双向链表头结点
Version *current_; // == dummy_versions_.prev_

// 这是用来记录Compact的进度,Compact总是从某一Level的最小的键开始到某个键结束,
// 下次再从下一个键开始,所以这个就是下一次这个Level从哪个键开始Compact
std::string compact_pointer_[config::kNumLevels];

public:
// 构造析构函数
VersionSet(const std::string &dbname, const Options *options,
TableCache *table_cache, const InternalKeyComparator *);
VersionSet(const VersionSet &) = delete;
VersionSet &operator=(const VersionSet &) = delete;
~VersionSet();

// Apply *edit to the current version to form a new descriptor that
// is both saved to persistent state and installed as the new
// current version. Will release *mu while actually writing to the file.
// REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
// 将根据前面记录的版本修改信息更新当前版本,得到一个新的版本信息
// 同时把这个新的版本信息设置成当前VersionSet的current_,并连入版本集合的链表中
Status LogAndApply(VersionEdit *edit, port::Mutex *mu)
EXCLUSIVE_LOCKS_REQUIRED(mu);

// 恢复用
Status Recover(bool *save_manifest);

// 返回当前版本
Version *current() const { return current_; }

// 返回当前文件号
uint64_t ManifestFileNumber() const { return manifest_file_number_; }

// 分配并返回新的文件号
uint64_t NewFileNumber() { return next_file_number_++; }

// Arrange to reuse "file_number" unless a newer file number has
// already been allocated.
// REQUIRES: "file_number" was returned by a call to NewFileNumber().
// 复用文件号,仅当新的文件号被占用时才会调用此函数
void ReuseFileNumber(uint64_t file_number)
{
if (next_file_number_ == file_number + 1)
{
next_file_number_ = file_number;
}
}

// 返回某一层的文件数目
int NumLevelFiles(int level) const;

// 返回某一层的文件字节数目
int64_t NumLevelBytes(int level) const;

// 返回最后序列号
uint64_t LastSequence() const { return last_sequence_; }

// 设置最后序列号为s
void SetLastSequence(uint64_t s)
{
assert(s >= last_sequence_);
last_sequence_ = s;
}

// 标记一个序列号为已使用
void MarkFileNumberUsed(uint64_t number);

// 返回当前log编号
uint64_t LogNumber() const { return log_number_; }

// Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file.
uint64_t PrevLogNumber() const { return prev_log_number_; }

// Pick level and inputs for a new compaction.
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
// 选择一层来执行新的压缩,若没有需要进行的压缩,则返回空
// 否则返回一个堆分配内存来描述压缩结果,需要调用者来释放
// 这个选择是选择实现计算出来下次需要压缩的文件
Compaction *PickCompaction();

// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
// level that overlaps the specified range. Caller should delete
// the result.
// 压缩指定层中的指定范围,不重叠的话就没有压缩
Compaction *CompactRange(int level, const InternalKey *begin,
const InternalKey *end);

// Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1.
// 返回最大重叠数据
int64_t MaxNextLevelOverlappingBytes();

// Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed.
// 返回一个可以从N个SSTable中提取出来不重复key的迭代器。
Iterator *MakeInputIterator(Compaction *c);

// Returns true iff some level needs a compaction.
// 是否存在某些层需要压缩
bool NeedsCompaction() const
{
Version *v = current_;
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr);
}

// Add all files listed in any live version to *live.
// May also mutate some internal state.
// 把所有存活版本中的文件记录到集合中(即记录所有存活文件)
void AddLiveFiles(std::set<uint64_t> *live);

// Return the approximate offset in the database of the data for
// "key" as of version "v".
// 返回在版本v中,key的大致偏移
uint64_t ApproximateOffsetOf(Version *v, const InternalKey &key);

// Return a human-readable short (single-line) summary of the number
// of files per level. Uses *scratch as backing store.
// 返回人类可读的总结
struct LevelSummaryStorage
{
char buffer[100];
};
const char *LevelSummary(LevelSummaryStorage *scratch) const;
};

可以看到,VersionSet除了保存了Version的双链表以外,还保存了其它的一些元数据。有些元数据,比如当前的版本,当数据库关闭后,再次打开的时候还是需要的,这些信息就持久化到MANIFEST文件中。

当然,还涉及到一些压缩相关的内容,这个在后面深入学习。

VersionEdit

VersionEdit记录了一次版本变更有哪些改变。即有Version(旧) + VersionEdit(修改部分) = Version(新),其中对于加号是Builder的体现,后面再说。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

class VersionEdit
{
private:
friend class VersionSet; // 因为需要从VersionSet中来修改Version

typedef std::set<std::pair<int, uint64_t>> DeletedFileSet; // 删除掉的文件集合别名

std::string comparator_; // 比较器名字
uint64_t log_number_; // 日志号
uint64_t prev_log_number_; // 上一个ldb、log和manifset文件编号
uint64_t next_file_number_; // 下一个编号
SequenceNumber last_sequence_; // 上一个序列号

bool has_comparator_; // 记录上面对应内容是否存在,存在才会持久化的MANIFEST中的一些字段
bool has_log_number_;
bool has_prev_log_number_;
bool has_next_file_number_;
bool has_last_sequence_;

// 和VersionSet里面的compact_pointers_相同
// 用来记录Compact的进度,Compact总是从某一Level的最小的键开始到某个键结束
std::vector<std::pair<int, InternalKey>> compact_pointers_;
// 有哪些文件被删除,就是Version里哪些SSTable被删除
DeletedFileSet deleted_files_;
// 有哪些文件被增加,pair的第一个参数是Level,第二个参数是文件的元信息
std::vector<std::pair<int, FileMetaData>> new_files_;

public:
VersionEdit() { Clear(); }
~VersionEdit() = default;

void Clear();

// 下列函数就是在设置私有成员中的值的接口
void SetComparatorName(const Slice &name)
{
has_comparator_ = true;
comparator_ = name.ToString();
}
void SetLogNumber(uint64_t num)
{
has_log_number_ = true;
log_number_ = num;
}
void SetPrevLogNumber(uint64_t num)
{
has_prev_log_number_ = true;
prev_log_number_ = num;
}
void SetNextFile(uint64_t num)
{
has_next_file_number_ = true;
next_file_number_ = num;
}
void SetLastSequence(SequenceNumber seq)
{
has_last_sequence_ = true;
last_sequence_ = seq;
}
void SetCompactPointer(int level, const InternalKey &key)
{
compact_pointers_.push_back(std::make_pair(level, key));
}

// Add the specified file at the specified number.
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
// REQUIRES: "smallest" and "largest" are smallest and largest keys in file
// 在某一层中新增文件
void AddFile(int level, uint64_t file, uint64_t file_size,
const InternalKey &smallest, const InternalKey &largest)
{
FileMetaData f;
f.number = file;
f.file_size = file_size;
f.smallest = smallest;
f.largest = largest;
new_files_.push_back(std::make_pair(level, f));
}

// Delete the specified "file" from the specified "level".
// 删除某一层的文件
void RemoveFile(int level, uint64_t file)
{
deleted_files_.insert(std::make_pair(level, file));
}

// 编码解码
void EncodeTo(std::string *dst) const;
Status DecodeFrom(const Slice &src);

// 返回人类可读字串
std::string DebugString() const;
};

可以看到VersionEdit类中基本上就是一些私有成员,这些信息主要是为了持久化到MANIFEST,其中对SSTable文件的修改则通过对应的增加删除函数来完成。

VersionEdit主要有两个作用一个就是应用到版本上作版本变迁,这个就是Builder做的事情,这主要发生在内存的数据结构中。另外一个作用就是持久化到MANIFEST记录版本变迁的内容,这里会记录更多的内容,包括last_sequence_、next_file_number等,由VersionEdit::EncodeTo完成。

主要有两个地方需要持久化:

  • 初始写入,需要写入当前数据状态的信息
  • 版本变迁写入,写入的是每一次版本变化的写入

初始写入,是由VersionSet::WriteSnapshot完成,当打开一个已存在的数据库,读取完现有的MANIFEST后,如果要新建一个MANIFEST替换现有的MANIFEST,就要先调用这个函数,将现有的数据库状态写入,主要包括三部分内容:

  • 比较器的名称
  • compact_pointer_数组,可能有多条记录,有Level和对应的Internal Key,表示Size Compaction的进程
  • 当前存在的SSTable文件,包含元数据,包括在哪一Level,文件编号,文件大小,里面的最小键和最大键

版本变迁写入,就是当需要生成一个新版本时,都会向MANIFEST写入一条新纪录,记录了新版本相对旧版本有哪些变化,和旧版本合并后可以得出当前的状态,主要包括以下几部分:

  • 写入时刻的日志编号
  • 写入时刻下个可用文件的编号
  • 写入时刻上一个用掉的SequenceNumber
  • compact_pointer_数组,可能有多条记录,有Level和对应的Internal Key
  • 记录删除了哪些文件,只记录了Level和对应的file number
  • 当前存在的SSTable文件,包含元数据,包括在哪一Level,文件编号,文件大小,里面的最小键和最大键

这部分是其持久化时的源码,很简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

enum Tag
{
kComparator = 1, // 记录Comparator的名字
kLogNumber = 2, // 记录当前时刻的log_number
kNextFileNumber = 3, // 记录当前时刻的next_file_number_
kLastSequence = 4, // 记录当前时刻的last_sequence
kCompactPointer = 5, // 记录compact_pointer
kDeletedFile = 6, // 记录删除的文件信息
kNewFile = 7 // 记录新增的文件信息
};

void VersionEdit::EncodeTo(std::string *dst) const
{
if (has_comparator_)
{
PutVarint32(dst, kComparator);
PutLengthPrefixedSlice(dst, comparator_);
}
if (has_log_number_)
{
PutVarint32(dst, kLogNumber);
PutVarint64(dst, log_number_);
}
if (has_prev_log_number_)
{
PutVarint32(dst, kPrevLogNumber);
PutVarint64(dst, prev_log_number_);
}
if (has_next_file_number_)
{
PutVarint32(dst, kNextFileNumber);
PutVarint64(dst, next_file_number_);
}
if (has_last_sequence_)
{
PutVarint32(dst, kLastSequence);
PutVarint64(dst, last_sequence_);
}

for (size_t i = 0; i < compact_pointers_.size(); i++)
{
PutVarint32(dst, kCompactPointer);
PutVarint32(dst, compact_pointers_[i].first); // level
PutLengthPrefixedSlice(dst, compact_pointers_[i].second.Encode());
}

for (const auto &deleted_file_kvp : deleted_files_)
{
PutVarint32(dst, kDeletedFile);
PutVarint32(dst, deleted_file_kvp.first); // level
PutVarint64(dst, deleted_file_kvp.second); // file number
}

for (size_t i = 0; i < new_files_.size(); i++)
{
const FileMetaData &f = new_files_[i].second;
PutVarint32(dst, kNewFile);
PutVarint32(dst, new_files_[i].first); // level
PutVarint64(dst, f.number);
PutVarint64(dst, f.file_size);
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
}
}

Builder

前面也提到过,Version(旧) + VersionEdit(修改部分) = Version(新),而其中的加号就是Builder。

我们可能会问为什么还需要Builder,当有VersionEdit改变版本时,直接引用到当前的Version生成一个新的Version即可。这其实是为了效率,才引入了Builder。正常情况下,一次Compaction或者MemTable写入后,会产生一个VersionEdit,将这个VersionEdit应用到当前Version上生成一个新的Version,这没有什么问题。不过在版本变更时,也会将VersionEdit的内容写入MANIFEST中。当重新打开一个数据库时,需要读取MANIFEST重新构造版本信息,这个版本信息由初始的Version和多个VersionEdit生成,如果直接用VersionEdit应用会生成多个版本,降低了效率。所以使用了Builder,将多个VersionEdit的内容累积到Builder上,然后一次性应用到当前Version即可生成新的Version。

过程大致如下:

1
2
3
4
5
6
7
8

VersionSet::Builder builder(&vset, curr_version); // 创建一个builder
builder.Apply(version_edit1); // 应用VersionEdit,可应用多个
builder.Apply(version_edit2);
Version new_version;
builder.SaveTo(&new_version); // 将之前的Version和VersionEdit生成一个新的Version
vset.AppendVersion(new_version); // 将新Version添加到VersionSet

我们来看一下Builder类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205

class VersionSet::Builder
{
private:
// Helper to sort by v->files_[file_number].smallest
// 用来辅助将v->files_[file_number].smallest进行排序
struct BySmallestKey
{
const InternalKeyComparator *internal_comparator;

bool operator()(FileMetaData *f1, FileMetaData *f2) const
{
int r = internal_comparator->Compare(f1->smallest, f2->smallest);
if (r != 0)
{
return (r < 0);
}
else
{
// Break ties by file number
return (f1->number < f2->number);
}
}
};

typedef std::set<FileMetaData *, BySmallestKey> FileSet;
struct LevelState
{
std::set<uint64_t> deleted_files;
FileSet *added_files;
};

VersionSet *vset_; // 所属的集合
Version *base_; // 旧Version
LevelState levels_[config::kNumLevels];

public:
// Initialize a builder with the files from *base and other info from *vset
Builder(VersionSet *vset, Version *base) : vset_(vset), base_(base)
{
base_->Ref();
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++)
{
levels_[level].added_files = new FileSet(cmp);
}
}

// 析构函数,需要对文件解除引用,也需要对版本解除引用
~Builder()
{
for (int level = 0; level < config::kNumLevels; level++)
{
const FileSet *added = levels_[level].added_files;
std::vector<FileMetaData *> to_unref;
to_unref.reserve(added->size());
for (FileSet::const_iterator it = added->begin(); it != added->end();
++it)
{
to_unref.push_back(*it);
}
delete added;
for (uint32_t i = 0; i < to_unref.size(); i++)
{
FileMetaData *f = to_unref[i];
f->refs--;
if (f->refs <= 0)
{
delete f;
}
}
}
base_->Unref();
}

// 应用VersionEdit到当前版本上
void Apply(const VersionEdit *edit)
{
// 首先更新comact_pointer_,这个直接在VersionSet里更新,因为这个的信息只有一份
// 新版本肯定是覆盖旧版本的,所以直接更新即可
for (size_t i = 0; i < edit->compact_pointers_.size(); i++)
{
const int level = edit->compact_pointers_[i].first;
vset_->compact_pointer_[level] =
edit->compact_pointers_[i].second.Encode().ToString();
}

// 把VersionEdit里删除的文件插入到levels_相应Level里面去
for (const auto &deleted_file_set_kvp : edit->deleted_files_)
{
const int level = deleted_file_set_kvp.first;
const uint64_t number = deleted_file_set_kvp.second;
levels_[level].deleted_files.insert(number);
}

// 把VersionEdit里添加的文件插入到levels_相应的Level里去
for (size_t i = 0; i < edit->new_files_.size(); i++)
{
const int level = edit->new_files_[i].first;
// 因为是新文件,构造一个FileMetaData
FileMetaData *f = new FileMetaData(edit->new_files_[i].second);
f->refs = 1;

// We arrange to automatically compact this file after
// a certain number of seeks. Let's assume:
// (1) One seek costs 10ms
// (2) Writing or reading 1MB costs 10ms (100MB/s)
// (3) A compaction of 1MB does 25MB of IO:
// 1MB read from this level
// 10-12MB read from next level (boundaries may be misaligned)
// 10-12MB written to next level
// This implies that 25 seeks cost the same as the compaction
// of 1MB of data. I.e., one seek costs approximately the
// same as the compaction of 40KB of data. We are a little
// conservative and allow approximately one seek for every 16KB
// of data before triggering a compaction.
f->allowed_seeks = static_cast<int>((f->file_size / 16384U));
if (f->allowed_seeks < 100)
f->allowed_seeks = 100;

levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
}

// 把新版本存储到版本v中
void SaveTo(Version *v)
{
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++)
{
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
// 拿出原本Version里的文件,以及Builder里累积的,添加的文件
const std::vector<FileMetaData *> &base_files = base_->files_[level];
std::vector<FileMetaData *>::const_iterator base_iter = base_files.begin();
std::vector<FileMetaData *>::const_iterator base_end = base_files.end();
const FileSet *added_files = levels_[level].added_files;
v->files_[level].reserve(base_files.size() + added_files->size());
// 按顺序进行合并
for (const auto &added_file : *added_files)
{
// Add all smaller files listed in base_
// 找到base里面比added_file小的文件,添加到新的Version里
// 采用MaybeAddFile,让被删除的文件无法添加
for (std::vector<FileMetaData *>::const_iterator bpos =
std::upper_bound(base_iter, base_end, added_file, cmp);
base_iter != bpos; ++base_iter)
{
MaybeAddFile(v, level, *base_iter);
}

MaybeAddFile(v, level, added_file);
}

// Add remaining base files
for (; base_iter != base_end; ++base_iter)
{
MaybeAddFile(v, level, *base_iter);
}

#ifndef NDEBUG
// Make sure there is no overlap in levels > 0
if (level > 0)
{
for (uint32_t i = 1; i < v->files_[level].size(); i++)
{
const InternalKey &prev_end = v->files_[level][i - 1]->largest;
const InternalKey &this_begin = v->files_[level][i]->smallest;
if (vset_->icmp_.Compare(prev_end, this_begin) >= 0)
{
std::fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
prev_end.DebugString().c_str(),
this_begin.DebugString().c_str());
std::abort();
}
}
}
#endif
}
}

void MaybeAddFile(Version *v, int level, FileMetaData *f)
{
if (levels_[level].deleted_files.count(f->number) > 0)
{
// File is deleted: do nothing
}
else
{
std::vector<FileMetaData *> *files = &v->files_[level];
if (level > 0 && !files->empty())
{
// Must not overlap
assert(vset_->icmp_.Compare((*files)[files->size() - 1]->largest,
f->smallest) < 0);
}
f->refs++;
files->push_back(f);
}
}
};

其中apply函数是负责将修改记录VersionEdit应用,saveto是将应用过若干版本修改的最终版本保存到一个新版本上。

版本变迁

版本变迁是通过VersionSet中的LogAndApply来完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105

// 输入参数edit表示的是改变的内容,比如一次Compaction可以得到edit
Status VersionSet::LogAndApply(VersionEdit *edit, port::Mutex *mu)
{
// 前面是对edit进行赋值
if (edit->has_log_number_)
{
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
}
else
{
edit->SetLogNumber(log_number_);
}

if (!edit->has_prev_log_number_)
{
edit->SetPrevLogNumber(prev_log_number_);
}

edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);

// 创建一个新版本,新版本是current_和edit的结合
Version *v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v); // 对v进行可能的压缩

// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
std::string new_manifest_file;
Status s;
// 这里只有Open数据库的时候才会走到,如果需要保存新的MANIFEST,此时这个变量为null
// 会创建一个新的MANIFEST,然后将当前的状态写入
if (descriptor_log_ == nullptr)
{
// No reason to unlock *mu here since we only hit this path in the
// first call to LogAndApply (when opening the database).
assert(descriptor_file_ == nullptr);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok())
{
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_);
}
}

// Unlock during expensive MANIFEST log write
{
mu->Unlock();

// Write new record to MANIFEST log
if (s.ok())
{
std::string record;
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok())
{
s = descriptor_file_->Sync();
}
if (!s.ok())
{
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
}
}

// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty())
{
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}

mu->Lock();
}

// 安装新版本,会把v放到VersionSet的链表中,然后将当前Version指向v
if (s.ok())
{
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
}
else
{
delete v;
if (!new_manifest_file.empty())
{
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = nullptr;
descriptor_file_ = nullptr;
env_->RemoveFile(new_manifest_file);
}
}

return s;
}

版本变迁基本的流程如下:

  • 首先有了一个准备好的VersionEdit实例,包含了版本变迁的内容,然后设置其它的一些字段
  • 然后生成一个新的Version,使用Builder将当前的版本和VersionEdit应用生成一个新的Version
  • 如果当前MANIFEST的描述符不存在,新生成一个MANIFEST文件,将当前状态写入
  • 将VersionEdit的内容写入到MANIFEST文件
  • 将新生成的Version加入到VersionSet,替换当前的Version