那么为什么要版本管理呢?在数据不断写入后,MemTable写满了,这时候就会转换为Level
0的一个SSTable,或者Level n的一个文件和Level n +
1的多个文件进行Compaction,会转换成Level n +
1的多个文件。这会使SSTable文件数量改变,文件内容改变,也就是版本信息改变了,所以需要管理版本。
// Call func(arg, level, f) for every file that overlaps user_key in // order from newest to oldest. If an invocation of func returns // false, makes no more calls. // // REQUIRES: user portion of internal_key == user_key. // 对于所有包含user-key的file,按照从新到旧的顺序,应用func函数 // 直到func返回为false voidForEachOverlapping(Slice user_key, Slice internal_key, void *arg, bool (*func)(void *, int, FileMetaData *));
VersionSet *vset_; // 当前版本所在的版本集合 Version *next_; // 下一个版本实例的指针 Version *prev_; // 前一个版本实例的指针 int refs_; // 引用计数
// 下一个需要压缩的文件 FileMetaData *file_to_compact_; int file_to_compact_level_;
// Level that should be compacted next and its compaction score. // Score < 1 means compaction is not strictly needed. These fields // are initialized by Finalize(). double compaction_score_; int compaction_level_;
public: // 获取sstable文件信息的结构体 structGetStats { FileMetaData *seek_file; int seek_file_level; };
// Append to *iters a sequence of iterators that will // yield the contents of this Version when merged together. // REQUIRES: This version has been saved (see VersionSet::SaveTo) voidAddIterators(const ReadOptions &, std::vector<Iterator *> *iters);
// Lookup the value for key. If found, store it in *val and // return OK. Else return a non-OK status. Fills *stats. // REQUIRES: lock is not held // 获取值 Status Get(const ReadOptions &, const LookupKey &key, std::string *val, GetStats *stats);
// Adds "stats" into the current state. Returns true if a new // compaction may need to be triggered, false otherwise. // REQUIRES: lock is held // 将stats 更新到version,stats记录了seek的结果,包括sts文件信息和seek的level boolUpdateStats(const GetStats &stats);
// Record a sample of bytes read at the specified internal key. // Samples are taken approximately once every config::kReadBytesPeriod // bytes. Returns true if a new compaction may need to be triggered. // REQUIRES: lock is held // 取样式get一个key,通过get,来判断当前是否需要触发compact boolRecordReadSample(Slice key);
// 引用计数相关 voidRef(); voidUnref();
// 将所有某层中超过范围的文件保存在input中 voidGetOverlappingInputs( int level, const InternalKey *begin, // nullptr means before all keys const InternalKey *end, // nullptr means after all keys std::vector<FileMetaData *> *inputs);
staticboolMatch(void *arg, int level, FileMetaData *f) { State *state = reinterpret_cast<State *>(arg);
if (state->stats->seek_file == nullptr && state->last_file_read != nullptr) { // We have had more than one seek for this read. Charge the 1st file. state->stats->seek_file = state->last_file_read; state->stats->seek_file_level = state->last_file_read_level; }
// 二分查找 uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key); if (index < num_files) { FileMetaData *f = files_[level][index]; if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) { // All of "f" is past any data for user_key } else { if (!(*func)(arg, level, f)) { return; } } } } }
// Apply *edit to the current version to form a new descriptor that // is both saved to persistent state and installed as the new // current version. Will release *mu while actually writing to the file. // REQUIRES: *mu is held on entry. // REQUIRES: no other thread concurrently calls LogAndApply() // 将根据前面记录的版本修改信息更新当前版本,得到一个新的版本信息 // 同时把这个新的版本信息设置成当前VersionSet的current_,并连入版本集合的链表中 Status LogAndApply(VersionEdit *edit, port::Mutex *mu) EXCLUSIVE_LOCKS_REQUIRED(mu);
// 恢复用 Status Recover(bool *save_manifest);
// 返回当前版本 Version *current()const{ return current_; }
// Arrange to reuse "file_number" unless a newer file number has // already been allocated. // REQUIRES: "file_number" was returned by a call to NewFileNumber(). // 复用文件号,仅当新的文件号被占用时才会调用此函数 voidReuseFileNumber(uint64_t file_number) { if (next_file_number_ == file_number + 1) { next_file_number_ = file_number; } }
// Return the log file number for the log file that is currently // being compacted, or zero if there is no such log file. uint64_tPrevLogNumber()const{ return prev_log_number_; }
// Pick level and inputs for a new compaction. // Returns nullptr if there is no compaction to be done. // Otherwise returns a pointer to a heap-allocated object that // describes the compaction. Caller should delete the result. // 选择一层来执行新的压缩,若没有需要进行的压缩,则返回空 // 否则返回一个堆分配内存来描述压缩结果,需要调用者来释放 // 这个选择是选择实现计算出来下次需要压缩的文件 Compaction *PickCompaction();
// Return a compaction object for compacting the range [begin,end] in // the specified level. Returns nullptr if there is nothing in that // level that overlaps the specified range. Caller should delete // the result. // 压缩指定层中的指定范围,不重叠的话就没有压缩 Compaction *CompactRange(int level, const InternalKey *begin, const InternalKey *end);
// Return the maximum overlapping data (in bytes) at next level for any // file at a level >= 1. // 返回最大重叠数据 int64_tMaxNextLevelOverlappingBytes();
// Create an iterator that reads over the compaction inputs for "*c". // The caller should delete the iterator when no longer needed. // 返回一个可以从N个SSTable中提取出来不重复key的迭代器。 Iterator *MakeInputIterator(Compaction *c);
// Returns true iff some level needs a compaction. // 是否存在某些层需要压缩 boolNeedsCompaction()const { Version *v = current_; return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr); }
// Add all files listed in any live version to *live. // May also mutate some internal state. // 把所有存活版本中的文件记录到集合中(即记录所有存活文件) voidAddLiveFiles(std::set<uint64_t> *live);
// Return the approximate offset in the database of the data for // "key" as of version "v". // 返回在版本v中,key的大致偏移 uint64_tApproximateOffsetOf(Version *v, const InternalKey &key);
// Return a human-readable short (single-line) summary of the number // of files per level. Uses *scratch as backing store. // 返回人类可读的总结 structLevelSummaryStorage { char buffer[100]; }; constchar *LevelSummary(LevelSummaryStorage *scratch)const; };
// Add the specified file at the specified number. // REQUIRES: This version has not been saved (see VersionSet::SaveTo) // REQUIRES: "smallest" and "largest" are smallest and largest keys in file // 在某一层中新增文件 voidAddFile(int level, uint64_t file, uint64_t file_size, const InternalKey &smallest, const InternalKey &largest) { FileMetaData f; f.number = file; f.file_size = file_size; f.smallest = smallest; f.largest = largest; new_files_.push_back(std::make_pair(level, f)); }
// Delete the specified "file" from the specified "level". // 删除某一层的文件 voidRemoveFile(int level, uint64_t file) { deleted_files_.insert(std::make_pair(level, file)); }
// 编码解码 voidEncodeTo(std::string *dst)const; Status DecodeFrom(const Slice &src);
VersionSet *vset_; // 所属的集合 Version *base_; // 旧Version LevelState levels_[config::kNumLevels];
public: // Initialize a builder with the files from *base and other info from *vset Builder(VersionSet *vset, Version *base) : vset_(vset), base_(base) { base_->Ref(); BySmallestKey cmp; cmp.internal_comparator = &vset_->icmp_; for (int level = 0; level < config::kNumLevels; level++) { levels_[level].added_files = newFileSet(cmp); } }
// 析构函数,需要对文件解除引用,也需要对版本解除引用 ~Builder() { for (int level = 0; level < config::kNumLevels; level++) { const FileSet *added = levels_[level].added_files; std::vector<FileMetaData *> to_unref; to_unref.reserve(added->size()); for (FileSet::const_iterator it = added->begin(); it != added->end(); ++it) { to_unref.push_back(*it); } delete added; for (uint32_t i = 0; i < to_unref.size(); i++) { FileMetaData *f = to_unref[i]; f->refs--; if (f->refs <= 0) { delete f; } } } base_->Unref(); }
// 应用VersionEdit到当前版本上 voidApply(const VersionEdit *edit) { // 首先更新comact_pointer_,这个直接在VersionSet里更新,因为这个的信息只有一份 // 新版本肯定是覆盖旧版本的,所以直接更新即可 for (size_t i = 0; i < edit->compact_pointers_.size(); i++) { constint level = edit->compact_pointers_[i].first; vset_->compact_pointer_[level] = edit->compact_pointers_[i].second.Encode().ToString(); }
// 把VersionEdit里删除的文件插入到levels_相应Level里面去 for (constauto &deleted_file_set_kvp : edit->deleted_files_) { constint level = deleted_file_set_kvp.first; constuint64_t number = deleted_file_set_kvp.second; levels_[level].deleted_files.insert(number); }
// 把VersionEdit里添加的文件插入到levels_相应的Level里去 for (size_t i = 0; i < edit->new_files_.size(); i++) { constint level = edit->new_files_[i].first; // 因为是新文件,构造一个FileMetaData FileMetaData *f = newFileMetaData(edit->new_files_[i].second); f->refs = 1;
// We arrange to automatically compact this file after // a certain number of seeks. Let's assume: // (1) One seek costs 10ms // (2) Writing or reading 1MB costs 10ms (100MB/s) // (3) A compaction of 1MB does 25MB of IO: // 1MB read from this level // 10-12MB read from next level (boundaries may be misaligned) // 10-12MB written to next level // This implies that 25 seeks cost the same as the compaction // of 1MB of data. I.e., one seek costs approximately the // same as the compaction of 40KB of data. We are a little // conservative and allow approximately one seek for every 16KB // of data before triggering a compaction. f->allowed_seeks = static_cast<int>((f->file_size / 16384U)); if (f->allowed_seeks < 100) f->allowed_seeks = 100;
// 把新版本存储到版本v中 voidSaveTo(Version *v) { BySmallestKey cmp; cmp.internal_comparator = &vset_->icmp_; for (int level = 0; level < config::kNumLevels; level++) { // Merge the set of added files with the set of pre-existing files. // Drop any deleted files. Store the result in *v. // 拿出原本Version里的文件,以及Builder里累积的,添加的文件 const std::vector<FileMetaData *> &base_files = base_->files_[level]; std::vector<FileMetaData *>::const_iterator base_iter = base_files.begin(); std::vector<FileMetaData *>::const_iterator base_end = base_files.end(); const FileSet *added_files = levels_[level].added_files; v->files_[level].reserve(base_files.size() + added_files->size()); // 按顺序进行合并 for (constauto &added_file : *added_files) { // Add all smaller files listed in base_ // 找到base里面比added_file小的文件,添加到新的Version里 // 采用MaybeAddFile,让被删除的文件无法添加 for (std::vector<FileMetaData *>::const_iterator bpos = std::upper_bound(base_iter, base_end, added_file, cmp); base_iter != bpos; ++base_iter) { MaybeAddFile(v, level, *base_iter); }
MaybeAddFile(v, level, added_file); }
// Add remaining base files for (; base_iter != base_end; ++base_iter) { MaybeAddFile(v, level, *base_iter); }
#ifndef NDEBUG // Make sure there is no overlap in levels > 0 if (level > 0) { for (uint32_t i = 1; i < v->files_[level].size(); i++) { const InternalKey &prev_end = v->files_[level][i - 1]->largest; const InternalKey &this_begin = v->files_[level][i]->smallest; if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) { std::fprintf(stderr, "overlapping ranges in same level %s vs. %s\n", prev_end.DebugString().c_str(), this_begin.DebugString().c_str()); std::abort(); } } } #endif } }
voidMaybeAddFile(Version *v, int level, FileMetaData *f) { if (levels_[level].deleted_files.count(f->number) > 0) { // File is deleted: do nothing } else { std::vector<FileMetaData *> *files = &v->files_[level]; if (level > 0 && !files->empty()) { // Must not overlap assert(vset_->icmp_.Compare((*files)[files->size() - 1]->largest, f->smallest) < 0); } f->refs++; files->push_back(f); } } };
// Initialize new descriptor log file if necessary by creating // a temporary file that contains a snapshot of the current version. std::string new_manifest_file; Status s; // 这里只有Open数据库的时候才会走到,如果需要保存新的MANIFEST,此时这个变量为null // 会创建一个新的MANIFEST,然后将当前的状态写入 if (descriptor_log_ == nullptr) { // No reason to unlock *mu here since we only hit this path in the // first call to LogAndApply (when opening the database). assert(descriptor_file_ == nullptr); new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); s = env_->NewWritableFile(new_manifest_file, &descriptor_file_); if (s.ok()) { descriptor_log_ = new log::Writer(descriptor_file_); s = WriteSnapshot(descriptor_log_); } }
// Unlock during expensive MANIFEST log write { mu->Unlock();
// Write new record to MANIFEST log if (s.ok()) { std::string record; edit->EncodeTo(&record); s = descriptor_log_->AddRecord(record); if (s.ok()) { s = descriptor_file_->Sync(); } if (!s.ok()) { Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str()); } }
// If we just created a new descriptor file, install it by writing a // new CURRENT file that points to it. if (s.ok() && !new_manifest_file.empty()) { s = SetCurrentFile(env_, dbname_, manifest_file_number_); }