Redis Pub/Sub 源码阅读

Pub/Sub的作用

发布订阅,用过消息队列的应该都不陌生。Redis 的实现比较轻量,生产者 publish 一个消息到 channel,所有订阅了这个 channel 的消费者都能收到。另外还支持模式匹配订阅,比如订阅 news:* 就能收到 news:sportnews:tech 等所有匹配的消息。

数据结构

Redis 在设计上把订阅关系存了两份,一份在 server 端,一份在 client 端。先看 server 端的定义(在 server.h 里):

1// server 结构体中
2dict *pubsub_channels;  // channel -> 客户端列表
3list *pubsub_patterns;  // pattern 订阅列表

pubsub_channels 是个字典,key 是 channel 名,value 是订阅了这个 channel 的客户端链表。为啥是链表而不是集合?因为要保证消息发送的顺序一致,链表遍历比较直接。

pubsub_patterns 是个链表,存的是 pubsubPattern 结构:

1typedef struct pubsubPattern {
2    client *client;
3    robj *pattern;
4} pubsubPattern;

为啥 pattern 用链表而不用字典?因为 pattern 是模糊匹配,没法像 channel 那样直接用 key 做索引,发布消息时只能暴力遍历匹配。这也是为什么 pattern 订阅性能不如 channel 订阅的原因。

再看 client 端:

1// client 结构体中
2dict *pubsub_channels;  // 该客户端订阅的所有 channel
3list *pubsub_patterns;  // 该客户端订阅的所有 pattern

client 自己也维护了一份订阅关系。这样设计有个好处:取消订阅时不需要去 server 端查找,直接从本地删,然后再去 server 端同步删除对应的反向引用。

SUBSCRIBE 订阅

 1int pubsubSubscribeChannel(client *c, robj *channel) {
 2    dictEntry *de;
 3    list *clients = NULL;
 4    int retval = 0;
 5
 6    // 先加到客户端的订阅字典里
 7    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
 8        retval = 1;
 9        incrRefCount(channel);
10        
11        // 再把客户端加到 server 的 channel -> clients 映射里
12        de = dictFind(server.pubsub_channels,channel);
13        if (de == NULL) {
14            // 这个 channel 还没人订阅过,新建一个链表
15            clients = listCreate();
16            dictAdd(server.pubsub_channels,channel,clients);
17            incrRefCount(channel);
18        } else {
19            clients = dictGetVal(de);
20        }
21        listAddNodeTail(clients,c);
22    }
23    
24    // 回复客户端
25    addReply(c,shared.mbulkhdr[3]);
26    addReply(c,shared.subscribebulk);
27    addReplyBulk(c,channel);
28    addReplyLongLong(c,clientSubscriptionsCount(c));
29    return retval;
30}

这个方法的逻辑很清晰:先查 client 本地有没有订阅过这个 channel,没有的话就加进去,然后再往 server 端的反向映射里加。注意这里 channel 对象的引用计数管理,Redis 的引用计数是到处都要注意的细节。

UNSUBSCRIBE 取消订阅

 1int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
 2    dictEntry *de;
 3    list *clients;
 4    listNode *ln;
 5    int retval = 0;
 6
 7    incrRefCount(channel); // 保护一下,防止被提前释放
 8    
 9    // 从客户端字典里删
10    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
11        retval = 1;
12        
13        // 从 server 端的映射里删
14        de = dictFind(server.pubsub_channels,channel);
15        clients = dictGetVal(de);
16        ln = listSearchKey(clients,c);
17        listDelNode(clients,ln);
18        
19        // 如果这个 channel 没人订阅了,就把整个 entry 删掉
20        if (listLength(clients) == 0) {
21            dictDelete(server.pubsub_channels,channel);
22        }
23    }
24    
25    if (notify) {
26        // 回复客户端...
27    }
28    decrRefCount(channel);
29    return retval;
30}

