redis7.x源码分析:(8) serverCron
serverCron函数是Redis中最核心的定时处理函数,它通过 aeCreateTimeEvent 注册到事件处理器中(关于它的实现见:redis7.x源码分析:(5) ae事件处理器(二)),触发时间间隔为:1000ms / server.hz ,server.hz 可以通过配置文件中 hz 选项进行修改,默认值为 10,也就是默认100毫秒执行一次。
serverCron函数的职责非常广泛,主要包括以下几个关键技术点:
1.过期键清理:采用主动和自适应策略,清理已过期的键,释放内存。
2.数据库渐进式 Rehash:在字典容量变化时,将 rehash 操作分摊到多次 serverCron 调用中执行,避免单次操作阻塞服务。
3.持久化相关:触发 RDB 快照的后台保存,以及将 AOF 缓冲区中的数据存盘(如果开启)。
4.连接管理:关闭超时闲置的客户端,并清理输出缓冲区过大的客户端。
5.状态统计与更新:更新服务器的时间缓存、内存占用、键空间命中率等统计信息,为 INFO 命令提供数据。
serverCron函数中比较常用的一个定时宏是run_with_period,用来控制相应的代码段每隔多长时间执行一次:
/* Guard for code that should run roughly once every _ms_ milliseconds.
 *
 * Expands to an `if` whose condition is true when either:
 *   - _ms_ is not longer than one cron tick (1000/server.hz milliseconds), or
 *   - server.cronloops is a multiple of the number of ticks that fit in _ms_.
 *
 * The statement or block written after the macro is the guarded body.
 * _ms_ is parenthesized on every use so that an argument containing
 * operators (e.g. `cond ? 1000 : 50`) keeps its intended meaning. */
