客户端请求处理完整链路源码分析

一、概述

当客户端向 Redis 发送一条命令(如 SET key value)时,请求会经历一个完整的处理链路:从网络数据接收、协议解析、命令查找、命令执行、结果返回,直到最终发送响应。本文将深入分析这个完整的请求处理链路,帮助读者理解 Redis 如何高效地处理每一条命令。

请求处理链路概览

1┌─────────┐    ┌──────────┐    ┌───────────┐    ┌──────────┐    ┌──────────┐    ┌─────────┐
2│ 网络读取 │ -> │ 协议解析  │ -> │ 命令查找   │ -> │ 命令执行  │ -> │ 结果封装 │ -> │ 响应发送 │
3└─────────┘    └──────────┘    └───────────┘    └──────────┘    └──────────┘    └─────────┘
4     │              │               │               │              │              │
5     ▼              ▼               ▼               ▼              ▼              ▼
6read()        processMulti-    lookupCommand    call()       addReply()    write()
7              bulkBuffer()

二、整体架构图

 1┌─────────────────────────────────────────────────────────────────────────────────────┐
 2│                              Redis 客户端请求处理完整链路                              │
 3├─────────────────────────────────────────────────────────────────────────────────────┤
 4│                                                                                     │
 5│   ┌─────────────┐                                                                  │
 6│   │   Client    │                                                                  │
 7│   │  (客户端)    │                                                                  │
 8│   └──────┬──────┘                                                                  │
 9│          │ TCP 连接                                                                 │
10│          ▼                                                                          │
11│   ┌─────────────────────────────────────────────────────────────────────────────┐  │
12│   │                          1. 连接接收阶段                                      │  │
13│   │  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐         │  │
14│   │  │ acceptTcpHandler │ -> │acceptCommonHandler│ -> │  createClient   │         │  │
15│   │  │   (接受连接)      │    │  (检查限制)       │    │ (初始化客户端)    │         │  │
16│   │  └─────────────────┘    └─────────────────┘    └─────────────────┘         │  │
17│   └─────────────────────────────────────────────────────────────────────────────┘  │
18│          │                                                                          │
19│          ▼                                                                          │
20│   ┌─────────────────────────────────────────────────────────────────────────────┐  │
21│   │                          2. 请求读取阶段                                      │  │
22│   │  ┌─────────────────┐    ┌─────────────────────────────────────────────┐    │  │
23│   │  │readQueryFromClient│ -> │         processInputBuffer                 │    │  │
24│   │  │   (读取数据)       │    │  ┌─────────────────┐ ┌─────────────────┐  │    │  │
25│   │  └─────────────────┘    │  │processInlineBuf │ │processMultibulk │  │    │  │
26│   │                          │  │   (内联协议)     │ │   (RESP协议)     │  │    │  │
27│   │                          │  └─────────────────┘ └─────────────────┘  │    │  │
28│   │                          └─────────────────────────────────────────────┘    │  │
29│   └─────────────────────────────────────────────────────────────────────────────┘  │
30│          │                                                                          │
31│          ▼                                                                          │
32│   ┌─────────────────────────────────────────────────────────────────────────────┐  │
33│   │                          3. 命令处理阶段                                      │  │
34│   │  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐         │  │
35│   │  │ processCommand  │ -> │ lookupCommand   │ -> │     call        │         │  │
36│   │  │  (前置检查)      │    │  (命令查找)      │    │  (命令执行)      │         │  │
37│   │  └─────────────────┘    └─────────────────┘    └─────────────────┘         │  │
38│   └─────────────────────────────────────────────────────────────────────────────┘  │
39│          │                                                                          │
40│          ▼                                                                          │
41│   ┌─────────────────────────────────────────────────────────────────────────────┐  │
42│   │                          4. 响应发送阶段                                      │  │
43│   │  ┌─────────────────┐    ┌─────────────────────────────────────────────┐    │  │
44│   │  │    addReply     │ -> │        handleClientsWithPendingWrites       │    │  │
45│   │  │  (封装响应)      │    │  ┌─────────────────┐ ┌─────────────────┐   │    │  │
46│   │  └─────────────────┘    │  │ writeToClient   │ │sendReplyToClient│   │    │  │
47│   │                          │  │   (同步发送)     │ │   (异步发送)     │   │    │  │
48│   │                          │  └─────────────────┘ └─────────────────┘   │    │  │
49│   │                          └─────────────────────────────────────────────┘    │  │
50│   └─────────────────────────────────────────────────────────────────────────────┘  │
51│                                                                                     │
52└─────────────────────────────────────────────────────────────────────────────────────┘

三、阶段一:连接接收

3.1 客户端结构体

每个连接的客户端都对应一个 client 结构体,存储了请求处理过程中的所有状态:

 1// src/server.h:717-774
 2typedef struct client {
 3    uint64_t id;            /* 客户端唯一递增 ID */
 4    int fd;                 /* 客户端 socket 文件描述符 */
 5    redisDb *db;            /* 当前选中的数据库指针 */
 6    robj *name;             /* 客户端名称(CLIENT SETNAME 设置)*/
 7    
 8    // 请求相关
 9    sds querybuf;           /* 查询缓冲区:累积客户端请求 */
10    size_t qb_pos;          /* 在 querybuf 中已读取的位置 */
11    int argc;               /* 当前命令的参数个数 */
12    robj **argv;            /* 当前命令的参数数组 */
13    struct redisCommand *cmd, *lastcmd;  /* 当前/最后执行的命令 */
14    int reqtype;            /* 请求协议类型:PROTO_REQ_* */
15    int multibulklen;       /* 多批量请求中剩余待读参数个数 */
16    long bulklen;           /* 当前批量字符串的长度 */
17    
18    // 响应相关
19    list *reply;            /* 待发送的响应对象链表 */
20    unsigned long long reply_bytes; /* 响应链表的总字节数 */
21    size_t sentlen;         /* 当前发送块已发送的字节数 */
22    int bufpos;             /* 固定响应缓冲区的写入位置 */
23    char buf[PROTO_REPLY_CHUNK_BYTES]; /* 固定响应缓冲区(16KB)*/
24    
25    // 状态相关
26    time_t ctime;           /* 客户端创建时间 */
27    time_t lastinteraction; /* 最后交互时间(用于超时检测)*/
28    int flags;              /* 客户端标志:CLIENT_* 宏 */
29    int authenticated;      /* 是否已认证 */
30    
31    // 其他字段省略...
32} client;

