Spracherkennung für: .ts vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]
import type { AgentMessage, StreamFn } from "@mariozechner/pi-agent-core";
import { createAssistantMessageEventStream } from "@mariozechner/pi-ai";
import { formatErrorMessage } from "../../infra/errors.js";
import { log } from "./logger.js";
type AssistantContentBlock = Extract<AgentMessage, { role: "assistant" }>["content"][number];
type AssistantMessage = Extract<AgentMessage, { role: "assistant" }>;
type RecoveryAssessment = "valid" | "incomplete-thinking" | "incomplete-text";
type RecoverySessionMeta = { id: string; recoveredAnthropicThinking?: boolean };
const THINKING_BLOCK_ERROR_PATTERN = /thinking or redacted_thinking blocks?.* cannot be modified/i;
export function isAssistantMessageWithContent(message: AgentMessage): message is AssistantMessage {
return (
!!message &&
typeof message === "object" &&
message.role === "assistant" &&
Array.isArray(message.content)
);
}
function isThinkingBlock(block: AssistantContentBlock): boolean {
return (
!!block &&
typeof block === "object" &&
((block as { type?: unknown }).type === "thinking" ||
(block as { type?: unknown }).type === "redacted_thinking")
);
}
function isSignedThinkingBlock(block: AssistantContentBlock): boolean {
if (!isThinkingBlock(block)) {
return false;
}
const record = block as {
type?: unknown;
signature?: unknown;
thinkingSignature?: unknown;
thought_signature?: unknown;
};
return (
record.type === "redacted_thinking" ||
record.signature != null ||
record.thinkingSignature != null ||
record.thought_signature != null
);
}
function hasMeaningfulText(block: AssistantContentBlock): boolean {
if (!block || typeof block !== "object" || (block as { type?: unknown }).type !== "text") {
return false;
}
return typeof (block as { text?: unknown }).text === "string"
? (block as { text: string }).text.trim().length > 0
: false;
}
/**
* Strip `type: "thinking"` and `type: "redacted_thinking"` content blocks from
* all assistant messages except the latest one.
*
* Thinking blocks in the latest assistant turn are preserved verbatim so
* providers that require replay signatures can continue the conversation.
*
* If a non-latest assistant message becomes empty after stripping, it is
* replaced with a synthetic `{ type: "text", text: "" }` block to preserve
* turn structure (some providers require strict user/assistant alternation).
*
* Returns the original array reference when nothing was changed (callers can
* use reference equality to skip downstream work).
*/
export function dropThinkingBlocks(messages: AgentMessage[]): AgentMessage[] {
let latestAssistantIndex = -1;
for (let i = messages.length - 1; i >= 0; i -= 1) {
if (isAssistantMessageWithContent(messages[i])) {
latestAssistantIndex = i;
break;
}
}
let touched = false;
const out: AgentMessage[] = [];
for (let i = 0; i < messages.length; i += 1) {
const msg = messages[i];
if (!isAssistantMessageWithContent(msg)) {
out.push(msg);
continue;
}
if (i === latestAssistantIndex) {
out.push(msg);
continue;
}
const nextContent: AssistantContentBlock[] = [];
let changed = false;
for (const block of msg.content) {
if (isThinkingBlock(block)) {
touched = true;
changed = true;
continue;
}
nextContent.push(block);
}
if (!changed) {
out.push(msg);
continue;
}
// Preserve the assistant turn even if all blocks were thinking-only.
const content =
nextContent.length > 0 ? nextContent : [{ type: "text", text: "" } as AssistantContentBlock];
out.push({ ...msg, content });
}
return touched ? out : messages;
}
function stripAllThinkingBlocks(messages: AgentMessage[]): AgentMessage[] {
let touched = false;
const out: AgentMessage[] = [];
for (const message of messages) {
if (!isAssistantMessageWithContent(message)) {
out.push(message);
continue;
}
const nextContent = message.content.filter((block) => !isThinkingBlock(block));
if (nextContent.length === message.content.length) {
out.push(message);
continue;
}
touched = true;
out.push({
...message,
content:
nextContent.length > 0
? nextContent
: ([{ type: "text", text: "" }] as AssistantContentBlock[]),
});
}
return touched ? out : messages;
}
export function assessLastAssistantMessage(message: AgentMessage): RecoveryAssessment {
if (!isAssistantMessageWithContent(message)) {
return "valid";
}
if (message.content.length === 0) {
return "incomplete-thinking";
}
let hasSignedThinking = false;
let hasUnsignedThinking = false;
let hasNonThinkingContent = false;
let hasEmptyTextBlock = false;
for (const block of message.content) {
if (!block || typeof block !== "object") {
return "incomplete-thinking";
}
if (isThinkingBlock(block)) {
if (isSignedThinkingBlock(block)) {
hasSignedThinking = true;
} else {
hasUnsignedThinking = true;
}
continue;
}
hasNonThinkingContent = true;
if ((block as { type?: unknown }).type === "text" && !hasMeaningfulText(block)) {
hasEmptyTextBlock = true;
}
}
if (hasUnsignedThinking) {
return "incomplete-thinking";
}
if (hasSignedThinking && !hasNonThinkingContent) {
return "incomplete-text";
}
if (hasSignedThinking && hasEmptyTextBlock) {
return "incomplete-text";
}
return "valid";
}
export function sanitizeThinkingForRecovery(messages: AgentMessage[]): {
messages: AgentMessage[];
prefill: boolean;
} {
if (messages.length === 0) {
return { messages, prefill: false };
}
let lastAssistantIndex = -1;
for (let index = messages.length - 1; index >= 0; index -= 1) {
if ((messages[index] as { role?: unknown }).role === "assistant") {
lastAssistantIndex = index;
break;
}
}
if (lastAssistantIndex === -1) {
return { messages, prefill: false };
}
const assessment = assessLastAssistantMessage(messages[lastAssistantIndex]);
if (assessment === "valid") {
return { messages, prefill: false };
}
if (assessment === "incomplete-text") {
return { messages, prefill: true };
}
return {
messages: [...messages.slice(0, lastAssistantIndex), ...messages.slice(lastAssistantIndex + 1)],
prefill: false,
};
}
function shouldRecoverAnthropicThinkingError(
error: unknown,
sessionMeta: RecoverySessionMeta,
): boolean {
const message = formatErrorMessage(error);
if (!THINKING_BLOCK_ERROR_PATTERN.test(message)) {
return false;
}
if (sessionMeta.recoveredAnthropicThinking) {
log.warn(
`[session-recovery] Anthropic thinking recovery already attempted: sessionId=${sessionMeta.id}`,
);
return false;
}
return true;
}
async function pumpStreamWithRecovery(
outer: ReturnType<typeof createAssistantMessageEventStream>,
stream: ReturnType<StreamFn>,
sessionMeta: RecoverySessionMeta,
retry: () => ReturnType<StreamFn>,
): Promise<AssistantMessage> {
let yieldedChunk = false;
try {
const resolved = stream instanceof Promise ? await stream : stream;
for await (const chunk of resolved as AsyncIterable<unknown>) {
yieldedChunk = true;
outer.push(chunk as Parameters<typeof outer.push>[0]);
}
const result = await (resolved as { result?: () => Promise<AssistantMessage> }).result?.();
return result as AssistantMessage;
} catch (error: unknown) {
if (!shouldRecoverAnthropicThinkingError(error, sessionMeta)) {
throw error;
}
if (yieldedChunk) {
log.warn(
`[session-recovery] Anthropic thinking error occurred after streaming began; skipping retry to avoid duplicate chunks: sessionId=${sessionMeta.id}`,
);
throw error;
}
sessionMeta.recoveredAnthropicThinking = true;
log.warn(
`[session-recovery] Anthropic thinking error during stream; retrying once without thinking blocks: sessionId=${sessionMeta.id}`,
);
const retryStream = retry();
const resolvedRetry = retryStream instanceof Promise ? await retryStream : retryStream;
for await (const chunk of resolvedRetry as AsyncIterable<unknown>) {
outer.push(chunk as Parameters<typeof outer.push>[0]);
}
const result = await (resolvedRetry as { result?: () => Promise<AssistantMessage> }).result?.();
return result as AssistantMessage;
}
}
export function wrapAnthropicStreamWithRecovery(
innerStreamFn: StreamFn,
sessionMeta: RecoverySessionMeta,
): StreamFn {
return (model, context, options) => {
const contextRecord = context as unknown as { messages?: unknown };
const originalMessages = Array.isArray(contextRecord.messages)
? (contextRecord.messages as AgentMessage[])
: [];
const retry = () => {
const cleanedMessages = stripAllThinkingBlocks(originalMessages);
const nextContext = {
...(context as unknown as Record<string, unknown>),
messages: cleanedMessages,
} as typeof context;
return innerStreamFn(model, nextContext, options);
};
const stream = innerStreamFn(model, context, options);
if (stream instanceof Promise) {
return stream.catch((error: unknown) => {
if (!shouldRecoverAnthropicThinkingError(error, sessionMeta)) {
throw error;
}
sessionMeta.recoveredAnthropicThinking = true;
log.warn(
`[session-recovery] Anthropic thinking request rejected; retrying once without thinking blocks: sessionId=${sessionMeta.id}`,
);
return retry();
}) as ReturnType<StreamFn>;
}
const outer = createAssistantMessageEventStream();
const finalResultPromise = pumpStreamWithRecovery(outer, stream, sessionMeta, retry).finally(
() => {
outer.end();
},
);
outer.result = () => finalResultPromise;
return outer as unknown as ReturnType<StreamFn>;
};
}
¤ Dauer der Verarbeitung: 0.16 Sekunden
(vorverarbeitet am 2026-04-27)
¤
*© Formatika GbR, Deutschland