import { afterEach, beforeEach, describe, expect, it, vi } from "vitest" ;
import type { AcpSessionStoreEntry } from "../acp/runtime/session-meta.js" ;
import { startAcpSpawnParentStreamRelay } from "../agents/acp-spawn-parent-stream.js" ;
import { resetCronActiveJobsForTests } from "../cron/active-jobs.js" ;
import {
emitAgentEvent,
registerAgentRunContext,
resetAgentRunContextForTest,
} from "../infra/agent-events.js" ;
import {
hasPendingHeartbeatWake,
resetHeartbeatWakeStateForTests,
} from "../infra/heartbeat-wake.js" ;
import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js" ;
import type { ParsedAgentSessionKey } from "../routing/session-key.js" ;
import { withTempDir } from "../test-helpers/temp-dir.js" ;
import { createManagedTaskFlow, resetTaskFlowRegistryForTests } from "./task-flow-registry.js" ;
import { configureTaskFlowRegistryRuntime } from "./task-flow-registry.store.js" ;
import {
cancelTaskById,
createTaskRecord,
findLatestTaskForOwnerKey,
findLatestTaskForRelatedSessionKey,
findTaskByRunId,
getTaskById,
getTaskRegistrySummary,
isParentFlowLinkError,
listTasksForOwnerKey,
listTaskRecords,
linkTaskToFlowById,
maybeDeliverTaskStateChangeUpdate,
maybeDeliverTaskTerminalUpdate,
markTaskRunningByRunId,
markTaskTerminalById,
recordTaskProgressByRunId,
resetTaskRegistryControlRuntimeForTests,
resetTaskRegistryDeliveryRuntimeForTests,
resetTaskRegistryForTests,
resolveTaskForLookupToken,
setTaskRegistryControlRuntimeForTests,
setTaskRegistryDeliveryRuntimeForTests,
setTaskProgressById,
setTaskTimingById,
updateTaskNotifyPolicyById,
} from "./task-registry.js" ;
import {
getInspectableTaskAuditSummary,
previewTaskRegistryMaintenance,
resetTaskRegistryMaintenanceRuntimeForTests,
reconcileInspectableTasks,
runTaskRegistryMaintenance,
setTaskRegistryMaintenanceRuntimeForTests,
startTaskRegistryMaintenance,
stopTaskRegistryMaintenanceForTests,
sweepTaskRegistry,
} from "./task-registry.maintenance.js" ;
import { configureTaskRegistryRuntime } from "./task-registry.store.js" ;
// Snapshot of the state-dir environment variable taken at module load; the
// suite's afterEach hook uses it to put the environment back after each test.
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
// Mock functions shared between the vi.mock factories below and the test bodies.
// They are created through vi.hoisted so they already exist when vitest runs the
// (hoisted) vi.mock factories.
const hoisted = vi.hoisted(() => {
const sendMessageMock = vi.fn();
const cancelSessionMock = vi.fn();
const killSubagentRunAdminMock = vi.fn();
return {
sendMessageMock,
cancelSessionMock,
killSubagentRunAdminMock,
};
});
// Replace the ACP session manager module with one whose manager only exposes
// cancelSession, backed by the shared mock.
vi.mock("../acp/control-plane/manager.js" , () => ({
getAcpSessionManager: () => ({
cancelSession: hoisted.cancelSessionMock,
}),
}));
// Replace the subagent control module; every kill request is forwarded to the shared mock.
vi.mock("../agents/subagent-control.js" , () => ({
killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params),
}));
// Replace the channel helper: only the two fake channels named here count as deliverable.
vi.mock("../utils/message-channel.js" , () => ({
isDeliverableMessageChannel: (channel: string) =>
channel === "notifychat" || channel === "guildchat" ,
}));
/**
 * Installs a stub maintenance runtime that works purely on the supplied
 * in-memory collections.
 *
 * @param params.currentTasks  Map consulted and mutated by the get / delete /
 *   mark-lost / set-cleanup handlers.
 * @param params.snapshotTasks Array handed back verbatim by `listTaskRecords`.
 *
 * All session, cron and run-context lookups are stubbed to "nothing found".
 */
function configureTaskRegistryMaintenanceRuntimeForTest(params: {
  currentTasks: Map<string, ReturnType<typeof createTaskRecord>>;
  snapshotTasks: ReturnType<typeof createTaskRecord>[];
}): void {
  // Placeholder ACP entry returned for every session lookup.
  const blankAcpEntry = {
    cfg: {} as never,
    storePath: "",
    sessionKey: "",
    storeSessionKey: "",
    entry: undefined,
    storeReadFailed: false,
  } satisfies AcpSessionStoreEntry;

  // Marks a known task as lost and stores the updated copy; unknown ids yield null.
  const markTaskLostById = (patch: {
    taskId: string;
    endedAt: number;
    lastEventAt?: number;
    error?: string;
    cleanupAfter?: number;
  }) => {
    const existing = params.currentTasks.get(patch.taskId);
    if (!existing) {
      return null;
    }
    // Optional fields are only written when the caller supplied them.
    const errorPatch = patch.error !== undefined ? { error: patch.error } : {};
    const cleanupPatch =
      patch.cleanupAfter !== undefined ? { cleanupAfter: patch.cleanupAfter } : {};
    const lostTask = {
      ...existing,
      status: "lost" as const,
      endedAt: patch.endedAt,
      lastEventAt: patch.lastEventAt ?? patch.endedAt,
      ...errorPatch,
      ...cleanupPatch,
    };
    params.currentTasks.set(patch.taskId, lostTask);
    return lostTask;
  };

  // Overwrites cleanupAfter on a known task; unknown ids yield null.
  const setTaskCleanupAfterById = (patch: { taskId: string; cleanupAfter: number }) => {
    const existing = params.currentTasks.get(patch.taskId);
    if (!existing) {
      return null;
    }
    const updatedTask = { ...existing, cleanupAfter: patch.cleanupAfter };
    params.currentTasks.set(patch.taskId, updatedTask);
    return updatedTask;
  };

  setTaskRegistryMaintenanceRuntimeForTests({
    readAcpSessionEntry: () => blankAcpEntry,
    loadSessionStore: () => ({}),
    resolveStorePath: () => "",
    parseAgentSessionKey: () => null as ParsedAgentSessionKey | null,
    isCronJobActive: () => false,
    getAgentRunContext: () => undefined,
    deleteTaskRecordById: (taskId: string) => params.currentTasks.delete(taskId),
    ensureTaskRegistryReady: () => {},
    getTaskById: (taskId: string) => params.currentTasks.get(taskId),
    listTaskRecords: () => params.snapshotTasks,
    markTaskLostById,
    maybeDeliverTaskTerminalUpdate: async () => null,
    resolveTaskForLookupToken: () => undefined,
    setTaskCleanupAfterById,
  });
}
/**
 * Polls `assertion` until it stops throwing or the time budget is exhausted.
 *
 * @param assertion Synchronous check that throws while the expected state has
 *   not been reached yet.
 * @param timeoutMs Budget in milliseconds, measured with `Date.now()` from the
 *   first attempt. Defaults to 2000.
 * @param stepMs Delay between attempts in milliseconds. Defaults to 5.
 * @returns Resolves as soon as one attempt completes without throwing.
 * @throws The error from the most recent attempt once the budget is used up.
 */
async function waitForAssertion(
  assertion: () => void,
  timeoutMs = 2_000,
  stepMs = 5,
): Promise<void> {
  const startedAt = Date.now();
  for (;;) {
    try {
      assertion();
      return;
    } catch (error) {
      // Out of time: surface the latest failure instead of retrying again.
      if (Date.now() - startedAt >= timeoutMs) {
        throw error;
      }
    }
    // Yield to timers so pending async work can progress before the next attempt.
    await new Promise<void>((resolve) => setTimeout(resolve, stepMs));
  }
}
/**
 * Yields to the microtask queue `times` times in a row (4 by default) so that
 * already-scheduled promise continuations get a chance to run.
 */