3.2 接受连接流程

 1// src/networking.c:726-744
 2void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
 3    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;  // 单次最多接受 1000 个连接
 4    char cip[NET_IP_STR_LEN];
 5    UNUSED(el);
 6    UNUSED(mask);
 7    UNUSED(privdata);
 8
 9    // 循环接受新连接,直到没有更多连接或达到单次上限
10    while(max--) {
11        // 接受一个 TCP 连接,返回客户端 fd
12        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
13        if (cfd == ANET_ERR) {
14            // EWOULDBLOCK 表示没有更多待处理的连接,正常情况
15            if (errno != EWOULDBLOCK)
16                serverLog(LL_WARNING,
17                    "Accepting client connection: %s", server.neterr);
18            return;
19        }
20        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
21        // 创建客户端结构体,注册读事件回调
22        acceptCommonHandler(cfd,0,cip);
23    }
24}

3.3 创建客户端

 1// src/networking.c:85-181
 2client *createClient(int fd) {
 3    client *c = zmalloc(sizeof(client));  // 分配客户端结构体内存
 4
 5    // fd != -1 表示创建真实的网络客户端
 6    // fd == -1 用于创建伪客户端(如 Lua 脚本执行、AOF 加载)
 7    if (fd != -1) {
 8        anetNonBlock(NULL,fd);           // 设置非阻塞模式
 9        anetEnableTcpNoDelay(NULL,fd);   // 禁用 Nagle 算法,减少延迟
10        if (server.tcpkeepalive)
11            anetKeepAlive(NULL,fd,server.tcpkeepalive);  // 启用 TCP keepalive
12        // 注册可读事件,回调函数为 readQueryFromClient
13        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
14            readQueryFromClient, c) == AE_ERR)
15        {
16            close(fd);
17            zfree(c);
18            return NULL;
19        }
20    }
21
22    // 初始化客户端状态
23    selectDb(c,0);                       // 默认选择数据库 0
24    uint64_t client_id = atomicIncr(server.next_client_id, 1);
25    c->id = client_id;
26    c->fd = fd;
27    c->name = NULL;
28    c->bufpos = 0;
29    c->querybuf = sdsempty();            // 初始化查询缓冲区
30    c->argc = 0;
31    c->argv = NULL;
32    c->cmd = c->lastcmd = NULL;
33    c->multibulklen = 0;
34    c->bulklen = -1;
35    c->sentlen = 0;
36    c->flags = 0;
37    c->ctime = c->lastinteraction = server.unixtime;
38    c->authenticated = 0;
39    c->reply = listCreate();             // 初始化响应链表
40    c->reply_bytes = 0;
41    listSetFreeMethod(c->reply, zfree);
42    
43    // 将客户端添加到服务器客户端链表
44    if (fd != -1) {
45        c->client_list_node = listAddNodeTailReturn(server.clients, c);
46    }
47    c->buf = zmalloc(PROTO_REPLY_CHUNK_BYTES);  // 分配 16KB 固定响应缓冲区
48    listAddNodeHead(server.clients_pending_write, c);
49    return c;
50}

关键操作

  1. 设置非阻塞模式:确保 read/write 不会阻塞事件循环
  2. 禁用 Nagle 算法:减少小数据包的延迟
  3. 注册读事件:回调函数为 readQueryFromClient
  4. 初始化两级缓冲:固定缓冲区(16KB)+ 链表缓冲区

四、阶段二:请求读取与协议解析

4.1 读取客户端数据

 1// src/networking.c:1500-1570
 2void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
 3    client *c = (client*) privdata;
 4    int nread, readlen;
 5    size_t qblen;
 6    UNUSED(el);
 7    UNUSED(mask);
 8
 9    readlen = PROTO_IOBUF_LEN;  // 默认读取缓冲区大小 16KB
10    
11    // 大包优化:如果是多批量请求且正在处理大参数,调整读取长度
12    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
13        && c->bulklen >= PROTO_MBULK_BIG_ARG)
14    {
15        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
16        if (remaining > 0 && remaining < readlen) readlen = remaining;
17    }
18
19    qblen = sdslen(c->querybuf);
20    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
21    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);  // 扩展缓冲区空间
22    
23    // 从 socket 读取数据到查询缓冲区末尾
24    nread = read(fd, c->querybuf+qblen, readlen);
25    if (nread == -1) {
26        if (errno == EAGAIN) {
27            return;  // EAGAIN 表示暂无数据可读
28        } else {
29            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
30            freeClient(c);
31            return;
32        }
33    } else if (nread == 0) {
34        serverLog(LL_VERBOSE, "Client closed connection");
35        freeClient(c);
36        return;
37    }
38
39    sdsIncrLen(c->querybuf,nread);            // 更新缓冲区有效数据长度
40    c->lastinteraction = server.unixtime;      // 更新最后交互时间
41    server.stat_net_input_bytes += nread;     // 统计网络输入字节数
42    
43    // 检查查询缓冲区是否超过最大限制
44    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
45        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
46        bytes = sdscatrepr(bytes,c->querybuf,64);
47        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
48        sdsfree(ci);
49        sdsfree(bytes);
50        freeClient(c);
51        return;
52    }
53
54    // 处理输入缓冲区中的命令
55    processInputBufferAndReplicate(c);
56}

