Skip to content

55 定时任务系统

模块目标

理解 OpenClaw 的 cron/调度引擎:三种调度类型的时间计算、启动时的清理与对齐、错过任务的补运行、结果投递路由。

核心文件

文件职责
src/cron/service/ops.tsCronService 核心操作(start/stop/add/remove)
src/cron/schedule.tscomputeNextRunAtMs — 下次触发时间计算
src/cron/delivery.tsresolveCronDeliveryPlan — 结果投递路由
src/cron/isolated-agent.ts隔离 agent 执行
src/cron/store.ts任务持久化到 JSON 文件
src/cron/session-reaper.ts过期 session 清理
src/cron/types.ts核心类型定义

一、CronService.start()(精确源码)

ts
// src/cron/service/ops.ts

export async function start(state: CronServiceState) {
  await locked(state, async () => {
    // 1. 未启用时快速退出
    if (!state.deps.cronEnabled) {
      state.deps.log.info({ enabled: false }, "cron: disabled");
      return;
    }

    // 2. 加载持久化存储
    await ensureLoaded(state, { skipRecompute: true });

    // 3. 清理"悬空的运行标记"(上次崩溃遗留)
    const jobs = state.store?.jobs ?? [];
    for (const job of jobs) {
      if (typeof job.state.runningAtMs === "number") {
        state.deps.log.warn(
          { jobId: job.id, runningAtMs: job.state.runningAtMs },
          "cron: clearing stale running marker on startup",
        );
        job.state.runningAtMs = undefined;
      }
    }

    // 4. 运行启动时已错过的任务
    await runMissedJobs(state);

    // 5. 重算所有任务的下次运行时间
    recomputeNextRuns(state);

    // 6. 持久化更新后的状态
    await persist(state);

    // 7. 启动定时器(等待下次触发)
    armTimer(state);

    state.deps.log.info({
      enabled: true,
      jobs: state.store?.jobs.length ?? 0,
      nextWakeAtMs: nextWakeAtMs(state) ?? null,
    }, "cron: started");
  });
}

启动时的三个关键动作:

  1. 清理悬空运行标记 — 崩溃恢复,防止任务永久处于"运行中"假状态
  2. 补运行错过的任务 — 服务停机期间本应触发的任务在重启时补执行
  3. 重算下次运行时间 — 确保定时器指向正确的下次触发时刻

二、computeNextRunAtMs(三种调度类型)

ts
// src/cron/schedule.ts

export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): number | undefined {
  // 类型 1:at(一次性绝对时间)
  if (schedule.kind === "at") {
    const sched = schedule as { at?: string; atMs?: number | string };
    const atMs =
      typeof sched.atMs === "number" && Number.isFinite(sched.atMs) && sched.atMs > 0
        ? sched.atMs
        : typeof sched.atMs === "string"
          ? parseAbsoluteTimeMs(sched.atMs)
          : typeof sched.at === "string"
            ? parseAbsoluteTimeMs(sched.at)   // 规范字段
            : null;
    if (atMs === null) return undefined;
    return atMs > nowMs ? atMs : undefined;   // 过期的 at 任务不再触发
  }

  // 类型 2:every(固定间隔循环)
  if (schedule.kind === "every") {
    const everyMs = Math.max(1, Math.floor(schedule.everyMs));
    const anchor = Math.max(0, Math.floor(schedule.anchorMs ?? nowMs));
    if (nowMs < anchor) return anchor;    // 还未到起始锚点
    const elapsed = nowMs - anchor;
    const steps = Math.max(1, Math.floor((elapsed + everyMs - 1) / everyMs));
    return anchor + steps * everyMs;      // 下一个对齐时刻
  }

  // 类型 3:cron(标准 cron 表达式)
  const expr = schedule.expr.trim();
  if (!expr) return undefined;
  const cron = new Cron(expr, {
    timezone: resolveCronTimezone(schedule.tz),
    catch: false,
  });
  const nowSecondMs = Math.floor(nowMs / 1000) * 1000;  // 精确到秒
  const next = cron.nextRun(new Date(nowSecondMs));
  if (!next) return undefined;
  const nextMs = next.getTime();
  return Number.isFinite(nextMs) && nextMs > nowSecondMs ? nextMs : undefined;
}

