epoll / kqueue 在 Redis 中的封装实现(上)

一、概述

Redis 作为高性能的内存数据库,其网络通信层采用了 I/O 多路复用技术来处理大量并发连接。为了适配不同操作系统,Redis 对多种 I/O 多路复用机制进行了统一封装,包括:

机制 操作系统 性能
epoll Linux
kqueue BSD/macOS
evport Solaris
select 跨平台 低(兜底方案)

本文将深入剖析 Redis 对 epoll 的封装实现,下一篇文章将分析 kqueue 的封装。

二、多路复用机制的选择策略

2.1 编译时选择

Redis 在编译时根据操作系统自动选择最优的多路复用机制:

 1// src/ae.c:47-61
 2/* Include the best multiplexing layer supported by this system.
 3 * The following should be ordered by performances, descending. */
 4#ifdef HAVE_EVPORT
 5#include "ae_evport.c"
 6#else
 7    #ifdef HAVE_EPOLL
 8    #include "ae_epoll.c"
 9    #else
10        #ifdef HAVE_KQUEUE
11        #include "ae_kqueue.c"
12        #else
13        #include "ae_select.c"
14        #endif
15    #endif
16#endif

选择优先级(性能从高到低):

  1. evport:Solaris 系统原生机制
  2. epoll:Linux 系统原生机制
  3. kqueue:BSD/macOS 系统原生机制
  4. select:POSIX 标准接口,作为兜底方案

2.2 统一的 API 抽象

Redis 定义了一套统一的多路复用 API,每种实现都遵循相同的接口规范:

1// 以下是需要实现的统一接口(内部使用)
2static int aeApiCreate(aeEventLoop *eventLoop);           // 创建多路复用实例
3static int aeApiResize(aeEventLoop *eventLoop, int setsize);  // 调整容量
4static void aeApiFree(aeEventLoop *eventLoop);            // 释放资源
5static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);  // 注册事件
6static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask); // 删除事件
7static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);   // 等待事件
8static char *aeApiName(void);                             // 返回机制名称

三、epoll 基础知识

3.1 epoll 的核心优势

相比传统的 select/poll,epoll 具有以下优势:

特性 select/poll epoll
fd 数量限制 有限制(FD_SETSIZE,默认 1024) 几乎无限制
效率 O(n) 遍历所有 fd O(1) 只返回就绪的 fd
内存拷贝 每次调用需要拷贝 共享内存,无需拷贝
触发模式 仅水平触发 支持边缘触发(ET)

3.2 epoll 的三个核心系统调用

1// 创建 epoll 实例,返回 epoll 文件描述符
2int epoll_create(int size);
3
4// 管理 epoll 实例中的文件描述符(添加、修改、删除)
5int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
6
7// 等待事件发生
8int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

3.3 epoll_event 结构体

 1struct epoll_event {
 2    uint32_t events;    // 事件类型(EPOLLIN、EPOLLOUT 等)
 3    epoll_data_t data;  // 用户数据(可存储 fd 或指针)
 4};
 5
 6typedef union epoll_data {
 7    void    *ptr;
 8    int      fd;
 9    uint32_t u32;
10    uint64_t u64;
11} epoll_data_t;

四、Redis 中 epoll 的封装实现

4.1 数据结构定义

Redis 使用 aeApiState 结构体封装 epoll 相关状态:

1// src/ae_epoll.c:34-37
2typedef struct aeApiState {
3    int epfd;                   // epoll 实例的文件描述符
4    struct epoll_event *events; // 用于接收 epoll_wait 返回的事件数组
5} aeApiState;

字段说明

  • epfdepoll_create 返回的 epoll 实例文件描述符
  • events:预分配的事件数组,避免每次调用 epoll_wait 时分配内存