4.2 处理输入缓冲区

 1// src/networking.c:1453-1498
 2void processInputBuffer(client *c) {
 3    server.current_client = c;
 4    
 5    // 循环处理查询缓冲区中的所有命令
 6    while(sdslen(c->querybuf)) {
 7        // 客户端暂停时不处理命令
 8        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
 9
10        // 如果客户端正在处理阻塞命令,暂停处理
11        if (c->flags & CLIENT_BLOCKED) break;
12
13        // 如果标记为关闭,不再处理更多命令
14        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) break;
15
16        // 确定请求协议类型
17        if (!c->reqtype) {
18            if (c->querybuf[0] == '*') {
19                // 多批量格式:*3\r\n$3\r\nSET\r\n...
20                c->reqtype = PROTO_REQ_MULTIBULK;
21            } else {
22                // 内联格式:SET key value
23                c->reqtype = PROTO_REQ_INLINE;
24            }
25        }
26
27        if (c->reqtype == PROTO_REQ_INLINE) {
28            if (processInlineBuffer(c) != C_OK) break;
29        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
30            if (processMultibulkBuffer(c) != C_OK) break;
31        } else {
32            serverPanic("Unknown request type");
33        }
34
35        // 命令参数不完整,等待更多数据
36        if (c->argc == 0) {
37            resetClient(c);
38        } else {
39            // 执行命令
40            if (processCommand(c) == C_OK) {
41                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
42                    c->reploff = c->read_reploff - sdslen(c->querybuf);
43                }
44                if (!(c->flags & CLIENT_BLOCKED || c->flags & CLIENT_MULTI))
45                    resetClient(c);
46            }
47            if (c->flags & CLIENT_CLOSE_AFTER_REPLY) break;
48        }
49    }
50    server.current_client = NULL;
51}

4.3 RESP 协议解析

Redis 使用 RESP(Redis Serialization Protocol)协议通信。以 SET key value 为例:

 1请求格式:
 2*3\r\n          <- 参数个数(3个)
 3$3\r\n          <- 第一个参数长度
 4SET\r\n         <- 第一个参数内容
 5$3\r\n          <- 第二个参数长度
 6key\r\n         <- 第二个参数内容
 7$5\r\n          <- 第三个参数长度
 8value\r\n       <- 第三个参数内容
 9
10响应格式:
11+OK\r\n         <- 简单字符串响应
  1// src/networking.c:1267-1383
  2int processMultibulkBuffer(client *c) {
  3    char *newline = NULL;
  4    int ok;
  5    long long ll;
  6
  7    // 读取多批量参数个数:*<count>\r\n
  8    if (c->multibulklen == 0) {
  9        newline = strchr(c->querybuf+c->qb_pos,'\r');
 10        if (newline == NULL) {
 11            if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
 12                addReplyError(c,"Protocol error: too big mbulk count string");
 13                setProtocolError("too big mbulk count",c);
 14            }
 15            return C_ERR;
 16        }
 17
 18        // 检查格式:* 后跟数字
 19        if (c->querybuf[c->qb_pos] != '*') {
 20            addReplyError(c,"Protocol error: expected '*', got something else");
 21            setProtocolError("expected *",c);
 22            return C_ERR;
 23        }
 24
 25        // 解析参数个数
 26        ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
 27        if (!ok || ll > 1024*1024) {
 28            addReplyError(c,"Protocol error: invalid multibulk length");
 29            setProtocolError("invalid mbulk count",c);
 30            return C_ERR;
 31        }
 32
 33        c->qb_pos = (newline-c->querybuf)+2;  // 跳过 \r\n
 34
 35        // 空命令
 36        if (ll <= 0) {
 37            c->argc = 0;
 38            return C_OK;
 39        }
 40
 41        c->multibulklen = ll;  // 设置待读取参数个数
 42        c->argc = 0;
 43        c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
 44    }
 45
 46    // 循环读取每个参数
 47    while(c->multibulklen) {
 48        // 读取批量字符串长度:$<len>\r\n
 49        if (c->bulklen == -1) {
 50            newline = strchr(c->querybuf+c->qb_pos,'\r');
 51            if (newline == NULL) {
 52                if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
 53                    addReplyError(c,"Protocol error: too big bulk count string");
 54                    setProtocolError("too big bulk count",c);
 55                    return C_ERR;
 56                }
 57                break;
 58            }
 59
 60            if (c->querybuf[c->qb_pos] != '$') {
 61                addReplyError(c,"Protocol error: expected '$', got something else");
 62                setProtocolError("expected $",c);
 63                return C_ERR;
 64            }
 65
 66            ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
 67            if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
 68                addReplyError(c,"Protocol error: invalid bulk length");
 69                setProtocolError("invalid bulk length",c);
 70                return C_ERR;
 71            }
 72
 73            c->qb_pos = newline-c->querybuf+2;  // 跳过 \r\n
 74            c->bulklen = ll;  // 设置当前参数长度
 75        }
 76
 77        // 读取参数内容
 78        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
 79            // 数据不完整,等待更多数据
 80            break;
 81        } else {
 82            // 创建参数对象
 83            if (c->bulklen >= PROTO_MBULK_BIG_ARG) {
 84                // 大参数优化:直接使用 querybuf 中的数据
 85                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
 86                c->querybuf = sdsnewlen(NULL, c->bulklen+2);
 87                sdsclear(c->querybuf);
 88            } else {
 89                // 小参数:复制数据
 90                c->argv[c->argc++] =
 91                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
 92            }
 93            c->qb_pos += c->bulklen+2;  // 跳过参数内容和 \r\n
 94            c->bulklen = -1;
 95            c->multibulklen--;
 96        }
 97    }
 98
 99    // 所有参数读取完成
100    if (c->multibulklen == 0) {
101        // 清理已处理的数据
102        if (c->qb_pos) {
103            sdsrange(c->querybuf,c->qb_pos,-1);
104            c->qb_pos = 0;
105        }
106        return C_OK;
107    }
108
109    return C_ERR;
110}

