Redis 事件循环模型全景解析(ae.c)

Redis 是单线程的,却能处理大量并发连接,核心在于它的事件循环机制。ae.c 是 Redis 自己实现的一个轻量级事件库,支持 I/O 多路复用和定时器。这篇文章拆解它的实现原理。

一、整体架构

Redis 事件循环处理两类事件:

| 类型 | 说明 | 例子 |
| --- | --- | --- |
| 文件事件(File Event) | socket 可读/可写 | 客户端连接、命令请求、回复发送 |
| 时间事件(Time Event) | 定时执行的回调 | serverCron(定时清理过期 key、持久化等) |

核心数据结构:

 1// ae.h
 2typedef struct aeEventLoop {
 3    int maxfd;							// 当前注册的最大 fd,用于优化遍历
 4    int setsize;						// events 数组大小,等于 maxclients + CONFIG_FDSET_INCR
 5    long long timeEventNextId;			// 时间事件 ID 生成器
 6    time_t lastTime;					// 上次处理时间事件的时间,用于检测时钟跳变
 7    aeFileEvent *events;				// 注册的文件事件数组,下标就是 fd
 8    aeFiredEvent *fired;				// 已触发的文件事件数组,由 aeApiPoll 填充
 9    aeTimeEvent *timeEventHead;  		// 时间事件链表头节点
10    int stop;                    	 	// 停止标志
11    void *apidata;               	 	// 多路复用 API 的私有数据(epoll 实例等)
12    aeBeforeSleepProc *beforesleep;		// 每轮循环前的回调
13    aeBeforeSleepProc *aftersleep;		// aeApiPoll 返回后的回调
14} aeEventLoop;

文件事件结构:

1typedef struct aeFileEvent {
2    int mask;                    // AE_READABLE | AE_WRITABLE | AE_BARRIER
3    aeFileProc *rfileProc;       // 读回调
4    aeFileProc *wfileProc;       // 写回调
5    void *clientData;            // 回调参数,通常是 client 结构体
6} aeFileEvent;

时间事件结构:

 1typedef struct aeTimeEvent {
 2    long long id;                			// 唯一标识,用于删除
 3    long when_sec;               			// 触发时间(秒)
 4    long when_ms;                			// 触发时间(毫秒)
 5    aeTimeProc *timeProc;        			// 回调函数
 6    aeEventFinalizerProc *finalizerProc;  	// 删除时的清理回调
 7    void *clientData;            			// 回调参数
 8    struct aeTimeEvent *prev;    			// 双向链表
 9    struct aeTimeEvent *next;
10} aeTimeEvent;

二、多路复用抽象层

ae.c 支持多种 I/O 多路复用实现,通过条件编译选择:

  • ae_epoll.c - Linux
  • ae_kqueue.c - macOS / BSD
  • ae_evport.c - Solaris
  • ae_select.c - 兜底方案,可移植性最好

每个实现都要提供以下四个核心函数:

1// ae_epoll.c
2static int aeApiCreate(aeEventLoop *eventLoop);
3static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
4static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask);
5static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);

2.1 epoll 实现

 1typedef struct aeApiState {
 2    int epfd;                    // epoll 实例 fd
 3    struct epoll_event *events;  // 事件数组,用于接收返回的事件
 4} aeApiState;
 5
 6static int aeApiCreate(aeEventLoop *eventLoop) {
 7    aeApiState *state = zmalloc(sizeof(aeApiState));
 8
 9    // 创建 epoll 实例;参数 1024 只是给内核的提示(hint),自 Linux 2.6.8 起被忽略,只需大于 0
10    state->epfd = epoll_create(1024);
11    if (state->epfd == -1) return -1;
12
13    // events 数组大小等于 eventLoop->setsize
14    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
15    eventLoop->apidata = state;
16    return 0;
17}
18
19static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
20    aeApiState *state = eventLoop->apidata;
21    struct epoll_event ee = {0};
22
23    // EPOLL_CTL_ADD 或 EPOLL_CTL_MOD
24    // 如果 fd 已经有事件,用 MOD;否则用 ADD
25    int op = eventLoop->events[fd].mask == AE_NONE ?
26             EPOLL_CTL_ADD : EPOLL_CTL_MOD;
27
28    ee.events = 0;
29    mask |= eventLoop->events[fd].mask;  // 合并已有的事件
30    if (mask & AE_READABLE) ee.events |= EPOLLIN;
31    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
32    ee.data.fd = fd;
33
34    if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
35    return 0;
36}
37
38static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
39    aeApiState *state = eventLoop->apidata;
40    int retval, numevents = 0;
41
42    // tvp 是超时时间,由最近的时间事件决定
43    // 如果 tvp 为 NULL,表示无限等待
44    retval = epoll_wait(state->epfd, state->events, eventLoop->setsize,
45                        tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
46    if (retval > 0) {
47        int j;
48        numevents = retval;
49        for (j = 0; j < numevents; j++) {
50            int mask = 0;
51            struct epoll_event *e = state->events + j;
52
53            if (e->events & EPOLLIN) mask |= AE_READABLE;
54            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
55            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
56            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
57
58            // 把触发的事件存到 fired 数组
59            eventLoop->fired[j].fd = e->data.fd;
60            eventLoop->fired[j].mask = mask;
61        }
62    }
63    return numevents;
64}

