Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/Java/Openclaw/src/agents/pi-embedded-runner/   (KI Agentensystem Version 22©)  Datei vom 26.3.2026 mit Größe 9 kB image not shown  

Quelle  thinking.ts

  Sprache: JAVA
 

Spracherkennung für: .ts vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

import type { AgentMessage, StreamFn } from "@mariozechner/pi-agent-core";
import { createAssistantMessageEventStream } from "@mariozechner/pi-ai";
import { formatErrorMessage } from "../../infra/errors.js";
import { log } from "./logger.js";

type AssistantContentBlock = Extract<AgentMessage, { role: "assistant" }>["content"][number];
type AssistantMessage = Extract<AgentMessage, { role: "assistant" }>;
type RecoveryAssessment = "valid" | "incomplete-thinking" | "incomplete-text";
type RecoverySessionMeta = { id: string; recoveredAnthropicThinking?: boolean };

const THINKING_BLOCK_ERROR_PATTERN = /thinking or redacted_thinking blocks?.* cannot be modified/i;

export function isAssistantMessageWithContent(message: AgentMessage): message is AssistantMessage {
  return (
    !!message &&
    typeof message === "object" &&
    message.role === "assistant" &&
    Array.isArray(message.content)
  );
}

function isThinkingBlock(block: AssistantContentBlock): boolean {
  return (
    !!block &&
    typeof block === "object" &&
    ((block as { type?: unknown }).type === "thinking" ||
      (block as { type?: unknown }).type === "redacted_thinking")
  );
}

function isSignedThinkingBlock(block: AssistantContentBlock): boolean {
  if (!isThinkingBlock(block)) {
    return false;
  }
  const record = block as {
    type?: unknown;
    signature?: unknown;
    thinkingSignature?: unknown;
    thought_signature?: unknown;
  };
  return (
    record.type === "redacted_thinking" ||
    record.signature != null ||
    record.thinkingSignature != null ||
    record.thought_signature != null
  );
}

function hasMeaningfulText(block: AssistantContentBlock): boolean {
  if (!block || typeof block !== "object" || (block as { type?: unknown }).type !== "text") {
    return false;
  }
  return typeof (block as { text?: unknown }).text === "string"
    ? (block as { text: string }).text.trim().length > 0
    : false;
}

/**
 * Strip `type: "thinking"` and `type: "redacted_thinking"` content blocks from
 * all assistant messages except the latest one.
 *
 * Thinking blocks in the latest assistant turn are preserved verbatim so
 * providers that require replay signatures can continue the conversation.
 *
 * If a non-latest assistant message becomes empty after stripping, it is
 * replaced with a synthetic `{ type: "text", text: "" }` block to preserve
 * turn structure (some providers require strict user/assistant alternation).
 *
 * Returns the original array reference when nothing was changed (callers can
 * use reference equality to skip downstream work).
 */
export function dropThinkingBlocks(messages: AgentMessage[]): AgentMessage[] {
  let latestAssistantIndex = -1;
  for (let i = messages.length - 1; i >= 0; i -= 1) {
    if (isAssistantMessageWithContent(messages[i])) {
      latestAssistantIndex = i;
      break;
    }
  }

  let touched = false;
  const out: AgentMessage[] = [];
  for (let i = 0; i < messages.length; i += 1) {
    const msg = messages[i];
    if (!isAssistantMessageWithContent(msg)) {
      out.push(msg);
      continue;
    }
    if (i === latestAssistantIndex) {
      out.push(msg);
      continue;
    }
    const nextContent: AssistantContentBlock[] = [];
    let changed = false;
    for (const block of msg.content) {
      if (isThinkingBlock(block)) {
        touched = true;
        changed = true;
        continue;
      }
      nextContent.push(block);
    }
    if (!changed) {
      out.push(msg);
      continue;
    }
    // Preserve the assistant turn even if all blocks were thinking-only.
    const content =
      nextContent.length > 0 ? nextContent : [{ type: "text", text: "" } as AssistantContentBlock];
    out.push({ ...msg, content });
  }
  return touched ? out : messages;
}

function stripAllThinkingBlocks(messages: AgentMessage[]): AgentMessage[] {
  let touched = false;
  const out: AgentMessage[] = [];
  for (const message of messages) {
    if (!isAssistantMessageWithContent(message)) {
      out.push(message);
      continue;
    }

    const nextContent = message.content.filter((block) => !isThinkingBlock(block));
    if (nextContent.length === message.content.length) {
      out.push(message);
      continue;
    }

    touched = true;
    out.push({
      ...message,
      content:
        nextContent.length > 0
          ? nextContent
          : ([{ type: "text", text: "" }] as AssistantContentBlock[]),
    });
  }
  return touched ? out : messages;
}

export function assessLastAssistantMessage(message: AgentMessage): RecoveryAssessment {
  if (!isAssistantMessageWithContent(message)) {
    return "valid";
  }
  if (message.content.length === 0) {
    return "incomplete-thinking";
  }

  let hasSignedThinking = false;
  let hasUnsignedThinking = false;
  let hasNonThinkingContent = false;
  let hasEmptyTextBlock = false;

  for (const block of message.content) {
    if (!block || typeof block !== "object") {
      return "incomplete-thinking";
    }
    if (isThinkingBlock(block)) {
      if (isSignedThinkingBlock(block)) {
        hasSignedThinking = true;
      } else {
        hasUnsignedThinking = true;
      }
      continue;
    }
    hasNonThinkingContent = true;
    if ((block as { type?: unknown }).type === "text" && !hasMeaningfulText(block)) {
      hasEmptyTextBlock = true;
    }
  }

  if (hasUnsignedThinking) {
    return "incomplete-thinking";
  }
  if (hasSignedThinking && !hasNonThinkingContent) {
    return "incomplete-text";
  }
  if (hasSignedThinking && hasEmptyTextBlock) {
    return "incomplete-text";
  }
  return "valid";
}

