// Type of the module under test. Every binding below is typed by indexing into
// this alias, so the local names track the module's real export signatures.
type CommandQueueModule = typeof import("./command-queue.js");

// Late-bound handles to the command-queue exports used by the tests in this file.
// They are declared without initializers here.
// NOTE(review): the assignment (presumably a dynamic import in a setup hook) is
// not visible in this chunk — confirm it runs before the first test.
let clearCommandLane: CommandQueueModule["clearCommandLane"];
let CommandLaneClearedError: CommandQueueModule["CommandLaneClearedError"];
let enqueueCommand: CommandQueueModule["enqueueCommand"];
let enqueueCommandInLane: CommandQueueModule["enqueueCommandInLane"];
let GatewayDrainingError: CommandQueueModule["GatewayDrainingError"];
let getActiveTaskCount: CommandQueueModule["getActiveTaskCount"];
let getQueueSize: CommandQueueModule["getQueueSize"];
let markGatewayDraining: CommandQueueModule["markGatewayDraining"];
let resetAllLanes: CommandQueueModule["resetAllLanes"];
let resetCommandQueueStateForTest: CommandQueueModule["resetCommandQueueStateForTest"];
let setCommandLaneConcurrency: CommandQueueModule["setCommandLaneConcurrency"];
let waitForActiveTasks: CommandQueueModule["waitForActiveTasks"];
// Per-test setup: real timers, a freshly reset queue, and cleared diagnostic mocks.
beforeEach(() => {
  vi.useRealTimers();
  resetCommandQueueStateForTest();
  // Queue state is global across module instances, so reset main lane
  // concurrency explicitly to avoid cross-file leakage.
  setCommandLaneConcurrency(CommandLane.Main, 1);

  // Clear the diagnostic mocks, in the same order as they are listed here.
  const { logLaneEnqueue, logLaneDequeue, diag } = diagnosticMocks;
  const mocksToClear = [logLaneEnqueue, logLaneDequeue, diag.debug, diag.warn, diag.error];
  for (const mock of mocksToClear) {
    mock.mockClear();
  }
});
// Per-test teardown: switch back to real timers, since individual tests in this
// file call vi.useFakeTimers().
afterEach(function restoreRealTimers() {
  vi.useRealTimers();
});
// resetAllLanes() must not throw, and the active task count must read 0 both
// before and after the call.
it("resetAllLanes is safe when no lanes have been created", () => {
  const expectNoActiveTasks = (): void => {
    expect(getActiveTaskCount()).toBe(0);
  };
  const reset = (): void => {
    resetAllLanes();
  };

  expectNoActiveTasks();
  expect(reset).not.toThrow();
  expectNoActiveTasks();
});
it("runs tasks one at a time in order", async () => {
let active = 0;
let maxActive = 0; const calls: number[] = [];
// Asserts on the values an `onWait` callback is expected to have captured:
// a wait time of at least 5 and zero tasks queued ahead.
// NOTE(review): this block looks truncated. Nothing visible here assigns
// `waited` or `queuedAhead`, advances the fake timers, calls `releaseFirst`,
// or awaits `first`, so the assertions below cannot pass as written. Restore
// the missing steps from the upstream file before relying on this test.
it("invokes onWait callback when a task waits past the threshold", async () => {
let waited: number | null = null;
let queuedAhead: number | null = null;
// Fake timers are enabled for the body and always undone in the `finally` below.
vi.useFakeTimers(); try {
// `blocker` stays pending until `releaseFirst` is called.
let releaseFirst!: () => void; const blocker = new Promise<void>((resolve) => {
releaseFirst = resolve;
}); const first = enqueueCommand(async () => {
await blocker;
});
expect(waited).not.toBeNull();
// Double cast because `waited` is only ever assigned `null` in the visible code.
expect(waited as unknown as number).toBeGreaterThanOrEqual(5);
expect(queuedAhead).toBe(0);
} finally {
vi.useRealTimers();
}
});
it("demotes live model switch lane failures to debug noise", async () => { const error = new Error("Live session model switch requested: anthropic/claude-opus-4-6");
error.name = "LiveSessionModelSwitchError";
// NOTE(review): this block appears to be two tests spliced together. The title
// and the first statement concern waitForActiveTasks with a blocked main-lane
// task, but the rest of the body exercises resetAllLanes and refers to `lane`,
// `resolve1` and `task1`, none of which are declared in this block; `task` and
// `release` are never used. Confirm against the upstream file.
it("waitForActiveTasks returns drained=false when timeout is zero and tasks are active", async () => { const { task, release } = enqueueBlockedMainTask();
// Enqueue another task — it should be stuck behind the blocker
let task2Ran = false; const task2 = enqueueCommandInLane(lane, async () => {
task2Ran = true;
});
// Simulate SIGUSR1: reset all lanes. Queued work (task2) should be
// drained immediately — no fresh enqueue needed.
resetAllLanes();
// Complete the stale in-flight task; generation mismatch makes its
// completion path a no-op for queue bookkeeping.
resolve1();
await task1;
// task2 should have been pumped by resetAllLanes's drain pass.
await task2;
expect(task2Ran).toBe(true);
});
it("waitForActiveTasks ignores tasks that start after the call", async () => { const lane = `drain-snapshot-${Date.now()}-${Math.random().toString(16).slice(2)}`;
setCommandLaneConcurrency(lane, 2);
let resolve1!: () => void; const blocker1 = new Promise<void>((r) => {
resolve1 = r;
});
let resolve2!: () => void; const blocker2 = new Promise<void>((r) => {
resolve2 = r;
}); const firstStarted = createDeferred();
// Starts after waitForActiveTasks snapshot and should not block drain completion. const second = enqueueCommandInLane(lane, async () => {
await blocker2;
});
expect(getActiveTaskCount()).toBeGreaterThanOrEqual(2);
// After markGatewayDraining() followed by resetAllLanes(), a newly enqueued
// command must run and resolve with its own return value.
it("resetAllLanes clears gateway draining flag and re-allows enqueue", async () => {
  markGatewayDraining();
  resetAllLanes();

  const accepted = enqueueCommand(async () => "ok");
  await expect(accepted).resolves.toBe("ok");
});
// Plants a queue-state object without `activeTaskWaiters` on globalThis and
// checks that resetAllLanes() and waitForActiveTasks() still work with it.
it("migrates legacy queue state missing activeTaskWaiters without crashing", async () => {
  // Simulate a SIGUSR1 in-process restart where the globalThis singleton was
  // created by an older code version (e.g. v2026.4.2) that did not include
  // the `activeTaskWaiters` field. The schema migration in getQueueState()
  // must patch the missing field so resetAllLanes() and
  // notifyActiveTaskWaiters() do not throw.
  const key = Symbol.for("openclaw.commandQueueState");
  const globalStore = globalThis as Record<PropertyKey, unknown>;
  // Remember whatever was stored before so the `finally` block can put it back.
  const original = globalStore[key];

  try {
    // Plant a legacy-shaped state object (no activeTaskWaiters).
    globalStore[key] = {
      gatewayDraining: false,
      lanes: new Map(),
      nextTaskId: 1,
    };

    // resetAllLanes calls notifyActiveTaskWaiters → Array.from(state.activeTaskWaiters).
    // Without the migration this would throw:
    //   TypeError: undefined is not iterable
    expect(() => resetAllLanes()).not.toThrow();

    // waitForActiveTasks also accesses activeTaskWaiters.
    await expect(waitForActiveTasks(0)).resolves.toEqual({ drained: true });
  } finally {
    // Restore original state so subsequent tests are not affected.
    if (original !== undefined) {
      globalStore[key] = original;
    } else {
      delete globalStore[key];
    }
    resetCommandQueueStateForTest();
  }
});
it("shares lane state across distinct module instances", async () => { const commandQueueA = await importFreshModule<typeofimport("./command-queue.js")>( import.meta.url, "./command-queue.js?scope=shared-a",
); const commandQueueB = await importFreshModule<typeofimport("./command-queue.js")>( import.meta.url, "./command-queue.js?scope=shared-b",
); const lane = `shared-state-${Date.now()}-${Math.random().toString(16).slice(2)}`;
let release!: () => void; const blocker = new Promise<void>((resolve) => {
release = resolve;
});
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.