Redis Reactor 模型源码实现解析(上)
Redis 以单线程扛住海量并发(单机10万QPS)而闻名,其核心秘密就藏在 Reactor 模型的实现中。本文从源码角度深入剖析 Redis 是如何实现高效的事件驱动模型的。
一、什么是 Reactor 模型
Reactor 模型是一种事件驱动的设计模式,核心思想是:
- 单线程事件循环:一个死循环不断检测事件
- IO 多路复用:用一个线程同时监听多个连接
- 回调机制:事件发生时调用对应的处理器
Redis 的实现简洁优雅,核心代码不过 500 行,却支撑了每秒十万级的请求处理。
二、核心数据结构
2.1 事件循环主体:aeEventLoop
1// src/ae.h:97
2typedef struct aeEventLoop {
3 int maxfd; // 当前注册的最大 fd
4 int setsize; // 最多能跟踪多少个 fd
5 long long timeEventNextId; // 时间事件 ID 生成器
6 time_t lastTime; // 用于检测系统时钟跳变
7 aeFileEvent *events; // 注册的文件事件数组(下标即 fd)
8 aeFiredEvent *fired; // 已触发的事件数组
9 aeTimeEvent *timeEventHead; // 时间事件链表头
10 int stop; // 停止标志
11 void *apidata; // 多路复用私有数据(epoll/kqueue 等)
12 aeBeforeSleepProc *beforesleep; // 每轮循环前的回调
13 aeBeforeSleepProc *aftersleep; // 每轮循环后的回调
14} aeEventLoop;
- events是数组:下标就是fd,O(1)时间定位事件,这是一个精妙的"空间换时间"设计
- fired存触发结果:poll返回后填充,避免重复计算
- 时间事件用链表:数量少,简单够用
2.2 文件事件:aeFileEvent
1// src/ae.h:71
2typedef struct aeFileEvent {
3 int mask; // AE_READABLE或AE_WRITABLE或AE_BARRIER
4 aeFileProc *rfileProc; // 读回调
5 aeFileProc *wfileProc; // 写回调
6 void *clientData; // 私有数据
7} aeFileEvent;
一个fd可以同时监听读写,所以有两个回调函数指针。
2.3 时间事件:aeTimeEvent
1// src/ae.h:79
2typedef struct aeTimeEvent {
3 long long id; // 事件标识
4 long when_sec; // 触发时间(秒)
5 long when_ms; // 触发时间(毫秒)
6 aeTimeProc *timeProc; // 回调函数
7 aeEventFinalizerProc *finalizerProc;// 删除时的清理函数
8 void *clientData;
9 struct aeTimeEvent *prev;
10 struct aeTimeEvent *next;
11} aeTimeEvent;
双向链表节点。回调返回值决定事件生命周期:
- 返回
AE_NOMORE (-1):一次性事件,执行后删除 - 返回正数:周期性事件,返回值为下次触发的毫秒延迟
2.4 触发事件:aeFiredEvent
1// src/ae.h:91
2typedef struct aeFiredEvent {
3 int fd;
4 int mask;
5} aeFiredEvent;
轻量结构,只存fd和事件类型。
三、多路复用抽象层
Redis 支持4种多路复用机制,按性能优先级选择:
1// src/ae.c:49
2#ifdef HAVE_EVPORT
3#include "ae_evport.c" // Solaris
4#else
5 #ifdef HAVE_EPOLL
6 #include "ae_epoll.c" // Linux
7 #else
8 #ifdef HAVE_KQUEUE
9 #include "ae_kqueue.c" // BSD/macOS
10 #else
11 #include "ae_select.c" // 兜底方案
12 #endif
13 #endif
14#endif
3.1 统一接口
所有平台实现相同的5个接口:
1static int aeApiCreate(aeEventLoop *eventLoop); // 初始化
2static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask); // 添加事件
3static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask); // 删除事件
4static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp); // 等待事件
5static char *aeApiName(void); // 返回"epoll"、"kqueue"等
3.2 epoll实现解析
1// src/ae_epoll.c:34
2typedef struct aeApiState {
3 int epfd; // epoll实例fd
4 struct epoll_event *events; // 返回事件数组
5} aeApiState;
添加事件的智能判断:
1// src/ae_epoll.c:78
2int op = eventLoop->events[fd].mask == AE_NONE ?
3 EPOLL_CTL_ADD : EPOLL_CTL_MOD;
如果这个fd之前没监听过,用 ADD;如果已经监听了读,现在又要加写,用 MOD。
等待事件:
1// src/ae_epoll.c:112
2retval = epoll_wait(state->epfd, state->events, eventLoop->setsize,
3 tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
超时时间tvp是关键参数,后面会讲如何计算。
四、事件循环主流程
4.1 入口函数:aeMain
1// src/ae.c:496
2void aeMain(aeEventLoop *eventLoop) {
3 eventLoop->stop = 0;
4 while (!eventLoop->stop) {
5 if (eventLoop->beforesleep != NULL)
6 eventLoop->beforesleep(eventLoop);
7 aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
8 }
9}
整个Redis进程的生命周期就在这个while循环里。每轮循环会执行下面两个过程:
- 执行
beforesleep回调 - 处理所有事件
4.2 核心函数:aeProcessEvents
这是事件循环的心脏,分成了四个阶段:
阶段一:计算阻塞超时
这段代码的核心目的:计算 epoll_wait 应该阻塞多久。
1// src/ae.c:375
2
3// 只有同时满足两个条件才查找最近的时间事件:
4// 1. flags包含AE_TIME_EVENTS(需要处理时间事件)
5// 2. flags不包含AE_DONT_WAIT(允许阻塞等待)
6if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
7 shortest = aeSearchNearestTimer(eventLoop);
8
9if (shortest) {
10 // 找到了最近要触发的时间事件
11 long now_sec, now_ms;
12 aeGetTime(&now_sec, &now_ms); // 获取当前时间
13
14 // 计算距离触发还有多少毫秒
15 long long ms =
16 (shortest->when_sec - now_sec)*1000 +
17 shortest->when_ms - now_ms;
18
19 if (ms > 0) {
20 // 还没到触发时间,阻塞等待这段时长
21 tvp->tv_sec = ms/1000;
22 tvp->tv_usec = (ms % 1000)*1000;
23 } else {
24 // ms<=0 说明时间事件已经到期了
25 // 设置为0 表示epoll_wait立即返回,不阻塞
26 // 这样可以马上处理到期的时间事件
27 tvp->tv_sec = 0;
28 tvp->tv_usec = 0;
29 }
30} else {
31 // 没有时间事件,或调用者要求不等待
32 if (flags & AE_DONT_WAIT) {
33 // 调用者要求立即返回,设置超时为0
34 tv.tv_sec = tv.tv_usec = 0;
35 tvp = &tv;
36 } else {
37 // 没有时间事件,也没有网络事件时可以无限阻塞
38 // tvp=NULL传给epoll_wait表示永久等待
39 tvp = NULL;
40 }
41}
三种阻塞策略总结:
| 场景 | tvp 值 | epoll_wait 行为 |
|---|---|---|
| 有时间事件,未到期 | 剩余毫秒 | 阻塞到最近触发时间 |
| 有时间事件,已到期 | 0 | 立即返回 |
| 无时间事件,允许等待 | NULL | 永久阻塞直到有网络事件 |
| 无时间事件,不允许等待 | 0 | 立即返回 |
这样设计保证了:
- 无请求时不空转:没有网络事件就阻塞,不浪费 CPU
- 有时间任务时不延误:阻塞时间精确到最近时间事件,保证定时任务准时执行
阶段二:调用多路复用
1// src/ae.c:411
2numevents = aeApiPoll(eventLoop, tvp);
Linux 下就是 epoll_wait,返回触发的fd数量。
阶段三:处理文件事件
1// src/ae.c:417
2for (j = 0; j < numevents; j++) {
3 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
4 int mask = eventLoop->fired[j].mask;
5 int fd = eventLoop->fired[j].fd;
6 int fired = 0;
7
8 int invert = fe->mask & AE_BARRIER;
9
10 // 先处理读事件(除非设置了BARRIER)
11 if (!invert && fe->mask & mask & AE_READABLE) {
12 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
13 fired++;
14 }
15
16 // 处理写事件
17 if (fe->mask & mask & AE_WRITABLE) {
18 if (!fired || fe->wfileProc != fe->rfileProc) {
19 fe->wfileProc(eventLoop,fd,fe->clientData,mask);
20 fired++;
21 }
22 }
23
24 // BARRIER模式:最后处理读事件
25 if (invert && fe->mask & mask & AE_READABLE) {
26 if (!fired || fe->wfileProc != fe->rfileProc) {
27 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
28 fired++;
29 }
30 }
31 processed++;
32}
AE_BARRIER 标志很巧妙:正常先读后写(收到请求立即回复),但AOF fsync=always 时需要反过来——先在 beforesleep 里fsync,再发送回复,保证数据落盘后才响应。
判断 fe->wfileProc != fe->rfileProc 防止同一回调被执行两次。
阶段四:处理时间事件
1// src/ae.c:468
2if (flags & AE_TIME_EVENTS)
3 processed += processTimeEvents(eventLoop);
五、时间事件处理
5.1 时钟跳变处理
1// src/ae.c:284
2if (now < eventLoop->lastTime) {
3 te = eventLoop->timeEventHead;
4 while(te) {
5 te->when_sec = 0; // 立即触发
6 te = te->next;
7 }
8}
9eventLoop->lastTime = now;
如果系统时间倒退(NTP同步等),把所有时间事件设为立即触发。在代码注释中可以看到:“提前执行总比无限延迟好”,哈哈。
5.2 懒删除机制
1// src/ae.c:300
2if (te->id == AE_DELETED_EVENT_ID) {
3 aeTimeEvent *next = te->next;
4 // 从链表移除并释放
5 ...
6 te = next;
7 continue;
8}
删除时间事件只是标记 id = AE_DELETED_EVENT_ID,真正释放延后到下次遍历。
5.3 周期性事件处理
1// src/ae.c:331
2retval = te->timeProc(eventLoop, id, te->clientData);
3if (retval != AE_NOMORE) {
4 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
5} else {
6 te->id = AE_DELETED_EVENT_ID;
7}
回调返回的结果如果是正数,则表示是下次触发的毫秒延迟,返回 AE_NOMORE 则标记删除。
六、事件注册
6.1 文件事件注册
1// src/ae.c:136
2int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
3 aeFileProc *proc, void *clientData)
4{
5 // 边界检查:fd不能超过事件循环的最大容量
6 if (fd >= eventLoop->setsize) {
7 errno = ERANGE;
8 return AE_ERR;
9 }
10
11 // 通过fd直接定位到对应的事件结构(O(1))
12 aeFileEvent *fe = &eventLoop->events[fd];
13
14 // 注册到多路复用(epoll_ctl/kqueue等)
15 if (aeApiAddEvent(eventLoop, fd, mask) == -1)
16 return AE_ERR;
17
18 // 更新事件掩码(可能同时监听读写)
19 fe->mask |= mask;
20
21 // 根据事件类型设置对应的回调函数
22 if (mask & AE_READABLE) fe->rfileProc = proc;
23 if (mask & AE_WRITABLE) fe->wfileProc = proc;
24
25 // 保存私有数据,回调时可取出使用
26 fe->clientData = clientData;
27
28 // 更新最大fd,某些场景需要遍历[0, maxfd]
29 if (fd > eventLoop->maxfd)
30 eventLoop->maxfd = fd;
31
32 return AE_OK;
33}
1检查fd越限 → 注册到多路复用 → 设置回调 → 更新maxfd。
6.2 时间事件注册
1// src/ae.c:208
2long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
3 aeTimeProc *proc, void *clientData,
4 aeEventFinalizerProc *finalizerProc)
5{
6 // 生成唯一的事件ID,使用后立即自增
7 long long id = eventLoop->timeEventNextId++;
8 aeTimeEvent *te;
9
10 // 分配时间事件结构
11 te = zmalloc(sizeof(*te));
12 te->id = id;
13
14 // 计算触发时间:当前时间 + 延迟毫秒数
15 aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
16
17 // 设置回调函数和私有数据
18 te->timeProc = proc; // 事件触发时调用的回调
19 te->finalizerProc = finalizerProc; // 事件删除时的清理函数(可传NULL)
20 te->clientData = clientData; // 传递给回调的私有数据
21
22 // 头插法插入链表
23 te->prev = NULL;
24 te->next = eventLoop->timeEventHead;
25 if (te->next)
26 te->next->prev = te;
27 eventLoop->timeEventHead = te;
28
29 return id; // 返回事件ID,可用于后续删除
30}
头插法插入链表,返回事件ID供后续删除使用。
至此,我们已经完成了Redis Reactor模型核心机制的源码分析:
- 数据结构:aeEventLoop、aeFileEvent、aeTimeEvent 如何组织文件事件和时间事件
- 多路复用:统一的抽象接口如何适配epoll/kqueue/select
- 事件循环:aeMain和aeProcessEvents的四阶段处理流程
- 阻塞策略:如何根据时间事件动态计算阻塞超时
- 事件注册:文件事件和时间事件的注册与删除机制
这些是理解 Redis 事件驱动模型的基石。在《Redis Reactor 模型源码实现解析(下)》中,我们将继续分析 Redis 中的实际应用、性能优化技巧、AE_BARRIER 深度解析、与典型 Reactor 对比等内容。