4.4 请求读取流程图

 1                        ┌─────────────────────────────────────────┐
 2                        │      客户端 socket 可读                  │
 3                        │      (有数据到达)                        │
 4                        └─────────────────┬───────────────────────┘
 5 6 7                        ┌─────────────────────────────────────────┐
 8                        │     readQueryFromClient() 被调用        │
 9                        └─────────────────┬───────────────────────┘
101112                        ┌─────────────────────────────────────────┐
13                        │          read(fd, buf, len)             │
14                        │          读取数据到 querybuf            │
15                        └─────────────────┬───────────────────────┘
1617                        ┌─────────────────┼───────────────────────┐
18                        │                 │                       │
19                        ▼                 ▼                       ▼
20               ┌──────────────┐  ┌──────────────┐      ┌──────────────┐
21               │ nread == -1  │  │ nread == 0   │      │ nread > 0    │
22               │ EAGAIN: 返回 │  │ 客户端关闭   │      │ 正常读取     │
23               │ 其他: 关闭   │  │ freeClient() │      │              │
24               └──────────────┘  └──────────────┘      └───────┬──────┘
252627                                        ┌─────────────────────────────────┐
28                                        │     processInputBuffer()        │
29                                        │     循环处理缓冲区中的命令        │
30                                        └─────────────────────────────────┘
313233                                        ┌─────────────────────────────────┐
34                                        │      判断协议类型                │
35                                        │  querybuf[0] == '*' ?           │
36                                        └─────────────────────────────────┘
37                                              │              │
38                              ┌────────────────┘              └────────────────┐
39                              ▼                                                ▼
40                    ┌─────────────────────┐                        ┌─────────────────────┐
41                    │ PROTO_REQ_MULTIBULK │                        │ PROTO_REQ_INLINE    │
42                    │ processMultibulkBuf │                        │ processInlineBuffer │
43                    │ (RESP 协议解析)      │                        │ (内联命令解析)       │
44                    └──────────┬──────────┘                        └──────────┬──────────┘
45                               │                                              │
46                               └──────────────────┬───────────────────────────┘
4748                                        ┌─────────────────────────────────┐
49                                        │    argc, argv 解析完成          │
50                                        │    调用 processCommand()        │
51                                        └─────────────────────────────────┘

五、阶段三:命令处理

