import {
diagnosticLogger as diag,
logLaneDequeue,
logLaneEnqueue,
} from "../logging/diagnostic-runtime.js"; import { resolveGlobalSingleton } from "../shared/global-singleton.js"; import { CommandLane } from "./lanes.js"; /** * Dedicated error type thrown when a queued command is rejected because * its lane was cleared. Callers that fire-and-forget enqueued tasks can * catch (or ignore) this specific type to avoid unhandled-rejection noise.
*/
export class CommandLaneClearedError extends Error {
  /**
   * @param lane - Name of the lane that was cleared. When omitted (or empty)
   *   the message falls back to a generic wording without a lane name.
   */
  constructor(lane?: string) {
    let message = "Command lane cleared";
    if (lane) {
      message = `Command lane "${lane}" cleared`;
    }
    super(message);
    // Explicit name so callers can identify this error type in logs.
    this.name = "CommandLaneClearedError";
  }
}
/**
 * Error raised when a new command is refused because the gateway is
 * currently draining for a restart.
 */
export class GatewayDrainingError extends Error {
  constructor() {
    const message = "Gateway is draining for restart; new tasks are not accepted";
    super(message);
    // Explicit name so callers can identify this error type in logs.
    this.name = "GatewayDrainingError";
  }
}
// Minimal in-process queue to serialize command executions.
// Default lane ("main") preserves the existing behavior. Additional lanes allow
// low-risk parallelism (e.g. cron jobs) without interleaving stdin / logs for
// the main auto-reply workflow.
/**
 * Key under which the queue runtime state is kept on globalThis so every
 * bundled entry/chunk shares the same lanes, counters, and draining flag in
 * production builds.
 *
 * Created with `Symbol.for`, i.e. looked up in the global symbol registry, so
 * separately evaluated copies of this module resolve to the same key.
 */
const COMMAND_QUEUE_STATE_KEY = Symbol.for("openclaw.commandQueueState");
/**
 * Returns the shared command-queue runtime state, creating it on first use.
 *
 * The object is resolved through `resolveGlobalSingleton` under
 * `COMMAND_QUEUE_STATE_KEY`; the factory supplies the initial shape (draining
 * flag off, no lanes, no waiters, task ids starting at 1).
 *
 * @returns The singleton state object (mutated in place by callers).
 */
function getQueueState() {
  const state = resolveGlobalSingleton(COMMAND_QUEUE_STATE_KEY, () => ({
    gatewayDraining: false,
    lanes: new Map<string, LaneState>(),
    activeTaskWaiters: new Set<ActiveTaskWaiter>(),
    nextTaskId: 1,
  }));
  // Schema migration: the singleton may have been created by an older code
  // version (e.g. v2026.4.2) that did not include `activeTaskWaiters`. After
  // a SIGUSR1 in-process restart the new code inherits the stale object via
  // `resolveGlobalSingleton` because the Symbol key already exists on
  // globalThis. Patch the missing field so all downstream consumers see a
  // valid Set instead of `undefined`.
  if (!state.activeTaskWaiters) {
    state.activeTaskWaiters = new Set<ActiveTaskWaiter>();
  }
  return state;
}
/**
 * Canonicalizes a lane name: surrounding whitespace is stripped, and a name
 * that is empty after trimming maps to the main lane.
 *
 * @param lane - Raw lane name as supplied by the caller.
 * @returns The trimmed name, or `CommandLane.Main` when nothing is left.
 */
function normalizeLane(lane: string): string {
  const trimmed = lane.trim();
  if (trimmed) {
    return trimmed;
  }
  return CommandLane.Main;
}
/**
 * Computes how much work a lane currently holds: entries still waiting in
 * its queue plus tasks recorded as active.
 *
 * @param state - Lane to measure.
 * @returns Queue length plus the number of active task ids.
 */
