0%

levelDB源码剖析(17)--Recover

Recover

LevelDB打开需要做以下事情:

  • 如果数据库目录不存在,创建目录
  • 加文件锁,锁住整个数据库
  • 读取MANIFEST文件,恢复系统关闭时的元数据,也就是版本信息,或者新建MAINFEST文件
  • 如果上一次关闭时,MemTable里有数据,或者Immutable MemTable写入到SSTable未完成,那么需要做数据恢复,从WAL恢复数据
  • 创建数据库相关的内存数据结构,如Version、VersionSet等

打开数据库的接口是DB::Open,在其中调用DBImpl::Recover来完成主要的工作,如果调用成功,则创建MemTable和WAL相关的数据结构,重写MANIFEST文件。

而DBImpl::Recover做了以下事情:

  • 创建数据库目录
  • 对这个数据库里面的LOCK文件加文件锁,LevelDB是单进程多线程的,需要保证每次只有一个进程能够打开数据库,方式就是使用了文件锁,如果有其它进程打开了数据库,那么加锁就会失败
  • 如果数据库不存在,那么调用DBImpl::NewDB创建新的数据库
  • 调用VersionSet::Recover来读取MANIFEST,恢复版本信息
  • 根据版本信息,搜索数据库目录,找到关闭时没有写入到SSTable的日志,按日志写入顺序逐个恢复日志数据。DBImpl::RecoverLogFile会创建一个MemTable,开始读取日志信息,将日志的数据插入到MemTable,并根据需要调用DBImpl::WriteLevel0Table将MemTable写入到SSTable中,这里不过多介绍
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171

Status DB::Open(const Options &options, const std::string &dbname, DB **dbptr)
{
*dbptr = nullptr;

DBImpl *impl = new DBImpl(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false;
Status s = impl->Recover(&edit, &save_manifest);
if (s.ok() && impl->mem_ == nullptr)
{
// Create new log and a corresponding memtable.
// 如果需要重写MANIFEST文件,那么做一个版本变更,这里面会创建一个新的MANIFEST
// 将当前的版本信息写入,然后将edit的内容写入。
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile *lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok())
{
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
}
if (s.ok() && save_manifest)
{
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok())
{
impl->RemoveObsoleteFiles();
impl->MaybeScheduleCompaction();
}
impl->mutex_.Unlock();
if (s.ok())
{
assert(impl->mem_ != nullptr);
*dbptr = impl;
}
else
{
delete impl;
}
return s;
}

Status DBImpl::Recover(VersionEdit *edit, bool *save_manifest)
{
mutex_.AssertHeld();

// Ignore error from CreateDir since the creation of the DB is
// committed only when the descriptor is created, and this directory
// may already exist from a previous failed creation attempt.
env_->CreateDir(dbname_); // 创建数据库目录
assert(db_lock_ == nullptr);
// 加文件锁,防止其他进程进入
Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
if (!s.ok())
{
return s;
}
// 如果CURRENT文件不存在,说明需要新创建数据库
if (!env_->FileExists(CurrentFileName(dbname_)))
{
if (options_.create_if_missing)
{
Log(options_.info_log, "Creating DB %s since it was missing.",
dbname_.c_str());
s = NewDB();
if (!s.ok())
{
return s;
}
}
else
{
return Status::InvalidArgument(
dbname_, "does not exist (create_if_missing is false)");
}
}
else
{
if (options_.error_if_exists)
{
return Status::InvalidArgument(dbname_,
"exists (error_if_exists is true)");
}
}
// 读取MANIFEST文件进行版本信息的恢复
s = versions_->Recover(save_manifest);
if (!s.ok())
{
return s;
}
SequenceNumber max_sequence(0);

// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
// incarnation without registering them in the descriptor).
//
// Note that PrevLogNumber() is no longer used, but we pay
// attention to it in case we are recovering a database
// produced by an older version of leveldb.
const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;
s = env_->GetChildren(dbname_, &filenames);
if (!s.ok())
{
return s;
}
std::set<uint64_t> expected;
versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
// 之前的MANIFEST恢复,会得到版本信息,里面包含了之前的log number
// 搜索文件系统里的log,如果这些日志的编号 >= 这个log number,那么这些
// 日志都是关闭时丢失的数据,需要恢复,这里将日志按顺序存储在logs里面

// 逐个恢复日志的内容
for (size_t i = 0; i < filenames.size(); i++)
{
if (ParseFileName(filenames[i], &number, &type))
{
expected.erase(number);
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}
if (!expected.empty())
{
char buf[50];
std::snprintf(buf, sizeof(buf), "%d missing files; e.g.",
static_cast<int>(expected.size()));
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}

// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++)
{
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence);
if (!s.ok())
{
return s;
}

// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
}

if (versions_->LastSequence() < max_sequence)
{
versions_->SetLastSequence(max_sequence);
}

return Status::OK();
}

而新建数据库时,会调用DBImpl::NewDB,一个新的数据库没有任何数据,所以不需要日志和SSTable,只需要有一个MANIFEST文件,包含一些元数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

Status DBImpl::NewDB()
{
VersionEdit new_db;
// 保存比较器的名称,下次打开时需要用相同的名称打开
new_db.SetComparatorName(user_comparator()->Name());
new_db.SetLogNumber(0); // 分配日志文件的编号为0
new_db.SetNextFile(2); // 下一个待分配的文件编号是2,因为1分配给了MANIFEST文件
new_db.SetLastSequence(0);

// 创建MANIFEST文件,将VersionEdit写入
const std::string manifest = DescriptorFileName(dbname_, 1);
WritableFile *file;
Status s = env_->NewWritableFile(manifest, &file);
if (!s.ok())
{
return s;
}
{
log::Writer log(file);
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
if (s.ok())
{
s = file->Sync();
}
if (s.ok())
{
s = file->Close();
}
}
delete file;
if (s.ok())
{
// Make "CURRENT" file that points to the new manifest file.
// 让CURRENT文件指向这个MANIFEST文件
s = SetCurrentFile(env_, dbname_, 1);
}
else
{
env_->RemoveFile(manifest);
}
return s;
}

可以看到DBImpl::NewDB非常简单,就是创建一个MANIFEST文件,将以下信息写入到MANIFEST文件:

  • 比较器名称
  • 当前日志的编号
  • 下一个使用的文件编号
  • 上一个使用的SequenceNumber
  • 最后CURRENT指向新创建的MANIFEST文件

而VersionSet::Recover完成MANIFEST文件的读取和版本的构造,需要知道数据库里有哪些SSTable文件,每个文件处于哪个Level,当前日志的编号等等信息。