文件事件(File Event)源码详解

一、概述

Redis 基于 Reactor 模式实现了一套高效的事件驱动机制,其中**文件事件(File Event)**是其核心组件之一。文件事件用于处理网络 I/O 操作,包括客户端连接建立、命令接收、响应发送等。通过 I/O 多路复用技术,Redis 能够在单线程中高效地处理大量并发连接。

本文将深入剖析 Redis 文件事件的实现原理,从数据结构到核心 API,再到实际应用场景。

二、核心数据结构

2.1 文件事件结构体 aeFileEvent

1// src/ae.h:71-76
2typedef struct aeFileEvent {
3    int mask;           // 事件类型掩码:AE_READABLE | AE_WRITABLE | AE_BARRIER
4    aeFileProc *rfileProc;  // 读事件回调函数
5    aeFileProc *wfileProc;  // 写事件回调函数
6    void *clientData;   // 用户自定义数据
7} aeFileEvent;

字段解析

  • mask:标识事件的类型,支持以下取值:

    • AE_NONE (0):无事件注册
    • AE_READABLE (1):可读事件,当 fd 可读时触发
    • AE_WRITABLE (2):可写事件,当 fd 可写时触发
    • AE_BARRIER (4):屏障标志,用于控制读写事件的触发顺序
  • rfileProc:读事件触发时的回调函数指针

  • wfileProc:写事件触发时的回调函数指针

  • clientData:传递给回调函数的用户数据(通常是 client 结构体指针)

2.2 已触发事件结构体 aeFiredEvent

1// src/ae.h:91-94
2typedef struct aeFiredEvent {
3    int fd;     // 触发事件的文件描述符
4    int mask;   // 触发的事件类型
5} aeFiredEvent;

aeApiPoll 返回后,触发的事件信息会被填充到这个结构体数组中,供后续的事件分发使用。

2.3 事件循环结构体 aeEventLoop

 1// src/ae.h:97-109
 2typedef struct aeEventLoop {
 3    int maxfd;              // 当前注册的最大 fd
 4    int setsize;            // 事件表容量上限
 5    long long timeEventNextId;
 6    time_t lastTime;
 7    aeFileEvent *events;    // 注册的事件表(按 fd 索引)
 8    aeFiredEvent *fired;    // 已触发的事件表
 9    aeTimeEvent *timeEventHead;
10    int stop;
11    void *apidata;          // 多路复用 API 的私有数据
12    aeBeforeSleepProc *beforesleep;
13    aeBeforeSleepProc *aftersleep;
14} aeEventLoop;

关键点

  • events 数组以 fd 为索引,直接定位对应的文件事件
  • fired 数组暂存 aeApiPoll 返回的触发事件
  • apidata 存储底层多路复用机制的状态数据(如 epoll 的 epfd

三、事件类型详解

3.1 可读事件 AE_READABLE

可读事件在以下情况触发:

  1. 新连接到达:监听 socket 有新的客户端连接请求
  2. 数据到达:客户端 socket 有数据可读(命令请求)

3.2 可写事件 AE_WRITABLE

可写事件在以下情况触发:

  1. 发送缓冲区未满:可以向客户端 socket 写入数据(发送响应)

3.3 屏障标志 AE_BARRIER

AE_BARRIER 是 Redis 5.0 引入的一个特殊标志,用于控制读写事件的触发顺序。

1// src/ae.h:44-48
2#define AE_BARRIER 4    /* With WRITABLE, never fire the event if the
3                           READABLE event already fired in the same event
4                           loop iteration. Useful when you want to persist
5                           things to disk before sending replies, and want
6                           to do that in a group fashion. */

使用场景:当 appendfsync always 配置生效时,Redis 需要确保数据先持久化到磁盘,再发送响应给客户端。设置 AE_BARRIER 后,会先触发写事件,再触发读事件(与默认顺序相反)。

四、核心 API 实现

4.1 创建文件事件 aeCreateFileEvent

 1// src/ae.c:136-154
 2int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
 3        aeFileProc *proc, void *clientData)
 4{
 5    // 检查 fd 是否超出事件表容量上限,防止数组越界
 6    if (fd >= eventLoop->setsize) {
 7        errno = ERANGE;
 8        return AE_ERR;
 9    }
10    // 通过 fd 直接索引获取对应的文件事件结构体
11    aeFileEvent *fe = &eventLoop->events[fd];
12
13    // 调用底层 I/O 多路复用接口注册事件(如 epoll_ctl)
14    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
15        return AE_ERR;
16    // 更新事件掩码,合并已有事件类型
17    fe->mask |= mask;
18    // 根据事件类型设置对应的回调函数
19    if (mask & AE_READABLE) fe->rfileProc = proc;  // 设置读事件回调
20    if (mask & AE_WRITABLE) fe->wfileProc = proc;  // 设置写事件回调
21    // 保存用户数据指针,通常指向 client 结构体
22    fe->clientData = clientData;
23    // 维护 maxfd,用于优化遍历范围
24    if (fd > eventLoop->maxfd)
25        eventLoop->maxfd = fd;
26    return AE_OK;
27}

