0%

levelDB源码剖析(4)--levelDB的编码格式

levelDB的编码格式


存储方式

当数据库获取到内存后,其需要将数据插入到内存中,并在适当的时候读取出来,如何高效地利用申请到的内存就是一个比较重要的问题。

在levelDB中,数据是按照下列方法存储的:

  • 整数分为32位和64位定长整数

  • 整数还使用变长整数存储

  • 整数均为小端法存储

  • 字符串采用长度前缀编码

变长整数与定长整数

所谓定长整数就是我们平时在C语言中定义的数据类型,其长度是始终不变的。

而变长整数则顾名思义,其编码的原理是只使用一个字节的低7位去存储数据,而最高位的用于做标识:当最高位为1时表示需要继续读取下一个字节。

如上图所示,这样变长整数就可以使用1-5字节去表示int32的所有整数,表面上看,虽然表示非常大的整数的时候变长整数编码会占用更多的空间,但是由于大整数出现的频率一般是比较小的,所以就普遍而言,使用变长整数会节省更多的内存。

字符串

字符串使用长度前缀编码,即在把字符串的长度放在字符串的前面,然后组合起来编码。

而同时,字符串长度使用变长整数进行编码,所以一个字符串的存储格式是32位变长整数编码字符串长度 + 字符串本身

使用长度前缀编码的方式,字符串能够编码任意字符(比如C语言不能在字符串中包含'\0'),同时,字符串的长度可以预先知道,所以有利于读写操作。另外,对于大部分字符串的长度都比较短的时候,并不会造成大量内存损失。

源码解析部分

源码位置: utils/coding.cc utils/coding.h

头文件

首先,我们先来看一下编码解码和插入数据的头文件,这部分内容很简单,就是处理编码格式的一些方法。

其中要注意用到了一个类Slice,这个类是对字符串的封装,比std::string封装更加低级,所以效率会更高,后面的文章会细🔒它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

void PutFixed32(std::string *dst, uint32_t value); // 插入定长32位整数
void PutFixed64(std::string *dst, uint64_t value); //插入定长64位整数
void PutVarint32(std::string *dst, uint32_t value); //插入变长32位整数
void PutVarint64(std::string *dst, uint64_t value); //插入变长64位整数
void PutLengthPrefixedSlice(std::string *dst, const Slice &value); //插入字符串

bool GetVarint32(Slice *input, uint32_t *value); // 获取变长32位整数
bool GetVarint64(Slice *input, uint64_t *value); // 获取变长64位整数
bool GetLengthPrefixedSlice(Slice *input, Slice *result); // 获取字符串

// 用于解码32/64位变长整数的函数,其中p是指向变长整数,limit是变长整数最大长度
// 而v存放的是解码后的值,如果不能解码则返回null
const char *GetVarint32Ptr(const char *p, const char *limit, uint32_t *v);
const char *GetVarint64Ptr(const char *p, const char *limit, uint64_t *v);

// 这是上面GetVarint32Ptr函数的内部回调函数,是实际处理变长整数的东西
const char *GetVarint32PtrFallback(const char *p, const char *limit,
uint32_t *value);

int VarintLength(uint64_t v); // 返回变长整数(32/64)的'长度'

char *EncodeVarint32(char *dst, uint32_t value); // 编码变长32位整数
char *EncodeVarint64(char *dst, uint64_t value); // 编码变长64位整数


inline void EncodeFixed32(char *dst, uint32_t value); // 编码定长32位整数
inline void EncodeFixed64(char *dst, uint64_t value); // 编码定长64位整数
inline uint32_t DecodeFixed32(const char *ptr); // 解码32位定长整数
inline uint64_t DecodeFixed64(const char *ptr); // 解码64位定长整数

头文件只是给出了函数接口,下面我们就来进一步去分析各个函数。

实现文件

为了方便分析,我们将其分成:编码解码定长整数、编码解码变长整数、Put和Get方法。

这部分代码全部围绕编码解码来实现的,其中定长的编解码很简单,就不在此赘述了。关于变长整数的编解码,其关于编码和解码的相关函数都预留了类似迭代器一样的超尾元素(指针),这种形式的接口可以很方便的确定所处理数据的区间(变长整数的Put和Get相关函数都使用到了预留的超尾元素)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329


// 内联函数本来是在头文件中的,但是此处将其当成一个函数来进行分析

/*=============编码解码定长整数=============*/

// 编码定长32位整数,实现思路就是类似结构体那样去分割uint32即可
// 但注意保证让dst有足够的空间去存储,否则会溢出或段错误
inline void EncodeFixed32(char *dst, uint32_t value)
{
// reinterpret_cast允许指针(引用)之间转换
// 整型与指针类型间的转换以及指针与足够大的整型之间的转换
uint8_t *const buffer = reinterpret_cast<uint8_t *>(dst);

buffer[0] = static_cast<uint8_t>(value);
buffer[1] = static_cast<uint8_t>(value >> 8);
buffer[2] = static_cast<uint8_t>(value >> 16);
buffer[3] = static_cast<uint8_t>(value >> 24);
}