5.1 命令处理入口

  1// src/server.c:2539-2659
  2int processCommand(client *c) {
  3    // 特殊处理 QUIT 命令
  4    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
  5        addReply(c,shared.ok);
  6        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
  7        return C_ERR;
  8    }
  9
 10    // 查找命令
 11    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
 12    if (!c->cmd) {
 13        flagTransaction(c);
 14        sds args = sdsempty();
 15        int i;
 16        for (i=1; i < c->argc && sdslen(args) < 128; i++)
 17            args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
 18        addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
 19                            (char*)c->argv[0]->ptr, args);
 20        sdsfree(args);
 21        return C_OK;
 22    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
 23               (c->argc < -c->cmd->arity)) {
 24        // 参数个数检查
 25        flagTransaction(c);
 26        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
 27                            c->cmd->name);
 28        return C_OK;
 29    }
 30
 31    // 认证检查
 32    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
 33    {
 34        flagTransaction(c);
 35        addReply(c,shared.noautherr);
 36        return C_OK;
 37    }
 38
 39    // Cluster 集群重定向检查
 40    if (server.cluster_enabled &&
 41        !(c->flags & CLIENT_MASTER) &&
 42        !(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) &&
 43        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
 44          c->cmd->proc != execCommand))
 45    {
 46        int hashslot;
 47        int error_code;
 48        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
 49                                        &hashslot,&error_code);
 50        if (n == NULL || n != server.cluster->myself) {
 51            if (c->cmd->proc == execCommand) {
 52                discardTransaction(c);
 53            } else {
 54                flagTransaction(c);
 55            }
 56            clusterRedirectClient(c,n,hashslot,error_code);
 57            return C_OK;
 58        }
 59    }
 60
 61    // 内存限制检查
 62    if (server.maxmemory && !server.lua_timedout) {
 63        int out_of_memory = freeMemoryIfNeeded() == C_ERR;
 64        if (server.current_client == NULL) return C_ERR;
 65
 66        if ((c->cmd->flags & CMD_DENYOOM) && out_of_memory) {
 67            flagTransaction(c);
 68            addReply(c, shared.oomerr);
 69            return C_OK;
 70        }
 71    }
 72
 73    // 磁盘错误检查
 74    int deny_write_type = writeCommandsDeniedByDiskError();
 75    if (deny_write_type != DISK_ERROR_TYPE_NONE &&
 76        server.masterhost == NULL &&
 77        (c->cmd->flags & CMD_WRITE ||
 78         c->cmd->proc == pingCommand))
 79    {
 80        flagTransaction(c);
 81        if (deny_write_type == DISK_ERROR_TYPE_RDB)
 82            addReply(c, shared.bgsaveerr);
 83        else
 84            addReplySds(c,
 85                        sdscatprintf(sdsempty(),
 86                                     "-MISCONF Errors writing to the AOF file: %s\r\n",
 87                                     strerror(server.aof_last_write_errno)));
 88        return C_OK;
 89    }
 90
 91    // 最小从节点检查
 92    if (server.masterhost == NULL &&
 93        server.repl_min_slaves_to_write &&
 94        server.repl_min_slaves_max_lag &&
 95        c->cmd->flags & CMD_WRITE &&
 96        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
 97    {
 98        flagTransaction(c);
 99        addReply(c, shared.noreplicaserr);
100        return C_OK;
101    }
102
103    // 执行命令
104    if (c->flags & CLIENT_MULTI &&
105        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
106        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
107    {
108        // 在事务中:将命令加入事务队列
109        queueMultiCommand(c);
110        addReply(c,shared.queued);
111    } else {
112        // 直接执行命令
113        call(c,CMD_CALL_FULL);
114        c->cmd = c->lastcmd;
115    }
116
117    return C_OK;
118}

5.2 命令查找

1// src/server.c:2661-2676
2struct redisCommand *lookupCommand(sds name) {
3    return dictFetchValue(server.commands, name);
4}

命令表是一个字典,key 是命令名称,value 是 redisCommand 结构体:

 1// src/server.h:144-164
 2struct redisCommand {
 3    char *name;              // 命令名称
 4    redisCommandProc *proc;  // 命令实现函数指针
 5    int arity;               // 参数个数,负数表示最小参数个数
 6    char *sflags;            // 字符串形式的标志
 7    uint64_t flags;          // 二进制标志
 8    // 获取键位置的函数(用于集群重定向)
 9    redisGetKeysProc *getkeys_proc;
10    // 键的位置信息
11    int firstkey;            // 第一个键的位置
12    int lastkey;             // 最后一个键的位置
13    int keystep;             // 键的步长
14    long long microseconds;  // 累计执行时间(微秒)
15    long long calls;         // 累计调用次数
16};

5.3 命令执行

 1// src/server.c:2410-2529
 2void call(client *c, int flags) {
 3    long long dirty, start, duration;
 4    int client_old_flags = c->flags;
 5    struct redisCommand *real_cmd = c->cmd;
 6
 7    // 发送命令给 MONITOR 客户端
 8    if (listLength(server.monitors) &&
 9        !server.loading &&
10        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
11    {
12        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
13    }
14
15    // 初始化传播标志
16    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
17    redisOpArray prev_also_propagate = server.also_propagate;
18    redisOpArrayInit(&server.also_propagate);
19
20    // 执行命令!
21    dirty = server.dirty;
22    start = ustime();
23    c->cmd->proc(c);  // 调用命令实现函数
24    duration = ustime()-start;
25    dirty = server.dirty-dirty;
26    if (dirty < 0) dirty = 0;
27
28    // 慢日志记录
29    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
30        char *latency_event = (c->cmd->flags & CMD_FAST) ?
31                              "fast-command" : "command";
32        latencyAddSampleIfNeeded(latency_event,duration/1000);
33        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
34    }
35    
36    // 命令统计
37    if (flags & CMD_CALL_STATS) {
38        real_cmd->microseconds += duration;
39        real_cmd->calls++;
40    }
41
42    // AOF 和主从复制传播
43    if (flags & CMD_CALL_PROPAGATE &&
44        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
45    {
46        int propagate_flags = PROPAGATE_NONE;
47
48        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
49
50        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
51        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
52
53        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
54            !(flags & CMD_CALL_PROPAGATE_REPL))
55            propagate_flags &= ~PROPAGATE_REPL;
56        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
57            !(flags & CMD_CALL_PROPAGATE_AOF))
58            propagate_flags &= ~PROPAGATE_AOF;
59
60        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
61            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
62    }
63
64    // 恢复传播标志
65    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
66    c->flags |= client_old_flags &
67                (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
68
69    server.stat_numcommands++;
70}

5.4 命令处理流程图

 1                        ┌─────────────────────────────────────────┐
 2                        │        processCommand(c) 被调用         │
 3                        └─────────────────┬───────────────────────┘
 4 5 6                        ┌─────────────────────────────────────────┐
 7                        │          查找命令 lookupCommand          │
 8                        │     c->cmd = dictFetchValue(commands)   │
 9                        └─────────────────┬───────────────────────┘
1011                        ┌─────────────────┼───────────────────────┐
12                        │                 │                       │
13                        ▼                 ▼                       ▼
14               ┌──────────────┐  ┌──────────────┐      ┌──────────────┐
15               │ 命令不存在    │  │ 参数个数错误  │      │ 命令存在      │
16               │ 返回错误      │  │ 返回错误      │      │ 继续检查      │
17               └──────────────┘  └──────────────┘      └───────┬──────┘
181920                                        ┌─────────────────────────────────┐
21                                        │           前置检查               │
22                                        │  1. 认证检查                     │
23                                        │  2. Cluster 重定向检查           │
24                                        │  3. 内存限制检查                 │
25                                        │  4. 磁盘错误检查                 │
26                                        │  5. 最小从节点检查               │
27                                        └─────────────────────────────────┘
2829                                                        ┌─────┴─────┐
30                                                        │           │
31                                                        ▼           ▼
32                                              ┌──────────────┐ ┌──────────────┐
33                                              │ 在事务中      │ │ 不在事务中    │
34                                              │ 加入队列      │ │ call(c)      │
35                                              │ addReply     │ │ 直接执行      │
36                                              │ (queued)     │ │              │
37                                              └──────────────┘ └───────┬──────┘
383940                                        ┌─────────────────────────────────────────┐
41                                        │              call() 执行命令            │
42                                        │                                         │
43                                        │  1. 发送给 MONITOR 客户端               │
44                                        │  2. c->cmd->proc(c) 执行命令            │
45                                        │  3. 慢日志记录                          │
46                                        │  4. 命令统计                            │
47                                        │  5. AOF/复制传播                        │
48                                        └─────────────────────────────────────────┘

六、阶段四:响应发送

6.1 响应封装

命令执行后,通过 addReply 系列函数封装响应:

 1// src/networking.c:183-217
 2void addReply(client *c, robj *obj) {
 3    if (prepareClientToWrite(c) != C_OK) return;
 4
 5    if (sdsEncodedObject(obj)) {
 6        // SDS 编码的对象:直接追加到缓冲区
 7        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
 8            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
 9    } else if (obj->encoding == OBJ_ENCODING_INT) {
10        // 整数编码的对象:转为字符串后追加
11        char buf[32];
12        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
13        if (_addReplyToBuffer(c,buf,len) != C_OK)
14            _addReplyStringToList(c,buf,len);
15    } else {
16        serverPanic("Wrong obj->encoding in addReply()");
17    }
18}
19
20// src/networking.c:299-311
21void addReplyString(client *c, const char *s, size_t len) {
22    if (prepareClientToWrite(c) != C_OK) return;
23    if (_addReplyToBuffer(c,s,len) != C_OK)
24        _addReplyStringToList(c,s,len);
25}
26
27// src/networking.c:313-328
28void addReplyErrorLength(client *c, const char *s, size_t len) {
29    addReplyString(c,"-ERR ",5);
30    addReplyString(c,s,len);
31    addReplyString(c,"\r\n",2);
32}
33
34// src/networking.c:330-338
35void addReplyError(client *c, const char *err) {
36    addReplyErrorLength(c,err,strlen(err));
37}

6.2 响应缓冲策略

Redis 采用两级缓冲策略:

  1. 固定缓冲区c->buf(16KB),用于小响应
  2. 链表缓冲区c->reply,用于大响应
 1// src/networking.c:219-257
 2int _addReplyToBuffer(client *c, const char *s, size_t len) {
 3    size_t available = sizeof(c->buf)-c->bufpos;
 4
 5    // 如果链表缓冲区不为空,不再使用固定缓冲区
 6    if (listLength(c->reply) > 0) return C_ERR;
 7
 8    // 空间不足,使用链表缓冲区
 9    if (len > available) return C_ERR;
10
11    memcpy(c->buf+c->bufpos,s,len);
12    c->bufpos += len;
13    return C_OK;
14}
15
16// src/networking.c:259-297
17void _addReplyStringToList(client *c, const char *s, size_t len) {
18    // 尝试追加到链表最后一个节点
19    if (listLength(c->reply)) {
20        clientReplyBlock *tail = listNodeValue(listLast(c->reply));
21        size_t avail = tail->size - tail->used;
22        size_t copy = avail < len ? avail : len;
23        memcpy(tail->buf + tail->used, s, copy);
24        tail->used += copy;
25        s += copy;
26        len -= copy;
27    }
28    
29    if (len) {
30        // 创建新的响应块
31        size_t size = len < PROTO_REPLY_CHUNK_BYTES ? PROTO_REPLY_CHUNK_BYTES : len;
32        clientReplyBlock *buf = zmalloc(size + sizeof(clientReplyBlock));
33        buf->size = size;
34        buf->used = len;
35        memcpy(buf->buf, s, len);
36        listAddNodeTail(c->reply, buf);
37        c->reply_bytes += buf->size;
38    }
39}

6.3 同步发送(beforeSleep)

 1// src/networking.c:1064-1104
 2int handleClientsWithPendingWrites(void) {
 3    listIter li;
 4    listNode *ln;
 5    int processed = listLength(server.clients_pending_write);
 6
 7    listRewind(server.clients_pending_write,&li);
 8    while((ln = listNext(&li))) {
 9        client *c = listNodeValue(ln);
10        c->flags &= ~CLIENT_PENDING_WRITE;
11        listDelNode(server.clients_pending_write,ln);
12
13        // 跳过受保护的客户端
14        if (c->flags & CLIENT_PROTECTED) continue;
15
16        // 优先尝试同步写入
17        if (writeToClient(c->fd,c,0) == C_ERR) continue;
18
19        // 如果同步写入后仍有数据未发送,注册写事件
20        if (clientHasPendingReplies(c)) {
21            int ae_flags = AE_WRITABLE;
22            // AOF fsync=always 时需要设置 AE_BARRIER
23            if (server.aof_state == AOF_ON &&
24                server.aof_fsync == AOF_FSYNC_ALWAYS)
25            {
26                ae_flags |= AE_BARRIER;
27            }
28            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
29                sendReplyToClient, c) == AE_ERR)
30            {
31                    freeClientAsync(c);
32            }
33        }
34    }
35    return processed;
36}

