epoll / kqueue 在 Redis 中的封装实现(下)

一、概述

在上一篇文章中,我们详细分析了 Redis 对 epoll 的封装实现。本文将继续分析 Redis 对 kqueue 的封装,这是 BSD 和 macOS 系统上的高性能 I/O 多路复用机制。

kqueue 与 epoll 设计理念相似但 API 差异较大,Redis 通过统一的抽象层屏蔽了这些差异,使得上层代码可以无缝运行在不同操作系统上。

二、kqueue 基础知识

2.1 kqueue 的核心优势

kqueue 是 FreeBSD 4.1 引入的可扩展事件通知机制,后来被 macOS、NetBSD、OpenBSD 等系统采用:

特性 说明
统一接口 可监听文件描述符、信号、进程、定时器等多种事件
批量操作 支持一次注册/删除多个事件
高效 内核态维护事件状态,避免每次调用传入大量数据
可扩展 支持自定义事件源(如文件系统变化)

2.2 kqueue 的核心系统调用

1// 创建 kqueue 实例,返回文件描述符
2int kqueue(void);
3
4// 注册/修改/删除事件
5int kevent(int kq, const struct kevent *changelist, int nchanges,
6           struct kevent *eventlist, int nevents,
7           const struct timespec *timeout);

说明

  • kqueue() 创建一个内核事件队列,返回其文件描述符
  • kevent() 既可以注册事件,也可以等待事件,通过不同参数组合实现

2.3 kevent 结构体

 1struct kevent {
 2    uintptr_t ident;       // 事件标识符(通常是 fd)
 3    short     filter;      // 事件过滤器(EVFILT_READ、EVFILT_WRITE 等)
 4    u_short   flags;       // 事件动作(EV_ADD、EV_DELETE、EV_ENABLE 等)
 5    u_int     fflags;      // 过滤器特定标志
 6    intptr_t  data;        // 过滤器特定数据
 7    void      *udata;      // 用户数据指针
 8};
 9
10// 用于初始化 kevent 结构体的宏
11EV_SET(&kev, ident, filter, flags, fflags, data, udata);

常用过滤器

过滤器 说明
EVFILT_READ 文件描述符可读
EVFILT_WRITE 文件描述符可写
EVFILT_TIMER 定时器
EVFILT_SIGNAL 信号
EVFILT_VNODE 文件系统事件

常用标志

标志 说明
EV_ADD 添加事件
EV_DELETE 删除事件
EV_ENABLE 启用事件
EV_DISABLE 禁用事件(暂不触发)
EV_ONESHOT 触发后自动删除
EV_CLEAR 触发后清除状态

三、Redis 中 kqueue 的封装实现

3.1 数据结构定义

1// src/ae_kqueue.c:34-41
2typedef struct aeApiState {
3    int kqfd;                   // kqueue 实例的文件描述符
4    struct kevent *events;      // 用于接收 kevent 返回的事件数组
5} aeApiState;

字段说明

  • kqfdkqueue() 返回的 kqueue 实例文件描述符
  • events:预分配的事件数组,用于存放 kevent() 返回的触发事件

与 epoll 的对比

项目 epoll kqueue
实例 fd epfd (epoll_create) kqfd (kqueue)
事件结构体 struct epoll_event struct kevent
事件数组 state->events state->events

3.2 创建 kqueue 实例 aeApiCreate

 1// src/ae_kqueue.c:43-64
 2static int aeApiCreate(aeEventLoop *eventLoop) {
 3    aeApiState *state = zmalloc(sizeof(aeApiState));  // 分配状态结构体
 4
 5    if (!state) return -1;
 6    // 预分配事件数组,大小为 setsize
 7    // 每个事件占用一个 kevent 结构体
 8    state->events = zmalloc(sizeof(struct kevent)*eventLoop->setsize);
 9    if (!state->events) {
10        zfree(state);
11        return -1;
12    }
13    // 创建 kqueue 实例
14    // kqueue() 不需要参数,比 epoll_create 更简洁
15    state->kqfd = kqueue();
16    if (state->kqfd == -1) {
17        // 创建失败,释放已分配的资源
18        zfree(state->events);
19        zfree(state);
20        return -1;
21    }
22    // 将 kqueue 状态保存到事件循环的 apidata 字段
23    eventLoop->apidata = state;
24    return 0;
25}