三、文件事件管理

3.1 创建事件循环

 1aeEventLoop *aeCreateEventLoop(int setsize) {
 2    aeEventLoop *eventLoop;
 3    int i;
 4
 5    eventLoop = zmalloc(sizeof(*eventLoop));
 6    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
 7    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
 8    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
 9    eventLoop->setsize = setsize;
10    eventLoop->timeEventHead = NULL;
11    eventLoop->timeEventNextId = 0;
12    eventLoop->stop = 0;
13    eventLoop->maxfd = -1;
14    eventLoop->beforesleep = NULL;
15    eventLoop->aftersleep = NULL;
16    if (aeApiCreate(eventLoop) == -1) goto err;
17
18    // 初始化所有 fd 的事件为 AE_NONE
19    for (i = 0; i < setsize; i++)
20        eventLoop->events[i].mask = AE_NONE;
21
22    return eventLoop;
23
24err:
25    if (eventLoop) {
26        zfree(eventLoop->events);
27        zfree(eventLoop->fired);
28        zfree(eventLoop);
29    }
30    return NULL;
31}

setsize 决定了能监听多少个 fd。Redis 中设置为 maxclients + CONFIG_FDSET_INCR,其中 CONFIG_FDSET_INCR 是预留的额外 fd(用于监听 socket、AOF 文件等)。

3.2 注册文件事件

 1int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
 2        aeFileProc *proc, void *clientData)
 3{
 4    if (fd >= eventLoop->setsize) {  // fd 超出范围
 5        errno = ERANGE;
 6        return AE_ERR;
 7    }
 8
 9    aeFileEvent *fe = &eventLoop->events[fd];  // 直接用 fd 做下标,O(1) 访问
10
11    // 调用底层多路复用的 add 接口
12    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
13        return AE_ERR;
14
15    fe->mask |= mask;  // 合并 mask,一个 fd 可以同时监听读写
16    if (mask & AE_READABLE) fe->rfileProc = proc;
17    if (mask & AE_WRITABLE) fe->wfileProc = proc;
18    fe->clientData = clientData;
19
20    // 更新 maxfd,用于优化某些场景的遍历
21    if (fd > eventLoop->maxfd)
22        eventLoop->maxfd = fd;
23
24    return AE_OK;
25}

3.3 删除文件事件

 1void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
 2{
 3    if (fd >= eventLoop->setsize) return;
 4    aeFileEvent *fe = &eventLoop->events[fd];
 5    if (fe->mask == AE_NONE) return;  // 本来就没注册
 6
 7    /* We want to always remove AE_BARRIER if set when AE_WRITABLE
 8     * is removed. */
 9    if (mask & AE_WRITABLE) mask |= AE_BARRIER;  // AE_BARRIER 是附加在写事件上的标志
10
11    // 从底层多路复用中移除
12    aeApiDelEvent(eventLoop, fd, mask);
13
14    fe->mask = fe->mask & (~mask);  // 清除对应的 mask 位
15
16    // 如果这个 fd 的所有事件都删了,且它恰好是 maxfd,需要重新找 maxfd
17    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
18        int j;
19        for (j = eventLoop->maxfd-1; j >= 0; j--)
20            if (eventLoop->events[j].mask != AE_NONE) break;
21        eventLoop->maxfd = j;
22    }
23}

四、时间事件管理

时间事件用双向链表组织,没有排序。每次找最近要触发的定时器需要遍历整个链表——O(N) 复杂度。注释里提到可以用跳表优化,但 Redis 的时间事件很少(主要就是 serverCron),所以没这个必要。