6.4 写入数据到客户端

 1// src/networking.c:961-1051
 2int writeToClient(int fd, client *c, int handler_installed) {
 3    ssize_t nwritten = 0, totwritten = 0;
 4    size_t objlen;
 5    clientReplyBlock *o;
 6
 7    // 循环发送,直到没有待发送数据
 8    while(clientHasPendingReplies(c)) {
 9        // 优先发送固定缓冲区中的数据
10        if (c->bufpos > 0) {
11            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
12            if (nwritten <= 0) break;
13            c->sentlen += nwritten;
14            totwritten += nwritten;
15
16            if ((int)c->sentlen == c->bufpos) {
17                c->bufpos = 0;
18                c->sentlen = 0;
19            }
20        } else {
21            // 从链表中取数据块发送
22            o = listNodeValue(listFirst(c->reply));
23            objlen = o->used;
24
25            if (objlen == 0) {
26                c->reply_bytes -= o->size;
27                listDelNode(c->reply,listFirst(c->reply));
28                continue;
29            }
30
31            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
32            if (nwritten <= 0) break;
33            c->sentlen += nwritten;
34            totwritten += nwritten;
35
36            if (c->sentlen == objlen) {
37                c->reply_bytes -= o->size;
38                listDelNode(c->reply,listFirst(c->reply));
39                c->sentlen = 0;
40            }
41        }
42        
43        // 限制单次写入数据量(64MB),避免长时间占用事件循环
44        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
45            (server.maxmemory == 0 ||
46             zmalloc_used_memory() < server.maxmemory) &&
47            !(c->flags & CLIENT_SLAVE)) break;
48    }
49    
50    server.stat_net_output_bytes += totwritten;
51    
52    if (nwritten == -1) {
53        if (errno == EAGAIN) {
54            nwritten = 0;
55        } else {
56            serverLog(LL_VERBOSE,
57                "Error writing to client: %s", strerror(errno));
58            freeClient(c);
59            return C_ERR;
60        }
61    }
62    
63    if (totwritten > 0) {
64        if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
65    }
66    
67    if (!clientHasPendingReplies(c)) {
68        c->sentlen = 0;
69        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
70
71        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
72            freeClient(c);
73            return C_ERR;
74        }
75    }
76    return C_OK;
77}