与 epoll 的对比

1// epoll 版本
2state->epfd = epoll_create(1024);  // 参数已被忽略,但仍需传入
3
4// kqueue 版本
5state->kqfd = kqueue();            // 无需参数

3.3 调整容量 aeApiResize

1// src/ae_kqueue.c:66-71
2static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
3    aeApiState *state = eventLoop->apidata;
4
5    // 重新分配事件数组的大小
6    state->events = zrealloc(state->events, sizeof(struct kevent)*setsize);
7    return 0;
8}

3.4 释放资源 aeApiFree

1// src/ae_kqueue.c:73-78
2static void aeApiFree(aeEventLoop *eventLoop) {
3    aeApiState *state = eventLoop->apidata;
4
5    close(state->kqfd);     // 关闭 kqueue 实例文件描述符
6    zfree(state->events);   // 释放事件数组
7    zfree(state);           // 释放状态结构体
8}

3.5 注册事件 aeApiAddEvent

 1// src/ae_kqueue.c:80-102
 2static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
 3    aeApiState *state = eventLoop->apidata;
 4    struct kevent ke;
 5
 6    // kqueue 需要分别注册读事件和写事件
 7    // 不能像 epoll 那样通过一次调用同时注册读写事件
 8    if (mask & AE_READABLE) {
 9        // 注册可读事件
10        // EV_SET 宏用于初始化 kevent 结构体
11        // 参数:kevent指针, ident(fd), filter, flags, fflags, data, udata
12        EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
13        // kevent 调用:kqfd, changelist, nchanges, eventlist, nevents, timeout
14        // 这里只注册事件,不等待事件(nchanges=1, nevents=0)
15        if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
16    }
17    if (mask & AE_WRITABLE) {
18        // 注册可写事件
19        EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
20        if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
21    }
22    return 0;
23}

关键点

  1. 分离注册:kqueue 中读事件和写事件是独立的过滤器,需要分别注册
  2. EV_ADD:添加事件标志,如果事件已存在则修改
  3. kevent 调用
    • changelistnchanges:要注册/修改的事件列表和数量
    • eventlistnevents:接收触发事件的缓冲区和大小
    • timeout:超时时间,NULL 表示非阻塞(仅注册时)

与 epoll 的对比

 1// epoll 版本:通过 mask 合并,一次调用注册读写事件
 2if (mask & AE_READABLE) ee.events |= EPOLLIN;
 3if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
 4epoll_ctl(state->epfd, op, fd, &ee);
 5
 6// kqueue 版本:需要分别注册
 7if (mask & AE_READABLE) {
 8    EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
 9    kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
10}
11if (mask & AE_WRITABLE) {
12    EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
13    kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
14}

3.6 删除事件 aeApiDelEvent

 1// src/ae_kqueue.c:104-124
 2static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
 3    aeApiState *state = eventLoop->apidata;
 4    struct kevent ke;
 5
 6    // kqueue 删除事件同样需要分别处理读和写
 7    if (mask & AE_READABLE) {
 8        // 删除可读事件
 9        // EV_DELETE 标志表示删除事件
10        EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
11        kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
12    }
13    if (mask & AE_WRITABLE) {
14        // 删除可写事件
15        EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
16        kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
17    }
18}

关键点

  1. EV_DELETE:删除事件标志
  2. 分别删除:读事件和写事件需要分别删除

