import { afterEach, beforeEach, describe, expect, it, vi } from "vitest" ;
import { LiveSessionModelSwitchError } from "../../agents/live-model-switch-error.js" ;
import type { SessionEntry } from "../../config/sessions.js" ;
import { CommandLaneClearedError, GatewayDrainingError } from "../../process/command-queue.js" ;
import type { TemplateContext } from "../templating.js" ;
import type { GetReplyOptions, ReplyPayload } from "../types.js" ;
import { MAX_LIVE_SWITCH_RETRIES } from "./agent-runner-execution.js" ;
import type { FollowupRun } from "./queue.js" ;
import type { ReplyOperation } from "./reply-run-registry.js" ;
import type { TypingSignaler } from "./typing-mode.js" ;
const state = vi.hoisted(() => ({
runEmbeddedPiAgentMock: vi.fn(),
runCliAgentMock: vi.fn(),
runWithModelFallbackMock: vi.fn(),
isCliProviderMock: vi.fn((_: unknown) => false ),
isInternalMessageChannelMock: vi.fn((_: unknown) => false ),
createBlockReplyDeliveryHandlerMock: vi.fn(),
}));
vi.mock("../../agents/pi-embedded.js" , () => ({
runEmbeddedPiAgent: (params: unknown) => state.runEmbeddedPiAgentMock(params),
}));
vi.mock("../../agents/cli-runner.js" , () => ({
runCliAgent: (params: unknown) => state.runCliAgentMock(params),
}));
vi.mock("../../agents/model-fallback.js" , () => ({
runWithModelFallback: (params: unknown) => state.runWithModelFallbackMock(params),
isFallbackSummaryError: (err: unknown) =>
err instanceof Error &&
err.name === "FallbackSummaryError" &&
Array.isArray((err as { attempts?: unknown[] }).attempts),
}));
vi.mock("../../agents/model-selection.js" , async () => {
const actual = await vi.importActual<typeof import ("../../agents/model-selection.js" )>(
"../../agents/model-selection.js" ,
);
return {
...actual,
isCliProvider: (provider: unknown) => state.isCliProviderMock(provider),
};
});
vi.mock("../../agents/bootstrap-budget.js" , () => ({
resolveBootstrapWarningSignaturesSeen: () => [],
}));
vi.mock("../../agents/pi-embedded-helpers.js" , () => ({
BILLING_ERROR_USER_MESSAGE: "billing" ,
formatRateLimitOrOverloadedErrorCopy: (message: string) => {
if (/model\s+(?:is\s+)?at capacity/i.test(message)) {
return "⚠️ Selected model is at capacity. Try a different model, or wait and retry." ;
}
if (/rate.limit|too many requests|429 /i.test(message)) {
return "⚠️ API rate limit reached. Please try again later." ;
}
if (/overloaded/i.test(message)) {
return "The AI service is temporarily overloaded. Please try again in a moment." ;
}
return undefined;
},
isCompactionFailureError: () => false ,
isContextOverflowError: () => false ,
isBillingErrorMessage: () => false ,
isLikelyContextOverflowError: () => false ,
isOverloadedErrorMessage: (message: string) => /overloaded|capacity/i.test(message),
isRateLimitErrorMessage: () => false ,
isTransientHttpError: () => false ,
sanitizeUserFacingText: (text?: string) => text ?? "" ,
}));
vi.mock("../../config/sessions.js" , () => ({
resolveGroupSessionKey: vi.fn(() => null ),
resolveSessionTranscriptPath: vi.fn(),
updateSessionStore: vi.fn(),
}));
vi.mock("../../globals.js" , () => ({
logVerbose: vi.fn(),
}));
vi.mock("../../infra/agent-events.js" , async () => {
const actual = await vi.importActual<typeof import ("../../infra/agent-events.js" )>(
"../../infra/agent-events.js" ,
);
return {
...actual,
emitAgentEvent: vi.fn(),
registerAgentRunContext: vi.fn(),
};
});
vi.mock("../../runtime.js" , () => ({
defaultRuntime: {
error: vi.fn(),
},
}));
vi.mock("../../utils/message-channel.js" , () => ({
isMarkdownCapableMessageChannel: () => true ,
resolveMessageChannel: () => "whatsapp" ,
isInternalMessageChannel: (value: unknown) => state.isInternalMessageChannelMock(value),
}));
vi.mock("../heartbeat.js" , () => ({
stripHeartbeatToken: (text: string) => ({
text,
didStrip: false ,
shouldSkip: false ,
}),
}));
vi.mock("./agent-runner-utils.js" , () => ({
buildEmbeddedRunExecutionParams: (params: {
provider: string;
model: string;
run: { provider?: string; authProfileId?: string; authProfileIdSource?: "auto" | "user" };
}) => ({
embeddedContext: {},
senderContext: {},
runBaseParams: {
provider: params.provider,
model: params.model,
authProfileId: params.provider === params.run.provider ? params.run.authProfileId : undefined,
authProfileIdSource:
params.provider === params.run.provider ? params.run.authProfileIdSource : undefined,
},
}),
resolveQueuedReplyRuntimeConfig: <T>(config: T) => config,
resolveModelFallbackOptions: vi.fn(() => ({})),
}));
vi.mock("./reply-delivery.js" , () => ({
createBlockReplyDeliveryHandler: (params: unknown) =>
state.createBlockReplyDeliveryHandlerMock(params),
}));
vi.mock("./reply-media-paths.runtime.js" , () => ({
createReplyMediaContext: () => ({
normalizePayload: (payload: unknown) => payload,
}),
createReplyMediaPathNormalizer: () => (payload: unknown) => payload,
}));
async function getRunAgentTurnWithFallback() {
return (await import ("./agent-runner-execution.js" )).runAgentTurnWithFallback;
}
async function getApplyFallbackCandidateSelectionToEntry() {
return (await import ("./agent-runner-execution.js" )).applyFallbackCandidateSelectionToEntry;
}
type FallbackRunnerParams = {
run: (provider: string, model: string) => Promise<unknown>;
classifyResult?: (params: {
result: { payloads?: Array<{ text?: string; isError?: boolean ; isReasoning?: boolean }> };
provider: string;
model: string;
attempt: number;
total: number;
}) => Promise<unknown>;
};
type EmbeddedAgentParams = {
onBlockReply?: (payload: { text?: string; mediaUrls?: string[] }) => Promise<void > | void ;
onToolResult?: (payload: { text?: string; mediaUrls?: string[] }) => Promise<void > | void ;
onItemEvent?: (payload: {
itemId?: string;
kind?: string;
title?: string;
name?: string;
phase?: string;
status?: string;
summary?: string;
progressText?: string;
approvalId?: string;
approvalSlug?: string;
}) => Promise<void > | void ;
onAgentEvent?: (payload: {
stream: string;
data: Record<string, unknown>;
}) => Promise<void > | void ;
};
function createMockTypingSignaler(): TypingSignaler {
return {
mode: "message" ,
shouldStartImmediately: false ,
shouldStartOnMessageStart: true ,
shouldStartOnText: true ,
shouldStartOnReasoning: false ,
signalRunStart: vi.fn(async () => {}),
signalMessageStart: vi.fn(async () => {}),
signalTextDelta: vi.fn(async () => {}),
signalReasoningDelta: vi.fn(async () => {}),
signalToolStart: vi.fn(async () => {}),
};
}
function createFollowupRun(): FollowupRun {
return {
prompt: "hello" ,
summaryLine: "hello" ,
enqueuedAt: Date.now(),
run: {
agentId: "agent" ,
agentDir: "/tmp/agent" ,
sessionId: "session" ,
sessionKey: "main" ,
messageProvider: "whatsapp" ,
sessionFile: "/tmp/session.jsonl" ,
workspaceDir: "/tmp" ,
config: {},
skillsSnapshot: {},
provider: "anthropic" ,
model: "claude" ,
thinkLevel: "low" ,
verboseLevel: "off" ,
elevatedLevel: "off" ,
bashElevated: {
enabled: false ,
allowed: false ,
defaultLevel: "off" ,
},
timeoutMs: 1 _000 ,
blockReplyBreak: "message_end" ,
},
} as unknown as FollowupRun;
}
function createMockReplyOperation(): {
replyOperation: ReplyOperation;
failMock: ReturnType<typeof vi.fn>;
} {
const failMock = vi.fn();
return {
failMock,
replyOperation: {
key: "main" ,
sessionId: "session" ,
abortSignal: new AbortController().signal,
resetTriggered: false ,
phase: "running" ,
result: null ,
setPhase: vi.fn(),
updateSessionId: vi.fn(),
attachBackend: vi.fn(),
detachBackend: vi.fn(),
complete: vi.fn(),
fail: failMock,
abortByUser: vi.fn(),
abortForRestart: vi.fn(),
},
};
}
function createMinimalRunAgentTurnParams(overrides?: {
followupRun?: FollowupRun;
opts?: GetReplyOptions;
}) {
return {
commandBody: "fix it" ,
followupRun: overrides?.followupRun ?? createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: overrides?.opts ?? ({} satisfies GetReplyOptions),
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" as const ,
applyReplyToMode: (payload: ReplyPayload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set<Promise<void >>(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" as const ,
};
}
describe("runAgentTurnWithFallback" , () => {
beforeEach(() => {
state.runEmbeddedPiAgentMock.mockReset();
state.runCliAgentMock.mockReset();
state.runWithModelFallbackMock.mockReset();
state.isCliProviderMock.mockReset();
state.isCliProviderMock.mockReturnValue(false );
state.isInternalMessageChannelMock.mockReset();
state.isInternalMessageChannelMock.mockReturnValue(false );
state.createBlockReplyDeliveryHandlerMock.mockReset();
state.createBlockReplyDeliveryHandlerMock.mockReturnValue(undefined);
state.runWithModelFallbackMock.mockImplementation(async (params: FallbackRunnerParams) => ({
result: await params.run("anthropic" , "claude" ),
provider: "anthropic" ,
model: "claude" ,
attempts: [],
}));
});
afterEach(() => {
vi.clearAllMocks();
});
it("forwards the static extra system prompt to CLI backends" , async () => {
state.isCliProviderMock.mockReturnValue(true );
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => ({
result: await params.run("codex-cli" , "gpt-5.4" ),
provider: "codex-cli" ,
model: "gpt-5.4" ,
attempts: [],
}));
state.runCliAgentMock.mockResolvedValueOnce({
payloads: [{ text: "final" }],
meta: {},
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const followupRun = createFollowupRun();
followupRun.run.provider = "codex-cli" ;
followupRun.run.model = "gpt-5.4" ;
followupRun.run.extraSystemPrompt = "dynamic inbound metadata\n\nstable group prompt" ;
followupRun.run.extraSystemPromptStatic = "stable group prompt" ;
followupRun.originatingChannel = "telegram" ;
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun,
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
expect(state.runCliAgentMock).toHaveBeenCalledWith(
expect.objectContaining({
extraSystemPrompt: "dynamic inbound metadata\n\nstable group prompt" ,
extraSystemPromptStatic: "stable group prompt" ,
trigger: "user" ,
messageChannel: "telegram" ,
messageProvider: "telegram" ,
}),
);
});
it("resolves CLI messageProvider from the live session surface when no origin channel is set" , async () => {
state.isCliProviderMock.mockReturnValue(true );
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => ({
result: await params.run("codex-cli" , "gpt-5.4" ),
provider: "codex-cli" ,
model: "gpt-5.4" ,
attempts: [],
}));
state.runCliAgentMock.mockResolvedValueOnce({
payloads: [{ text: "final" }],
meta: {},
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const followupRun = createFollowupRun();
followupRun.run.provider = "codex-cli" ;
followupRun.run.model = "gpt-5.4" ;
followupRun.run.messageProvider = "stale-provider" ;
await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun,
sessionCtx: {
Provider: "discord" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(state.runCliAgentMock).toHaveBeenCalledWith(
expect.objectContaining({
messageChannel: undefined,
messageProvider: "discord" ,
}),
);
});
it("forwards media-only tool results without typing text" , async () => {
const onToolResult = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onToolResult?.({ mediaUrls: ["/tmp/generated.png" ] });
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const pendingToolTasks = new Set<Promise<void >>();
const typingSignals = createMockTypingSignaler();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {
onToolResult,
} satisfies GetReplyOptions,
typingSignals,
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks,
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
await Promise.all(pendingToolTasks);
expect(result.kind).toBe("success" );
expect(typingSignals.signalTextDelta).not.toHaveBeenCalled();
expect(onToolResult).toHaveBeenCalledTimes(1 );
expect(onToolResult.mock.calls[0 ]?.[0 ]).toMatchObject({
mediaUrls: ["/tmp/generated.png" ],
});
expect(onToolResult.mock.calls[0 ]?.[0 ]?.text).toBeUndefined();
});
it("surfaces model capacity errors from no-text mid-turn failures" , async () => {
state.runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "thinking" , isReasoning: true }],
meta: {
error: {
kind: "server_overloaded" ,
message: "Selected model is at capacity. Please try a different model." ,
},
},
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
if (result.kind === "success" ) {
expect(result.runResult.payloads).toEqual([
{
text: "⚠️ Selected model is at capacity. Try a different model, or wait and retry." ,
isError: true ,
},
]);
}
});
it("surfaces model capacity errors from pre-reply CLI failures" , async () => {
state.runWithModelFallbackMock.mockRejectedValueOnce(
new Error("Selected model is at capacity. Please try a different model." ),
);
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const followupRun = createFollowupRun();
followupRun.run.provider = "openai-codex" ;
followupRun.run.model = "gpt-5.5" ;
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun,
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result).toEqual({
kind: "final" ,
payload: {
text: "⚠️ Selected model is at capacity. Try a different model, or wait and retry." ,
},
});
});
it("classifies GPT-5 plan-only terminal results as fallback-eligible" , async () => {
const followupRun = createFollowupRun();
followupRun.run.provider = "openai-codex" ;
followupRun.run.model = "gpt-5.4" ;
state.runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [
{
text: "agent stopped after repeated plan-only turns without taking a concrete action." ,
isError: true ,
},
],
meta: {},
});
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => {
const first = (await params.run("openai-codex" , "gpt-5.4" )) as {
payloads?: Array<{ text?: string; isError?: boolean ; isReasoning?: boolean }>;
};
expect(
await params.classifyResult?.({
result: first,
provider: "openai-codex" ,
model: "gpt-5.4" ,
attempt: 1 ,
total: 2 ,
}),
).toMatchObject({
reason: "format" ,
code: "planning_only_result" ,
});
return {
result: { payloads: [{ text: "fallback ok" }], meta: {} },
provider: "anthropic" ,
model: "claude" ,
attempts: [
{
provider: "openai-codex" ,
model: "gpt-5.4" ,
error: "planning-only" ,
reason: "format" ,
},
],
};
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback(createMinimalRunAgentTurnParams({ followupRun }));
expect(result.kind).toBe("success" );
if (result.kind === "success" ) {
expect(result.runResult.payloads?.[0 ]?.text).toBe("fallback ok" );
expect(result.fallbackProvider).toBe("anthropic" );
expect(result.fallbackAttempts[0 ]?.reason).toBe("format" );
}
});
it("does not classify silent NO_REPLY terminal results for fallback" , async () => {
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => {
const result = { payloads: [{ text: "NO_REPLY" }], meta: {} };
expect(
await params.classifyResult?.({
result,
provider: "openai-codex" ,
model: "gpt-5.4" ,
attempt: 1 ,
total: 2 ,
}),
).toBeNull();
return {
result,
provider: "openai-codex" ,
model: "gpt-5.4" ,
attempts: [],
};
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback(createMinimalRunAgentTurnParams());
expect(result.kind).toBe("success" );
});
it("does not classify empty final payloads after block replies were sent" , async () => {
const followupRun = createFollowupRun();
followupRun.run.provider = "openai-codex" ;
followupRun.run.model = "gpt-5.4" ;
state.createBlockReplyDeliveryHandlerMock.mockImplementationOnce(
(params: { directlySentBlockKeys?: Set<string> }) => async () => {
params.directlySentBlockKeys?.add("block:1" );
},
);
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onBlockReply?.({ text: "streamed block" });
return { payloads: [], meta: {} };
});
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => {
const result = (await params.run("openai-codex" , "gpt-5.4" )) as {
payloads?: Array<{ text?: string; isError?: boolean ; isReasoning?: boolean }>;
};
expect(
await params.classifyResult?.({
result,
provider: "openai-codex" ,
model: "gpt-5.4" ,
attempt: 1 ,
total: 2 ,
}),
).toBeNull();
return {
result,
provider: "openai-codex" ,
model: "gpt-5.4" ,
attempts: [],
};
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback(
createMinimalRunAgentTurnParams({
followupRun,
opts: { onBlockReply: vi.fn() } satisfies GetReplyOptions,
}),
);
expect(result.kind).toBe("success" );
});
it("does not classify empty final payloads while block replies are buffered" , async () => {
const followupRun = createFollowupRun();
followupRun.run.provider = "openai-codex" ;
followupRun.run.model = "gpt-5.4" ;
const blockReplyPipeline = {
enqueue: vi.fn(),
flush: vi.fn(async () => {}),
stop: vi.fn(),
hasBuffered: vi.fn(() => true ),
didStream: vi.fn(() => false ),
isAborted: vi.fn(() => false ),
hasSentPayload: vi.fn(() => false ),
getSentMediaUrls: vi.fn(() => []),
};
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => {
const result = { payloads: [], meta: {} };
expect(
await params.classifyResult?.({
result,
provider: "openai-codex" ,
model: "gpt-5.4" ,
attempt: 1 ,
total: 2 ,
}),
).toBeNull();
return {
result,
provider: "openai-codex" ,
model: "gpt-5.4" ,
attempts: [],
};
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
...createMinimalRunAgentTurnParams({ followupRun }),
blockReplyPipeline,
blockStreamingEnabled: true ,
opts: { onBlockReply: vi.fn() } satisfies GetReplyOptions,
});
expect(result.kind).toBe("success" );
});
it("classifies final GPT-5 terminal-empty results instead of silently succeeding" , async () => {
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => {
const result = { payloads: [], meta: {} };
expect(
await params.classifyResult?.({
result,
provider: "openai-codex" ,
model: "gpt-5.4" ,
attempt: 1 ,
total: 1 ,
}),
).toMatchObject({
reason: "format" ,
code: "empty_result" ,
});
return {
result,
provider: "openai-codex" ,
model: "gpt-5.4" ,
attempts: [],
};
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback(createMinimalRunAgentTurnParams());
expect(result.kind).toBe("success" );
});
it("rolls back persisted fallback selection when result classification rejects a candidate" , async () => {
const followupRun = createFollowupRun();
followupRun.run.provider = "anthropic" ;
followupRun.run.model = "claude" ;
const sessionEntry: SessionEntry = {
sessionId: "session" ,
updatedAt: Date.now(),
totalTokens: 1 ,
compactionCount: 0 ,
};
const activeSessionStore = { main: sessionEntry };
state.runEmbeddedPiAgentMock.mockResolvedValueOnce({ payloads: [], meta: {} });
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => {
const failedResult = await params.run("openai-codex" , "gpt-5.4" );
expect(sessionEntry.providerOverride).toBe("openai-codex" );
expect(sessionEntry.modelOverride).toBe("gpt-5.4" );
expect(
await params.classifyResult?.({
result: failedResult as { payloads?: [] },
provider: "openai-codex" ,
model: "gpt-5.4" ,
attempt: 1 ,
total: 2 ,
}),
).toMatchObject({
code: "empty_result" ,
});
expect(sessionEntry.providerOverride).toBeUndefined();
expect(sessionEntry.modelOverride).toBeUndefined();
return {
result: { payloads: [{ text: "fallback ok" }], meta: {} },
provider: "anthropic" ,
model: "claude" ,
attempts: [],
};
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
...createMinimalRunAgentTurnParams({ followupRun }),
activeSessionStore,
getActiveSessionEntry: () => sessionEntry,
});
expect(result.kind).toBe("success" );
expect(sessionEntry.providerOverride).toBeUndefined();
expect(sessionEntry.modelOverride).toBeUndefined();
});
it("strips a glued leading NO_REPLY token from streamed tool results" , async () => {
const onToolResult = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onToolResult?.({ text: "NO_REPLYThe user is saying hello" });
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const pendingToolTasks = new Set<Promise<void >>();
const typingSignals = createMockTypingSignaler();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {
onToolResult,
} satisfies GetReplyOptions,
typingSignals,
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks,
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
await Promise.all(pendingToolTasks);
expect(result.kind).toBe("success" );
expect(typingSignals.signalTextDelta).toHaveBeenCalledWith("The user is saying hello" );
expect(onToolResult).toHaveBeenCalledWith({ text: "The user is saying hello" });
});
it("continues delivering later streamed tool results after an earlier delivery failure" , async () => {
const delivered: string[] = [];
const onToolResult = vi.fn(async (payload: { text?: string }) => {
if (payload.text === "first" ) {
throw new Error("simulated delivery failure" );
}
delivered.push(payload.text ?? "" );
});
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
void params.onToolResult?.({ text: "first" , mediaUrls: [] });
void params.onToolResult?.({ text: "second" , mediaUrls: [] });
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const pendingToolTasks = new Set<Promise<void >>();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: { onToolResult } satisfies GetReplyOptions,
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks,
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
await Promise.all(pendingToolTasks);
expect(result.kind).toBe("success" );
expect(onToolResult).toHaveBeenCalledTimes(2 );
expect(delivered).toEqual(["second" ]);
});
it("delivers streamed tool results in callback order even when dispatch latency differs" , async () => {
const deliveryOrder: string[] = [];
const onToolResult = vi.fn(async (payload: { text?: string }) => {
const delay = payload.text === "first" ? 5 : 1 ;
await new Promise((resolve) => setTimeout(resolve, delay));
deliveryOrder.push(payload.text ?? "" );
});
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
void params.onToolResult?.({ text: "first" , mediaUrls: [] });
void params.onToolResult?.({ text: "second" , mediaUrls: [] });
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const pendingToolTasks = new Set<Promise<void >>();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: { onToolResult } satisfies GetReplyOptions,
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks,
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
await Promise.all(pendingToolTasks);
expect(result.kind).toBe("success" );
expect(onToolResult).toHaveBeenCalledTimes(2 );
expect(deliveryOrder).toEqual(["first" , "second" ]);
});
it("forwards item lifecycle events to reply options" , async () => {
const onItemEvent = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({
stream: "item" ,
data: {
itemId: "tool:read-1" ,
kind: "tool" ,
title: "read" ,
name: "read" ,
phase: "start" ,
status: "running" ,
},
});
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const pendingToolTasks = new Set<Promise<void >>();
const typingSignals = createMockTypingSignaler();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {
onItemEvent,
} satisfies GetReplyOptions,
typingSignals,
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks,
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
await Promise.all(pendingToolTasks);
expect(result.kind).toBe("success" );
expect(onItemEvent).toHaveBeenCalledWith({
itemId: "tool:read-1" ,
kind: "tool" ,
title: "read" ,
name: "read" ,
phase: "start" ,
status: "running" ,
});
});
it("publishes Codex app-server telemetry to agent event subscribers" , async () => {
const agentEvents = await import ("../../infra/agent-events.js" );
const emitAgentEvent = vi.mocked(agentEvents.emitAgentEvent);
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({
stream: "codex_app_server.guardian" ,
data: {
phase: "blocked" ,
message: "command requires approval" ,
},
});
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: { runId: "run-codex" } as GetReplyOptions,
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
expect(emitAgentEvent).toHaveBeenCalledWith({
runId: "run-codex" ,
stream: "codex_app_server.guardian" ,
data: {
phase: "blocked" ,
message: "command requires approval" ,
},
});
});
it("trims chatty GPT ack-turn final prose" , async () => {
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => ({
result: await params.run("openai" , "gpt-5.4" ),
provider: "openai" ,
model: "gpt-5.4" ,
attempts: [],
}));
state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({
payloads: [
{
text: [
"I updated the prompt overlay and tightened the runtime guard." ,
"I also added the ack-turn fast path so short approvals skip the recap." ,
"The reply-side brevity cap now trims long prose-heavy GPT confirmations." ,
"I updated tests for the overlay, retry guard, and reply normalization." ,
"Everything is wired together and ready for verification." ,
].join(" " ),
},
],
meta: {},
}));
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const followupRun = createFollowupRun();
followupRun.run.provider = "openai" ;
followupRun.run.model = "gpt-5.4" ;
const result = await runAgentTurnWithFallback({
commandBody: "ok do it" ,
followupRun,
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
if (result.kind === "success" ) {
expect(result.runResult.payloads?.[0 ]?.text).toBe(
"I updated the prompt overlay and tightened the runtime guard. I also added the ack-turn fast path so short approvals skip the recap. The reply-side brevity cap now trims long prose-heavy GPT confirmations..." ,
);
}
});
it("does not trim GPT replies when the user asked for depth" , async () => {
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => ({
result: await params.run("openai" , "gpt-5.4" ),
provider: "openai" ,
model: "gpt-5.4" ,
attempts: [],
}));
const longDetailedReply = [
"Here is the detailed breakdown." ,
"First, the runner now detects short approval turns and skips the recap path." ,
"Second, the reply layer scores long prose-heavy GPT confirmations and trims them only in chat-style turns." ,
"Third, code fences and richer structured outputs are left untouched so technical answers stay intact." ,
"Finally, the overlay reinforces that this is a live chat and nudges the model toward short natural replies." ,
].join(" " );
state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({
payloads: [{ text: longDetailedReply }],
meta: {},
}));
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const followupRun = createFollowupRun();
followupRun.run.provider = "openai" ;
followupRun.run.model = "gpt-5.4" ;
const result = await runAgentTurnWithFallback({
commandBody: "explain in detail what changed" ,
followupRun,
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
if (result.kind === "success" ) {
expect(result.runResult.payloads?.[0 ]?.text).toBe(longDetailedReply);
}
});
it("forwards plan, approval, command output, and patch events" , async () => {
const onPlanUpdate = vi.fn();
const onApprovalEvent = vi.fn();
const onCommandOutput = vi.fn();
const onPatchSummary = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({
stream: "plan" ,
data: {
phase: "update" ,
title: "Assistant proposed a plan" ,
explanation: "Inspect code, patch it, run tests." ,
steps: ["Inspect code" , "Patch code" , "Run tests" ],
},
});
await params.onAgentEvent?.({
stream: "approval" ,
data: {
phase: "requested" ,
kind: "exec" ,
status: "pending" ,
title: "Command approval requested" ,
approvalId: "approval-1" ,
},
});
await params.onAgentEvent?.({
stream: "command_output" ,
data: {
itemId: "command:exec-1" ,
phase: "delta" ,
title: "command ls" ,
toolCallId: "exec-1" ,
output: "README.md" ,
},
});
await params.onAgentEvent?.({
stream: "patch" ,
data: {
itemId: "patch:patch-1" ,
phase: "end" ,
title: "apply patch" ,
toolCallId: "patch-1" ,
added: ["a.ts" ],
modified: ["b.ts" ],
deleted: [],
summary: "1 added, 1 modified" ,
},
});
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const pendingToolTasks = new Set<Promise<void >>();
await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {
onPlanUpdate,
onApprovalEvent,
onCommandOutput,
onPatchSummary,
} satisfies GetReplyOptions,
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks,
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(onPlanUpdate).toHaveBeenCalledWith({
phase: "update" ,
title: "Assistant proposed a plan" ,
explanation: "Inspect code, patch it, run tests." ,
steps: ["Inspect code" , "Patch code" , "Run tests" ],
source: undefined,
});
expect(onApprovalEvent).toHaveBeenCalledWith({
phase: "requested" ,
kind: "exec" ,
status: "pending" ,
title: "Command approval requested" ,
itemId: undefined,
toolCallId: undefined,
approvalId: "approval-1" ,
approvalSlug: undefined,
command: undefined,
host: undefined,
reason: undefined,
scope: undefined,
message: undefined,
});
expect(onCommandOutput).toHaveBeenCalledWith({
itemId: "command:exec-1" ,
phase: "delta" ,
title: "command ls" ,
toolCallId: "exec-1" ,
name: undefined,
output: "README.md" ,
status: undefined,
exitCode: undefined,
durationMs: undefined,
cwd: undefined,
});
expect(onPatchSummary).toHaveBeenCalledWith({
itemId: "patch:patch-1" ,
phase: "end" ,
title: "apply patch" ,
toolCallId: "patch-1" ,
name: undefined,
added: ["a.ts" ],
modified: ["b.ts" ],
deleted: [],
summary: "1 added, 1 modified" ,
});
});
it("keeps compaction start notices silent by default" , async () => {
const onBlockReply = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({ stream: "compaction" , data: { phase: "start" } });
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: { onBlockReply },
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
expect(onBlockReply).not.toHaveBeenCalled();
});
it("keeps compaction callbacks active when notices are silent by default" , async () => {
const onBlockReply = vi.fn();
const onCompactionStart = vi.fn();
const onCompactionEnd = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({ stream: "compaction" , data: { phase: "start" } });
await params.onAgentEvent?.({
stream: "compaction" ,
data: { phase: "end" , completed: true },
});
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {
onBlockReply,
onCompactionStart,
onCompactionEnd,
},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
expect(onCompactionStart).toHaveBeenCalledTimes(1 );
expect(onCompactionEnd).toHaveBeenCalledTimes(1 );
expect(onBlockReply).not.toHaveBeenCalled();
});
it("emits a compaction start notice when notifyUser is enabled" , async () => {
const onBlockReply = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({ stream: "compaction" , data: { phase: "start" } });
return { payloads: [{ text: "final" }], meta: {} };
});
const followupRun = createFollowupRun();
followupRun.run.config = {
agents: {
defaults: {
compaction: {
notifyUser: true ,
},
},
},
};
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun,
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: { onBlockReply },
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
expect(onBlockReply).toHaveBeenCalledTimes(1 );
expect(onBlockReply).toHaveBeenCalledWith(
expect.objectContaining({
text: " Compacting context..." ,
replyToId: "msg" ,
replyToCurrent: true ,
isCompactionNotice: true ,
}),
);
});
it("emits a compaction completion notice when notifyUser is enabled" , async () => {
const onBlockReply = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({ stream: "compaction" , data: { phase: "start" } });
await params.onAgentEvent?.({
stream: "compaction" ,
data: { phase: "end" , completed: true },
});
return { payloads: [{ text: "final" }], meta: {} };
});
const followupRun = createFollowupRun();
followupRun.run.config = {
agents: {
defaults: {
compaction: {
notifyUser: true ,
},
},
},
};
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun,
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: { onBlockReply },
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
expect(onBlockReply).toHaveBeenNthCalledWith(
1 ,
expect.objectContaining({
text: " Compacting context..." ,
replyToId: "msg" ,
replyToCurrent: true ,
isCompactionNotice: true ,
}),
);
expect(onBlockReply).toHaveBeenNthCalledWith(
2 ,
expect.objectContaining({
text: " Compaction complete" ,
replyToId: "msg" ,
replyToCurrent: true ,
isCompactionNotice: true ,
}),
);
});
it("prefers onCompactionEnd callback over default notice when notifyUser is enabled" , async () => {
const onBlockReply = vi.fn();
const onCompactionEnd = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({ stream: "compaction" , data: { phase: "start" } });
await params.onAgentEvent?.({
stream: "compaction" ,
data: { phase: "end" , completed: true },
});
return { payloads: [{ text: "final" }], meta: {} };
});
const followupRun = createFollowupRun();
followupRun.run.config = {
agents: {
defaults: {
compaction: {
notifyUser: true ,
},
},
},
};
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun,
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: { onBlockReply, onCompactionEnd },
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
expect(onCompactionEnd).toHaveBeenCalledTimes(1 );
// The start notice still fires (no onCompactionStart callback provided),
// but the completion notice is suppressed in favor of the callback.
expect(onBlockReply).toHaveBeenCalledTimes(1 );
expect(onBlockReply).toHaveBeenCalledWith(
expect.objectContaining({
text: " Compacting context..." ,
isCompactionNotice: true ,
}),
);
});
it("emits an incomplete compaction notice when compaction ends without completing" , async () => {
const onBlockReply = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({ stream: "compaction" , data: { phase: "start" } });
await params.onAgentEvent?.({
stream: "compaction" ,
data: { phase: "end" , completed: false },
});
return { payloads: [{ text: "final" }], meta: {} };
});
const followupRun = createFollowupRun();
followupRun.run.config = {
agents: {
defaults: {
compaction: {
notifyUser: true ,
},
},
},
};
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun,
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: { onBlockReply },
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
expect(onBlockReply).toHaveBeenNthCalledWith(
1 ,
expect.objectContaining({
text: " Compacting context..." ,
isCompactionNotice: true ,
}),
);
expect(onBlockReply).toHaveBeenNthCalledWith(
2 ,
expect.objectContaining({
text: " Compaction incomplete" ,
isCompactionNotice: true ,
}),
);
});
it("does not show a rate-limit countdown for mixed-cause fallback exhaustion" , async () => {
state.runWithModelFallbackMock.mockRejectedValueOnce(
Object.assign(
new Error(
"All models failed (2): anthropic/claude: 429 (rate_limit) | openai/gpt-5.4: 402 (billing)" ,
),
{
name: "FallbackSummaryError" ,
attempts: [
{ provider: "anthropic" , model: "claude" , error: "429" , reason: "rate_limit" },
{ provider: "openai" , model: "gpt-5.4" , error: "402" , reason: "billing" },
],
soonestCooldownExpiry: Date.now() + 60 _000 ,
},
),
);
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("final" );
if (result.kind === "final" ) {
expect(result.payload.text).toContain("Something went wrong while processing your request" );
expect(result.payload.text).not.toContain("Rate-limited" );
}
});
it("surfaces billing guidance for pure billing cooldown fallback exhaustion" , async () => {
state.runWithModelFallbackMock.mockRejectedValueOnce(
Object.assign(
new Error(
"All models failed (2): anthropic/claude-opus-4-6: Provider anthropic has billing issue (skipping all models) (billing) | anthropic/claude-sonnet-4-6: Provider anthropic has billing issue (skipping all models) (billing)" ,
),
{
name: "FallbackSummaryError" ,
attempts: [
{
provider: "anthropic" ,
model: "claude-opus-4-6" ,
error: "Provider anthropic has billing issue (skipping all models)" ,
reason: "billing" ,
},
{
provider: "anthropic" ,
model: "claude-sonnet-4-6" ,
error: "Provider anthropic has billing issue (skipping all models)" ,
reason: "billing" ,
},
],
soonestCooldownExpiry: Date.now() + 60 _000 ,
},
),
);
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("final" );
if (result.kind === "final" ) {
expect(result.payload.text).toBe("billing" );
}
});
it("surfaces gateway restart text when fallback exhaustion wraps a drain error" , async () => {
const { replyOperation, failMock } = createMockReplyOperation();
state.runWithModelFallbackMock.mockRejectedValueOnce(
Object.assign(new Error("fallback exhausted" ), {
name: "FallbackSummaryError" ,
attempts: [
{
provider: "anthropic" ,
model: "claude" ,
error: new GatewayDrainingError(),
},
],
soonestCooldownExpiry: null ,
cause: new GatewayDrainingError(),
}),
);
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
replyOperation,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("final" );
if (result.kind === "final" ) {
expect(result.payload.text).toBe(
"⚠️ Gateway is restarting. Please wait a few seconds and try again." ,
);
}
expect(failMock).toHaveBeenCalledWith("gateway_draining" , expect.any(GatewayDrainingError));
});
it("surfaces gateway restart text when fallback exhaustion wraps a cleared lane error" , async () => {
const { replyOperation, failMock } = createMockReplyOperation();
state.runWithModelFallbackMock.mockRejectedValueOnce(
Object.assign(new Error("fallback exhausted" ), {
name: "FallbackSummaryError" ,
attempts: [
{
provider: "anthropic" ,
model: "claude" ,
error: new CommandLaneClearedError("session:main" ),
},
],
soonestCooldownExpiry: null ,
cause: new CommandLaneClearedError("session:main" ),
}),
);
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
replyOperation,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("final" );
if (result.kind === "final" ) {
expect(result.payload.text).toBe(
"⚠️ Gateway is restarting. Please wait a few seconds and try again." ,
);
}
expect(failMock).toHaveBeenCalledWith(
"command_lane_cleared" ,
expect.any(CommandLaneClearedError),
);
});
it("surfaces gateway restart text when the reply operation was aborted for restart" , async () => {
const { replyOperation, failMock } = createMockReplyOperation();
Object.defineProperty(replyOperation, "result" , {
value: { kind: "aborted" , code: "aborted_for_restart" } as const ,
configurable: true ,
});
state.runWithModelFallbackMock.mockRejectedValueOnce(
Object.assign(new Error("aborted" ), { name: "AbortError" }),
);
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
replyOperation,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("final" );
if (result.kind === "final" ) {
expect(result.payload.text).toBe(
"⚠️ Gateway is restarting. Please wait a few seconds and try again." ,
);
}
expect(failMock).not.toHaveBeenCalled();
});
it("returns a friendly generic error on external chat channels" , async () => {
state.runEmbeddedPiAgentMock.mockRejectedValueOnce(
new Error("INVALID_ARGUMENT: some other failure" ),
);
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("final" );
if (result.kind === "final" ) {
expect(result.payload.text).toBe(
"⚠️ Something went wrong while processing your request. Please try again, or use /new to start a fresh session." ,
);
}
});
it("surfaces gateway reauth guidance for known OAuth refresh failures" , async () => {
state.runEmbeddedPiAgentMock.mockRejectedValueOnce(
new Error(
"OAuth token refresh failed for openai-codex: refresh_token_reused. Please try again or re-authenticate." ,
),
);
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("final" );
if (result.kind === "final" ) {
expect(result.payload.text).toBe(
"⚠️ Model login expired on the gateway for openai-codex. Re-auth with `openclaw models auth login --provider openai-codex`, then try again." ,
);
}
});
it("surfaces direct provider auth guidance for missing API keys" , async () => {
state.runEmbeddedPiAgentMock.mockRejectedValueOnce(
new Error(
'No API key found for provider "openai". You are authenticated with OpenAI Codex OAuth. Use openai-codex/gpt-5.5, or set OPENAI_API_KEY for direct OpenAI API access. | No API key found for provider "openai". You are authenticated with OpenAI Codex OAuth. Use openai-codex/gpt-5.5, or set OPENAI_API_KEY for direct OpenAI API access.' ,
),
);
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("final" );
if (result.kind === "final" ) {
expect(result.payload.text).toBe(
"⚠️ Missing API key for OpenAI on the gateway. Use `openai-codex/gpt-5.5`, or set `OPENAI_API_KEY`, then try again." ,
);
}
});
it("falls back to a generic provider message for unsafe missing-key provider ids" , async () => {
state.runEmbeddedPiAgentMock.mockRejectedValueOnce(
new Error('No API key found for provider "openai`\nrm -rf /".' ),
);
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("final" );
if (result.kind === "final" ) {
expect(result.payload.text).toBe(
"⚠️ Missing API key for the selected provider on the gateway. Configure provider auth, then try again." ,
);
}
});
it("falls back to a generic reauth command when the provider in the OAuth error is unsafe" , async () => {
state.runEmbeddedPiAgentMock.mockRejectedValueOnce(
new Error(
"OAuth token refresh failed for openai-codex`\nrm -rf /: invalid_grant. Please try again or re-authenticate." ,
),
);
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("final" );
if (result.kind === "final" ) {
expect(result.payload.text).toBe(
"⚠️ Model login expired on the gateway. Re-auth with `openclaw models auth login`, then try again." ,
);
}
});
it("returns a session reset hint for Bedrock tool mismatch errors on external chat channels" , async () => {
state.runEmbeddedPiAgentMock.mockRejectedValueOnce(
new Error(
"The number of toolResult blocks at messages.186.content exceeds the number of toolUse blocks of previous turn." ,
),
);
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("final" );
if (result.kind === "final" ) {
expect(result.payload.text).toBe(
"⚠️ Session history got out of sync. Please try again, or use /new to start a fresh session." ,
);
}
});
it("keeps raw generic errors on internal control surfaces" , async () => {
state.isInternalMessageChannelMock.mockReturnValue(true );
state.runEmbeddedPiAgentMock.mockRejectedValueOnce(
new Error("INVALID_ARGUMENT: some other failure" ),
);
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "chat" ,
Surface: "chat" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("final" );
if (result.kind === "final" ) {
expect(result.payload.text).toContain("Agent failed before reply" );
expect(result.payload.text).toContain("INVALID_ARGUMENT: some other failure" );
expect(result.payload.text).toContain("Logs: openclaw logs --follow" );
}
});
it("restarts the active prompt when a live model switch is requested" , async () => {
let fallbackInvocation = 0 ;
state.runWithModelFallbackMock.mockImplementation(
async (params: { run: (provider: string, model: string) => Promise<unknown> }) => ({
result: await params.run(
fallbackInvocation === 0 ? "anthropic" : "openai" ,
fallbackInvocation === 0 ? "claude" : "gpt-5.4" ,
),
provider: fallbackInvocation === 0 ? "anthropic" : "openai" ,
model: fallbackInvocation++ === 0 ? "claude" : "gpt-5.4" ,
attempts: [],
}),
);
state.runEmbeddedPiAgentMock
.mockImplementationOnce(async () => {
throw new LiveSessionModelSwitchError({
provider: "openai" ,
model: "gpt-5.4" ,
});
})
.mockImplementationOnce(async () => {
return {
payloads: [{ text: "switched" }],
meta: {
agentMeta: {
sessionId: "session" ,
provider: "openai" ,
model: "gpt-5.4" ,
},
},
};
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const followupRun = createFollowupRun();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun,
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
expect(state.runEmbeddedPiAgentMock).toHaveBeenCalledTimes(2 );
expect(followupRun.run.provider).toBe("openai" );
expect(followupRun.run.model).toBe("gpt-5.4" );
});
it("breaks out of the retry loop when LiveSessionModelSwitchError is thrown repeatedly (#58348)" , async () => {
// Simulate a scenario where the persisted session selection keeps conflicting
// with the fallback model, causing LiveSessionModelSwitchError on every attempt.
// The outer loop must be bounded to prevent a session death loop.
let switchCallCount = 0 ;
state.runWithModelFallbackMock.mockImplementation(
async (params: { run: (provider: string, model: string) => Promise<unknown> }) => {
switchCallCount++;
return {
result: await params.run("anthropic" , "claude" ),
provider: "anthropic" ,
model: "claude" ,
attempts: [],
};
},
);
state.runEmbeddedPiAgentMock.mockImplementation(async () => {
throw new LiveSessionModelSwitchError({
provider: "openai" ,
model: "gpt-5.4" ,
});
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const followupRun = createFollowupRun();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun,
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
// After MAX_LIVE_SWITCH_RETRIES (2) the loop must break instead of continuing
// forever. The result should be a final error, not an infinite hang.
expect(result.kind).toBe("final" );
// 1 initial + MAX_LIVE_SWITCH_RETRIES retries = exact total invocations
expect(switchCallCount).toBe(1 + MAX_LIVE_SWITCH_RETRIES);
});
it("propagates auth profile state on bounded live model switch retries (#58348)" , async () => {
let invocation = 0 ;
state.runWithModelFallbackMock.mockImplementation(
async (params: { run: (provider: string, model: string) => Promise<unknown> }) => {
invocation++;
if (invocation <= 2 ) {
return {
result: await params.run("anthropic" , "claude" ),
provider: "anthropic" ,
model: "claude" ,
attempts: [],
};
}
// Third invocation succeeds with the switched model
return {
result: await params.run("openai" , "gpt-5.4" ),
provider: "openai" ,
model: "gpt-5.4" ,
attempts: [],
};
},
);
state.runEmbeddedPiAgentMock
.mockImplementationOnce(async () => {
throw new LiveSessionModelSwitchError({
provider: "openai" ,
model: "gpt-5.4" ,
authProfileId: "profile-b" ,
authProfileIdSource: "user" ,
});
})
.mockImplementationOnce(async () => {
throw new LiveSessionModelSwitchError({
provider: "openai" ,
model: "gpt-5.4" ,
authProfileId: "profile-c" ,
authProfileIdSource: "auto" ,
});
})
.mockImplementationOnce(async () => {
return {
payloads: [{ text: "finally ok" }],
meta: {
agentMeta: {
sessionId: "session" ,
provider: "openai" ,
model: "gpt-5.4" ,
},
},
};
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const followupRun = createFollowupRun();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun,
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off" ,
});
// Two switches (within the limit of 2) then success on third attempt
expect(result.kind).toBe("success" );
expect(state.runEmbeddedPiAgentMock).toHaveBeenCalledTimes(3 );
expect(followupRun.run.provider).toBe("openai" );
expect(followupRun.run.model).toBe("gpt-5.4" );
expect(followupRun.run.authProfileId).toBe("profile-c" );
expect(followupRun.run.authProfileIdSource).toBe("auto" );
});
it("does not roll back newer override changes after a failed fallback candidate" , async () => {
state.runWithModelFallbackMock.mockImplementation(
async (params: { run: (provider: string, model: string) => Promise<unknown> }) => {
await expect(params.run("openai" , "gpt-5.4" )).rejects.toThrow("fallback failed" );
throw new Error("fallback failed" );
},
);
const sessionEntry: SessionEntry = {
sessionId: "session" ,
updatedAt: Date.now(),
providerOverride: "anthropic" ,
modelOverride: "claude" ,
authProfileOverride: "anthropic:default" ,
authProfileOverrideSource: "user" ,
};
const sessionStore = { main: sessionEntry };
state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => {
sessionEntry.providerOverride = "zai" ;
sessionEntry.modelOverride = "glm-5" ;
sessionEntry.authProfileOverride = "zai:work" ;
sessionEntry.authProfileOverrideSource = "user" ;
throw new Error("fallback failed" );
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => sessionEntry,
activeSessionStore: sessionStore,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("final" );
expect(sessionEntry.providerOverride).toBe("zai" );
expect(sessionEntry.modelOverride).toBe("glm-5" );
expect(sessionEntry.authProfileOverride).toBe("zai:work" );
expect(sessionEntry.authProfileOverrideSource).toBe("user" );
expect(sessionStore.main.providerOverride).toBe("zai" );
expect(sessionStore.main.modelOverride).toBe("glm-5" );
});
it("drops authProfileId when fallback switches providers" , async () => {
state.runWithModelFallbackMock.mockImplementation(
async (params: { run: (provider: string, model: string) => Promise<unknown> }) => ({
result: await params.run("openai-codex" , "gpt-5.4" ),
provider: "openai-codex" ,
model: "gpt-5.4" ,
attempts: [],
}),
);
state.runEmbeddedPiAgentMock.mockResolvedValue({
payloads: [{ text: "ok" }],
meta: {},
});
const followupRun = createFollowupRun();
followupRun.run.provider = "anthropic" ;
followupRun.run.model = "claude-opus" ;
followupRun.run.authProfileId = "anthropic:openclaw" ;
followupRun.run.authProfileIdSource = "user" ;
const sessionEntry: SessionEntry = {
sessionId: "session" ,
updatedAt: Date.now(),
totalTokens: 1 ,
compactionCount: 0 ,
};
const sessionStore = { main: sessionEntry };
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun,
sessionCtx: {
Provider: "telegram" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => sessionEntry,
activeSessionStore: sessionStore,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
expect(state.runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1 );
expect(state.runEmbeddedPiAgentMock.mock.calls[0 ]?.[0 ]).toMatchObject({
provider: "openai-codex" ,
model: "gpt-5.4" ,
authProfileId: undefined,
authProfileIdSource: undefined,
});
expect(sessionEntry.providerOverride).toBe("openai-codex" );
expect(sessionEntry.modelOverride).toBe("gpt-5.4" );
expect(sessionEntry.modelOverrideSource).toBe("auto" );
expect(sessionEntry.authProfileOverride).toBeUndefined();
expect(sessionEntry.authProfileOverrideSource).toBeUndefined();
expect(sessionStore.main.authProfileOverride).toBeUndefined();
});
it("does not persist fallback selection for legacy user overrides without modelOverrideSource" , async () => {
// Regression: older persisted sessions can have a user-selected override
// (modelOverride set) but no modelOverrideSource field, because the field
// was added later. These legacy entries must still be protected from
// fallback overwrite, matching the backward-compat treatment in
// session-reset-service.
state.runWithModelFallbackMock.mockImplementation(
async (params: { run: (provider: string, model: string) => Promise<unknown> }) => ({
result: await params.run("openai-codex" , "gpt-5.4" ),
provider: "openai-codex" ,
model: "gpt-5.4" ,
attempts: [],
}),
);
state.runEmbeddedPiAgentMock.mockResolvedValue({
payloads: [{ text: "ok" }],
meta: {},
});
const followupRun = createFollowupRun();
followupRun.run.provider = "anthropic" ;
followupRun.run.model = "claude-opus-4-6" ;
const sessionEntry: SessionEntry = {
sessionId: "session" ,
updatedAt: Date.now(),
totalTokens: 1 ,
compactionCount: 0 ,
// Legacy entry: override is set but the source field is missing.
providerOverride: "anthropic" ,
modelOverride: "claude-opus-4-6" ,
// modelOverrideSource intentionally absent
};
const sessionStore = { main: sessionEntry };
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun,
sessionCtx: {
Provider: "telegram" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => sessionEntry,
activeSessionStore: sessionStore,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
// Legacy user override must survive the fallback unchanged.
expect(sessionEntry.providerOverride).toBe("anthropic" );
expect(sessionEntry.modelOverride).toBe("claude-opus-4-6" );
expect(sessionEntry.modelOverrideSource).toBeUndefined();
});
it("does not persist fallback selection when modelOverrideSource is user" , async () => {
// Regression: fallback persistence overwrote user-initiated /models
// selections. When the user explicitly picked a model, the fallback
// should NOT clobber it even when the primary model fails.
state.runWithModelFallbackMock.mockImplementation(
async (params: { run: (provider: string, model: string) => Promise<unknown> }) => ({
result: await params.run("openai-codex" , "gpt-5.4" ),
provider: "openai-codex" ,
model: "gpt-5.4" ,
attempts: [],
}),
);
state.runEmbeddedPiAgentMock.mockResolvedValue({
payloads: [{ text: "ok" }],
meta: {},
});
const followupRun = createFollowupRun();
followupRun.run.provider = "anthropic" ;
followupRun.run.model = "claude-opus-4-6" ;
const sessionEntry: SessionEntry = {
sessionId: "session" ,
updatedAt: Date.now(),
totalTokens: 1 ,
compactionCount: 0 ,
// User explicitly selected this model via /models
providerOverride: "anthropic" ,
modelOverride: "claude-opus-4-6" ,
modelOverrideSource: "user" ,
};
const sessionStore = { main: sessionEntry };
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello" ,
followupRun,
sessionCtx: {
Provider: "telegram" ,
MessageSid: "msg" ,
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null ,
blockStreamingEnabled: false ,
resolvedBlockStreamingBreak: "message_end" ,
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true ,
shouldEmitToolOutput: () => false ,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false ,
resetSessionAfterRoleOrderingConflict: async () => false ,
isHeartbeat: false ,
sessionKey: "main" ,
getActiveSessionEntry: () => sessionEntry,
activeSessionStore: sessionStore,
resolvedVerboseLevel: "off" ,
});
expect(result.kind).toBe("success" );
// The user's /models selection must survive the fallback.
expect(sessionEntry.providerOverride).toBe("anthropic" );
expect(sessionEntry.modelOverride).toBe("claude-opus-4-6" );
expect(sessionEntry.modelOverrideSource).toBe("user" );
});
it("keeps same-provider auth profile when fallback only changes model" , async () => {
const applyFallbackCandidateSelectionToEntry =
await getApplyFallbackCandidateSelectionToEntry();
const entry = {
sessionId: "session" ,
updatedAt: 1 ,
authProfileOverride: "anthropic:openclaw" ,
authProfileOverrideSource: "user" as const ,
} as SessionEntry;
const { updated } = applyFallbackCandidateSelectionToEntry({
entry,
run: {
provider: "anthropic" ,
model: "claude-opus" ,
authProfileId: "anthropic:openclaw" ,
authProfileIdSource: "user" ,
} as FollowupRun["run" ],
provider: "anthropic" ,
model: "claude-sonnet" ,
now: 123 ,
});
expect(updated).toBe(true );
expect(entry).toMatchObject({
updatedAt: 123 ,
providerOverride: "anthropic" ,
modelOverride: "claude-sonnet" ,
modelOverrideSource: "auto" ,
authProfileOverride: "anthropic:openclaw" ,
authProfileOverrideSource: "user" ,
});
});
});
Messung V0.5 in Prozent C=100 H=100 G=100
¤ Dauer der Verarbeitung: 0.37 Sekunden
(vorverarbeitet am 2026-06-05)
¤
*© Formatika GbR, Deutschland