Redis 内存模型设计全解析(serverCron 在干什么?)
Redis有过期策略、内存淘汰,但具体怎么实现的?serverCron每隔一段时间跑一次,到底在做什么?这篇文章从源码层面拆解Redis的内存管理机制。
一、Redis内存管理概览
Redis 的内存管理涉及三个层面:
| 层面 | 触发时机 | 机制 |
|---|---|---|
| key 过期删除 | 访问时 + 定期扫描 | 惰性删除 + 定期删除 |
| 内存淘汰 | 内存超限 | LRU/LFU/TTL/随机 |
| 后台释放 | UNLINK/FLUSHDB ASYNC | bio线程 |
二、过期策略:惰性删除 + 定期删除
Redis不是用一个线程专门扫描过期key,而是两种策略结合。
2.1 惰性删除(Lazy Expiration)
访问key时检查是否过期,过期就删除:
1// db.c
2robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
3 robj *val;
4
5 if (expireIfNeeded(db,key) == 1) {
6 /* Key expired. If we are in the context of a master, expireIfNeeded()
7 * returns 0 only when the key does not exist at all, so it's safe
8 * to return NULL ASAP. */
9 if (server.masterhost == NULL) return NULL;
10
11 /* However if we are in the context of a slave, expireIfNeeded() will
12 * not really try to expire the key, it only returns information
13 * about the "logical" status of the key: key expiring is up to the
14 * master in order to have a consistent view of master's data set.
15 *
16 * However, if the command caller is not the master, and as additional
17 * safety measure, the command invoked is a read-only command, we can
18 * safely return NULL here, and provide a more consistent behavior
19 * to clients accessign expired values in a read-only fashion, that
20 * will say the key as non existing.
21 *
22 * Notably this covers GETs when slaves are used to scale reads. */
23 if (server.current_client &&
24 server.current_client != server.master &&
25 server.current_client->cmd &&
26 server.current_client->cmd->flags & CMD_READONLY)
27 {
28 return NULL;
29 }
30 }
31 val = lookupKey(db,key,flags);
32 if (val == NULL)
33 server.stat_keyspace_misses++;
34 else
35 server.stat_keyspace_hits++;
36 return val;
37}
expireIfNeeded 的实现:
1// db.c
2int expireIfNeeded(redisDb *db, robj *key) {
3 mstime_t when = getExpire(db,key);
4 mstime_t now;
5
6 if (when < 0) return 0; /* No expire for this key */
7
8 /* Don't expire anything while loading. It will be done later. */
9 if (server.loading) return 0;
10
11 /* If we are in the context of a Lua script, we pretend that time is
12 * blocked to when the Lua script started. This way a key can expire
13 * only the first time it is accessed and not in the middle of the
14 * script execution, making propagation to slaves / AOF consistent.
15 * See issue #1525 on Github for more information. */
16 now = server.lua_caller ? server.lua_time_start : mstime();
17
18 /* If we are running in the context of a slave, return ASAP:
19 * the slave key expiration is controlled by the master that will
20 * send us synthesized DEL operations for expired keys.
21 *
22 * Still we try to return the right information to the caller,
23 * that is, 0 if we think the key should be still valid, 1 if
24 * we think the key is expired at this time. */
25 if (server.masterhost != NULL) return now > when;
26
27 /* Return when this key has not expired */
28 if (now <= when) return 0;
29
30 /* Delete the key */
31 server.stat_expiredkeys++;
32 propagateExpire(db,key,server.lazyfree_lazy_expire);
33 notifyKeyspaceEvent(NOTIFY_EXPIRED,
34 "expired",key,db->id);
35 return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
36 dbSyncDelete(db,key);
37}
惰性删除的问题:如果一个key过期了但从未被访问,就会一直占用内存。所以需要定期删除补充。
2.2 定期删除(Active Expiration)
serverCron里调用activeExpireCycle,主动扫描过期key:
1// expire.c
2void activeExpireCycle(int type) {
3 // 静态变量记录上次扫描到哪个数据库
4 static unsigned int current_db = 0;
5 static int timelimit_exit = 0;
6 static long long last_fast_cycle = 0;
7
8 int j, iteration = 0;
9 int dbs_per_call = CRON_DBS_PER_CALL; // 默认16
10 long long start = ustime(), timelimit;
11
12 // 快速模式:上次没超时就不跑
13 if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
14 if (!timelimit_exit) return;
15 if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
16 last_fast_cycle = start;
17 }
18
19 // 计算时间上限:每次最多占用CPU时间的25%
20 timelimit = 1000000 * ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC / server.hz / 100;
21 // ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC = 25
22
23 if (type == ACTIVE_EXPIRE_CYCLE_FAST)
24 timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; // 1ms
25
26 for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
27 redisDb *db = server.db + (current_db % server.dbnum);
28 current_db++;
29
30 // 随机采样,删除过期key
31 do {
32 unsigned long num = dictSize(db->expires);
33 if (num == 0) break;
34
35 // 每次最多检查20个key
36 if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
37 num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
38
39 expired = 0;
40 while (num--) {
41 dictEntry *de = dictGetRandomKey(db->expires);
42 if (activeExpireCycleTryExpire(db, de, mstime())) {
43 expired++;
44 }
45 }
46
47 // 超过25%的key过期了,继续扫描这个数据库
48 } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP / 4);
49
50 // 每隔16次迭代检查是否超时
51 if ((iteration & 0xf) == 0) {
52 elapsed = ustime() - start;
53 if (elapsed > timelimit) {
54 timelimit_exit = 1;
55 break;
56 }
57 }
58 }
59}
关键设计点:
- 渐进式:每次只扫描部分数据库,不会阻塞太久
- 自适应:过期key多就多扫,少就少扫
- 时间限制:最多占用25%的CPU时间
- 随机采样:不遍历,随机抽取key检查
activeExpireCycleTryExpire 删除单个过期key:
1// expire.c
2int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
3 long long t = dictGetSignedIntegerVal(de);
4 if (now > t) {
5 sds key = dictGetKey(de);
6 robj *keyobj = createStringObject(key,sdslen(key));
7
8 propagateExpire(db,keyobj,server.lazyfree_lazy_expire);
9 if (server.lazyfree_lazy_expire)
10 dbAsyncDelete(db,keyobj);
11 else
12 dbSyncDelete(db,keyobj);
13 notifyKeyspaceEvent(NOTIFY_EXPIRED,
14 "expired",keyobj,db->id);
15 decrRefCount(keyobj);
16 server.stat_expiredkeys++;
17 return 1;
18 } else {
19 return 0;
20 }
21}
三、内存淘汰:LRU/LFU 实现
当内存超过maxmemory时,触发内存淘汰。
3.1 淘汰触发
1// evict.c
2int freeMemoryIfNeeded(void) {
3 size_t mem_reported, mem_tofree, mem_freed;
4 int result = C_ERR;
5 static int mem_freed_last_try = 0;
6
7 // 获取当前内存使用
8 mem_reported = zmalloc_used_memory();
9
10 // 检查是否超限
11 if (mem_reported <= server.maxmemory) return C_OK;
12
13 // 计算需要释放多少
14 mem_tofree = mem_reported - server.maxmemory;
15 mem_freed = 0;
16
17 // 根据策略淘汰
18 while (mem_freed < mem_tofree) {
19 // 淘汰 key...
20 }
21
22 return result;
23}
3.2 近似LRU实现
真正的LRU需要维护一个访问时间链表,内存开销大。Redis用近似LRU:
1// evict.c
2#define EVPOOL_SIZE 16 // 淘汰池大小
3
4struct evictionPoolEntry {
5 unsigned long long idle; // 空闲时间(LRU)或逆频率(LFU)
6 sds key;
7 sds cached; // 缓存的key名,减少分配
8 int dbid;
9};
淘汰池按空闲时间升序排列,最右边是最适合淘汰的。
填充淘汰池:
1void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict,
2 struct evictionPoolEntry *pool) {
3 int j, k, count;
4 dictEntry *samples[server.maxmemory_samples]; // 默认采样5个
5
6 // 随机采样N个key
7 count = dictGetSomeKeys(sampledict, samples, server.maxmemory_samples);
8
9 for (j = 0; j < count; j++) {
10 unsigned long long idle;
11 sds key = dictGetKey(samples[j]);
12 robj *o = dictGetVal(dictFind(keydict, key));
13
14 // 计算空闲时间
15 if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
16 idle = estimateObjectIdleTime(o);
17 } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
18 idle = 255 - LFUDecrAndReturn(o); // LFU:逆频率
19 }
20
21 // 插入淘汰池(按idle升序)
22 k = 0;
23 while (k < EVPOOL_SIZE && pool[k].key && pool[k].idle < idle)
24 k++;
25
26 // 插入逻辑:把idle大的往右移
27 if (k < EVPOOL_SIZE) {
28 // 移动元素腾出位置
29 if (pool[EVPOOL_SIZE-1].key == NULL) {
30 memmove(pool+k+1, pool+k, sizeof(pool[0])*(EVPOOL_SIZE-k-1));
31 } else {
32 // 池满了,挤出左边最小的
33 memmove(pool, pool+1, sizeof(pool[0])*k);
34 k--;
35 }
36 pool[k].idle = idle;
37 pool[k].key = key;
38 pool[k].dbid = dbid;
39 }
40 }
41}
估算对象空闲时间:
1// evict.c
2unsigned long long estimateObjectIdleTime(robj *o) {
3 unsigned long long lruclock = LRU_CLOCK();
4 if (lruclock >= o->lru) {
5 return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
6 } else {
7 // 处理时钟回绕
8 return (lruclock + (LRU_CLOCK_MAX - o->lru)) * LRU_CLOCK_RESOLUTION;
9 }
10}
每次访问key时更新lru字段:
1// db.c
2robj *lookupKey(redisDb *db, robj *key, int flags) {
3 dictEntry *de = dictFind(db->dict,key->ptr);
4 if (de) {
5 robj *val = dictGetVal(de);
6 /* Update the access time for the ageing algorithm.
7 * Don't do it if we have a saving child, as this will trigger
8 * a copy on write madness. */
9 if (server.rdb_child_pid == -1 &&
10 server.aof_child_pid == -1 &&
11 !(flags & LOOKUP_NOTOUCH))
12 {
13 if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
14 updateLFU(val);
15 } else {
16 val->lru = LRU_CLOCK();
17 }
18 }
19 return val;
20 } else {
21 return NULL;
22 }
23}
3.3 LFU 实现
Redis 4.0引入LFU(Least Frequently Used),复用lru字段(24位):
1+----------------+--------+
2| 16 bits | 8 bits |
3| Last Decr Time | LOG_C |
4+----------------+--------+
LOG_C:对数计数器,表示访问频率Last Decr Time:上次衰减时间
1// evict.c
2#define LFU_INIT_VAL 5 // 新key的初始计数
3
4uint8_t LFULogIncr(uint8_t counter) {
5 if (counter == 255) return 255; // 最大值
6 double r = (double)rand() / RAND_MAX;
7 double baseval = counter - LFU_INIT_VAL;
8 if (baseval < 0) baseval = 0;
9 double p = 1.0 / (baseval * server.lfu_log_factor + 1);
10 if (r < p) counter++; // 概率递增
11 return counter;
12}
13
14unsigned long LFUDecrAndReturn(robj *o) {
15 unsigned long ldt = o->lru >> 8; // 上次衰减时间
16 unsigned long counter = o->lru & 255; // 计数器
17
18 // 计算需要衰减多少
19 unsigned long num_periods = server.lfu_decay_time ?
20 LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
21
22 if (num_periods) {
23 counter = (num_periods > counter) ? 0 : counter - num_periods;
24 }
25 return counter;
26}
LFU 的特点:
- 对数递增:不是每次访问都+1,而是概率递增,热点key计数会趋于稳定
- 时间衰减:长时间不访问,计数会逐渐减少
- 配置参数:
lfu-log-factor:递增因子,越大增长越慢lfu-decay-time:衰减周期,单位分钟
四、serverCron:Redis 的后台管家
serverCron是Redis的定时任务,默认每100ms执行一次(hz=10)。
1// server.c
2int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
3 int j;
4
5 // 1. 更新时间缓存
6 server.mstime = mstime();
7 server.unixtime = server.mstime / 1000;
8
9 // 2. 更新LRU时钟
10 server.lruclock = getLRUClock();
11
12 // 3. 处理信号
13 if (server.shutdown_asap) {
14 if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
15 server.shutdown_asap = 0;
16 }
17
18 // 4. 打印数据库信息
19 run_with_period(5000) {
20 for (j = 0; j < server.dbnum; j++) {
21 long long size = dictSize(server.db[j].dict);
22 long long expires = dictSize(server.db[j].expires);
23 // 打印日志...
24 }
25 }
26
27 // 5. 客户端管理
28 clientsCron();
29
30 // 6. 数据库管理(过期key、rehash)
31 databasesCron();
32
33 // 7. 检查是否需要 AOF重写 / RDB保存
34 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
35 // 检查BGSAVE条件
36 for (j = 0; j < server.saveparamslen; j++) {
37 struct saveparam *sp = server.saveparams + j;
38 if (server.dirty >= sp->changes &&
39 server.unixtime - sp->lastsave > sp->seconds) {
40 rdbSaveBackground(server.rdb_filename, NULL);
41 break;
42 }
43 }
44
45 // 检查AOF重写条件
46 if (server.aof_rewrite_perc &&
47 server.aof_current_size > server.aof_rewrite_min_size) {
48 long long base = server.aof_rewrite_base_size;
49 if (server.aof_current_size > base * (1 + server.aof_rewrite_perc / 100.0)) {
50 rewriteAppendOnlyFileBackground();
51 }
52 }
53 }
54
55 // 8. AOF刷盘
56 if (server.aof_flush_postponed_start)
57 flushAppendOnlyFile(0);
58
59 // 9. 释放超时客户端
60 freeClientsInAsyncFreeQueue();
61
62 // 10. 集群相关
63 if (server.cluster_enabled) clusterCron();
64
65 // 11. 哨兵相关
66 if (server.sentinel_mode) sentinelTimer();
67
68 // 返回下次执行的间隔
69 return 1000000 / server.hz;
70}
4.1 clientsCron:客户端管理
1// server.c
2void clientsCron(void) {
3 int numclients = listLength(server.clients);
4 int iterations = numclients / server.hz; // 每次处理的客户端数
5
6 if (iterations < 50) iterations = 50;
7 if (iterations > 500) iterations = 500;
8
9 while (listLength(server.clients) && iterations--) {
10 client *c = listNodeValue(listFirst(server.clients));
11 listRotateHeadToTail(server.clients);
12
13 // 1. 关闭超时客户端
14 if (server.maxidletime &&
15 !(c->flags & CLIENT_SLAVE) &&
16 !(c->flags & CLIENT_MASTER) &&
17 !(c->flags & CLIENT_BLOCKED) &&
18 !(c->flags & CLIENT_PUBSUB) &&
19 (now - c->lastinteraction) > server.maxidletime) {
20 freeClient(c);
21 continue;
22 }
23
24 // 2. 释放过多内存的客户端输入/输出缓冲区
25 if (c->querybuf_peak >= server.client_max_querybuf_len) {
26 freeClient(c);
27 continue;
28 }
29
30 // 3. 回收客户端缓冲区内存
31 if (c->querybuf && sdslen(c->querybuf) > 1024) {
32 c->querybuf = sdsRemoveFreeSpace(c->querybuf);
33 }
34 }
35}
4.2 databasesCron:数据库管理
1// server.c
2void databasesCron(void) {
3 // 1. 过期key扫描
4 if (server.active_expire_enabled) {
5 if (server.maxmemory > 0) {
6 // 内存紧张时更积极
7 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
8 }
9 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
10 }
11
12 // 2. 渐进式rehash
13 if (server.activerehashing) {
14 for (int j = 0; j < server.dbnum; j++) {
15 redisDb *db = server.db + j;
16 if (dictIsRehashing(db->dict)) {
17 dictRehashMilliseconds(db->dict, 1); // 最多1ms
18 break; // 每次只处理一个数据库
19 }
20 }
21 }
22}
五、异步释放:lazyfree
大对象删除会阻塞,Redis 4.0引入lazyfree。
5.1 异步删除函数
1// lazyfree.c
2#define LAZYFREE_THRESHOLD 64 // 大于此值才异步删除
3
4int dbAsyncDelete(redisDb *db, robj *key) {
5 // 从expires字典删除
6 if (dictSize(db->expires) > 0) dictDelete(db->expires, key->ptr);
7
8 // 从主字典摘除,但不释放value
9 dictEntry *de = dictUnlink(db->dict, key->ptr);
10 if (de) {
11 robj *val = dictGetVal(de);
12 // 估算释放开销
13 size_t free_effort = lazyfreeGetFreeEffort(val);
14
15 if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
16 // 扔给后台线程
17 atomicIncr(lazyfree_objects,1);
18 bioCreateBackgroundJob(BIO_LAZY_FREE, val, NULL, NULL);
19 dictSetVal(db->dict, de, NULL); // 防止被同步释放
20 }
21 }
22
23 if (de) {
24 dictFreeUnlinkedEntry(db->dict, de);
25 if (server.cluster_enabled) slotToKeyDel(key);
26 return 1;
27 } else {
28 return 0;
29 }
30}
5.2 估算释放开销
1// lazyfree.c
2size_t lazyfreeGetFreeEffort(robj *obj) {
3 if (obj->type == OBJ_LIST) {
4 quicklist *ql = obj->ptr;
5 return ql->len;
6 } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
7 dict *ht = obj->ptr;
8 return dictSize(ht);
9 } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
10 zset *zs = obj->ptr;
11 return zs->zsl->length;
12 } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
13 dict *ht = obj->ptr;
14 return dictSize(ht);
15 } else {
16 return 1; /* Everything else is a single allocation. */
17 }
18}
5.3 bio 后台线程
1// bio.c
2void *bioProcessBackgroundJobs(void *arg) {
3 long type = (long)arg;
4
5 while (1) {
6 listNode *ln;
7 bio_job *job;
8
9 pthread_mutex_lock(&bio_mutex[type]);
10 while (listLength(bio_jobs[type]) == 0) {
11 pthread_cond_wait(&bio_newjob_cond[type], &bio_mutex[type]);
12 }
13 ln = listFirst(bio_jobs[type]);
14 job = ln->value;
15 listDelNode(bio_jobs[type], ln);
16 pthread_mutex_unlock(&bio_mutex[type]);
17
18 // 执行任务
19 if (type == BIO_LAZY_FREE) {
20 if (job->free_args->obj) {
21 decrRefCount(job->free_args->obj);
22 } else if (job->free_args->sl) {
23 slowlogFreeEntry(job->free_args->sl);
24 }
25 } else if (type == BIO_CLOSE_FILE) {
26 close(job->fd);
27 } else if (type == BIO_AOF_FSYNC) {
28 aof_fsync(job->fd);
29 }
30
31 zfree(job);
32 }
33}
Redis 的内存管理核心思想:用CPU换内存,用近似换精确。LRU/LFU都不是精确实现,但足够好,且开销可控。
— END —