function getLaneDepth(state: LaneState): number {
  const waiting = state.queue.length;
  const running = state.activeTaskIds.size;
  return waiting + running;
}
/**
 * Records that a task has finished on its lane.
 *
 * A completion is only honored when the generation it carries still matches
 * the lane's current generation; otherwise it is treated as stale and the
 * lane is left untouched.
 *
 * @param state - Lane the task belongs to.
 * @param taskId - Identifier of the task that finished.
 * @param taskGeneration - Lane generation associated with the task.
 * @returns `true` when the task id was removed from the lane's active set,
 *   `false` when the completion was ignored because of a generation mismatch.
 */
function completeTask(state: LaneState, taskId: number, taskGeneration: number): boolean {
  if (taskGeneration !== state.generation) {
    return false;
  }
  state.activeTaskIds.delete(taskId);
  return true;
}
/**
 * Checks whether any of the given task ids is still recorded as active on
 * any lane.
 *
 * @param taskIds - Task ids to look for.
 * @returns `true` as soon as one lane lists one of the ids as active,
 *   `false` when none of them is active (including when `taskIds` is empty).
 */
function hasPendingActiveTasks(taskIds: Set<number>): boolean {
  const queueState = getQueueState();
  for (const state of queueState.lanes.values()) {
    for (const taskId of state.activeTaskIds) {
      if (taskIds.has(taskId)) {
        return true;
      }
    }
  }
  return false;
}
/**
 * Settles a registered waiter with the given result.
 *
 * The waiter is first removed from the shared waiter set; if it was not in
 * the set (for example because it was already settled) nothing else happens,
 * so a waiter is resolved at most once through this function.
 *
 * @param waiter - Waiter to settle.
 * @param result - Value passed to the waiter's `resolve` callback.
 */
function resolveActiveTaskWaiter(waiter: ActiveTaskWaiter, result: { drained: boolean }): void {
  const { activeTaskWaiters } = getQueueState();
  const wasRegistered = activeTaskWaiters.delete(waiter);
  if (wasRegistered) {
    // Cancel the waiter's pending timer, if it has one, before resolving.
    if (waiter.timeout) {
      clearTimeout(waiter.timeout);
    }
    waiter.resolve(result);
  }
}
/**
 * Resolves every registered waiter whose tracked tasks are no longer active.
 *
 * A waiter tracking no task ids at all, or whose ids are not found in any
 * lane's active set, is resolved with `{ drained: true }`.
 */