这里有个细节:当 channel 的订阅者列表变空时,会直接把整个 channel 从字典里删掉。注释里说的是"防止滥用 PUBSUB 创建数百万个 channel"。确实,如果每个空 channel 都留着,内存会直接打爆。

PUBLISH 命令的实现

  1/* 发布消息到指定的 channel
  2 * 参数:
  3 *   channel - 目标 channel 名称
  4 *   message - 要发布的消息内容
  5 * 返回值:
  6 *   接收到消息的客户端数量
  7 */
  8int pubsubPublishMessage(robj *channel, robj *message) {
  9    int receivers = 0;
 10    dictEntry *de;
 11    listNode *ln;
 12    listIter li;
 13
 14    /* ========== 第一部分:处理精确 channel 订阅 ========== */
 15    
 16    /* 在 server.pubsub_channels 字典中查找该 channel
 17     * 字典结构: key=channel名称, value=订阅该channel的客户端链表
 18     * 时间复杂度: O(1)
 19     */
 20    de = dictFind(server.pubsub_channels,channel);
 21    if (de) {
 22        /* 获取订阅该 channel 的客户端链表 */
 23        list *list = dictGetVal(de);
 24        listNode *ln;
 25        listIter li;
 26
 27        /* 遍历链表,给每个订阅者发送消息 */
 28        listRewind(list,&li);
 29        while ((ln = listNext(&li)) != NULL) {
 30            client *c = ln->value;
 31
 32            /* 构造并发送协议格式的消息:
 33             * *3\r\n              - 3个元素的数组
 34             * $7\r\nmessage\r\n   - "message" 字符串
 35             * $<len>\r\n<channel>\r\n - channel 名称
 36             * $<len>\r\n<message>\r\n - 消息内容
 37             */
 38            addReply(c,shared.mbulkhdr[3]);    // 数组头,3个元素
 39            addReply(c,shared.messagebulk);    // "message" 标识
 40            addReplyBulk(c,channel);           // channel 名称
 41            addReplyBulk(c,message);           // 消息内容
 42            receivers++;
 43        }
 44    }
 45
 46    /* ========== 第二部分:处理 pattern 模式订阅 ========== */
 47    
 48    /* 如果存在任何 pattern 订阅,需要遍历检查匹配 */
 49    if (listLength(server.pubsub_patterns)) {
 50        /* 初始化遍历器 */
 51        listRewind(server.pubsub_patterns,&li);
 52        
 53        /* getDecodedObject 会确保 channel 是解码后的字符串对象
 54         * 如果 channel 是编码过的(如整数编码),会创建一个新的字符串对象
 55         * 这样后续的 pattern 匹配才能正常工作
 56         */
 57        channel = getDecodedObject(channel);
 58        
 59        /* 遍历所有 pattern 订阅,逐个匹配
 60         * 时间复杂度: O(n),n 为 pattern 订阅总数
 61         * 这就是 pattern 订阅比 channel 订阅慢的原因
 62         */
 63        while ((ln = listNext(&li)) != NULL) {
 64            pubsubPattern *pat = ln->value;
 65
 66            /* 使用通配符匹配函数检查 channel 是否匹配 pattern
 67             * stringmatchlen 支持 * 和 ? 通配符
 68             * 参数: pattern, pattern长度, string, string长度, 是否忽略大小写
 69             */
 70            if (stringmatchlen((char*)pat->pattern->ptr,
 71                               sdslen(pat->pattern->ptr),
 72                               (char*)channel->ptr,
 73                               sdslen(channel->ptr),0)) {
 74                
 75                /* 构造并发送 pmessage 格式的消息:
 76                 * *4\r\n               - 4个元素的数组
 77                 * $8\r\npmessage\r\n   - "pmessage" 标识
 78                 * $<len>\r\n<pattern>\r\n - 匹配的 pattern
 79                 * $<len>\r\n<channel>\r\n - 实际 channel 名称
 80                 * $<len>\r\n<message>\r\n - 消息内容
 81                 * 
 82                 * 注意:pattern 订阅的消息比 channel 订阅多一个 pattern 字段
 83                 * 这样客户端可以知道是哪个 pattern 匹配了
 84                 */
 85                addReply(pat->client,shared.mbulkhdr[4]);   // 数组头,4个元素
 86                addReply(pat->client,shared.pmessagebulk);  // "pmessage" 标识
 87                addReplyBulk(pat->client,pat->pattern);     // 匹配的 pattern
 88                addReplyBulk(pat->client,channel);          // 实际 channel
 89                addReplyBulk(pat->client,message);          // 消息内容
 90                receivers++;
 91            }
 92        }
 93        
 94        /* getDecodedObject 可能创建了新对象,需要减少引用计数
 95         * 如果是新创建的对象,这里会释放它
 96         * 如果是原对象,引用计数减1
 97         */
 98        decrRefCount(channel);
 99    }
100    
101    /* 返回接收消息的客户端总数 */
102    return receivers;
103}