4.2 创建 epoll 实例 aeApiCreate

 1// src/ae_epoll.c:39-60
 2static int aeApiCreate(aeEventLoop *eventLoop) {
 3    aeApiState *state = zmalloc(sizeof(aeApiState));  // 分配状态结构体
 4
 5    if (!state) return -1;
 6    // 预分配事件数组,大小为 setsize
 7    // setsize 决定了 Redis 能处理的最大 fd 数量
 8    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
 9    if (!state->events) {
10        zfree(state);
11        return -1;
12    }
13    // 创建 epoll 实例
14    // 参数 size 在 Linux 2.6.8 后已被忽略,但必须 > 0
15    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
16    if (state->epfd == -1) {
17        // 创建失败,释放已分配的资源
18        zfree(state->events);
19        zfree(state);
20        return -1;
21    }
22    // 将 epoll 状态保存到事件循环的 apidata 字段
23    eventLoop->apidata = state;
24    return 0;
25}

执行流程

  1. 分配 aeApiState 结构体
  2. 预分配 epoll_event 数组
  3. 调用 epoll_create 创建 epoll 实例
  4. 将状态保存到 eventLoop->apidata

4.3 调整容量 aeApiResize

1// src/ae_epoll.c:62-67
2static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
3    aeApiState *state = eventLoop->apidata;
4
5    // 重新分配事件数组的大小
6    // 当 Redis 需要动态调整最大连接数时调用
7    state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
8    return 0;
9}

说明

  • 当通过 CONFIG SET maxclients 调整最大客户端数时,会触发此函数
  • 重新分配事件数组以适应新的容量

4.4 释放资源 aeApiFree

1// src/ae_epoll.c:69-74
2static void aeApiFree(aeEventLoop *eventLoop) {
3    aeApiState *state = eventLoop->apidata;
4
5    close(state->epfd);      // 关闭 epoll 实例文件描述符
6    zfree(state->events);    // 释放事件数组
7    zfree(state);            // 释放状态结构体
8}

4.5 注册事件 aeApiAddEvent

 1// src/ae_epoll.c:76-91
 2static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
 3    aeApiState *state = eventLoop->apidata;
 4    struct epoll_event ee = {0}; /* avoid valgrind warning */
 5    /* If the fd was already monitored for some event, we need a MOD
 6     * operation. Otherwise we need an ADD operation. */
 7    // 判断操作类型:如果该 fd 已有事件注册,则使用 MOD;否则使用 ADD
 8    int op = eventLoop->events[fd].mask == AE_NONE ?
 9            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
10
11    ee.events = 0;
12    // 合并已有的事件类型(MOD 操作时保留之前注册的事件)
13    mask |= eventLoop->events[fd].mask; /* Merge old events */
14    // 将 Redis 的事件类型转换为 epoll 的事件类型
15    if (mask & AE_READABLE) ee.events |= EPOLLIN;   // 可读事件
16    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;  // 可写事件
17    ee.data.fd = fd;  // 将 fd 存储在 epoll_event.data 中
18    // 调用 epoll_ctl 注册或修改事件监听
19    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
20    return 0;
21}

关键点

  1. ADD vs MOD:新 fd 使用 EPOLL_CTL_ADD,已注册的 fd 使用 EPOLL_CTL_MOD
  2. 事件合并:MOD 操作时,需要合并已有的事件,避免覆盖
  3. 事件类型转换:Redis 的 AE_READABLEEPOLLINAE_WRITABLEEPOLLOUT

事件注册示例

 1假设 fd=5 当前无任何事件注册:
 2  1. 调用 aeApiAddEvent(eventLoop, 5, AE_READABLE)
 3  2. op = EPOLL_CTL_ADD(因为 mask == AE_NONE)
 4  3. ee.events = EPOLLIN
 5  4. 调用 epoll_ctl(epfd, EPOLL_CTL_ADD, 5, &ee)
 6
 7假设 fd=5 已注册 AE_READABLE,现在要添加 AE_WRITABLE:
 8  1. 调用 aeApiAddEvent(eventLoop, 5, AE_WRITABLE)
 9  2. op = EPOLL_CTL_MOD(因为 mask != AE_NONE)
