Redis Reactor 模型源码实现解析(上)

Redis 以单线程扛住海量并发(单机10万QPS)而闻名,其核心秘密就藏在 Reactor 模型的实现中。本文从源码角度深入剖析 Redis 是如何实现高效的事件驱动模型的。

一、什么是 Reactor 模型

Reactor 模型是一种事件驱动的设计模式,核心思想是:

  1. 单线程事件循环:一个死循环不断检测事件
  2. IO 多路复用:用一个线程同时监听多个连接
  3. 回调机制:事件发生时调用对应的处理器

Redis 的实现简洁优雅,核心代码不过 500 行,却支撑了每秒十万级的请求处理。

二、核心数据结构

2.1 事件循环主体:aeEventLoop

 1// src/ae.h:97
 2typedef struct aeEventLoop {
 3    int maxfd;                      // 当前注册的最大 fd
 4    int setsize;                    // 最多能跟踪多少个 fd
 5    long long timeEventNextId;      // 时间事件 ID 生成器
 6    time_t lastTime;                // 用于检测系统时钟跳变
 7    aeFileEvent *events;            // 注册的文件事件数组(下标即 fd)
 8    aeFiredEvent *fired;            // 已触发的事件数组
 9    aeTimeEvent *timeEventHead;     // 时间事件链表头
10    int stop;                       // 停止标志
11    void *apidata;                  // 多路复用私有数据(epoll/kqueue 等)
12    aeBeforeSleepProc *beforesleep; // 每轮循环前的回调
13    aeBeforeSleepProc *aftersleep;  // 每轮循环后的回调
14} aeEventLoop;
  • events是数组:下标就是fd,O(1)时间定位事件,这是一个精妙的"空间换时间"设计
  • fired存触发结果:poll返回后填充,避免重复计算
  • 时间事件用链表:数量少,简单够用

2.2 文件事件:aeFileEvent

1// src/ae.h:71
2typedef struct aeFileEvent {
3    int mask;               // AE_READABLE或AE_WRITABLE或AE_BARRIER
4    aeFileProc *rfileProc;  // 读回调
5    aeFileProc *wfileProc;  // 写回调
6    void *clientData;       // 私有数据
7} aeFileEvent;

一个fd可以同时监听读写,所以有两个回调函数指针。

2.3 时间事件:aeTimeEvent

 1// src/ae.h:79
 2typedef struct aeTimeEvent {
 3    long long id;                       // 事件标识
 4    long when_sec;                      // 触发时间(秒)
 5    long when_ms;                       // 触发时间(毫秒)
 6    aeTimeProc *timeProc;               // 回调函数
 7    aeEventFinalizerProc *finalizerProc;// 删除时的清理函数
 8    void *clientData;
 9    struct aeTimeEvent *prev;
10    struct aeTimeEvent *next;
11} aeTimeEvent;

双向链表节点。回调返回值决定事件生命周期:

  • 返回 AE_NOMORE (-1):一次性事件,执行后删除
  • 返回正数:周期性事件,返回值为下次触发的毫秒延迟

2.4 触发事件:aeFiredEvent

1// src/ae.h:91
2typedef struct aeFiredEvent {
3    int fd;
4    int mask;
5} aeFiredEvent;

轻量结构,只存fd和事件类型。

三、多路复用抽象层

Redis 支持4种多路复用机制,按性能优先级选择:

 1// src/ae.c:49
 2#ifdef HAVE_EVPORT
 3#include "ae_evport.c"   // Solaris
 4#else
 5    #ifdef HAVE_EPOLL
 6    #include "ae_epoll.c"   // Linux
 7    #else
 8        #ifdef HAVE_KQUEUE
 9        #include "ae_kqueue.c"  // BSD/macOS
10        #else
11        #include "ae_select.c"  // 兜底方案
12        #endif
13    #endif
14#endif

3.1 统一接口

所有平台实现相同的5个接口:

1static int aeApiCreate(aeEventLoop *eventLoop);                         // 初始化
2static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);     // 添加事件
3static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask); // 删除事件
4static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);      // 等待事件
5static char *aeApiName(void);                                           // 返回"epoll"、"kqueue"等

3.2 epoll实现解析

1// src/ae_epoll.c:34
2typedef struct aeApiState {
3    int epfd;                    // epoll实例fd
4    struct epoll_event *events;  // 返回事件数组
5} aeApiState;

添加事件的智能判断

1// src/ae_epoll.c:78
2int op = eventLoop->events[fd].mask == AE_NONE ?
3        EPOLL_CTL_ADD : EPOLL_CTL_MOD;