// 编码定长64位整数,思路与编码定长32位整数一致
inline void EncodeFixed64(char *dst, uint64_t value)
{
uint8_t *const buffer = reinterpret_cast<uint8_t *>(dst);

buffer[0] = static_cast<uint8_t>(value);
buffer[1] = static_cast<uint8_t>(value >> 8);
buffer[2] = static_cast<uint8_t>(value >> 16);
buffer[3] = static_cast<uint8_t>(value >> 24);
buffer[4] = static_cast<uint8_t>(value >> 32);
buffer[5] = static_cast<uint8_t>(value >> 40);
buffer[6] = static_cast<uint8_t>(value >> 48);
buffer[7] = static_cast<uint8_t>(value >> 56);
}

// 解码32位定长整数并返回,这同样不做边界检查
inline uint32_t DecodeFixed32(const char *ptr)
{
const uint8_t *const buffer = reinterpret_cast<const uint8_t *>(ptr);

// 注意是小端存储结构
return (static_cast<uint32_t>(buffer[0])) |
(static_cast<uint32_t>(buffer[1]) << 8) |
(static_cast<uint32_t>(buffer[2]) << 16) |
(static_cast<uint32_t>(buffer[3]) << 24);
}

// 解码64位定长整数并返回,与上类似
inline uint64_t DecodeFixed64(const char *ptr)
{
const uint8_t *const buffer = reinterpret_cast<const uint8_t *>(ptr);

return (static_cast<uint64_t>(buffer[0])) |
(static_cast<uint64_t>(buffer[1]) << 8) |
(static_cast<uint64_t>(buffer[2]) << 16) |
(static_cast<uint64_t>(buffer[3]) << 24) |
(static_cast<uint64_t>(buffer[4]) << 32) |
(static_cast<uint64_t>(buffer[5]) << 40) |
(static_cast<uint64_t>(buffer[6]) << 48) |
(static_cast<uint64_t>(buffer[7]) << 56);
}

/*=============编码解码变长整数=============*/

// 编码32位变长整数
char *EncodeVarint32(char *dst, uint32_t v)
{
// 这一部分的编码就是完全按照定义来的,用了5个if
// 分别对应了1-5个字节范围内的长度
uint8_t *ptr = reinterpret_cast<uint8_t *>(dst);
static const int B = 128;
/* 仿照EncodeVarint64的循环写法来重写这一部分
while(v>=B)
{
*(ptr++) = v|B;
v >>= 7;
}
*(ptr++) = static_cast<uint8_t>(v);
return reinterpret_cast<char *>(ptr);
*/
if (v < (1 << 7))
{
*(ptr++) = v;
}
else if (v < (1 << 14))
{
*(ptr++) = v | B;
*(ptr++) = v >> 7;
}
else if (v < (1 << 21))
{
*(ptr++) = v | B;
*(ptr++) = (v >> 7) | B;
*(ptr++) = v >> 14;
}
else if (v < (1 << 28))
{
*(ptr++) = v | B;
*(ptr++) = (v >> 7) | B;
*(ptr++) = (v >> 14) | B;
*(ptr++) = v >> 21;
}
else
{
*(ptr++) = v | B;
*(ptr++) = (v >> 7) | B;
*(ptr++) = (v >> 14) | B;
*(ptr++) = (v >> 21) | B;
*(ptr++) = v >> 28;
}
return reinterpret_cast<char *>(ptr);
}

// 编码64位变长整数,此处没有使用循环,同样的,上面32位的也可以这样写
char *EncodeVarint64(char *dst, uint64_t v)
{
// 将函数内的常数设置为局部静态变量,减少每次调用函数时的分配与开销
static const int B = 128;
uint8_t *ptr = reinterpret_cast<uint8_t *>(dst);
// B被设置成1<<7,当v>=b时表示7位存不下
while (v >= B)
{
*(ptr++) = v | B;
v >>= 7;
}
*(ptr++) = static_cast<uint8_t>(v);
return reinterpret_cast<char *>(ptr);
}

// 解码32位变长整数,这个函数只处理小于128的单字节变长整数
// 大于127的部分则调用GetVarint32PtrFallback处理
// 此函数将解码后的值放在value中
// limit总是为p+5,因为变长整数最多5个字节
// p是一个指向包含值得字符串
// 返回的是对变长整数解码后的一个字节,设置成这样的目的是用于get方法
inline const char *GetVarint32Ptr(const char *p, const char *limit,
uint32_t *value)
{
if (p < limit)
{
uint32_t result = *(reinterpret_cast<const uint8_t *>(p));
// 内部只解码不超过127的整数,剩下的更长的交给GetVarint32PtrFallback处理
if ((result & 128) == 0)
{
*value = result;
return p + 1;
}
}
return GetVarint32PtrFallback(p, limit, value);
}

