Redis 事件循环模型全景解析(ae.c)
Redis 的命令执行主流程运行在单个线程中,却能处理大量并发连接,核心在于它的事件循环机制。ae.c 是 Redis 自己实现的一个轻量级事件库,支持 I/O 多路复用和定时器。这篇文章拆解它的实现原理。
一、整体架构
Redis 事件循环处理两类事件:
| 类型 | 说明 | 例子 |
|---|---|---|
| 文件事件(File Event) | socket 可读/可写 | 客户端连接、命令请求、回复发送 |
| 时间事件(Time Event) | 定时执行的回调 | serverCron(定时清理过期 key、持久化等) |
核心数据结构:
1// ae.h
2typedef struct aeEventLoop {
3 int maxfd; // 当前注册的最大 fd,用于优化遍历
4 int setsize; // events 数组大小,等于 maxclients + CONFIG_FDSET_INCR
5 long long timeEventNextId; // 时间事件 ID 生成器
6 time_t lastTime; // 上次处理时间事件的时间,用于检测时钟跳变
7 aeFileEvent *events; // 注册的文件事件数组,下标就是 fd
8 aeFiredEvent *fired; // 已触发的文件事件数组,由 aeApiPoll 填充
9 aeTimeEvent *timeEventHead; // 时间事件链表头节点
10 int stop; // 停止标志
11 void *apidata; // 多路复用 API 的私有数据(epoll 实例等)
12 aeBeforeSleepProc *beforesleep; // 每轮循环前的回调
13 aeBeforeSleepProc *aftersleep; // aeApiPoll 返回后的回调
14} aeEventLoop;
文件事件结构:
1typedef struct aeFileEvent {
2 int mask; // AE_READABLE | AE_WRITABLE | AE_BARRIER
3 aeFileProc *rfileProc; // 读回调
4 aeFileProc *wfileProc; // 写回调
5 void *clientData; // 回调参数,通常是 client 结构体
6} aeFileEvent;
时间事件结构:
1typedef struct aeTimeEvent {
2 long long id; // 唯一标识,用于删除
3 long when_sec; // 触发时间(秒)
4 long when_ms; // 触发时间(毫秒)
5 aeTimeProc *timeProc; // 回调函数
6 aeEventFinalizerProc *finalizerProc; // 删除时的清理回调
7 void *clientData; // 回调参数
8 struct aeTimeEvent *prev; // 双向链表
9 struct aeTimeEvent *next;
10} aeTimeEvent;
二、多路复用抽象层
ae.c 支持多种 I/O 多路复用实现,编译时通过条件编译按 evport > epoll > kqueue > select 的优先级选择其中一个:
- ae_epoll.c - Linux
- ae_kqueue.c - macOS / BSD
- ae_evport.c - Solaris
- ae_select.c - 兜底方案,可移植性最好
每个实现都要提供同一组接口函数,其中最核心的是下面四个:
1// ae_epoll.c
2static int aeApiCreate(aeEventLoop *eventLoop);
3static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
4static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask);
5static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
2.1 epoll 实现
// Private state of the epoll backend, stored in aeEventLoop.apidata.
typedef struct aeApiState {
    int epfd;                     // epoll instance fd
    struct epoll_event *events;   // buffer that epoll_wait() fills with ready events
} aeApiState;

6static int aeApiCreate(aeEventLoop *eventLoop) {
7 aeApiState *state = zmalloc(sizeof(aeApiState));
8
9 // 创建 epoll 实例,1024 是初始大小,会自动扩展
10 state->epfd = epoll_create(1024);
11 if (state->epfd == -1) return -1;
12
13 // events 数组大小等于 eventLoop->setsize
14 state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
15 eventLoop->apidata = state;
16 return 0;
17}
18
19static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
20 aeApiState *state = eventLoop->apidata;
21 struct epoll_event ee = {0};
22
23 // EPOLL_CTL_ADD 或 EPOLL_CTL_MOD
24 // 如果 fd 已经有事件,用 MOD;否则用 ADD
25 int op = eventLoop->events[fd].mask == AE_NONE ?
26 EPOLL_CTL_ADD : EPOLL_CTL_MOD;
27
28 ee.events = 0;
29 mask |= eventLoop->events[fd].mask; // 合并已有的事件
30 if (mask & AE_READABLE) ee.events |= EPOLLIN;
31 if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
32 ee.data.fd = fd;
33
34 if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
35 return 0;
36}
37
38static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
39 aeApiState *state = eventLoop->apidata;
40 int retval, numevents = 0;
41
42 // tvp 是超时时间,由最近的时间事件决定
43 // 如果 tvp 为 NULL,表示无限等待
44 retval = epoll_wait(state->epfd, state->events, eventLoop->setsize,
45 tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
46 if (retval > 0) {
47 int j;
48 numevents = retval;
49 for (j = 0; j < numevents; j++) {
50 int mask = 0;
51 struct epoll_event *e = state->events + j;
52
53 if (e->events & EPOLLIN) mask |= AE_READABLE;
54 if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
55 if (e->events & EPOLLERR) mask |= AE_WRITABLE;
56 if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
57
58 // 把触发的事件存到 fired 数组
59 eventLoop->fired[j].fd = e->data.fd;
60 eventLoop->fired[j].mask = mask;
61 }
62 }
63 return numevents;
64}
三、文件事件管理
3.1 创建事件循环
1aeEventLoop *aeCreateEventLoop(int setsize) {
2 aeEventLoop *eventLoop;
3 int i;
4
5 eventLoop = zmalloc(sizeof(*eventLoop));
6 eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
7 eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
8 if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
9 eventLoop->setsize = setsize;
10 eventLoop->timeEventHead = NULL;
11 eventLoop->timeEventNextId = 0;
12 eventLoop->stop = 0;
13 eventLoop->maxfd = -1;
14 eventLoop->beforesleep = NULL;
15 eventLoop->aftersleep = NULL;
16 if (aeApiCreate(eventLoop) == -1) goto err;
17
18 // 初始化所有 fd 的事件为 AE_NONE
19 for (i = 0; i < setsize; i++)
20 eventLoop->events[i].mask = AE_NONE;
21
22 return eventLoop;
23
24err:
25 if (eventLoop) {
26 zfree(eventLoop->events);
27 zfree(eventLoop->fired);
28 zfree(eventLoop);
29 }
30 return NULL;
31}
setsize 决定了能监听多少个 fd。Redis 中设置为 maxclients + CONFIG_FDSET_INCR,其中 CONFIG_FDSET_INCR 是预留的额外 fd(用于监听 socket、AOF 文件等)。
3.2 注册文件事件
1int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
2 aeFileProc *proc, void *clientData)
3{
4 if (fd >= eventLoop->setsize) { // fd 超出范围
5 errno = ERANGE;
6 return AE_ERR;
7 }
8
9 aeFileEvent *fe = &eventLoop->events[fd]; // 直接用 fd 做下标,O(1) 访问
10
11 // 调用底层多路复用的 add 接口
12 if (aeApiAddEvent(eventLoop, fd, mask) == -1)
13 return AE_ERR;
14
15 fe->mask |= mask; // 合并 mask,一个 fd 可以同时监听读写
16 if (mask & AE_READABLE) fe->rfileProc = proc;
17 if (mask & AE_WRITABLE) fe->wfileProc = proc;
18 fe->clientData = clientData;
19
20 // 更新 maxfd,用于优化某些场景的遍历
21 if (fd > eventLoop->maxfd)
22 eventLoop->maxfd = fd;
23
24 return AE_OK;
25}
3.3 删除文件事件
1void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
2{
3 if (fd >= eventLoop->setsize) return;
4 aeFileEvent *fe = &eventLoop->events[fd];
5 if (fe->mask == AE_NONE) return; // 本来就没注册
6
7 /* We want to always remove AE_BARRIER if set when AE_WRITABLE
8 * is removed. */
9 if (mask & AE_WRITABLE) mask |= AE_BARRIER; // AE_BARRIER 是附加在写事件上的标志
10
11 // 从底层多路复用中移除
12 aeApiDelEvent(eventLoop, fd, mask);
13
14 fe->mask = fe->mask & (~mask); // 清除对应的 mask 位
15
16 // 如果这个 fd 的所有事件都删了,且它恰好是 maxfd,需要重新找 maxfd
17 if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
18 int j;
19 for (j = eventLoop->maxfd-1; j >= 0; j--)
20 if (eventLoop->events[j].mask != AE_NONE) break;
21 eventLoop->maxfd = j;
22 }
23}
四、时间事件管理
时间事件用双向链表组织,没有排序。每次找最近要触发的定时器需要遍历整个链表——O(N) 复杂度。注释里提到可以用跳表优化,但 Redis 的时间事件很少(主要就是 serverCron),所以没这个必要。
4.1 创建时间事件
1long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
2 aeTimeProc *proc, void *clientData,
3 aeEventFinalizerProc *finalizerProc)
4{
5 long long id = eventLoop->timeEventNextId++;
6 aeTimeEvent *te;
7
8 te = zmalloc(sizeof(*te));
9 if (te == NULL) return AE_ERR;
10
11 te->id = id;
12 // 计算触发时间:当前时间 + milliseconds
13 aeAddMillisecondsToNow(milliseconds, &te->when_sec, &te->when_ms);
14 te->timeProc = proc;
15 te->finalizerProc = finalizerProc;
16 te->clientData = clientData;
17
18 // 头插法插入链表
19 te->prev = NULL;
20 te->next = eventLoop->timeEventHead;
21 if (te->next)
22 te->next->prev = te;
23 eventLoop->timeEventHead = te;
24
25 return id;
26}
4.2 删除时间事件
删除不是立即释放,而是标记为 AE_DELETED_EVENT_ID,在下一轮 processTimeEvents 中清理:
1int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
2{
3 aeTimeEvent *te = eventLoop->timeEventHead;
4 while(te) {
5 if (te->id == id) {
6 te->id = AE_DELETED_EVENT_ID; // 标记删除
7 return AE_OK;
8 }
9 te = te->next;
10 }
11 return AE_ERR; // 没找到
12}
4.3 处理时间事件
1static int processTimeEvents(aeEventLoop *eventLoop) {
2 int processed = 0;
3 aeTimeEvent *te;
4 long long maxId;
5 time_t now = time(NULL);
6
7 /* If the system clock is moved to the future, and then set back to the
8 * right value, time events may be delayed in a random way. Often this
9 * means that scheduled operations will not be performed soon enough.
10 *
11 * Here we try to detect system clock skews, and force all the time
12 * events to be processed ASAP when this happens: the idea is that
13 * processing events earlier is less dangerous than delaying them
14 * indefinitely, and practice suggests it is. */
15 // 检测系统时钟回拨:如果当前时间比上次处理时间还早,说明时钟被调回去了
16 // 这种情况下把所有时间事件的触发时间设为 0,立即执行
17 if (now < eventLoop->lastTime) {
18 te = eventLoop->timeEventHead;
19 while(te) {
20 te->when_sec = 0; // 立即触发
21 te = te->next;
22 }
23 }
24 eventLoop->lastTime = now;
25
26 te = eventLoop->timeEventHead;
27 maxId = eventLoop->timeEventNextId - 1; // 记录当前最大 ID
28 while(te) {
29 long now_sec, now_ms;
30 long long id;
31
32 /* Remove events scheduled for deletion. */
33 // 清理标记为删除的事件
34 if (te->id == AE_DELETED_EVENT_ID) {
35 aeTimeEvent *next = te->next;
36 // 从链表中摘除
37 if (te->prev)
38 te->prev->next = te->next;
39 else
40 eventLoop->timeEventHead = te->next;
41 if (te->next)
42 te->next->prev = te->prev;
43 // 调用清理回调
44 if (te->finalizerProc)
45 te->finalizerProc(eventLoop, te->clientData);
46 zfree(te);
47 te = next;
48 continue;
49 }
50
51 /* Make sure we don't process time events created by time events in
52 * this iteration. Note that this check is currently useless: we always
53 * add new timers on the head, however if we change the implementation
54 * detail, this check may be useful again: we keep it here for future
55 * defense. */
56 // 防止在本轮循环中处理新创建的时间事件
57 // 因为新事件是头插的,且 id 更大,所以跳过 id > maxId 的
58 if (te->id > maxId) {
59 te = te->next;
60 continue;
61 }
62
63 aeGetTime(&now_sec, &now_ms);
64 // 检查是否到达触发时间
65 if (now_sec > te->when_sec ||
66 (now_sec == te->when_sec && now_ms >= te->when_ms))
67 {
68 int retval;
69
70 id = te->id;
71 // 执行回调,返回值是下次触发的间隔(毫秒)
72 retval = te->timeProc(eventLoop, id, te->clientData);
73 processed++;
74
75 if (retval != AE_NOMORE) {
76 // 返回值是正数,表示周期性事件,重新计算下次触发时间
77 aeAddMillisecondsToNow(retval, &te->when_sec, &te->when_ms);
78 } else {
79 // 返回 AE_NOMORE,表示一次性事件,标记删除
80 te->id = AE_DELETED_EVENT_ID;
81 }
82 }
83 te = te->next;
84 }
85 return processed;
86}
这里有个细节:时间事件的回调返回值决定了它是周期性的还是一次性的。serverCron 返回 1000/server.hz,默认 hz=10,所以每 100ms 执行一次;如果配置 hz 100,就每 10ms 执行一次。
五、事件循环核心:aeProcessEvents
1int aeProcessEvents(aeEventLoop *eventLoop, int flags)
2{
3 int processed = 0, numevents;
4
5 /* Nothing to do? return ASAP */
6 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
7
8 /* Note that we want call select() even if there are no
9 * file events to process as long as we want to process time
10 * events, in order to sleep until the next time event is ready
11 * to fire. */
12 // 即使没有文件事件,如果有时间事件也要调用 poll
13 // 因为需要睡眠等待时间事件触发
14 if (eventLoop->maxfd != -1 ||
15 ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
16 int j;
17 aeTimeEvent *shortest = NULL;
18 struct timeval tv, *tvp;
19
20 // 找最近的时间事件,计算 poll 超时时间
21 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
22 shortest = aeSearchNearestTimer(eventLoop);
23
24 if (shortest) {
25 long now_sec, now_ms;
26
27 aeGetTime(&now_sec, &now_ms);
28 tvp = &tv;
29
30 /* How many milliseconds we need to wait for the next
31 * time event to fire? */
32 long long ms =
33 (shortest->when_sec - now_sec)*1000 +
34 shortest->when_ms - now_ms;
35
36 if (ms > 0) {
37 tvp->tv_sec = ms/1000;
38 tvp->tv_usec = (ms % 1000)*1000;
39 } else {
40 // 已经过期了,超时设为 0,立即返回
41 tvp->tv_sec = 0;
42 tvp->tv_usec = 0;
43 }
44 } else {
45 /* If we have to check for events but need to return
46 * ASAP because of AE_DONT_WAIT we need to set the timeout
47 * to zero */
48 if (flags & AE_DONT_WAIT) {
49 tv.tv_sec = tv.tv_usec = 0;
50 tvp = &tv;
51 } else {
52 /* Otherwise we can block */
53 tvp = NULL; // 没有时间事件,无限等待
54 }
55 }
56
57 /* Call the multiplexing API, will return only on timeout or when
58 * some event fires. */
59 // 核心:调用 epoll_wait / select 等待事件
60 numevents = aeApiPoll(eventLoop, tvp);
61
62 /* After sleep callback. */
63 // 从 poll 返回后执行的回调
64 if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
65 eventLoop->aftersleep(eventLoop);
66
67 // 处理触发的文件事件
68 for (j = 0; j < numevents; j++) {
69 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
70 int mask = eventLoop->fired[j].mask;
71 int fd = eventLoop->fired[j].fd;
72 int fired = 0;
73
74 /* Normally we execute the readable event first, and the writable
75 * event laster. This is useful as sometimes we may be able
76 * to serve the reply of a query immediately after processing the
77 * query.
78 *
79 * However if AE_BARRIER is set in the mask, our application is
80 * asking us to do the reverse: never fire the writable event
81 * after the readable. In such a case, we invert the calls.
82 * This is useful when, for instance, we want to do things
83 * in the beforeSleep() hook, like fsynching a file to disk,
84 * before replying to a client. */
85 int invert = fe->mask & AE_BARRIER;
86
87 /* Note the "fe->mask & mask & ..." code: maybe an already
88 * processed event removed an element that fired and we still
89 * didn't processed, so we check if the event is still valid.
90 *
91 * Fire the readable event if the call sequence is not
92 * inverted. */
93 // 正常情况:先执行读回调,再执行写回调
94 if (!invert && fe->mask & mask & AE_READABLE) {
95 fe->rfileProc(eventLoop, fd, fe->clientData, mask);
96 fired++;
97 }
98
99 /* Fire the writable event. */
100 if (fe->mask & mask & AE_WRITABLE) {
101 // 避免读写回调是同一个函数时重复调用
102 if (!fired || fe->wfileProc != fe->rfileProc) {
103 fe->wfileProc(eventLoop, fd, fe->clientData, mask);
104 fired++;
105 }
106 }
107
108 /* If we have to invert the call, fire the readable event now
109 * after the writable one. */
110 // 反转情况:先执行写回调,再执行读回调
111 if (invert && fe->mask & mask & AE_READABLE) {
112 if (!fired || fe->wfileProc != fe->rfileProc) {
113 fe->rfileProc(eventLoop, fd, fe->clientData, mask);
114 fired++;
115 }
116 }
117
118 processed++;
119 }
120 }
121
122 /* Check time events */
123 // 最后处理时间事件
124 if (flags & AE_TIME_EVENTS)
125 processed += processTimeEvents(eventLoop);
126
127 return processed;
128}
5.1 AE_BARRIER 标志
正常情况下,如果同一个 fd 同时可读可写,先执行读回调再执行写回调。这样读完数据后可以立即写回复,效率高。
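但有时候需要反过来:先写再读。典型场景是 appendfsync always:回复必须等 AOF 在 beforeSleep 中 fsync 到磁盘之后才能发给客户端。设置 AE_BARRIER 后,同一轮循环里写回调先于读回调执行。这样,本轮读回调产生的回复要等到下一轮才会写出,而那时 beforeSleep 已经完成了 fsync。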
5.2 flags 参数
// Flags for aeProcessEvents(); each is a distinct bit so they can be OR-ed.
#define AE_FILE_EVENTS 1
#define AE_TIME_EVENTS 2
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT 4
#define AE_CALL_AFTER_SLEEP 8
- AE_FILE_EVENTS - 处理文件事件
- AE_TIME_EVENTS - 处理时间事件
- AE_DONT_WAIT - 不阻塞,处理完已有事件立即返回
- AE_CALL_AFTER_SLEEP - 从 poll 返回后调用 aftersleep 回调
六、主循环:aeMain
1void aeMain(aeEventLoop *eventLoop) {
2 eventLoop->stop = 0;
3 while (!eventLoop->stop) {
4 // 每轮循环前执行,比如处理 AOF 刷盘、集群心跳等
5 if (eventLoop->beforesleep != NULL)
6 eventLoop->beforesleep(eventLoop);
7
8 // 处理事件
9 aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
10 }
11}
beforesleep 回调在每轮事件循环开始前执行,Redis 在这里做很多工作:
- 处理 AOF 写入和 fsync
- 发送集群消息
- 处理阻塞命令的客户端唤醒
- 处理 clients_pending_write 列表
七、Redis 中的使用
7.1 初始化事件循环
1// server.c
2server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
7.2 注册监听 socket 的读事件
1for (j = 0; j < server.ipfd_count; j++) {
2 if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
3 acceptTcpHandler, NULL) == AE_ERR)
4 serverPanic("Unrecoverable error creating server.ipfd file event.");
5}
acceptTcpHandler 是新连接到来时的回调,accept 后创建 client 结构体,注册该 client socket 的读事件。
7.3 注册 serverCron 时间事件
1if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
2 serverPanic("Can't create event loop timers.");
3 exit(1);
4}
初始 1ms 后触发,之后 serverCron 返回 1000/server.hz(默认 100ms),周期执行。
7.4 设置 beforesleep 回调
1aeSetBeforeSleepProc(server.el, beforeSleep);
7.5 进入主循环
1aeMain(server.el);
八、流程图
```text
aeMain()
  │
  └── while (!stop)
        │
        ├── beforesleep()      ← 处理 AOF、集群、待写客户端等
        │
        └── aeProcessEvents()
              │
              ├── 找最近时间事件,计算超时
              │
              ├── aeApiPoll()          ← epoll_wait / select
              │
              ├── aftersleep()         ← 可选
              │
              ├── 处理文件事件(读回调、写回调)
              │
              └── processTimeEvents()  ← 处理时间事件(serverCron)
```
九、为什么不用 libevent / libev?
Redis 作者 antirez 解释过:
- 代码膨胀:libevent 很大,ae.c 只有几百行
- 可移植性:ae.c 支持多种平台,编译时自动选择最优实现
- 可控性:自己实现的可以精确控制行为,比如 beforesleep/aftersleep 回调
- 学习价值:简洁的事件循环实现,方便理解和修改
![image-20260320001012883-1773936617966-1](/image-20260320001012883-1773936617966-1.png)
对于 Redis 这种单线程、事件类型单一的程序,ae.c 已经足够用。