0%

levelDB源码剖析(16)--Compaction

Compaction

Compaction的功能无非是删除冗余的数据,精简SSTable文件的数量,再就是给Key排序,新生成的SSTable文件中的key是有序排列的。又因为等待Compaction的数据量和文件必然是庞大的,不可能通过一次Compaction就能搞定,这不现实,而且还会有不断地写操作进来,因此Compaction的时机也很有讲究。这些都是需要考虑的点。
Compaction其实就是一个归并排序的过程,对多个输入的SSTable,多路归并,输出多个连续的SSTable,代替原来的文件。根据Level 0的特殊性,可以分为两种类型。

  • level0 -> level1
  • level(n) -> level(n+1)

其中level0 -> level1相对比较复杂,因为第0层存在不少key范围重叠的sstable,因此如果需要压缩时,先从第0层选择出几个重叠的sstable,然后从第1层选择一些同样重叠的sstable,然后多路归并即可。

而level(n) -> level(n+1)就比较简单,只需要在level(n)层选择一个块,然后再在level(n+1)层选择与上述块重叠的块一起参与多路归并即可。

LevelDB里有三种方式选择Compaction,分别是:

  • Manual Compaction
  • Size Compaction
  • Seek Compaction

其中Manual Compaction就是手动触发一次Compaction,通过DBImpl::TEST_CompactRange触发,为了测试而存在。

Size Compaction的思想是比较简单直观的,对于Level 0的SSTable,因为键范围可能有重叠,所以需要控制文对件不超过4个,而于Level n(n > 0)的SSTable,总大小不能超过10^nMB,一旦这些条件不满足了,需要Compaction,将文件推向更高的Level,使得条件继续满足。

Size Compaction就是根据这个思想触发的,计算每一Level实际大小相对于最大大小的比率,优先Compaction比率最大的Level。

SSTable文件的增减,只会在版本变更的时候出现,所以只需要在版本变更完成时,计算比率最大的Level,这个计算过程由VersionSet::LogAndApply里面的void VersionSet::Finalize(Version* v)来完成(打开数据库的时候也会计算一次)。

Size Compaction是对一整个Level进行的,一个Level的SSTable可能会很多,无法在一次Compaction中完成,需要分多次完成。第一次完成最小键到某个键的范围内的Compaction,下一次再从某个键继续完成,以此类推。那么,需要记录下一次这个Level的Compaction从哪个键开始,这个信息会记录在compact_pointer_中。

对于Seek Compaction而言,在搜索SSTable时,会找到键范围包含待搜索键的SSTable,从Level 0开始搜索,如果一个SSTable没有找到,会搜索下一个SSTable,直到找到或者确定无法找到为止。假设一次搜索,搜索了超过一个SSTable,那么标记第一个SSTable搜索了一次,假设这种情况出现了多次,说明这个文件和多个其它的文件键范围有重叠,影响了搜索的效率,需要Compaction这个文件,使得这种重叠减少,进而提高读取的效率。

Seek Compaction就是基于这个原理实现的Compaction,是精确到每个SSTable的,一个FileMetaData里面包含一个字段allowed_seeks,会被设定为一个初始值,每当搜索SSTable时,如果第一个文件没有搜到,就会对第一个SSTable的allowed_seeks减1,当某个SSTable的allowed_seeks变为0时,那么这个SSTable就需要被Compaction。

allowed_seeks的减小发生在两个地方:

  • DB::GET接口
  • DBIter数据库迭代过程中

每次调用DB::GET时,如果需要搜索超过1个SSTable,就会对第一个SSTable的allowed_seeks减一。

而DBIter更特别一点,因为是迭代SSTable的,所以不会存在搜索一个键读取多个SSTable的情况,这边为了将Seek Compaction考虑进去,采用了抽样的方式,每读取2MB的数据,会抽样一个键,模拟读取的情况,更新相应的allowed_seeks。

当某个文件的allowed_seeks减小到0了,就会将当前Version的file_to_compact_和file_to_compact_level设置为这个文件以及它的Level,后台线程就会来处理。

allowed_seeks的初始值设置就非常关键,它决定了一个文件多少次Seek后,才会被Compaction,LevelDB将这个值设为:static_cast((f->file_size / 16384U))

这是有依据的,首先假设:

  • 一次Seek花费10ms
  • 读写1MB的花费10ms (100MB/s)

