Redis Reactor 模型源码实现解析(下)
在《Redis Reactor 模型源码实现解析(上)》中 我们分析了Redis Reactor模型的核心数据结构、事件循环主流程和事件注册机制。本文将继续深入分析Redis中的实际应用、性能优化技巧、AE_BARRIER深度解析、与典型Reactor对比等内容。
七、Redis中的实际应用
7.1 连接接受:acceptTcpHandler
1// src/networking.c:726
2void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
3 int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
4 char cip[NET_IP_STR_LEN];
5
6 while(max--) {
7 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
8 if (cfd == ANET_ERR) {
9 if (errno != EWOULDBLOCK)
10 serverLog(LL_WARNING,
11 "Accepting client connection: %s", server.neterr);
12 return;
13 }
14 serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
15 acceptCommonHandler(cfd,0,cip);
16 }
17}
一次循环最多接受 MAX_ACCEPTS_PER_CALL 个连接,提高吞吐。
7.2 请求读取:readQueryFromClient
1// src/networking.c:1500
2void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
3 client *c = (client*) privdata;
4 int nread, readlen;
5 size_t qblen;
6
7 readlen = PROTO_IOBUF_LEN;
8 // 大参数优化读取长度
9 if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
10 && c->bulklen >= PROTO_MBULK_BIG_ARG)
11 {
12 ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
13 if (remaining > 0 && remaining < readlen) readlen = remaining;
14 }
15
16 qblen = sdslen(c->querybuf);
17 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
18 nread = read(fd, c->querybuf+qblen, readlen);
19
20 if (nread == -1) {
21 if (errno == EAGAIN) return;
22 freeClient(c);
23 return;
24 } else if (nread == 0) {
25 freeClient(c);
26 return;
27 }
28
29 sdsIncrLen(c->querybuf,nread);
30 c->lastinteraction = server.unixtime;
31
32 // 处理输入缓冲区
33 processInputBufferAndReplicate(c);
34}
7.3 响应发送:sendReplyToClient
1// src/networking.c:1054
2void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
3 writeToClient(fd,privdata,1);
4}
7.4 定时任务:serverCron
1// src/server.c:1089
2int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
3 // 更新时间缓存
4 updateCachedTime();
5
6 // 动态调整执行频率
7 server.hz = server.config_hz;
8 if (server.dynamic_hz) {
9 while (listLength(server.clients) / server.hz > MAX_CLIENTS_PER_CLOCK_TICK)
10 {
11 server.hz *= 2;
12 if (server.hz > CONFIG_MAX_HZ) {
13 server.hz = CONFIG_MAX_HZ;
14 break;
15 }
16 }
17 }
18
19 // 统计信息更新
20 run_with_period(100) {
21 trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
22 trackInstantaneousMetric(STATS_METRIC_NET_INPUT, server.stat_net_input_bytes);
23 trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT, server.stat_net_output_bytes);
24 }
25
26 // LRU 时钟更新
27 unsigned long lruclock = getLRUClock();
28 atomicSet(server.lruclock,lruclock);
29
30 // 峰值内存记录
31 if (zmalloc_used_memory() > server.stat_peak_memory)
32 server.stat_peak_memory = zmalloc_used_memory();
33
34 // ... 更多定时任务
35}
serverCron 是 Redis 唯一的时间事件,默认每 100ms 执行一次,那么它到底做了哪些事情呢?总体捋一下:
- 更新统计信息
- 清理过期key
- 数据库rehash
- AOF/RDB持久化触发
- 主从同步检查
7.5 beforeSleep钩子
1// src/server.c:1358
2void beforeSleep(struct aeEventLoop *eventLoop) {
3 // Cluster状态检查
4 if (server.cluster_enabled) clusterBeforeSleep();
5
6 // 快速过期检查
7 if (server.active_expire_enabled && server.masterhost == NULL)
8 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
9
10 // 处理阻塞客户端
11 if (listLength(server.clients_waiting_acks))
12 processClientsWaitingReplicas();
13
14 moduleHandleBlockedClients();
15
16 if (listLength(server.unblocked_clients))
17 processUnblockedClients();
18
19 // AOF刷盘
20 flushAppendOnlyFile(0);
21
22 // 发送待写响应
23 handleClientsWithPendingWrites();
24
25 // 释放GIL给模块线程
26 if (moduleCount()) moduleReleaseGIL();
27}
这个钩子方法会在每轮事件循环前执行,主要做了以下三件事情:
- 快速过期key清理
- AOF缓冲区刷盘
- 发送客户端响应
八、为什么时间事件用链表呢?
aeSearchNearestTimer 是O(n)的:
1// src/ae.c:254
2static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
3{
4 aeTimeEvent *te = eventLoop->timeEventHead;
5 aeTimeEvent *nearest = NULL;
6
7 while(te) {
8 if (!nearest || te->when_sec < nearest->when_sec ||
9 (te->when_sec == nearest->when_sec &&
10 te->when_ms < nearest->when_ms))
11 nearest = te;
12 te = te->next;
13 }
14 return nearest;
15}
作者在注释里说可以用跳表优化到 O(1),但由于我们当前分析的这个Redis的版本(Redis 5.0)只有 serverCron 一个时间事件,链表完全够用。这又是一个"够用就好"设计。
九、性能优化技巧
9.1 写事件延迟注册
Redis不立即注册写事件,而是在 handleClientsWithPendingWrites 中尝试同步写:
1// src/networking.c:1064
2int handleClientsWithPendingWrites(void) {
3 listIter li;
4 listNode *ln;
5
6 listRewind(server.clients_pending_write,&li);
7 while((ln = listNext(&li))) {
8 client *c = listNodeValue(ln);
9
10 // 先尝试同步写
11 if (writeToClient(c->fd,c,0) == C_ERR) continue;
12
13 // 写不完才注册写事件
14 if (clientHasPendingReplies(c)) {
15 int ae_flags = AE_WRITABLE;
16 if (server.aof_state == AOF_ON &&
17 server.aof_fsync == AOF_FSYNC_ALWAYS)
18 {
19 ae_flags |= AE_BARRIER;
20 }
21 aeCreateFileEvent(server.el, c->fd, ae_flags,
22 sendReplyToClient, c);
23 }
24 }
25}
如果采用传统的做法来:有数据要写->注册写事件->写事件触发->发送响应,那么每次有数据要写都要调用一次 aeCreateFileEvent,而且写事件触发后还要调用一次 aeDeleteFileEvent 来取消注册。对于高并发场景,这些系统调用的开销是非常大的。通过先尝试同步写,如果能一次性写完就不注册写事件,只有写不完才注册写事件,这样就大大减少了系统调用的次数,提高了性能。
9.2 批量接受连接
acceptTcpHandler 一次最多接受 MAX_ACCEPTS_PER_CALL 个连接,而不是来一个接受一个。
9.3 动态调整cron频率
1if (server.dynamic_hz) {
2 while (listLength(server.clients) / server.hz > MAX_CLIENTS_PER_CLOCK_TICK)
3 {
4 server.hz *= 2;
5 if (server.hz > CONFIG_MAX_HZ) {
6 server.hz = CONFIG_MAX_HZ;
7 break;
8 }
9 }
10}
客户端越多,cron执行越频繁,保证每个客户端在每个时钟周期都能被处理到。
十、完整流程图
1 Redis 启动
2 │
3 ▼
4 ┌─────────────────────┐
5 │ aeCreateEventLoop │ 创建事件循环
6 └─────────────────────┘
7 │
8 ▼
9 ┌─────────────────────┐
10 │ 注册监听socket │ aeCreateFileEvent(ipfd, AE_READABLE, acceptTcpHandler)
11 └─────────────────────┘
12 │
13 ▼
14 ┌─────────────────────┐
15 │ 注册serverCron │ aeCreateTimeEvent(1ms, serverCron)
16 └─────────────────────┘
17 │
18 ▼
19 ┌─────────────────────┐
20 │ aeMain() │ 进入主循环
21 └─────────────────────┘
22 │
23 ┌─────────────┴─────────────┐
24 │ │
25 ▼ │
26 ┌─────────────┐ │
27 │ beforeSleep │ ◄──────────────────┤ 每轮循环开始
28 └─────────────┘ │
29 │ │
30 ▼ │
31 ┌─────────────────────────────────────────────────────────┐
32 │ aeProcessEvents │
33 │ ┌─────────────────────────────────────────────────┐ │
34 │ │ 1. 计算阻塞超时(找最近时间事件) │ │
35 │ └─────────────────────────────────────────────────┘ │
36 │ │ │
37 │ ▼ │
38 │ ┌─────────────────────────────────────────────────┐ │
39 │ │ 2. aeApiPoll() ← epoll_wait / kqueue / select │ │
40 │ └─────────────────────────────────────────────────┘ │
41 │ │ │
42 │ ▼ │
43 │ ┌─────────────────────────────────────────────────┐ │
44 │ │ 3. 处理文件事件 │ │
45 │ │ ├─ acceptTcpHandler(新连接) │ │
46 │ │ ├─ readQueryFromClient(读请求) │ │
47 │ │ └─ sendReplyToClient(写响应) │ │
48 │ └─────────────────────────────────────────────────┘ │
49 │ │ │
50 │ ▼ │
51 │ ┌─────────────────────────────────────────────────┐ │
52 │ │ 4. processTimeEvents() │ │
53 │ │ └─ serverCron(定时任务) │ │
54 │ └─────────────────────────────────────────────────┘ │
55 └─────────────────────────────────────────────────────────┘
56 │
57 ▼
58 停止? ──否──► 继续循环
59 │
60 是
61 │
62 ▼
63 Redis 退出
十一、与典型Reactor模型的对比
11.1 典型Reactor模型
1┌───────────────────────────────────────────────────────┐
2│ Reactor │
3│ ┌─────────────────────────────────────────────────┐ │
4│ │ Event Demultiplexer │ │
5│ │ (epoll / kqueue / select) │ │
6│ └─────────────────────────────────────────────────┘ │
7│ │ │
8│ ┌───────────────┼───────────────┐ │
9│ ▼ ▼ ▼ │
10│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
11│ │ Handler A │ │ Handler B │ │ Handler C │ │
12│ └─────────────┘ └─────────────┘ └─────────────┘ │
13└───────────────────────────────────────────────────────┘
典型Reactor的特点:
- 事件分离器(Demultiplexer)负责监听事件
- 事件处理器(Handler)负责业务逻辑
- 支持多线程扩展
11.2 Redis Reactor简化模型
1┌───────────────────────────────────────────────────────┐
2│ aeEventLoop │
3│ ┌─────────────────────────────────────────────────┐ │
4│ │ aeApiPoll (epoll) │ │
5│ └─────────────────────────────────────────────────┘ │
6│ │ │
7│ ┌───────────────┼───────────────┐ │
8│ ▼ ▼ ▼ │
9│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
10│ │acceptHandler│ │ readHandler │ │writeHandler │ │
11│ └─────────────┘ └─────────────┘ └─────────────┘ │
12│ │ │ │ │
13│ └───────────────┴───────────────┘ │
14│ │ │
15│ ▼ │
16│ commandProc(命令处理) │
17└───────────────────────────────────────────────────────┘
Redis的简化:
- 单线程执行:所有回调在同一个线程执行
- 无连接池:每个连接对应一个client结构
- 无独立Handler:回调直接调用命令处理函数
十二、为什么不用libevent / libev?
有兴趣的朋友可以看一下这篇文章的最后附有作者对于这个问题的解释:Redis事件循环模型全景解析(ae.c)
十三、AE_BARRIER的深度解析
13.1 为什么需要AE_BARRIER?
正常情况下,事件处理顺序是:先读后写
1客户端发送请求 → 触发读事件 → 执行命令 → 生成响应
2 ↓
3 触发写事件 → 发送响应
这个顺序对大多数场景是最优的:收到请求立即处理并回复。
13.2 特殊场景:AOF fsync=always
当配置 appendfsync always 时:
1收到请求 → 执行命令 → 写入 AOF 缓冲区
2 ↓
3 需要先 fsync
4 ↓
5 才能发送响应
问题来了: fsync 在 beforeSleep 中执行,如果先处理读事件,可能读到新请求,新请求处理完要发响应,但上个请求还没fsync,导致响应发送时数据还没落盘。这个该怎么办呢?
13.3 解决方案
1// src/networking.c:1064
2if (server.aof_state == AOF_ON &&
3 server.aof_fsync == AOF_FSYNC_ALWAYS)
4{
5 ae_flags |= AE_BARRIER; // 标记需要反转
6}
设置 AE_BARRIER 后,处理顺序变成:先写后读
1beforeSleep:
2 ├─ flushAppendOnlyFile() // 先fsync
3 └─ handleClientsWithPendingWrites() // 发送响应
4
5aeProcessEvents:
6 ├─ 先处理写事件(发送响应)
7 └─ 再处理读事件(读新请求)
这样保证:响应发送时,数据已经落盘。
十四、事件循环的一些边界情况
14.1 fd耗尽
1// src/ae.c:136
2if (fd >= eventLoop->setsize) {
3 errno = ERANGE;
4 return AE_ERR;
5}
setsize 在启动时设置:
1// src/server.c
2server.el = aeCreateEventLoop(server.maxclients + CONFIG_FDSET_INCR);
如果连接数超过 maxclients,accept会失败。
14.2 时间事件溢出
1// src/ae.c:208
2long long id = eventLoop->timeEventNextId++;
id 是 long long,实际不会溢出。但如果系统时间跳变:
1// src/ae.c:284
2if (now < eventLoop->lastTime) {
3 te = eventLoop->timeEventHead;
4 while(te) {
5 te->when_sec = 0; // 立即触发所有时间事件
6 te = te->next;
7 }
8}
14.3 epoll返回大量事件
1// src/ae_epoll.c:112
2retval = epoll_wait(state->epfd, state->events, eventLoop->setsize, timeout);
返回的事件数量受 setsize 限制。如果触发事件太多:
events数组大小固定- 只处理前
setsize个事件 - 剩余事件下轮处理
十五、最后总结
通过两篇文章的分析,我们可以领会到Redis Reactor模型的设计精髓如下:
| 设计点 | 实现 | 效果 |
|---|---|---|
| 文件事件存储 | 数组,下标即fd | O(1) 访问 |
| 时间事件存储 | 双向链表 | 简单够用 |
| 多路复用 | 统一接口抽象 | 跨平台兼容 |
| 阻塞超时 | 最近时间事件计算 | 精确控制 |
| 事件处理顺序 | 先读后写,支持反转 | 灵活应对不同场景 |
| 时钟跳变 | 强制立即触发 | 避免任务延迟 |
| 回调注册 | 函数指针 + clientData | 灵活扩展 |
整理源码文件索引:
| 文件 | 功能 |
|---|---|
src/ae.h |
事件循环数据结构定义 |
src/ae.c |
事件循环核心实现 |
src/ae_epoll.c |
Linux epoll封装 |
src/ae_kqueue.c |
BSD/macOS kqueue封装 |
src/ae_select.c |
通用 select封装 |
src/ae_evport.c |
Solaris event port封装 |
src/server.c |
beforeSleep、serverCron |
src/networking.c |
acceptTcpHandler、readQueryFromClient、sendReplyToClient |