客户端请求处理完整链路源码分析
一、概述
当客户端向 Redis 发送一条命令(如 SET key value)时,请求会经历一个完整的处理链路:从网络数据接收、协议解析、命令查找、命令执行、结果返回,直到最终发送响应。本文将深入分析这个完整的请求处理链路,帮助读者理解 Redis 如何高效地处理每一条命令。
请求处理链路概览:
1┌─────────┐ ┌──────────┐ ┌───────────┐ ┌──────────┐ ┌──────────┐ ┌─────────┐
2│ 网络读取 │ -> │ 协议解析 │ -> │ 命令查找 │ -> │ 命令执行 │ -> │ 结果封装 │ -> │ 响应发送 │
3└─────────┘ └──────────┘ └───────────┘ └──────────┘ └──────────┘ └─────────┘
4 │ │ │ │ │ │
5 ▼ ▼ ▼ ▼ ▼ ▼
6read() processMulti- lookupCommand call() addReply() write()
7 bulkBuffer()
二、整体架构图
1┌─────────────────────────────────────────────────────────────────────────────────────┐
2│ Redis 客户端请求处理完整链路 │
3├─────────────────────────────────────────────────────────────────────────────────────┤
4│ │
5│ ┌─────────────┐ │
6│ │ Client │ │
7│ │ (客户端) │ │
8│ └──────┬──────┘ │
9│ │ TCP 连接 │
10│ ▼ │
11│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
12│ │ 1. 连接接收阶段 │ │
13│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │
14│ │ │ acceptTcpHandler │ -> │acceptCommonHandler│ -> │ createClient │ │ │
15│ │ │ (接受连接) │ │ (检查限制) │ │ (初始化客户端) │ │ │
16│ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ │
17│ └─────────────────────────────────────────────────────────────────────────────┘ │
18│ │ │
19│ ▼ │
20│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
21│ │ 2. 请求读取阶段 │ │
22│ │ ┌─────────────────┐ ┌─────────────────────────────────────────────┐ │ │
23│ │ │readQueryFromClient│ -> │ processInputBuffer │ │ │
24│ │ │ (读取数据) │ │ ┌─────────────────┐ ┌─────────────────┐ │ │ │
25│ │ └─────────────────┘ │ │processInlineBuf │ │processMultibulk │ │ │ │
26│ │ │ │ (内联协议) │ │ (RESP协议) │ │ │ │
27│ │ │ └─────────────────┘ └─────────────────┘ │ │ │
28│ │ └─────────────────────────────────────────────┘ │ │
29│ └─────────────────────────────────────────────────────────────────────────────┘ │
30│ │ │
31│ ▼ │
32│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
33│ │ 3. 命令处理阶段 │ │
34│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │
35│ │ │ processCommand │ -> │ lookupCommand │ -> │ call │ │ │
36│ │ │ (前置检查) │ │ (命令查找) │ │ (命令执行) │ │ │
37│ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ │
38│ └─────────────────────────────────────────────────────────────────────────────┘ │
39│ │ │
40│ ▼ │
41│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
42│ │ 4. 响应发送阶段 │ │
43│ │ ┌─────────────────┐ ┌─────────────────────────────────────────────┐ │ │
44│ │ │ addReply │ -> │ handleClientsWithPendingWrites │ │ │
45│ │ │ (封装响应) │ │ ┌─────────────────┐ ┌─────────────────┐ │ │ │
46│ │ └─────────────────┘ │ │ writeToClient │ │sendReplyToClient│ │ │ │
47│ │ │ │ (同步发送) │ │ (异步发送) │ │ │ │
48│ │ │ └─────────────────┘ └─────────────────┘ │ │ │
49│ │ └─────────────────────────────────────────────┘ │ │
50│ └─────────────────────────────────────────────────────────────────────────────┘ │
51│ │
52└─────────────────────────────────────────────────────────────────────────────────────┘
三、阶段一:连接接收
3.1 客户端结构体
每个连接的客户端都对应一个 client 结构体,存储了请求处理过程中的所有状态:
1// src/server.h:717-774
2typedef struct client {
3 uint64_t id; /* 客户端唯一递增 ID */
4 int fd; /* 客户端 socket 文件描述符 */
5 redisDb *db; /* 当前选中的数据库指针 */
6 robj *name; /* 客户端名称(CLIENT SETNAME 设置)*/
7
8 // 请求相关
9 sds querybuf; /* 查询缓冲区:累积客户端请求 */
10 size_t qb_pos; /* 在 querybuf 中已读取的位置 */
11 int argc; /* 当前命令的参数个数 */
12 robj **argv; /* 当前命令的参数数组 */
13 struct redisCommand *cmd, *lastcmd; /* 当前/最后执行的命令 */
14 int reqtype; /* 请求协议类型:PROTO_REQ_* */
15 int multibulklen; /* 多批量请求中剩余待读参数个数 */
16 long bulklen; /* 当前批量字符串的长度 */
17
18 // 响应相关
19 list *reply; /* 待发送的响应对象链表 */
20 unsigned long long reply_bytes; /* 响应链表的总字节数 */
21 size_t sentlen; /* 当前发送块已发送的字节数 */
22 int bufpos; /* 固定响应缓冲区的写入位置 */
23 char buf[PROTO_REPLY_CHUNK_BYTES]; /* 固定响应缓冲区(16KB)*/
24
25 // 状态相关
26 time_t ctime; /* 客户端创建时间 */
27 time_t lastinteraction; /* 最后交互时间(用于超时检测)*/
28 int flags; /* 客户端标志:CLIENT_* 宏 */
29 int authenticated; /* 是否已认证 */
30
31 // 其他字段省略...
32} client;
3.2 接受连接流程
1// src/networking.c:726-744
2void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
3 int cport, cfd, max = MAX_ACCEPTS_PER_CALL; // 单次最多接受 1000 个连接
4 char cip[NET_IP_STR_LEN];
5 UNUSED(el);
6 UNUSED(mask);
7 UNUSED(privdata);
8
9 // 循环接受新连接,直到没有更多连接或达到单次上限
10 while(max--) {
11 // 接受一个 TCP 连接,返回客户端 fd
12 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
13 if (cfd == ANET_ERR) {
14 // EWOULDBLOCK 表示没有更多待处理的连接,正常情况
15 if (errno != EWOULDBLOCK)
16 serverLog(LL_WARNING,
17 "Accepting client connection: %s", server.neterr);
18 return;
19 }
20 serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
21 // 创建客户端结构体,注册读事件回调
22 acceptCommonHandler(cfd,0,cip);
23 }
24}
3.3 创建客户端
1// src/networking.c:85-181
2client *createClient(int fd) {
3 client *c = zmalloc(sizeof(client)); // 分配客户端结构体内存
4
5 // fd != -1 表示创建真实的网络客户端
6 // fd == -1 用于创建伪客户端(如 Lua 脚本执行、AOF 加载)
7 if (fd != -1) {
8 anetNonBlock(NULL,fd); // 设置非阻塞模式
9 anetEnableTcpNoDelay(NULL,fd); // 禁用 Nagle 算法,减少延迟
10 if (server.tcpkeepalive)
11 anetKeepAlive(NULL,fd,server.tcpkeepalive); // 启用 TCP keepalive
12 // 注册可读事件,回调函数为 readQueryFromClient
13 if (aeCreateFileEvent(server.el,fd,AE_READABLE,
14 readQueryFromClient, c) == AE_ERR)
15 {
16 close(fd);
17 zfree(c);
18 return NULL;
19 }
20 }
21
22 // 初始化客户端状态
23 selectDb(c,0); // 默认选择数据库 0
24 uint64_t client_id = atomicIncr(server.next_client_id, 1);
25 c->id = client_id;
26 c->fd = fd;
27 c->name = NULL;
28 c->bufpos = 0;
29 c->querybuf = sdsempty(); // 初始化查询缓冲区
30 c->argc = 0;
31 c->argv = NULL;
32 c->cmd = c->lastcmd = NULL;
33 c->multibulklen = 0;
34 c->bulklen = -1;
35 c->sentlen = 0;
36 c->flags = 0;
37 c->ctime = c->lastinteraction = server.unixtime;
38 c->authenticated = 0;
39 c->reply = listCreate(); // 初始化响应链表
40 c->reply_bytes = 0;
41 listSetFreeMethod(c->reply, zfree);
42
43 // 将客户端添加到服务器客户端链表
44 if (fd != -1) {
45 c->client_list_node = listAddNodeTailReturn(server.clients, c);
46 }
47 c->buf = zmalloc(PROTO_REPLY_CHUNK_BYTES); // 分配 16KB 固定响应缓冲区
48 listAddNodeHead(server.clients_pending_write, c);
49 return c;
50}
关键操作:
- 设置非阻塞模式:确保
read/write不会阻塞事件循环 - 禁用 Nagle 算法:减少小数据包的延迟
- 注册读事件:回调函数为
readQueryFromClient - 初始化两级缓冲:固定缓冲区(16KB)+ 链表缓冲区
四、阶段二:请求读取与协议解析
4.1 读取客户端数据
1// src/networking.c:1500-1570
2void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
3 client *c = (client*) privdata;
4 int nread, readlen;
5 size_t qblen;
6 UNUSED(el);
7 UNUSED(mask);
8
9 readlen = PROTO_IOBUF_LEN; // 默认读取缓冲区大小 16KB
10
11 // 大包优化:如果是多批量请求且正在处理大参数,调整读取长度
12 if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
13 && c->bulklen >= PROTO_MBULK_BIG_ARG)
14 {
15 ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
16 if (remaining > 0 && remaining < readlen) readlen = remaining;
17 }
18
19 qblen = sdslen(c->querybuf);
20 if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
21 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // 扩展缓冲区空间
22
23 // 从 socket 读取数据到查询缓冲区末尾
24 nread = read(fd, c->querybuf+qblen, readlen);
25 if (nread == -1) {
26 if (errno == EAGAIN) {
27 return; // EAGAIN 表示暂无数据可读
28 } else {
29 serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
30 freeClient(c);
31 return;
32 }
33 } else if (nread == 0) {
34 serverLog(LL_VERBOSE, "Client closed connection");
35 freeClient(c);
36 return;
37 }
38
39 sdsIncrLen(c->querybuf,nread); // 更新缓冲区有效数据长度
40 c->lastinteraction = server.unixtime; // 更新最后交互时间
41 server.stat_net_input_bytes += nread; // 统计网络输入字节数
42
43 // 检查查询缓冲区是否超过最大限制
44 if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
45 sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
46 bytes = sdscatrepr(bytes,c->querybuf,64);
47 serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
48 sdsfree(ci);
49 sdsfree(bytes);
50 freeClient(c);
51 return;
52 }
53
54 // 处理输入缓冲区中的命令
55 processInputBufferAndReplicate(c);
56}
4.2 处理输入缓冲区
1// src/networking.c:1453-1498
2void processInputBuffer(client *c) {
3 server.current_client = c;
4
5 // 循环处理查询缓冲区中的所有命令
6 while(sdslen(c->querybuf)) {
7 // 客户端暂停时不处理命令
8 if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
9
10 // 如果客户端正在处理阻塞命令,暂停处理
11 if (c->flags & CLIENT_BLOCKED) break;
12
13 // 如果标记为关闭,不再处理更多命令
14 if (c->flags & CLIENT_CLOSE_AFTER_REPLY) break;
15
16 // 确定请求协议类型
17 if (!c->reqtype) {
18 if (c->querybuf[0] == '*') {
19 // 多批量格式:*3\r\n$3\r\nSET\r\n...
20 c->reqtype = PROTO_REQ_MULTIBULK;
21 } else {
22 // 内联格式:SET key value
23 c->reqtype = PROTO_REQ_INLINE;
24 }
25 }
26
27 if (c->reqtype == PROTO_REQ_INLINE) {
28 if (processInlineBuffer(c) != C_OK) break;
29 } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
30 if (processMultibulkBuffer(c) != C_OK) break;
31 } else {
32 serverPanic("Unknown request type");
33 }
34
35 // 命令参数不完整,等待更多数据
36 if (c->argc == 0) {
37 resetClient(c);
38 } else {
39 // 执行命令
40 if (processCommand(c) == C_OK) {
41 if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
42 c->reploff = c->read_reploff - sdslen(c->querybuf);
43 }
44 if (!(c->flags & CLIENT_BLOCKED || c->flags & CLIENT_MULTI))
45 resetClient(c);
46 }
47 if (c->flags & CLIENT_CLOSE_AFTER_REPLY) break;
48 }
49 }
50 server.current_client = NULL;
51}
4.3 RESP 协议解析
Redis 使用 RESP(Redis Serialization Protocol)协议通信。以 SET key value 为例:
1请求格式:
2*3\r\n <- 参数个数(3个)
3$3\r\n <- 第一个参数长度
4SET\r\n <- 第一个参数内容
5$3\r\n <- 第二个参数长度
6key\r\n <- 第二个参数内容
7$5\r\n <- 第三个参数长度
8value\r\n <- 第三个参数内容
9
10响应格式:
11+OK\r\n <- 简单字符串响应
1// src/networking.c:1267-1383
2int processMultibulkBuffer(client *c) {
3 char *newline = NULL;
4 int ok;
5 long long ll;
6
7 // 读取多批量参数个数:*<count>\r\n
8 if (c->multibulklen == 0) {
9 newline = strchr(c->querybuf+c->qb_pos,'\r');
10 if (newline == NULL) {
11 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
12 addReplyError(c,"Protocol error: too big mbulk count string");
13 setProtocolError("too big mbulk count",c);
14 }
15 return C_ERR;
16 }
17
18 // 检查格式:* 后跟数字
19 if (c->querybuf[c->qb_pos] != '*') {
20 addReplyError(c,"Protocol error: expected '*', got something else");
21 setProtocolError("expected *",c);
22 return C_ERR;
23 }
24
25 // 解析参数个数
26 ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
27 if (!ok || ll > 1024*1024) {
28 addReplyError(c,"Protocol error: invalid multibulk length");
29 setProtocolError("invalid mbulk count",c);
30 return C_ERR;
31 }
32
33 c->qb_pos = (newline-c->querybuf)+2; // 跳过 \r\n
34
35 // 空命令
36 if (ll <= 0) {
37 c->argc = 0;
38 return C_OK;
39 }
40
41 c->multibulklen = ll; // 设置待读取参数个数
42 c->argc = 0;
43 c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
44 }
45
46 // 循环读取每个参数
47 while(c->multibulklen) {
48 // 读取批量字符串长度:$<len>\r\n
49 if (c->bulklen == -1) {
50 newline = strchr(c->querybuf+c->qb_pos,'\r');
51 if (newline == NULL) {
52 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
53 addReplyError(c,"Protocol error: too big bulk count string");
54 setProtocolError("too big bulk count",c);
55 return C_ERR;
56 }
57 break;
58 }
59
60 if (c->querybuf[c->qb_pos] != '$') {
61 addReplyError(c,"Protocol error: expected '$', got something else");
62 setProtocolError("expected $",c);
63 return C_ERR;
64 }
65
66 ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
67 if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
68 addReplyError(c,"Protocol error: invalid bulk length");
69 setProtocolError("invalid bulk length",c);
70 return C_ERR;
71 }
72
73 c->qb_pos = newline-c->querybuf+2; // 跳过 \r\n
74 c->bulklen = ll; // 设置当前参数长度
75 }
76
77 // 读取参数内容
78 if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
79 // 数据不完整,等待更多数据
80 break;
81 } else {
82 // 创建参数对象
83 if (c->bulklen >= PROTO_MBULK_BIG_ARG) {
84 // 大参数优化:直接使用 querybuf 中的数据
85 c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
86 c->querybuf = sdsnewlen(NULL, c->bulklen+2);
87 sdsclear(c->querybuf);
88 } else {
89 // 小参数:复制数据
90 c->argv[c->argc++] =
91 createStringObject(c->querybuf+c->qb_pos,c->bulklen);
92 }
93 c->qb_pos += c->bulklen+2; // 跳过参数内容和 \r\n
94 c->bulklen = -1;
95 c->multibulklen--;
96 }
97 }
98
99 // 所有参数读取完成
100 if (c->multibulklen == 0) {
101 // 清理已处理的数据
102 if (c->qb_pos) {
103 sdsrange(c->querybuf,c->qb_pos,-1);
104 c->qb_pos = 0;
105 }
106 return C_OK;
107 }
108
109 return C_ERR;
110}
4.4 请求读取流程图
1 ┌─────────────────────────────────────────┐
2 │ 客户端 socket 可读 │
3 │ (有数据到达) │
4 └─────────────────┬───────────────────────┘
5 │
6 ▼
7 ┌─────────────────────────────────────────┐
8 │ readQueryFromClient() 被调用 │
9 └─────────────────┬───────────────────────┘
10 │
11 ▼
12 ┌─────────────────────────────────────────┐
13 │ read(fd, buf, len) │
14 │ 读取数据到 querybuf │
15 └─────────────────┬───────────────────────┘
16 │
17 ┌─────────────────┼───────────────────────┐
18 │ │ │
19 ▼ ▼ ▼
20 ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
21 │ nread == -1 │ │ nread == 0 │ │ nread > 0 │
22 │ EAGAIN: 返回 │ │ 客户端关闭 │ │ 正常读取 │
23 │ 其他: 关闭 │ │ freeClient() │ │ │
24 └──────────────┘ └──────────────┘ └───────┬──────┘
25 │
26 ▼
27 ┌─────────────────────────────────┐
28 │ processInputBuffer() │
29 │ 循环处理缓冲区中的命令 │
30 └─────────────────────────────────┘
31 │
32 ▼
33 ┌─────────────────────────────────┐
34 │ 判断协议类型 │
35 │ querybuf[0] == '*' ? │
36 └─────────────────────────────────┘
37 │ │
38 ┌────────────────┘ └────────────────┐
39 ▼ ▼
40 ┌─────────────────────┐ ┌─────────────────────┐
41 │ PROTO_REQ_MULTIBULK │ │ PROTO_REQ_INLINE │
42 │ processMultibulkBuf │ │ processInlineBuffer │
43 │ (RESP 协议解析) │ │ (内联命令解析) │
44 └──────────┬──────────┘ └──────────┬──────────┘
45 │ │
46 └──────────────────┬───────────────────────────┘
47 ▼
48 ┌─────────────────────────────────┐
49 │ argc, argv 解析完成 │
50 │ 调用 processCommand() │
51 └─────────────────────────────────┘
五、阶段三:命令处理
5.1 命令处理入口
1// src/server.c:2539-2659
2int processCommand(client *c) {
3 // 特殊处理 QUIT 命令
4 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
5 addReply(c,shared.ok);
6 c->flags |= CLIENT_CLOSE_AFTER_REPLY;
7 return C_ERR;
8 }
9
10 // 查找命令
11 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
12 if (!c->cmd) {
13 flagTransaction(c);
14 sds args = sdsempty();
15 int i;
16 for (i=1; i < c->argc && sdslen(args) < 128; i++)
17 args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
18 addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
19 (char*)c->argv[0]->ptr, args);
20 sdsfree(args);
21 return C_OK;
22 } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
23 (c->argc < -c->cmd->arity)) {
24 // 参数个数检查
25 flagTransaction(c);
26 addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
27 c->cmd->name);
28 return C_OK;
29 }
30
31 // 认证检查
32 if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
33 {
34 flagTransaction(c);
35 addReply(c,shared.noautherr);
36 return C_OK;
37 }
38
39 // Cluster 集群重定向检查
40 if (server.cluster_enabled &&
41 !(c->flags & CLIENT_MASTER) &&
42 !(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) &&
43 !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
44 c->cmd->proc != execCommand))
45 {
46 int hashslot;
47 int error_code;
48 clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
49 &hashslot,&error_code);
50 if (n == NULL || n != server.cluster->myself) {
51 if (c->cmd->proc == execCommand) {
52 discardTransaction(c);
53 } else {
54 flagTransaction(c);
55 }
56 clusterRedirectClient(c,n,hashslot,error_code);
57 return C_OK;
58 }
59 }
60
61 // 内存限制检查
62 if (server.maxmemory && !server.lua_timedout) {
63 int out_of_memory = freeMemoryIfNeeded() == C_ERR;
64 if (server.current_client == NULL) return C_ERR;
65
66 if ((c->cmd->flags & CMD_DENYOOM) && out_of_memory) {
67 flagTransaction(c);
68 addReply(c, shared.oomerr);
69 return C_OK;
70 }
71 }
72
73 // 磁盘错误检查
74 int deny_write_type = writeCommandsDeniedByDiskError();
75 if (deny_write_type != DISK_ERROR_TYPE_NONE &&
76 server.masterhost == NULL &&
77 (c->cmd->flags & CMD_WRITE ||
78 c->cmd->proc == pingCommand))
79 {
80 flagTransaction(c);
81 if (deny_write_type == DISK_ERROR_TYPE_RDB)
82 addReply(c, shared.bgsaveerr);
83 else
84 addReplySds(c,
85 sdscatprintf(sdsempty(),
86 "-MISCONF Errors writing to the AOF file: %s\r\n",
87 strerror(server.aof_last_write_errno)));
88 return C_OK;
89 }
90
91 // 最小从节点检查
92 if (server.masterhost == NULL &&
93 server.repl_min_slaves_to_write &&
94 server.repl_min_slaves_max_lag &&
95 c->cmd->flags & CMD_WRITE &&
96 server.repl_good_slaves_count < server.repl_min_slaves_to_write)
97 {
98 flagTransaction(c);
99 addReply(c, shared.noreplicaserr);
100 return C_OK;
101 }
102
103 // 执行命令
104 if (c->flags & CLIENT_MULTI &&
105 c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
106 c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
107 {
108 // 在事务中:将命令加入事务队列
109 queueMultiCommand(c);
110 addReply(c,shared.queued);
111 } else {
112 // 直接执行命令
113 call(c,CMD_CALL_FULL);
114 c->cmd = c->lastcmd;
115 }
116
117 return C_OK;
118}
5.2 命令查找
1// src/server.c:2661-2676
2struct redisCommand *lookupCommand(sds name) {
3 return dictFetchValue(server.commands, name);
4}
命令表是一个字典,key 是命令名称,value 是 redisCommand 结构体:
1// src/server.h:144-164
2struct redisCommand {
3 char *name; // 命令名称
4 redisCommandProc *proc; // 命令实现函数指针
5 int arity; // 参数个数,负数表示最小参数个数
6 char *sflags; // 字符串形式的标志
7 uint64_t flags; // 二进制标志
8 // 获取键位置的函数(用于集群重定向)
9 redisGetKeysProc *getkeys_proc;
10 // 键的位置信息
11 int firstkey; // 第一个键的位置
12 int lastkey; // 最后一个键的位置
13 int keystep; // 键的步长
14 long long microseconds; // 累计执行时间(微秒)
15 long long calls; // 累计调用次数
16};
5.3 命令执行
1// src/server.c:2410-2529
2void call(client *c, int flags) {
3 long long dirty, start, duration;
4 int client_old_flags = c->flags;
5 struct redisCommand *real_cmd = c->cmd;
6
7 // 发送命令给 MONITOR 客户端
8 if (listLength(server.monitors) &&
9 !server.loading &&
10 !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
11 {
12 replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
13 }
14
15 // 初始化传播标志
16 c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
17 redisOpArray prev_also_propagate = server.also_propagate;
18 redisOpArrayInit(&server.also_propagate);
19
20 // 执行命令!
21 dirty = server.dirty;
22 start = ustime();
23 c->cmd->proc(c); // 调用命令实现函数
24 duration = ustime()-start;
25 dirty = server.dirty-dirty;
26 if (dirty < 0) dirty = 0;
27
28 // 慢日志记录
29 if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
30 char *latency_event = (c->cmd->flags & CMD_FAST) ?
31 "fast-command" : "command";
32 latencyAddSampleIfNeeded(latency_event,duration/1000);
33 slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
34 }
35
36 // 命令统计
37 if (flags & CMD_CALL_STATS) {
38 real_cmd->microseconds += duration;
39 real_cmd->calls++;
40 }
41
42 // AOF 和主从复制传播
43 if (flags & CMD_CALL_PROPAGATE &&
44 (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
45 {
46 int propagate_flags = PROPAGATE_NONE;
47
48 if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
49
50 if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
51 if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
52
53 if (c->flags & CLIENT_PREVENT_REPL_PROP ||
54 !(flags & CMD_CALL_PROPAGATE_REPL))
55 propagate_flags &= ~PROPAGATE_REPL;
56 if (c->flags & CLIENT_PREVENT_AOF_PROP ||
57 !(flags & CMD_CALL_PROPAGATE_AOF))
58 propagate_flags &= ~PROPAGATE_AOF;
59
60 if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
61 propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
62 }
63
64 // 恢复传播标志
65 c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
66 c->flags |= client_old_flags &
67 (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
68
69 server.stat_numcommands++;
70}
5.4 命令处理流程图
1 ┌─────────────────────────────────────────┐
2 │ processCommand(c) 被调用 │
3 └─────────────────┬───────────────────────┘
4 │
5 ▼
6 ┌─────────────────────────────────────────┐
7 │ 查找命令 lookupCommand │
8 │ c->cmd = dictFetchValue(commands) │
9 └─────────────────┬───────────────────────┘
10 │
11 ┌─────────────────┼───────────────────────┐
12 │ │ │
13 ▼ ▼ ▼
14 ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
15 │ 命令不存在 │ │ 参数个数错误 │ │ 命令存在 │
16 │ 返回错误 │ │ 返回错误 │ │ 继续检查 │
17 └──────────────┘ └──────────────┘ └───────┬──────┘
18 │
19 ▼
20 ┌─────────────────────────────────┐
21 │ 前置检查 │
22 │ 1. 认证检查 │
23 │ 2. Cluster 重定向检查 │
24 │ 3. 内存限制检查 │
25 │ 4. 磁盘错误检查 │
26 │ 5. 最小从节点检查 │
27 └─────────────────────────────────┘
28 │
29 ┌─────┴─────┐
30 │ │
31 ▼ ▼
32 ┌──────────────┐ ┌──────────────┐
33 │ 在事务中 │ │ 不在事务中 │
34 │ 加入队列 │ │ call(c) │
35 │ addReply │ │ 直接执行 │
36 │ (queued) │ │ │
37 └──────────────┘ └───────┬──────┘
38 │
39 ▼
40 ┌─────────────────────────────────────────┐
41 │ call() 执行命令 │
42 │ │
43 │ 1. 发送给 MONITOR 客户端 │
44 │ 2. c->cmd->proc(c) 执行命令 │
45 │ 3. 慢日志记录 │
46 │ 4. 命令统计 │
47 │ 5. AOF/复制传播 │
48 └─────────────────────────────────────────┘
六、阶段四:响应发送
6.1 响应封装
命令执行后,通过 addReply 系列函数封装响应:
1// src/networking.c:183-217
2void addReply(client *c, robj *obj) {
3 if (prepareClientToWrite(c) != C_OK) return;
4
5 if (sdsEncodedObject(obj)) {
6 // SDS 编码的对象:直接追加到缓冲区
7 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
8 _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
9 } else if (obj->encoding == OBJ_ENCODING_INT) {
10 // 整数编码的对象:转为字符串后追加
11 char buf[32];
12 size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
13 if (_addReplyToBuffer(c,buf,len) != C_OK)
14 _addReplyStringToList(c,buf,len);
15 } else {
16 serverPanic("Wrong obj->encoding in addReply()");
17 }
18}
19
20// src/networking.c:299-311
21void addReplyString(client *c, const char *s, size_t len) {
22 if (prepareClientToWrite(c) != C_OK) return;
23 if (_addReplyToBuffer(c,s,len) != C_OK)
24 _addReplyStringToList(c,s,len);
25}
26
27// src/networking.c:313-328
28void addReplyErrorLength(client *c, const char *s, size_t len) {
29 addReplyString(c,"-ERR ",5);
30 addReplyString(c,s,len);
31 addReplyString(c,"\r\n",2);
32}
33
34// src/networking.c:330-338
35void addReplyError(client *c, const char *err) {
36 addReplyErrorLength(c,err,strlen(err));
37}
6.2 响应缓冲策略
Redis 采用两级缓冲策略:
- 固定缓冲区:
c->buf(16KB),用于小响应 - 链表缓冲区:
c->reply,用于大响应
1// src/networking.c:219-257
2int _addReplyToBuffer(client *c, const char *s, size_t len) {
3 size_t available = sizeof(c->buf)-c->bufpos;
4
5 // 如果链表缓冲区不为空,不再使用固定缓冲区
6 if (listLength(c->reply) > 0) return C_ERR;
7
8 // 空间不足,使用链表缓冲区
9 if (len > available) return C_ERR;
10
11 memcpy(c->buf+c->bufpos,s,len);
12 c->bufpos += len;
13 return C_OK;
14}
15
16// src/networking.c:259-297
17void _addReplyStringToList(client *c, const char *s, size_t len) {
18 // 尝试追加到链表最后一个节点
19 if (listLength(c->reply)) {
20 clientReplyBlock *tail = listNodeValue(listLast(c->reply));
21 size_t avail = tail->size - tail->used;
22 size_t copy = avail < len ? avail : len;
23 memcpy(tail->buf + tail->used, s, copy);
24 tail->used += copy;
25 s += copy;
26 len -= copy;
27 }
28
29 if (len) {
30 // 创建新的响应块
31 size_t size = len < PROTO_REPLY_CHUNK_BYTES ? PROTO_REPLY_CHUNK_BYTES : len;
32 clientReplyBlock *buf = zmalloc(size + sizeof(clientReplyBlock));
33 buf->size = size;
34 buf->used = len;
35 memcpy(buf->buf, s, len);
36 listAddNodeTail(c->reply, buf);
37 c->reply_bytes += buf->size;
38 }
39}
6.3 同步发送(beforeSleep)
1// src/networking.c:1064-1104
2int handleClientsWithPendingWrites(void) {
3 listIter li;
4 listNode *ln;
5 int processed = listLength(server.clients_pending_write);
6
7 listRewind(server.clients_pending_write,&li);
8 while((ln = listNext(&li))) {
9 client *c = listNodeValue(ln);
10 c->flags &= ~CLIENT_PENDING_WRITE;
11 listDelNode(server.clients_pending_write,ln);
12
13 // 跳过受保护的客户端
14 if (c->flags & CLIENT_PROTECTED) continue;
15
16 // 优先尝试同步写入
17 if (writeToClient(c->fd,c,0) == C_ERR) continue;
18
19 // 如果同步写入后仍有数据未发送,注册写事件
20 if (clientHasPendingReplies(c)) {
21 int ae_flags = AE_WRITABLE;
22 // AOF fsync=always 时需要设置 AE_BARRIER
23 if (server.aof_state == AOF_ON &&
24 server.aof_fsync == AOF_FSYNC_ALWAYS)
25 {
26 ae_flags |= AE_BARRIER;
27 }
28 if (aeCreateFileEvent(server.el, c->fd, ae_flags,
29 sendReplyToClient, c) == AE_ERR)
30 {
31 freeClientAsync(c);
32 }
33 }
34 }
35 return processed;
36}
6.4 写入数据到客户端
1// src/networking.c:961-1051
2int writeToClient(int fd, client *c, int handler_installed) {
3 ssize_t nwritten = 0, totwritten = 0;
4 size_t objlen;
5 clientReplyBlock *o;
6
7 // 循环发送,直到没有待发送数据
8 while(clientHasPendingReplies(c)) {
9 // 优先发送固定缓冲区中的数据
10 if (c->bufpos > 0) {
11 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
12 if (nwritten <= 0) break;
13 c->sentlen += nwritten;
14 totwritten += nwritten;
15
16 if ((int)c->sentlen == c->bufpos) {
17 c->bufpos = 0;
18 c->sentlen = 0;
19 }
20 } else {
21 // 从链表中取数据块发送
22 o = listNodeValue(listFirst(c->reply));
23 objlen = o->used;
24
25 if (objlen == 0) {
26 c->reply_bytes -= o->size;
27 listDelNode(c->reply,listFirst(c->reply));
28 continue;
29 }
30
31 nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
32 if (nwritten <= 0) break;
33 c->sentlen += nwritten;
34 totwritten += nwritten;
35
36 if (c->sentlen == objlen) {
37 c->reply_bytes -= o->size;
38 listDelNode(c->reply,listFirst(c->reply));
39 c->sentlen = 0;
40 }
41 }
42
43 // 限制单次写入数据量(64MB),避免长时间占用事件循环
44 if (totwritten > NET_MAX_WRITES_PER_EVENT &&
45 (server.maxmemory == 0 ||
46 zmalloc_used_memory() < server.maxmemory) &&
47 !(c->flags & CLIENT_SLAVE)) break;
48 }
49
50 server.stat_net_output_bytes += totwritten;
51
52 if (nwritten == -1) {
53 if (errno == EAGAIN) {
54 nwritten = 0;
55 } else {
56 serverLog(LL_VERBOSE,
57 "Error writing to client: %s", strerror(errno));
58 freeClient(c);
59 return C_ERR;
60 }
61 }
62
63 if (totwritten > 0) {
64 if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
65 }
66
67 if (!clientHasPendingReplies(c)) {
68 c->sentlen = 0;
69 if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
70
71 if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
72 freeClient(c);
73 return C_ERR;
74 }
75 }
76 return C_OK;
77}
6.5 异步发送回调
1// src/networking.c:1106-1112
2void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
3 UNUSED(el);
4 UNUSED(mask);
5 writeToClient(fd,privdata,1);
6}
6.6 响应发送流程图
1┌─────────────────────────────────────────────────────────────────────────┐
2│ 命令执行完成,生成响应 │
3│ (调用 addReply 系列函数) │
4└─────────────────────────────────────────────────────────────────────────┘
5 │
6 ▼
7┌─────────────────────────────────────────────────────────────────────────┐
8│ 尝试写入固定缓冲区 c->buf (16KB) │
9└─────────────────────────────────────────────────────────────────────────┘
10 │
11 ┌─────────────┴─────────────┐
12 │ │
13 ▼ ▼
14 ┌─────────────────┐ ┌─────────────────┐
15 │ 空间足够 │ │ 空间不足 │
16 │ 写入成功 │ │ 写入链表 │
17 └─────────────────┘ └─────────────────┘
18 │ │
19 └─────────────┬─────────────┘
20 │
21 ▼
22┌─────────────────────────────────────────────────────────────────────────┐
23│ 将 client 加入 server.clients_pending_write │
24└─────────────────────────────────────────────────────────────────────────┘
25 │
26 ▼
27┌─────────────────────────────────────────────────────────────────────────┐
28│ beforeSleep() 被调用 │
29│ (事件循环每次阻塞前执行) │
30└─────────────────────────────────────────────────────────────────────────┘
31 │
32 ▼
33┌─────────────────────────────────────────────────────────────────────────┐
34│ handleClientsWithPendingWrites() │
35│ 遍历所有待发送响应的客户端 │
36└─────────────────────────────────────────────────────────────────────────┘
37 │
38 ▼
39┌─────────────────────────────────────────────────────────────────────────┐
40│ writeToClient(fd, c, 0) │
41│ 尝试同步写入响应数据 │
42└─────────────────────────────────────────────────────────────────────────┘
43 │
44 ┌─────────────┴─────────────┐
45 │ │
46 ▼ ▼
47 ┌─────────────────┐ ┌─────────────────┐
48 │ 全部发送完毕 │ │ 还有数据未发送 │
49 │ (发送缓冲区未满) │ │ (发送缓冲区满) │
50 └─────────────────┘ └────────┬────────┘
51 │
52 ▼
53 ┌─────────────────────────────────┐
54 │ 注册 AE_WRITABLE 事件 │
55 │ 回调: sendReplyToClient │
56 │ 等待 socket 可写时异步发送 │
57 └─────────────────────────────────┘
58 │
59 ▼
60 ┌─────────────────────────────────┐
61 │ socket 可写,回调被触发 │
62 │ sendReplyToClient() │
63 │ -> writeToClient(fd, c, 1) │
64 └─────────────────────────────────┘
七、完整链路时序图
1┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
2│ Client │ │ Kernel │ │ Redis │ │Command │ │ DB │
3│ │ │ TCP │ │ Server │ │ Proc │ │ │
4└───┬────┘ └───┬────┘ └───┬────┘ └───┬────┘ └───┬────┘
5 │ │ │ │ │
6 │ CONNECT │ │ │ │
7 │──────────────────>│ │ │ │
8 │ │ accept event │ │ │
9 │ │──────────────────>│ │ │
10 │ │ │ createClient() │ │
11 │ │ │ register read │ │
12 │ │ │ │ │
13 │ SET key value │ │ │ │
14 │──────────────────>│ │ │ │
15 │ │ read event │ │ │
16 │ │──────────────────>│ │ │
17 │ │ │ readQueryFromClient() │
18 │ │ │ processMultibulkBuffer() │
19 │ │ │ processCommand() │ │
20 │ │ │ lookupCommand() │ │
21 │ │ │ call() │ │
22 │ │ │──────────────────>│ │
23 │ │ │ │ setCommand() │
24 │ │ │ │──────────────────>│
25 │ │ │ │ │
26 │ │ │ │ dbAdd() │
27 │ │ │ │<──────────────────│
28 │ │ │ │ │
29 │ │ │ │ addReply(OK) │
30 │ │ │<──────────────────│ │
31 │ │ │ │ │
32 │ │ │ beforeSleep() │ │
33 │ │ │ writeToClient() │ │
34 │ │<──────────────────│ │ │
35 │ │ │ │ │
36 │ +OK\r\n │ │ │ │
37 │<──────────────────│ │ │ │
38 │ │ │ │ │
八、关键数据结构
8.1 client 结构体核心字段
1┌─────────────────────────────────────────────────────────────────────────────────────┐
2│ client 结构体核心字段 │
3├─────────────────────────────────────────────────────────────────────────────────────┤
4│ │
5│ 请求处理相关 │
6│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
7│ │ querybuf (sds) ────────────────────────────────────────────────────┐ │ │
8│ │ │ "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n" │ │ │
9│ │ └───────────────────────────────────────────────────┘ │ │
10│ │ argc (int) ────────────────────────────────────────────────────── 3 │ │
11│ │ argv (robj**) ──────────────────────────────────────────────────────┐ │ │
12│ │ │ [0]: "SET" [1]: "key" [2]: "value" │ │ │
13│ │ └─────────────────────────────────────────────────────┘ │ │
14│ │ cmd (redisCommand*)───────────────────────────────────────────────────────┐ │ │
15│ │ │ name: "set", proc: setCommand, arity: -3 │ │ │
16│ │ └─────────────────────────────────────────────────────┘ │ │
17│ └─────────────────────────────────────────────────────────────────────────────┘ │
18│ │
19│ 响应处理相关 │
20│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
21│ │ buf (char[16KB]) ──────────────────────────────────────────────────────┐ │ │
22│ │ │ "+OK\r\n" │ │ │
23│ │ └─────────────────────────────────────────────────────┘ │ │
24│ │ bufpos (int) ─────────────────────────────────────────────────────── 5 │ │
25│ │ reply (list) ──────────────────────────────────────────────────────┐ │ │
26│ │ │ node1 -> node2 -> node3 -> ... │ │ │
27│ │ └─────────────────────────────────────────────────────┘ │ │
28│ │ reply_bytes ─────────────────────────────────────────────────────── 0 │ │
29│ │ sentlen (size_t) ─────────────────────────────────────────────────────── 0 │ │
30│ └─────────────────────────────────────────────────────────────────────────────┘ │
31│ │
32└─────────────────────────────────────────────────────────────────────────────────────┘
8.2 redisCommand 结构体
1┌─────────────────────────────────────────────────────────────────────────────────────┐
2│ redisCommand 结构体 │
3├─────────────────────────────────────────────────────────────────────────────────────┤
4│ │
5│ SET 命令示例 │
6│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
7│ │ name : "set" │ │
8│ │ proc : setCommand │ │
9│ │ arity : -3 // 至少 3 个参数 │ │
10│ │ sflags : "wm" // w: 写命令, m: 可能内存溢出 │ │
11│ │ flags : CMD_WRITE | CMD_DENYOOM │ │
12│ │ firstkey : 1 // 第一个键在 argv[1] │ │
13│ │ lastkey : 1 // 最后一个键在 argv[1] │ │
14│ │ keystep : 1 // 键的步长 │ │
15│ │ microseconds: 123456 // 累计执行时间 │ │
16│ │ calls : 1000000 // 累计调用次数 │ │
17│ └─────────────────────────────────────────────────────────────────────────────┘ │
18│ │
19│ GET 命令示例 │
20│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
21│ │ name : "get" │ │
22│ │ proc : getCommand │ │
23│ │ arity : 2 // 正好 2 个参数 │ │
24│ │ sflags : "rF" // r: 读命令, F: 快速命令 │ │
25│ │ flags : CMD_READONLY | CMD_FAST │ │
26│ │ firstkey : 1 │ │
27│ │ lastkey : 1 │ │
28│ │ keystep : 1 │ │
29│ └─────────────────────────────────────────────────────────────────────────────┘ │
30│ │
31└─────────────────────────────────────────────────────────────────────────────────────┘
九、关键设计亮点
9.1 两级缓冲设计
| 缓冲区类型 | 大小 | 使用场景 | 特点 |
|---|---|---|---|
固定缓冲区 buf |
16KB | 小响应 | 零拷贝,高效 |
链表缓冲区 reply |
动态 | 大响应 | 按需扩展,内存友好 |
9.2 同步优先、异步兜底的发送策略
1┌─────────────────────────────────────────────────────────────────────────────────────┐
2│ 响应发送策略 │
3├─────────────────────────────────────────────────────────────────────────────────────┤
4│ │
5│ 命令执行完成后 │
6│ │ │
7│ ▼ │
8│ addReply() 封装响应 │
9│ │ │
10│ ▼ │
11│ 加入 clients_pending_write 队列 │
12│ │ │
13│ ▼ │
14│ beforeSleep() 中同步发送 │
15│ │ │
16│ ├──────────────────────┬──────────────────────┐ │
17│ ▼ ▼ │ │
18│ ┌───────────┐ ┌───────────┐ │ │
19│ │ 发送完毕 │ │ 未发送完 │ │ │
20│ │ 直接返回 │ │ 注册写事件 │ │ │
21│ └───────────┘ └─────┬─────┘ │ │
22│ │ │ │
23│ ▼ │ │
24│ 等待 socket 可写 │ │
25│ │ │ │
26│ ▼ │ │
27│ 异步发送剩余数据 │ │
28│ │
29└─────────────────────────────────────────────────────────────────────────────────────┘
9.3 非阻塞 I/O 全链路
| 阶段 | 操作 | 非阻塞处理 |
|---|---|---|
| 连接接受 | accept() |
EWOULDBLOCK 正常返回 |
| 数据读取 | read() |
EAGAIN 正常返回,等待下次事件 |
| 数据发送 | write() |
EAGAIN 正常返回,注册写事件 |
十、总结
Redis 客户端请求处理的完整链路包括四个阶段:
-
连接接收:
acceptTcpHandler→acceptCommonHandler→createClient,完成客户端结构体初始化和读事件注册 -
请求读取与解析:
readQueryFromClient→processInputBuffer→processMultibulkBuffer,完成 RESP 协议解析,将请求转换为argc/argv -
命令处理:
processCommand→lookupCommand→call,完成命令查找、前置检查、命令执行、慢日志记录、AOF/复制传播 -
响应发送:
addReply→handleClientsWithPendingWrites→writeToClient,完成响应封装、同步发送、异步兜底
整个链路设计精妙,通过非阻塞 I/O、事件驱动、两级缓冲、同步优先异步兜底等策略,实现了高效的单线程请求处理能力。