如果这个fd之前没监听过,用 ADD;如果已经监听了读,现在又要加写,用 MOD

等待事件

1// src/ae_epoll.c:112
2retval = epoll_wait(state->epfd, state->events, eventLoop->setsize,
3        tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

超时时间tvp是关键参数,后面会讲如何计算。

四、事件循环主流程

4.1 入口函数:aeMain

1// src/ae.c:496
2void aeMain(aeEventLoop *eventLoop) {
3    eventLoop->stop = 0;
4    while (!eventLoop->stop) {
5        if (eventLoop->beforesleep != NULL)
6            eventLoop->beforesleep(eventLoop);
7        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
8    }
9}

整个Redis进程的生命周期就在这个while循环里。每轮循环会执行下面两个过程:

  1. 执行 beforesleep 回调
  2. 处理所有事件

4.2 核心函数:aeProcessEvents

这是事件循环的心脏,分成了四个阶段:

阶段一:计算阻塞超时

这段代码的核心目的:计算 epoll_wait 应该阻塞多久

 1// src/ae.c:375
 2
 3// 只有同时满足两个条件才查找最近的时间事件:
 4// 1. flags包含AE_TIME_EVENTS(需要处理时间事件)
 5// 2. flags不包含AE_DONT_WAIT(允许阻塞等待)
 6if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
 7    shortest = aeSearchNearestTimer(eventLoop);
 8
 9if (shortest) {
10    // 找到了最近要触发的时间事件
11    long now_sec, now_ms;
12    aeGetTime(&now_sec, &now_ms);  // 获取当前时间
13    
14    // 计算距离触发还有多少毫秒
15    long long ms =
16        (shortest->when_sec - now_sec)*1000 +
17        shortest->when_ms - now_ms;
18
19    if (ms > 0) {
20        // 还没到触发时间,阻塞等待这段时长
21        tvp->tv_sec = ms/1000;
22        tvp->tv_usec = (ms % 1000)*1000;
23    } else {
24        // ms<=0 说明时间事件已经到期了
25        // 设置为0 表示epoll_wait立即返回,不阻塞
26        // 这样可以马上处理到期的时间事件
27        tvp->tv_sec = 0;
28        tvp->tv_usec = 0;
29    }
30} else {
31    // 没有时间事件,或调用者要求不等待
32    if (flags & AE_DONT_WAIT) {
33        // 调用者要求立即返回,设置超时为0
34        tv.tv_sec = tv.tv_usec = 0;
35        tvp = &tv;
36    } else {
37        // 没有时间事件,也没有网络事件时可以无限阻塞
38        // tvp=NULL传给epoll_wait表示永久等待
39        tvp = NULL;
40    }
41}

三种阻塞策略总结

场景 tvp 值 epoll_wait 行为
有时间事件,未到期 剩余毫秒 阻塞到最近触发时间
有时间事件,已到期 0 立即返回
无时间事件,允许等待 NULL 永久阻塞直到有网络事件
无时间事件,不允许等待 0 立即返回

这样设计保证了:

  • 无请求时不空转:没有网络事件就阻塞,不浪费 CPU
  • 有时间任务时不延误:阻塞时间精确到最近时间事件,保证定时任务准时执行

阶段二:调用多路复用

1// src/ae.c:411
2numevents = aeApiPoll(eventLoop, tvp);

Linux 下就是 epoll_wait,返回触发的fd数量。

阶段三:处理文件事件

 1// src/ae.c:417
 2for (j = 0; j < numevents; j++) {
 3    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
 4    int mask = eventLoop->fired[j].mask;
 5    int fd = eventLoop->fired[j].fd;
 6    int fired = 0;
 7
 8    int invert = fe->mask & AE_BARRIER;
 9
10    // 先处理读事件(除非设置了BARRIER)
11    if (!invert && fe->mask & mask & AE_READABLE) {
12        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
13        fired++;
14    }
15
16    // 处理写事件
17    if (fe->mask & mask & AE_WRITABLE) {
18        if (!fired || fe->wfileProc != fe->rfileProc) {
19            fe->wfileProc(eventLoop,fd,fe->clientData,mask);
20            fired++;
21        }
22    }
23
24    // BARRIER模式:最后处理读事件
25    if (invert && fe->mask & mask & AE_READABLE) {
26        if (!fired || fe->wfileProc != fe->rfileProc) {
27            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
28            fired++;
29        }
30    }
31    processed++;
32}

AE_BARRIER 标志很巧妙:正常先读后写(收到请求立即回复),但AOF fsync=always 时需要反过来——先在 beforesleep 里fsync,再发送回复,保证数据落盘后才响应。

判断 fe->wfileProc != fe->rfileProc 防止同一回调被执行两次。

阶段四:处理时间事件

1// src/ae.c:468
2if (flags & AE_TIME_EVENTS)
3    processed += processTimeEvents(eventLoop);

五、时间事件处理

5.1 时钟跳变处理

1// src/ae.c:284
2if (now < eventLoop->lastTime) {
3    te = eventLoop->timeEventHead;
4    while(te) {
5        te->when_sec = 0;  // 立即触发
6        te = te->next;
7    }
8}
9eventLoop->lastTime = now;

如果系统时间倒退(NTP同步等),把所有时间事件设为立即触发。在代码注释中可以看到:“提前执行总比无限延迟好”,哈哈。

5.2 懒删除机制

1// src/ae.c:300
2if (te->id == AE_DELETED_EVENT_ID) {
3    aeTimeEvent *next = te->next;
4    // 从链表移除并释放
5    ...
6    te = next;
7    continue;
8}

删除时间事件只是标记 id = AE_DELETED_EVENT_ID,真正释放延后到下次遍历。

5.3 周期性事件处理

1// src/ae.c:331
2retval = te->timeProc(eventLoop, id, te->clientData);
3if (retval != AE_NOMORE) {
4    aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
5} else {
6    te->id = AE_DELETED_EVENT_ID;
7}

回调返回的结果如果是正数,则表示是下次触发的毫秒延迟,返回 AE_NOMORE 则标记删除。

六、事件注册

6.1 文件事件注册

 1// src/ae.c:136
 2int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
 3        aeFileProc *proc, void *clientData)
 4{
 5    // 边界检查:fd不能超过事件循环的最大容量
 6    if (fd >= eventLoop->setsize) {
 7        errno = ERANGE;
 8        return AE_ERR;
 9    }
10    
11    // 通过fd直接定位到对应的事件结构(O(1))
12    aeFileEvent *fe = &eventLoop->events[fd];
13
14    // 注册到多路复用(epoll_ctl/kqueue等)
15    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
16        return AE_ERR;
17    
18    // 更新事件掩码(可能同时监听读写)
19    fe->mask |= mask;
20    
21    // 根据事件类型设置对应的回调函数
22    if (mask & AE_READABLE) fe->rfileProc = proc;
23    if (mask & AE_WRITABLE) fe->wfileProc = proc;
24    
25    // 保存私有数据,回调时可取出使用
26    fe->clientData = clientData;
27    
28    // 更新最大fd,某些场景需要遍历[0, maxfd]
29    if (fd > eventLoop->maxfd)
30        eventLoop->maxfd = fd;
31    
32    return AE_OK;
33}
1检查fd越限 → 注册到多路复用 → 设置回调 → 更新maxfd。