#define run_with_period(_ms_) if (((_ms_) <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
serverCron的代码实现如下:
/* Periodic server timer callback.
 *
 * Runs the server's recurring housekeeping in a fixed order: watchdog
 * scheduling, time cache refresh, effective hz computation, metric sampling,
 * LRU clock and memory stats refresh, shutdown handling, periodic logging,
 * clientsCron(), databasesCron(), child-process handling and RDB/AOF
 * triggering, AOF flushing, client pause check, replication / cluster /
 * sentinel crons, MIGRATE socket cleanup, I/O thread and tracking table
 * maintenance, scheduled BGSAVE, and module crons.
 *
 * The three parameters are not used (they are passed to UNUSED()).
 *
 * Returns 1000/server.hz, the interval in milliseconds used for the next
 * timer invocation. */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j;
    UNUSED(eventLoop);
    UNUSED(id);
    UNUSED(clientData);

    /* If a watchdog period is configured, (re)schedule the watchdog signal.
     * NOTE(review): presumably this arms SIGALRM, handled by
     * watchdogSignalHandler -- the handler is not visible here, confirm. */
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

    /* Update the time cache. */
    updateCachedTime(1);

    /* Start from the configured frequency. The cron runs server.hz times
     * per second, i.e. once every 1000/server.hz milliseconds. */
    server.hz = server.config_hz;
    /* With dynamic hz enabled, keep doubling the frequency while the number
     * of clients per tick exceeds MAX_CLIENTS_PER_CLOCK_TICK, capping the
     * result at CONFIG_MAX_HZ. */
    if (server.dynamic_hz) {
        while (listLength(server.clients) / server.hz >
               MAX_CLIENTS_PER_CLOCK_TICK)
        {
            server.hz *= 2;
            if (server.hz > CONFIG_MAX_HZ) {
                server.hz = CONFIG_MAX_HZ;
                break;
            }
        }
    }

    /* for debug purposes: skip actual cron work if pause_cron is on */
    if (server.pause_cron) return 1000/server.hz;

    /* Every 100 ms: feed the command counter and the network input/output
     * byte counters (total and replication-only) to the instantaneous
     * metric trackers. */
    run_with_period(100) {
        long long stat_net_input_bytes, stat_net_output_bytes;
        long long stat_net_repl_input_bytes, stat_net_repl_output_bytes;
        atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
        atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
        atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes);
        atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes);

        trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
        trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
                stat_net_input_bytes + stat_net_repl_input_bytes);
        trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
                stat_net_output_bytes + stat_net_repl_output_bytes);
        trackInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION,
                stat_net_repl_input_bytes);
        trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION,
                stat_net_repl_output_bytes);
    }

    /* Read the current LRU clock and publish it atomically in
     * server.lruclock. */
    unsigned int lruclock = getLRUClock();
    atomicSet(server.lruclock,lruclock);

    /* Refresh the memory usage statistics.
     * NOTE(review): presumably covers peak allocation and process memory
     * figures -- confirm against cronUpdateMemoryStats(). */
    cronUpdateMemoryStats();

    /* We received a SIGTERM or SIGINT, shutting down here in a safe way, as it is
     * not ok doing so inside the signal handler. */
    if (server.shutdown_asap && !isShutdownInitiated()) {
        /* Shutdown requested but not yet initiated: pick the shutdown flags
         * configured for the signal that was received and try to exit. */
        int shutdownFlags = SHUTDOWN_NOFLAGS;
        if (server.last_sig_received == SIGINT && server.shutdown_on_sigint)
            shutdownFlags = server.shutdown_on_sigint;
        else if (server.last_sig_received == SIGTERM && server.shutdown_on_sigterm)
            shutdownFlags = server.shutdown_on_sigterm;

        if (prepareForShutdown(shutdownFlags) == C_OK) exit(0);
    } else if (isShutdownInitiated()) {
        /* A shutdown is already in progress: finish it once the deadline
         * has passed or the server reports it is ready. */
        if (server.mstime >= server.shutdown_mstime || isReadyToShutdown()) {
            if (finishShutdown() == C_OK) exit(0);
            /* Shutdown failed. Continue running. An error has been logged. */
        }
    }

    /* Show some info about non-empty databases */
    if (server.verbosity <= LL_VERBOSE) {
        /* Every 5 seconds, log key count, volatile key count and slot count
         * for each database that has any keys. */
        run_with_period(5000) {
            for (j = 0; j < server.dbnum; j++) {
                long long size, used, vkeys;

                size = dictSlots(server.db[j].dict);
                used = dictSize(server.db[j].dict);
                vkeys = dictSize(server.db[j].expires);
                if (used || vkeys) {
                    serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                }
            }
        }
    }

    /* Show information about connected clients */
    if (!server.sentinel_mode) {
        /* Outside sentinel mode, emit a debug log line with client and
         * replica counts every 5 seconds. */
        run_with_period(5000) {
            serverLog(LL_DEBUG,
                "%lu clients connected (%lu replicas), %zu bytes in use",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                zmalloc_used_memory());
        }
    }

    /* Client housekeeping.
     * NOTE(review): presumably handles idle connections and frees unused
     * client buffers -- confirm against clientsCron(). */
    clientsCron();

    /* Database housekeeping.
     * NOTE(review): presumably expires keys and resizes/rehashes the
     * databases -- confirm against databasesCron(). */
    databasesCron();

    /* Start a previously scheduled AOF rewrite if no child process is
     * active and rewrites are not currently rate limited. */
    if (!hasActiveChildProcess() &&
        server.aof_rewrite_scheduled &&
        !aofRewriteLimited())
    {
        rewriteAppendOnlyFileBackground();
    }

    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (hasActiveChildProcess() || ldbPendingChildren())
    {
        /* Collect info from the child once per second. */
        run_with_period(1000) receiveChildInfo();
        /* Check whether a child has terminated and handle its exit. */
        checkChildrenDone();
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now. */
        for (j = 0; j < server.saveparamslen; j++) {
            /* Walk the configured save points in order. */
            struct saveparam *sp = server.saveparams+j;

            /* A save point fires when enough changes have accumulated,
             * more than sp->seconds have elapsed since the last save, and
             * either the previous background save succeeded or the retry
             * delay (CONFIG_BGSAVE_RETRY_DELAY) since the last attempt has
             * passed.
             * NOTE(review): the default save points depend on the Redis
             * version -- check the shipped redis.conf. */
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr);
                break;
            }
        }

        /* Trigger an automatic AOF rewrite if needed. Preconditions: AOF is
         * on, no child process is active, a rewrite percentage is
         * configured, and the current AOF size exceeds the minimum rewrite
         * size. */
        if (server.aof_state == AOF_ON &&
            !hasActiveChildProcess() &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            long long base = server.aof_rewrite_base_size ?
                server.aof_rewrite_base_size : 1;
            /* Growth of the current AOF size relative to the base size, in
             * percent. */
            long long growth = (server.aof_current_size*100/base) - 100;
            /* Rewrite when the growth reaches the configured percentage
             * and rewrites are not rate limited. */
            if (growth >= server.aof_rewrite_perc && !aofRewriteLimited()) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }

    /* Refresh the dict resize policy.
     * NOTE(review): presumably resizing is allowed only while no child
     * process exists, and the call is repeated here so the policy is also
     * refreshed when the server is idle -- confirm against
     * updateDictResizePolicy(). */
    updateDictResizePolicy();

    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
        server.aof_flush_postponed_start)
    {
        /* A flush was postponed earlier: retry it on every cron cycle. */
        flushAppendOnlyFile(0);
    }

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * a higher frequency. */
    run_with_period(1000) {
        if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
            server.aof_last_write_status == C_ERR)
            {
                /* Retry the flush once per second after a write error. */
                flushAppendOnlyFile(0);
            }
    }

    /* Clear the paused clients state if needed. */
    checkClientPauseTimeoutAndReturnIfPaused();

    /* Run the replication cron every 100 ms while a failover is in
     * progress, otherwise once per second. */
    if (server.failover_state != NO_FAILOVER) {
        run_with_period(100) replicationCron();
    } else {
        run_with_period(1000) replicationCron();
    }

    /* Run the Redis Cluster cron. */
    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }

    /* Run the Sentinel timer if we are in sentinel mode. */
    if (server.sentinel_mode) sentinelTimer();

    /* Cleanup expired MIGRATE cached sockets. */
    run_with_period(1000) {
        migrateCloseTimedoutSockets();
    }

    /* Stop the I/O threads if we don't have enough pending work. */
    stopThreadedIOIfNeeded();

    /* When there are clients using key tracking, enforce the limit on the
     * tracking table size. */
    if (server.tracking_clients) trackingLimitUsedSlots();

    /* Start a scheduled BGSAVE if the corresponding flag is set. This is
     * useful when we are forced to postpone a BGSAVE because an AOF
     * rewrite is in progress.
     *
     * Note: this code must be after the replicationCron() call above so
     * make sure when refactoring this file to keep this order. This is useful
     * because we want to give priority to RDB savings for replication. */
    if (!hasActiveChildProcess() &&
        server.rdb_bgsave_scheduled &&
        (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
         server.lastbgsave_status == C_OK))
    {
        /* A BGSAVE is scheduled, no child is running, and either the retry
         * delay has elapsed or the last BGSAVE succeeded: start it, and
         * clear the flag only if the background save was started. */
        rdbSaveInfo rsi, *rsiptr;
        rsiptr = rdbPopulateSaveInfo(&rsi);
        if (rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) == C_OK)
            server.rdb_bgsave_scheduled = 0;
    }

    /* Every 100 ms, run the modules cron if any module is loaded. */
    run_with_period(100) {
        if (moduleCount()) modulesCron();
    }

    /* Fire the cron loop modules event. */
    RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
    moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
                          0,
                          &ei);

    server.cronloops++;
    /* The return value is the timer interval in milliseconds. */
    return 1000/server.hz;
}
Redis源码解析 文章被收录于专栏
基于redis7.x版本的源码分析