执行流程

  1. 检查 fd 是否超出容量限制
  2. 调用底层 aeApiAddEvent 将 fd 注册到多路复用器
  3. 更新 aeFileEvent 的事件掩码和回调函数
  4. 更新 maxfd 记录

4.2 删除文件事件 aeDeleteFileEvent

 1// src/ae.c:156-176
 2void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
 3{
 4    // 边界检查:fd 超出容量则直接返回
 5    if (fd >= eventLoop->setsize) return;
 6    aeFileEvent *fe = &eventLoop->events[fd];
 7    // 如果该 fd 上没有任何事件,无需处理
 8    if (fe->mask == AE_NONE) return;
 9
10    // 删除写事件时,需要同时清除 AE_BARRIER 标志
11    // 因为 AE_BARRIER 是依附于 AE_WRITABLE 存在的
12    /* We want to always remove AE_BARRIER if set when AE_WRITABLE
13     * is removed. */
14    if (mask & AE_WRITABLE) mask |= AE_BARRIER;
15
16    // 调用底层接口从多路复用器中移除指定事件
17    aeApiDelEvent(eventLoop, fd, mask);
18    // 更新事件掩码,清除指定的事件位
19    fe->mask = fe->mask & (~mask);
20    // 如果删除的是当前 maxfd 且该 fd 已无任何事件
21    // 需要向前遍历找到新的 maxfd,用于优化后续遍历
22    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
23        /* Update the max fd */
24        int j;
25        for (j = eventLoop->maxfd-1; j >= 0; j--)
26            if (eventLoop->events[j].mask != AE_NONE) break;
27        eventLoop->maxfd = j;
28    }
29}

执行流程

  1. 边界检查
  2. 删除写事件时,同时清除 AE_BARRIER 标志
  3. 调用底层 aeApiDelEvent 从多路复用器移除事件
  4. 如果删除的是 maxfd 且该 fd 已无任何事件,则重新计算 maxfd

4.3 事件分发处理