6.5 异步发送回调

1// src/networking.c:1106-1112
2void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
3    UNUSED(el);
4    UNUSED(mask);
5    writeToClient(fd,privdata,1);
6}

6.6 响应发送流程图

 1┌─────────────────────────────────────────────────────────────────────────┐
 2│                        命令执行完成,生成响应                               │
 3│                     (调用 addReply 系列函数)                              │
 4└─────────────────────────────────────────────────────────────────────────┘
 5 6 7┌─────────────────────────────────────────────────────────────────────────┐
 8│                  尝试写入固定缓冲区 c->buf (16KB)                         │
 9└─────────────────────────────────────────────────────────────────────────┘
1011                        ┌─────────────┴─────────────┐
12                        │                           │
13                        ▼                           ▼
14              ┌─────────────────┐         ┌─────────────────┐
15              │   空间足够       │         │   空间不足       │
16              │   写入成功       │         │   写入链表       │
17              └─────────────────┘         └─────────────────┘
18                        │                           │
19                        └─────────────┬─────────────┘
202122┌─────────────────────────────────────────────────────────────────────────┐
23│                  将 client 加入 server.clients_pending_write             │
24└─────────────────────────────────────────────────────────────────────────┘
252627┌─────────────────────────────────────────────────────────────────────────┐
28│                    beforeSleep() 被调用                                  │
29│                   (事件循环每次阻塞前执行)                                  │
30└─────────────────────────────────────────────────────────────────────────┘
313233┌─────────────────────────────────────────────────────────────────────────┐
34│               handleClientsWithPendingWrites()                          │
35│              遍历所有待发送响应的客户端                                      │
36└─────────────────────────────────────────────────────────────────────────┘
373839┌─────────────────────────────────────────────────────────────────────────┐
40│                    writeToClient(fd, c, 0)                              │
41│                    尝试同步写入响应数据                                     │
42└─────────────────────────────────────────────────────────────────────────┘
4344                        ┌─────────────┴─────────────┐
45                        │                           │
46                        ▼                           ▼
47              ┌─────────────────┐         ┌─────────────────┐
48              │   全部发送完毕    │         │   还有数据未发送  │
49              │   (发送缓冲区未满) │         │   (发送缓冲区满)  │
50              └─────────────────┘         └────────┬────────┘
515253                                    ┌─────────────────────────────────┐
54                                    │   注册 AE_WRITABLE 事件         │
55                                    │   回调: sendReplyToClient       │
56                                    │   等待 socket 可写时异步发送      │
57                                    └─────────────────────────────────┘
585960                                    ┌─────────────────────────────────┐
61                                    │   socket 可写,回调被触发         │
62                                    │   sendReplyToClient()           │
63                                    │   -> writeToClient(fd, c, 1)    │
64                                    └─────────────────────────────────┘

七、完整链路时序图

 1┌────────┐          ┌────────┐          ┌────────┐          ┌────────┐          ┌────────┐
 2│ Client │          │ Kernel │          │  Redis │          │Command │          │   DB   │
 3│        │          │  TCP   │          │ Server │          │  Proc  │          │        │
 4└───┬────┘          └───┬────┘          └───┬────┘          └───┬────┘          └───┬────┘
 5    │                   │                   │                   │                   │
 6    │  CONNECT          │                   │                   │                   │
 7    │──────────────────>│                   │                   │                   │
 8    │                   │  accept event     │                   │                   │
 9    │                   │──────────────────>│                   │                   │
10    │                   │                   │ createClient()    │                   │
11    │                   │                   │ register read     │                   │
12    │                   │                   │                   │                   │
13    │  SET key value    │                   │                   │                   │
14    │──────────────────>│                   │                   │                   │
15    │                   │  read event       │                   │                   │
16    │                   │──────────────────>│                   │                   │
17    │                   │                   │ readQueryFromClient()               │
18    │                   │                   │ processMultibulkBuffer()            │
19    │                   │                   │ processCommand()  │                   │
20    │                   │                   │ lookupCommand()   │                   │
21    │                   │                   │ call()            │                   │
22    │                   │                   │──────────────────>│                   │
23    │                   │                   │                   │ setCommand()      │
24    │                   │                   │                   │──────────────────>│
25    │                   │                   │                   │                   │
26    │                   │                   │                   │   dbAdd()         │
27    │                   │                   │                   │<──────────────────│
28    │                   │                   │                   │                   │
29    │                   │                   │                   │ addReply(OK)      │
30    │                   │                   │<──────────────────│                   │
31    │                   │                   │                   │                   │
32    │                   │                   │ beforeSleep()     │                   │
33    │                   │                   │ writeToClient()   │                   │
34    │                   │<──────────────────│                   │                   │
35    │                   │                   │                   │                   │
36    │  +OK\r\n          │                   │                   │                   │
37    │<──────────────────│                   │                   │                   │
38    │                   │                   │                   │                   │

八、关键数据结构

8.1 client 结构体核心字段

 1┌─────────────────────────────────────────────────────────────────────────────────────┐
 2│                              client 结构体核心字段                                    │
 3├─────────────────────────────────────────────────────────────────────────────────────┤
 4│                                                                                     │
 5│   请求处理相关                                                                        │
 6│   ┌─────────────────────────────────────────────────────────────────────────────┐  │
 7│   │  querybuf (sds)     ────────────────────────────────────────────────────┐   │  │
 8│   │                     │ "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n" │   │  │
 9│   │                     └───────────────────────────────────────────────────┘   │  │
