How Redis Handles High-Concurrency Connections (the accept + read + write Flow)

1. Overview

As a single-threaded, high-performance in-memory database, Redis comfortably handles tens of thousands, even hundreds of thousands, of concurrent connections. The key is its event-driven, non-blocking I/O architecture. This article walks through the complete flow Redis uses to handle high-concurrency connections:

  1. The accept flow: how new connections are accepted
  2. The read flow: how client requests are read
  3. The write flow: how responses are sent

2. Overall Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                         Redis main thread                           │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌───────────────┐    ┌───────────────┐    ┌───────────────┐       │
│   │ listen socket │    │ client socket │    │ client socket │  ...  │
│   │  (listen_fd)  │    │   (conn_fd)   │    │   (conn_fd)   │       │
│   └──────┬────────┘    └──────┬────────┘    └──────┬────────┘       │
│          │                    │                    │                │
│          ▼                    ▼                    ▼                │
│   ┌─────────────────────────────────────────────────────────┐       │
│   │                    I/O multiplexer                      │       │
│   │                   (epoll/kqueue)                        │       │
│   └─────────────────────────┬───────────────────────────────┘       │
│                             │                                       │
│                             ▼                                       │
│   ┌─────────────────────────────────────────────────────────┐       │
│   │                  Event loop (aeMain)                    │       │
│   │                                                         │       │
│   │   while(!stop) {                                        │       │
│   │       beforeSleep();     // flush pending replies       │       │
│   │       aeProcessEvents(); // file and time events        │       │
│   │   }                                                     │       │
│   └─────────────────────────┬───────────────────────────────┘       │
│                             │                                       │
│         ┌───────────────────┼───────────────────┐                   │
│         ▼                   ▼                   ▼                   │
│   ┌───────────┐       ┌───────────┐       ┌───────────┐             │
│   │  accept   │       │   read    │       │   write   │             │
│   │  Handler  │       │  Handler  │       │  Handler  │             │
│   └───────────┘       └───────────┘       └───────────┘             │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

3. The accept Flow: Accepting New Connections

3.1 Creating and Registering the Listening Socket

At startup, Redis creates its listening sockets and registers a readable event for each:

// src/server.c:2130-2131
// Register a readable event for each listening IP:port
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
    acceptTcpHandler,NULL) == AE_ERR)

Notes

  • server.ipfd[j] is the file descriptor of a listening socket
  • acceptTcpHandler is the callback invoked when the readable event fires
  • When a new connection arrives, the listening socket becomes readable, which triggers acceptTcpHandler

3.2 The Accept Handler: acceptTcpHandler

// src/networking.c:726-744
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;  // accept at most 1000 connections per call
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    // Keep accepting until there are no more pending connections
    // or the per-call limit is reached
    while(max--) {
        // Accept one TCP connection and return the client fd;
        // anetTcpAccept wraps the accept(2) system call
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            // EWOULDBLOCK means no more pending connections: the normal case
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        // Create the client structure and register the read callback
        acceptCommonHandler(cfd,0,cip);
    }
}

Key design points

  1. Batched accepts: at most MAX_ACCEPTS_PER_CALL (1000) connections are accepted per call, so the handler never monopolizes the event loop
  2. Non-blocking: the listening socket is in non-blocking mode, so accept never blocks
  3. EWOULDBLOCK handling: when no connection is pending, the handler returns normally; this is not treated as an error

3.3 The Common Accept Handler: acceptCommonHandler

// src/networking.c:647-724
#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    // Create the client structure and register the read event
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    /* If maxclient directive is set and this is one client more... close the
     * connection. Note that we create the client instead to check before
     * for this condition, since now the socket is already set in non-blocking
     * mode and we can send an error for free using the Kernel I/O */
    // Check the maxclients limit
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;  // count rejected connections
        freeClient(c);
        return;
    }

    /* If the server is running in protected mode (the default) and there
     * is no password set, nor a specific interface is bound, we don't accept
     * requests from non loopback interfaces. Instead we try to explain the
     * user what to do to fix it if needed. */
    // Protected-mode check: reject unauthenticated non-loopback connections
    if (server.protected_mode &&
        server.bindaddr_count == 0 &&
        server.requirepass == NULL &&
        !(flags & CLIENT_UNIX_SOCKET) &&
        ip != NULL)
    {
        if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
            char *err =
                "-DENIED Redis is running in protected mode because protected "
                "mode is enabled, no bind address was specified, no "
                "authentication password is requested to clients. In this mode "
                "connections are only accepted from the loopback interface. "
                "If you want to connect from external computers to Redis you "
                "may adopt one of the following solutions: "
                "1) Just disable protected mode sending the command "
                "'CONFIG SET protected-mode no' from the loopback interface "
                "by connecting to Redis from the same host the server is "
                "running, however MAKE SURE Redis is not publicly accessible "
                "from internet if you do so. Use CONFIG REWRITE to make this "
                "change permanent. "
                "2) Alternatively you can just disable the protected mode by "
                "editing the Redis configuration file, and setting the protected "
                "mode option to 'no', and then restarting the server. "
                "3) If you started the server manually just for testing, restart "
                "it with the '--protected-mode no' option. "
                "4) Setup a bind address or an authentication password. "
                "NOTE: You only need to do one of the above things in order for "
                "the server to start accepting connections from the outside.\r\n";
            if (write(c->fd,err,strlen(err)) == -1) {
                /* Nothing to do, Just to avoid the warning... */
            }
            server.stat_rejected_conn++;
            freeClient(c);
            return;
        }
    }

    server.stat_numconnections++;  // count total connections
    c->flags |= flags;
}

Key checks

  1. Maximum client limit: new connections are rejected once the client count exceeds the maxclients setting
  2. Protected mode: connections from non-loopback addresses are rejected when no bind address and no password are configured

3.4 Creating the Client: createClient

// src/networking.c:85-181
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));  // allocate the client structure

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    // fd != -1: create a real networked client
    // fd == -1: create a fake client (Lua script execution, AOF loading)
    if (fd != -1) {
        anetNonBlock(NULL,fd);           // set non-blocking mode
        anetEnableTcpNoDelay(NULL,fd);   // disable Nagle's algorithm to cut latency
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);  // enable TCP keepalive
        // Register the readable event with readQueryFromClient as the callback
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            // Registration failed: close the fd and free the memory
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    // Initialize the client state
    selectDb(c,0);  // select database 0 by default
    uint64_t client_id = atomicIncr(server.next_client_id, 1);
    c->id = client_id;  // assign a unique client ID
    c->fd = fd;
    c->name = NULL;
    c->bufpos = 0;  // position in the fixed reply buffer
    c->querybuf = sdsempty();  // query buffer
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;  // record creation and last-interaction time
    c->authenticated = 0;
    c->replstate = REPL_STATE_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->slave_ip[0] = '\0';
    c->slave_capa = SLAVE_CAPA_NONE;
    c->reply = listCreate();  // reply list
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply, zfree);  // free method for reply list nodes
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyPointerValueDictType, NULL);
    c->bpop.target = c->bpop.xread_group = NULL;
    c->bpop.xread_consumer = NULL;
    c->bpop.blocked_on_key = 0;
    c->bpop.numreplicas = 0;
    c->bpop.numlocal = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType, NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    c->client_list_node = NULL;
    listSetFreeMethod(c->pubsub_patterns, decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns, matchPubsubPattern);
    // Add the client to the server's client list
    if (fd != -1) {
        c->client_list_node = listAddNodeTailReturn(server.clients, c);
    }
    // Allocate the client's output buffer
    c->buf = zmalloc(PROTO_REPLY_CHUNK_BYTES);  // fixed 16KB reply buffer
    // Add the client to the pending-write list (used to send the initial reply)
    listAddNodeHead(server.clients_pending_write, c);
    return c;
}

Key operations

  1. Setting non-blocking mode: ensures read/write never block the event loop
  2. Disabling Nagle's algorithm: reduces latency for small packets
  3. Registering the read event: with readQueryFromClient as the callback
  4. Initializing client state: query buffer, reply buffer, and so on

3.5 accept Flow Diagram

                    ┌─────────────────────────────────────────┐
                    │  New connection arrives at the          │
                    │  listening socket (listen_fd readable)  │
                    └─────────────────┬───────────────────────┘
                                      │
                                      ▼
                    ┌─────────────────────────────────────────┐
                    │  acceptTcpHandler() is invoked          │
                    │  (read callback of the listening socket)│
                    └─────────────────┬───────────────────────┘
                                      │
                                      ▼
                    ┌─────────────────────────────────────────┐
                    │  anetTcpAccept() -> accept()            │
                    │  accept the connection, get a client fd │
                    └─────────────────┬───────────────────────┘
                                      │
                                      ▼
                    ┌─────────────────────────────────────────┐
                    │  acceptCommonHandler()                  │
                    │  check limits and protected mode        │
                    └─────────────────┬───────────────────────┘
                                      │
                    ┌─────────────────┴───────────────────────┐
                    │                                         │
                    ▼                                         ▼
          ┌──────────────────┐                     ┌─────────────────┐
          │ Over limit /     │                     │ Normal          │
          │ protected mode:  │                     │ connection      │
          │ send error, close│                     └────────┬────────┘
          └──────────────────┘                              │
                                                            ▼
                                        ┌─────────────────────────────────┐
                                        │  createClient(fd)               │
                                        │  1. set non-blocking mode       │
                                        │  2. register AE_READABLE event  │
                                        │  3. initialize client state     │
                                        └─────────────────────────────────┘

4. The read Flow: Reading Client Requests

4.1 The Read Callback: readQueryFromClient

// src/networking.c:1500-1570
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;  // fetch the client structure
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;  // default read size, 16KB
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    // Big-argument optimization: for a multibulk request with a large
    // bulk argument in progress, shrink the read length to fit it exactly
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);  // current query buffer length
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);  // grow the buffer
    // Read from the socket into the end of the query buffer
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        // EAGAIN: no data available right now; normal in non-blocking mode
        if (errno == EAGAIN) {
            return;
        } else {
            // Any other error: log it and free the client
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        // 0 means the client closed the connection
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        // Replication: also append to pending_querybuf
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread);  // update the buffer's used length
    c->lastinteraction = server.unixtime;  // update last-interaction time (used for timeout detection)
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;  // count network input bytes
    // Check the query buffer against the maximum allowed size
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    /* Time to process the buffer. If the client is a master we need to
     * compute the difference between the applied offset before and after
     * processing the buffer, to understand how much of the replication stream
     * was actually applied to the master state: this quantity, and its
     * corresponding part of the replication stream, will be propagated to
     * the sub-slaves and to the replication backlog. */
    // Parse and execute the commands accumulated in the input buffer
    processInputBufferAndReplicate(c);
}

Key handling

  1. Reading data: data is read from the socket into the query buffer
  2. Error handling: EAGAIN returns normally; any other error closes the connection
  3. Connection close: read returning 0 means the client closed the connection
  4. Buffer limit: the query buffer is checked against client_max_querybuf_len
  5. Command processing: processInputBufferAndReplicate parses and executes the commands

4.2 Processing the Input Buffer: processInputBuffer

// src/networking.c:1453-1498
void processInputBuffer(client *c) {
    server.current_client = c;  // set the client currently being processed
    /* Keep processing while there is something in the input buffer */
    // Loop over all commands accumulated in the query buffer
    while(sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        // Don't process commands while clients are paused (CLIENT PAUSE)
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        // Stop if the client is blocked on a blocking command
        if (c->flags & CLIENT_BLOCKED) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been populated (e.g. don't process more commands). */
        // Stop processing once the client is flagged for closing
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) break;

        /* Determine request type when unknown. */
        // Determine the protocol type: inline or multibulk
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                // Multibulk format: *3\r\n$3\r\nSET\r\n...
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                // Inline format: SET key value
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            // Parse an inline command
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            // Parse a multibulk command
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* multibulk processing could see a greater length of the query
         * buffer, this is an optimization so that we don't parse it again. */
        // No command to execute (argc == 0): just reset the client
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            // Execute the command
            if (processCommand(c) == C_OK) {
                // On success, reset client state for the next command
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    /* If the client is a master and we are not in a MULTI
                     * context, we need to update the applied replication
                     * offset, because this is the point where we actually
                     * processed the command. */
                    c->reploff = c->read_reploff - sdslen(c->querybuf);
                }
                // Reset the client state
                if (!(c->flags & CLIENT_BLOCKED || c->flags & CLIENT_MULTI))
                    resetClient(c);
            }
            /* freeMemoryIfNeeded may flush slave output buffers. This may
             * result into a slave, that may be the master of a sub-slave,
             * to get disconnected. Handle the special case.
             * Sometimes the flushed output buffer can contain a PARTIAL
             * command. If this is the case, reset the client. */
            // Break out if the client is flagged for closing
            if (c->flags & CLIENT_CLOSE_AFTER_REPLY) break;
        }
    }
    server.current_client = NULL;  // clear the current-client reference
}

Processing steps

  1. Determine the protocol type: inline command or multibulk command
  2. Parse the command: the request is parsed into argc and argv
  3. Execute the command: processCommand runs it
  4. Reset state: get ready for the next command

4.3 read Flow Diagram

                    ┌─────────────────────────────────────────┐
                    │  Client socket becomes readable         │
                    │  (data has arrived)                     │
                    └─────────────────┬───────────────────────┘
                                      │
                                      ▼
                    ┌─────────────────────────────────────────┐
                    │  readQueryFromClient() is invoked       │
                    │  (read callback of the client socket)   │
                    └─────────────────┬───────────────────────┘
                                      │
                                      ▼
                    ┌─────────────────────────────────────────┐
                    │  read(fd, buf, len)                     │
                    │  read data into the query buffer        │
                    └─────────────────┬───────────────────────┘
                                      │
                    ┌─────────────────┼───────────────────────┐
                    │                 │                       │
                    ▼                 ▼                       ▼
           ┌──────────────┐  ┌──────────────┐      ┌──────────────┐
           │ nread == -1  │  │ nread == 0   │      │ nread > 0    │
           │              │  │              │      │              │
           │ EAGAIN: ret  │  │ client closed│      │ normal read  │
           │ other: close │  │ freeClient() │      │              │
           └──────────────┘  └──────────────┘      └───────┬──────┘
                                                           │
                                                           ▼
                                        ┌─────────────────────────────────┐
                                        │ processInputBufferAndReplicate  │
                                        │ parse and execute commands      │
                                        └────────────────┬────────────────┘
                                                         │
                         ┌───────────────────────────────┼───────────────────┐
                         ▼                               ▼                   ▼
                ┌─────────────────┐           ┌─────────────────┐   ┌────────────────┐
                │ processInline   │           │ processMultibulk│   │ execute command│
                │ Buffer()        │           │ Buffer()        │   │ processCommand │
                │ (inline command)│           │ (RESP protocol) │   └────────────────┘
                └─────────────────┘           └─────────────────┘

5. The write Flow: Sending Responses

5.1 Reply Write Strategy

Redis sends replies with a synchronous-first, asynchronous-fallback strategy:

  1. Synchronous send: beforeSleep tries to write the reply directly
  2. Asynchronous send: if the synchronous write does not finish, a writable event is registered and the rest is sent later

5.2 Write Handling in beforeSleep

// src/server.c:1355-1410
/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* Call the Redis Cluster before sleep function. Note that this function
     * may change the state of Redis Cluster (from ok to fail or vice versa),
     * so it's a good idea to call it before serving the unblocked clients
     * later in this function. */
    // Redis Cluster handling
    if (server.cluster_enabled) clusterBeforeSleep();

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    // Fast expired-key cycle
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* Send all the slaves an ACK request if at least one client blocked
     * during the previous event loop iteration. */
    // Send REPLCONF GETACK to the replicas
    if (server.get_ack_from_slaves) {
        robj *argv[3];

        argv[0] = createStringObject("REPLCONF",8);
        argv[1] = createStringObject("GETACK",6);
        argv[2] = createStringObject("*",1); /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        decrRefCount(argv[0]);
        decrRefCount(argv[1]);
        decrRefCount(argv[2]);
        server.get_ack_from_slaves = 0;
    }

    /* Unblock all the clients blocked for synchronous replication
     * in WAIT. */
    // Handle clients blocked in WAIT for replica ACKs
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();

    /* Check if there are clients unblocked by modules that implement
     * blocking commands. */
    // Handle clients blocked by modules
    moduleHandleBlockedClients();

    /* Try to process pending commands for clients that were just unblocked. */
    if (listLength(server.unblocked_clients))
        processUnblockedClients();

    /* Write the AOF buffer on disk */
    // Flush the AOF buffer to disk
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    // Handle pending reply writes (the key step!)
    handleClientsWithPendingWrites();

    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    // Release the GIL (module thread safety)
    if (moduleCount()) moduleReleaseGIL();
}

5.3 Processing Pending Writers: handleClientsWithPendingWrites

// src/networking.c:1064-1104
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);  // number of clients to process

    listRewind(server.clients_pending_write,&li);  // initialize the iterator
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);  // fetch the client structure
        c->flags &= ~CLIENT_PENDING_WRITE;  // clear the pending-write flag
        listDelNode(server.clients_pending_write,ln);  // remove from the pending-write list

        /* If the client is protected, don't do anything,
         * that may prevent the client to be freed. */
        // Skip protected clients (e.g. paused ones)
        if (c->flags & CLIENT_PROTECTED) continue;

        /* Try to write buffers to the client socket. */
        // Try a synchronous write first, sending the reply directly:
        // this avoids the overhead of registering a write event
        // and improves response latency
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        /* If after the synchronous writes above there is still data to
         * output, we need to install a write handler. */
        // If data is still pending after the synchronous write
        // (e.g. the socket send buffer is full), register a writable
        // event and finish sending asynchronously
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            // With AOF enabled and fsync policy set to always,
            // set AE_BARRIER so the fsync happens before the reply is sent
            /* For the fsync=always policy, we want the write operation
             * to be performed before the reply is sent. */
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_flags |= AE_BARRIER;
            }
            // Register the writable event with sendReplyToClient as the callback
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                    freeClientAsync(c);  // registration failed: free the client asynchronously
            }
        }
    }
    return processed;
}

Key design points

  1. Synchronous first: try a direct write, avoiding the cost of registering a write event
  2. Asynchronous fallback: if the synchronous write is incomplete, register a writable event
  3. AE_BARRIER: in specific scenarios, guarantees the fsync-before-reply ordering

5.4 Writing Data to the Client: writeToClient

// src/networking.c:961-1051
int writeToClient(int fd, client *c, int handler_installed) {
    ssize_t nwritten = 0, totwritten = 0;  // bytes written this call / in total
    size_t objlen;
    clientReplyBlock *o;  // reply data block

    // Keep writing until no reply data is pending
    while(clientHasPendingReplies(c)) {
        // Send the fixed buffer first (small replies)
        if (c->bufpos > 0) {
            // write(2): send the [sentlen, bufpos) range of the buffer
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;  // write failed or EAGAIN
            c->sentlen += nwritten;    // advance the sent offset
            totwritten += nwritten;

            // If the fixed buffer has been fully sent
            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;    // reset the buffer position
                c->sentlen = 0;   // reset the sent offset
            }
        } else {
            // Fixed buffer empty: send blocks from the reply list (large replies)
            o = listNodeValue(listFirst(c->reply));
            objlen = o->used;

            if (objlen == 0) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }

            // Write the unsent part of the block
            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            // If the block has been fully sent, remove it from the list
            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                /* If there are no longer objects in the list, we expect
                 * the count of reply bytes to be exactly zero. */
                if (listLength(c->reply) == 0)
                    serverAssert(c->reply_bytes == 0);
            }
        }
        // Cap the bytes written per event so a single client cannot
        // monopolize the event loop; NET_MAX_WRITES_PER_EVENT is 64KB
        /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver.
         *
         * Moreover, we also send as much as possible if the client is
         * a slave (otherwise, on high-speed traffic, the replication
         * buffer will grow indefinitely) */
        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory) &&
            !(c->flags & CLIENT_SLAVE)) break;
    }
    server.stat_net_output_bytes += totwritten;  // count network output bytes
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;  // EAGAIN is not an error, just a temporarily full socket
        } else {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return C_ERR;
        }
    }
    if (totwritten > 0) {
        /* For clients representing masters we don't count sending data
         * as an interaction, since we always send REPLCONF ACK commands
         * that take some time to just fill the socket output buffer.
         * We just rely on data / pings received for timeout detection. */
        // Update last-interaction time (except for the master connection)
        if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
    }
    // Everything sent: clean up state and remove the write event
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        // handler_installed tells whether the write event was registered
        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

        /* Close connection after entire reply has been sent. */
        // If flagged close-after-reply, close the connection now
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClient(c);
            return C_ERR;
        }
    }
    return C_OK;
}

Key design points

  1. Two-level buffering: a fixed 16KB buffer plus a linked-list buffer
  2. Per-event write cap: prevents one large reply from monopolizing the event loop
  3. EAGAIN handling: when the socket send buffer is full, return and resume on the next writable event

5.5 The asynchronous write callback: sendReplyToClient

// src/networking.c:1106-1112
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    // simply call writeToClient; handler_installed=1 means a write event is registered
    writeToClient(fd,privdata,1);
}

5.6 write flow diagram

┌──────────────────────────────────────────────────┐
│  Command execution finishes; reply is generated  │
│    (appended to client->buf or client->reply)    │
└────────────────────────┬─────────────────────────┘
                         ▼
┌──────────────────────────────────────────────────┐
│ Client is added to server.clients_pending_write  │
└────────────────────────┬─────────────────────────┘
                         ▼
┌──────────────────────────────────────────────────┐
│             beforeSleep() is invoked             │
│  (runs each time before the event loop blocks)   │
└────────────────────────┬─────────────────────────┘
                         ▼
┌──────────────────────────────────────────────────┐
│         handleClientsWithPendingWrites()         │
│  iterate over all clients with pending replies   │
└────────────────────────┬─────────────────────────┘
                         ▼
┌──────────────────────────────────────────────────┐
│             writeToClient(fd, c, 0)              │
│          try a synchronous write first           │
└────────────────────────┬─────────────────────────┘
                         │
            ┌────────────┴─────────────┐
            ▼                          ▼
┌───────────────────────┐  ┌───────────────────────┐
│    everything sent    │  │  data still pending   │
│ (send buffer not full)│  │  (send buffer full)   │
└───────────────────────┘  └───────────┬───────────┘
                                       ▼
                      ┌─────────────────────────────────┐
                      │ aeCreateFileEvent(fd,           │
                      │     AE_WRITABLE,                │
                      │     sendReplyToClient)          │
                      │ register a write event and wait │
                      │ for the socket to become        │
                      │ writable                        │
                      └────────────────┬────────────────┘
                                       ▼
                      ┌─────────────────────────────────┐
                      │ when the socket is writable,    │
                      │ sendReplyToClient() fires       │
                      │ → writeToClient(fd, c, 1)       │
                      │ and sends the remaining data    │
                      └────────────────┬────────────────┘
                                       │
                   ┌───────────────────┴───────────────────┐
                   ▼                                       ▼
      ┌─────────────────────────┐           ┌─────────────────────────┐
      │  everything sent:       │           │  data still pending:    │
      │  delete the write event │           │  wait for the next      │
      │  aeDeleteFileEvent()    │           │  writable event         │
      └─────────────────────────┘           └─────────────────────────┘

VI. Complete connection lifecycle

┌─────────────────────────────────────────────────────────────────────────┐
│                       Client connection lifecycle                       │
└─────────────────────────────────────────────────────────────────────────┘

┌───────────────┐     ┌───────────────┐     ┌───────────────┐
│    Client     │     │     Redis     │     │  Event loop   │
└───────┬───────┘     └───────┬───────┘     └───────┬───────┘
        │                     │                     │
        │  1. SYN             │                     │
        │ ──────────────────> │                     │
        │                     │                     │
        │  2. SYN+ACK         │                     │
        │ <────────────────── │                     │
        │                     │                     │
        │  3. ACK             │                     │
        │ ──────────────────> │                     │
        │                     │                     │
        │                     │  listen_fd readable │
        │                     │ ──────────────────> │
        │                     │                     │
        │                     │  4. acceptTcpHandler│
        │                     │ <────────────────── │
        │                     │                     │
        │                     │  5. createClient,   │
        │                     │     register read   │
        │                     │     event           │
        │                     │ ──────────────────> │
        │                     │                     │
        │  6. send a command  │                     │
        │ ──────────────────> │                     │
        │                     │                     │
        │                     │  client_fd readable │
        │                     │ ──────────────────> │
        │                     │                     │
        │                     │  7. readQueryFromClient
        │                     │ <────────────────── │
        │                     │                     │
        │                     │  8. processCommand: │
        │                     │     execute command,│
        │                     │     build the reply │
        │                     │                     │
        │                     │  9. beforeSleep     │
        │                     │ <────────────────── │
        │                     │                     │
        │                     │  10. writeToClient  │
        │                     │      sends reply    │
        │  11. reply received │                     │
        │ <────────────────── │                     │
        │                     │                     │
        │ ... the loop repeats for further commands │
        │                     │                     │
        │  12. close          │                     │
        │ ──────────────────> │                     │
        │                     │  read returns 0     │
        │                     │ ──────────────────> │
        │                     │                     │
        │                     │  13. freeClient     │
        │                     │ <────────────────── │
        │                     │                     │
        ▼                     ▼                     ▼

VII. Key designs behind high-concurrency handling

7.1 Non-blocking I/O

// every socket is put into non-blocking mode
anetNonBlock(NULL, fd);

Advantages

  • accept, read, and write never block the event loop
  • a single thread can serve a very large number of concurrent connections
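What anetNonBlock() does boils down to a standard fcntl(2) sequence: read the file status flags, then set O_NONBLOCK. A minimal sketch (setNonBlocking is an illustrative name, not the Redis function):

```c
#include <fcntl.h>
#include <unistd.h>

/* Put a file descriptor into non-blocking mode, as anetNonBlock() does:
 * after this, accept/read/write return -1 with errno == EAGAIN instead
 * of stalling the event loop when no data or buffer space is ready. */
int setNonBlocking(int fd) {
    int flags = fcntl(fd, F_GETFL);   /* read current file status flags */
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
```

After this call, reading from an empty socket (or pipe) fails immediately with EAGAIN rather than blocking, which is the property the whole event loop relies on.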

7.2 Event-driven dispatch

// the listening socket registers a read event
aeCreateFileEvent(server.el, listen_fd, AE_READABLE, acceptTcpHandler, NULL);

// each client socket registers a read event
aeCreateFileEvent(server.el, client_fd, AE_READABLE, readQueryFromClient, c);

Advantages

  • work happens only when an event fires, so no cycles are wasted polling
  • I/O multiplexing dispatches events across many fds efficiently
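Under the hood, aeCreateFileEvent maps onto whichever multiplexer the platform provides. A minimal Linux-only sketch of the same idea written directly against epoll (addReadEvent and waitOneEvent are illustrative names, not Redis's ae.c API):

```c
#include <sys/epoll.h>

/* Register an fd for read readiness, the rough equivalent of
 * aeCreateFileEvent(el, fd, AE_READABLE, handler, data). */
int addReadEvent(int epfd, int fd) {
    struct epoll_event ev = {0};
    ev.events = EPOLLIN;       /* AE_READABLE */
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

/* One event-loop iteration: wait up to timeout_ms for readiness and
 * return the ready fd, or -1 if nothing fired. Redis would then look
 * up and invoke the handler registered for that fd. */
int waitOneEvent(int epfd, int timeout_ms) {
    struct epoll_event ev;
    int n = epoll_wait(epfd, &ev, 1, timeout_ms);
    return n == 1 ? ev.data.fd : -1;
}
```

The key property is that the thread sleeps inside epoll_wait until some registered fd actually has work, so idle connections cost essentially nothing.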

7.3 Batching

// accept: take at most 1000 connections per readable event
int max = MAX_ACCEPTS_PER_CALL;
while(max--) { ... }

// read: read up to 16KB at a time
readlen = PROTO_IOBUF_LEN;

// write: write at most 64KB (NET_MAX_WRITES_PER_EVENT) per event
if (totwritten > NET_MAX_WRITES_PER_EVENT) break;

Advantages

  • fewer system calls
  • a balance between fairness and throughput

7.4 Two-level reply buffer

// small replies go into the fixed buffer (16KB)
c->buf[PROTO_REPLY_CHUNK_BYTES];
c->bufpos;

// large replies spill into a linked-list buffer
c->reply;  // list of clientReplyBlock

Advantages

  • small replies need no extra memory allocation
  • large replies grow through the list without a fixed cap
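The fast-path/fall-back logic can be sketched with a toy 16-byte fixed buffer (the struct, field, and function names are illustrative, not the real definitions in server.h). Note the ordering rule: once anything sits in the list, even small replies must be appended to the list, or reply bytes would be sent out of order.

```c
#include <stdlib.h>
#include <string.h>

#define FIXED_BUF_SIZE 16  /* stands in for PROTO_REPLY_CHUNK_BYTES (16KB) */

typedef struct replyNode {       /* stands in for clientReplyBlock */
    struct replyNode *next;
    size_t len;
    char data[];
} replyNode;

typedef struct miniClient {
    char buf[FIXED_BUF_SIZE];    /* level 1: fixed buffer (c->buf) */
    int bufpos;
    replyNode *reply_head, *reply_tail; /* level 2: linked list (c->reply) */
} miniClient;

/* Append a reply: try the fixed buffer first, spill to the list on
 * overflow. A sketch; allocation failure handling is omitted. */
void addReply(miniClient *c, const char *s, size_t len) {
    /* fast path only while the list is empty, to keep replies ordered */
    if (c->reply_head == NULL && c->bufpos + len <= FIXED_BUF_SIZE) {
        memcpy(c->buf + c->bufpos, s, len);
        c->bufpos += (int)len;
        return;
    }
    replyNode *n = malloc(sizeof(replyNode) + len);
    n->next = NULL;
    n->len = len;
    memcpy(n->data, s, len);
    if (c->reply_tail) c->reply_tail->next = n; else c->reply_head = n;
    c->reply_tail = n;
}
```

Most Redis replies (+OK, integers, small strings) fit the fixed buffer, so the common case never touches the allocator.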

7.5 Synchronous-first write strategy

// beforeSleep tries a synchronous write first
if (writeToClient(c->fd, c, 0) == C_ERR) continue;

// a write event is registered only if the synchronous write did not finish
if (clientHasPendingReplies(c)) {
    aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c);
}

Advantages

  • less event registration/removal overhead
  • most small replies are flushed in a single shot

VIII. Summary

The core mechanisms Redis uses to handle high-concurrency connections, by phase:

Phase    Key functions                                     Core design
accept   acceptTcpHandler → createClient                   batched accept, non-blocking sockets, register read event
read     readQueryFromClient → processInputBuffer          non-blocking reads, protocol parsing, command execution
write    handleClientsWithPendingWrites → writeToClient    synchronous first, asynchronous fallback, two-level buffering

Core strengths

  1. A single thread simplifies concurrency: no locks, no context-switch overhead
  2. Non-blocking I/O: no operation ever stalls the event loop
  3. Event-driven: work happens only when events fire, keeping the CPU on useful work
  4. Batching: fewer system calls, with fairness preserved
  5. Flexible buffering: the two-level buffer adapts to replies of any size

With these designs, single-threaded Redis can serve hundreds of thousands of concurrent connections, extracting the full benefit of I/O multiplexing.

— END —