4.1 创建时间事件

 1long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
 2        aeTimeProc *proc, void *clientData,
 3        aeEventFinalizerProc *finalizerProc)
 4{
 5    long long id = eventLoop->timeEventNextId++;
 6    aeTimeEvent *te;
 7
 8    te = zmalloc(sizeof(*te));
 9    if (te == NULL) return AE_ERR;
10
11    te->id = id;
12    // 计算触发时间:当前时间 + milliseconds
13    aeAddMillisecondsToNow(milliseconds, &te->when_sec, &te->when_ms);
14    te->timeProc = proc;
15    te->finalizerProc = finalizerProc;
16    te->clientData = clientData;
17
18    // 头插法插入链表
19    te->prev = NULL;
20    te->next = eventLoop->timeEventHead;
21    if (te->next)
22        te->next->prev = te;
23    eventLoop->timeEventHead = te;
24
25    return id;
26}

4.2 删除时间事件

删除不是立即释放,而是标记为 AE_DELETED_EVENT_ID,在下一轮 processTimeEvents 中清理:

 1int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
 2{
 3    aeTimeEvent *te = eventLoop->timeEventHead;
 4    while(te) {
 5        if (te->id == id) {
 6            te->id = AE_DELETED_EVENT_ID;  // 标记删除
 7            return AE_OK;
 8        }
 9        te = te->next;
10    }
11    return AE_ERR;  // 没找到
12}

4.3 处理时间事件

 1static int processTimeEvents(aeEventLoop *eventLoop) {
 2    int processed = 0;
 3    aeTimeEvent *te;
 4    long long maxId;
 5    time_t now = time(NULL);
 6
 7    /* If the system clock is moved to the future, and then set back to the
 8     * right value, time events may be delayed in a random way. Often this
 9     * means that scheduled operations will not be performed soon enough.
10     *
11     * Here we try to detect system clock skews, and force all the time
12     * events to be processed ASAP when this happens: the idea is that
13     * processing events earlier is less dangerous than delaying them
14     * indefinitely, and practice suggests it is. */
15    // 检测系统时钟回拨:如果当前时间比上次处理时间还早,说明时钟被调回去了
16    // 这种情况下把所有时间事件的触发时间设为 0,立即执行
17    if (now < eventLoop->lastTime) {
18        te = eventLoop->timeEventHead;
19        while(te) {
20            te->when_sec = 0;  // 立即触发
21            te = te->next;
22        }
23    }
24    eventLoop->lastTime = now;
25
26    te = eventLoop->timeEventHead;
27    maxId = eventLoop->timeEventNextId - 1;  // 记录当前最大 ID
28    while(te) {
29        long now_sec, now_ms;
30        long long id;
31
32        /* Remove events scheduled for deletion. */
33        // 清理标记为删除的事件
34        if (te->id == AE_DELETED_EVENT_ID) {
35            aeTimeEvent *next = te->next;
36            // 从链表中摘除
37            if (te->prev)
38                te->prev->next = te->next;
39            else
40                eventLoop->timeEventHead = te->next;
41            if (te->next)
42                te->next->prev = te->prev;
43            // 调用清理回调
44            if (te->finalizerProc)
45                te->finalizerProc(eventLoop, te->clientData);
46            zfree(te);
47            te = next;
48            continue;
49        }
50
51        /* Make sure we don't process time events created by time events in
52         * this iteration. Note that this check is currently useless: we always
53         * add new timers on the head, however if we change the implementation
54         * detail, this check may be useful again: we keep it here for future
55         * defense. */
56        // 防止在本轮循环中处理新创建的时间事件
57        // 因为新事件是头插的,且 id 更大,所以跳过 id > maxId 的
58        if (te->id > maxId) {
59            te = te->next;
60            continue;
61        }
62
63        aeGetTime(&now_sec, &now_ms);
64        // 检查是否到达触发时间
65        if (now_sec > te->when_sec ||
66            (now_sec == te->when_sec && now_ms >= te->when_ms))
67        {
68            int retval;
69
70            id = te->id;
71            // 执行回调,返回值是下次触发的间隔(毫秒)
72            retval = te->timeProc(eventLoop, id, te->clientData);
73            processed++;
74
75            if (retval != AE_NOMORE) {
76                // 返回值是正数,表示周期性事件,重新计算下次触发时间
77                aeAddMillisecondsToNow(retval, &te->when_sec, &te->when_ms);
78            } else {
79                // 返回 AE_NOMORE,表示一次性事件,标记删除
80                te->id = AE_DELETED_EVENT_ID;
81            }
82        }
83        te = te->next;
84    }
85    return processed;
86}

