Redis 内存模型设计全解析(serverCron 在干什么?)

Redis有过期策略、内存淘汰,但具体怎么实现的?serverCron每隔一段时间跑一次,到底在做什么?这篇文章从源码层面拆解Redis的内存管理机制。

一、Redis内存管理概览

Redis 的内存管理涉及三个层面:

层面 触发时机 机制
key 过期删除 访问时 + 定期扫描 惰性删除 + 定期删除
内存淘汰 内存超限 LRU/LFU/TTL/随机
后台释放 UNLINK/FLUSHDB ASYNC bio线程

二、过期策略:惰性删除 + 定期删除

Redis不是用一个线程专门扫描过期key,而是两种策略结合。

2.1 惰性删除(Lazy Expiration)

访问key时检查是否过期,过期就删除:

 1// db.c
 2robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
 3    robj *val;
 4
 5    if (expireIfNeeded(db,key) == 1) {
 6        /* Key expired. If we are in the context of a master, expireIfNeeded()
 7         * returns 0 only when the key does not exist at all, so it's safe
 8         * to return NULL ASAP. */
 9        if (server.masterhost == NULL) return NULL;
10
11        /* However if we are in the context of a slave, expireIfNeeded() will
12         * not really try to expire the key, it only returns information
13         * about the "logical" status of the key: key expiring is up to the
14         * master in order to have a consistent view of master's data set.
15         *
16         * However, if the command caller is not the master, and as additional
17         * safety measure, the command invoked is a read-only command, we can
18         * safely return NULL here, and provide a more consistent behavior
19         * to clients accessign expired values in a read-only fashion, that
20         * will say the key as non existing.
21         *
22         * Notably this covers GETs when slaves are used to scale reads. */
23        if (server.current_client &&
24            server.current_client != server.master &&
25            server.current_client->cmd &&
26            server.current_client->cmd->flags & CMD_READONLY)
27        {
28            return NULL;
29        }
30    }
31    val = lookupKey(db,key,flags);
32    if (val == NULL)
33        server.stat_keyspace_misses++;
34    else
35        server.stat_keyspace_hits++;
36    return val;
37}

expireIfNeeded 的实现:

 1// db.c
 2int expireIfNeeded(redisDb *db, robj *key) {
 3    mstime_t when = getExpire(db,key);
 4    mstime_t now;
 5
 6    if (when < 0) return 0; /* No expire for this key */
 7
 8    /* Don't expire anything while loading. It will be done later. */
 9    if (server.loading) return 0;
10
11    /* If we are in the context of a Lua script, we pretend that time is
12     * blocked to when the Lua script started. This way a key can expire
13     * only the first time it is accessed and not in the middle of the
14     * script execution, making propagation to slaves / AOF consistent.
15     * See issue #1525 on Github for more information. */
16    now = server.lua_caller ? server.lua_time_start : mstime();
17
18    /* If we are running in the context of a slave, return ASAP:
19     * the slave key expiration is controlled by the master that will
20     * send us synthesized DEL operations for expired keys.
21     *
22     * Still we try to return the right information to the caller,
23     * that is, 0 if we think the key should be still valid, 1 if
24     * we think the key is expired at this time. */
25    if (server.masterhost != NULL) return now > when;
26
27    /* Return when this key has not expired */
28    if (now <= when) return 0;
29
30    /* Delete the key */
31    server.stat_expiredkeys++;
32    propagateExpire(db,key,server.lazyfree_lazy_expire);
33    notifyKeyspaceEvent(NOTIFY_EXPIRED,
34        "expired",key,db->id);
35    return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
36                                         dbSyncDelete(db,key);
37}

惰性删除的问题:如果一个key过期了但从未被访问,就会一直占用内存。所以需要定期删除补充。

2.2 定期删除(Active Expiration)