这部分就是整个 pub/sub 的核心了。先处理精确匹配的 channel 订阅,直接字典查找 O(1)。然后处理 pattern 订阅,遍历所有 pattern 逐个匹配,这是 O(n)。

pattern 匹调用的是 stringmatchlen 函数,支持 *? 通配符。如果想深入了解匹配算法,可以去看 util.c 里的实现,是经典的通配符匹配。

Pattern 订阅的实现

Pattern 订阅用 PSUBSCRIBE 命令,实现和普通订阅有点不一样:

 1int pubsubSubscribePattern(client *c, robj *pattern) {
 2    int retval = 0;
 3
 4    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
 5        retval = 1;
 6        pubsubPattern *pat;
 7        
 8        // 加到客户端的 pattern 列表
 9        listAddNodeTail(c->pubsub_patterns,pattern);
10        incrRefCount(pattern);
11        
12        // 创建 pubsubPattern 加到 server 列表
13        pat = zmalloc(sizeof(*pat));
14        pat->pattern = getDecodedObject(pattern);
15        pat->client = c;
16        listAddNodeTail(server.pubsub_patterns,pat);
17    }
18    // ...
19}

注意这里 pattern 订阅在客户端是用链表存的,而且每次都要 listSearchKey 遍历查重。如果客户端订阅了大量 pattern,这里会有性能问题。不过实际场景中,单个客户端订阅几百上千个 pattern 的情况应该不多。

PUBSUB 命令:自省工具

Redis 还提供了 PUBSUB 命令来查看订阅状态:

1PUBSUB CHANNELS [pattern]   // 列出当前活跃的 channel
2PUBSUB NUMSUB [channel...]  // 查看各 channel 的订阅者数量
3PUBSUB NUMPAT               // 查看 pattern 订阅总数

实现都比较直接,就是遍历字典或链表统计。PUBSUB CHANNELS 这个命令还支持按 pattern 过滤,方便排查问题。

一些值得注意的细节

消息不持久化

Redis 的 pub/sub 是纯实时的,消息发出去就没了。如果客户端断线,期间的消息就丢了。如果需要消息持久化,应该用 Redis Stream。

没有 ACK 机制

消息发出去就不管了,不保证客户端一定收到。如果网络抖动,消息可能丢了也不知道。

内存占用

每个 pattern 订阅都会在 server 端存一个 pubsubPattern 结构,包含客户端指针和 pattern 字符串。如果 pattern 很多且很长,内存占用会比较大。

集群模式下的行为

在 cluster 模式下,publish 命令会把消息广播到集群所有节点。这个逻辑在 publishCommand 里:

1void publishCommand(client *c) {
2    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
3    if (server.cluster_enabled)
4        clusterPropagatePublish(c->argv[1],c->argv[2]);
5    // ...
6}
— END —