文件事件的处理发生在 aeProcessEvents 函数中:

 1// src/ae.c:417-464 (核心部分)
 2for (j = 0; j < numevents; j++) {
 3    // 从已触发事件数组中获取触发的事件信息
 4    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
 5    int mask = eventLoop->fired[j].mask;  // 触发的事件类型
 6    int fd = eventLoop->fired[j].fd;      // 触发事件的 fd
 7    int fired = 0;  // 记录已触发的回调次数,用于去重
 8
 9    // 检查是否设置了 AE_BARRIER,控制读写事件的触发顺序
10    int invert = fe->mask & AE_BARRIER;
11
12    // 默认情况:先触发读事件(未设置 AE_BARRIER 时)
13    /* Fire the readable event if the call sequence is not inverted. */
14    if (!invert && fe->mask & mask & AE_READABLE) {
15        fe->rfileProc(eventLoop,fd,fe->clientData,mask);  // 调用读回调
16        fired++;
17    }
18
19    // 触发写事件
20    /* Fire the writable event. */
21    if (fe->mask & mask & AE_WRITABLE) {
22        // 如果读回调已触发,需检查读写回调是否相同,避免重复调用
23        if (!fired || fe->wfileProc != fe->rfileProc) {
24            fe->wfileProc(eventLoop,fd,fe->clientData,mask);  // 调用写回调
25            fired++;
26        }
27    }
28
29    // 设置 AE_BARRIER 时:后触发读事件(先写后读)
30    /* If we have to invert the call, fire the readable event now. */
31    if (invert && fe->mask & mask & AE_READABLE) {
32        if (!fired || fe->wfileProc != fe->rfileProc) {
33            fe->rfileProc(eventLoop,fd,fe->clientData,mask);  // 调用读回调
34            fired++;
35        }
36    }
37
38    processed++;
39}

分发逻辑

  1. 默认顺序:先触发读事件,再触发写事件
  2. 设置 AE_BARRIER 时:先触发写事件,再触发读事件
  3. 如果读写回调函数相同,则只调用一次

五、底层多路复用实现

Redis 支持多种 I/O 多路复用机制,按性能优先级选择:

 1// src/ae.c:47-61
 2#ifdef HAVE_EVPORT
 3#include "ae_evport.c"
 4#else
 5    #ifdef HAVE_EPOLL
 6    #include "ae_epoll.c"
 7    #else
 8        #ifdef HAVE_KQUEUE
 9        #include "ae_kqueue.c"
10        #else
11        #include "ae_select.c"
12        #endif
13    #endif
14#endif

5.1 epoll 实现(Linux)

1// src/ae_epoll.c:34-37
2typedef struct aeApiState {
3    int epfd;                   // epoll 实例 fd
4    struct epoll_event *events; // 事件数组
5} aeApiState;

核心函数

 1// src/ae_epoll.c:73-88
 2static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
 3    aeApiState *state = eventLoop->apidata;
 4    struct epoll_event ee = {0};  // 初始化 epoll_event 结构体
 5    // 判断是新增还是修改:如果该 fd 上没有事件,则 ADD;否则 MOD
 6    /* If the fd was already monitored for some event, we need a MOD
 7     * operation. Otherwise we need an ADD operation. */
 8    int op = eventLoop->events[fd].mask == AE_NONE ?
 9            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
10
11    ee.events = 0;
12    // 合并已有的事件类型,保留之前注册的事件
13    mask |= eventLoop->events[fd].mask; /* Merge old events */
14    // 将 Redis 的事件类型转换为 epoll 的事件类型
15    if (mask & AE_READABLE) ee.events |= EPOLLIN;   // 可读 -> EPOLLIN
16    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;  // 可写 -> EPOLLOUT
17    ee.data.fd = fd;  // 保存 fd,供后续事件触发时使用
18    // 调用 epoll_ctl 注册或修改事件监听
19    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
20    return 0;
21}
 1// src/ae_epoll.c:108-131
 2static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
 3    aeApiState *state = eventLoop->apidata;
 4    int retval, numevents = 0;
 5
 6    // 调用 epoll_wait 等待事件触发
 7    // 超时时间:tvp 为 NULL 表示无限等待,否则转换为毫秒
 8    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
 9            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
10    if (retval > 0) {
11        int j;
12        numevents = retval;
13        // 遍历所有触发的事件
14        for (j = 0; j < numevents; j++) {
15            int mask = 0;
16            struct epoll_event *e = state->events+j;
17
18            // 将 epoll 事件类型转换为 Redis 事件类型
19            if (e->events & EPOLLIN) mask |= AE_READABLE;   // 数据可读
20            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;  // 可写数据
21            if (e->events & EPOLLERR) mask |= AE_WRITABLE;  // 错误事件,标记为可写以便处理
22            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;  // 挂断事件,标记为可写以便处理
23            // 将触发的事件信息填充到 fired 数组
24            eventLoop->fired[j].fd = e->data.fd;
25            eventLoop->fired[j].mask = mask;
26        }
27    }
28    return numevents;  // 返回触发的事件数量
29}