serverCron里调用activeExpireCycle,主动扫描过期key:

 1// expire.c
 2void activeExpireCycle(int type) {
 3    // 静态变量记录上次扫描到哪个数据库
 4    static unsigned int current_db = 0;
 5    static int timelimit_exit = 0;
 6    static long long last_fast_cycle = 0;
 7
 8    int j, iteration = 0;
 9    int dbs_per_call = CRON_DBS_PER_CALL;  // 默认16
10    long long start = ustime(), timelimit;
11
12    // 快速模式:上次没超时就不跑
13    if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
14        if (!timelimit_exit) return;
15        if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
16        last_fast_cycle = start;
17    }
18
19    // 计算时间上限:每次最多占用CPU时间的25%
20    timelimit = 1000000 * ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC / server.hz / 100;
21    // ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC = 25
22
23    if (type == ACTIVE_EXPIRE_CYCLE_FAST)
24        timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION;  // 1ms
25
26    for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
27        redisDb *db = server.db + (current_db % server.dbnum);
28        current_db++;
29
30        // 随机采样,删除过期key
31        do {
32            unsigned long num = dictSize(db->expires);
33            if (num == 0) break;
34
35            // 每次最多检查20个key
36            if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
37                num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
38
39            expired = 0;
40            while (num--) {
41                dictEntry *de = dictGetRandomKey(db->expires);
42                if (activeExpireCycleTryExpire(db, de, mstime())) {
43                    expired++;
44                }
45            }
46
47            // 超过25%的key过期了,继续扫描这个数据库
48        } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP / 4);
49
50        // 每隔16次迭代检查是否超时
51        if ((iteration & 0xf) == 0) {
52            elapsed = ustime() - start;
53            if (elapsed > timelimit) {
54                timelimit_exit = 1;
55                break;
56            }
57        }
58    }
59}

关键设计点:

  1. 渐进式:每次只扫描部分数据库,不会阻塞太久
  2. 自适应:过期key多就多扫,少就少扫
  3. 时间限制:最多占用25%的CPU时间
  4. 随机采样:不遍历,随机抽取key检查

activeExpireCycleTryExpire 删除单个过期key:

 1// expire.c
 2int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
 3    long long t = dictGetSignedIntegerVal(de);
 4    if (now > t) {
 5        sds key = dictGetKey(de);
 6        robj *keyobj = createStringObject(key,sdslen(key));
 7
 8        propagateExpire(db,keyobj,server.lazyfree_lazy_expire);
 9        if (server.lazyfree_lazy_expire)
10            dbAsyncDelete(db,keyobj);
11        else
12            dbSyncDelete(db,keyobj);
13        notifyKeyspaceEvent(NOTIFY_EXPIRED,
14            "expired",keyobj,db->id);
15        decrRefCount(keyobj);
16        server.stat_expiredkeys++;
17        return 1;
18    } else {
19        return 0;
20    }
21}

三、内存淘汰:LRU/LFU 实现

当内存超过maxmemory时,触发内存淘汰。

3.1 淘汰触发

 1// evict.c
 2int freeMemoryIfNeeded(void) {
 3    size_t mem_reported, mem_tofree, mem_freed;
 4    int result = C_ERR;
 5    static int mem_freed_last_try = 0;
 6
 7    // 获取当前内存使用
 8    mem_reported = zmalloc_used_memory();
 9    
10    // 检查是否超限
11    if (mem_reported <= server.maxmemory) return C_OK;
12
13    // 计算需要释放多少
14    mem_tofree = mem_reported - server.maxmemory;
15    mem_freed = 0;
16
17    // 根据策略淘汰
18    while (mem_freed < mem_tofree) {
19        // 淘汰 key...
20    }
21
22    return result;
23}

3.2 近似LRU实现

真正的LRU需要维护一个访问时间链表,内存开销大。Redis用近似LRU:

1// evict.c
2#define EVPOOL_SIZE 16  // 淘汰池大小
3
4struct evictionPoolEntry {
5    unsigned long long idle;  // 空闲时间(LRU)或逆频率(LFU)
6    sds key;
7    sds cached;               // 缓存的key名,减少分配
8    int dbid;
9};

淘汰池按空闲时间升序排列,最右边是最适合淘汰的。