function notifyActiveTaskWaiters(): void {
  const { activeTaskWaiters } = getQueueState();
  // Iterate over a snapshot: resolving a waiter removes it from the live set.
  const snapshot = [...activeTaskWaiters];
  for (const waiter of snapshot) {
    const stillRunning =
      waiter.activeTaskIds.size !== 0 && hasPendingActiveTasks(waiter.activeTaskIds);
    if (stillRunning) {
      continue;
    }
    resolveActiveTaskWaiter(waiter, { drained: true });
  }
}
function drainLane(lane: string) { const state = getLaneState(lane); if (state.draining) { if (state.activeTaskIds.size === 0 && state.queue.length > 0) {
diag.warn(
`drainLane blocked: lane=${lane} draining=true active=0 queue=${state.queue.length}`,
);
} return;
}
state.draining = true;
/**
 * Flags the gateway as draining for restart, so that new enqueues fail fast
 * with `GatewayDrainingError` instead of being silently killed on shutdown.
 */
export function markGatewayDraining(): void {
  const queueState = getQueueState();
  queueState.gatewayDraining = true;
}
/**
 * Reports how much work a single lane currently holds.
 *
 * @param lane - Lane name; normalized before lookup. Defaults to the main lane.
 * @returns The lane's depth (queued entries plus active tasks), or `0` when
 *   no state exists for that lane.
 */
export function getQueueSize(lane: string = CommandLane.Main): number {
  const resolved = normalizeLane(lane);
  const state = getQueueState().lanes.get(resolved);
  if (!state) {
    return 0;
  }
  return getLaneDepth(state);
}
/**
 * Sums the depth (queued entries plus active tasks) of every known lane.
 *
 * @returns The combined depth across all lanes; `0` when there are no lanes.
 */
export function getTotalQueueSize() {
  const { lanes } = getQueueState();
  let depth = 0;
  for (const laneState of lanes.values()) {
    depth = depth + getLaneDepth(laneState);
  }
  return depth;
}
/**
 * Discards every queued (not yet started) entry of a lane.
 *
 * Each removed entry is rejected with a `CommandLaneClearedError` carrying the
 * normalized lane name. The lane's active task ids are not touched here.
 *
 * @param lane - Lane name; normalized before lookup. Defaults to the main lane.
 * @returns The number of queued entries that were removed, or `0` when no
 *   state exists for that lane.
 */
export function clearCommandLane(lane: string = CommandLane.Main): number {
  const cleaned = normalizeLane(lane);
  const state = getQueueState().lanes.get(cleaned);
  if (!state) {
    return 0;
  }
  // Detach the whole queue first so the lane is empty before any rejection
  // handler runs.
  const pending = state.queue.splice(0);
  for (const entry of pending) {
    entry.reject(new CommandLaneClearedError(cleaned));
  }
  return pending.length;
}
/**
 * Test-only hard reset that throws away all queue state, including queued
 * work preserved from previous generations. Intended for suites that need an
 * isolated baseline across shared-worker runs.
 *
 * Clears the draining flag, drops every lane, resolves all registered waiters
 * with `{ drained: true }`, and restarts task id numbering at 1.
 */
export function resetCommandQueueStateForTest(): void {
  const queueState = getQueueState();
  queueState.gatewayDraining = false;
  queueState.lanes.clear();
  // Iterate over a snapshot: resolving a waiter removes it from the live set.
  const waiters = [...queueState.activeTaskWaiters];
  for (const pendingWaiter of waiters) {
    resolveActiveTaskWaiter(pendingWaiter, { drained: true });
  }
  queueState.nextTaskId = 1;
}
/**
 * Reset all lane runtime state to idle. Used after SIGUSR1 in-process
 * restarts where interrupted tasks' finally blocks may not run, leaving
 * stale active task IDs that permanently block new work from draining.
 *
 * Bumps lane generation and clears execution counters so stale completions
 * from old in-flight tasks are ignored. Queued entries are intentionally
 * preserved — they represent pending user work that should still execute
 * after restart.
 *
 * After resetting, drains any lanes that still have queued entries so
 * preserved work is pumped immediately rather than waiting for a future
 * `enqueueCommandInLane()` call (which may never come). Finally, waiters
 * whose tracked tasks are no longer active are notified.
 */
export function resetAllLanes(): void {
  const queueState = getQueueState();
  queueState.gatewayDraining = false;
  const lanesToDrain: string[] = [];
  for (const state of queueState.lanes.values()) {
    state.generation += 1;
    state.activeTaskIds.clear();
    state.draining = false;
    if (state.queue.length > 0) {
      lanesToDrain.push(state.lane);
    }
  }
  // Drain after the full reset pass so all lanes are in a clean state first.
  for (const lane of lanesToDrain) {
    drainLane(lane);
  }
  notifyActiveTaskWaiters();
}
/**
 * Counts the tasks that are actively executing across all lanes.
 * Entries that are queued but not yet started are not included.
 *
 * @returns The sum of every lane's active task id count.
 */
export function getActiveTaskCount(): number {
  let running = 0;
  for (const { activeTaskIds } of getQueueState().lanes.values()) {
    running += activeTaskIds.size;
  }
  return running;
}
/** * Wait for all currently active tasks across all lanes to finish. * Polls at a short interval; resolves when no tasks are active or * when `timeoutMs` elapses (whichever comes first). * * New tasks enqueued after this call are ignored — only tasks that are * already executing are waited on.
*/
export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolean }> { const queueState = getQueueState(); const activeAtStart = new Set<number>(); for (const state of queueState.lanes.values()) { for (const taskId of state.activeTaskIds) {
activeAtStart.add(taskId);
}
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.