55 定时任务系统
模块目标
理解 OpenClaw 的 cron/调度引擎:三种调度类型的时间计算、启动时的清理与对齐、错过任务的补运行、结果投递路由。
核心文件
| 文件 | 职责 |
|---|---|
src/cron/service/ops.ts | CronService 核心操作(start/stop/add/remove) |
src/cron/schedule.ts | computeNextRunAtMs — 下次触发时间计算 |
src/cron/delivery.ts | resolveCronDeliveryPlan — 结果投递路由 |
src/cron/isolated-agent.ts | 隔离 agent 执行 |
src/cron/store.ts | 任务持久化到 JSON 文件 |
src/cron/session-reaper.ts | 过期 session 清理 |
src/cron/types.ts | 核心类型定义 |
一、CronService.start()(精确源码)
ts
// src/cron/service/ops.ts
export async function start(state: CronServiceState) {
await locked(state, async () => {
// 1. 未启用时快速退出
if (!state.deps.cronEnabled) {
state.deps.log.info({ enabled: false }, "cron: disabled");
return;
}
// 2. 加载持久化存储
await ensureLoaded(state, { skipRecompute: true });
// 3. 清理"悬空的运行标记"(上次崩溃遗留)
const jobs = state.store?.jobs ?? [];
for (const job of jobs) {
if (typeof job.state.runningAtMs === "number") {
state.deps.log.warn(
{ jobId: job.id, runningAtMs: job.state.runningAtMs },
"cron: clearing stale running marker on startup",
);
job.state.runningAtMs = undefined;
}
}
// 4. 运行启动时已错过的任务
await runMissedJobs(state);
// 5. 重算所有任务的下次运行时间
recomputeNextRuns(state);
// 6. 持久化更新后的状态
await persist(state);
// 7. 启动定时器(等待下次触发)
armTimer(state);
state.deps.log.info({
enabled: true,
jobs: state.store?.jobs.length ?? 0,
nextWakeAtMs: nextWakeAtMs(state) ?? null,
}, "cron: started");
});
}启动时的三个关键动作:
- 清理悬空运行标记 — 崩溃恢复,防止任务永久处于"运行中"假状态
- 补运行错过的任务 — 服务停机期间本应触发的任务在重启时补执行
- 重算下次运行时间 — 确保定时器指向正确的下次触发时刻
二、computeNextRunAtMs(三种调度类型)
ts
// src/cron/schedule.ts
export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): number | undefined {
// 类型 1:at(一次性绝对时间)
if (schedule.kind === "at") {
const sched = schedule as { at?: string; atMs?: number | string };
const atMs =
typeof sched.atMs === "number" && Number.isFinite(sched.atMs) && sched.atMs > 0
? sched.atMs
: typeof sched.atMs === "string"
? parseAbsoluteTimeMs(sched.atMs)
: typeof sched.at === "string"
? parseAbsoluteTimeMs(sched.at) // 规范字段
: null;
if (atMs === null) return undefined;
return atMs > nowMs ? atMs : undefined; // 过期的 at 任务不再触发
}
// 类型 2:every(固定间隔循环)
if (schedule.kind === "every") {
const everyMs = Math.max(1, Math.floor(schedule.everyMs));
const anchor = Math.max(0, Math.floor(schedule.anchorMs ?? nowMs));
if (nowMs < anchor) return anchor; // 还未到起始锚点
const elapsed = nowMs - anchor;
const steps = Math.max(1, Math.floor((elapsed + everyMs - 1) / everyMs));
return anchor + steps * everyMs; // 下一个对齐时刻
}
// 类型 3:cron(标准 cron 表达式)
const expr = schedule.expr.trim();
if (!expr) return undefined;
const cron = new Cron(expr, {
timezone: resolveCronTimezone(schedule.tz),
catch: false,
});
const nowSecondMs = Math.floor(nowMs / 1000) * 1000; // 精确到秒
const next = cron.nextRun(new Date(nowSecondMs));
if (!next) return undefined;
const nextMs = next.getTime();
return Number.isFinite(nextMs) && nextMs > nowSecondMs ? nextMs : undefined;
}三种调度类型对比:
| 类型 | 配置字段 | 精度 | 是否循环 | 示例 |
|---|---|---|---|---|
at | at: "2024-12-25T09:00:00Z" | 毫秒 | 否(一次性) | 圣诞节提醒 |
every | everyMs: 1800000, anchorMs? | 毫秒 | 是(锚点对齐) | 每30分钟 |
cron | expr: "0 9 * * 1-5", tz? | 秒 | 是(cron表达式) | 工作日早9点 |
every 的锚点对齐机制:anchorMs 是对齐基准点(默认为 nowMs),保证触发时刻是 anchor + n * everyMs。 例如 anchor=0, everyMs=3600000 → 每小时整点触发(UTC),而非从启动时刻开始计算。
三、三种调度场景的配置示例
json5
// at:一次性
{
"kind": "at",
"at": "2024-12-25T09:00:00Z"
}
// every:固定间隔(每小时,从整点对齐)
{
"kind": "every",
"everyMs": 3600000,
"anchorMs": 0 // 以 Unix 纪元为基准,自然对齐整点
}
// cron:标准 cron 表达式
{
"kind": "cron",
"expr": "0 9 * * 1-5", // 工作日早9点
"tz": "Asia/Shanghai" // 时区
}四、任务执行模式
ts
type CronJobMode =
| "systemEvent" // 系统级事件触发(不启动 agent)
| "agentTurn"; // 启动隔离 agent 执行一轮对话
// agentTurn:在独立 session 中执行,不影响主会话
// 隔离 agent 有自己的 session,执行完即销毁五、结果投递(resolveCronDeliveryPlan)
ts
type CronDeliveryPlan =
| { kind: "none" } // 不投递(静默执行)
| {
kind: "announce";
channel: string; // 目标通道 ID
to: string | string[]; // 收件人(用户/群组)
};投递流程:
agent 执行完成
│
▼
resolveCronDeliveryPlan(job)
│
├─ none → 丢弃结果,不通知
│
└─ announce → 将结果发送到 channel+to 指定的目标六、错误恢复策略
ts
// 连续失败时的退避机制
job.state.consecutiveFailures += 1;
const backoffMs = Math.min(
MAX_BACKOFF_MS,
BASE_BACKOFF_MS * Math.pow(2, job.state.consecutiveFailures - 1),
);
job.state.nextRunAtMs = Date.now() + backoffMs;- 连续失败计数 + 指数退避
- 防止单个任务反复失败拖垮调度器
- 成功后重置
consecutiveFailures
七、armTimer 机制
ts
// 定时器只指向"最近一次触发时刻"
function armTimer(state: CronServiceState) {
const next = nextWakeAtMs(state); // 找所有 job 中最近的 nextRunAtMs
if (!next) return;
const delay = Math.max(0, next - Date.now());
state.timer = setTimeout(() => {
void tick(state); // 触发时检查所有到期任务
}, delay);
}精简设计: 只有一个定时器,指向最近的任务。任务触发后 tick 处理所有到期任务,然后重新 armTimer 指向下一个任务。
八、自检清单
start()首先清理runningAtMs悬空标记(崩溃恢复)。at类型支持at(字符串规范字段)和atMs(数字/字符串旧字段)两种写法(向后兼容)。every类型的触发时刻是anchor + n * everyMs,不是"从启动时刻"计算。cron类型精度是秒(不是毫秒),时间戳截断到秒再计算。armTimer只有一个 setTimeout,指向最近的到期时间(不是每个任务一个定时器)。resolveCronDeliveryPlan返回none时直接丢弃结果,不报错。
九、开发避坑
at任务过期后不再触发:如果服务在at时刻之后才启动,且runMissedJobs判断过期,该任务不会被补运行。every的anchorMs建议显式设置:不设置时默认为nowMs(服务启动时刻),重启后触发时刻会漂移。agentTurn是隔离 session:与主会话完全独立,不共享上下文,也不会影响主会话的历史记录。locked保证原子性:start/stop/add/remove 都在 mutex 锁内执行,避免并发修改。