六、文件事件在 Redis 中的应用

6.1 服务端监听连接

Redis 启动时,会为监听 socket 注册可读事件,回调函数为 acceptTcpHandler

1// src/server.c:2130-2131
2if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
3    acceptTcpHandler,NULL) == AE_ERR)

acceptTcpHandler 负责接受新连接:

 1// src/networking.c:726-744
 2void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
 3    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;  // 单次最多接受 1000 个连接
 4    char cip[NET_IP_STR_LEN];
 5    UNUSED(el);
 6    UNUSED(mask);
 7    UNUSED(privdata);
 8
 9    // 循环接受新连接,直到没有更多连接或达到单次上限
10    while(max--) {
11        // 接受一个 TCP 连接,返回客户端 fd
12        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
13        if (cfd == ANET_ERR) {
14            // EWOULDBLOCK 表示没有更多待处理的连接,正常情况
15            if (errno != EWOULDBLOCK)
16                serverLog(LL_WARNING,
17                    "Accepting client connection: %s", server.neterr);
18            return;
19        }
20        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
21        // 创建客户端结构体,注册读事件回调
22        acceptCommonHandler(cfd,0,cip);
23    }
24}

6.2 客户端连接建立

当新连接被接受后,createClient 会为其注册可读事件:

 1// src/networking.c:85-104
 2client *createClient(int fd) {
 3    client *c = zmalloc(sizeof(client));  // 分配客户端结构体内存
 4
 5    // fd != -1 表示创建真实的网络客户端
 6    // fd == -1 用于创建伪客户端(如 Lua 脚本执行)
 7    /* passing -1 as fd it is possible to create a non connected client.
 8     * This is useful since all the commands needs to be executed
 9     * in the context of a client. When commands are executed in other
10     * contexts (for instance a Lua script) we need a non connected client. */
11    if (fd != -1) {
12        anetNonBlock(NULL,fd);           // 设置非阻塞模式
13        anetEnableTcpNoDelay(NULL,fd);   // 禁用 Nagle 算法,减少延迟
14        if (server.tcpkeepalive)
15            anetKeepAlive(NULL,fd,server.tcpkeepalive);  // 启用 TCP keepalive
16        // 注册可读事件,回调函数为 readQueryFromClient
17        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
18            readQueryFromClient, c) == AE_ERR)
19        {
20            // 注册失败,关闭 fd 并释放内存
21            close(fd);
22            zfree(c);
23            return NULL;
24        }
25    }
26    // ... 初始化其他字段
27}

6.3 读取客户端命令

readQueryFromClient 是客户端可读事件的回调函数:

 1// src/networking.c:1500-1570
 2void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
 3    client *c = (client*) privdata;  // 获取客户端结构体
 4    int nread, readlen;
 5    size_t qblen;
 6    UNUSED(el);
 7    UNUSED(mask);
 8
 9    readlen = PROTO_IOBUF_LEN;  // 默认读取缓冲区大小 16KB
