How Redis Handles High-Concurrency Connections (the accept + read + write flow)
1. Overview
Although Redis processes commands on a single thread, this high-performance in-memory database comfortably handles tens or even hundreds of thousands of concurrent connections. The key is its event-driven, non-blocking I/O architecture. This article walks through the complete flow Redis uses to serve highly concurrent connections:
- the accept flow: how new connections are accepted
- the read flow: how client requests are read
- the write flow: how responses are sent
2. Overall Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│                           Redis main thread                             │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐            │
│  │ listen socket │    │ client socket │    │ client socket │   ...      │
│  │  (listen_fd)  │    │   (conn_fd)   │    │   (conn_fd)   │            │
│  └──────┬────────┘    └──────┬────────┘    └──────┬────────┘            │
│         │                    │                    │                     │
│         ▼                    ▼                    ▼                     │
│  ┌─────────────────────────────────────────────────────────┐            │
│  │                    I/O multiplexer                      │            │
│  │                    (epoll/kqueue)                       │            │
│  └─────────────────────────┬───────────────────────────────┘            │
│                            │                                            │
│                            ▼                                            │
│  ┌─────────────────────────────────────────────────────────┐            │
│  │                  event loop (aeMain)                    │            │
│  │                                                         │            │
│  │  while(!stop) {                                         │            │
│  │      beforeSleep();      // flush pending replies       │            │
│  │      aeProcessEvents();  // file and time events        │            │
│  │  }                                                      │            │
│  └─────────────────────────┬───────────────────────────────┘            │
│                            │                                            │
│        ┌───────────────────┼───────────────────┐                        │
│        ▼                   ▼                   ▼                        │
│  ┌───────────┐       ┌───────────┐       ┌───────────┐                  │
│  │  accept   │       │   read    │       │   write   │                  │
│  │  handler  │       │  handler  │       │  handler  │                  │
│  └───────────┘       └───────────┘       └───────────┘                  │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
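The loop in the diagram can be reduced to a few lines of C. The sketch below is not Redis's ae.c: `loop_once` and `epoll_demo` are names invented for this example, a pipe stands in for a client socket, and Linux epoll is assumed.

```c
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

/* One iteration of a toy event loop: wait until a registered fd is
 * readable, then dispatch to a "read handler" that drains it -- the
 * same shape as aeProcessEvents dispatching to Redis handlers. */
static int loop_once(int epfd, char *out, size_t outlen) {
    struct epoll_event ev;
    int n = epoll_wait(epfd, &ev, 1, 1000); /* block up to 1 second */
    if (n == 1 && (ev.events & EPOLLIN))
        return (int)read(ev.data.fd, out, outlen);
    return -1;
}

/* Build a pipe, register its read end (like aeCreateFileEvent with
 * AE_READABLE), write "PING" into it, and run one loop iteration.
 * Returns the number of bytes the handler read. */
int epoll_demo(void) {
    int pipefd[2];
    if (pipe(pipefd) != 0) return -1;

    int epfd = epoll_create1(0);
    if (epfd < 0) return -1;

    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = pipefd[0];
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ev) != 0) return -1;

    if (write(pipefd[1], "PING", 4) != 4) return -1; /* simulated client */

    char buf[16];
    int nread = loop_once(epfd, buf, sizeof(buf));

    close(pipefd[0]); close(pipefd[1]); close(epfd);
    return nread;
}
```

Redis's real loop additionally tracks time events (for serverCron) and fires beforeSleep before every wait, but the wait-then-dispatch skeleton is the same.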
3. The accept flow: accepting new connections
3.1 Creating and registering the listening socket
At startup, Redis creates its listening socket(s) and registers a readable event for each of them:

// src/server.c:2130-2131
// Register a readable event for every listening IP/port
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
    acceptTcpHandler,NULL) == AE_ERR)

Notes:
- server.ipfd[j] is the file descriptor of the listening socket
- acceptTcpHandler is the callback invoked when the readable event fires
- when a new connection arrives, the listening socket becomes readable, which triggers acceptTcpHandler
3.2 The accept handler: acceptTcpHandler

// src/networking.c:726-744
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL; // at most 1000 accepts per call
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    // Keep accepting until there are no more pending connections
    // or the per-call limit is reached.
    while(max--) {
        // Accept one TCP connection and return the client fd.
        // anetTcpAccept wraps the accept system call.
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            // EWOULDBLOCK means no more pending connections: the normal case.
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        // Create the client object and register the read callback.
        acceptCommonHandler(cfd,0,cip);
    }
}

Key design points:
- Batched accepts: at most MAX_ACCEPTS_PER_CALL (1000) connections are accepted per call, so the handler never monopolizes the event loop
- Non-blocking: the listening socket is in non-blocking mode, so accept never blocks
- EWOULDBLOCK handling: running out of pending connections is a normal return, not an error
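The accept-until-EWOULDBLOCK pattern can be demonstrated outside Redis. The sketch below (illustrative names `accept_batch` and `accept_demo`; POSIX sockets on the loopback interface assumed) binds a non-blocking listener, lets three clients connect, then drains the whole backlog in one batched call, the same shape as the `while(max--)` loop above.

```c
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Accept-loop sketch in the spirit of acceptTcpHandler: accept pending
 * connections on a non-blocking listening socket until EWOULDBLOCK/EAGAIN
 * or a per-call cap is reached. Returns how many were accepted. */
static int accept_batch(int listen_fd, int max) {
    int accepted = 0;
    while (max--) {
        int cfd = accept(listen_fd, NULL, NULL);
        if (cfd == -1) {
            /* EWOULDBLOCK: backlog drained, the normal exit */
            break;
        }
        close(cfd); /* a real server would call createClient() here */
        accepted++;
    }
    return accepted;
}

/* Bind a loopback listener, connect three clients, then drain the
 * backlog in one batched call. */
int accept_demo(void) {
    int lfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0;                        /* let the kernel pick a port */
    if (bind(lfd, (struct sockaddr *)&addr, sizeof(addr)) != 0) return -1;
    if (listen(lfd, 16) != 0) return -1;
    fcntl(lfd, F_SETFL, O_NONBLOCK);          /* like anetNonBlock() */

    socklen_t len = sizeof(addr);
    getsockname(lfd, (struct sockaddr *)&addr, &len);

    int cfds[3];
    for (int i = 0; i < 3; i++) {             /* three "clients" connect */
        cfds[i] = socket(AF_INET, SOCK_STREAM, 0);
        if (connect(cfds[i], (struct sockaddr *)&addr, sizeof(addr)) != 0)
            return -1;
    }

    int n = accept_batch(lfd, 1000);          /* MAX_ACCEPTS_PER_CALL-style cap */
    for (int i = 0; i < 3; i++) close(cfds[i]);
    close(lfd);
    return n;
}
```

Because connect() to a local listener completes the handshake immediately, all three connections sit in the accept queue and a single batched call picks them all up.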
3.3 The common accept handler: acceptCommonHandler

// src/networking.c:647-724
#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    // Create the client object and register the read event.
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    /* If maxclient directive is set and this is one client more... close the
     * connection. Note that we create the client instead to check before
     * for this condition, since now the socket is already set in non-blocking
     * mode and we can send an error for free using the Kernel I/O */
    // Check whether the maximum number of clients has been exceeded.
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++; // count rejected connections
        freeClient(c);
        return;
    }

    /* If the server is running in protected mode (the default) and there
     * is no password set, nor a specific interface is bound, we don't accept
     * requests from non loopback interfaces. Instead we try to explain the
     * user what to do to fix it if needed. */
    // Protected mode: reject unauthenticated non-loopback connections.
    if (server.protected_mode &&
        server.bindaddr_count == 0 &&
        server.requirepass == NULL &&
        !(flags & CLIENT_UNIX_SOCKET) &&
        ip != NULL)
    {
        if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
            char *err =
                "-DENIED Redis is running in protected mode because protected "
                "mode is enabled, no bind address was specified, no "
                "authentication password is requested to clients. In this mode "
                "connections are only accepted from the loopback interface. "
                "If you want to connect from external computers to Redis you "
                "may adopt one of the following solutions: "
                "1) Just disable protected mode sending the command "
                "'CONFIG SET protected-mode no' from the loopback interface "
                "by connecting to Redis from the same host the server is "
                "running, however MAKE SURE Redis is not publicly accessible "
                "from internet if you do so. Use CONFIG REWRITE to make this "
                "change permanent. "
                "2) Alternatively you can just disable the protected mode by "
                "editing the Redis configuration file, and setting the protected "
                "mode option to 'no', and then restarting the server. "
                "3) If you started the server manually just for testing, restart "
                "it with the '--protected-mode no' option. "
                "4) Setup a bind address or an authentication password. "
                "NOTE: You only need to do one of the above things in order for "
                "the server to start accepting connections from the outside.\r\n";
            if (write(c->fd,err,strlen(err)) == -1) {
                /* Nothing to do, Just to avoid the warning... */
            }
            server.stat_rejected_conn++;
            freeClient(c);
            return;
        }
    }

    server.stat_numconnections++; // count total connections
    c->flags |= flags;
}

Key checks:
- Maximum-client limit: new connections are rejected once the configured maxclients count is exceeded
- Protected-mode check: connections from non-loopback addresses are rejected when no bind address and no password are configured
3.4 Creating the client: createClient

// src/networking.c:85-181
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client)); // allocate the client struct

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    // fd != -1: a real network client
    // fd == -1: a fake client (Lua script execution, AOF loading, ...)
    if (fd != -1) {
        anetNonBlock(NULL,fd);         // non-blocking mode
        anetEnableTcpNoDelay(NULL,fd); // disable Nagle's algorithm to cut latency
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive); // enable TCP keepalive
        // Register the readable event with readQueryFromClient as callback.
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            // Registration failed: close the fd and free the memory.
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    // Initialize the client state.
    selectDb(c,0); // select database 0 by default
    uint64_t client_id = atomicIncr(server.next_client_id, 1);
    c->id = client_id; // unique client ID
    c->fd = fd;
    c->name = NULL;
    c->bufpos = 0;            // position in the fixed reply buffer
    c->querybuf = sdsempty(); // query buffer
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime; // creation & last-interaction time
    c->authenticated = 0;
    c->replstate = REPL_STATE_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->slave_ip[0] = '\0';
    c->slave_capa = SLAVE_CAPA_NONE;
    c->reply = listCreate(); // reply list
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply, zfree); // free method for reply list nodes
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyPointerValueDictType, NULL);
    c->bpop.target = c->bpop.xread_group = NULL;
    c->bpop.xread_consumer = NULL;
    c->bpop.blocked_on_key = 0;
    c->bpop.numreplicas = 0;
    c->bpop.numlocal = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType, NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    c->client_list_node = NULL;
    listSetFreeMethod(c->pubsub_patterns, decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns, matchPubsubPattern);
    // Add the client to the server's client list.
    if (fd != -1) {
        c->client_list_node = listAddNodeTailReturn(server.clients, c);
    }
    // Allocate the fixed reply buffer (16KB).
    c->buf = zmalloc(PROTO_REPLY_CHUNK_BYTES);
    // Add the client to the pending-write list (for the initial reply).
    listAddNodeHead(server.clients_pending_write, c);
    return c;
}

Key operations:
- Set non-blocking mode: read/write never block the event loop
- Disable Nagle's algorithm: reduces latency for small packets
- Register the read event: with readQueryFromClient as its callback
- Initialize the client state: query buffer, reply buffer, and so on
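The first two socket-level steps can be written out with raw system calls: anetNonBlock() boils down to an fcntl(O_NONBLOCK) and anetEnableTcpNoDelay() to a setsockopt(TCP_NODELAY). The sketch below uses invented names (`client_socket_setup`, `setup_demo`) and verifies both options are visible on the fd afterwards.

```c
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <unistd.h>

/* The two per-connection setup steps from createClient, spelled out
 * with raw syscalls. Returns 0 on success, -1 on failure. */
int client_socket_setup(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return -1;
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) /* non-blocking mode */
        return -1;

    int yes = 1; /* disable Nagle's algorithm to cut small-packet latency */
    if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) == -1)
        return -1;
    return 0;
}

/* Apply the setup to a fresh TCP socket and report (1/0) whether both
 * options are now set on the fd. */
int setup_demo(void) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0 || client_socket_setup(fd) != 0) return -1;

    int nonblock = (fcntl(fd, F_GETFL, 0) & O_NONBLOCK) != 0;
    int nodelay = 0;
    socklen_t len = sizeof(nodelay);
    getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, &len);
    close(fd);
    return (nonblock && nodelay) ? 1 : 0;
}
```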
3.5 accept flow diagram

        ┌─────────────────────────────────────────┐
        │  a new connection reaches the           │
        │  listening socket (listen_fd readable)  │
        └─────────────────┬───────────────────────┘
                          │
                          ▼
        ┌─────────────────────────────────────────┐
        │  acceptTcpHandler() is invoked          │
        │  (read callback of the listen socket)   │
        └─────────────────┬───────────────────────┘
                          │
                          ▼
        ┌─────────────────────────────────────────┐
        │  anetTcpAccept() -> accept()            │
        │  accepts the connection,                │
        │  returns the client fd                  │
        └─────────────────┬───────────────────────┘
                          │
                          ▼
        ┌─────────────────────────────────────────┐
        │  acceptCommonHandler()                  │
        │  checks limits and protected mode       │
        └─────────────────┬───────────────────────┘
                          │
        ┌─────────────────┴───────────────────────┐
        │                                         │
        ▼                                         ▼
┌─────────────────┐                     ┌─────────────────┐
│ limit exceeded  │                     │ normal          │
│ or protected:   │                     │ connection      │
│ send error,     │                     │                 │
│ close socket    │                     │                 │
└─────────────────┘                     └────────┬────────┘
                                                 │
                                                 ▼
                                ┌─────────────────────────────────┐
                                │  createClient(fd)               │
                                │  1. set non-blocking mode       │
                                │  2. register AE_READABLE event  │
                                │  3. initialize client state     │
                                └─────────────────────────────────┘
4. The read flow: reading client requests
4.1 The read callback: readQueryFromClient

// src/networking.c:1500-1570
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata; // the client owning this fd
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN; // default read size: 16KB
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    // Big-argument optimization: while parsing a large bulk argument of a
    // multibulk request, adjust the read length to end exactly on it.
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf); // current length of the query buffer
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // grow the buffer
    // Read from the socket into the tail of the query buffer.
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        // EAGAIN means no data is available right now: the normal
        // non-blocking case, simply return.
        if (errno == EAGAIN) {
            return;
        } else {
            // Any other error: log it and free the client.
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        // A return value of 0 means the client closed the connection.
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        // Replication: also append the bytes to pending_querybuf.
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread); // commit the new bytes into the buffer
    c->lastinteraction = server.unixtime; // refresh for timeout detection
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread; // network input statistics
    // Enforce the maximum query buffer length.
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    /* Time to process the buffer. If the client is a master we need to
     * compute the difference between the applied offset before and after
     * processing the buffer, to understand how much of the replication stream
     * was actually applied to the master state: this quantity, and its
     * corresponding part of the replication stream, will be propagated to
     * the sub-slaves and to the replication backlog. */
    // Parse and execute the commands accumulated in the input buffer.
    processInputBufferAndReplicate(c);
}

Key handling:
- Read data: from the socket into the query buffer
- Error handling: EAGAIN returns normally; any other error closes the connection
- Connection close: a read return value of 0 means the client closed the connection
- Buffer limit: the query buffer is checked against client_max_querybuf_len
- Command processing: processInputBufferAndReplicate parses and executes the buffered commands
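The three-way branch on read()'s return value is easy to reproduce with a non-blocking socketpair. The sketch below (`read_states_demo` is an invented name) walks through all three outcomes in order: EAGAIN on an empty socket, a successful read, and end-of-stream after the peer closes.

```c
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* The three read() outcomes readQueryFromClient distinguishes:
 *   empty socket -> -1 with errno == EAGAIN (just return)
 *   data pending -> nread > 0              (append to query buffer)
 *   peer closed  -> 0                      (freeClient)
 * Returns 1 when all three behave as described. */
int read_states_demo(void) {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return -1;
    fcntl(sv[0], F_SETFL, O_NONBLOCK);

    char buf[32];

    /* 1. nothing to read yet: non-blocking read fails with EAGAIN */
    ssize_t n = read(sv[0], buf, sizeof(buf));
    int got_eagain = (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK));

    /* 2. the peer writes a command: read returns the bytes */
    if (write(sv[1], "PING\r\n", 6) != 6) return -1;
    n = read(sv[0], buf, sizeof(buf));
    int got_data = (n == 6 && memcmp(buf, "PING\r\n", 6) == 0);

    /* 3. the peer closes: read returns 0, i.e. end of stream */
    close(sv[1]);
    n = read(sv[0], buf, sizeof(buf));
    int got_eof = (n == 0);

    close(sv[0]);
    return (got_eagain && got_data && got_eof) ? 1 : 0;
}
```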
4.2 Processing the input buffer: processInputBuffer

// src/networking.c:1453-1498
void processInputBuffer(client *c) {
    server.current_client = c; // the client currently being served
    /* Keep processing while there is something in the input buffer */
    // Loop over every complete command in the query buffer.
    while(sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        // Don't process commands while clients are paused (CLIENT PAUSE).
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        // Stop if the client is blocked on a blocking command.
        if (c->flags & CLIENT_BLOCKED) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been populated (e.g. don't process more commands). */
        // If the client is marked for closing, process no further commands.
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) break;

        /* Determine request type when unknown. */
        // Determine the protocol type: inline or multibulk.
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                // Multibulk format: *3\r\n$3\r\nSET\r\n...
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                // Inline format: SET key value
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            // Parse an inline command.
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            // Parse a multibulk command.
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* multibulk processing could see a greater length of the query
         * buffer, this is an optimization so that we don't parse it again. */
        // Arguments not complete yet: wait for more data.
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            // Execute the command.
            if (processCommand(c) == C_OK) {
                // Command executed: reset state for the next command.
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    /* If the client is a master and we are not in a MULTI
                     * context, we need to update the applied replication
                     * offset, because this is the point where we actually
                     * processed the command. */
                    c->reploff = c->read_reploff - sdslen(c->querybuf);
                }
                // Reset the client state.
                if (!(c->flags & CLIENT_BLOCKED || c->flags & CLIENT_MULTI))
                    resetClient(c);
            }
            /* freeMemoryIfNeeded may flush slave output buffers. This may
             * result into a slave, that may be the master of a sub-slave,
             * to get disconnected. Handle the special case.
             * Sometimes the flushed output buffer can contain a PARTIAL
             * command. If this is the case, reset the client. */
            // If the client was marked for closing, stop the loop.
            if (c->flags & CLIENT_CLOSE_AFTER_REPLY) break;
        }
    }
    server.current_client = NULL; // clear the current-client reference
}

Processing flow:
- Determine the protocol type: inline command or multibulk command
- Parse the command: into argc and argv
- Execute the command: via processCommand
- Reset state: ready for the next command
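To make the multibulk format concrete, here is a deliberately tiny RESP parser. `parse_multibulk` is an invented name: Redis's real processMultibulkBuffer parses incrementally, handles partial input, and builds robj arguments, none of which this sketch does.

```c
#include <stdlib.h>
#include <string.h>

/* Parse one complete RESP multibulk request: "*<argc>\r\n" followed by
 * argc items of "$<len>\r\n<payload>\r\n". Fills argv (caller-provided
 * fixed-size buffers) and returns argc, or -1 on malformed input. */
int parse_multibulk(const char *buf, char argv[][32], int max_args) {
    if (*buf != '*') return -1;
    char *p;
    long argc = strtol(buf + 1, &p, 10);
    if (argc < 0 || argc > max_args || strncmp(p, "\r\n", 2) != 0) return -1;
    p += 2;
    for (long i = 0; i < argc; i++) {
        if (*p != '$') return -1;                 /* each arg is a bulk string */
        long len = strtol(p + 1, &p, 10);
        if (len < 0 || len >= 32 || strncmp(p, "\r\n", 2) != 0) return -1;
        p += 2;
        memcpy(argv[i], p, (size_t)len);          /* copy the payload */
        argv[i][len] = '\0';
        p += len;
        if (strncmp(p, "\r\n", 2) != 0) return -1; /* trailing CRLF */
        p += 2;
    }
    return (int)argc;
}
```

Running it on the wire form of SET key value, `"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n"`, yields argc 3 with argv SET/key/value, the same shape processCommand receives.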
4.3 read flow diagram

        ┌─────────────────────────────────────────┐
        │  client socket becomes readable         │
        │  (data has arrived)                     │
        └─────────────────┬───────────────────────┘
                          │
                          ▼
        ┌─────────────────────────────────────────┐
        │  readQueryFromClient() is invoked       │
        │  (read callback of the client socket)   │
        └─────────────────┬───────────────────────┘
                          │
                          ▼
        ┌─────────────────────────────────────────┐
        │  read(fd, buf, len)                     │
        │  reads data into the query buffer       │
        └─────────────────┬───────────────────────┘
                          │
        ┌─────────────────┼───────────────────────┐
        │                 │                       │
        ▼                 ▼                       ▼
┌──────────────┐  ┌──────────────┐       ┌──────────────┐
│ nread == -1  │  │ nread == 0   │       │ nread > 0    │
│              │  │              │       │              │
│ EAGAIN: ret  │  │ client closed│       │ normal read  │
│ else: close  │  │ freeClient() │       │              │
└──────────────┘  └──────────────┘       └───────┬──────┘
                                                 │
                                                 ▼
                                ┌─────────────────────────────────┐
                                │ processInputBufferAndReplicate  │
                                │ parse and execute commands      │
                                └─────────────────────────────────┘
                                                 │
          ┌──────────────────────────────────────┼───────────────────┐
          ▼                                      ▼                   ▼
┌─────────────────┐                 ┌──────────────────┐   ┌────────────────┐
│ processInline   │                 │ processMultibulk │   │ execute via    │
│ Buffer()        │                 │ Buffer()         │   │ processCommand │
│ (inline cmds)   │                 │ (RESP protocol)  │   └────────────────┘
└─────────────────┘                 └──────────────────┘
5. The write flow: sending responses
5.1 Reply-writing strategy
Redis sends replies with a synchronous-first, asynchronous-fallback strategy:
- Synchronous send: beforeSleep tries to write the reply to the socket directly
- Asynchronous send: if the synchronous write cannot finish, a writable event is registered and the remainder is sent later
5.2 Write handling in beforeSleep

// src/server.c:1355-1410
/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* Call the Redis Cluster before sleep function. Note that this function
     * may change the state of Redis Cluster (from ok to fail or vice versa),
     * so it's a good idea to call it before serving the unblocked clients
     * later in this function. */
    // Redis Cluster housekeeping.
    if (server.cluster_enabled) clusterBeforeSleep();

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    // Fast expired-key cycle.
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* Send all the slaves an ACK request if at least one client blocked
     * during the previous event loop iteration. */
    // Send REPLCONF GETACK to the replicas.
    if (server.get_ack_from_slaves) {
        robj *argv[3];

        argv[0] = createStringObject("REPLCONF",8);
        argv[1] = createStringObject("GETACK",6);
        argv[2] = createStringObject("*",1); /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        decrRefCount(argv[0]);
        decrRefCount(argv[1]);
        decrRefCount(argv[2]);
        server.get_ack_from_slaves = 0;
    }

    /* Unblock all the clients blocked for synchronous replication
     * in WAIT. */
    // Clients waiting on synchronous replication acks.
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();

    /* Check if there are clients unblocked by modules that implement
     * blocking commands. */
    // Clients blocked by modules.
    moduleHandleBlockedClients();

    /* Try to process pending commands for clients that were just unblocked. */
    if (listLength(server.unblocked_clients))
        processUnblockedClients();

    /* Write the AOF buffer on disk */
    // Flush the AOF buffer to disk.
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    // Send pending replies (the key step for the write flow!).
    handleClientsWithPendingWrites();

    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    // Release the module GIL (module thread safety).
    if (moduleCount()) moduleReleaseGIL();
}
5.3 Handling clients with pending writes: handleClientsWithPendingWrites

// src/networking.c:1064-1104
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write); // clients to handle

    listRewind(server.clients_pending_write,&li); // initialize the iterator
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln); // the client object
        c->flags &= ~CLIENT_PENDING_WRITE; // clear the pending-write flag
        listDelNode(server.clients_pending_write,ln); // remove from the list

        /* If the client is protected, don't do anything,
         * that may prevent the client to be freed. */
        // Skip protected clients (e.g. paused ones).
        if (c->flags & CLIENT_PROTECTED) continue;

        /* Try to write buffers to the client socket. */
        // Try a synchronous write first: sending the reply directly
        // avoids the cost of installing a write handler.
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        /* If after the synchronous writes above there is still data to
         * output, we need to install a write handler. */
        // If data is still pending (e.g. the kernel send buffer filled up),
        // register a writable event and finish asynchronously.
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            // With AOF enabled and an fsync policy of "always", AE_BARRIER
            // guarantees the fsync happens before the reply is sent.
            /* For the fsync=always policy, we want the write operation
             * to be performed before the reply is sent. */
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_flags |= AE_BARRIER;
            }
            // Register the writable event with sendReplyToClient as callback.
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                freeClientAsync(c); // registration failed: free asynchronously
            }
        }
    }
    return processed;
}

Key design points:
- Synchronous first: try a direct write to avoid the overhead of registering an event
- Asynchronous fallback: register a writable event only when the synchronous write is incomplete
- AE_BARRIER: in the fsync=always case, guarantees persistence before the reply goes out
5.4 Writing data to the client: writeToClient

// src/networking.c:961-1051
int writeToClient(int fd, client *c, int handler_installed) {
    ssize_t nwritten = 0, totwritten = 0; // bytes per write / total bytes
    size_t objlen;
    clientReplyBlock *o; // a reply block

    // Keep sending until nothing is pending.
    while(clientHasPendingReplies(c)) {
        // Send the fixed buffer first (small replies).
        if (c->bufpos > 0) {
            // write(2): send the [sentlen, bufpos) range of the buffer.
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break; // write failed or EAGAIN
            c->sentlen += nwritten;   // advance the sent offset
            totwritten += nwritten;

            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;  // fixed buffer fully sent: reset position
                c->sentlen = 0; // and the sent offset
            }
        } else {
            // Fixed buffer empty: send from the reply list (large replies).
            o = listNodeValue(listFirst(c->reply));
            objlen = o->used;

            if (objlen == 0) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }

            // Send the not-yet-sent part of the block.
            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                /* If there are no longer objects in the list, we expect
                 * the count of reply bytes to be exactly zero. */
                if (listLength(c->reply) == 0)
                    serverAssert(c->reply_bytes == 0);
            }
        }
        // Cap the bytes written per event so one client cannot hog the
        // event loop; NET_MAX_WRITES_PER_EVENT defaults to 64MB.
        /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver.
         *
         * Moreover, we also send as much as possible if the client is
         * a slave (otherwise, on high-speed traffic, the replication
         * buffer will grow indefinitely) */
        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory) &&
            !(c->flags & CLIENT_SLAVE)) break;
    }
    server.stat_net_output_bytes += totwritten; // network output statistics
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0; // EAGAIN is not an error, just "try again later"
        } else {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return C_ERR;
        }
    }
    if (totwritten > 0) {
        /* For clients representing masters we don't count sending data
         * as an interaction, since we always send REPLCONF ACK commands
         * that take some time to just fill the socket output buffer.
         * We just rely on data / pings received for timeout detection. */
        // Refresh the last-interaction time (except for master links).
        if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
    }
    // Everything sent: clean up and remove the writable event.
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        // handler_installed says whether a write handler was registered.
        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

        /* Close connection after entire reply has been sent. */
        // Honor the close-after-reply flag.
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClient(c);
            return C_ERR;
        }
    }
    return C_OK;
}

Key design points:
- Two-level buffering: a fixed 16KB buffer plus a linked list of reply blocks
- Per-event write cap: prevents a huge reply from monopolizing the event loop
- EAGAIN handling: when the kernel send buffer is full, return and wait for the next writable event
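The sentlen bookkeeping is easiest to see with the kernel mocked out. In the sketch below all names (`mock_write`, `sink`, `flush_reply`) are invented: `mock_write` accepts at most 4 bytes per call, the way write(2) returns a short count when the send buffer is nearly full, and `flush_reply` replays the fixed-buffer half of writeToClient's loop.

```c
#include <string.h>

/* A mock kernel send buffer that takes at most 4 bytes per call,
 * standing in for write(2) returning short counts on a full socket. */
static char sink[64];
static int sinklen = 0;

static int mock_write(const char *buf, int len) {
    int n = len < 4 ? len : 4; /* short write, like a nearly-full socket */
    memcpy(sink + sinklen, buf, (size_t)n);
    sinklen += n;
    return n;
}

/* The fixed-buffer half of writeToClient's bookkeeping: keep writing
 * buf[sentlen..bufpos) and advance sentlen by each short write until
 * the whole reply is out. Returns the number of write calls needed. */
int flush_reply(const char *buf, int bufpos) {
    int sentlen = 0, calls = 0;
    while (sentlen < bufpos) {
        int n = mock_write(buf + sentlen, bufpos - sentlen);
        if (n <= 0) break; /* real code: EAGAIN -> wait for a writable event */
        sentlen += n;      /* advance past what the kernel accepted */
        calls++;
    }
    return calls;
}
```

Flushing the 7-byte reply "+PONG\r\n" takes two calls (4 bytes, then 3), and the sink ends up holding the bytes in order, which is exactly the invariant the sentlen offset maintains across partial writes.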
5.5 The asynchronous write callback: sendReplyToClient

// src/networking.c:1106-1112
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    // Simply call writeToClient; handler_installed=1 means the
    // writable event is currently registered.
    writeToClient(fd,privdata,1);
}
5.6 write flow diagram

┌─────────────────────────────────────────────────────────────────────────┐
│              command finishes, a reply is generated                     │
│              (appended to client->buf or client->reply)                 │
└─────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────┐
│              client is added to server.clients_pending_write            │
└─────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────┐
│              beforeSleep() is invoked                                   │
│              (before each blocking wait of the event loop)              │
└─────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────┐
│              handleClientsWithPendingWrites()                           │
│              iterates over all clients with pending replies             │
└─────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────┐
│              writeToClient(fd, c, 0)                                    │
│              tries a synchronous write                                  │
└─────────────────────────────────────────────────────────────────────────┘
                                   │
                 ┌─────────────────┴─────────────────┐
                 │                                   │
                 ▼                                   ▼
   ┌───────────────────────┐           ┌───────────────────────┐
   │ everything sent       │           │ data still pending    │
   │ (send buffer not full)│           │ (send buffer full)    │
   └───────────────────────┘           └───────────┬───────────┘
                                                   │
                                                   ▼
                                   ┌─────────────────────────────────┐
                                   │ aeCreateFileEvent(fd,           │
                                   │     AE_WRITABLE,                │
                                   │     sendReplyToClient)          │
                                   │ wait until the socket is        │
                                   │ writable                        │
                                   └─────────────────────────────────┘
                                                   │
                                                   ▼
                                   ┌─────────────────────────────────┐
                                   │ when the socket is writable,    │
                                   │ sendReplyToClient() runs        │
                                   │ → writeToClient(fd, c, 1)       │
                                   │ and sends the remaining data    │
                                   └─────────────────────────────────┘
                                                   │
                         ┌─────────────────────────┴─────────────────┐
                         │                                           │
                         ▼                                           ▼
           ┌─────────────────────────┐               ┌─────────────────────────┐
           │ everything sent:        │               │ data still pending:     │
           │ remove the write event  │               │ wait for the next       │
           │ aeDeleteFileEvent()     │               │ writable notification   │
           └─────────────────────────┘               └─────────────────────────┘
6. The complete connection life cycle

┌─────────────────────────────────────────────────────────────────────────┐
│                      client connection life cycle                       │
└─────────────────────────────────────────────────────────────────────────┘

┌───────────────┐          ┌───────────────┐          ┌───────────────┐
│    Client     │          │     Redis     │          │  Event loop   │
└───────┬───────┘          └───────┬───────┘          └───────┬───────┘
        │                          │                          │
        │  1. SYN                  │                          │
        │ ──────────────────>      │                          │
        │                          │                          │
        │  2. SYN+ACK              │                          │
        │ <──────────────────      │                          │
        │                          │                          │
        │  3. ACK                  │                          │
        │ ──────────────────>      │                          │
        │                          │                          │
        │                          │  listen_fd readable      │
        │                          │ ──────────────────>      │
        │                          │                          │
        │                          │  4. acceptTcpHandler     │
        │                          │ <──────────────────      │
        │                          │                          │
        │                          │  5. createClient         │
        │                          │     registers read event │
        │                          │ ──────────────────>      │
        │                          │                          │
        │  6. send command         │                          │
        │ ──────────────────>      │                          │
        │                          │                          │
        │                          │  client_fd readable      │
        │                          │ ──────────────────>      │
        │                          │                          │
        │                          │  7. readQueryFromClient  │
        │                          │ <──────────────────      │
        │                          │                          │
        │                          │  8. processCommand       │
        │                          │     executes the command │
        │                          │     and builds the reply │
        │                          │                          │
        │                          │  9. beforeSleep          │
        │                          │ <──────────────────      │
        │                          │                          │
        │                          │  10. writeToClient       │
        │                          │      sends the reply     │
        │  11. reply received      │ ──────────────────>      │
        │ <──────────────────      │                          │
        │                          │                          │
        │   ... further commands handled the same way ...     │
        │                          │                          │
        │  12. close connection    │                          │
        │ ──────────────────>      │                          │
        │                          │  read returns 0          │
        │                          │ ──────────────────>      │
        │                          │                          │
        │                          │  13. freeClient          │
        │                          │ <──────────────────      │
        │                          │                          │
        ▼                          ▼                          ▼
7. Key designs behind the high concurrency
7.1 Non-blocking I/O

// every socket is switched to non-blocking mode
anetNonBlock(NULL, fd);

Advantages:
- accept, read, and write never block the event loop
- a single thread can therefore serve a very large number of concurrent connections
7.2 Event-driven design

// register a read event on the listening socket
aeCreateFileEvent(server.el, listen_fd, AE_READABLE, acceptTcpHandler, NULL);

// register a read event on each client socket
aeCreateFileEvent(server.el, client_fd, AE_READABLE, readQueryFromClient, c);

Advantages:
- work happens only when an event fires, so there is no wasted polling
- I/O multiplexing gives efficient event dispatch
7.3 Batched processing

// accept: at most 1000 connections per call
int max = MAX_ACCEPTS_PER_CALL;
while(max--) { ... }

// read: up to 16KB per call
readlen = PROTO_IOBUF_LEN;

// write: at most 64MB per event
if (totwritten > NET_MAX_WRITES_PER_EVENT) break;

Advantages:
- fewer system calls
- a balance between fairness and efficiency
7.4 Two-level reply buffers

// small replies use the fixed buffer (16KB)
c->buf[PROTO_REPLY_CHUNK_BYTES];
c->bufpos;

// large replies spill into a linked list
c->reply; // list of clientReplyBlock

Advantages:
- small replies need no extra memory allocation
- large replies can grow without bound through the list
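A miniature of the two-level scheme, with all names (`mini_client`, `addReplyMini`, `reply_block`) invented and the sizes shrunk so the spill is easy to trigger: a reply fills the fixed buffer first, and only the overflow goes into heap-allocated blocks appended at the list tail.

```c
#include <stdlib.h>
#include <string.h>

#define FIXED_BUF_SIZE 16 /* Redis uses PROTO_REPLY_CHUNK_BYTES (16KB) */

typedef struct reply_block {
    struct reply_block *next;
    size_t used;
    char data[64];
} reply_block;

typedef struct mini_client {
    char buf[FIXED_BUF_SIZE]; /* level 1: fixed buffer  */
    int bufpos;
    reply_block *reply;       /* level 2: linked blocks */
} mini_client;

/* Append a reply: fill the fixed buffer first, spill the rest into
 * blocks -- mirroring client->buf + client->reply in Redis. */
void addReplyMini(mini_client *c, const char *s, size_t len) {
    size_t room = (size_t)(FIXED_BUF_SIZE - c->bufpos);
    size_t ncopy = len < room ? len : room;
    memcpy(c->buf + c->bufpos, s, ncopy);   /* level 1 first */
    c->bufpos += (int)ncopy;
    s += ncopy; len -= ncopy;

    while (len > 0) {                        /* spill into level 2 */
        reply_block *b = calloc(1, sizeof(*b));
        b->used = len < sizeof(b->data) ? len : sizeof(b->data);
        memcpy(b->data, s, b->used);
        s += b->used; len -= b->used;

        reply_block **tail = &c->reply;      /* append at the list tail */
        while (*tail) tail = &(*tail)->next;
        *tail = b;
    }
}
```

A 10-byte reply stays entirely in the fixed buffer (no allocation at all); a further 20 bytes tops the buffer up to 16 and puts the remaining 14 into one list block, which is the behavior the two advantages above describe.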
7.5 Synchronous-first writes

// beforeSleep attempts a synchronous write
if (writeToClient(c->fd, c, 0) == C_ERR) continue;

// a writable event is registered only if data remains
if (clientHasPendingReplies(c)) {
    aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c);
}

Advantages:
- less event registration/removal overhead
- most small replies go out in a single direct write
8. Summary
The core mechanisms Redis uses to handle highly concurrent connections:

| Phase | Key functions | Core design |
|---|---|---|
| accept | acceptTcpHandler → createClient | batched accepts, non-blocking sockets, read-event registration |
| read | readQueryFromClient → processInputBuffer | non-blocking reads, protocol parsing, command execution |
| write | handleClientsWithPendingWrites → writeToClient | synchronous first, asynchronous fallback, two-level buffering |

Core advantages:
- Single-threaded simplicity: no locks, no context-switch overhead
- Non-blocking I/O: no operation ever blocks the event loop
- Event-driven: CPU time is spent only when there is work to do
- Batched processing: fewer system calls while preserving fairness
- Flexible buffering: two buffer levels adapt to replies of any size

With these designs, a single Redis thread can serve hundreds of thousands of concurrent connections, fully exploiting the performance of I/O multiplexing.
— END —