与 epoll 的对比

 1// epoll 版本:需要计算剩余事件,决定 MOD 还是 DEL
 2mask = eventLoop->events[fd].mask & (~mask);
 3if (mask != AE_NONE) {
 4    epoll_ctl(state->epfd, EPOLL_CTL_MOD, fd, &ee);
 5} else {
 6    epoll_ctl(state->epfd, EPOLL_CTL_DEL, fd, &ee);
 7}
 8
 9// kqueue 版本:直接删除指定类型的事件,无需关心剩余事件
10if (mask & AE_READABLE) {
11    EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
12    kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
13}

kqueue 的优势:删除操作更简单,不需要查询当前注册的事件状态。

3.7 等待事件 aeApiPoll

 1// src/ae_kqueue.c:126-162
 2static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
 3    aeApiState *state = eventLoop->apidata;
 4    int retval, numevents = 0;
 5
 6    // 转换超时时间为 timespec 结构
 7    if (tvp != NULL) {
 8        struct timespec timeout;
 9        timeout.tv_sec = tvp->tv_sec;
10        timeout.tv_nsec = tvp->tv_usec * 1000;  // 微秒转纳秒
11        // 调用 kevent 等待事件
12        // changelist=NULL, nchanges=0 表示仅等待,不注册事件
13        // eventlist=state->events, nevents=setsize 接收触发的事件
14        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, &timeout);
15    } else {
16        // tvp 为 NULL 表示无限等待
17        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, NULL);
18    }
19
20    if (retval > 0) {
21        int j;
22
23        numevents = retval;
24        // 遍历所有触发的事件
25        for (j = 0; j < numevents; j++) {
26            int mask = 0;
27            struct kevent *e = state->events+j;
28
29            // 根据过滤器类型确定事件类型
30            if (e->filter == EVFILT_READ) mask |= AE_READABLE;
31            if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
32            // 处理错误情况
33            if (e->flags & EV_ERROR) mask |= AE_WRITABLE;
34            // 将触发的事件信息填充到 fired 数组
35            eventLoop->fired[j].fd = e->ident;  // ident 存储的是 fd
36            eventLoop->fired[j].mask = mask;
37        }
38    }
39    return numevents;
40}

关键点

  1. 超时转换timeval(微秒)→ timespec(纳秒)
  2. 过滤器判断:通过 e->filter 判断是读事件还是写事件
  3. fd 获取:通过 e->ident 获取触发事件的文件描述符

与 epoll 的对比

1// epoll 版本:通过 events 字段判断事件类型
2if (e->events & EPOLLIN) mask |= AE_READABLE;
3if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
4
5// kqueue 版本:通过 filter 字段判断事件类型
6if (e->filter == EVFILT_READ) mask |= AE_READABLE;
7if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;

3.8 获取机制名称 aeApiName

1// src/ae_kqueue.c:164-166
2static char *aeApiName(void) {
3    return "kqueue";
4}

四、epoll 与 kqueue 的详细对比

4.1 API 设计对比

操作 epoll kqueue
创建实例 epoll_create(size) kqueue()
注册事件 epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) kevent(kqfd, &ke, 1, NULL, 0, NULL)
修改事件 epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) kevent(kqfd, &ke, 1, NULL, 0, NULL)
删除事件 epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL) kevent(kqfd, &ke, 1, NULL, 0, NULL)
等待事件 epoll_wait(epfd, events, maxevents, timeout_ms) kevent(kqfd, NULL, 0, events, maxevents, &timeout)

4.2 事件注册对比