10│   │  argc (int)         ────────────────────────────────────────────────────── 3 │  │
11│   │  argv (robj**)      ──────────────────────────────────────────────────────┐  │  │
12│   │                     │ [0]: "SET"  [1]: "key"  [2]: "value"               │  │  │
13│   │                     └─────────────────────────────────────────────────────┘  │  │
14│   │  cmd (redisCommand*)───────────────────────────────────────────────────────┐ │  │
15│   │                     │ name: "set", proc: setCommand, arity: -3            │ │  │
16│   │                     └─────────────────────────────────────────────────────┘  │  │
17│   └─────────────────────────────────────────────────────────────────────────────┘  │
18│                                                                                     │
19│   响应处理相关                                                                        │
20│   ┌─────────────────────────────────────────────────────────────────────────────┐  │
21│   │  buf (char[16KB])   ──────────────────────────────────────────────────────┐  │  │
22│   │                     │ "+OK\r\n"                                           │  │  │
23│   │                     └─────────────────────────────────────────────────────┘  │  │
24│   │  bufpos (int)       ─────────────────────────────────────────────────────── 5 │  │
25│   │  reply (list)       ──────────────────────────────────────────────────────┐  │  │
26│   │                     │ node1 -> node2 -> node3 -> ...                      │  │  │
27│   │                     └─────────────────────────────────────────────────────┘  │  │
28│   │  reply_bytes        ─────────────────────────────────────────────────────── 0 │  │
29│   │  sentlen (size_t)   ─────────────────────────────────────────────────────── 0 │  │
30│   └─────────────────────────────────────────────────────────────────────────────┘  │
31│                                                                                     │
32└─────────────────────────────────────────────────────────────────────────────────────┘

8.2 redisCommand 结构体

 1┌─────────────────────────────────────────────────────────────────────────────────────┐
 2│                            redisCommand 结构体                                       │
 3├─────────────────────────────────────────────────────────────────────────────────────┤
 4│                                                                                     │
 5│   SET 命令示例                                                                       │
 6│   ┌─────────────────────────────────────────────────────────────────────────────┐  │
 7│   │  name        : "set"                                                         │  │
 8│   │  proc        : setCommand                                                    │  │
 9│   │  arity       : -3                    // 至少 3 个参数                          │  │
10│   │  sflags      : "wm"                  // w: 写命令, m: 可能内存溢出              │  │
11│   │  flags       : CMD_WRITE | CMD_DENYOOM                                       │  │
12│   │  firstkey    : 1                     // 第一个键在 argv[1]                     │  │
13│   │  lastkey     : 1                     // 最后一个键在 argv[1]                   │  │
14│   │  keystep     : 1                     // 键的步长                               │  │
15│   │  microseconds: 123456               // 累计执行时间                            │  │
16│   │  calls       : 1000000              // 累计调用次数                            │  │
17│   └─────────────────────────────────────────────────────────────────────────────┘  │
18│                                                                                     │
19│   GET 命令示例                                                                       │
20│   ┌─────────────────────────────────────────────────────────────────────────────┐  │
21│   │  name        : "get"                                                         │  │
22│   │  proc        : getCommand                                                    │  │
23│   │  arity       : 2                     // 正好 2 个参数                          │  │
24│   │  sflags      : "rF"                  // r: 读命令, F: 快速命令                 │  │
25│   │  flags       : CMD_READONLY | CMD_FAST                                       │  │
26│   │  firstkey    : 1                                                              │  │
27│   │  lastkey     : 1                                                              │  │
28│   │  keystep     : 1                                                              │  │
29│   └─────────────────────────────────────────────────────────────────────────────┘  │
30│                                                                                     │
31└─────────────────────────────────────────────────────────────────────────────────────┘

九、关键设计亮点

9.1 两级缓冲设计

缓冲区类型 大小 使用场景 特点
固定缓冲区 buf 16KB 小响应 零拷贝,高效
链表缓冲区 reply 动态 大响应 按需扩展,内存友好

9.2 同步优先、异步兜底的发送策略

 1┌─────────────────────────────────────────────────────────────────────────────────────┐
 2│                          响应发送策略                                                │
 3├─────────────────────────────────────────────────────────────────────────────────────┤
 4│                                                                                     │
 5│   命令执行完成后                                                                       │
 6│         │                                                                           │
 7│         ▼                                                                           │
 8│   addReply() 封装响应                                                                │
 9│         │                                                                           │
10│         ▼                                                                           │
11│   加入 clients_pending_write 队列                                                   │
12│         │                                                                           │
13│         ▼                                                                           │
14│   beforeSleep() 中同步发送                                                          │
15│         │                                                                           │
16│         ├──────────────────────┬──────────────────────┐                            │
17│         ▼                      ▼                      │                            │
18│   ┌───────────┐         ┌───────────┐                │                            │
19│   │ 发送完毕   │         │ 未发送完   │                │                            │
20│   │ 直接返回   │         │ 注册写事件  │                │                            │
21│   └───────────┘         └─────┬─────┘                │                            │
22│                               │                      │                            │
23│                               ▼                      │                            │
24│                         等待 socket 可写              │                            │
25│                               │                      │                            │
26│                               ▼                      │                            │
27│                         异步发送剩余数据             │                            │
28│                                                                                     │
29└─────────────────────────────────────────────────────────────────────────────────────┘

9.3 非阻塞 I/O 全链路

阶段 操作 非阻塞处理
连接接受 accept() EWOULDBLOCK 正常返回
数据读取 read() EAGAIN 正常返回,等待下次事件
数据发送 write() EAGAIN 正常返回,注册写事件

十、总结

Redis 客户端请求处理的完整链路包括四个阶段:

  1. 连接接收acceptTcpHandleracceptCommonHandlercreateClient,完成客户端结构体初始化和读事件注册

  2. 请求读取与解析readQueryFromClientprocessInputBufferprocessMultibulkBuffer,完成 RESP 协议解析,将请求转换为 argc/argv

  3. 命令处理processCommandlookupCommandcall,完成命令查找、前置检查、命令执行、慢日志记录、AOF/复制传播

  4. 响应发送addReplyhandleClientsWithPendingWriteswriteToClient,完成响应封装、同步发送、异步兜底

整个链路设计精妙,通过非阻塞 I/O、事件驱动、两级缓冲、同步优先异步兜底等策略,实现了高效的单线程请求处理能力。

— END —