6.2 时间事件注册

 1// src/ae.c:208
 2long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
 3        aeTimeProc *proc, void *clientData,
 4        aeEventFinalizerProc *finalizerProc)
 5{
 6    // 生成唯一的事件ID,使用后立即自增
 7    long long id = eventLoop->timeEventNextId++;
 8    aeTimeEvent *te;
 9
10    // 分配时间事件结构
11    te = zmalloc(sizeof(*te));
12    te->id = id;
13    
14    // 计算触发时间:当前时间 + 延迟毫秒数
15    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
16    
17    // 设置回调函数和私有数据
18    te->timeProc = proc;            // 事件触发时调用的回调
19    te->finalizerProc = finalizerProc;  // 事件删除时的清理函数(可传NULL)
20    te->clientData = clientData;    // 传递给回调的私有数据
21    
22    // 头插法插入链表
23    te->prev = NULL;
24    te->next = eventLoop->timeEventHead;
25    if (te->next)
26        te->next->prev = te;
27    eventLoop->timeEventHead = te;
28    
29    return id;  // 返回事件ID,可用于后续删除
30}

头插法插入链表,返回事件ID供后续删除使用。


至此,我们已经完成了Redis Reactor模型核心机制的源码分析:

  • 数据结构:aeEventLoop、aeFileEvent、aeTimeEvent 如何组织文件事件和时间事件
  • 多路复用:统一的抽象接口如何适配epoll/kqueue/select
  • 事件循环:aeMain和aeProcessEvents的四阶段处理流程
  • 阻塞策略:如何根据时间事件动态计算阻塞超时
  • 事件注册:文件事件和时间事件的注册与删除机制

这些是理解 Redis 事件驱动模型的基石。在《Redis Reactor 模型源码实现解析(下)》中,我们将继续分析 Redis 中的实际应用、性能优化技巧、AE_BARRIER 深度解析、与典型 Reactor 对比等内容。

— END —