epoll

  • 一个 fd 对应一个 epoll_event
  • 读写事件通过位掩码组合(EPOLLIN | EPOLLOUT
  • 添加新事件需要检查是否已存在(决定 ADD 还是 MOD)

kqueue

  • 一个 fd 对应多个 kevent(每种过滤器一个)
  • 读写事件通过不同过滤器区分(EVFILT_READEVFILT_WRITE
  • 添加事件直接使用 EV_ADD,已存在则修改

4.3 事件删除对比

epoll

1// 删除特定事件需要:查询当前状态 → 计算剩余事件 → 决定 MOD/DEL
2mask = eventLoop->events[fd].mask & (~mask);
3if (mask != AE_NONE) {
4    // 还有其他事件,使用 MOD
5    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ee);
6} else {
7    // 没有事件了,使用 DEL
8    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
9}

kqueue

1// 直接删除指定类型的事件,无需查询当前状态
2EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
3kevent(kqfd, &ke, 1, NULL, 0, NULL);

4.4 性能对比

指标 epoll kqueue
注册事件 一次系统调用注册读写 两次系统调用注册读写
等待事件 O(1) O(1)
批量操作 不支持 支持批量注册/删除
事件类型 文件描述符 文件描述符、信号、定时器等

注意:虽然 kqueue 注册读写事件需要两次系统调用,但 Redis 的典型使用场景中,连接建立时只注册读事件,写事件是按需注册的,所以影响很小。

五、Redis 的统一抽象层

5.1 抽象层接口定义

Redis 定义了一组统一的内部接口,每种多路复用机制都需要实现:

1// 所有实现必须提供的接口
2static int aeApiCreate(aeEventLoop *eventLoop);
3static int aeApiResize(aeEventLoop *eventLoop, int setsize);
4static void aeApiFree(aeEventLoop *eventLoop);
5static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
6static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask);
7static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
8static char *aeApiName(void);

5.2 上层调用示例

 1// ae.c 中对抽象层的调用
 2
 3// 创建事件循环
 4aeEventLoop *aeCreateEventLoop(int setsize) {
 5    aeEventLoop *eventLoop = zmalloc(sizeof(aeEventLoop));
 6    // ...
 7    if (aeApiCreate(eventLoop) == -1) goto err;  // 调用抽象层
 8    // ...
 9}
10
11// 注册文件事件
12int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
13                      aeFileProc *proc, void *clientData) {
14    // ...
15    if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR;  // 调用抽象层
16    // ...
17}
18
19// 处理事件
20int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
21    // ...
22    numevents = aeApiPoll(eventLoop, tvp);  // 调用抽象层
23    // ...
24}

5.3 抽象层的优势

  1. 跨平台:同一份上层代码可在 Linux、BSD、macOS 上运行
  2. 可扩展:新增多路复用机制只需实现抽象层接口
  3. 可测试:可以使用简单的 select 实现进行测试
  4. 性能优化:每个平台可使用最优的原生机制

六、完整调用流程图

 1┌─────────────────────────────────────────────────────────────────┐
 2│                        Redis 初始化                              │
 3│                     initServer()                                │
 4│                          │                                      │
 5│                          ▼                                      │
 6│                  aeCreateEventLoop()                            │
 7│                          │                                      │
 8│                          ▼                                      │
 9│                    ┌─────┴─────┐                                │