这里有个细节:时间事件的回调返回值决定了它是周期性的还是一次性的。serverCron 返回 1000/server.hz,默认 hz=10,所以每 100ms 执行一次;如果配置 hz 100,就每 10ms 执行一次。

五、事件循环核心:aeProcessEvents

  1int aeProcessEvents(aeEventLoop *eventLoop, int flags)
  2{
  3    int processed = 0, numevents;
  4
  5    /* Nothing to do? return ASAP */
  6    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
  7
  8    /* Note that we want call select() even if there are no
  9     * file events to process as long as we want to process time
 10     * events, in order to sleep until the next time event is ready
 11     * to fire. */
 12    // 即使没有文件事件,如果有时间事件也要调用 poll
 13    // 因为需要睡眠等待时间事件触发
 14    if (eventLoop->maxfd != -1 ||
 15        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
 16        int j;
 17        aeTimeEvent *shortest = NULL;
 18        struct timeval tv, *tvp;
 19
 20        // 找最近的时间事件,计算 poll 超时时间
 21        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
 22            shortest = aeSearchNearestTimer(eventLoop);
 23
 24        if (shortest) {
 25            long now_sec, now_ms;
 26
 27            aeGetTime(&now_sec, &now_ms);
 28            tvp = &tv;
 29
 30            /* How many milliseconds we need to wait for the next
 31             * time event to fire? */
 32            long long ms =
 33                (shortest->when_sec - now_sec)*1000 +
 34                shortest->when_ms - now_ms;
 35
 36            if (ms > 0) {
 37                tvp->tv_sec = ms/1000;
 38                tvp->tv_usec = (ms % 1000)*1000;
 39            } else {
 40                // 已经过期了,超时设为 0,立即返回
 41                tvp->tv_sec = 0;
 42                tvp->tv_usec = 0;
 43            }
 44        } else {
 45            /* If we have to check for events but need to return
 46             * ASAP because of AE_DONT_WAIT we need to set the timeout
 47             * to zero */
 48            if (flags & AE_DONT_WAIT) {
 49                tv.tv_sec = tv.tv_usec = 0;
 50                tvp = &tv;
 51            } else {
 52                /* Otherwise we can block */
 53                tvp = NULL;  // 没有时间事件,无限等待
 54            }
 55        }
 56
 57        /* Call the multiplexing API, will return only on timeout or when
 58         * some event fires. */
 59        // 核心:调用 epoll_wait / select 等待事件
 60        numevents = aeApiPoll(eventLoop, tvp);
 61
 62        /* After sleep callback. */
 63        // 从 poll 返回后执行的回调
 64        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
 65            eventLoop->aftersleep(eventLoop);
 66
 67        // 处理触发的文件事件
 68        for (j = 0; j < numevents; j++) {
 69            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
 70            int mask = eventLoop->fired[j].mask;
 71            int fd = eventLoop->fired[j].fd;
 72            int fired = 0;
 73
 74            /* Normally we execute the readable event first, and the writable
 75             * event laster. This is useful as sometimes we may be able
 76             * to serve the reply of a query immediately after processing the
 77             * query.
 78             *
 79             * However if AE_BARRIER is set in the mask, our application is
 80             * asking us to do the reverse: never fire the writable event
 81             * after the readable. In such a case, we invert the calls.
 82             * This is useful when, for instance, we want to do things
 83             * in the beforeSleep() hook, like fsynching a file to disk,
 84             * before replying to a client. */
 85            int invert = fe->mask & AE_BARRIER;
 86
 87            /* Note the "fe->mask & mask & ..." code: maybe an already
 88             * processed event removed an element that fired and we still
 89             * didn't processed, so we check if the event is still valid.
 90             *
 91             * Fire the readable event if the call sequence is not
 92             * inverted. */
 93            // 正常情况:先执行读回调,再执行写回调
 94            if (!invert && fe->mask & mask & AE_READABLE) {
 95                fe->rfileProc(eventLoop, fd, fe->clientData, mask);
 96                fired++;
 97            }
 98
 99            /* Fire the writable event. */
100            if (fe->mask & mask & AE_WRITABLE) {
101                // 避免读写回调是同一个函数时重复调用
102                if (!fired || fe->wfileProc != fe->rfileProc) {
103                    fe->wfileProc(eventLoop, fd, fe->clientData, mask);
104                    fired++;
105                }
106            }
107
108            /* If we have to invert the call, fire the readable event now
109             * after the writable one. */
110            // 反转情况:先执行写回调,再执行读回调
111            if (invert && fe->mask & mask & AE_READABLE) {
112                if (!fired || fe->wfileProc != fe->rfileProc) {
113                    fe->rfileProc(eventLoop, fd, fe->clientData, mask);
114                    fired++;
115                }
116            }
117
118            processed++;
119        }
120    }
121
122    /* Check time events */
123    // 最后处理时间事件
124    if (flags & AE_TIME_EVENTS)
125        processed += processTimeEvents(eventLoop);
126
127    return processed;
128}