填充淘汰池:

 1void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict,
 2                          struct evictionPoolEntry *pool) {
 3    int j, k, count;
 4    dictEntry *samples[server.maxmemory_samples];  // 默认采样5个
 5
 6    // 随机采样N个key
 7    count = dictGetSomeKeys(sampledict, samples, server.maxmemory_samples);
 8
 9    for (j = 0; j < count; j++) {
10        unsigned long long idle;
11        sds key = dictGetKey(samples[j]);
12        robj *o = dictGetVal(dictFind(keydict, key));
13
14        // 计算空闲时间
15        if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
16            idle = estimateObjectIdleTime(o);
17        } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
18            idle = 255 - LFUDecrAndReturn(o);  // LFU:逆频率
19        }
20
21        // 插入淘汰池(按idle升序)
22        k = 0;
23        while (k < EVPOOL_SIZE && pool[k].key && pool[k].idle < idle)
24            k++;
25
26        // 插入逻辑:把idle大的往右移
27        if (k < EVPOOL_SIZE) {
28            // 移动元素腾出位置
29            if (pool[EVPOOL_SIZE-1].key == NULL) {
30                memmove(pool+k+1, pool+k, sizeof(pool[0])*(EVPOOL_SIZE-k-1));
31            } else {
32                // 池满了,挤出左边最小的
33                memmove(pool, pool+1, sizeof(pool[0])*k);
34                k--;
35            }
36            pool[k].idle = idle;
37            pool[k].key = key;
38            pool[k].dbid = dbid;
39        }
40    }
41}

估算对象空闲时间:

 1// evict.c
 2unsigned long long estimateObjectIdleTime(robj *o) {
 3    unsigned long long lruclock = LRU_CLOCK();
 4    if (lruclock >= o->lru) {
 5        return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
 6    } else {
 7        // 处理时钟回绕
 8        return (lruclock + (LRU_CLOCK_MAX - o->lru)) * LRU_CLOCK_RESOLUTION;
 9    }
10}

每次访问key时更新lru字段:

 1// db.c
 2robj *lookupKey(redisDb *db, robj *key, int flags) {
 3    dictEntry *de = dictFind(db->dict,key->ptr);
 4    if (de) {
 5        robj *val = dictGetVal(de);
 6        /* Update the access time for the ageing algorithm.
 7         * Don't do it if we have a saving child, as this will trigger
 8         * a copy on write madness. */
 9        if (server.rdb_child_pid == -1 &&
10            server.aof_child_pid == -1 &&
11            !(flags & LOOKUP_NOTOUCH))
12        {
13            if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
14                updateLFU(val);
15            } else {
16                val->lru = LRU_CLOCK();
17            }
18        }
19        return val;
20    } else {
21        return NULL;
22    }
23}

3.3 LFU 实现

Redis 4.0引入LFU(Least Frequently Used),复用lru字段(24位):

1+----------------+--------+
2| 16 bits        | 8 bits |
3| Last Decr Time | LOG_C  |
4+----------------+--------+
  • LOG_C:对数计数器,表示访问频率
  • Last Decr Time:上次衰减时间
 1// evict.c
 2#define LFU_INIT_VAL 5  // 新key的初始计数
 3
 4uint8_t LFULogIncr(uint8_t counter) {
 5    if (counter == 255) return 255;  // 最大值
 6    double r = (double)rand() / RAND_MAX;
 7    double baseval = counter - LFU_INIT_VAL;
 8    if (baseval < 0) baseval = 0;
 9    double p = 1.0 / (baseval * server.lfu_log_factor + 1);
10    if (r < p) counter++;  // 概率递增
11    return counter;
12}
13
14unsigned long LFUDecrAndReturn(robj *o) {
15    unsigned long ldt = o->lru >> 8;   // 上次衰减时间
16    unsigned long counter = o->lru & 255;  // 计数器
17
18    // 计算需要衰减多少
19    unsigned long num_periods = server.lfu_decay_time ?
20        LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
21
22    if (num_periods) {
23        counter = (num_periods > counter) ? 0 : counter - num_periods;
24    }
25    return counter;
26}