10  3. ee.events = EPOLLIN | EPOLLOUT(合并已有事件)
11  4. 调用 epoll_ctl(epfd, EPOLL_CTL_MOD, 5, &ee)

4.6 删除事件 aeApiDelEvent

 1// src/ae_epoll.c:93-113
 2static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
 3    aeApiState *state = eventLoop->apidata;
 4    struct epoll_event ee = {0}; /* avoid valgrind warning */
 5    // 计算删除后剩余的事件类型
 6    mask = eventLoop->events[fd].mask & (~mask);
 7
 8    ee.events = 0;
 9    // 重新构建 epoll 事件掩码
10    if (mask & AE_READABLE) ee.events |= EPOLLIN;
11    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
12    ee.data.fd = fd;
13
14    if (mask != AE_NONE) {
15        // 还有其他事件,使用 MOD 操作更新
16        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
17    } else {
18        // 所有事件都已删除,从 epoll 中移除该 fd
19        /* Note, Kernel < 2.6.9 requires a non null event pointer even for
20         * EPOLL_CTL_DEL. */
21        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
22    }
23}

关键点

  1. 保留剩余事件:删除某种事件时,保留其他已注册的事件
  2. MOD vs DEL
    • 如果删除后还有事件,使用 EPOLL_CTL_MOD
    • 如果删除后无任何事件,使用 EPOLL_CTL_DEL

事件删除示例

 1假设 fd=5 已注册 AE_READABLE | AE_WRITABLE:
 2
 3场景1:删除 AE_WRITABLE
 4  1. mask = (AE_READABLE | AE_WRITABLE) & (~AE_WRITABLE) = AE_READABLE
 5  2. ee.events = EPOLLIN
 6  3. 调用 epoll_ctl(epfd, EPOLL_CTL_MOD, 5, &ee)
 7
 8场景2:删除 AE_READABLE | AE_WRITABLE(全部删除)
 9  1. mask = (AE_READABLE | AE_WRITABLE) & (~(AE_READABLE | AE_WRITABLE)) = AE_NONE
10  2. 调用 epoll_ctl(epfd, EPOLL_CTL_DEL, 5, &ee)

4.7 等待事件 aeApiPoll

 1// src/ae_epoll.c:115-145
 2static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
 3    aeApiState *state = eventLoop->apidata;
 4    int retval, numevents = 0;
 5
 6    // 调用 epoll_wait 等待事件发生
 7    // 参数:epoll 实例 fd、事件数组、数组大小、超时时间(毫秒)
 8    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
 9            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
10    if (retval > 0) {
11        int j;
12
13        numevents = retval;
14        // 遍历所有触发的事件
15        for (j = 0; j < numevents; j++) {
16            int mask = 0;
17            struct epoll_event *e = state->events+j;
18
19            // 将 epoll 事件类型转换回 Redis 事件类型
20            if (e->events & EPOLLIN) mask |= AE_READABLE;
21            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
22            // 错误和挂断事件也标记为可写,便于上层处理
23            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
24            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
25            // 将触发的事件信息填充到 fired 数组
26            eventLoop->fired[j].fd = e->data.fd;
27            eventLoop->fired[j].mask = mask;
28        }
29    }
30    // 返回触发的事件数量
31    return numevents;
32}

关键点

  1. 超时处理tvp 为 NULL 表示无限等待,否则转换为毫秒
  2. 事件类型转换:epoll 事件 → Redis 事件
  3. 错误处理EPOLLERREPOLLHUP 标记为可写,触发写回调处理异常

4.8 获取机制名称 aeApiName

1// src/ae_epoll.c:147-149
2static char *aeApiName(void) {
3    return "epoll";
4}

用于日志输出和调试信息。