三种调度类型对比:

类型配置字段精度是否循环示例
atat: "2024-12-25T09:00:00Z"毫秒否(一次性)圣诞节提醒
everyeveryMs: 1800000, anchorMs?毫秒是(锚点对齐)每30分钟
cronexpr: "0 9 * * 1-5", tz?是(cron表达式)工作日早9点

every 的锚点对齐机制:anchorMs 是对齐基准点(默认为 nowMs),保证触发时刻是 anchor + n * everyMs。 例如 anchor=0, everyMs=3600000 → 每小时整点触发(UTC),而非从启动时刻开始计算。

三、三种调度场景的配置示例

json5
// at:一次性
{
  "kind": "at",
  "at": "2024-12-25T09:00:00Z"
}

// every:固定间隔(每小时,从整点对齐)
{
  "kind": "every",
  "everyMs": 3600000,
  "anchorMs": 0    // Unix 纪元为基准,自然对齐整点
}

// cron:标准 cron 表达式
{
  "kind": "cron",
  "expr": "0 9 * * 1-5",    // 工作日早9点
  "tz": "Asia/Shanghai"      // 时区
}

四、任务执行模式

ts
type CronJobMode =
  | "systemEvent"   // 系统级事件触发(不启动 agent)
  | "agentTurn";    // 启动隔离 agent 执行一轮对话

// agentTurn:在独立 session 中执行,不影响主会话
// 隔离 agent 有自己的 session,执行完即销毁

五、结果投递(resolveCronDeliveryPlan)

ts
type CronDeliveryPlan =
  | { kind: "none" }          // 不投递(静默执行)
  | {
      kind: "announce";
      channel: string;         // 目标通道 ID
      to: string | string[];   // 收件人(用户/群组)
    };

投递流程:

agent 执行完成


resolveCronDeliveryPlan(job)

    ├─ none → 丢弃结果,不通知

    └─ announce → 将结果发送到 channel+to 指定的目标

六、错误恢复策略

ts
// 连续失败时的退避机制

job.state.consecutiveFailures += 1;
const backoffMs = Math.min(
  MAX_BACKOFF_MS,
  BASE_BACKOFF_MS * Math.pow(2, job.state.consecutiveFailures - 1),
);
job.state.nextRunAtMs = Date.now() + backoffMs;
  • 连续失败计数 + 指数退避
  • 防止单个任务反复失败拖垮调度器
  • 成功后重置 consecutiveFailures

七、armTimer 机制

ts
// 定时器只指向"最近一次触发时刻"

function armTimer(state: CronServiceState) {
  const next = nextWakeAtMs(state);  // 找所有 job 中最近的 nextRunAtMs
  if (!next) return;
  const delay = Math.max(0, next - Date.now());
  state.timer = setTimeout(() => {
    void tick(state);                 // 触发时检查所有到期任务
  }, delay);
}

精简设计: 只有一个定时器,指向最近的任务。任务触发后 tick 处理所有到期任务,然后重新 armTimer 指向下一个任务。

八、自检清单

  1. start() 首先清理 runningAtMs 悬空标记(崩溃恢复)。
  2. at 类型支持 at(字符串规范字段)和 atMs(数字/字符串旧字段)两种写法(向后兼容)。
  3. every 类型的触发时刻是 anchor + n * everyMs,不是"从启动时刻"计算。
  4. cron 类型精度是(不是毫秒),时间戳截断到秒再计算。
  5. armTimer 只有一个 setTimeout,指向最近的到期时间(不是每个任务一个定时器)。
  6. resolveCronDeliveryPlan 返回 none 时直接丢弃结果,不报错。

九、开发避坑

  1. at 任务过期后不再触发:如果服务在 at 时刻之后才启动,且 runMissedJobs 判断过期,该任务不会被补运行。
  2. everyanchorMs 建议显式设置:不设置时默认为 nowMs(服务启动时刻),重启后触发时刻会漂移。
  3. agentTurn 是隔离 session:与主会话完全独立,不共享上下文,也不会影响主会话的历史记录。
  4. locked 保证原子性:start/stop/add/remove 都在 mutex 锁内执行,避免并发修改。

用工程视角拆解 AI 智能体框架