import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest" ;
import { mergeMockedModule } from "../test-utils/vitest-module-mocks.js" ;
const enqueueSystemEventMock = vi.fn();
const requestHeartbeatNowMock = vi.fn();
const readAcpSessionEntryMock = vi.fn();
const resolveSessionFilePathMock = vi.fn();
const resolveSessionFilePathOptionsMock = vi.fn();
vi.mock("../infra/system-events.js" , () => ({
enqueueSystemEvent: (...args: unknown[]) => enqueueSystemEventMock(...args),
}));
vi.mock("../infra/heartbeat-wake.js" , async () => {
return await mergeMockedModule(
await vi.importActual<typeof import ("../infra/heartbeat-wake.js" )>(
"../infra/heartbeat-wake.js" ,
),
() => ({
requestHeartbeatNow: (...args: unknown[]) => requestHeartbeatNowMock(...args),
}),
);
});
vi.mock("../acp/runtime/session-meta.js" , async () => {
return await mergeMockedModule(
await vi.importActual<typeof import ("../acp/runtime/session-meta.js" )>(
"../acp/runtime/session-meta.js" ,
),
() => ({
readAcpSessionEntry: (...args: unknown[]) => readAcpSessionEntryMock(...args),
}),
);
});
vi.mock("../config/sessions/paths.js" , async () => {
return await mergeMockedModule(
await vi.importActual<typeof import ("../config/sessions/paths.js" )>(
"../config/sessions/paths.js" ,
),
() => ({
resolveSessionFilePath: (...args: unknown[]) => resolveSessionFilePathMock(...args),
resolveSessionFilePathOptions: (...args: unknown[]) =>
resolveSessionFilePathOptionsMock(...args),
}),
);
});
let emitAgentEvent: typeof import ("../infra/agent-events.js" ).emitAgentEvent;
let resolveAcpSpawnStreamLogPath: typeof import ("./acp-spawn-parent-stream.js" ).resolveAcpSpawnStreamLogPath;
let startAcpSpawnParentStreamRelay: typeof import ("./acp-spawn-parent-stream.js" ).startAcpSpawnParentStreamRelay;
function collectedTexts() {
return enqueueSystemEventMock.mock.calls.map((call) => String(call[0 ] ?? "" ));
}
describe("startAcpSpawnParentStreamRelay" , () => {
beforeAll(async () => {
({ emitAgentEvent } = await import ("../infra/agent-events.js" ));
({ resolveAcpSpawnStreamLogPath, startAcpSpawnParentStreamRelay } =
await import ("./acp-spawn-parent-stream.js" ));
});
beforeEach(() => {
enqueueSystemEventMock.mockClear();
requestHeartbeatNowMock.mockClear();
readAcpSessionEntryMock.mockReset();
resolveSessionFilePathMock.mockReset();
resolveSessionFilePathOptionsMock.mockReset();
resolveSessionFilePathOptionsMock.mockImplementation((value: unknown) => value);
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-04T01:00:00.000Z" ));
});
afterEach(() => {
vi.useRealTimers();
});
it("relays assistant progress and completion to the parent session" , () => {
const deliveryContext = {
channel: "forum" ,
to: "-1001234567890" ,
accountId: "default" ,
threadId: 1122 ,
};
const relay = startAcpSpawnParentStreamRelay({
runId: "run-1" ,
parentSessionKey: "agent:main:main" ,
childSessionKey: "agent:codex:acp:child-1" ,
agentId: "codex" ,
deliveryContext,
streamFlushMs: 10 ,
noOutputNoticeMs: 120 _000 ,
});
emitAgentEvent({
runId: "run-1" ,
stream: "assistant" ,
data: {
delta: "hello from child" ,
},
});
vi.advanceTimersByTime(15 );
emitAgentEvent({
runId: "run-1" ,
stream: "lifecycle" ,
data: {
phase: "end" ,
startedAt: 1 _000 ,
endedAt: 3 _100 ,
},
});
const texts = collectedTexts();
expect(texts.some((text) => text.includes("Started codex session" ))).toBe(true );
expect(texts.some((text) => text.includes("codex: hello from child" ))).toBe(true );
expect(texts.some((text) => text.includes("codex run completed in 2s" ))).toBe(true );
expect(
enqueueSystemEventMock.mock.calls.every(
(call) => (call[1 ] as { trusted?: boolean } | undefined)?.trusted === false ,
),
).toBe(true );
expect(enqueueSystemEventMock).toHaveBeenCalledWith(
expect.any(String),
expect.objectContaining({
sessionKey: "agent:main:main" ,
deliveryContext,
trusted: false ,
}),
);
expect(requestHeartbeatNowMock).toHaveBeenCalledWith(
expect.objectContaining({
reason: "acp:spawn:stream" ,
sessionKey: "agent:main:main" ,
}),
);
relay.dispose();
});
it("emits a no-output notice and a resumed notice when output returns" , () => {
const relay = startAcpSpawnParentStreamRelay({
runId: "run-2" ,
parentSessionKey: "agent:main:main" ,
childSessionKey: "agent:codex:acp:child-2" ,
agentId: "codex" ,
streamFlushMs: 1 ,
noOutputNoticeMs: 1 _000 ,
noOutputPollMs: 250 ,
});
vi.advanceTimersByTime(1 _500 );
expect(collectedTexts().some((text) => text.includes("has produced no output for 1s" ))).toBe(
true ,
);
emitAgentEvent({
runId: "run-2" ,
stream: "assistant" ,
data: {
delta: "resumed output" ,
},
});
vi.advanceTimersByTime(5 );
const texts = collectedTexts();
expect(texts.some((text) => text.includes("resumed output." ))).toBe(true );
expect(texts.some((text) => text.includes("codex: resumed output" ))).toBe(true );
emitAgentEvent({
runId: "run-2" ,
stream: "lifecycle" ,
data: {
phase: "error" ,
error: "boom" ,
},
});
expect(collectedTexts().some((text) => text.includes("run failed: boom" ))).toBe(true );
relay.dispose();
});
it("auto-disposes stale relays after max lifetime timeout" , () => {
const relay = startAcpSpawnParentStreamRelay({
runId: "run-3" ,
parentSessionKey: "agent:main:main" ,
childSessionKey: "agent:codex:acp:child-3" ,
agentId: "codex" ,
streamFlushMs: 1 ,
noOutputNoticeMs: 0 ,
maxRelayLifetimeMs: 1 _000 ,
});
vi.advanceTimersByTime(1 _001 );
expect(collectedTexts().some((text) => text.includes("stream relay timed out after 1s" ))).toBe(
true ,
);
const before = enqueueSystemEventMock.mock.calls.length;
emitAgentEvent({
runId: "run-3" ,
stream: "assistant" ,
data: {
delta: "late output" ,
},
});
vi.advanceTimersByTime(5 );
expect(enqueueSystemEventMock.mock.calls).toHaveLength(before);
relay.dispose();
});
it("supports delayed start notices" , () => {
const relay = startAcpSpawnParentStreamRelay({
runId: "run-4" ,
parentSessionKey: "agent:main:main" ,
childSessionKey: "agent:codex:acp:child-4" ,
agentId: "codex" ,
emitStartNotice: false ,
});
expect(collectedTexts().some((text) => text.includes("Started codex session" ))).toBe(false );
relay.notifyStarted();
expect(collectedTexts().some((text) => text.includes("Started codex session" ))).toBe(true );
relay.dispose();
});
it("can keep background relays out of the parent session while still logging" , () => {
const relay = startAcpSpawnParentStreamRelay({
runId: "run-quiet" ,
parentSessionKey: "agent:main:main" ,
childSessionKey: "agent:codex:acp:child-quiet" ,
agentId: "codex" ,
surfaceUpdates: false ,
streamFlushMs: 10 ,
noOutputNoticeMs: 120 _000 ,
});
relay.notifyStarted();
emitAgentEvent({
runId: "run-quiet" ,
stream: "assistant" ,
data: {
delta: "hello from child" ,
},
});
vi.advanceTimersByTime(15 );
emitAgentEvent({
runId: "run-quiet" ,
stream: "lifecycle" ,
data: {
phase: "end" ,
},
});
expect(collectedTexts()).toEqual([]);
expect(requestHeartbeatNowMock).not.toHaveBeenCalled();
relay.dispose();
});
it("preserves delta whitespace boundaries in progress relays" , () => {
const relay = startAcpSpawnParentStreamRelay({
runId: "run-5" ,
parentSessionKey: "agent:main:main" ,
childSessionKey: "agent:codex:acp:child-5" ,
agentId: "codex" ,
streamFlushMs: 10 ,
noOutputNoticeMs: 120 _000 ,
});
emitAgentEvent({
runId: "run-5" ,
stream: "assistant" ,
data: {
delta: "hello" ,
},
});
emitAgentEvent({
runId: "run-5" ,
stream: "assistant" ,
data: {
delta: " world" ,
},
});
vi.advanceTimersByTime(15 );
const texts = collectedTexts();
expect(texts.some((text) => text.includes("codex: hello world" ))).toBe(true );
relay.dispose();
});
it("suppresses commentary-phase assistant relay text" , () => {
const relay = startAcpSpawnParentStreamRelay({
runId: "run-commentary" ,
parentSessionKey: "agent:main:main" ,
childSessionKey: "agent:codex:acp:child-commentary" ,
agentId: "codex" ,
streamFlushMs: 10 ,
noOutputNoticeMs: 120 _000 ,
});
emitAgentEvent({
runId: "run-commentary" ,
stream: "assistant" ,
data: {
delta: "checking thread context; then post a tight progress reply here." ,
phase: "commentary" ,
},
});
vi.advanceTimersByTime(15 );
const texts = collectedTexts();
expect(texts.some((text) => text.includes("checking thread context" ))).toBe(false );
expect(texts.some((text) => text.includes("post a tight progress reply here" ))).toBe(false );
relay.dispose();
});
it("still relays final_answer assistant text after suppressed commentary" , () => {
const relay = startAcpSpawnParentStreamRelay({
runId: "run-final" ,
parentSessionKey: "agent:main:main" ,
childSessionKey: "agent:codex:acp:child-final" ,
agentId: "codex" ,
streamFlushMs: 10 ,
noOutputNoticeMs: 120 _000 ,
});
emitAgentEvent({
runId: "run-final" ,
stream: "assistant" ,
data: {
delta: "checking thread context; then post a tight progress reply here." ,
phase: "commentary" ,
},
});
emitAgentEvent({
runId: "run-final" ,
stream: "assistant" ,
data: {
delta: "final answer ready" ,
phase: "final_answer" ,
},
});
vi.advanceTimersByTime(15 );
const texts = collectedTexts();
expect(texts.some((text) => text.includes("checking thread context" ))).toBe(false );
expect(texts.some((text) => text.includes("codex: final answer ready" ))).toBe(true );
relay.dispose();
});
it("resolves ACP spawn stream log path from session metadata" , () => {
readAcpSessionEntryMock.mockReturnValue({
storePath: "/tmp/openclaw/agents/codex/sessions/sessions.json" ,
entry: {
sessionId: "sess-123" ,
sessionFile: "/tmp/openclaw/agents/codex/sessions/sess-123.jsonl" ,
},
});
resolveSessionFilePathMock.mockReturnValue(
"/tmp/openclaw/agents/codex/sessions/sess-123.jsonl" ,
);
const resolved = resolveAcpSpawnStreamLogPath({
childSessionKey: "agent:codex:acp:child-1" ,
});
expect(resolved).toBe("/tmp/openclaw/agents/codex/sessions/sess-123.acp-stream.jsonl" );
expect(readAcpSessionEntryMock).toHaveBeenCalledWith({
sessionKey: "agent:codex:acp:child-1" ,
});
expect(resolveSessionFilePathMock).toHaveBeenCalledWith(
"sess-123" ,
expect.objectContaining({
sessionId: "sess-123" ,
}),
expect.objectContaining({
storePath: "/tmp/openclaw/agents/codex/sessions/sessions.json" ,
}),
);
});
});
Messung V0.5 in Prozent C=98 H=98 G=97
¤ Dauer der Verarbeitung: 0.9 Sekunden
(vorverarbeitet am 2026-06-10)
¤
*© Formatika GbR, Deutschland