10│                    │ aeApiCreate│                               │
11│                    └─────┬─────┘                                │
12│                          │                                      │
13│           ┌──────────────┼──────────────┐                       │
14│           ▼              ▼              ▼                       │
15│     epoll_create   kqueue()      port_create                    │
16│        (Linux)     (BSD/macOS)    (Solaris)                     │
17└─────────────────────────────────────────────────────────────────┘
181920┌─────────────────────────────────────────────────────────────────┐
21│                     注册监听 socket                              │
22│          aeCreateFileEvent(listen_fd, AE_READABLE)              │
23│                          │                                      │
24│                          ▼                                      │
25│                   aeApiAddEvent()                               │
26│                          │                                      │
27│           ┌──────────────┼──────────────┐                       │
28│           ▼              ▼              ▼                       │
29│        epoll_ctl      kevent()      port_associate              │
30│     (EPOLL_CTL_ADD)  (EV_ADD)       (PORT_SOURCE_FD)            │
31└─────────────────────────────────────────────────────────────────┘
323334┌─────────────────────────────────────────────────────────────────┐
35│                     事件循环 aeMain()                            │
36│                       while(!stop)                              │
37│                          │                                      │
38│                          ▼                                      │
39│                   aeProcessEvents()                             │
40│                          │                                      │
41│                          ▼                                      │
42│                    aeApiPoll()                                  │
43│                          │                                      │
44│           ┌──────────────┼──────────────┐                       │
45│           ▼              ▼              ▼                       │
46│       epoll_wait      kevent()       port_getn                  │
47│        (等待)          (等待)          (等待)                    │
48│                          │                                      │
49│                          ▼                                      │
50│              遍历 fired 数组,调用回调函数                        │
51└─────────────────────────────────────────────────────────────────┘
525354┌─────────────────────────────────────────────────────────────────┐
55│                    客户端事件管理                                │
56│              aeCreateFileEvent(fd, mask)                        │
57│              aeDeleteFileEvent(fd, mask)                        │
58│                          │                                      │
59│                          ▼                                      │
60│         aeApiAddEvent() / aeApiDelEvent()                       │
61│                          │                                      │
62│           ┌──────────────┼──────────────┐                       │
63│           ▼              ▼              ▼                       │
64│        epoll_ctl      kevent()      port_associate              │
65│     (ADD/MOD/DEL)   (EV_ADD/DELETE)  (PORT_SOURCE_FD)           │
66└─────────────────────────────────────────────────────────────────┘

七、select 作为兜底方案

当系统不支持 epoll/kqueue/evport 时,Redis 使用 select 作为兜底方案:

 1// src/ae_select.c
 2static int aeApiCreate(aeEventLoop *eventLoop) {
 3    aeApiState *state = zmalloc(sizeof(aeApiState));
 4    if (!state) return -1;
 5    // select 使用 fd_set 来管理文件描述符
 6    FD_ZERO(&state->rfds);
 7    FD_ZERO(&state->wfds);
 8    eventLoop->apidata = state;
 9    return 0;
10}

select 的限制

  • 单个进程可监听的 fd 数量有限(FD_SETSIZE,默认 1024)
  • 每次调用需要重新设置 fd_set
  • 效率较低,需要遍历所有 fd

适用场景

  • 不支持更高级机制的系统
  • 测试和调试目的

八、总结

8.1 epoll 和 kqueue 的封装对比

项目 epoll 实现 kqueue 实现
头文件 ae_epoll.c ae_kqueue.c
实例创建 epoll_create(1024) kqueue()
事件结构体 struct epoll_event struct kevent
注册读事件 epoll_ctl(EPOLL_CTL_ADD/MOD) kevent(EV_ADD, EVFILT_READ)
注册写事件 同一次调用合并 单独调用 kevent(EV_ADD, EVFILT_WRITE)
删除事件 需要 MOD/DEL 判断 直接 kevent(EV_DELETE)
等待事件 epoll_wait() kevent()
事件类型判断 e->events & EPOLLIN/OUT e->filter == EVFILT_READ/WRITE

8.2 Redis 抽象层的设计优势

  1. 统一接口:屏蔽底层差异,上层代码无需关心具体实现
  2. 性能优先:自动选择当前系统最优的多路复用机制
  3. 向后兼容:使用 select 作为兜底,确保可移植性
  4. 简洁高效:接口设计精简,只暴露必要的操作

8.3 选择建议

操作系统 推荐机制 说明
Linux epoll 性能最优,广泛使用
macOS kqueue 系统原生,高效稳定
FreeBSD kqueue 系统原生,高效稳定
Solaris evport Sun 系统原生机制
其他 select 兜底方案

通过这两篇文章的分析,我们可以看到 Redis 如何优雅地封装不同的 I/O 多路复用机制,实现跨平台的高性能网络通信。这种抽象层设计思想值得在类似项目中借鉴。

— END —