async function flushAsyncWork(times = 4) {
  let remaining = times;
  while (remaining > 0) {
    await Promise.resolve();
    remaining -= 1;
  }
}
/**
 * Runs `run` with a fresh temporary directory installed as OPENCLAW_STATE_DIR.
 *
 * Both the task registry and the task-flow registry are reset before `run`
 * starts and again after it settles, whether it resolved or rejected.
 *
 * @param run Callback receiving the temporary directory path.
 * @returns Whatever `run` resolves to.
 */
async function withTaskRegistryTempDir<T>(run: (root: string) => Promise<T>): Promise<T> {
  const resetBothRegistries = (): void => {
    resetTaskRegistryForTests();
    resetTaskFlowRegistryForTests();
  };

  return await withTempDir({ prefix: "openclaw-task-registry-" }, async (stateDir) => {
    process.env.OPENCLAW_STATE_DIR = stateDir;
    resetBothRegistries();
    try {
      return await run(stateDir);
    } finally {
      // Close both sqlite-backed registries before Windows temp-dir cleanup tries to remove them.
      resetBothRegistries();
    }
  });
}
/**
 * Points the task registry and the task-flow registry at throwaway stores:
 * every load yields empty maps and every write, delete or close is a no-op.
 */
function configureInMemoryTaskStoresForLinkValidationTests() {
  // Single do-nothing callback reused for every persistence hook.
  const noop = (): void => {};

  configureTaskRegistryRuntime({
    store: {
      loadSnapshot: () => ({ tasks: new Map(), deliveryStates: new Map() }),
      saveSnapshot: noop,
      upsertTask: noop,
      deleteTask: noop,
      close: noop,
    },
  });

  configureTaskFlowRegistryRuntime({
    store: {
      loadSnapshot: () => ({ flows: new Map() }),
      saveSnapshot: noop,
      upsertFlow: noop,
      deleteFlow: noop,
      close: noop,
    },
  });
}
describe("task-registry" , () => {
// Before every test: route outbound delivery and cancel/kill control calls
// through the shared hoisted mocks so tests can assert on them.
beforeEach(() => {
setTaskRegistryDeliveryRuntimeForTests({
sendMessage: hoisted.sendMessageMock,
});
setTaskRegistryControlRuntimeForTests({
getAcpSessionManager: () => ({
cancelSession: hoisted.cancelSessionMock,
}),
killSubagentRunAdmin: async (params) => hoisted.killSubagentRunAdminMock(params),
});
});
// After every test: undo fake timers, restore the state-dir environment variable,
// reset every piece of shared module state touched by the suite, and clear the mocks.
afterEach(() => {
vi.useRealTimers();
// Restore OPENCLAW_STATE_DIR to the value captured at module load
// (removing it entirely when it was not set).
if (ORIGINAL_STATE_DIR === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetSystemEventsForTest();
resetHeartbeatWakeStateForTests();
resetAgentRunContextForTest();
resetCronActiveJobsForTests();
// Drop the runtime overrides installed by beforeEach or by individual tests.
resetTaskRegistryControlRuntimeForTests();
resetTaskRegistryDeliveryRuntimeForTests();
resetTaskRegistryMaintenanceRuntimeForTests();
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
// mockReset clears recorded calls as well as any configured return values.
hoisted.sendMessageMock.mockReset();
hoisted.cancelSessionMock.mockReset();
hoisted.killSubagentRunAdminMock.mockReset();
});
it("updates task status from lifecycle events" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
childSessionKey: "agent:main:acp:child" ,
runId: "run-1" ,
task: "Do the thing" ,
status: "running" ,
deliveryStatus: "not_applicable" ,
startedAt: 100 ,
});
emitAgentEvent({
runId: "run-1" ,
stream: "assistant" ,
data: {
text: "working" ,
},
});
emitAgentEvent({
runId: "run-1" ,
stream: "lifecycle" ,
data: {
phase: "end" ,
endedAt: 250 ,
},
});
expect(findTaskByRunId("run-1" )).toMatchObject({
runtime: "acp" ,
status: "succeeded" ,
endedAt: 250 ,
});
});
});
it("ignores late agent events for operator-cancelled tasks" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const task = createTaskRecord({
runtime: "cli" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
childSessionKey: "agent:main:main" ,
runId: "run-cancel-then-end" ,
task: "Do the thing" ,
status: "running" ,
deliveryStatus: "not_applicable" ,
startedAt: 100 ,
});
markTaskTerminalById({
taskId: task.taskId,
status: "cancelled" ,
endedAt: 200 ,
lastEventAt: 200 ,
error: "Cancelled by operator." ,
});
emitAgentEvent({
runId: "run-cancel-then-end" ,
stream: "lifecycle" ,
data: {
phase: "end" ,
endedAt: 999 ,
},
});
emitAgentEvent({
runId: "run-cancel-then-end" ,
stream: "error" ,
data: {
error: "late error" ,
},
});
expect(findTaskByRunId("run-cancel-then-end" )).toMatchObject({
status: "cancelled" ,
endedAt: 200 ,
lastEventAt: 200 ,
error: "Cancelled by operator." ,
});
});
});
it("summarizes task pressure by status and runtime" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
runId: "run-summary-acp" ,
task: "Investigate issue" ,
status: "queued" ,
deliveryStatus: "pending" ,
});
createTaskRecord({
runtime: "cron" ,
ownerKey: "" ,
scopeKind: "system" ,
runId: "run-summary-cron" ,
task: "Daily digest" ,
status: "running" ,
deliveryStatus: "not_applicable" ,
});
createTaskRecord({
runtime: "subagent" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
runId: "run-summary-subagent" ,
task: "Write patch" ,
status: "timed_out" ,
deliveryStatus: "session_queued" ,
});
expect(getTaskRegistrySummary()).toEqual({
total: 3 ,
active: 2 ,
terminal: 1 ,
failures: 1 ,
byStatus: {
queued: 1 ,
running: 1 ,
succeeded: 0 ,
failed: 0 ,
timed_out: 1 ,
cancelled: 0 ,
lost: 0 ,
},
byRuntime: {
subagent: 1 ,
acp: 1 ,
cli: 0 ,
cron: 1 ,
},
});
});
});
it("rejects cross-owner parent flow links during task creation" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
configureInMemoryTaskStoresForLinkValidationTests();
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main" ,
controllerId: "tests/task-registry" ,
goal: "Owner main flow" ,
});
expect(() =>
createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:other" ,
scopeKind: "session" ,
parentFlowId: flow.flowId,
runId: "cross-owner-run" ,
task: "Attempt hijack" ,
}),
).toThrow("Task ownerKey must match parent flow ownerKey." );
});
});
it("rejects system-scoped parent flow links during task creation" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
configureInMemoryTaskStoresForLinkValidationTests();
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main" ,
controllerId: "tests/task-registry" ,
goal: "Owner main flow" ,
});
expect(() =>
createTaskRecord({
runtime: "cron" ,
ownerKey: "agent:main:main" ,
scopeKind: "system" ,
parentFlowId: flow.flowId,
runId: "system-link-run" ,
task: "System task" ,
deliveryStatus: "not_applicable" ,
}),
).toThrow("Only session-scoped tasks can link to flows." );
});
});
it("rejects cross-owner flow links for existing tasks" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
configureInMemoryTaskStoresForLinkValidationTests();
const task = createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
runId: "owner-main-task" ,
task: "Safe task" ,
});
const flow = createManagedTaskFlow({
ownerKey: "agent:main:other" ,
controllerId: "tests/task-registry" ,
goal: "Other owner flow" ,
});
expect(() =>
linkTaskToFlowById({
taskId: task.taskId,
flowId: flow.flowId,
}),
).toThrow("Task ownerKey must match parent flow ownerKey." );
expect(getTaskById(task.taskId)).toMatchObject({
taskId: task.taskId,
parentFlowId: undefined,
});
});
});
// Linking a new task to a flow whose cancellation was already requested must
// fail with a parent-flow link error carrying code "cancel_requested".
it("rejects parent flow links once cancellation has been requested", async () => {
  await withTaskRegistryTempDir(async (root) => {
    process.env.OPENCLAW_STATE_DIR = root;
    resetTaskRegistryForTests({ persist: false });
    resetTaskFlowRegistryForTests({ persist: false });
    configureInMemoryTaskStoresForLinkValidationTests();
    const flow = createManagedTaskFlow({
      ownerKey: "agent:main:main",
      controllerId: "tests/task-registry",
      goal: "Cancelling flow",
      cancelRequestedAt: 42,
    });

    // Capture the error instead of asserting inside `catch`, so a missing throw
    // is reported with its own message rather than as an unrelated assertion failure.
    let thrown: unknown;
    try {
      createTaskRecord({
        runtime: "acp",
        ownerKey: "agent:main:main",
        scopeKind: "session",
        parentFlowId: flow.flowId,
        runId: "cancel-requested-link",
        task: "Should be denied",
      });
    } catch (error) {
      thrown = error;
    }
    if (thrown === undefined) {
      throw new Error("Expected createTaskRecord to throw.");
    }

    expect(isParentFlowLinkError(thrown)).toBe(true);
    expect(thrown).toMatchObject({
      code: "cancel_requested",
      message: "Parent flow cancellation has already been requested.",
    });
  });
});
it("rejects parent flow links for terminal flows" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
configureInMemoryTaskStoresForLinkValidationTests();
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main" ,
controllerId: "tests/task-registry" ,
goal: "Completed flow" ,
status: "cancelled" ,
});
expect(() =>
createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
parentFlowId: flow.flowId,
runId: "terminal-flow-link" ,
task: "Should be denied" ,
}),
).toThrow("Parent flow is already cancelled." );
});
});
it("delivers ACP completion to the requester channel when a delivery origin exists" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockResolvedValue({
channel: "notifychat" ,
to: "notifychat:123" ,
via: "direct" ,
});
createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
requesterOrigin: {
channel: "notifychat" ,
to: "notifychat:123" ,
threadId: "321" ,
},
childSessionKey: "agent:main:acp:child" ,
runId: "run-delivery" ,
task: "Investigate issue" ,
status: "running" ,
deliveryStatus: "pending" ,
startedAt: 100 ,
});
emitAgentEvent({
runId: "run-delivery" ,
stream: "lifecycle" ,
data: {
phase: "end" ,
endedAt: 250 ,
},
});
await waitForAssertion(() =>
expect(findTaskByRunId("run-delivery" )).toMatchObject({
status: "succeeded" ,
deliveryStatus: "delivered" ,
}),
);
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "notifychat" ,
to: "notifychat:123" ,
threadId: "321" ,
content: expect.stringContaining("Background task done: ACP background task" ),
mirror: expect.objectContaining({
sessionKey: "agent:main:main" ,
}),
}),
),
);
expect(peekSystemEvents("agent:main:main" )).toEqual([]);
});
});
it("records delivery failure and queues a session fallback when direct delivery misses" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockRejectedValueOnce(new Error("notifychat unavailable" ));
createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
requesterOrigin: {
channel: "notifychat" ,
to: "notifychat:123" ,
},
childSessionKey: "agent:main:acp:child" ,
runId: "run-delivery-fail" ,
task: "Investigate issue" ,
status: "running" ,
deliveryStatus: "pending" ,
startedAt: 100 ,
});
emitAgentEvent({
runId: "run-delivery-fail" ,
stream: "lifecycle" ,
data: {
phase: "error" ,
endedAt: 250 ,
error: "Permission denied by ACP runtime" ,
},
});
await waitForAssertion(() =>
expect(findTaskByRunId("run-delivery-fail" )).toMatchObject({
status: "failed" ,
deliveryStatus: "failed" ,
error: "Permission denied by ACP runtime" ,
}),
);
await waitForAssertion(() =>
expect(peekSystemEvents("agent:main:main" )).toEqual([
expect.stringContaining("Background task failed: ACP background task" ),
]),
);
});
});
it("still wakes the parent when blocked delivery misses the outward channel" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockRejectedValueOnce(new Error("notifychat unavailable" ));
createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
requesterOrigin: {
channel: "notifychat" ,
to: "notifychat:123" ,
},
childSessionKey: "agent:main:acp:child" ,
runId: "run-delivery-blocked" ,
task: "Port the repo changes" ,
status: "succeeded" ,
deliveryStatus: "pending" ,
terminalOutcome: "blocked" ,
terminalSummary: "Writable session or apply_patch authorization required." ,
});
await waitForAssertion(() =>
expect(findTaskByRunId("run-delivery-blocked" )).toMatchObject({
status: "succeeded" ,
deliveryStatus: "failed" ,
terminalOutcome: "blocked" ,
}),
);
expect(peekSystemEvents("agent:main:main" )).toEqual([
"Background task blocked: ACP background task (run run-deli). Writable session or apply_patch authorization required." ,
"Task needs follow-up: ACP background task (run run-deli). Writable session or apply_patch authorization required." ,
]);
expect(hasPendingHeartbeatWake()).toBe(true );
});
});
it("marks internal fallback delivery as session queued instead of delivered" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
childSessionKey: "agent:main:acp:child" ,
runId: "run-session-queued" ,
task: "Investigate issue" ,
status: "running" ,
deliveryStatus: "pending" ,
startedAt: 100 ,
});
emitAgentEvent({
runId: "run-session-queued" ,
stream: "lifecycle" ,
data: {
phase: "end" ,
endedAt: 250 ,
},
});
await waitForAssertion(() =>
expect(findTaskByRunId("run-session-queued" )).toMatchObject({
status: "succeeded" ,
deliveryStatus: "session_queued" ,
}),
);
expect(peekSystemEvents("agent:main:main" )).toEqual([
expect.stringContaining("Background task done: ACP background task" ),
]);
expect(hoisted.sendMessageMock).not.toHaveBeenCalled();
});
});
it("wakes the parent for blocked tasks even when delivery falls back to the session" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
childSessionKey: "agent:main:acp:child" ,
runId: "run-session-blocked" ,
task: "Port the repo changes" ,
status: "succeeded" ,
deliveryStatus: "pending" ,
terminalOutcome: "blocked" ,
terminalSummary: "Writable session or apply_patch authorization required." ,
});
await waitForAssertion(() =>
expect(findTaskByRunId("run-session-blocked" )).toMatchObject({
status: "succeeded" ,
deliveryStatus: "session_queued" ,
}),
);
expect(peekSystemEvents("agent:main:main" )).toEqual([
"Background task blocked: ACP background task (run run-sess). Writable session or apply_patch authorization required." ,
"Task needs follow-up: ACP background task (run run-sess). Writable session or apply_patch authorization required." ,
]);
expect(hasPendingHeartbeatWake()).toBe(true );
expect(hoisted.sendMessageMock).not.toHaveBeenCalled();
});
});
it("does not include internal progress detail in the terminal channel message" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockResolvedValue({
channel: "notifychat" ,
to: "notifychat:123" ,
via: "direct" ,
});
createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
requesterOrigin: {
channel: "notifychat" ,
to: "notifychat:123" ,
threadId: "321" ,
},
childSessionKey: "agent:main:acp:child" ,
runId: "run-detail-leak" ,
task: "Create the file and verify it" ,
status: "running" ,
deliveryStatus: "pending" ,
startedAt: 100 ,
});
setTaskProgressById({
taskId: findTaskByRunId("run-detail-leak" )!.taskId,
progressSummary:
"I am loading the local session context and checking helper command availability before writing the file." ,
});
emitAgentEvent({
runId: "run-detail-leak" ,
stream: "lifecycle" ,
data: {
phase: "end" ,
endedAt: 250 ,
},
});
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
content: "Background task done: ACP background task (run run-deta)." ,
}),
),
);
});
});
it("surfaces blocked outcomes separately from completed tasks" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockResolvedValue({
channel: "notifychat" ,
to: "notifychat:123" ,
via: "direct" ,
});
createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
requesterOrigin: {
channel: "notifychat" ,
to: "notifychat:123" ,
},
childSessionKey: "agent:main:acp:child" ,
runId: "run-blocked-outcome" ,
task: "Port the repo changes" ,
status: "succeeded" ,
deliveryStatus: "pending" ,
terminalOutcome: "blocked" ,
terminalSummary: "Writable session or apply_patch authorization required." ,
});
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
content:
"Background task blocked: ACP background task (run run-bloc). Writable session or apply_patch authorization required." ,
}),
),
);
expect(peekSystemEvents("agent:main:main" )).toEqual([
"Task needs follow-up: ACP background task (run run-bloc). Writable session or apply_patch authorization required." ,
]);
expect(hasPendingHeartbeatWake()).toBe(true );
});
});
it("does not queue an unblock follow-up for ordinary completed tasks" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockResolvedValue({
channel: "notifychat" ,
to: "notifychat:123" ,
via: "direct" ,
});
createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
requesterOrigin: {
channel: "notifychat" ,
to: "notifychat:123" ,
},
childSessionKey: "agent:main:acp:child" ,
runId: "run-succeeded-outcome" ,
task: "Create the file and verify it" ,
status: "succeeded" ,
deliveryStatus: "pending" ,
terminalSummary: "Created /tmp/file.txt and verified contents." ,
terminalOutcome: "succeeded" ,
});
await waitForAssertion(() =>
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
content:
"Background task done: ACP background task (run run-succ). Created /tmp/file.txt and verified contents." ,
}),
),
);
expect(peekSystemEvents("agent:main:main" )).toEqual([]);
expect(hasPendingHeartbeatWake()).toBe(false );
});
});
it("keeps distinct task records when different producers share a runId" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
createTaskRecord({
runtime: "cli" ,
ownerKey: "agent:codex:acp:child" ,
scopeKind: "session" ,
childSessionKey: "agent:codex:acp:child" ,
runId: "run-shared" ,
task: "Child ACP execution" ,
status: "running" ,
deliveryStatus: "not_applicable" ,
});
createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
childSessionKey: "agent:codex:acp:child" ,
runId: "run-shared" ,
task: "Spawn ACP child" ,
status: "running" ,
deliveryStatus: "pending" ,
});
expect(listTaskRecords().filter((task) => task.runId === "run-shared" )).toHaveLength(2 );
expect(findTaskByRunId("run-shared" )).toMatchObject({
runtime: "acp" ,
task: "Spawn ACP child" ,
});
});
});
it("scopes shared-run lifecycle events to the matching session" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const victimTask = createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:victim:main" ,
scopeKind: "session" ,
childSessionKey: "agent:victim:acp:child" ,
runId: "run-shared-scope" ,
task: "Victim ACP task" ,
status: "running" ,
deliveryStatus: "pending" ,
});
const attackerTask = createTaskRecord({
runtime: "cli" ,
ownerKey: "agent:attacker:main" ,
scopeKind: "session" ,
childSessionKey: "agent:attacker:main" ,
runId: "run-shared-scope" ,
task: "Attacker CLI task" ,
status: "running" ,
deliveryStatus: "not_applicable" ,
});
registerAgentRunContext("run-shared-scope" , {
sessionKey: "agent:attacker:main" ,
});
emitAgentEvent({
runId: "run-shared-scope" ,
stream: "lifecycle" ,
data: {
phase: "error" ,
endedAt: 250 ,
error: "attacker controlled error" ,
},
});
expect(getTaskById(attackerTask.taskId)).toMatchObject({
status: "failed" ,
error: "attacker controlled error" ,
});
expect(getTaskById(victimTask.taskId)).toMatchObject({
status: "running" ,
});
expect(getTaskById(victimTask.taskId)).not.toHaveProperty("error" );
});
});
it("suppresses duplicate ACP delivery when a preferred spawned task shares the runId" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockResolvedValue({
channel: "notifychat" ,
to: "notifychat:123" ,
via: "direct" ,
});
const directTask = createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
requesterOrigin: {
channel: "notifychat" ,
to: "notifychat:123" ,
},
childSessionKey: "agent:main:acp:child" ,
runId: "run-shared-delivery" ,
task: "Direct ACP child" ,
status: "succeeded" ,
deliveryStatus: "pending" ,
});
const spawnedTask = createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
requesterOrigin: {
channel: "notifychat" ,
to: "notifychat:123" ,
},
childSessionKey: "agent:main:acp:child" ,
runId: "run-shared-delivery" ,
task: "Spawn ACP child" ,
preferMetadata: true ,
status: "succeeded" ,
deliveryStatus: "pending" ,
});
await maybeDeliverTaskTerminalUpdate(directTask.taskId);
await maybeDeliverTaskTerminalUpdate(spawnedTask.taskId);
expect(hoisted.sendMessageMock).toHaveBeenCalledTimes(1 );
expect(listTaskRecords().filter((task) => task.runId === "run-shared-delivery" )).toHaveLength(
1 ,
);
expect(findTaskByRunId("run-shared-delivery" )).toMatchObject({
taskId: directTask.taskId,
task: "Spawn ACP child" ,
deliveryStatus: "delivered" ,
});
});
});
it("does not suppress ACP delivery across different requester scopes when runIds collide" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const victimTask = createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:victim:main" ,
scopeKind: "session" ,
childSessionKey: "agent:victim:acp:child" ,
runId: "run-cross-requester-delivery" ,
task: "Victim ACP task" ,
status: "running" ,
deliveryStatus: "pending" ,
});
const attackerTask = createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:attacker:main" ,
scopeKind: "session" ,
childSessionKey: "agent:attacker:acp:child" ,
runId: "run-cross-requester-delivery" ,
task: "Attacker ACP task" ,
status: "running" ,
deliveryStatus: "pending" ,
});
markTaskTerminalById({
taskId: victimTask.taskId,
status: "succeeded" ,
endedAt: 250 ,
});
markTaskTerminalById({
taskId: attackerTask.taskId,
status: "succeeded" ,
endedAt: 260 ,
});
await maybeDeliverTaskTerminalUpdate(victimTask.taskId);
await maybeDeliverTaskTerminalUpdate(attackerTask.taskId);
await waitForAssertion(() =>
expect(getTaskById(victimTask.taskId)).toMatchObject({
deliveryStatus: "session_queued" ,
}),
);
await waitForAssertion(() =>
expect(getTaskById(attackerTask.taskId)).toMatchObject({
deliveryStatus: "session_queued" ,
}),
);
});
});
it("adopts preferred ACP spawn metadata when collapsing onto an earlier direct record" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const directTask = createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
requesterOrigin: {
channel: "notifychat" ,
to: "notifychat:123" ,
},
childSessionKey: "agent:main:acp:child" ,
runId: "run-collapse-preferred" ,
task: "Direct ACP child" ,
status: "running" ,
deliveryStatus: "pending" ,
});
const spawnedTask = createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
requesterOrigin: {
channel: "notifychat" ,
to: "notifychat:123" ,
},
childSessionKey: "agent:main:acp:child" ,
runId: "run-collapse-preferred" ,
label: "Quant patch" ,
task: "Implement the feature and report back" ,
preferMetadata: true ,
status: "running" ,
deliveryStatus: "pending" ,
});
expect(spawnedTask.taskId).toBe(directTask.taskId);
expect(findTaskByRunId("run-collapse-preferred" )).toMatchObject({
taskId: directTask.taskId,
label: "Quant patch" ,
task: "Implement the feature and report back" ,
});
});
});
it("collapses ACP run-owned task creation onto the existing spawned task" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const spawnedTask = createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
requesterOrigin: {
channel: "notifychat" ,
to: "notifychat:123" ,
},
childSessionKey: "agent:main:acp:child" ,
runId: "run-collapse" ,
task: "Spawn ACP child" ,
status: "running" ,
deliveryStatus: "pending" ,
});
const directTask = createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
requesterOrigin: {
channel: "notifychat" ,
to: "notifychat:123" ,
},
childSessionKey: "agent:main:acp:child" ,
runId: "run-collapse" ,
task: "Direct ACP child" ,
status: "running" ,
});
expect(directTask.taskId).toBe(spawnedTask.taskId);
expect(listTaskRecords().filter((task) => task.runId === "run-collapse" )).toHaveLength(1 );
expect(findTaskByRunId("run-collapse" )).toMatchObject({
task: "Spawn ACP child" ,
});
});
});
it("delivers a terminal ACP update only once when multiple notifiers race" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
hoisted.sendMessageMock.mockResolvedValue({
channel: "notifychat" ,
to: "notifychat:123" ,
via: "direct" ,
});
const task = createTaskRecord({
runtime: "acp" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
requesterOrigin: {
channel: "notifychat" ,
to: "notifychat:123" ,
},
childSessionKey: "agent:main:acp:child" ,
runId: "run-racing-delivery" ,
task: "Investigate issue" ,
status: "succeeded" ,
deliveryStatus: "pending" ,
terminalOutcome: "blocked" ,
terminalSummary: "Writable session or apply_patch authorization required." ,
});
const first = maybeDeliverTaskTerminalUpdate(task.taskId);
const second = maybeDeliverTaskTerminalUpdate(task.taskId);
await Promise.all([first, second]);
expect(hoisted.sendMessageMock).toHaveBeenCalledTimes(1 );
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
idempotencyKey: `task-terminal:${task.taskId}:succeeded:blocked`,
mirror: expect.objectContaining({
idempotencyKey: `task-terminal:${task.taskId}:succeeded:blocked`,
}),
}),
);
expect(findTaskByRunId("run-racing-delivery" )).toMatchObject({
deliveryStatus: "delivered" ,
});
});
});
it("restores persisted tasks from disk on the next lookup" , async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const task = createTaskRecord({
runtime: "subagent" ,
ownerKey: "agent:main:main" ,
scopeKind: "session" ,
childSessionKey: "agent:main:subagent:child" ,
runId: "run-restore" ,
task: "Restore me" ,
status: "running" ,
deliveryStatus: "pending" ,
});
resetTaskRegistryForTests({
persist: false ,
});
expect(resolveTaskForLookupToken(task.taskId)).toMatchObject({
taskId: task.taskId,
runId: "run-restore" ,
task: "Restore me" ,
});
});
});
// Two tasks created at the same (frozen) timestamp for one owner: the owner
// lookups must rank the later-created task first, and the related-session
// lookup must resolve a child session key to its own task.
it("indexes tasks by session key for latest and list lookups", async () => {
  await withTaskRegistryTempDir(async (root) => {
    process.env.OPENCLAW_STATE_DIR = root;
    resetTaskRegistryForTests({ persist: false });

    // Freeze the clock so both records are created with an identical Date.now().
    const nowSpy = vi.spyOn(Date, "now");
    nowSpy.mockReturnValue(1_700_000_000_000);
    let older: ReturnType<typeof createTaskRecord>;
    let latest: ReturnType<typeof createTaskRecord>;
    try {
      older = createTaskRecord({
        runtime: "acp",
        ownerKey: "agent:main:main",
        scopeKind: "session",
        childSessionKey: "agent:main:subagent:child-1",
        runId: "run-session-lookup-1",
        task: "Older task",
      });
      latest = createTaskRecord({
        runtime: "subagent",
        ownerKey: "agent:main:main",
        scopeKind: "session",
        childSessionKey: "agent:main:subagent:child-2",
        runId: "run-session-lookup-2",
        task: "Latest task",
      });
    } finally {
      // Always release the spy; otherwise a failure above would leave Date.now
      // frozen for every test that runs afterwards.
      nowSpy.mockRestore();
    }

    expect(findLatestTaskForOwnerKey("agent:main:main")?.taskId).toBe(latest.taskId);
    expect(listTasksForOwnerKey("agent:main:main").map((task) => task.taskId)).toEqual([
      latest.taskId,
      older.taskId,
    ]);
    expect(findLatestTaskForRelatedSessionKey("agent:main:subagent:child-1")?.taskId).toBe(
      older.taskId,
    );
  });
});
it("projects inspection-time orphaned tasks as lost without mutating the registry", async () => {
  await withTaskRegistryTempDir(async (stateDir) => {
    process.env.OPENCLAW_STATE_DIR = stateDir;
    resetTaskRegistryForTests();

    const orphan = createTaskRecord({
      runtime: "acp",
      ownerKey: "agent:main:main",
      scopeKind: "session",
      childSessionKey: "agent:main:acp:missing",
      runId: "run-lost",
      task: "Missing child",
      status: "running",
      deliveryStatus: "pending",
    });
    // Backdate the last event by ten minutes.
    setTaskTimingById({
      taskId: orphan.taskId,
      lastEventAt: Date.now() - 10 * 60_000,
    });

    // The reconciled view reports the task as lost...
    const [projected] = reconcileInspectableTasks();
    expect(projected).toMatchObject({
      runId: "run-lost",
      status: "lost",
      error: "backing session missing",
    });

    // ...while the stored record stays running and no system event is queued.
    const stored = getTaskById(orphan.taskId);
    expect(stored).toMatchObject({ status: "running" });
    expect(peekSystemEvents("agent:main:main")).toEqual([]);
  });
});
it("marks orphaned tasks lost with cleanupAfter in a single maintenance pass", async () => {
  await withTaskRegistryTempDir(async (stateDir) => {
    process.env.OPENCLAW_STATE_DIR = stateDir;
    resetTaskRegistryForTests();

    const startedAt = Date.now();
    const { taskId } = createTaskRecord({
      runtime: "acp",
      ownerKey: "agent:main:main",
      scopeKind: "session",
      childSessionKey: "agent:main:acp:missing",
      runId: "run-lost-maintenance",
      task: "Missing child",
      status: "running",
      deliveryStatus: "pending",
    });
    // Backdate the last event by ten minutes.
    setTaskTimingById({ taskId, lastEventAt: startedAt - 10 * 60_000 });

    const outcome = await runTaskRegistryMaintenance();
    expect(outcome).toEqual({
      reconciled: 1,
      recovered: 0,
      cleanupStamped: 0,
      pruned: 0,
    });

    // The same pass must both flip the status and stamp a future cleanup time.
    expect(getTaskById(taskId)).toMatchObject({
      status: "lost",
      error: "backing session missing",
    });
    expect(getTaskById(taskId)?.cleanupAfter).toBeGreaterThan(startedAt);
  });
});
it("prunes old terminal tasks during maintenance sweeps", async () => {
  await withTaskRegistryTempDir(async (stateDir) => {
    process.env.OPENCLAW_STATE_DIR = stateDir;
    resetTaskRegistryForTests();

    const dayMs = 24 * 60 * 60_000;
    // A succeeded CLI task that started nine days ago...
    const finished = createTaskRecord({
      runtime: "cli",
      ownerKey: "agent:main:main",
      scopeKind: "session",
      childSessionKey: "agent:main:main",
      runId: "run-prune",
      task: "Old completed task",
      status: "succeeded",
      deliveryStatus: "not_applicable",
      startedAt: Date.now() - 9 * dayMs,
    });
    // ...and ended eight days ago.
    setTaskTimingById({
      taskId: finished.taskId,
      endedAt: Date.now() - 8 * dayMs,
      lastEventAt: Date.now() - 8 * dayMs,
    });

    const outcome = await sweepTaskRegistry();
    expect(outcome).toEqual({
      reconciled: 0,
      recovered: 0,
      cleanupStamped: 0,
      pruned: 1,
    });
    expect(listTaskRecords()).toEqual([]);
  });
});
it("previews and repairs missing cleanup timestamps during maintenance", async () => {
  await withTaskRegistryTempDir(async (stateDir) => {
    process.env.OPENCLAW_STATE_DIR = stateDir;
    resetTaskRegistryForTests();

    const now = Date.now();
    // Store stub whose snapshot holds one failed cron task that has an
    // `endedAt` but no `cleanupAfter`. Saving is a no-op.
    configureTaskRegistryRuntime({
      store: {
        loadSnapshot: () => ({
          tasks: new Map([
            [
              "task-missing-cleanup",
              {
                taskId: "task-missing-cleanup",
                runtime: "cron",
                requesterSessionKey: "",
                ownerKey: "system:cron:task-missing-cleanup",
                scopeKind: "system",
                runId: "run-maintenance-cleanup",
                task: "Finished cron",
                status: "failed",
                deliveryStatus: "not_applicable",
                notifyPolicy: "silent",
                createdAt: now - 120_000,
                endedAt: now - 60_000,
                lastEventAt: now - 60_000,
              },
            ],
          ]),
          deliveryStates: new Map(),
        }),
        saveSnapshot: () => {},
      },
    });

    // The preview and the real run are expected to report the same counts.
    const preview = previewTaskRegistryMaintenance();
    expect(preview).toEqual({
      reconciled: 0,
      recovered: 0,
      cleanupStamped: 1,
      pruned: 0,
    });
    const applied = await runTaskRegistryMaintenance();
    expect(applied).toEqual({
      reconciled: 0,
      recovered: 0,
      cleanupStamped: 1,
      pruned: 0,
    });

    expect(getTaskById("task-missing-cleanup")?.cleanupAfter).toBeGreaterThan(now);
  });
});
it("cancels the deferred maintenance sweep during test teardown", async () => {
  await withTaskRegistryTempDir(async (stateDir) => {
    vi.useFakeTimers();
    process.env.OPENCLAW_STATE_DIR = stateDir;
    resetTaskRegistryForTests();

    const startedAt = Date.now();
    const { taskId } = createTaskRecord({
      runtime: "acp",
      ownerKey: "agent:main:main",
      scopeKind: "session",
      childSessionKey: "agent:main:acp:missing",
      runId: "run-deferred-maintenance-stop",
      task: "Missing child",
      status: "running",
      deliveryStatus: "pending",
    });
    // Backdate the last event by ten minutes.
    setTaskTimingById({ taskId, lastEventAt: startedAt - 10 * 60_000 });

    // Start and immediately stop maintenance, then advance the fake clock by
    // five seconds; the task is expected to remain untouched.
    startTaskRegistryMaintenance();
    stopTaskRegistryMaintenanceForTests();
    await vi.advanceTimersByTimeAsync(5_000);
    await flushAsyncWork();

    expect(getTaskById(taskId)).toMatchObject({ status: "running" });
  });
});
it("does not leak unhandled rejections when the scheduled maintenance sweep fails", async () => {
  await withTaskRegistryTempDir(async (stateDir) => {
    vi.useFakeTimers();
    process.env.OPENCLAW_STATE_DIR = stateDir;
    resetTaskRegistryForTests();

    // Collect every unhandledRejection reason seen while the sweep runs.
    const rejections: unknown[] = [];
    const recordRejection = (reason: unknown) => {
      rejections.push(reason);
    };
    process.on("unhandledRejection", recordRejection);

    // Maintenance runtime made of inert stubs, except `listTaskRecords`,
    // which throws.
    setTaskRegistryMaintenanceRuntimeForTests({
      readAcpSessionEntry: () => ({
        cfg: {} as never,
        storePath: "",
        sessionKey: "",
        storeSessionKey: "",
        entry: undefined,
        storeReadFailed: false,
      }),
      loadSessionStore: () => ({}),
      resolveStorePath: () => "",
      parseAgentSessionKey: () => null,
      isCronJobActive: () => false,
      getAgentRunContext: () => undefined,
      deleteTaskRecordById: () => false,
      ensureTaskRegistryReady: () => {},
      getTaskById: () => undefined,
      listTaskRecords: () => {
        throw new Error("maintenance boom");
      },
      markTaskLostById: () => null,
      maybeDeliverTaskTerminalUpdate: async () => null,
      resolveTaskForLookupToken: () => undefined,
      setTaskCleanupAfterById: () => null,
    });

    try {
      startTaskRegistryMaintenance();
      await vi.advanceTimersByTimeAsync(5_000);
      await flushAsyncWork();
      expect(rejections).toEqual([]);
    } finally {
      // Always detach the listener so it cannot observe other tests.
      process.off("unhandledRejection", recordRejection);
    }
  });
});
it("rechecks current task state before marking a task lost", async () => {
  const now = Date.now();
  const base = createTaskRecord({
    runtime: "acp",
    ownerKey: "agent:main:main",
    scopeKind: "session",
    childSessionKey: "agent:main:acp:missing-stale",
    runId: "run-lost-stale",
    task: "Missing child",
    status: "running",
    deliveryStatus: "pending",
  });

  // The snapshot copy has a last event ten minutes old; the current copy has
  // a last event at `now`.
  const staleCopy = { ...base, lastEventAt: now - 10 * 60_000 };
  const freshCopy = { ...base, lastEventAt: now };
  const liveTasks = new Map([[base.taskId, freshCopy]]);
  configureTaskRegistryMaintenanceRuntimeForTest({
    currentTasks: liveTasks,
    snapshotTasks: [staleCopy],
  });

  const outcome = await runTaskRegistryMaintenance();
  expect(outcome).toEqual({
    reconciled: 0,
    recovered: 0,
    cleanupStamped: 0,
    pruned: 0,
  });
  expect(liveTasks.get(base.taskId)).toMatchObject({
    status: "running",
    lastEventAt: now,
  });
});
it("rechecks current task state before pruning a task", async () => {
  const now = Date.now();
  const dayMs = 24 * 60 * 60_000;
  const base = createTaskRecord({
    runtime: "cli",
    ownerKey: "agent:main:main",
    scopeKind: "session",
    childSessionKey: "agent:main:main",
    runId: "run-prune-stale",
    task: "Old completed task",
    status: "succeeded",
    deliveryStatus: "not_applicable",
    startedAt: now - 9 * dayMs,
  });

  // The snapshot copy's cleanup deadline has already passed; the current
  // copy's deadline is one minute in the future.
  const expiredCopy = {
    ...base,
    endedAt: now - 8 * dayMs,
    lastEventAt: now - 8 * dayMs,
    cleanupAfter: now - 1,
  };
  const extendedCopy = { ...expiredCopy, cleanupAfter: now + 60_000 };
  const liveTasks = new Map([[base.taskId, extendedCopy]]);
  configureTaskRegistryMaintenanceRuntimeForTest({
    currentTasks: liveTasks,
    snapshotTasks: [expiredCopy],
  });

  const outcome = await sweepTaskRegistry();
  expect(outcome).toEqual({
    reconciled: 0,
    recovered: 0,
    cleanupStamped: 0,
    pruned: 0,
  });
  expect(liveTasks.get(base.taskId)).toMatchObject({
    status: "succeeded",
    cleanupAfter: now + 60_000,
  });
});
it("summarizes inspectable task audit findings", async () => {
  await withTaskRegistryTempDir(async (stateDir) => {
    process.env.OPENCLAW_STATE_DIR = stateDir;
    resetTaskRegistryForTests();

    const now = Date.now();
    const minuteMs = 60_000;
    // Store stub whose snapshot holds one running ACP task with no event for
    // forty minutes. Saving is a no-op.
    configureTaskRegistryRuntime({
      store: {
        loadSnapshot: () => ({
          tasks: new Map([
            [
              "task-audit-summary",
              {
                taskId: "task-audit-summary",
                runtime: "acp",
                requesterSessionKey: "agent:main:main",
                ownerKey: "agent:main:main",
                scopeKind: "session",
                runId: "run-audit-summary",
                task: "Hung task",
                status: "running",
                deliveryStatus: "pending",
                notifyPolicy: "done_only",
                createdAt: now - 50 * minuteMs,
                startedAt: now - 40 * minuteMs,
                lastEventAt: now - 40 * minuteMs,
              },
            ],
          ]),
          deliveryStates: new Map(),
        }),
        saveSnapshot: () => {},
      },
    });

    // Exactly one finding is expected: a single `stale_running` error.
    const summary = getInspectableTaskAuditSummary();
    expect(summary).toEqual({
      total: 1,
      warnings: 0,
      errors: 1,
      byCode: {
        stale_queued: 0,
        stale_running: 1,
        lost: 0,
        delivery_failed: 0,
        missing_cleanup: 0,
        inconsistent_timestamps: 0,
      },
    });
  });
});
it("delivers concise state-change updates only when notify policy requests them", async () => {
  await withTaskRegistryTempDir(async (stateDir) => {
    process.env.OPENCLAW_STATE_DIR = stateDir;
    resetTaskRegistryForTests();
    hoisted.sendMessageMock.mockResolvedValue({
      channel: "guildchat",
      to: "guildchat:123",
      via: "direct",
    });

    const { taskId } = createTaskRecord({
      runtime: "acp",
      ownerKey: "agent:main:main",
      scopeKind: "session",
      requesterOrigin: {
        channel: "guildchat",
        to: "guildchat:123",
      },
      childSessionKey: "agent:codex:acp:child",
      runId: "run-state-change",
      task: "Investigate issue",
      status: "queued",
      notifyPolicy: "done_only",
    });

    // Under "done_only", moving to running must not send anything.
    markTaskRunningByRunId({ runId: "run-state-change", eventSummary: "Started." });
    await waitForAssertion(() => {
      return expect(hoisted.sendMessageMock).not.toHaveBeenCalled();
    });

    // After switching to "state_changes", a progress event is delivered.
    updateTaskNotifyPolicyById({ taskId, notifyPolicy: "state_changes" });
    recordTaskProgressByRunId({
      runId: "run-state-change",
      eventSummary: "No output for 60s. It may be waiting for input.",
    });
    await waitForAssertion(() => {
      return expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
        expect.objectContaining({
          content:
            "Background task update: ACP background task. No output for 60s. It may be waiting for input.",
        }),
      );
    });
    expect(findTaskByRunId("run-state-change")).toMatchObject({
      notifyPolicy: "state_changes",
    });

    // Requesting delivery again must not produce a second message.
    await maybeDeliverTaskStateChangeUpdate(taskId);
    expect(hoisted.sendMessageMock).toHaveBeenCalledTimes(1);
  });
});
it("keeps background ACP progress off the foreground lane and only sends a terminal notify", async () => {
  await withTaskRegistryTempDir(async (root) => {
    process.env.OPENCLAW_STATE_DIR = root;
    resetTaskRegistryForTests();
    resetSystemEventsForTest();
    hoisted.sendMessageMock.mockResolvedValue({
      channel: "guildchat",
      to: "guildchat:123",
      via: "direct",
    });
    vi.useFakeTimers();
    // Cleanup lives in `finally` blocks so a failing assertion cannot leave
    // fake timers installed or the relay running for later tests.
    try {
      createTaskRecord({
        runtime: "acp",
        ownerKey: "agent:main:main",
        scopeKind: "session",
        requesterOrigin: {
          channel: "guildchat",
          to: "guildchat:123",
        },
        childSessionKey: "agent:codex:acp:child",
        runId: "run-quiet-terminal",
        task: "Create the file",
        status: "running",
        deliveryStatus: "pending",
      });
      const relay = startAcpSpawnParentStreamRelay({
        runId: "run-quiet-terminal",
        parentSessionKey: "agent:main:main",
        childSessionKey: "agent:codex:acp:child",
        agentId: "codex",
        surfaceUpdates: false,
        streamFlushMs: 1,
        noOutputNoticeMs: 1_000,
        noOutputPollMs: 250,
      });
      try {
        relay.notifyStarted();

        // Assistant output must not reach the parent's system events or the
        // outbound message channel.
        emitAgentEvent({
          runId: "run-quiet-terminal",
          stream: "assistant",
          data: {
            delta: "working on it",
          },
        });
        vi.advanceTimersByTime(10);
        expect(peekSystemEvents("agent:main:main")).toEqual([]);
        expect(hoisted.sendMessageMock).not.toHaveBeenCalled();

        // The lifecycle "end" event produces the terminal notification.
        emitAgentEvent({
          runId: "run-quiet-terminal",
          stream: "lifecycle",
          data: {
            phase: "end",
            endedAt: 250,
          },
        });
        await flushAsyncWork();
        expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
          expect.objectContaining({
            channel: "guildchat",
            to: "guildchat:123",
            content: "Background task done: ACP background task (run run-quie).",
          }),
        );
        expect(peekSystemEvents("agent:main:main")).toEqual([]);
      } finally {
        relay.dispose();
      }
    } finally {
      vi.useRealTimers();
    }
  });
});
it("delivers a concise terminal failure message without internal ACP chatter", async () => {
  await withTaskRegistryTempDir(async (stateDir) => {
    process.env.OPENCLAW_STATE_DIR = stateDir;
    resetTaskRegistryForTests();
    resetSystemEventsForTest();
    hoisted.sendMessageMock.mockResolvedValue({
      channel: "guildchat",
      to: "guildchat:123",
      via: "direct",
    });

    // The task carries a progress summary; the expected failure message
    // below contains only the lifecycle error text.
    createTaskRecord({
      runtime: "acp",
      ownerKey: "agent:main:main",
      scopeKind: "session",
      requesterOrigin: {
        channel: "guildchat",
        to: "guildchat:123",
      },
      childSessionKey: "agent:codex:acp:child",
      runId: "run-failure-terminal",
      task: "Write the file",
      status: "running",
      deliveryStatus: "pending",
      progressSummary:
        "I am loading session context and checking helper availability before writing the file.",
    });

    emitAgentEvent({
      runId: "run-failure-terminal",
      stream: "lifecycle",
      data: {
        phase: "error",
        endedAt: 250,
        error: "Permission denied by ACP runtime",
      },
    });
    await flushAsyncWork();

    const expectedMessage = expect.objectContaining({
      channel: "guildchat",
      to: "guildchat:123",
      content:
        "Background task failed: ACP background task (run run-fail). Permission denied by ACP runtime",
    });
    expect(hoisted.sendMessageMock).toHaveBeenCalledWith(expectedMessage);
    expect(peekSystemEvents("agent:main:main")).toEqual([]);
  });
});
it("emits concise state-change updates without surfacing raw ACP chatter", async () => {
  await withTaskRegistryTempDir(async (root) => {
    process.env.OPENCLAW_STATE_DIR = root;
    resetTaskRegistryForTests();
    resetSystemEventsForTest();
    hoisted.sendMessageMock.mockResolvedValue({
      channel: "guildchat",
      to: "guildchat:123",
      via: "direct",
    });
    vi.useFakeTimers();
    // Cleanup lives in `finally` blocks so a failing assertion cannot leave
    // fake timers installed or the relay running for later tests.
    try {
      createTaskRecord({
        runtime: "acp",
        ownerKey: "agent:main:main",
        scopeKind: "session",
        requesterOrigin: {
          channel: "guildchat",
          to: "guildchat:123",
        },
        childSessionKey: "agent:codex:acp:child",
        runId: "run-state-stream",
        task: "Create the file",
        status: "running",
        deliveryStatus: "pending",
        notifyPolicy: "state_changes",
      });
      const relay = startAcpSpawnParentStreamRelay({
        runId: "run-state-stream",
        parentSessionKey: "agent:main:main",
        childSessionKey: "agent:codex:acp:child",
        agentId: "codex",
        surfaceUpdates: false,
        streamFlushMs: 1,
        noOutputNoticeMs: 1_000,
        noOutputPollMs: 250,
      });
      try {
        // The start notification is delivered as a state-change update.
        relay.notifyStarted();
        await flushAsyncWork();
        expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
          expect.objectContaining({
            content: "Background task update: ACP background task. Started.",
          }),
        );

        // Advancing past `noOutputNoticeMs` yields the no-output notice.
        hoisted.sendMessageMock.mockClear();
        vi.advanceTimersByTime(1_500);
        await flushAsyncWork();
        expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
          expect.objectContaining({
            content:
              "Background task update: ACP background task. No output for 1s. It may be waiting for input.",
          }),
        );
        expect(peekSystemEvents("agent:main:main")).toEqual([]);
      } finally {
        relay.dispose();
      }
    } finally {
      vi.useRealTimers();
    }
  });
});
it("cancels ACP-backed tasks through the ACP session manager", async () => {
  await withTaskRegistryTempDir(async (stateDir) => {
    process.env.OPENCLAW_STATE_DIR = stateDir;
    hoisted.cancelSessionMock.mockResolvedValue(undefined);

    const { taskId } = createTaskRecord({
      runtime: "acp",
      ownerKey: "agent:main:main",
      scopeKind: "session",
      requesterOrigin: {
        channel: "notifychat",
        to: "notifychat:123",
      },
      childSessionKey: "agent:codex:acp:child",
      runId: "run-cancel-acp",
      task: "Investigate issue",
      status: "running",
      deliveryStatus: "pending",
    });

    const outcome = await cancelTaskById({ cfg: {} as never, taskId });

    // The cancel request is forwarded for the child session key.
    expect(hoisted.cancelSessionMock).toHaveBeenCalledWith(
      expect.objectContaining({
        cfg: {},
        sessionKey: "agent:codex:acp:child",
        reason: "task-cancel",
      }),
    );
    expect(outcome).toMatchObject({
      found: true,
      cancelled: true,
      task: expect.objectContaining({
        taskId,
        status: "cancelled",
        error: "Cancelled by operator.",
      }),
    });

    // The requester is notified about the cancellation.
    const expectedNotice = expect.objectContaining({
      channel: "notifychat",
      to: "notifychat:123",
      content: "Background task cancelled: ACP background task (run run-canc).",
    });
    await waitForAssertion(() => {
      return expect(hoisted.sendMessageMock).toHaveBeenCalledWith(expectedNotice);
    });
  });
});
it("cancels subagent-backed tasks through subagent control", async () => {
  await withTaskRegistryTempDir(async (stateDir) => {
    process.env.OPENCLAW_STATE_DIR = stateDir;
    hoisted.killSubagentRunAdminMock.mockResolvedValue({
      found: true,
      killed: true,
    });

    const { taskId } = createTaskRecord({
      runtime: "subagent",
      ownerKey: "agent:main:main",
      scopeKind: "session",
      requesterOrigin: {
        channel: "notifychat",
        to: "notifychat:123",
      },
      childSessionKey: "agent:worker:subagent:child",
      runId: "run-cancel-subagent",
      task: "Investigate issue",
      status: "running",
      deliveryStatus: "pending",
    });

    const outcome = await cancelTaskById({ cfg: {} as never, taskId });

    // The kill request targets the child session key.
    expect(hoisted.killSubagentRunAdminMock).toHaveBeenCalledWith(
      expect.objectContaining({
        cfg: {},
        sessionKey: "agent:worker:subagent:child",
      }),
    );
    expect(outcome).toMatchObject({
      found: true,
      cancelled: true,
      task: expect.objectContaining({
        taskId,
        status: "cancelled",
        error: "Cancelled by operator.",
      }),
    });

    // The requester is notified about the cancellation.
    const expectedNotice = expect.objectContaining({
      channel: "notifychat",
      to: "notifychat:123",
      content: "Background task cancelled: Subagent task (run run-canc).",
    });
    await waitForAssertion(() => {
      return expect(hoisted.sendMessageMock).toHaveBeenCalledWith(expectedNotice);
    });
  });
});
it("cancels CLI-tracked tasks in the registry without ACP or subagent teardown", async () => {
  await withTaskRegistryTempDir(async (stateDir) => {
    process.env.OPENCLAW_STATE_DIR = stateDir;
    // Clear both teardown mocks so the "not called" assertions below only
    // see calls made by this test.
    hoisted.cancelSessionMock.mockClear();
    hoisted.killSubagentRunAdminMock.mockClear();

    const { taskId } = createTaskRecord({
      runtime: "cli",
      ownerKey: "agent:main:main",
      scopeKind: "session",
      requesterOrigin: {
        channel: "notifychat",
        to: "notifychat:123",
      },
      childSessionKey: "agent:main:main",
      runId: "run-cancel-cli",
      task: "Investigate issue",
      status: "running",
      deliveryStatus: "pending",
    });

    const outcome = await cancelTaskById({ cfg: {} as never, taskId });

    expect(hoisted.cancelSessionMock).not.toHaveBeenCalled();
    expect(hoisted.killSubagentRunAdminMock).not.toHaveBeenCalled();
    expect(outcome).toMatchObject({
      found: true,
      cancelled: true,
      task: expect.objectContaining({
        taskId,
        status: "cancelled",
        error: "Cancelled by operator.",
      }),
    });

    // The requester is notified about the cancellation.
    const expectedNotice = expect.objectContaining({
      channel: "notifychat",
      to: "notifychat:123",
      content: "Background task cancelled: Investigate issue (run run-canc).",
    });
    await waitForAssertion(() => {
      return expect(hoisted.sendMessageMock).toHaveBeenCalledWith(expectedNotice);
    });
  });
});
it("cancels CLI-tracked tasks without childSessionKey", async () => {
  await withTaskRegistryTempDir(async (stateDir) => {
    process.env.OPENCLAW_STATE_DIR = stateDir;

    // This record deliberately omits `childSessionKey`.
    const { taskId } = createTaskRecord({
      runtime: "cli",
      ownerKey: "agent:main:main",
      scopeKind: "session",
      requesterOrigin: {
        channel: "notifychat",
        to: "notifychat:123",
      },
      runId: "run-cli-no-child",
      task: "Legacy row",
      status: "running",
      deliveryStatus: "pending",
    });

    const outcome = await cancelTaskById({ cfg: {} as never, taskId });

    expect(outcome).toMatchObject({
      found: true,
      cancelled: true,
      task: expect.objectContaining({
        taskId,
        status: "cancelled",
      }),
    });
  });
});
});
// Messung V0.5 in Prozent C=99 H=99 G=98
// Dauer der Verarbeitung: 0.23 Sekunden
// (vorverarbeitet am 2026-06-10)
// © Formatika GbR, Deutschland