epoll / kqueue 在 Redis 中的封装实现(下)
一、概述
在上一篇文章中,我们详细分析了 Redis 对 epoll 的封装实现。本文将继续分析 Redis 对 kqueue 的封装,这是 BSD 和 macOS 系统上的高性能 I/O 多路复用机制。
kqueue 与 epoll 设计理念相似但 API 差异较大,Redis 通过统一的抽象层屏蔽了这些差异,使得上层代码可以无缝运行在不同操作系统上。
二、kqueue 基础知识
2.1 kqueue 的核心优势
kqueue 是 FreeBSD 4.1 引入的可扩展事件通知机制,后来被 macOS、NetBSD、OpenBSD 等系统采用:
| 特性 | 说明 |
|---|---|
| 统一接口 | 可监听文件描述符、信号、进程、定时器等多种事件 |
| 批量操作 | 支持一次注册/删除多个事件 |
| 高效 | 内核态维护事件状态,避免每次调用传入大量数据 |
| 可扩展 | 支持自定义事件源(如文件系统变化) |
2.2 kqueue 的核心系统调用
1// 创建 kqueue 实例,返回文件描述符
2int kqueue(void);
3
4// 注册/修改/删除事件
5int kevent(int kq, const struct kevent *changelist, int nchanges,
6 struct kevent *eventlist, int nevents,
7 const struct timespec *timeout);
说明:
kqueue()创建一个内核事件队列,返回其文件描述符kevent()既可以注册事件,也可以等待事件,通过不同参数组合实现
2.3 kevent 结构体
1struct kevent {
2 uintptr_t ident; // 事件标识符(通常是 fd)
3 short filter; // 事件过滤器(EVFILT_READ、EVFILT_WRITE 等)
4 u_short flags; // 事件动作(EV_ADD、EV_DELETE、EV_ENABLE 等)
5 u_int fflags; // 过滤器特定标志
6 intptr_t data; // 过滤器特定数据
7 void *udata; // 用户数据指针
8};
9
10// 用于初始化 kevent 结构体的宏
11EV_SET(&kev, ident, filter, flags, fflags, data, udata);
常用过滤器:
| 过滤器 | 说明 |
|---|---|
EVFILT_READ |
文件描述符可读 |
EVFILT_WRITE |
文件描述符可写 |
EVFILT_TIMER |
定时器 |
EVFILT_SIGNAL |
信号 |
EVFILT_VNODE |
文件系统事件 |
常用标志:
| 标志 | 说明 |
|---|---|
EV_ADD |
添加事件 |
EV_DELETE |
删除事件 |
EV_ENABLE |
启用事件 |
EV_DISABLE |
禁用事件(暂不触发) |
EV_ONESHOT |
触发后自动删除 |
EV_CLEAR |
触发后清除状态 |
三、Redis 中 kqueue 的封装实现
3.1 数据结构定义
1// src/ae_kqueue.c:34-41
2typedef struct aeApiState {
3 int kqfd; // kqueue 实例的文件描述符
4 struct kevent *events; // 用于接收 kevent 返回的事件数组
5} aeApiState;
字段说明:
kqfd:kqueue()返回的 kqueue 实例文件描述符events:预分配的事件数组,用于存放kevent()返回的触发事件
与 epoll 的对比:
| 项目 | epoll | kqueue |
|---|---|---|
| 实例 fd | epfd (epoll_create) |
kqfd (kqueue) |
| 事件结构体 | struct epoll_event |
struct kevent |
| 事件数组 | state->events |
state->events |
3.2 创建 kqueue 实例 aeApiCreate
1// src/ae_kqueue.c:43-64
2static int aeApiCreate(aeEventLoop *eventLoop) {
3 aeApiState *state = zmalloc(sizeof(aeApiState)); // 分配状态结构体
4
5 if (!state) return -1;
6 // 预分配事件数组,大小为 setsize
7 // 每个事件占用一个 kevent 结构体
8 state->events = zmalloc(sizeof(struct kevent)*eventLoop->setsize);
9 if (!state->events) {
10 zfree(state);
11 return -1;
12 }
13 // 创建 kqueue 实例
14 // kqueue() 不需要参数,比 epoll_create 更简洁
15 state->kqfd = kqueue();
16 if (state->kqfd == -1) {
17 // 创建失败,释放已分配的资源
18 zfree(state->events);
19 zfree(state);
20 return -1;
21 }
22 // 将 kqueue 状态保存到事件循环的 apidata 字段
23 eventLoop->apidata = state;
24 return 0;
25}
与 epoll 的对比:
1// epoll 版本
2state->epfd = epoll_create(1024); // 参数已被忽略,但仍需传入
3
4// kqueue 版本
5state->kqfd = kqueue(); // 无需参数
3.3 调整容量 aeApiResize
1// src/ae_kqueue.c:66-71
2static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
3 aeApiState *state = eventLoop->apidata;
4
5 // 重新分配事件数组的大小
6 state->events = zrealloc(state->events, sizeof(struct kevent)*setsize);
7 return 0;
8}
3.4 释放资源 aeApiFree
1// src/ae_kqueue.c:73-78
2static void aeApiFree(aeEventLoop *eventLoop) {
3 aeApiState *state = eventLoop->apidata;
4
5 close(state->kqfd); // 关闭 kqueue 实例文件描述符
6 zfree(state->events); // 释放事件数组
7 zfree(state); // 释放状态结构体
8}
3.5 注册事件 aeApiAddEvent
1// src/ae_kqueue.c:80-102
2static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
3 aeApiState *state = eventLoop->apidata;
4 struct kevent ke;
5
6 // kqueue 需要分别注册读事件和写事件
7 // 不能像 epoll 那样通过一次调用同时注册读写事件
8 if (mask & AE_READABLE) {
9 // 注册可读事件
10 // EV_SET 宏用于初始化 kevent 结构体
11 // 参数:kevent指针, ident(fd), filter, flags, fflags, data, udata
12 EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
13 // kevent 调用:kqfd, changelist, nchanges, eventlist, nevents, timeout
14 // 这里只注册事件,不等待事件(nchanges=1, nevents=0)
15 if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
16 }
17 if (mask & AE_WRITABLE) {
18 // 注册可写事件
19 EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
20 if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
21 }
22 return 0;
23}
关键点:
- 分离注册:kqueue 中读事件和写事件是独立的过滤器,需要分别注册
- EV_ADD:添加事件标志,如果事件已存在则修改
- kevent 调用:
changelist和nchanges:要注册/修改的事件列表和数量eventlist和nevents:接收触发事件的缓冲区和大小timeout:超时时间,NULL 表示非阻塞(仅注册时)
与 epoll 的对比:
1// epoll 版本:通过 mask 合并,一次调用注册读写事件
2if (mask & AE_READABLE) ee.events |= EPOLLIN;
3if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
4epoll_ctl(state->epfd, op, fd, &ee);
5
6// kqueue 版本:需要分别注册
7if (mask & AE_READABLE) {
8 EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
9 kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
10}
11if (mask & AE_WRITABLE) {
12 EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
13 kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
14}
3.6 删除事件 aeApiDelEvent
1// src/ae_kqueue.c:104-124
2static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
3 aeApiState *state = eventLoop->apidata;
4 struct kevent ke;
5
6 // kqueue 删除事件同样需要分别处理读和写
7 if (mask & AE_READABLE) {
8 // 删除可读事件
9 // EV_DELETE 标志表示删除事件
10 EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
11 kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
12 }
13 if (mask & AE_WRITABLE) {
14 // 删除可写事件
15 EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
16 kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
17 }
18}
关键点:
- EV_DELETE:删除事件标志
- 分别删除:读事件和写事件需要分别删除
与 epoll 的对比:
1// epoll 版本:需要计算剩余事件,决定 MOD 还是 DEL
2mask = eventLoop->events[fd].mask & (~mask);
3if (mask != AE_NONE) {
4 epoll_ctl(state->epfd, EPOLL_CTL_MOD, fd, &ee);
5} else {
6 epoll_ctl(state->epfd, EPOLL_CTL_DEL, fd, &ee);
7}
8
9// kqueue 版本:直接删除指定类型的事件,无需关心剩余事件
10if (mask & AE_READABLE) {
11 EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
12 kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
13}
kqueue 的优势:删除操作更简单,不需要查询当前注册的事件状态。
3.7 等待事件 aeApiPoll
1// src/ae_kqueue.c:126-162
2static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
3 aeApiState *state = eventLoop->apidata;
4 int retval, numevents = 0;
5
6 // 转换超时时间为 timespec 结构
7 if (tvp != NULL) {
8 struct timespec timeout;
9 timeout.tv_sec = tvp->tv_sec;
10 timeout.tv_nsec = tvp->tv_usec * 1000; // 微秒转纳秒
11 // 调用 kevent 等待事件
12 // changelist=NULL, nchanges=0 表示仅等待,不注册事件
13 // eventlist=state->events, nevents=setsize 接收触发的事件
14 retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, &timeout);
15 } else {
16 // tvp 为 NULL 表示无限等待
17 retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, NULL);
18 }
19
20 if (retval > 0) {
21 int j;
22
23 numevents = retval;
24 // 遍历所有触发的事件
25 for (j = 0; j < numevents; j++) {
26 int mask = 0;
27 struct kevent *e = state->events+j;
28
29 // 根据过滤器类型确定事件类型
30 if (e->filter == EVFILT_READ) mask |= AE_READABLE;
31 if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
32 // 处理错误情况
33 if (e->flags & EV_ERROR) mask |= AE_WRITABLE;
34 // 将触发的事件信息填充到 fired 数组
35 eventLoop->fired[j].fd = e->ident; // ident 存储的是 fd
36 eventLoop->fired[j].mask = mask;
37 }
38 }
39 return numevents;
40}
关键点:
- 超时转换:
timeval(微秒)→timespec(纳秒) - 过滤器判断:通过
e->filter判断是读事件还是写事件 - fd 获取:通过
e->ident获取触发事件的文件描述符
与 epoll 的对比:
1// epoll 版本:通过 events 字段判断事件类型
2if (e->events & EPOLLIN) mask |= AE_READABLE;
3if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
4
5// kqueue 版本:通过 filter 字段判断事件类型
6if (e->filter == EVFILT_READ) mask |= AE_READABLE;
7if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
3.8 获取机制名称 aeApiName
1// src/ae_kqueue.c:164-166
2static char *aeApiName(void) {
3 return "kqueue";
4}
四、epoll 与 kqueue 的详细对比
4.1 API 设计对比
| 操作 | epoll | kqueue |
|---|---|---|
| 创建实例 | epoll_create(size) |
kqueue() |
| 注册事件 | epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) |
kevent(kqfd, &ke, 1, NULL, 0, NULL) |
| 修改事件 | epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) |
kevent(kqfd, &ke, 1, NULL, 0, NULL) |
| 删除事件 | epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL) |
kevent(kqfd, &ke, 1, NULL, 0, NULL) |
| 等待事件 | epoll_wait(epfd, events, maxevents, timeout_ms) |
kevent(kqfd, NULL, 0, events, maxevents, &timeout) |
4.2 事件注册对比
epoll:
- 一个 fd 对应一个
epoll_event - 读写事件通过位掩码组合(
EPOLLIN | EPOLLOUT) - 添加新事件需要检查是否已存在(决定 ADD 还是 MOD)
kqueue:
- 一个 fd 对应多个
kevent(每种过滤器一个) - 读写事件通过不同过滤器区分(
EVFILT_READ、EVFILT_WRITE) - 添加事件直接使用
EV_ADD,已存在则修改
4.3 事件删除对比
epoll:
1// 删除特定事件需要:查询当前状态 → 计算剩余事件 → 决定 MOD/DEL
2mask = eventLoop->events[fd].mask & (~mask);
3if (mask != AE_NONE) {
4 // 还有其他事件,使用 MOD
5 epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ee);
6} else {
7 // 没有事件了,使用 DEL
8 epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
9}
kqueue:
1// 直接删除指定类型的事件,无需查询当前状态
2EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
3kevent(kqfd, &ke, 1, NULL, 0, NULL);
4.4 性能对比
| 指标 | epoll | kqueue |
|---|---|---|
| 注册事件 | 一次系统调用注册读写 | 两次系统调用注册读写 |
| 等待事件 | O(1) | O(1) |
| 批量操作 | 不支持 | 支持批量注册/删除 |
| 事件类型 | 文件描述符 | 文件描述符、信号、定时器等 |
注意:虽然 kqueue 注册读写事件需要两次系统调用,但 Redis 的典型使用场景中,连接建立时只注册读事件,写事件是按需注册的,所以影响很小。
五、Redis 的统一抽象层
5.1 抽象层接口定义
Redis 定义了一组统一的内部接口,每种多路复用机制都需要实现:
1// 所有实现必须提供的接口
2static int aeApiCreate(aeEventLoop *eventLoop);
3static int aeApiResize(aeEventLoop *eventLoop, int setsize);
4static void aeApiFree(aeEventLoop *eventLoop);
5static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
6static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask);
7static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
8static char *aeApiName(void);
5.2 上层调用示例
1// ae.c 中对抽象层的调用
2
3// 创建事件循环
4aeEventLoop *aeCreateEventLoop(int setsize) {
5 aeEventLoop *eventLoop = zmalloc(sizeof(aeEventLoop));
6 // ...
7 if (aeApiCreate(eventLoop) == -1) goto err; // 调用抽象层
8 // ...
9}
10
11// 注册文件事件
12int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
13 aeFileProc *proc, void *clientData) {
14 // ...
15 if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; // 调用抽象层
16 // ...
17}
18
19// 处理事件
20int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
21 // ...
22 numevents = aeApiPoll(eventLoop, tvp); // 调用抽象层
23 // ...
24}
5.3 抽象层的优势
- 跨平台:同一份上层代码可在 Linux、BSD、macOS 上运行
- 可扩展:新增多路复用机制只需实现抽象层接口
- 可测试:可以使用简单的 select 实现进行测试
- 性能优化:每个平台可使用最优的原生机制
六、完整调用流程图
1┌─────────────────────────────────────────────────────────────────┐
2│ Redis 初始化 │
3│ initServer() │
4│ │ │
5│ ▼ │
6│ aeCreateEventLoop() │
7│ │ │
8│ ▼ │
9│ ┌─────┴─────┐ │
10│ │ aeApiCreate│ │
11│ └─────┬─────┘ │
12│ │ │
13│ ┌──────────────┼──────────────┐ │
14│ ▼ ▼ ▼ │
15│ epoll_create kqueue() port_create │
16│ (Linux) (BSD/macOS) (Solaris) │
17└─────────────────────────────────────────────────────────────────┘
18 │
19 ▼
20┌─────────────────────────────────────────────────────────────────┐
21│ 注册监听 socket │
22│ aeCreateFileEvent(listen_fd, AE_READABLE) │
23│ │ │
24│ ▼ │
25│ aeApiAddEvent() │
26│ │ │
27│ ┌──────────────┼──────────────┐ │
28│ ▼ ▼ ▼ │
29│ epoll_ctl kevent() port_associate │
30│ (EPOLL_CTL_ADD) (EV_ADD) (PORT_SOURCE_FD) │
31└─────────────────────────────────────────────────────────────────┘
32 │
33 ▼
34┌─────────────────────────────────────────────────────────────────┐
35│ 事件循环 aeMain() │
36│ while(!stop) │
37│ │ │
38│ ▼ │
39│ aeProcessEvents() │
40│ │ │
41│ ▼ │
42│ aeApiPoll() │
43│ │ │
44│ ┌──────────────┼──────────────┐ │
45│ ▼ ▼ ▼ │
46│ epoll_wait kevent() port_getn │
47│ (等待) (等待) (等待) │
48│ │ │
49│ ▼ │
50│ 遍历 fired 数组,调用回调函数 │
51└─────────────────────────────────────────────────────────────────┘
52 │
53 ▼
54┌─────────────────────────────────────────────────────────────────┐
55│ 客户端事件管理 │
56│ aeCreateFileEvent(fd, mask) │
57│ aeDeleteFileEvent(fd, mask) │
58│ │ │
59│ ▼ │
60│ aeApiAddEvent() / aeApiDelEvent() │
61│ │ │
62│ ┌──────────────┼──────────────┐ │
63│ ▼ ▼ ▼ │
64│ epoll_ctl kevent() port_associate │
65│ (ADD/MOD/DEL) (EV_ADD/DELETE) (PORT_SOURCE_FD) │
66└─────────────────────────────────────────────────────────────────┘
七、select 作为兜底方案
当系统不支持 epoll/kqueue/evport 时,Redis 使用 select 作为兜底方案:
1// src/ae_select.c
2static int aeApiCreate(aeEventLoop *eventLoop) {
3 aeApiState *state = zmalloc(sizeof(aeApiState));
4 if (!state) return -1;
5 // select 使用 fd_set 来管理文件描述符
6 FD_ZERO(&state->rfds);
7 FD_ZERO(&state->wfds);
8 eventLoop->apidata = state;
9 return 0;
10}
select 的限制:
- 单个进程可监听的 fd 数量有限(FD_SETSIZE,默认 1024)
- 每次调用需要重新设置 fd_set
- 效率较低,需要遍历所有 fd
适用场景:
- 不支持更高级机制的系统
- 测试和调试目的
八、总结
8.1 epoll 和 kqueue 的封装对比
| 项目 | epoll 实现 | kqueue 实现 |
|---|---|---|
| 头文件 | ae_epoll.c |
ae_kqueue.c |
| 实例创建 | epoll_create(1024) |
kqueue() |
| 事件结构体 | struct epoll_event |
struct kevent |
| 注册读事件 | epoll_ctl(EPOLL_CTL_ADD/MOD) |
kevent(EV_ADD, EVFILT_READ) |
| 注册写事件 | 同一次调用合并 | 单独调用 kevent(EV_ADD, EVFILT_WRITE) |
| 删除事件 | 需要 MOD/DEL 判断 | 直接 kevent(EV_DELETE) |
| 等待事件 | epoll_wait() |
kevent() |
| 事件类型判断 | e->events & EPOLLIN/OUT |
e->filter == EVFILT_READ/WRITE |
8.2 Redis 抽象层的设计优势
- 统一接口:屏蔽底层差异,上层代码无需关心具体实现
- 性能优先:自动选择当前系统最优的多路复用机制
- 向后兼容:使用 select 作为兜底,确保可移植性
- 简洁高效:接口设计精简,只暴露必要的操作
8.3 选择建议
| 操作系统 | 推荐机制 | 说明 |
|---|---|---|
| Linux | epoll | 性能最优,广泛使用 |
| macOS | kqueue | 系统原生,高效稳定 |
| FreeBSD | kqueue | 系统原生,高效稳定 |
| Solaris | evport | Sun 系统原生机制 |
| 其他 | select | 兜底方案 |
通过这两篇文章的分析,我们可以看到 Redis 如何优雅地封装不同的 I/O 多路复用机制,实现跨平台的高性能网络通信。这种抽象层设计思想值得在类似项目中借鉴。
— END —