而Compaction 1MB的数据花费25MB左右的IO:当前Level读取1MB,上一个Level读取10-12MB,上一个Level写入10-12MB。
所以25次Seek的开销和Compaction 1MB数据的开销一样,也就是1次Seek和40KB数据Compaction的开销一样,我们比较保守,假设1次Seek和16KB的数据的开销一样。那么设置为上面的值后,Seek的操作和Compaction操作的开销相同,那么Compaction不会过于频繁,影响性能。

Compaction实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

// A Compaction encapsulates information about a compaction.
class Compaction
{
private:
friend class Version; // 友元类和私有的构造函数
friend class VersionSet;
Compaction(const Options *options, int level);

int level_; // Compaction文件所在的Level
uint64_t max_output_file_size_; // 生成文件最大值
Version *input_version_; // 输入的版本号
VersionEdit edit_; // Compaction结果保存的VersionEdit

// Each compaction reads inputs from "level_" and "level_+1"
// 两个元素分别对应两层的sstable
std::vector<FileMetaData *> inputs_[2]; // The two sets of inputs

// State used to check for number of overlapping grandparent files
// (parent == level_ + 1, grandparent == level_ + 2)
// 一些状态,用来检查是否存在重叠现象
std::vector<FileMetaData *> grandparents_;
size_t grandparent_index_; // Index in grandparent_starts_
bool seen_key_; // Some output key has been seen
int64_t overlapped_bytes_; // Bytes of overlap between current output
// and grandparent files

// State for implementing IsBaseLevelForKey

// level_ptrs_ holds indices into input_version_->levels_: our state
// is that we are positioned at one of the file ranges for each
// higher level than the ones involved in this compaction (i.e. for
// all L >= level_ + 2).
size_t level_ptrs_[config::kNumLevels];

public:
~Compaction();

// Return the level that is being compacted. Inputs from "level"
// and "level+1" will be merged to produce a set of "level+1" files.
// 返回正在压缩的level
int level() const { return level_; }

// Return the object that holds the edits to the descriptor done
// by this compaction.
// 返回压缩完成后的VersionEdit
VersionEdit *edit() { return &edit_; }

// "which" must be either 0 or 1
// 输入的文件个数
int num_input_files(int which) const { return inputs_[which].size(); }

// Return the ith input file at "level()+which" ("which" must be 0 or 1).
// 返回输入的文件
FileMetaData *input(int which, int i) const { return inputs_[which][i]; }

// Maximum size of files to build during this compaction.
// 压缩中最大的文件数目
uint64_t MaxOutputFileSize() const { return max_output_file_size_; }

// Is this a trivial compaction that can be implemented by just
// moving a single input file to the next level (no merging or splitting)
// 是否是平凡移动,即不经过任何归并直接移动到下一层
bool IsTrivialMove() const;

// Add all inputs to this compaction as delete operations to *edit.
// 把所有的输入文件在一个VersionEdit中删掉
void AddInputDeletions(VersionEdit *edit);

// Returns true if the information we have available guarantees that
// the compaction is producing data in "level+1" for which no data exists
// in levels greater than "level+1".
// 函数返回true表示user_key不存在于高层中
bool IsBaseLevelForKey(const Slice &user_key);

// Returns true iff we should stop building the current output
// before processing "internal_key".
// 在处理内部key之前需要停止构建当前输出的时候返回true
bool ShouldStopBefore(const Slice &internal_key);

// Release the input version for the compaction, once the compaction
// is successful.
// 压缩完成,释放输入版本
void ReleaseInputs();
};

Compaction的文件在两个Level,假设为level_ 和level_ + 1,选定一个或几个SSTable Compaction时,就是选定了level_ 的文件,然后调用void VersionSet::SetupOtherInputs(Compaction* c)可以获取到level_ + 1中与level_中选定的文件有重叠的文件,这样输入的SSTable就选好了,一次Compactdoin要做的工作也就确定了。

void DBImpl::BackgroundCompaction()首先会选定构造一个Compaction类的实例,也就是选定Compaction的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172