10    // 大包优化:如果是多批量请求且正在处理大参数,调整读取长度
11    /* If this is a multi bulk request, and we are processing a bulk reply
12     * that is large enough, try to maximize the probability that the query
13     * buffer contains exactly the SDS string representing the object, even
14     * at the risk of requiring more read(2) calls. This way the function
15     * processMultiBulkBuffer() can avoid copying buffers to create the
16     * Redis Object representing the argument. */
17    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
18        && c->bulklen >= PROTO_MBULK_BIG_ARG)
19    {
20        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
21
22        /* Note that the 'remaining' variable may be zero in some edge case,
23         * for example once we resume a blocked client after CLIENT PAUSE. */
24        if (remaining > 0 && remaining < readlen) readlen = remaining;
25    }
26
27    qblen = sdslen(c->querybuf);  // 获取当前查询缓冲区长度
28    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
29    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);  // 扩展缓冲区空间
30    // 从 socket 读取数据到查询缓冲区末尾
31    nread = read(fd, c->querybuf+qblen, readlen);
32    if (nread == -1) {
33        // EAGAIN 表示暂无数据可读,非阻塞模式下正常返回
34        if (errno == EAGAIN) {
35            return;
36        } else {
37            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
38            freeClient(c);
39            return;
40        }
41    } else if (nread == 0) {
42        // 返回 0 表示客户端关闭连接
43        serverLog(LL_VERBOSE, "Client closed connection");
44        freeClient(c);
45        return;
46    } else if (c->flags & CLIENT_MASTER) {
47        // 主从复制:追加到 pending_querybuf
48        /* Append the query buffer to the pending (not applied) buffer
49         * of the master. We'll use this buffer later in order to have a
50         * copy of the string applied by the last command executed. */
51        c->pending_querybuf = sdscatlen(c->pending_querybuf,
52                                        c->querybuf+qblen,nread);
53    }
54
55    sdsIncrLen(c->querybuf,nread);  // 更新缓冲区有效数据长度
56    c->lastinteraction = server.unixtime;  // 更新最后交互时间,用于超时检测
57    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
58    server.stat_net_input_bytes += nread;  // 统计网络输入字节数
59    // 检查查询缓冲区是否超过最大限制
60    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
61        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
62
63        bytes = sdscatrepr(bytes,c->querybuf,64);
64        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
65        sdsfree(ci);
66        sdsfree(bytes);
67        freeClient(c);
68        return;
69    }
70
71    // 处理输入缓冲区中的命令并尝试执行
72    /* Time to process the buffer. If the client is a master we need to
73     * compute the difference between the applied offset before and after
74     * processing the buffer, to understand how much of the replication stream
75     * was actually applied to the master state: this quantity, and its
76     * corresponding part of the replication stream, will be propagated to
77     * the sub-slaves and to the replication backlog. */
78    processInputBufferAndReplicate(c);
79}

6.4 发送响应给客户端

当有响应需要发送时,Redis 会注册可写事件:

 1// src/networking.c:1064-1104
 2int handleClientsWithPendingWrites(void) {
 3    listIter li;
 4    listNode *ln;
 5    int processed = listLength(server.clients_pending_write);  // 统计待处理客户端数量
 6
 7    listRewind(server.clients_pending_write,&li);  // 初始化迭代器
 8    while((ln = listNext(&li))) {
 9        client *c = listNodeValue(ln);  // 获取客户端结构体
10        c->flags &= ~CLIENT_PENDING_WRITE;  // 清除待写标志
11        listDelNode(server.clients_pending_write,ln);  // 从待写列表中移除
12
13        // 跳过受保护的客户端(如被暂停的客户端)
14        /* If the client is protected, don't do anything,
15         * that may prevent the client to be freed. */
16        if (c->flags & CLIENT_PROTECTED) continue;
17
18        // 优先尝试同步写入,直接发送响应数据
19        // 这样可以避免注册写事件的开销,提高响应速度
20        /* Try to write buffers to the client socket. */
21        if (writeToClient(c->fd,c,0) == C_ERR) continue;
22
23        // 如果同步写入后仍有数据未发送完(如发送缓冲区满)
24        // 则注册可写事件,等待 socket 可写时异步发送
25        /* If after the synchronous writes above there is still data to
26         * output, we need to install a write handler. */
27        if (clientHasPendingReplies(c)) {
28            int ae_flags = AE_WRITABLE;
29            // 如果开启了 AOF 且 fsync 策略为 always
30            // 需要设置 AE_BARRIER,确保先刷盘再发送响应
31            /* For the fsync=always policy, we want the write operation
32             * to be performed before the reply is sent. */
33            if (server.aof_state == AOF_ON &&
34                server.aof_fsync == AOF_FSYNC_ALWAYS)
35            {
36                ae_flags |= AE_BARRIER;
37            }
38            // 注册可写事件,回调函数为 sendReplyToClient
39            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
40                sendReplyToClient, c) == AE_ERR)
41            {
42                    freeClientAsync(c);  // 注册失败,异步释放客户端
43            }
44        }
45    }
46    return processed;
47}