LFU 的特点:

  1. 对数递增:不是每次访问都+1,而是概率递增,热点key计数会趋于稳定
  2. 时间衰减:长时间不访问,计数会逐渐减少
  3. 配置参数
    • lfu-log-factor:递增因子,越大增长越慢
    • lfu-decay-time:衰减周期,单位分钟

四、serverCron:Redis 的后台管家

serverCron是Redis的定时任务,默认每100ms执行一次(hz=10)。

 1// server.c
 2int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
 3    int j;
 4
 5    // 1. 更新时间缓存
 6    server.mstime = mstime();
 7    server.unixtime = server.mstime / 1000;
 8
 9    // 2. 更新LRU时钟
10    server.lruclock = getLRUClock();
11
12    // 3. 处理信号
13    if (server.shutdown_asap) {
14        if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
15        server.shutdown_asap = 0;
16    }
17
18    // 4. 打印数据库信息
19    run_with_period(5000) {
20        for (j = 0; j < server.dbnum; j++) {
21            long long size = dictSize(server.db[j].dict);
22            long long expires = dictSize(server.db[j].expires);
23            // 打印日志...
24        }
25    }
26
27    // 5. 客户端管理
28    clientsCron();
29
30    // 6. 数据库管理(过期key、rehash)
31    databasesCron();
32
33    // 7. 检查是否需要 AOF重写 / RDB保存
34    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
35        // 检查BGSAVE条件
36        for (j = 0; j < server.saveparamslen; j++) {
37            struct saveparam *sp = server.saveparams + j;
38            if (server.dirty >= sp->changes &&
39                server.unixtime - sp->lastsave > sp->seconds) {
40                rdbSaveBackground(server.rdb_filename, NULL);
41                break;
42            }
43        }
44
45        // 检查AOF重写条件
46        if (server.aof_rewrite_perc &&
47            server.aof_current_size > server.aof_rewrite_min_size) {
48            long long base = server.aof_rewrite_base_size;
49            if (server.aof_current_size > base * (1 + server.aof_rewrite_perc / 100.0)) {
50                rewriteAppendOnlyFileBackground();
51            }
52        }
53    }
54
55    // 8. AOF刷盘
56    if (server.aof_flush_postponed_start)
57        flushAppendOnlyFile(0);
58
59    // 9. 释放超时客户端
60    freeClientsInAsyncFreeQueue();
61
62    // 10. 集群相关
63    if (server.cluster_enabled) clusterCron();
64
65    // 11. 哨兵相关
66    if (server.sentinel_mode) sentinelTimer();
67
68    // 返回下次执行的间隔
69    return 1000000 / server.hz;
70}

4.1 clientsCron:客户端管理

 1// server.c
 2void clientsCron(void) {
 3    int numclients = listLength(server.clients);
 4    int iterations = numclients / server.hz;  // 每次处理的客户端数
 5
 6    if (iterations < 50) iterations = 50;
 7    if (iterations > 500) iterations = 500;
 8
 9    while (listLength(server.clients) && iterations--) {
10        client *c = listNodeValue(listFirst(server.clients));
11        listRotateHeadToTail(server.clients);
12
13        // 1. 关闭超时客户端
14        if (server.maxidletime &&
15            !(c->flags & CLIENT_SLAVE) &&
16            !(c->flags & CLIENT_MASTER) &&
17            !(c->flags & CLIENT_BLOCKED) &&
18            !(c->flags & CLIENT_PUBSUB) &&
19            (now - c->lastinteraction) > server.maxidletime) {
20            freeClient(c);
21            continue;
22        }
23
24        // 2. 释放过多内存的客户端输入/输出缓冲区
25        if (c->querybuf_peak >= server.client_max_querybuf_len) {
26            freeClient(c);
27            continue;
28        }
29
30        // 3. 回收客户端缓冲区内存
31        if (c->querybuf && sdslen(c->querybuf) > 1024) {
32            c->querybuf = sdsRemoveFreeSpace(c->querybuf);
33        }
34    }
35}