// 解码32位变长整数中大于127的数字,即存储超过1个字节的变长32位整数
// 被GetVarint32Ptr调用
const char *GetVarint32PtrFallback(const char *p, const char *limit,
uint32_t *value)
{
uint32_t result = 0;

// 由于32位变长整数只能由28位存储数字,所以偏移的上限就是28位
// 每个字节只能存储7位有效字符,所以每处理一字节就偏移7位
// shift同时也是每个字节中有效数字的权
for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7)
{
// 解码出一个字节
uint32_t byte = *(reinterpret_cast<const uint8_t *>(p));
p++;
// 判断最高位,如果是1则继续取并执行,否则就返回解码的数据
if (byte & 128)
{
result |= ((byte & 127) << shift);
}
else
{
result |= (byte << shift);
*value = result;
return reinterpret_cast<const char *>(p);
}
}

// 当变长整数编码不正确就返回nullptr
// 不正确的原因只能是因为连续5个字节都是被标记为1
return nullptr;
}

// 解码64位变长整数的函数,与解码32位变长的那个函数类似
// 不过这个函数没有回调,是直接在本函数内进行全部的解码工作
// 原因我觉得是这样的:因为小整数比较多,而小整数一般就以32位变长存储
// 而解码最小整数的函数被设置成内联函数,这样效率就会提升很多
const char *GetVarint64Ptr(const char *p, const char *limit, uint64_t *value)
{
uint64_t result = 0;
// 这个函数内部与GetVarint32PtrFallback一致
for (uint32_t shift = 0; shift <= 63 && p < limit; shift += 7)
{
uint64_t byte = *(reinterpret_cast<const uint8_t *>(p));
p++;
if (byte & 128)
{
result |= ((byte & 127) << shift);
}
else
{
result |= (byte << shift);
*value = result;
return reinterpret_cast<const char *>(p);
}
}
return nullptr;
}

/*=============Put与Get方法=============*/

// 把32位定长转换成字符串
void PutFixed32(std::string *dst, uint32_t value)
{
// 设置一个缓存区,然后把value编码到缓存区后插入
char buf[sizeof(value)];
EncodeFixed32(buf, value);
dst->append(buf, sizeof(buf));
}

// 把64位定长转化成字符串
void PutFixed64(std::string *dst, uint64_t value)
{
// 设置一个缓存区,然后把value编码到缓存区后插入
char buf[sizeof(value)];
EncodeFixed64(buf, value);
dst->append(buf, sizeof(buf));
}

// 把32位变长转化成字符串
void PutVarint32(std::string *dst, uint32_t v)
{
// 设置最大的缓冲区,注意EncodeVarint32会返回指向编码后的一个字节的指针
// 所以就能根据指针偏移来计算出当前变长整数的长度
char buf[5];
char *ptr = EncodeVarint32(buf, v);
dst->append(buf, ptr - buf);
}

// 把64位变长转化成字符串
void PutVarint64(std::string *dst, uint64_t v)
{
// 原理和PutVarint32一样,只是缓冲区大小大了一些
char buf[10];
char *ptr = EncodeVarint64(buf, v);
dst->append(buf, ptr - buf);
}

// 把slice转化成字符串存储
void PutLengthPrefixedSlice(std::string *dst, const Slice &value)
{
// 字符串的编码格式是长度在前,然后是字符串本身
// 所以按照定义就可以插入了
// 先插入长度,然后把字符串移动过去
// 由于Slice末尾不需要'\0',所以需要指定长度
PutVarint32(dst, value.size());
dst->append(value.data(), value.size());
}

// 计算变长整数的长度
int VarintLength(uint64_t v)
{
// 这个函数思路很简单,不解析了
int len = 1;
while (v >= 128)
{
v >>= 7;
len++;
}
return len;
}

// 从字符串中获取32位变长
bool GetVarint32(Slice *input, uint32_t *value)
{
const char *p = input->data(); // 先拿到字符串的首指针
const char *limit = p + input->size(); // 然后获取尾部地址
const char *q = GetVarint32Ptr(p, limit, value); //先解码出数据
if (q == nullptr)
{
// 所请求元素不合法
return false;
}
else
{
// 由于一个字符串可能内部不止一个整数
// 解码完一个整数后可能还剩余一部分未解码的数据
// 比如字符串存储格式是前面一个32位变长+字符串本身
*input = Slice(q, limit - q);
return true;
}
}

