Redis Pub/Sub 源码解析
Redis Pub/Sub 源码阅读
Pub/Sub的作用
发布订阅,用过消息队列的应该都不陌生。Redis 的实现比较轻量,生产者 publish 一个消息到 channel,所有订阅了这个 channel 的消费者都能收到。另外还支持模式匹配订阅,比如订阅 news:* 就能收到 news:sport、news:tech 等所有匹配的消息。
数据结构
Redis 在设计上把订阅关系存了两份,一份在 server 端,一份在 client 端。先看 server 端的定义(在 server.h 里):
1// server 结构体中
2dict *pubsub_channels; // channel -> 客户端列表
3list *pubsub_patterns; // pattern 订阅列表
pubsub_channels 是个字典,key 是 channel 名,value 是订阅了这个 channel 的客户端链表。为啥是链表而不是集合?因为要保证消息发送的顺序一致,链表遍历比较直接。
pubsub_patterns 是个链表,存的是 pubsubPattern 结构:
1typedef struct pubsubPattern {
2 client *client;
3 robj *pattern;
4} pubsubPattern;
为啥 pattern 用链表而不用字典?因为 pattern 是模糊匹配,没法像 channel 那样直接用 key 做索引,发布消息时只能暴力遍历匹配。这也是为什么 pattern 订阅性能不如 channel 订阅的原因。
再看 client 端:
1// client 结构体中
2dict *pubsub_channels; // 该客户端订阅的所有 channel
3list *pubsub_patterns; // 该客户端订阅的所有 pattern
client 自己也维护了一份订阅关系。这样设计有个好处:取消订阅时不需要去 server 端查找,直接从本地删,然后再去 server 端同步删除对应的反向引用。
SUBSCRIBE 订阅
1int pubsubSubscribeChannel(client *c, robj *channel) {
2 dictEntry *de;
3 list *clients = NULL;
4 int retval = 0;
5
6 // 先加到客户端的订阅字典里
7 if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
8 retval = 1;
9 incrRefCount(channel);
10
11 // 再把客户端加到 server 的 channel -> clients 映射里
12 de = dictFind(server.pubsub_channels,channel);
13 if (de == NULL) {
14 // 这个 channel 还没人订阅过,新建一个链表
15 clients = listCreate();
16 dictAdd(server.pubsub_channels,channel,clients);
17 incrRefCount(channel);
18 } else {
19 clients = dictGetVal(de);
20 }
21 listAddNodeTail(clients,c);
22 }
23
24 // 回复客户端
25 addReply(c,shared.mbulkhdr[3]);
26 addReply(c,shared.subscribebulk);
27 addReplyBulk(c,channel);
28 addReplyLongLong(c,clientSubscriptionsCount(c));
29 return retval;
30}
这个方法的逻辑很清晰:先查 client 本地有没有订阅过这个 channel,没有的话就加进去,然后再往 server 端的反向映射里加。注意这里 channel 对象的引用计数管理,Redis 的引用计数是到处都要注意的细节。
UNSUBSCRIBE 取消订阅
1int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
2 dictEntry *de;
3 list *clients;
4 listNode *ln;
5 int retval = 0;
6
7 incrRefCount(channel); // 保护一下,防止被提前释放
8
9 // 从客户端字典里删
10 if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
11 retval = 1;
12
13 // 从 server 端的映射里删
14 de = dictFind(server.pubsub_channels,channel);
15 clients = dictGetVal(de);
16 ln = listSearchKey(clients,c);
17 listDelNode(clients,ln);
18
19 // 如果这个 channel 没人订阅了,就把整个 entry 删掉
20 if (listLength(clients) == 0) {
21 dictDelete(server.pubsub_channels,channel);
22 }
23 }
24
25 if (notify) {
26 // 回复客户端...
27 }
28 decrRefCount(channel);
29 return retval;
30}
这里有个细节:当 channel 的订阅者列表变空时,会直接把整个 channel 从字典里删掉。注释里说的是"防止滥用 PUBSUB 创建数百万个 channel"。确实,如果每个空 channel 都留着,内存会直接打爆。
PUBLISH 命令的实现
1/* 发布消息到指定的 channel
2 * 参数:
3 * channel - 目标 channel 名称
4 * message - 要发布的消息内容
5 * 返回值:
6 * 接收到消息的客户端数量
7 */
8int pubsubPublishMessage(robj *channel, robj *message) {
9 int receivers = 0;
10 dictEntry *de;
11 listNode *ln;
12 listIter li;
13
14 /* ========== 第一部分:处理精确 channel 订阅 ========== */
15
16 /* 在 server.pubsub_channels 字典中查找该 channel
17 * 字典结构: key=channel名称, value=订阅该channel的客户端链表
18 * 时间复杂度: O(1)
19 */
20 de = dictFind(server.pubsub_channels,channel);
21 if (de) {
22 /* 获取订阅该 channel 的客户端链表 */
23 list *list = dictGetVal(de);
24 listNode *ln;
25 listIter li;
26
27 /* 遍历链表,给每个订阅者发送消息 */
28 listRewind(list,&li);
29 while ((ln = listNext(&li)) != NULL) {
30 client *c = ln->value;
31
32 /* 构造并发送协议格式的消息:
33 * *3\r\n - 3个元素的数组
34 * $7\r\nmessage\r\n - "message" 字符串
35 * $<len>\r\n<channel>\r\n - channel 名称
36 * $<len>\r\n<message>\r\n - 消息内容
37 */
38 addReply(c,shared.mbulkhdr[3]); // 数组头,3个元素
39 addReply(c,shared.messagebulk); // "message" 标识
40 addReplyBulk(c,channel); // channel 名称
41 addReplyBulk(c,message); // 消息内容
42 receivers++;
43 }
44 }
45
46 /* ========== 第二部分:处理 pattern 模式订阅 ========== */
47
48 /* 如果存在任何 pattern 订阅,需要遍历检查匹配 */
49 if (listLength(server.pubsub_patterns)) {
50 /* 初始化遍历器 */
51 listRewind(server.pubsub_patterns,&li);
52
53 /* getDecodedObject 会确保 channel 是解码后的字符串对象
54 * 如果 channel 是编码过的(如整数编码),会创建一个新的字符串对象
55 * 这样后续的 pattern 匹配才能正常工作
56 */
57 channel = getDecodedObject(channel);
58
59 /* 遍历所有 pattern 订阅,逐个匹配
60 * 时间复杂度: O(n),n 为 pattern 订阅总数
61 * 这就是 pattern 订阅比 channel 订阅慢的原因
62 */
63 while ((ln = listNext(&li)) != NULL) {
64 pubsubPattern *pat = ln->value;
65
66 /* 使用通配符匹配函数检查 channel 是否匹配 pattern
67 * stringmatchlen 支持 * 和 ? 通配符
68 * 参数: pattern, pattern长度, string, string长度, 是否忽略大小写
69 */
70 if (stringmatchlen((char*)pat->pattern->ptr,
71 sdslen(pat->pattern->ptr),
72 (char*)channel->ptr,
73 sdslen(channel->ptr),0)) {
74
75 /* 构造并发送 pmessage 格式的消息:
76 * *4\r\n - 4个元素的数组
77 * $8\r\npmessage\r\n - "pmessage" 标识
78 * $<len>\r\n<pattern>\r\n - 匹配的 pattern
79 * $<len>\r\n<channel>\r\n - 实际 channel 名称
80 * $<len>\r\n<message>\r\n - 消息内容
81 *
82 * 注意:pattern 订阅的消息比 channel 订阅多一个 pattern 字段
83 * 这样客户端可以知道是哪个 pattern 匹配了
84 */
85 addReply(pat->client,shared.mbulkhdr[4]); // 数组头,4个元素
86 addReply(pat->client,shared.pmessagebulk); // "pmessage" 标识
87 addReplyBulk(pat->client,pat->pattern); // 匹配的 pattern
88 addReplyBulk(pat->client,channel); // 实际 channel
89 addReplyBulk(pat->client,message); // 消息内容
90 receivers++;
91 }
92 }
93
94 /* getDecodedObject 可能创建了新对象,需要减少引用计数
95 * 如果是新创建的对象,这里会释放它
96 * 如果是原对象,引用计数减1
97 */
98 decrRefCount(channel);
99 }
100
101 /* 返回接收消息的客户端总数 */
102 return receivers;
103}
这部分就是整个 pub/sub 的核心了。先处理精确匹配的 channel 订阅,直接字典查找 O(1)。然后处理 pattern 订阅,遍历所有 pattern 逐个匹配,这是 O(n)。
pattern 匹调用的是 stringmatchlen 函数,支持 * 和 ? 通配符。如果想深入了解匹配算法,可以去看 util.c 里的实现,是经典的通配符匹配。
Pattern 订阅的实现
Pattern 订阅用 PSUBSCRIBE 命令,实现和普通订阅有点不一样:
1int pubsubSubscribePattern(client *c, robj *pattern) {
2 int retval = 0;
3
4 if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
5 retval = 1;
6 pubsubPattern *pat;
7
8 // 加到客户端的 pattern 列表
9 listAddNodeTail(c->pubsub_patterns,pattern);
10 incrRefCount(pattern);
11
12 // 创建 pubsubPattern 加到 server 列表
13 pat = zmalloc(sizeof(*pat));
14 pat->pattern = getDecodedObject(pattern);
15 pat->client = c;
16 listAddNodeTail(server.pubsub_patterns,pat);
17 }
18 // ...
19}
注意这里 pattern 订阅在客户端是用链表存的,而且每次都要 listSearchKey 遍历查重。如果客户端订阅了大量 pattern,这里会有性能问题。不过实际场景中,单个客户端订阅几百上千个 pattern 的情况应该不多。
PUBSUB 命令:自省工具
Redis 还提供了 PUBSUB 命令来查看订阅状态:
1PUBSUB CHANNELS [pattern] // 列出当前活跃的 channel
2PUBSUB NUMSUB [channel...] // 查看各 channel 的订阅者数量
3PUBSUB NUMPAT // 查看 pattern 订阅总数
实现都比较直接,就是遍历字典或链表统计。PUBSUB CHANNELS 这个命令还支持按 pattern 过滤,方便排查问题。
一些值得注意的细节
消息不持久化
Redis 的 pub/sub 是纯实时的,消息发出去就没了。如果客户端断线,期间的消息就丢了。如果需要消息持久化,应该用 Redis Stream。
没有 ACK 机制
消息发出去就不管了,不保证客户端一定收到。如果网络抖动,消息可能丢了也不知道。
内存占用
每个 pattern 订阅都会在 server 端存一个 pubsubPattern 结构,包含客户端指针和 pattern 字符串。如果 pattern 很多且很长,内存占用会比较大。
集群模式下的行为
在 cluster 模式下,publish 命令会把消息广播到集群所有节点。这个逻辑在 publishCommand 里:
1void publishCommand(client *c) {
2 int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
3 if (server.cluster_enabled)
4 clusterPropagatePublish(c->argv[1],c->argv[2]);
5 // ...
6}