五、epoll 封装的完整调用链

 1┌─────────────────────────────────────────────────────────────────┐
 2│                        Redis 启动                                │
 3│                     initServer()                                │
 4│                          │                                      │
 5│                          ▼                                      │
 6│                  aeCreateEventLoop()                            │
 7│                          │                                      │
 8│                          ▼                                      │
 9│                   aeApiCreate()                                 │
10│                          │                                      │
11│                          ▼                                      │
12│                  epoll_create(1024)                             │
13│                    返回 epfd                                     │
14└─────────────────────────────────────────────────────────────────┘
151617┌─────────────────────────────────────────────────────────────────┐
18│                     注册监听 socket                              │
19│              aeCreateFileEvent(listen_fd, AE_READABLE)          │
20│                          │                                      │
21│                          ▼                                      │
22│                   aeApiAddEvent()                               │
23│                          │                                      │
24│                          ▼                                      │
25│              epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd)          │
26└─────────────────────────────────────────────────────────────────┘
272829┌─────────────────────────────────────────────────────────────────┐
30│                     事件循环 aeMain()                            │
31│                       while(!stop)                              │
32│                          │                                      │
33│                          ▼                                      │
34│                   aeProcessEvents()                             │
35│                          │                                      │
36│                          ▼                                      │
37│                    aeApiPoll()                                  │
38│                          │                                      │
39│                          ▼                                      │
40│               epoll_wait(epfd, events, timeout)                 │
41│                    等待事件触发                                   │
42│                          │                                      │
43│                          ▼                                      │
44│              遍历 fired 数组,调用回调函数                        │
45│                rfileProc / wfileProc                            │
46└─────────────────────────────────────────────────────────────────┘
474849┌─────────────────────────────────────────────────────────────────┐
50│                    注册/删除客户端事件                           │
51│         aeCreateFileEvent(client_fd, AE_READABLE)               │
52│         aeDeleteFileEvent(client_fd, AE_WRITABLE)               │
53│                          │                                      │
54│                          ▼                                      │
55│             aeApiAddEvent() / aeApiDelEvent()                   │
56│                          │                                      │
57│                          ▼                                      │
58│         epoll_ctl(epfd, EPOLL_CTL_ADD/MOD/DEL, fd)              │
59└─────────────────────────────────────────────────────────────────┘

六、Redis 对 epoll 的优化点

6.1 预分配事件数组

1state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);

优化效果:避免每次 epoll_wait 调用时分配内存,减少内存分配开销。

6.2 事件合并策略

1mask |= eventLoop->events[fd].mask; /* Merge old events */

优化效果:添加新事件时保留已有事件,避免多次系统调用。

6.3 使用水平触发(LT)

Redis 使用 epoll 的默认触发模式——水平触发(Level Triggered),而非边缘触发(Edge Triggered)。

为什么选择 LT 而非 ET?

  1. 编程简单:LT 模式下,只要 fd 可读/可写,就会持续通知
  2. 无需一次性读完:可以分多次读取/写入,灵活控制
  3. 配合非阻塞 I/O:Redis 使用非阻塞 socket,不会因为 LT 而阻塞

6.4 动态容量调整

1static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
2    aeApiState *state = eventLoop->apidata;
3    state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
4    return 0;
5}

优化效果:支持运行时调整最大连接数,无需重启服务。

七、总结

本文详细分析了 Redis 对 epoll 的封装实现,核心要点如下:

API epoll 系统调用 功能
aeApiCreate epoll_create 创建 epoll 实例
aeApiAddEvent epoll_ctl(ADD/MOD) 注册/修改事件
aeApiDelEvent epoll_ctl(MOD/DEL) 删除事件
aeApiPoll epoll_wait 等待事件触发
aeApiFree close(epfd) 释放资源

下一篇文章将分析 Redis 对 kqueue 的封装实现,展示 Redis 如何在 BSD/macOS 系统上实现高性能 I/O 多路复用。

— END —