03 会话与并发框架(session key + lane + queue)
小白先懂(30秒)
- 这套模块就是“排队系统”。
- 同一个会话必须按顺序排队,不同会话可以并行。
sessionKey决定排到哪条队,lane决定这条队能同时跑几个任务。
你先照着做(不求全懂)
- 先固定生成
sessionKey(同会话永远同 key)。 - 再实现
enqueueCommandInLane(sessionLane, task)。 - 再给全局 lane 加并发上限。
- 最后实现
runId -> sessionKey反查缓存。
对应核心代码:
src/agents/pi-embedded-runner/lanes.tssrc/process/command-queue.tssrc/gateway/server-lanes.tssrc/gateway/server-session-key.tssrc/config/sessions/session-key.ts
步骤一:执行链路拆解(具体到函数)
- 生成会话键
resolveSessionKey(...)(src/config/sessions/session-key.ts)生成稳定sessionKey。 - 解析 lane
resolveSessionLane(...)+resolveGlobalLane(...)。 - 入队执行
enqueueCommandInLane(...)(src/process/command-queue.ts)。 - 应用并发配置
applyGatewayLaneConcurrency(...)设置Cron/Main/Subagent三类并发。 - run 反查 session
resolveSessionKeyForRun(runId)从 run context 缓存和 session store 反查。
步骤二:实现细节(内部结构)
- lane 的内部状态(
LaneState)
queue: 等待任务activeTaskIds: 正在运行任务maxConcurrent: 当前 lane 并发上限generation: reset 后防旧任务回写
enqueueCommandInLane关键点
- 自动创建 lane(默认
main)。 - 入队后立刻
drainLane(...)。 - 每个任务有
warnAfterMs/onWait,可做排队告警。
clearCommandLane与resetAllLanes
clearCommandLane只清“未执行任务”,并抛CommandLaneClearedError。resetAllLanes会提升generation,防止旧任务结束时污染新状态。
- sessionKey 设计要点
- 直聊默认可归并到 main 会话桶。
- 群聊会话独立,避免群上下文污染私聊。
- runId -> sessionKey 回填
- 先查
getAgentRunContext(runId)。 - 缓存没命中才查持久化 store。
- 查到后
registerAgentRunContext(...)回填缓存。
最小复刻骨架(更贴近真实)
ts
type Lane = { queue: Array<() => Promise<void>>; active: number; max: number };
const lanes = new Map<string, Lane>();
function setLaneConcurrency(name: string, max: number) {
const lane = lanes.get(name) ?? { queue: [], active: 0, max: 1 };
lane.max = Math.max(1, Math.floor(max));
lanes.set(name, lane);
}
async function enqueue(name: string, task: () => Promise<void>) {
const lane = lanes.get(name) ?? { queue: [], active: 0, max: 1 };
lanes.set(name, lane);
lane.queue.push(task);
pump(name, lane);
}
function pump(name: string, lane: Lane) {
while (lane.active < lane.max && lane.queue.length > 0) {
const task = lane.queue.shift()!;
lane.active += 1;
void task().finally(() => {
lane.active -= 1;
pump(name, lane);
});
}
}