5.1 AE_BARRIER 标志

正常情况下,如果同一个 fd 同时可读可写,先执行读回调再执行写回调。这样读完数据后可以立即写回复,效率高。

但有时候需要反过来:先写再读。比如希望先在 beforeSleep() 里把 AOF fsync 到磁盘,再向客户端发送回复,保证回复发出前数据已经落盘。这时候给该 fd 设置 AE_BARRIER 标志:同一轮循环中写回调不会在读回调之后触发,而是先执行写回调,再执行读回调。

5.2 flags 参数

1#define AE_FILE_EVENTS 1
2#define AE_TIME_EVENTS 2
3#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
4#define AE_DONT_WAIT 4
5#define AE_CALL_AFTER_SLEEP 8
  • AE_FILE_EVENTS - 处理文件事件
  • AE_TIME_EVENTS - 处理时间事件
  • AE_DONT_WAIT - 不阻塞,处理完已有事件立即返回
  • AE_CALL_AFTER_SLEEP - 从 poll 返回后调用 aftersleep 回调

六、主循环:aeMain

 1void aeMain(aeEventLoop *eventLoop) {
 2    eventLoop->stop = 0;
 3    while (!eventLoop->stop) {
 4        // 每轮循环前执行,比如处理 AOF 刷盘、集群心跳等
 5        if (eventLoop->beforesleep != NULL)
 6            eventLoop->beforesleep(eventLoop);
 7
 8        // 处理事件
 9        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
10    }
11}

beforesleep 回调在每轮事件循环开始前执行,Redis 在这里做很多工作:

  • 处理 AOF 写入和 fsync
  • 发送集群消息
  • 处理阻塞命令的客户端唤醒
  • 处理 clients_pending_write 列表

七、Redis 中的使用

7.1 初始化事件循环

1// server.c
2server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

7.2 注册监听 socket 的读事件

1for (j = 0; j < server.ipfd_count; j++) {
2    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
3            acceptTcpHandler, NULL) == AE_ERR)
4        serverPanic("Unrecoverable error creating server.ipfd file event.");
5}

acceptTcpHandler 是新连接到来时的回调,accept 后创建 client 结构体,注册该 client socket 的读事件。

7.3 注册 serverCron 时间事件

1if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
2    serverPanic("Can't create event loop timers.");
3    exit(1);
4}

初始 1ms 后触发,之后 serverCron 返回 1000/server.hz(默认 100ms),周期执行。

7.4 设置 beforesleep 回调

1aeSetBeforeSleepProc(server.el, beforeSleep);

7.5 进入主循环

1aeMain(server.el);

八、流程图

 1aeMain()
 2  │
 3  └── while (!stop)
 4         │
 5         ├── beforesleep()         ← 处理 AOF、集群、待写客户端等
 6         │
 7         └── aeProcessEvents()
 8                │
 9                ├── 找最近时间事件,计算超时
10                │
11                ├── aeApiPoll()    ← epoll_wait / select
12                │
13                ├── aftersleep()   ← 可选
14                │
15                ├── 处理文件事件(读回调、写回调)
16                │
17                └── processTimeEvents()  ← 处理时间事件(serverCron)

九、为什么不用 libevent / libev?

Redis 作者 antirez 解释过:

  1. 代码膨胀:libevent 很大,ae.c 只有几百行
  2. 可移植性:ae.c 支持多种平台,编译时自动选择最优实现
  3. 可控性:自己实现的可以精确控制行为,比如 beforesleep/aftersleep 回调
  4. 学习价值:简洁的事件循环实现,方便理解和修改

![image-20260320001012883](<../post_img/Redis 事件循环模型全景解析(ae.c)/image-20260320001012883-1773936617966-1.png>)

对于 Redis 这种单线程、事件类型单一的程序,ae.c 已经足够用。

— END —