writeToClient 负责实际的数据写入:

  1// src/networking.c:961-1051
  2int writeToClient(int fd, client *c, int handler_installed) {
  3    ssize_t nwritten = 0, totwritten = 0;  // 单次写入量、总写入量
  4    size_t objlen;
  5    clientReplyBlock *o;  // 响应数据块
  6
  7    // 循环发送,直到没有待发送数据
  8    while(clientHasPendingReplies(c)) {
  9        // 优先发送固定缓冲区中的数据(小响应)
 10        if (c->bufpos > 0) {
 11            // write 系统调用,写入 [sentlen, bufpos) 区间的数据
 12            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
 13            if (nwritten <= 0) break;  // 写入失败或 EAGAIN
 14            c->sentlen += nwritten;    // 更新已发送偏移量
 15            totwritten += nwritten;
 16
 17            // 如果固定缓冲区数据全部发送完毕
 18            /* If the buffer was sent, set bufpos to zero to continue with
 19             * the remainder of the reply. */
 20            if ((int)c->sentlen == c->bufpos) {
 21                c->bufpos = 0;    // 重置缓冲区位置
 22                c->sentlen = 0;   // 重置已发送偏移
 23            }
 24        } else {
 25            // 固定缓冲区为空,从链表中取数据块发送(大响应)
 26            o = listNodeValue(listFirst(c->reply));
 27            objlen = o->used;
 28
 29            if (objlen == 0) {
 30                c->reply_bytes -= o->size;
 31                listDelNode(c->reply,listFirst(c->reply));
 32                continue;
 33            }
 34
 35            // 写入数据块中未发送的部分
 36            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
 37            if (nwritten <= 0) break;
 38            c->sentlen += nwritten;
 39            totwritten += nwritten;
 40
 41            // 如果数据块全部发送完毕,从链表中移除
 42            /* If we fully sent the object on head go to the next one */
 43            if (c->sentlen == objlen) {
 44                c->reply_bytes -= o->size;
 45                listDelNode(c->reply,listFirst(c->reply));
 46                c->sentlen = 0;
 47                /* If there are no longer objects in the list, we expect
 48                 * the count of reply bytes to be exactly zero. */
 49                if (listLength(c->reply) == 0)
 50                    serverAssert(c->reply_bytes == 0);
 51            }
 52        }
 53        // 限制单次写入数据量,避免长时间占用事件循环
 54        // NET_MAX_WRITES_PER_EVENT 默认 64MB
 55        /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
 56         * bytes, in a single threaded server it's a good idea to serve
 57         * other clients as well, even if a very large request comes from
 58         * super fast link that is always able to accept data (in real world
 59         * scenario think about 'KEYS *' against the loopback interface).
 60         *
 61         * However if we are over the maxmemory limit we ignore that and
 62         * just deliver as much data as it is possible to deliver.
 63         *
 64         * Moreover, we also send as much as possible if the client is
 65         * a slave (otherwise, on high-speed traffic, the replication
 66         * buffer will grow indefinitely) */
 67        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
 68            (server.maxmemory == 0 ||
 69             zmalloc_used_memory() < server.maxmemory) &&
 70            !(c->flags & CLIENT_SLAVE)) break;
 71    }
 72    server.stat_net_output_bytes += totwritten;
 73    if (nwritten == -1) {
 74        if (errno == EAGAIN) {
 75            nwritten = 0;
 76        } else {
 77            serverLog(LL_VERBOSE,
 78                "Error writing to client: %s", strerror(errno));
 79            freeClient(c);
 80            return C_ERR;
 81        }
 82    }
 83    if (totwritten > 0) {
 84        /* For clients representing masters we don't count sending data
 85         * as an interaction, since we always send REPLCONF ACK commands
 86         * that take some time to just fill the socket output buffer.
 87         * We just rely on data / pings received for timeout detection. */
 88        if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
 89    }
 90    // 所有数据发送完毕,清理状态并移除写事件
 91    if (!clientHasPendingReplies(c)) {
 92        c->sentlen = 0;
 93        // handler_installed 表示写事件是否已注册
 94        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
 95
 96        /* Close connection after entire reply has been sent. */
 97        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
 98            freeClient(c);
 99            return C_ERR;
100        }
101    }
102    return C_OK;
103}