4.2 databasesCron:数据库管理

 1// server.c
 2void databasesCron(void) {
 3    // 1. 过期key扫描
 4    if (server.active_expire_enabled) {
 5        if (server.maxmemory > 0) {
 6            // 内存紧张时更积极
 7            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
 8        }
 9        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
10    }
11
12    // 2. 渐进式rehash
13    if (server.activerehashing) {
14        for (int j = 0; j < server.dbnum; j++) {
15            redisDb *db = server.db + j;
16            if (dictIsRehashing(db->dict)) {
17                dictRehashMilliseconds(db->dict, 1);  // 最多1ms
18                break;  // 每次只处理一个数据库
19            }
20        }
21    }
22}

五、异步释放:lazyfree

大对象删除会阻塞,Redis 4.0引入lazyfree。

5.1 异步删除函数

 1// lazyfree.c
 2#define LAZYFREE_THRESHOLD 64  // 大于此值才异步删除
 3
 4int dbAsyncDelete(redisDb *db, robj *key) {
 5    // 从expires字典删除
 6    if (dictSize(db->expires) > 0) dictDelete(db->expires, key->ptr);
 7
 8    // 从主字典摘除,但不释放value
 9    dictEntry *de = dictUnlink(db->dict, key->ptr);
10    if (de) {
11        robj *val = dictGetVal(de);
12        // 估算释放开销
13        size_t free_effort = lazyfreeGetFreeEffort(val);
14
15        if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
16            // 扔给后台线程
17            atomicIncr(lazyfree_objects,1);
18            bioCreateBackgroundJob(BIO_LAZY_FREE, val, NULL, NULL);
19            dictSetVal(db->dict, de, NULL);  // 防止被同步释放
20        }
21    }
22
23    if (de) {
24        dictFreeUnlinkedEntry(db->dict, de);
25        if (server.cluster_enabled) slotToKeyDel(key);
26        return 1;
27    } else {
28        return 0;
29    }
30}

5.2 估算释放开销

 1// lazyfree.c
 2size_t lazyfreeGetFreeEffort(robj *obj) {
 3    if (obj->type == OBJ_LIST) {
 4        quicklist *ql = obj->ptr;
 5        return ql->len;
 6    } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
 7        dict *ht = obj->ptr;
 8        return dictSize(ht);
 9    } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
10        zset *zs = obj->ptr;
11        return zs->zsl->length;
12    } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
13        dict *ht = obj->ptr;
14        return dictSize(ht);
15    } else {
16        return 1; /* Everything else is a single allocation. */
17    }
18}

5.3 bio 后台线程

 1// bio.c
 2void *bioProcessBackgroundJobs(void *arg) {
 3    long type = (long)arg;
 4
 5    while (1) {
 6        listNode *ln;
 7        bio_job *job;
 8
 9        pthread_mutex_lock(&bio_mutex[type]);
10        while (listLength(bio_jobs[type]) == 0) {
11            pthread_cond_wait(&bio_newjob_cond[type], &bio_mutex[type]);
12        }
13        ln = listFirst(bio_jobs[type]);
14        job = ln->value;
15        listDelNode(bio_jobs[type], ln);
16        pthread_mutex_unlock(&bio_mutex[type]);
17
18        // 执行任务
19        if (type == BIO_LAZY_FREE) {
20            if (job->free_args->obj) {
21                decrRefCount(job->free_args->obj);
22            } else if (job->free_args->sl) {
23                slowlogFreeEntry(job->free_args->sl);
24            }
25        } else if (type == BIO_CLOSE_FILE) {
26            close(job->fd);
27        } else if (type == BIO_AOF_FSYNC) {
28            aof_fsync(job->fd);
29        }
30
31        zfree(job);
32    }
33}

Redis 的内存管理核心思想:用CPU换内存,用近似换精确。LRU/LFU都不是精确实现,但足够好,且开销可控。

— END —