// 从字符串中获取64位变长
bool GetVarint64(Slice *input, uint64_t *value)
{
// 同上,只是部分接口不一样
const char *p = input->data();
const char *limit = p + input->size();
const char *q = GetVarint64Ptr(p, limit, value);
if (q == nullptr)
{
return false;
}
else
{
*input = Slice(q, limit - q);
return true;
}
}

// 解码出字符串
bool GetLengthPrefixedSlice(Slice *input, Slice *result)
{
uint32_t len;
// 先获取长度,然后根据长度去解码
if (GetVarint32(input, &len) && input->size() >= len)
{
*result = Slice(input->data(), len);
input->remove_prefix(len);
return true;
}
else
{
return false;
}
}

其中,关于字符串的解码部分我认为还是很巧妙的,因为字符串编码后的长度在前,且长度是一个变长整数,所以解码时需要先提取出长度。正常情况下的写法可能是先对整个编码后字符串遍历,解析出前面的长度,记录下这个字符串长度以及变长整数的长度后再去处理后面的字符串。

但是levelDB中的处理方式是让变长解码函数解码多少就把输入中解码的这段数据给弄掉(修改了源字符串的指针),因为这部分需要解码的值已经被解析出来并返回了,也就不再需要了。

每个解码函数只处理字符串开头,并且处理完成后及时改变字符串的首地址(后移),这样就不需要记录这部分的状态并考虑处理细节了

能够这样做的原因就是通过传递超尾元素来标定界限,同时所有处理函数都以指针作为数据传递的方式(统一接口标准)。

而直接移动字符串指针很容易就引出另外一个问题:会不会内存泄漏呢?

答案是不会,因为内存并不由Slice管理,Slice只是对底层字符串做出了映射而已,其主要目的是用于传递,后续我们会对Slice进行解析。

再特意把这段代码贴出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38


bool GetVarint32(Slice *input, uint32_t *value)
{
const char *p = input->data(); // 先拿到字符串的首指针
const char *limit = p + input->size(); // 然后获取尾部地址
const char *q = GetVarint32Ptr(p, limit, value); //先解码出数据,q是超尾元素(相对变长整数而言)
if (q == nullptr)
{
// 所请求元素不合法
return false;
}
else
{
// 由于一个字符串可能内部不止一个整数
// 解码完一个整数后可能还剩余一部分未解码的数据
// 比如字符串存储格式是前面一个32位变长+字符串本身
*input = Slice(q, limit - q);
return true;
}
}

bool GetLengthPrefixedSlice(Slice *input, Slice *result)
{
uint32_t len;
// 先获取长度,然后根据长度去解码
if (GetVarint32(input, &len) && input->size() >= len)
{
*result = Slice(input->data(), len);
input->remove_prefix(len);
return true;
}
else
{
return false;
}
}


总结部分

函数/变量 作用
inline void EncodeFixed32(char *dst, uint32_t value) 编码定长32位整数
inline void EncodeFixed64(char *dst, uint64_t value) 编码定长64位整数
inline uint32_t DecodeFixed32(const char *ptr) 解码32位定长整数
inline uint64_t DecodeFixed64(const char *ptr) 解码64位定长整数
char EncodeVarint32(char dst, uint32_t v) 编码32位变长整数
char EncodeVarint64(char dst, uint64_t v) 编码64位变长整数
inline const char GetVarint32Ptr(const char p, const char limit, uint32_t value) 解码32位变长整数
const char GetVarint32PtrFallback(const char p, const char limit,uint32_t value) 解码32位变长整数中大于127的数字,被GetVarint32Ptr调用
const char GetVarint64Ptr(const char p, const char limit, uint64_t value) 解码64位变长整数的函数
void PutFixed32(std::string *dst, uint32_t value) 把32位定长转换成字符串
void PutFixed64(std::string *dst, uint64_t value) 把64位定长转化成字符串
void PutVarint32(std::string *dst, uint32_t v) 把32位变长转化成字符串
void PutVarint64(std::string *dst, uint64_t v) 把64位变长转化成字符串
void PutLengthPrefixedSlice(std::string *dst, const Slice &value) 把slice转化成字符串存储
int VarintLength(uint64_t v) 计算变长整数的长度
bool GetVarint32(Slice input, uint32_t value) 从字符串中获取32位变长
bool GetVarint64(Slice input, uint64_t value) 从字符串中获取64位变长
bool GetLengthPrefixedSlice(Slice input, Slice result) 解码出字符串

编码解码部分使用到了很多位级编程,这部分都很有意思,可以做到四两拨千斤的作用。

编程小技巧

  • 在函数内需要用到的常量可以定义成内部静态常量static const,这样就避免了全局变量的额外开销与权限混乱、同时也能避免局部变量的每次都需要额外分配的开销。

  • 当函数是处理连续的区间时,预留出超尾元素接口是很有必要的。