epoll / kqueue 在 Redis 中的封装实现(上)
一、概述
Redis 作为高性能的内存数据库,其网络通信层采用了 I/O 多路复用技术来处理大量并发连接。为了适配不同操作系统,Redis 对多种 I/O 多路复用机制进行了统一封装,包括:
| 机制 | 操作系统 | 性能 |
|---|---|---|
| epoll | Linux | 高 |
| kqueue | BSD/macOS | 高 |
| evport | Solaris | 高 |
| select | 跨平台 | 低(兜底方案) |
本文将深入剖析 Redis 对 epoll 的封装实现,下一篇文章将分析 kqueue 的封装。
二、多路复用机制的选择策略
2.1 编译时选择
Redis 在编译时根据操作系统自动选择最优的多路复用机制:
1// src/ae.c:47-61
2/* Include the best multiplexing layer supported by this system.
3 * The following should be ordered by performances, descending. */
4#ifdef HAVE_EVPORT
5#include "ae_evport.c"
6#else
7 #ifdef HAVE_EPOLL
8 #include "ae_epoll.c"
9 #else
10 #ifdef HAVE_KQUEUE
11 #include "ae_kqueue.c"
12 #else
13 #include "ae_select.c"
14 #endif
15 #endif
16#endif
选择优先级(性能从高到低):
- evport:Solaris 系统原生机制
- epoll:Linux 系统原生机制
- kqueue:BSD/macOS 系统原生机制
- select:POSIX 标准接口,作为兜底方案
2.2 统一的 API 抽象
Redis 定义了一套统一的多路复用 API,每种实现都遵循相同的接口规范:
1// 以下是需要实现的统一接口(内部使用)
2static int aeApiCreate(aeEventLoop *eventLoop); // 创建多路复用实例
3static int aeApiResize(aeEventLoop *eventLoop, int setsize); // 调整容量
4static void aeApiFree(aeEventLoop *eventLoop); // 释放资源
5static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask); // 注册事件
6static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask); // 删除事件
7static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp); // 等待事件
8static char *aeApiName(void); // 返回机制名称
三、epoll 基础知识
3.1 epoll 的核心优势
相比传统的 select/poll,epoll 具有以下优势:
| 特性 | select/poll | epoll |
|---|---|---|
| fd 数量限制 | 有限制(FD_SETSIZE,默认 1024) | 几乎无限制 |
| 效率 | O(n) 遍历所有 fd | O(1) 只返回就绪的 fd |
| 内存拷贝 | 每次调用需要拷贝 | 共享内存,无需拷贝 |
| 触发模式 | 仅水平触发 | 支持边缘触发(ET) |
3.2 epoll 的三个核心系统调用
1// 创建 epoll 实例,返回 epoll 文件描述符
2int epoll_create(int size);
3
4// 管理 epoll 实例中的文件描述符(添加、修改、删除)
5int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
6
7// 等待事件发生
8int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
3.3 epoll_event 结构体
1struct epoll_event {
2 uint32_t events; // 事件类型(EPOLLIN、EPOLLOUT 等)
3 epoll_data_t data; // 用户数据(可存储 fd 或指针)
4};
5
6typedef union epoll_data {
7 void *ptr;
8 int fd;
9 uint32_t u32;
10 uint64_t u64;
11} epoll_data_t;
四、Redis 中 epoll 的封装实现
4.1 数据结构定义
Redis 使用 aeApiState 结构体封装 epoll 相关状态:
1// src/ae_epoll.c:34-37
2typedef struct aeApiState {
3 int epfd; // epoll 实例的文件描述符
4 struct epoll_event *events; // 用于接收 epoll_wait 返回的事件数组
5} aeApiState;
字段说明:
epfd:epoll_create返回的 epoll 实例文件描述符events:预分配的事件数组,避免每次调用epoll_wait时分配内存
4.2 创建 epoll 实例 aeApiCreate
1// src/ae_epoll.c:39-60
2static int aeApiCreate(aeEventLoop *eventLoop) {
3 aeApiState *state = zmalloc(sizeof(aeApiState)); // 分配状态结构体
4
5 if (!state) return -1;
6 // 预分配事件数组,大小为 setsize
7 // setsize 决定了 Redis 能处理的最大 fd 数量
8 state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
9 if (!state->events) {
10 zfree(state);
11 return -1;
12 }
13 // 创建 epoll 实例
14 // 参数 size 在 Linux 2.6.8 后已被忽略,但必须 > 0
15 state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
16 if (state->epfd == -1) {
17 // 创建失败,释放已分配的资源
18 zfree(state->events);
19 zfree(state);
20 return -1;
21 }
22 // 将 epoll 状态保存到事件循环的 apidata 字段
23 eventLoop->apidata = state;
24 return 0;
25}
执行流程:
- 分配
aeApiState结构体 - 预分配
epoll_event数组 - 调用
epoll_create创建 epoll 实例 - 将状态保存到
eventLoop->apidata
4.3 调整容量 aeApiResize
1// src/ae_epoll.c:62-67
2static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
3 aeApiState *state = eventLoop->apidata;
4
5 // 重新分配事件数组的大小
6 // 当 Redis 需要动态调整最大连接数时调用
7 state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
8 return 0;
9}
说明:
- 当通过
CONFIG SET maxclients调整最大客户端数时,会触发此函数 - 重新分配事件数组以适应新的容量
4.4 释放资源 aeApiFree
1// src/ae_epoll.c:69-74
2static void aeApiFree(aeEventLoop *eventLoop) {
3 aeApiState *state = eventLoop->apidata;
4
5 close(state->epfd); // 关闭 epoll 实例文件描述符
6 zfree(state->events); // 释放事件数组
7 zfree(state); // 释放状态结构体
8}
4.5 注册事件 aeApiAddEvent
1// src/ae_epoll.c:76-91
2static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
3 aeApiState *state = eventLoop->apidata;
4 struct epoll_event ee = {0}; /* avoid valgrind warning */
5 /* If the fd was already monitored for some event, we need a MOD
6 * operation. Otherwise we need an ADD operation. */
7 // 判断操作类型:如果该 fd 已有事件注册,则使用 MOD;否则使用 ADD
8 int op = eventLoop->events[fd].mask == AE_NONE ?
9 EPOLL_CTL_ADD : EPOLL_CTL_MOD;
10
11 ee.events = 0;
12 // 合并已有的事件类型(MOD 操作时保留之前注册的事件)
13 mask |= eventLoop->events[fd].mask; /* Merge old events */
14 // 将 Redis 的事件类型转换为 epoll 的事件类型
15 if (mask & AE_READABLE) ee.events |= EPOLLIN; // 可读事件
16 if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; // 可写事件
17 ee.data.fd = fd; // 将 fd 存储在 epoll_event.data 中
18 // 调用 epoll_ctl 注册或修改事件监听
19 if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
20 return 0;
21}
关键点:
- ADD vs MOD:新 fd 使用
EPOLL_CTL_ADD,已注册的 fd 使用EPOLL_CTL_MOD - 事件合并:MOD 操作时,需要合并已有的事件,避免覆盖
- 事件类型转换:Redis 的
AE_READABLE→EPOLLIN,AE_WRITABLE→EPOLLOUT
事件注册示例:
1假设 fd=5 当前无任何事件注册:
2 1. 调用 aeApiAddEvent(eventLoop, 5, AE_READABLE)
3 2. op = EPOLL_CTL_ADD(因为 mask == AE_NONE)
4 3. ee.events = EPOLLIN
5 4. 调用 epoll_ctl(epfd, EPOLL_CTL_ADD, 5, &ee)
6
7假设 fd=5 已注册 AE_READABLE,现在要添加 AE_WRITABLE:
8 1. 调用 aeApiAddEvent(eventLoop, 5, AE_WRITABLE)
9 2. op = EPOLL_CTL_MOD(因为 mask != AE_NONE)
10 3. ee.events = EPOLLIN | EPOLLOUT(合并已有事件)
11 4. 调用 epoll_ctl(epfd, EPOLL_CTL_MOD, 5, &ee)
4.6 删除事件 aeApiDelEvent
1// src/ae_epoll.c:93-113
2static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
3 aeApiState *state = eventLoop->apidata;
4 struct epoll_event ee = {0}; /* avoid valgrind warning */
5 // 计算删除后剩余的事件类型
6 mask = eventLoop->events[fd].mask & (~mask);
7
8 ee.events = 0;
9 // 重新构建 epoll 事件掩码
10 if (mask & AE_READABLE) ee.events |= EPOLLIN;
11 if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
12 ee.data.fd = fd;
13
14 if (mask != AE_NONE) {
15 // 还有其他事件,使用 MOD 操作更新
16 epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
17 } else {
18 // 所有事件都已删除,从 epoll 中移除该 fd
19 /* Note, Kernel < 2.6.9 requires a non null event pointer even for
20 * EPOLL_CTL_DEL. */
21 epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
22 }
23}
关键点:
- 保留剩余事件:删除某种事件时,保留其他已注册的事件
- MOD vs DEL:
- 如果删除后还有事件,使用
EPOLL_CTL_MOD - 如果删除后无任何事件,使用
EPOLL_CTL_DEL
- 如果删除后还有事件,使用
事件删除示例:
1假设 fd=5 已注册 AE_READABLE | AE_WRITABLE:
2
3场景1:删除 AE_WRITABLE
4 1. mask = (AE_READABLE | AE_WRITABLE) & (~AE_WRITABLE) = AE_READABLE
5 2. ee.events = EPOLLIN
6 3. 调用 epoll_ctl(epfd, EPOLL_CTL_MOD, 5, &ee)
7
8场景2:删除 AE_READABLE | AE_WRITABLE(全部删除)
9 1. mask = (AE_READABLE | AE_WRITABLE) & (~(AE_READABLE | AE_WRITABLE)) = AE_NONE
10 2. 调用 epoll_ctl(epfd, EPOLL_CTL_DEL, 5, &ee)
4.7 等待事件 aeApiPoll
1// src/ae_epoll.c:115-145
2static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
3 aeApiState *state = eventLoop->apidata;
4 int retval, numevents = 0;
5
6 // 调用 epoll_wait 等待事件发生
7 // 参数:epoll 实例 fd、事件数组、数组大小、超时时间(毫秒)
8 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
9 tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
10 if (retval > 0) {
11 int j;
12
13 numevents = retval;
14 // 遍历所有触发的事件
15 for (j = 0; j < numevents; j++) {
16 int mask = 0;
17 struct epoll_event *e = state->events+j;
18
19 // 将 epoll 事件类型转换回 Redis 事件类型
20 if (e->events & EPOLLIN) mask |= AE_READABLE;
21 if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
22 // 错误和挂断事件也标记为可写,便于上层处理
23 if (e->events & EPOLLERR) mask |= AE_WRITABLE;
24 if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
25 // 将触发的事件信息填充到 fired 数组
26 eventLoop->fired[j].fd = e->data.fd;
27 eventLoop->fired[j].mask = mask;
28 }
29 }
30 // 返回触发的事件数量
31 return numevents;
32}
关键点:
- 超时处理:
tvp为 NULL 表示无限等待,否则转换为毫秒 - 事件类型转换:epoll 事件 → Redis 事件
- 错误处理:
EPOLLERR和EPOLLHUP标记为可写,触发写回调处理异常
4.8 获取机制名称 aeApiName
1// src/ae_epoll.c:147-149
2static char *aeApiName(void) {
3 return "epoll";
4}
用于日志输出和调试信息。
五、epoll 封装的完整调用链
1┌─────────────────────────────────────────────────────────────────┐
2│ Redis 启动 │
3│ initServer() │
4│ │ │
5│ ▼ │
6│ aeCreateEventLoop() │
7│ │ │
8│ ▼ │
9│ aeApiCreate() │
10│ │ │
11│ ▼ │
12│ epoll_create(1024) │
13│ 返回 epfd │
14└─────────────────────────────────────────────────────────────────┘
15 │
16 ▼
17┌─────────────────────────────────────────────────────────────────┐
18│ 注册监听 socket │
19│ aeCreateFileEvent(listen_fd, AE_READABLE) │
20│ │ │
21│ ▼ │
22│ aeApiAddEvent() │
23│ │ │
24│ ▼ │
25│ epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd) │
26└─────────────────────────────────────────────────────────────────┘
27 │
28 ▼
29┌─────────────────────────────────────────────────────────────────┐
30│ 事件循环 aeMain() │
31│ while(!stop) │
32│ │ │
33│ ▼ │
34│ aeProcessEvents() │
35│ │ │
36│ ▼ │
37│ aeApiPoll() │
38│ │ │
39│ ▼ │
40│ epoll_wait(epfd, events, timeout) │
41│ 等待事件触发 │
42│ │ │
43│ ▼ │
44│ 遍历 fired 数组,调用回调函数 │
45│ rfileProc / wfileProc │
46└─────────────────────────────────────────────────────────────────┘
47 │
48 ▼
49┌─────────────────────────────────────────────────────────────────┐
50│ 注册/删除客户端事件 │
51│ aeCreateFileEvent(client_fd, AE_READABLE) │
52│ aeDeleteFileEvent(client_fd, AE_WRITABLE) │
53│ │ │
54│ ▼ │
55│ aeApiAddEvent() / aeApiDelEvent() │
56│ │ │
57│ ▼ │
58│ epoll_ctl(epfd, EPOLL_CTL_ADD/MOD/DEL, fd) │
59└─────────────────────────────────────────────────────────────────┘
六、Redis 对 epoll 的优化点
6.1 预分配事件数组
1state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
优化效果:避免每次 epoll_wait 调用时分配内存,减少内存分配开销。
6.2 事件合并策略
1mask |= eventLoop->events[fd].mask; /* Merge old events */
优化效果:添加新事件时保留已有事件,避免多次系统调用。
6.3 使用水平触发(LT)
Redis 使用 epoll 的默认触发模式——水平触发(Level Triggered),而非边缘触发(Edge Triggered)。
为什么选择 LT 而非 ET?
- 编程简单:LT 模式下,只要 fd 可读/可写,就会持续通知
- 无需一次性读完:可以分多次读取/写入,灵活控制
- 配合非阻塞 I/O:Redis 使用非阻塞 socket,不会因为 LT 而阻塞
6.4 动态容量调整
1static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
2 aeApiState *state = eventLoop->apidata;
3 state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
4 return 0;
5}
优化效果:支持运行时调整最大连接数,无需重启服务。
七、总结
本文详细分析了 Redis 对 epoll 的封装实现,核心要点如下:
| API | epoll 系统调用 | 功能 |
|---|---|---|
aeApiCreate |
epoll_create |
创建 epoll 实例 |
aeApiAddEvent |
epoll_ctl(ADD/MOD) |
注册/修改事件 |
aeApiDelEvent |
epoll_ctl(MOD/DEL) |
删除事件 |
aeApiPoll |
epoll_wait |
等待事件触发 |
aeApiFree |
close(epfd) |
释放资源 |
下一篇文章将分析 Redis 对 kqueue 的封装实现,展示 Redis 如何在 BSD/macOS 系统上实现高性能 I/O 多路复用。
— END —