七、事件处理流程图

 1                    ┌─────────────────────────────────────────┐
 2                    │              aeMain()                   │
 3                    │           while(!stop)                  │
 4                    └─────────────────┬───────────────────────┘
 5 6 7                    ┌─────────────────────────────────────────┐
 8                    │          beforesleep()                  │
 9                    │   (处理待发送响应、AOF 刷盘等)              │
10                    └─────────────────┬───────────────────────┘
111213                    ┌─────────────────────────────────────────┐
14                    │         aeProcessEvents()               │
15                    │   AE_ALL_EVENTS | AE_CALL_AFTER_SLEEP   │
16                    └─────────────────┬───────────────────────┘
1718              ┌───────────────────────┼───────────────────────┐
19              │                       │                       │
20              ▼                       ▼                       ▼
21    ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
22    │ aeSearchNearest │    │   aeApiPoll()   │    │  aftersleep()   │
23    │    Timer()      │    │  (epoll_wait)   │    │   (可选)         │
24    │  计算超时时间     │    │  等待事件触发     │    │                 │
25    └─────────────────┘    └────────┬────────┘    └─────────────────┘
262728                    ┌─────────────────────────────────────────┐
29                    │       遍历 fired 事件数组                 │
30                    │   for (j = 0; j < numevents; j++)       │
31                    └─────────────────┬───────────────────────┘
3233              ┌───────────────────────┼───────────────────────┐
34              │                       │                       │
35              ▼                       ▼                       ▼
36    ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
37    │  acceptHandler  │    │readQueryFrom    │    │sendReplyTo      │
38    │   (新连接)       │    │  Client()       │    │  Client()       │
39    │                 │    │  (读命令)        │    │  (发响应)        │
40    └─────────────────┘    └─────────────────┘    └─────────────────┘

八、设计亮点

8.1 按需注册写事件

Redis 采用「按需注册」策略管理可写事件:

  • 只有当发送缓冲区有待发送数据时,才注册可写事件
  • 数据发送完毕后,立即移除可写事件

这种设计避免了可写事件被频繁触发(因为 TCP socket 通常都是可写的),减少了不必要的事件回调。

8.2 同步优先、异步兜底

handleClientsWithPendingWrites 中:

  1. 先尝试同步写入(writeToClient
  2. 只有同步写入无法完成时,才注册可写事件异步处理

这减少了系统调用次数,提高了响应速度。

8.3 写入量限制防饥饿

writeToClient 中限制单次写入量为 NET_MAX_WRITES_PER_EVENT(默认 64MB),避免某个大响应长时间占用事件循环,导致其他客户端饥饿。

8.4 AE_BARRIER 保证一致性

appendfsync always 模式下,通过 AE_BARRIER 确保先执行 beforeSleep 中的 AOF 刷盘,再发送响应给客户端,保证数据一致性。

九、总结

Redis 文件事件机制的核心设计要点:

特性 说明
统一抽象 通过 aeFileEvent 统一封装不同 I/O 多路复用机制
事件驱动 基于回调函数实现事件响应,解耦事件检测与事件处理
按需注册 写事件按需注册/移除,避免无效唤醒
同步优先 优先同步发送,减少事件注册开销
公平调度 限制单次处理量,避免客户端饥饿
顺序控制 AE_BARRIER 实现读写事件触发顺序的灵活控制

文件事件是 Redis 高性能网络处理的基础,理解其实现原理对于掌握 Redis 内部机制至关重要。

— END —