文件事件(File Event)源码详解
一、概述
Redis 基于 Reactor 模式实现了一套高效的事件驱动机制,其中**文件事件(File Event)**是其核心组件之一。文件事件用于处理网络 I/O 操作,包括客户端连接建立、命令接收、响应发送等。通过 I/O 多路复用技术,Redis 能够在单线程中高效地处理大量并发连接。
本文将深入剖析 Redis 文件事件的实现原理,从数据结构到核心 API,再到实际应用场景。
二、核心数据结构
2.1 文件事件结构体 aeFileEvent
1// src/ae.h:71-76
2typedef struct aeFileEvent {
3 int mask; // 事件类型掩码:AE_READABLE | AE_WRITABLE | AE_BARRIER
4 aeFileProc *rfileProc; // 读事件回调函数
5 aeFileProc *wfileProc; // 写事件回调函数
6 void *clientData; // 用户自定义数据
7} aeFileEvent;
字段解析:
-
mask:标识事件的类型,支持以下取值:AE_NONE (0):无事件注册AE_READABLE (1):可读事件,当 fd 可读时触发AE_WRITABLE (2):可写事件,当 fd 可写时触发AE_BARRIER (4):屏障标志,用于控制读写事件的触发顺序
-
rfileProc:读事件触发时的回调函数指针 -
wfileProc:写事件触发时的回调函数指针 -
clientData:传递给回调函数的用户数据(通常是client结构体指针)
2.2 已触发事件结构体 aeFiredEvent
1// src/ae.h:91-94
2typedef struct aeFiredEvent {
3 int fd; // 触发事件的文件描述符
4 int mask; // 触发的事件类型
5} aeFiredEvent;
当 aeApiPoll 返回后,触发的事件信息会被填充到这个结构体数组中,供后续的事件分发使用。
2.3 事件循环结构体 aeEventLoop
1// src/ae.h:97-109
2typedef struct aeEventLoop {
3 int maxfd; // 当前注册的最大 fd
4 int setsize; // 事件表容量上限
5 long long timeEventNextId;
6 time_t lastTime;
7 aeFileEvent *events; // 注册的事件表(按 fd 索引)
8 aeFiredEvent *fired; // 已触发的事件表
9 aeTimeEvent *timeEventHead;
10 int stop;
11 void *apidata; // 多路复用 API 的私有数据
12 aeBeforeSleepProc *beforesleep;
13 aeBeforeSleepProc *aftersleep;
14} aeEventLoop;
关键点:
events数组以 fd 为索引,直接定位对应的文件事件fired数组暂存aeApiPoll返回的触发事件apidata存储底层多路复用机制的状态数据(如 epoll 的epfd)
三、事件类型详解
3.1 可读事件 AE_READABLE
可读事件在以下情况触发:
- 新连接到达:监听 socket 有新的客户端连接请求
- 数据到达:客户端 socket 有数据可读(命令请求)
3.2 可写事件 AE_WRITABLE
可写事件在以下情况触发:
- 发送缓冲区未满:可以向客户端 socket 写入数据(发送响应)
3.3 屏障标志 AE_BARRIER
AE_BARRIER 是 Redis 5.0 引入的一个特殊标志,用于控制读写事件的触发顺序。
1// src/ae.h:44-48
2#define AE_BARRIER 4 /* With WRITABLE, never fire the event if the
3 READABLE event already fired in the same event
4 loop iteration. Useful when you want to persist
5 things to disk before sending replies, and want
6 to do that in a group fashion. */
使用场景:当 appendfsync always 配置生效时,Redis 需要确保数据先持久化到磁盘,再发送响应给客户端。设置 AE_BARRIER 后,会先触发写事件,再触发读事件(与默认顺序相反)。
四、核心 API 实现
4.1 创建文件事件 aeCreateFileEvent
1// src/ae.c:136-154
2int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
3 aeFileProc *proc, void *clientData)
4{
5 // 检查 fd 是否超出事件表容量上限,防止数组越界
6 if (fd >= eventLoop->setsize) {
7 errno = ERANGE;
8 return AE_ERR;
9 }
10 // 通过 fd 直接索引获取对应的文件事件结构体
11 aeFileEvent *fe = &eventLoop->events[fd];
12
13 // 调用底层 I/O 多路复用接口注册事件(如 epoll_ctl)
14 if (aeApiAddEvent(eventLoop, fd, mask) == -1)
15 return AE_ERR;
16 // 更新事件掩码,合并已有事件类型
17 fe->mask |= mask;
18 // 根据事件类型设置对应的回调函数
19 if (mask & AE_READABLE) fe->rfileProc = proc; // 设置读事件回调
20 if (mask & AE_WRITABLE) fe->wfileProc = proc; // 设置写事件回调
21 // 保存用户数据指针,通常指向 client 结构体
22 fe->clientData = clientData;
23 // 维护 maxfd,用于优化遍历范围
24 if (fd > eventLoop->maxfd)
25 eventLoop->maxfd = fd;
26 return AE_OK;
27}
执行流程:
- 检查 fd 是否超出容量限制
- 调用底层
aeApiAddEvent将 fd 注册到多路复用器 - 更新
aeFileEvent的事件掩码和回调函数 - 更新
maxfd记录
4.2 删除文件事件 aeDeleteFileEvent
1// src/ae.c:156-176
2void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
3{
4 // 边界检查:fd 超出容量则直接返回
5 if (fd >= eventLoop->setsize) return;
6 aeFileEvent *fe = &eventLoop->events[fd];
7 // 如果该 fd 上没有任何事件,无需处理
8 if (fe->mask == AE_NONE) return;
9
10 // 删除写事件时,需要同时清除 AE_BARRIER 标志
11 // 因为 AE_BARRIER 是依附于 AE_WRITABLE 存在的
12 /* We want to always remove AE_BARRIER if set when AE_WRITABLE
13 * is removed. */
14 if (mask & AE_WRITABLE) mask |= AE_BARRIER;
15
16 // 调用底层接口从多路复用器中移除指定事件
17 aeApiDelEvent(eventLoop, fd, mask);
18 // 更新事件掩码,清除指定的事件位
19 fe->mask = fe->mask & (~mask);
20 // 如果删除的是当前 maxfd 且该 fd 已无任何事件
21 // 需要向前遍历找到新的 maxfd,用于优化后续遍历
22 if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
23 /* Update the max fd */
24 int j;
25 for (j = eventLoop->maxfd-1; j >= 0; j--)
26 if (eventLoop->events[j].mask != AE_NONE) break;
27 eventLoop->maxfd = j;
28 }
29}
执行流程:
- 边界检查
- 删除写事件时,同时清除
AE_BARRIER标志 - 调用底层
aeApiDelEvent从多路复用器移除事件 - 如果删除的是
maxfd且该 fd 已无任何事件,则重新计算maxfd
4.3 事件分发处理
文件事件的处理发生在 aeProcessEvents 函数中:
1// src/ae.c:417-464 (核心部分)
2for (j = 0; j < numevents; j++) {
3 // 从已触发事件数组中获取触发的事件信息
4 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
5 int mask = eventLoop->fired[j].mask; // 触发的事件类型
6 int fd = eventLoop->fired[j].fd; // 触发事件的 fd
7 int fired = 0; // 记录已触发的回调次数,用于去重
8
9 // 检查是否设置了 AE_BARRIER,控制读写事件的触发顺序
10 int invert = fe->mask & AE_BARRIER;
11
12 // 默认情况:先触发读事件(未设置 AE_BARRIER 时)
13 /* Fire the readable event if the call sequence is not inverted. */
14 if (!invert && fe->mask & mask & AE_READABLE) {
15 fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 调用读回调
16 fired++;
17 }
18
19 // 触发写事件
20 /* Fire the writable event. */
21 if (fe->mask & mask & AE_WRITABLE) {
22 // 如果读回调已触发,需检查读写回调是否相同,避免重复调用
23 if (!fired || fe->wfileProc != fe->rfileProc) {
24 fe->wfileProc(eventLoop,fd,fe->clientData,mask); // 调用写回调
25 fired++;
26 }
27 }
28
29 // 设置 AE_BARRIER 时:后触发读事件(先写后读)
30 /* If we have to invert the call, fire the readable event now. */
31 if (invert && fe->mask & mask & AE_READABLE) {
32 if (!fired || fe->wfileProc != fe->rfileProc) {
33 fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 调用读回调
34 fired++;
35 }
36 }
37
38 processed++;
39}
分发逻辑:
- 默认顺序:先触发读事件,再触发写事件
- 设置
AE_BARRIER时:先触发写事件,再触发读事件 - 如果读写回调函数相同,则只调用一次
五、底层多路复用实现
Redis 支持多种 I/O 多路复用机制,按性能优先级选择:
1// src/ae.c:47-61
2#ifdef HAVE_EVPORT
3#include "ae_evport.c"
4#else
5 #ifdef HAVE_EPOLL
6 #include "ae_epoll.c"
7 #else
8 #ifdef HAVE_KQUEUE
9 #include "ae_kqueue.c"
10 #else
11 #include "ae_select.c"
12 #endif
13 #endif
14#endif
5.1 epoll 实现(Linux)
1// src/ae_epoll.c:34-37
2typedef struct aeApiState {
3 int epfd; // epoll 实例 fd
4 struct epoll_event *events; // 事件数组
5} aeApiState;
核心函数:
1// src/ae_epoll.c:73-88
2static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
3 aeApiState *state = eventLoop->apidata;
4 struct epoll_event ee = {0}; // 初始化 epoll_event 结构体
5 // 判断是新增还是修改:如果该 fd 上没有事件,则 ADD;否则 MOD
6 /* If the fd was already monitored for some event, we need a MOD
7 * operation. Otherwise we need an ADD operation. */
8 int op = eventLoop->events[fd].mask == AE_NONE ?
9 EPOLL_CTL_ADD : EPOLL_CTL_MOD;
10
11 ee.events = 0;
12 // 合并已有的事件类型,保留之前注册的事件
13 mask |= eventLoop->events[fd].mask; /* Merge old events */
14 // 将 Redis 的事件类型转换为 epoll 的事件类型
15 if (mask & AE_READABLE) ee.events |= EPOLLIN; // 可读 -> EPOLLIN
16 if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; // 可写 -> EPOLLOUT
17 ee.data.fd = fd; // 保存 fd,供后续事件触发时使用
18 // 调用 epoll_ctl 注册或修改事件监听
19 if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
20 return 0;
21}
1// src/ae_epoll.c:108-131
2static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
3 aeApiState *state = eventLoop->apidata;
4 int retval, numevents = 0;
5
6 // 调用 epoll_wait 等待事件触发
7 // 超时时间:tvp 为 NULL 表示无限等待,否则转换为毫秒
8 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
9 tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
10 if (retval > 0) {
11 int j;
12 numevents = retval;
13 // 遍历所有触发的事件
14 for (j = 0; j < numevents; j++) {
15 int mask = 0;
16 struct epoll_event *e = state->events+j;
17
18 // 将 epoll 事件类型转换为 Redis 事件类型
19 if (e->events & EPOLLIN) mask |= AE_READABLE; // 数据可读
20 if (e->events & EPOLLOUT) mask |= AE_WRITABLE; // 可写数据
21 if (e->events & EPOLLERR) mask |= AE_WRITABLE; // 错误事件,标记为可写以便处理
22 if (e->events & EPOLLHUP) mask |= AE_WRITABLE; // 挂断事件,标记为可写以便处理
23 // 将触发的事件信息填充到 fired 数组
24 eventLoop->fired[j].fd = e->data.fd;
25 eventLoop->fired[j].mask = mask;
26 }
27 }
28 return numevents; // 返回触发的事件数量
29}
六、文件事件在 Redis 中的应用
6.1 服务端监听连接
Redis 启动时,会为监听 socket 注册可读事件,回调函数为 acceptTcpHandler:
1// src/server.c:2130-2131
2if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
3 acceptTcpHandler,NULL) == AE_ERR)
acceptTcpHandler 负责接受新连接:
1// src/networking.c:726-744
2void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
3 int cport, cfd, max = MAX_ACCEPTS_PER_CALL; // 单次最多接受 1000 个连接
4 char cip[NET_IP_STR_LEN];
5 UNUSED(el);
6 UNUSED(mask);
7 UNUSED(privdata);
8
9 // 循环接受新连接,直到没有更多连接或达到单次上限
10 while(max--) {
11 // 接受一个 TCP 连接,返回客户端 fd
12 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
13 if (cfd == ANET_ERR) {
14 // EWOULDBLOCK 表示没有更多待处理的连接,正常情况
15 if (errno != EWOULDBLOCK)
16 serverLog(LL_WARNING,
17 "Accepting client connection: %s", server.neterr);
18 return;
19 }
20 serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
21 // 创建客户端结构体,注册读事件回调
22 acceptCommonHandler(cfd,0,cip);
23 }
24}
6.2 客户端连接建立
当新连接被接受后,createClient 会为其注册可读事件:
1// src/networking.c:85-104
2client *createClient(int fd) {
3 client *c = zmalloc(sizeof(client)); // 分配客户端结构体内存
4
5 // fd != -1 表示创建真实的网络客户端
6 // fd == -1 用于创建伪客户端(如 Lua 脚本执行)
7 /* passing -1 as fd it is possible to create a non connected client.
8 * This is useful since all the commands needs to be executed
9 * in the context of a client. When commands are executed in other
10 * contexts (for instance a Lua script) we need a non connected client. */
11 if (fd != -1) {
12 anetNonBlock(NULL,fd); // 设置非阻塞模式
13 anetEnableTcpNoDelay(NULL,fd); // 禁用 Nagle 算法,减少延迟
14 if (server.tcpkeepalive)
15 anetKeepAlive(NULL,fd,server.tcpkeepalive); // 启用 TCP keepalive
16 // 注册可读事件,回调函数为 readQueryFromClient
17 if (aeCreateFileEvent(server.el,fd,AE_READABLE,
18 readQueryFromClient, c) == AE_ERR)
19 {
20 // 注册失败,关闭 fd 并释放内存
21 close(fd);
22 zfree(c);
23 return NULL;
24 }
25 }
26 // ... 初始化其他字段
27}
6.3 读取客户端命令
readQueryFromClient 是客户端可读事件的回调函数:
1// src/networking.c:1500-1570
2void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
3 client *c = (client*) privdata; // 获取客户端结构体
4 int nread, readlen;
5 size_t qblen;
6 UNUSED(el);
7 UNUSED(mask);
8
9 readlen = PROTO_IOBUF_LEN; // 默认读取缓冲区大小 16KB
10 // 大包优化:如果是多批量请求且正在处理大参数,调整读取长度
11 /* If this is a multi bulk request, and we are processing a bulk reply
12 * that is large enough, try to maximize the probability that the query
13 * buffer contains exactly the SDS string representing the object, even
14 * at the risk of requiring more read(2) calls. This way the function
15 * processMultiBulkBuffer() can avoid copying buffers to create the
16 * Redis Object representing the argument. */
17 if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
18 && c->bulklen >= PROTO_MBULK_BIG_ARG)
19 {
20 ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
21
22 /* Note that the 'remaining' variable may be zero in some edge case,
23 * for example once we resume a blocked client after CLIENT PAUSE. */
24 if (remaining > 0 && remaining < readlen) readlen = remaining;
25 }
26
27 qblen = sdslen(c->querybuf); // 获取当前查询缓冲区长度
28 if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
29 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // 扩展缓冲区空间
30 // 从 socket 读取数据到查询缓冲区末尾
31 nread = read(fd, c->querybuf+qblen, readlen);
32 if (nread == -1) {
33 // EAGAIN 表示暂无数据可读,非阻塞模式下正常返回
34 if (errno == EAGAIN) {
35 return;
36 } else {
37 serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
38 freeClient(c);
39 return;
40 }
41 } else if (nread == 0) {
42 // 返回 0 表示客户端关闭连接
43 serverLog(LL_VERBOSE, "Client closed connection");
44 freeClient(c);
45 return;
46 } else if (c->flags & CLIENT_MASTER) {
47 // 主从复制:追加到 pending_querybuf
48 /* Append the query buffer to the pending (not applied) buffer
49 * of the master. We'll use this buffer later in order to have a
50 * copy of the string applied by the last command executed. */
51 c->pending_querybuf = sdscatlen(c->pending_querybuf,
52 c->querybuf+qblen,nread);
53 }
54
55 sdsIncrLen(c->querybuf,nread); // 更新缓冲区有效数据长度
56 c->lastinteraction = server.unixtime; // 更新最后交互时间,用于超时检测
57 if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
58 server.stat_net_input_bytes += nread; // 统计网络输入字节数
59 // 检查查询缓冲区是否超过最大限制
60 if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
61 sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
62
63 bytes = sdscatrepr(bytes,c->querybuf,64);
64 serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
65 sdsfree(ci);
66 sdsfree(bytes);
67 freeClient(c);
68 return;
69 }
70
71 // 处理输入缓冲区中的命令并尝试执行
72 /* Time to process the buffer. If the client is a master we need to
73 * compute the difference between the applied offset before and after
74 * processing the buffer, to understand how much of the replication stream
75 * was actually applied to the master state: this quantity, and its
76 * corresponding part of the replication stream, will be propagated to
77 * the sub-slaves and to the replication backlog. */
78 processInputBufferAndReplicate(c);
79}
6.4 发送响应给客户端
当有响应需要发送时,Redis 会注册可写事件:
1// src/networking.c:1064-1104
2int handleClientsWithPendingWrites(void) {
3 listIter li;
4 listNode *ln;
5 int processed = listLength(server.clients_pending_write); // 统计待处理客户端数量
6
7 listRewind(server.clients_pending_write,&li); // 初始化迭代器
8 while((ln = listNext(&li))) {
9 client *c = listNodeValue(ln); // 获取客户端结构体
10 c->flags &= ~CLIENT_PENDING_WRITE; // 清除待写标志
11 listDelNode(server.clients_pending_write,ln); // 从待写列表中移除
12
13 // 跳过受保护的客户端(如被暂停的客户端)
14 /* If the client is protected, don't do anything,
15 * that may prevent the client to be freed. */
16 if (c->flags & CLIENT_PROTECTED) continue;
17
18 // 优先尝试同步写入,直接发送响应数据
19 // 这样可以避免注册写事件的开销,提高响应速度
20 /* Try to write buffers to the client socket. */
21 if (writeToClient(c->fd,c,0) == C_ERR) continue;
22
23 // 如果同步写入后仍有数据未发送完(如发送缓冲区满)
24 // 则注册可写事件,等待 socket 可写时异步发送
25 /* If after the synchronous writes above there is still data to
26 * output, we need to install a write handler. */
27 if (clientHasPendingReplies(c)) {
28 int ae_flags = AE_WRITABLE;
29 // 如果开启了 AOF 且 fsync 策略为 always
30 // 需要设置 AE_BARRIER,确保先刷盘再发送响应
31 /* For the fsync=always policy, we want the write operation
32 * to be performed before the reply is sent. */
33 if (server.aof_state == AOF_ON &&
34 server.aof_fsync == AOF_FSYNC_ALWAYS)
35 {
36 ae_flags |= AE_BARRIER;
37 }
38 // 注册可写事件,回调函数为 sendReplyToClient
39 if (aeCreateFileEvent(server.el, c->fd, ae_flags,
40 sendReplyToClient, c) == AE_ERR)
41 {
42 freeClientAsync(c); // 注册失败,异步释放客户端
43 }
44 }
45 }
46 return processed;
47}
writeToClient 负责实际的数据写入:
1// src/networking.c:961-1051
2int writeToClient(int fd, client *c, int handler_installed) {
3 ssize_t nwritten = 0, totwritten = 0; // 单次写入量、总写入量
4 size_t objlen;
5 clientReplyBlock *o; // 响应数据块
6
7 // 循环发送,直到没有待发送数据
8 while(clientHasPendingReplies(c)) {
9 // 优先发送固定缓冲区中的数据(小响应)
10 if (c->bufpos > 0) {
11 // write 系统调用,写入 [sentlen, bufpos) 区间的数据
12 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
13 if (nwritten <= 0) break; // 写入失败或 EAGAIN
14 c->sentlen += nwritten; // 更新已发送偏移量
15 totwritten += nwritten;
16
17 // 如果固定缓冲区数据全部发送完毕
18 /* If the buffer was sent, set bufpos to zero to continue with
19 * the remainder of the reply. */
20 if ((int)c->sentlen == c->bufpos) {
21 c->bufpos = 0; // 重置缓冲区位置
22 c->sentlen = 0; // 重置已发送偏移
23 }
24 } else {
25 // 固定缓冲区为空,从链表中取数据块发送(大响应)
26 o = listNodeValue(listFirst(c->reply));
27 objlen = o->used;
28
29 if (objlen == 0) {
30 c->reply_bytes -= o->size;
31 listDelNode(c->reply,listFirst(c->reply));
32 continue;
33 }
34
35 // 写入数据块中未发送的部分
36 nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
37 if (nwritten <= 0) break;
38 c->sentlen += nwritten;
39 totwritten += nwritten;
40
41 // 如果数据块全部发送完毕,从链表中移除
42 /* If we fully sent the object on head go to the next one */
43 if (c->sentlen == objlen) {
44 c->reply_bytes -= o->size;
45 listDelNode(c->reply,listFirst(c->reply));
46 c->sentlen = 0;
47 /* If there are no longer objects in the list, we expect
48 * the count of reply bytes to be exactly zero. */
49 if (listLength(c->reply) == 0)
50 serverAssert(c->reply_bytes == 0);
51 }
52 }
53 // 限制单次写入数据量,避免长时间占用事件循环
54 // NET_MAX_WRITES_PER_EVENT 默认 64MB
55 /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
56 * bytes, in a single threaded server it's a good idea to serve
57 * other clients as well, even if a very large request comes from
58 * super fast link that is always able to accept data (in real world
59 * scenario think about 'KEYS *' against the loopback interface).
60 *
61 * However if we are over the maxmemory limit we ignore that and
62 * just deliver as much data as it is possible to deliver.
63 *
64 * Moreover, we also send as much as possible if the client is
65 * a slave (otherwise, on high-speed traffic, the replication
66 * buffer will grow indefinitely) */
67 if (totwritten > NET_MAX_WRITES_PER_EVENT &&
68 (server.maxmemory == 0 ||
69 zmalloc_used_memory() < server.maxmemory) &&
70 !(c->flags & CLIENT_SLAVE)) break;
71 }
72 server.stat_net_output_bytes += totwritten;
73 if (nwritten == -1) {
74 if (errno == EAGAIN) {
75 nwritten = 0;
76 } else {
77 serverLog(LL_VERBOSE,
78 "Error writing to client: %s", strerror(errno));
79 freeClient(c);
80 return C_ERR;
81 }
82 }
83 if (totwritten > 0) {
84 /* For clients representing masters we don't count sending data
85 * as an interaction, since we always send REPLCONF ACK commands
86 * that take some time to just fill the socket output buffer.
87 * We just rely on data / pings received for timeout detection. */
88 if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
89 }
90 // 所有数据发送完毕,清理状态并移除写事件
91 if (!clientHasPendingReplies(c)) {
92 c->sentlen = 0;
93 // handler_installed 表示写事件是否已注册
94 if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
95
96 /* Close connection after entire reply has been sent. */
97 if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
98 freeClient(c);
99 return C_ERR;
100 }
101 }
102 return C_OK;
103}
七、事件处理流程图
1 ┌─────────────────────────────────────────┐
2 │ aeMain() │
3 │ while(!stop) │
4 └─────────────────┬───────────────────────┘
5 │
6 ▼
7 ┌─────────────────────────────────────────┐
8 │ beforesleep() │
9 │ (处理待发送响应、AOF 刷盘等) │
10 └─────────────────┬───────────────────────┘
11 │
12 ▼
13 ┌─────────────────────────────────────────┐
14 │ aeProcessEvents() │
15 │ AE_ALL_EVENTS | AE_CALL_AFTER_SLEEP │
16 └─────────────────┬───────────────────────┘
17 │
18 ┌───────────────────────┼───────────────────────┐
19 │ │ │
20 ▼ ▼ ▼
21 ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
22 │ aeSearchNearest │ │ aeApiPoll() │ │ aftersleep() │
23 │ Timer() │ │ (epoll_wait) │ │ (可选) │
24 │ 计算超时时间 │ │ 等待事件触发 │ │ │
25 └─────────────────┘ └────────┬────────┘ └─────────────────┘
26 │
27 ▼
28 ┌─────────────────────────────────────────┐
29 │ 遍历 fired 事件数组 │
30 │ for (j = 0; j < numevents; j++) │
31 └─────────────────┬───────────────────────┘
32 │
33 ┌───────────────────────┼───────────────────────┐
34 │ │ │
35 ▼ ▼ ▼
36 ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
37 │ acceptHandler │ │readQueryFrom │ │sendReplyTo │
38 │ (新连接) │ │ Client() │ │ Client() │
39 │ │ │ (读命令) │ │ (发响应) │
40 └─────────────────┘ └─────────────────┘ └─────────────────┘
八、设计亮点
8.1 按需注册写事件
Redis 采用「按需注册」策略管理可写事件:
- 只有当发送缓冲区有待发送数据时,才注册可写事件
- 数据发送完毕后,立即移除可写事件
这种设计避免了可写事件被频繁触发(因为 TCP socket 通常都是可写的),减少了不必要的事件回调。
8.2 同步优先、异步兜底
在 handleClientsWithPendingWrites 中:
- 先尝试同步写入(
writeToClient) - 只有同步写入无法完成时,才注册可写事件异步处理
这减少了系统调用次数,提高了响应速度。
8.3 写入量限制防饥饿
writeToClient 中限制单次写入量为 NET_MAX_WRITES_PER_EVENT(默认 64MB),避免某个大响应长时间占用事件循环,导致其他客户端饥饿。
8.4 AE_BARRIER 保证一致性
在 appendfsync always 模式下,通过 AE_BARRIER 确保先执行 beforeSleep 中的 AOF 刷盘,再发送响应给客户端,保证数据一致性。
九、总结
Redis 文件事件机制的核心设计要点:
| 特性 | 说明 |
|---|---|
| 统一抽象 | 通过 aeFileEvent 统一封装不同 I/O 多路复用机制 |
| 事件驱动 | 基于回调函数实现事件响应,解耦事件检测与事件处理 |
| 按需注册 | 写事件按需注册/移除,避免无效唤醒 |
| 同步优先 | 优先同步发送,减少事件注册开销 |
| 公平调度 | 限制单次处理量,避免客户端饥饿 |
| 顺序控制 | AE_BARRIER 实现读写事件触发顺序的灵活控制 |
文件事件是 Redis 高性能网络处理的基础,理解其实现原理对于掌握 Redis 内部机制至关重要。