export function sanitizeThinkingForRecovery(messages: AgentMessage[]): {
  messages: AgentMessage[];
  prefill: boolean;
} {
  if (messages.length === 0) {
    return { messages, prefill: false };
  }

  let lastAssistantIndex = -1;
  for (let index = messages.length - 1; index >= 0; index -= 1) {
    if ((messages[index] as { role?: unknown }).role === "assistant") {
      lastAssistantIndex = index;
      break;
    }
  }
  if (lastAssistantIndex === -1) {
    return { messages, prefill: false };
  }

  const assessment = assessLastAssistantMessage(messages[lastAssistantIndex]);
  if (assessment === "valid") {
    return { messages, prefill: false };
  }
  if (assessment === "incomplete-text") {
    return { messages, prefill: true };
  }

  return {
    messages: [...messages.slice(0, lastAssistantIndex), ...messages.slice(lastAssistantIndex + 1)],
    prefill: false,
  };
}

function shouldRecoverAnthropicThinkingError(
  error: unknown,
  sessionMeta: RecoverySessionMeta,
): boolean {
  const message = formatErrorMessage(error);
  if (!THINKING_BLOCK_ERROR_PATTERN.test(message)) {
    return false;
  }
  if (sessionMeta.recoveredAnthropicThinking) {
    log.warn(
      `[session-recovery] Anthropic thinking recovery already attempted: sessionId=${sessionMeta.id}`,
    );
    return false;
  }
  return true;
}

async function pumpStreamWithRecovery(
  outer: ReturnType<typeof createAssistantMessageEventStream>,
  stream: ReturnType<StreamFn>,
  sessionMeta: RecoverySessionMeta,
  retry: () => ReturnType<StreamFn>,
): Promise<AssistantMessage> {
  let yieldedChunk = false;
  try {
    const resolved = stream instanceof Promise ? await stream : stream;
    for await (const chunk of resolved as AsyncIterable<unknown>) {
      yieldedChunk = true;
      outer.push(chunk as Parameters<typeof outer.push>[0]);
    }
    const result = await (resolved as { result?: () => Promise<AssistantMessage> }).result?.();
    return result as AssistantMessage;
  } catch (error: unknown) {
    if (!shouldRecoverAnthropicThinkingError(error, sessionMeta)) {
      throw error;
    }
    if (yieldedChunk) {
      log.warn(
        `[session-recovery] Anthropic thinking error occurred after streaming began; skipping retry to avoid duplicate chunks: sessionId=${sessionMeta.id}`,
      );
      throw error;
    }
    sessionMeta.recoveredAnthropicThinking = true;
    log.warn(
      `[session-recovery] Anthropic thinking error during stream; retrying once without thinking blocks: sessionId=${sessionMeta.id}`,
    );
    const retryStream = retry();
    const resolvedRetry = retryStream instanceof Promise ? await retryStream : retryStream;
    for await (const chunk of resolvedRetry as AsyncIterable<unknown>) {
      outer.push(chunk as Parameters<typeof outer.push>[0]);
    }
    const result = await (resolvedRetry as { result?: () => Promise<AssistantMessage> }).result?.();
    return result as AssistantMessage;
  }
}

export function wrapAnthropicStreamWithRecovery(
  innerStreamFn: StreamFn,
  sessionMeta: RecoverySessionMeta,
): StreamFn {
  return (model, context, options) => {
    const contextRecord = context as unknown as { messages?: unknown };
    const originalMessages = Array.isArray(contextRecord.messages)
      ? (contextRecord.messages as AgentMessage[])
      : [];
    const retry = () => {
      const cleanedMessages = stripAllThinkingBlocks(originalMessages);
      const nextContext = {
        ...(context as unknown as Record<string, unknown>),
        messages: cleanedMessages,
      } as typeof context;
      return innerStreamFn(model, nextContext, options);
    };

    const stream = innerStreamFn(model, context, options);
    if (stream instanceof Promise) {
      return stream.catch((error: unknown) => {
        if (!shouldRecoverAnthropicThinkingError(error, sessionMeta)) {
          throw error;
        }
        sessionMeta.recoveredAnthropicThinking = true;
        log.warn(
          `[session-recovery] Anthropic thinking request rejected; retrying once without thinking blocks: sessionId=${sessionMeta.id}`,
        );
        return retry();
      }) as ReturnType<StreamFn>;
    }
    const outer = createAssistantMessageEventStream();
    const finalResultPromise = pumpStreamWithRecovery(outer, stream, sessionMeta, retry).finally(
      () => {
        outer.end();
      },
    );
    outer.result = () => finalResultPromise;
    return outer as unknown as ReturnType<StreamFn>;
  };
}

¤ Dauer der Verarbeitung: 0.21 Sekunden  (vorverarbeitet am  2026-04-27) ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.