void DBImpl::BackgroundCompaction()
{
mutex_.AssertHeld();

if (imm_ != nullptr)
{
CompactMemTable();
return;
}

Compaction *c;
bool is_manual = (manual_compaction_ != nullptr);
InternalKey manual_end;
// 先处理手动触发的压缩
if (is_manual)
{
ManualCompaction *m = manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
m->done = (c == nullptr);
if (c != nullptr)
{
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
}
Log(options_.info_log,
"Manual compaction at level-%d from %s .. %s; will stop at %s\n",
m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"),
(m->done ? "(end)" : manual_end.DebugString().c_str()));
}
else
{
// 选择一部分文件做压缩
c = versions_->PickCompaction();
}

Status status;
if (c == nullptr)
// 没有合并的文件
{
// Nothing to do
}
else if (!is_manual && c->IsTrivialMove())
{
// 处理一种特殊情况,也就是参与Compaction的文件,level_有一个文件,而level_ + 1 没有
// 这时候只需要直接更改元数据,然后文件移动到level_ + 1即可,不需要多路归并
assert(c->num_input_files(0) == 1);
FileMetaData *f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok())
{
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(), versions_->LevelSummary(&tmp));
}
else
{
CompactionState *compact = new CompactionState(c);
// 调用DBImpl::DoCompactionWork做实际的Compaction
status = DoCompactionWork(compact);
if (!status.ok())
{
RecordBackgroundError(status);
}
CleanupCompaction(compact);
c->ReleaseInputs();
RemoveObsoleteFiles();
}
delete c;

if (status.ok())
{
// Done
}
else if (shutting_down_.load(std::memory_order_acquire))
{
// Ignore compaction errors found during shutting down
}
else
{
Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
}

if (is_manual)
{
ManualCompaction *m = manual_compaction_;
if (!status.ok())
{
m->done = true;
}
if (!m->done)
{
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
m->tmp_storage = manual_end;
m->begin = &m->tmp_storage;
}
manual_compaction_ = nullptr;
}
}

Compaction *VersionSet::PickCompaction()
{
Compaction *c;
int level;

// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
const bool size_compaction = (current_->compaction_score_ >= 1);
const bool seek_compaction = (current_->file_to_compact_ != nullptr);
if (size_compaction)
{
level = current_->compaction_level_;
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
c = new Compaction(options_, level);

// Pick the first file that comes after compact_pointer_[level]
for (size_t i = 0; i < current_->files_[level].size(); i++)
{
FileMetaData *f = current_->files_[level][i];
if (compact_pointer_[level].empty() ||
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0)
{
c->inputs_[0].push_back(f);
break;
}
}
if (c->inputs_[0].empty())
{
// Wrap-around to the beginning of the key space
c->inputs_[0].push_back(current_->files_[level][0]);
}
}
else if (seek_compaction)
{
level = current_->file_to_compact_level_;
c = new Compaction(options_, level);
c->inputs_[0].push_back(current_->file_to_compact_);
}
else
{
return nullptr;
}

c->input_version_ = current_;
c->input_version_->Ref();

// Files in level 0 may overlap each other, so pick up all overlapping ones
if (level == 0)
{
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}

SetupOtherInputs(c);

return c;
}

其中DBImpl::DoCompactionWork是真正处理多路归并的函数,会考虑以下几点:

  • 迭代按照Internal Key的顺序进行,多个连续的Internal Key里面可能包含相同的User Key,按照SequenceNumber降序排列
  • 相同的User Key里只有第一个User Key是有效的,因为它的SequenceNumber是最大的,覆盖了旧的User Key,但是无法只保留第一个User Key,因为LevelDB支持多版本,旧的User Key可能依然有线程可以引用,但是不再引用的User Key可以安全的删除
  • 碰到一个删除时,并且它的SequenceNumber <= 最新的Snapshot,会判断更高Level是否有这个User Key存在。如果存在,那么无法丢弃这个删除操作,因为一旦丢弃了,更高Level原被删除的User Key又可见了。如果不存在,那么可以安全的丢弃这个删除操作,这个键就找不到了
  • 对于生成的SSTable文件,设置两个上限,哪个先达到,都会开始新的SSTable。一个就是2MB,另外一个就是判断上一Level和这个文件的重叠的文件数量,不超过10个,这是为了控制这个生成的文件Compaction的时候,不会和太多的上层文件重叠

最后通过DBImpl::InstallCompactionResults安装Compaction的结果,将改删除的文件和改添加的文件更新到VersionEdit里,然后应用版本更新。