Redis探究之跳表
跳表数据结构
1typedef struct zskiplist {
2 struct zskiplistNode *header, *tail; // 指向跳表的头结点
3 unsigned long length; // 跳跃表的总节点数量, 除了头节点以外的
4 int level; // 跳跃表的高度
5} zskiplist;
6
7
8
9typedef struct zskiplistNode {
10 sds ele; // 用于存储字符串类型的数据
11 double score; // 用于排序的值
12 struct zskiplistNode *backward; // 后退指针
13 struct zskiplistLevel {
14 struct zskiplistNode *forward; // 指向当前层的下一个节点
15 unsigned int span // 指向节点与本节点之间的元素个数
16 } level[];
17} zskiplistNode;
zskiplistNode *header: 跳跃表的头结点是一个特殊的节点, 他的level数组的元素个数为64, 即默认的level可到的最大值。
通过结构可以知道获取跳跃表头结点, 尾节点, 长度和高度的时间复杂度都是O(1)
| 函数 | 作用 | 复杂度 |
|---|---|---|
zslCreateNode |
创建并返回一个新的跳跃表节点 | 最坏 O(1) |
zslFreeNode |
释放给定的跳跃表节点 | 最坏 O(1) |
zslCreate |
创建并初始化一个新的跳跃表 | 最坏 O(1) |
zslFree |
释放给定的跳跃表 | 最坏 O(N) |
zslInsert |
将一个包含给定 score 和 member 的新节点添加到跳跃表中 |
最坏 O(N) 平均 O(logN) |
zslDeleteNode |
删除给定的跳跃表节点 | 最坏 O(N) |
zslDelete |
删除匹配给定 member 和 score 的元素 |
最坏 O(N)平均 O(logN) |
zslFirstInRange |
找到跳跃表中第一个符合给定范围的元素 | 最坏 O(N)平均 O(logN) |
zslLastInRange |
找到跳跃表中最后一个符合给定范围的元素 | 最坏 O(N)平均 O(logN) |
zslDeleteRangeByScore |
删除 score 值在给定范围内的所有节点 |
最坏 O(N2) |
zslDeleteRangeByRank |
删除给定排序范围内的所有节点 | 最坏 O(N2) |
zslGetRank |
返回目标元素在有序集中的排位 | 最坏 O(N) 平均 O(logN) |
zslGetElementByRank |
根据给定排位,返回该排位上的元素节点 | 最坏 O(N)平均 O(logN) |
创建跳表
zskiplist 中的跳跃表的高度最小值为0, 最大值为:
1#define ZSKIPLIST_MAXLEVEL 64 /* Should be enough for 2^64 elements */
Redis通过zslRandomLevel() 方法为新生成的节点创建高度, 高度的值为1~64之间随机的一个值, 作为新节点的高度, 值越大出现的概率越低, 高度一旦确定, 后面不会去修改。
1/* Returns a random level for the new skiplist node we are going to create.
2 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
3 * (both inclusive), with a powerlaw-alike distribution where higher
4 * levels are less likely to be returned.
5 **/
6int zslRandomLevel(void) {
7 // 初始化层级为1
8 int level = 1;
9
10 // 判断随机数的低16位是否小于(ZSKIPLIST_P * 0xFFFF)
11 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
12 // 条件成立,层级 level 增加 1,然后继续判断,直到条件不成立为止。
13 level += 1;
14
15 // 返回的层级不超过 ZSKIPLIST_MAXLEVEL,即使循环可能将 level 增加到比最大层级还高的值。
16 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
17}
模拟过程
我们可以通过下面的例子来看一下过程:
- 第一个随机数生成过程:
random() & 0xFFFF可能生成值为 20000。ZSKIPLIST_P * 0xFFFF计算结果约为 16384(即0.25 * 65536)。- 由于 20000 大于 16384,循环条件不成立,
level保持为 1。
- 第二个随机数生成过程:
random() & 0xFFFF可能生成值为 10000。- 10000 小于 16384,
level增加到 2。 - 再次生成随机数,假设结果为 5000。
- 5000 小于 16384,
level增加到 3。 - 再次生成随机数,假设结果为 20000。
- 20000 大于 16384,循环终止,
level最终为 3。
Level期望
我可可以计算出这个level的期望:
要计算 level 的期望值 $ E[\text{level}] $,我们首先需要定义概率和公式。
定义和概率
设定 level 的初始值为 1,并在每次满足条件(random() & 0xFFFF) < (ZSKIPLIST_P * 0xFFFF) 时增加 level。假设 ( P ) 表示增加 level 的概率。即:
$ P = ZSKIPLIST_P $
每次递增 level 的概率是 ( P ),则 level 为 ( k ) 的概率 $P(\text{level} = k) $ 是:
$ P(\text{level} = k) = (1 - P) \times P^{k-1} $
期望值公式
期望值 $ E[\text{level}] $ 定义为每个可能的 level 乘以其对应的概率的和:
$ E[\text{level}] = \sum_{k=1}^{\infty} k \times P(\text{level} = k) $
代入概率 $ P(\text{level} = k) $ 得:
$ E[\text{level}] = \sum_{k=1}^{\infty} k \times (1 - P) \times P^{k-1} $
将常数 ( (1 - P) ) 提到求和符号外:
$ E[\text{level}] = (1 - P) \sum_{k=1}^{\infty} k \times P^{k-1} $
等比级数求和
为了计算上述求和部分:
$ \sum_{k=1}^{\infty} k \times P^{k-1} $
我们可以使用等比级数的求和公式。首先,设:
$ S = \sum_{k=1}^{\infty} k \times P^{k-1} $
我们可以通过以下变换来求解:
$ S = P^0 + 2P^1 + 3P^2 + 4P^3 + \ldots $
将 ( S ) 乘以 ( P ):
$ PS = P^1 + 2P^2 + 3P^3 + 4P^4 + \ldots $
将这两个式子相减:
$ S - PS = (P^0 + P^1 + P^2 + P^3 + \ldots) = \sum_{k=0}^{\infty} P^k $
这部分是一个无穷等比数列的求和,其和为:
$ \sum_{k=0}^{\infty} P^k = \frac{1}{1 - P} $
所以,得到:
$ S(1 - P) = \frac{1}{1 - P} $
即:
$ S = \frac{1}{(1 - P)^2} $
计算期望值
代回到原来的期望值公式中:
$ E[\text{level}] = (1 - P) \times \frac{1}{(1 - P)^2} = \frac{1}{1 - P} $
结论
因此,level 的期望值$ E[\text{level}] $为:
$ E[\text{level}] = \frac{1}{1 - P} $
例子
如果 ( P = 0.25 ):
$ E[\text{level}] = \frac{1}{1 - 0.25} = \frac{1}{0.75} = \frac{4}{3} \approx 1.33 $
这个结果表示,在每次调用 zslRandomLevel 函数时,生成的层级的平均值为 1.33。这表明大多数节点将会插入到第 1 层或第 2 层。
创建跳表的步骤
-
创建头节点
头结点是跳表中第一个插入的节点, level数组的每一项forward都是NULL, span是0
1 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { 2 zsl->header->level[j].forward = NULL; 3 zsl->header->level[j].span = 0; 4 } -
创建完头结点之后就可以创建跳表了
- 创建跳跃表结构体对象zsl
- zsl头结点指针指向上面创建的头结点
- 跳表的层高初始化为1, 长度初始化为0, 尾节点为NULL
1/* Create a new skiplist. */
2zskiplist *zslCreate(void) {
3 int j;
4 zskiplist *zsl;
5
6 zsl = zmalloc(sizeof(*zsl));
7 zsl->level = 1;
8 zsl->length = 0;
9 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
10 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
11 zsl->header->level[j].forward = NULL;
12 zsl->header->level[j].span = 0;
13 }
14 zsl->header->backward = NULL;
15 zsl->tail = NULL;
16 return zsl;
17}
新增节点
新增节点的步骤:
- 找到位置
- 调整高度
- 插入节点
- 调整
backward
新建节点:
1/* Create a skiplist node with the specified number of levels.
2 * The SDS string 'ele' is referenced by the node after the call. */
3zskiplistNode *zslCreateNode(int level, double score, sds ele) {
4 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
5 zn->score = score;
6 zn->ele = ele;
7 return zn;
8}
由于zskiplistLevel是一个数组, 所有在申请内存空间的时候也需要计算上, 这样一个节点占用的内存大小就是zskiplistNode的大小 + (level 的个数 * zskiplistLevel)的大小。
插入:
1zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
2 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3 unsigned int rank[ZSKIPLIST_MAXLEVEL];
4 int i, level;
5
6 serverAssert(!isnan(score));
7 x = zsl->header;
8 // 找到位置, 并更新update[] 和rank[]
9 for (i = zsl->level-1; i >= 0; i--) {
10 /* store rank that is crossed to reach the insert position */
11 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
12 while (x->level[i].forward &&
13 (x->level[i].forward->score < score ||
14 (x->level[i].forward->score == score &&
15 sdscmp(x->level[i].forward->ele,ele) < 0)))
16 {
17 rank[i] += x->level[i].span;
18 x = x->level[i].forward;
19 }
20 update[i] = x;
21 }
22 /* we assume the element is not already inside, since we allow duplicated
23 * scores, reinserting the same element should never happen since the
24 * caller of zslInsert() should test in the hash table if the element is
25 * already inside or not. */
26 // 生成新的随机高度,
27 level = zslRandomLevel();
28
29 // 判断当前的跳跃表的高度是否小于随机生成的level
30 if (level > zsl->level) {
31 // 调整跳表的高度
32 for (i = zsl->level; i < level; i++) {
33 rank[i] = 0;
34 update[i] = zsl->header;
35 update[i]->level[i].span = zsl->length;
36 }
37 zsl->level = level;
38 }
39
40 // 创建新节点x
41 x = zslCreateNode(level,score,ele);
42 for (i = 0; i < level; i++) {
43 // 更新x竖向层级上的所有节点的forward属性
44 x->level[i].forward = update[i]->level[i].forward;
45 update[i]->level[i].forward = x;
46
47 /* update span covered by update[i] as x is inserted here */
48 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
49 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
50 }
51
52 /* increment span for untouched levels */
53 // 更新
54 for (i = level; i < zsl->level; i++) {
55 update[i]->level[i].span++;
56 }
57
58 // 更新backward
59 x->backward = (update[0] == zsl->header) ? NULL : update[0];
60 if (x->level[0].forward)
61 x->level[0].forward->backward = x;
62 else
63 zsl->tail = x;
64
65 // 增加总长度
66 zsl->length++;
67 return x;
68}
需要用到两个长度为64的数组来辅助新增:
update[]: 用来存放插入的节点所在的每一层的节点, 因为最大高度为64 所以设置长度为64正好rank[]: 用来记录当前层从header节点到update[i]节点所经历的步长, 在更新update[i]的span和设置新插入节点的span时用
删除节点
- 找到节点
- 设置span和forward
1/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
2void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
3 int i;
4 for (i = 0; i < zsl->level; i++) {
5 if (update[i]->level[i].forward == x) {
6 update[i]->level[i].span += x->level[i].span - 1;
7 update[i]->level[i].forward = x->level[i].forward;
8 } else {
9 update[i]->level[i].span -= 1;
10 }
11 }
12 if (x->level[0].forward) {
13 x->level[0].forward->backward = x->backward;
14 } else {
15 zsl->tail = x->backward;
16 }
17 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
18 zsl->level--;
19 zsl->length--;
20}
删除跳表
- 先遍历释放所有节点的内存
- 再释放跳跃表的内存
1/* Free a whole skiplist. */
2void zslFree(zskiplist *zsl) {
3 zskiplistNode *node = zsl->header->level[0].forward, *next;
4
5 zfree(zsl->header);
6 while(node) {
7 next = node->level[0].forward;
8 zslFreeNode(node);
9 node = next;
10 }
11 zfree(zsl);
12}
13
14
15void zslFreeNode(zskiplistNode *node) {
16 sdsfree(node->ele);
17 zfree(node);
18}
应用
跳表主要应用于有序集合的底层实现, 还有一种实现方式为压缩列表
Redis的配置文件中关于有序集合底层实现的两个配置。
zset-max-ziplist-entries 128: zset采用压缩列表时,元素个数最大值。默认值为128。zset-max-ziplist-value 64: zset采用压缩列表时,每个元素的字符串长度最大值。默认值为64。
默认使用压缩列表
zset添加元素的主要逻辑位于t_zset.c的zaddGenericCommand()函数中。
zset在插入第一个元素的时候会判断下面两种条件:
-
zset-max-ziplist-entries的值是否等于0 -
zset-max-ziplist-value小于要插入元素的字符串长度。
1/* Lookup the key and create the sorted set if does not exist. */
2zobj = lookupKeyWrite(c->db,key);
3if (zobj == NULL) {
4 if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
5 if (server.zset_max_ziplist_entries == 0 ||
6 server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
7 {
8
9 zobj = createZsetObject();
10 } else {
11 zobj = createZsetZiplistObject();
12 }
13 dbAdd(c->db,key,zobj);
14} else {
15 if (zobj->type != OBJ_ZSET) {
16 addReply(c,shared.wrongtypeerr);
17 goto cleanup;
18 }
19}
注意: 一旦采用跳表作为底层结构的时候, 即使元素被删除到阈值以下, 也无法转变回压缩列表.