21 并发队列与Lane状态机实现实战(session/global 双层排队)
这篇是"并发控制"的实现手册,重点解决三件事:
- 同会话不乱序。
- 全局并发可控。
- 重启后队列不僵死。
对应源码入口
src/process/command-queue.tssrc/process/lanes.tssrc/agents/pi-embedded-runner/lanes.tssrc/gateway/server-lanes.tssrc/agents/pi-embedded-runner/run.tssrc/agents/pi-embedded-runner/runs.ts
一、核心数据结构
LaneState(完整字段)
type LaneState = {
lane: string;
queue: QueueEntry[];
activeTaskIds: Set<number>;
maxConcurrent: number; // 默认 1(串行),setCommandLaneConcurrency 可修改
draining: boolean; // 防重复 pump:true 时不再触发新一轮 drain
generation: number; // 默认 0,resetAllLanes 时递增,让旧回调回写被忽略
};QueueEntry(完整字段)
type QueueEntry = {
task: () => Promise<unknown>;
resolve: (v: unknown) => void;
reject: (e: unknown) => void;
enqueuedAt: number;
warnAfterMs: number; // 默认 2000ms(源码:opts?.warnAfterMs ?? 2_000)
onWait?: (waitedMs: number) => void; // 触发后仅告警,不取消任务
};超时告警触发条件:if (waitedMs >= entry.warnAfterMs) → 调 onWait + diag.warn。 不是异常,不取消,只是告警,让上层感知排队过久。
二、双层排队模型(run.ts)
runEmbeddedPiAgent(...) 不只排一次队,而是两层:
const sessionLane = resolveSessionLane(sessionKey || sessionId);
const globalLane = resolveGlobalLane(params.lane);
// 外层:session 串行(同会话顺序保证)
// 内层:global 限流(跨会话资源门控)
await enqueue(sessionLane, () => enqueue(globalLane, task));这就是"局部有序 + 全局限流"的本质。
Lane 名称解析规则(源码精确)
// src/agents/pi-embedded-runner/lanes.ts
export function resolveSessionLane(key: string) {
const cleaned = key.trim() || CommandLane.Main;
return cleaned.startsWith("session:") ? cleaned : `session:${cleaned}`;
// 幂等:已含前缀不再加;空 key 回落 Main
}
export function resolveGlobalLane(lane?: string) {
const cleaned = lane?.trim();
return cleaned ? cleaned : CommandLane.Main;
// 空值默认走 CommandLane.Main
}三、状态机流程
1) 入队
enqueueCommandInLane(lane, task, opts):
- 创建/获取
LaneState。 - 记录
enqueuedAt = Date.now(),warnAfterMs = opts?.warnAfterMs ?? 2_000。 push(entry)到queue。drainLane(state)立即尝试拉起执行。
2) 出队执行
drainLane(state):
if (state.draining) return— 防重入。while active < maxConcurrent && queue.length > 0循环拉起任务。- 任务成功:
completeTask(state, id)→ resolve +pump()。 - 任务失败:
completeTask(state, id)→ reject +pump()(失败也 pump,不饿死后续)。
3) Probe Lane 特殊静默
// command-queue.ts
function isProbeLane(lane: string): boolean {
return lane.startsWith("auth-probe:") || lane.startsWith("session:probe-");
}probe lane 的任务失败时不打错误日志。 探针任务是试错用的,错误是预期行为,记录会产生误导性噪音。
4) 清理/恢复
clearCommandLane(lane):只取消 queue 里未开始的任务,抛 CommandLaneClearedError; 不影响已 active 的任务。
resetAllLanes():
// 源码注释(精确):
// "Used after SIGUSR1 in-process restarts where interrupted tasks'
// finally blocks may not run"执行步骤:
state.generation++— 让旧回调回写时发现 generation 不匹配,直接忽略。state.activeTaskIds.clear()— 强制归零(旧 finally 可能永远不跑了)。- 保留
queue,重新drainLane— 积压任务继续执行。
这是"热重启后不僵死"的关键,generation 机制防止旧 finally 污染新状态。
waitForActiveTasks(timeoutMs):
const POLL_INTERVAL_MS = 50; // 轮询间隔(源码常量)
// 返回 { drained: boolean },超时不 reject进入时快照 activeAtStart(当前所有 active 任务),每 50ms 检查这批 ID 是否都已完成。 只等调用时已经 active 的任务,不等后来新进来的任务。
四、并发配置入口
// src/gateway/server-lanes.ts
export function applyGatewayLaneConcurrency(cfg: Config) {
setCommandLaneConcurrency(CommandLane.Cron, cfg.cron?.maxConcurrentRuns ?? 1);
setCommandLaneConcurrency(CommandLane.Main, resolveAgentMaxConcurrent(cfg));
setCommandLaneConcurrency(CommandLane.Subagent, resolveSubagentMaxConcurrent(cfg));
}setCommandLaneConcurrency 写完 maxConcurrent 后立刻 drainLane, "提高并发"的变更实时生效,不等下条消息触发。
热重载时再次调用同一函数,保持运行时与配置一致。
五、运行时控制(Active Run Registry)
runs.ts 维护 ACTIVE_EMBEDDED_RUNS: Map<sessionId, EmbeddedPiQueueHandle>:
type EmbeddedPiQueueHandle = {
queueMessage: (msg: string) => boolean;
isStreaming: boolean;
isCompacting: boolean;
abort: () => void;
};setActiveEmbeddedRun
// 日志区分:首次注册 vs 替换("run_started" vs "run_replaced")
ACTIVE_EMBEDDED_RUNS.set(sessionId, handle);queueEmbeddedPiMessage
返回 false(静默失败)的三种场景:
no_active_run:sessionId 无活动 runnot_streaming:run 存在但isStreaming=false(还未进入流式阶段)compacting:正在压缩上下文(isCompacting=true),不接受新消息
clearActiveEmbeddedRun(handle 匹配校验)
// runs.ts 精确实现模式
if (ACTIVE_EMBEDDED_RUNS.get(sessionId) === handle) {
ACTIVE_EMBEDDED_RUNS.delete(sessionId);
}必须 handle 匹配才删除,防止"旧 finally 块误删新 run"的竞态。
waitForEmbeddedPiRunEnd
// 默认超时:15000ms
// 最小超时:Math.max(100, timeoutMs)
// 超时返回 false(不 reject)
// run 正常结束返回 true内部维护 EMBEDDED_RUN_WAITERS: Map<sessionId, Set<(done: boolean) => void>>, run 结束时通知所有等待者。
六、可复刻最小实现
// === Lane 状态机 ===
async function runWithLanes(req: Req) {
const sLane = resolveSessionLane(req.sessionKey); // "session:xxx"
const gLane = resolveGlobalLane(req.lane); // "main" | "cron" | "subagent"
return enqueue(sLane, () =>
enqueue(gLane, async () => {
const handle = { isStreaming: false, isCompacting: false, abort: () => {}, queueMessage: () => false };
setActiveRun(req.sessionId, handle);
try {
return await doWork(req);
} finally {
clearActiveRun(req.sessionId, handle); // handle 必须匹配才清理
}
}),
);
}
// === resetAllLanes(SIGUSR1 热重启)===
function resetAllLanes() {
for (const state of lanes.values()) {
state.generation++;
state.activeTaskIds.clear();
drainLane(state); // 让积压任务继续
}
}七、验收清单
- 同一 session 连发 10 条消息,回复顺序稳定。
- 两个 session 可并发执行,不互相阻塞。
clearCommandLane不会取消已在执行中的任务。resetAllLanes后旧任务回调因 generation 不匹配而被忽略。waitForActiveTasks(timeout)能给出drained: true/false,POLL_INTERVAL=50ms。- probe lane 的失败被静默处理,不打错误日志。
clearActiveEmbeddedRun做了 handle 匹配,误删新 run 的场景验证。queueEmbeddedPiMessage在压缩中返回 false,不阻塞也不报错。
八、常见坑
- 只有全局队列,没有 session 队列,导致串线。
- reset 只 clear active,不做 generation,旧回调回写脏状态。
clearActiveRun不校验 handle,误删新任务(高并发下必现)。- 忘了失败分支继续 pump,队列卡死。
- probe lane 错误日志轰炸(应判断 isProbeLane 静默)。
waitForActiveTasks等待所有队列任务而不是快照时已 active 的 —— 导致永远等不完。