import { resolveAcpAgentPolicyError, resolveAcpDispatchPolicyError } from "../../acp/policy.js" ;
import { formatAcpRuntimeErrorText } from "../../acp/runtime/error-text.js" ;
import { toAcpRuntimeError } from "../../acp/runtime/errors.js" ;
import { resolveAcpThreadSessionDetailLines } from "../../acp/runtime/session-identifiers.js" ;
import {
isSessionIdentityPending,
resolveSessionIdentityFromMeta,
} from "../../acp/runtime/session-identity.js" ;
import type { OpenClawConfig } from "../../config/types.openclaw.js" ;
import type { TtsAutoMode } from "../../config/types.tts.js" ;
import { logVerbose } from "../../globals.js" ;
import { emitAgentEvent } from "../../infra/agent-events.js" ;
import { formatErrorMessage } from "../../infra/errors.js" ;
import { generateSecureUuid } from "../../infra/secure-random.js" ;
import { prefixSystemMessage } from "../../infra/system-message.js" ;
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js" ;
import {
normalizeLowercaseStringOrEmpty,
normalizeOptionalLowercaseString,
normalizeOptionalString,
} from "../../shared/string-coerce.js" ;
import { resolveStatusTtsSnapshot } from "../../tts/status-config.js" ;
import { resolveConfiguredTtsMode } from "../../tts/tts-config.js" ;
import type { FinalizedMsgContext } from "../templating.js" ;
import { createAcpReplyProjector } from "./acp-projector.js" ;
import {
loadDispatchAcpMediaRuntime,
resolveAcpAttachments,
resolveAcpInlineImageAttachments,
} from "./dispatch-acp-attachments.js" ;
import {
createAcpDispatchDeliveryCoordinator,
type AcpDispatchDeliveryCoordinator,
} from "./dispatch-acp-delivery.js" ;
import { hasInboundMedia } from "./inbound-media.js" ;
import type { ReplyDispatchKind, ReplyDispatcher } from "./reply-dispatcher.types.js" ;
let dispatchAcpManagerRuntimePromise: Promise<
typeof import ("./dispatch-acp-manager.runtime.js" )
> | null = null ;
let dispatchAcpSessionRuntimePromise: Promise<
typeof import ("./dispatch-acp-session.runtime.js" )
> | null = null ;
let dispatchAcpTtsRuntimePromise: Promise<typeof import ("./dispatch-acp-tts.runtime.js" )> | null =
null ;
function loadDispatchAcpManagerRuntime() {
dispatchAcpManagerRuntimePromise ??= import ("./dispatch-acp-manager.runtime.js" );
return dispatchAcpManagerRuntimePromise;
}
function loadDispatchAcpSessionRuntime() {
dispatchAcpSessionRuntimePromise ??= import ("./dispatch-acp-session.runtime.js" );
return dispatchAcpSessionRuntimePromise;
}
function loadDispatchAcpTtsRuntime() {
dispatchAcpTtsRuntimePromise ??= import ("./dispatch-acp-tts.runtime.js" );
return dispatchAcpTtsRuntimePromise;
}
type DispatchProcessedRecorder = (
outcome: "completed" | "skipped" | "error" ,
opts?: {
reason?: string;
error?: string;
},
) => void ;
function resolveFirstContextText(
ctx: FinalizedMsgContext,
keys: Array<"BodyForAgent" | "BodyForCommands" | "CommandBody" | "RawBody" | "Body" >,
): string {
for (const key of keys) {
const value = ctx[key];
if (typeof value === "string" ) {
return value;
}
}
return "" ;
}
function resolveAcpPromptText(ctx: FinalizedMsgContext): string {
return resolveFirstContextText(ctx, [
"BodyForAgent" ,
"BodyForCommands" ,
"CommandBody" ,
"RawBody" ,
"Body" ,
]).trim();
}
function resolveAcpRequestId(ctx: FinalizedMsgContext): string {
const id = ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
if (typeof id === "string" ) {
const normalizedId = normalizeOptionalString(id);
if (normalizedId) {
return normalizedId;
}
}
if (typeof id === "number" || typeof id === "bigint" ) {
return String(id);
}
return generateSecureUuid();
}
async function hasBoundConversationForSession(params: {
cfg: OpenClawConfig;
sessionKey: string;
channelRaw: string | undefined;
accountIdRaw: string | undefined;
}): Promise<boolean > {
const channel = normalizeOptionalLowercaseString(params.channelRaw) ?? "" ;
if (!channel) {
return false ;
}
const accountId = normalizeOptionalLowercaseString(params.accountIdRaw) ?? "" ;
const channels = params.cfg.channels as Record<string, { defaultAccount?: unknown } | undefined>;
const configuredDefaultAccountId = channels?.[channel]?.defaultAccount;
const normalizedAccountId =
accountId || normalizeOptionalLowercaseString(configuredDefaultAccountId) || "default" ;
const { getSessionBindingService } = await loadDispatchAcpManagerRuntime();
const bindingService = getSessionBindingService();
const bindings = bindingService.listBySession(params.sessionKey);
return bindings.some((binding) => {
const bindingChannel = normalizeOptionalLowercaseString(binding.conversation.channel) ?? "" ;
const bindingAccountId = normalizeOptionalLowercaseString(binding.conversation.accountId) ?? "" ;
const conversationId = normalizeOptionalString(binding.conversation.conversationId) ?? "" ;
return (
bindingChannel === channel &&
(bindingAccountId || "default" ) === normalizedAccountId &&
conversationId.length > 0
);
});
}
export type AcpDispatchAttemptResult = {
queuedFinal: boolean ;
counts: Record<ReplyDispatchKind, number>;
};
const ACP_STALE_BINDING_UNBIND_REASON = "acp-session-init-failed" ;
function isStaleSessionInitError(params: { code: string; message: string }): boolean {
if (params.code !== "ACP_SESSION_INIT_FAILED" ) {
return false ;
}
return /(ACP (session )?metadata is missing|missing ACP metadata|Session is not ACP-enabled|Resource not found)/i.test(
params.message,
);
}
async function maybeUnbindStaleBoundConversations(params: {
targetSessionKey: string;
error: { code: string; message: string };
}): Promise<void > {
if (!isStaleSessionInitError(params.error)) {
return ;
}
try {
const { getSessionBindingService } = await loadDispatchAcpManagerRuntime();
const removed = await getSessionBindingService().unbind({
targetSessionKey: params.targetSessionKey,
reason: ACP_STALE_BINDING_UNBIND_REASON,
});
if (removed.length > 0 ) {
logVerbose(
`dispatch-acp: removed ${removed.length} stale bound conversation(s) for ${params.targetSessionKey} after ${params.error.code}: ${params.error.message}`,
);
}
} catch (error) {
logVerbose(
`dispatch-acp: failed to unbind stale bound conversations for ${params.targetSessionKey}: ${formatErrorMessage(error)}`,
);
}
}
async function finalizeAcpTurnOutput(params: {
cfg: OpenClawConfig;
sessionKey: string;
delivery: AcpDispatchDeliveryCoordinator;
inboundAudio: boolean ;
sessionTtsAuto?: TtsAutoMode;
ttsChannel?: string;
shouldEmitResolvedIdentityNotice: boolean ;
}): Promise<boolean > {
await params.delivery.settleVisibleText();
let queuedFinal =
params.delivery.hasDeliveredVisibleText() && !params.delivery.hasFailedVisibleTextDelivery();
const ttsMode = resolveConfiguredTtsMode(params.cfg);
const accumulatedBlockText = params.delivery.getAccumulatedBlockText();
const hasAccumulatedBlockText = accumulatedBlockText.trim().length > 0 ;
const ttsStatus = resolveStatusTtsSnapshot({
cfg: params.cfg,
sessionAuto: params.sessionTtsAuto,
});
const canAttemptFinalTts =
ttsStatus != null && !(ttsStatus.autoMode === "inbound" && !params.inboundAudio);
let finalMediaDelivered = false ;
if (ttsMode === "final" && hasAccumulatedBlockText && canAttemptFinalTts) {
try {
const { maybeApplyTtsToPayload } = await loadDispatchAcpTtsRuntime();
const ttsSyntheticReply = await maybeApplyTtsToPayload({
payload: { text: accumulatedBlockText },
cfg: params.cfg,
channel: params.ttsChannel,
kind: "final" ,
inboundAudio: params.inboundAudio,
ttsAuto: params.sessionTtsAuto,
});
if (ttsSyntheticReply.mediaUrl) {
const delivered = await params.delivery.deliver("final" , {
mediaUrl: ttsSyntheticReply.mediaUrl,
audioAsVoice: ttsSyntheticReply.audioAsVoice,
spokenText: accumulatedBlockText,
});
queuedFinal = queuedFinal || delivered;
finalMediaDelivered = delivered;
}
} catch (err) {
logVerbose(`dispatch-acp: accumulated ACP block TTS failed: ${formatErrorMessage(err)}`);
}
}
// Some ACP parent surfaces only expose terminal replies, so block routing alone is not enough
// to prove the final result was visible to the user.
const shouldDeliverTextFallback =
ttsMode !== "all" &&
hasAccumulatedBlockText &&
!finalMediaDelivered &&
!params.delivery.hasDeliveredFinalReply() &&
(!params.delivery.hasDeliveredVisibleText() || params.delivery.hasFailedVisibleTextDelivery());
if (shouldDeliverTextFallback) {
const delivered = await params.delivery.deliver(
"final" ,
{ text: accumulatedBlockText },
{ skipTts: true },
);
queuedFinal = queuedFinal || delivered;
}
if (params.shouldEmitResolvedIdentityNotice) {
const { readAcpSessionEntry } = await loadDispatchAcpSessionRuntime();
const currentMeta = readAcpSessionEntry({
cfg: params.cfg,
sessionKey: params.sessionKey,
})?.acp;
const identityAfterTurn = resolveSessionIdentityFromMeta(currentMeta);
if (!isSessionIdentityPending(identityAfterTurn)) {
const resolvedDetails = resolveAcpThreadSessionDetailLines({
sessionKey: params.sessionKey,
meta: currentMeta,
});
if (resolvedDetails.length > 0 ) {
const delivered = await params.delivery.deliver("final" , {
text: prefixSystemMessage(["Session ids resolved." , ...resolvedDetails].join("\n" )),
});
queuedFinal = queuedFinal || delivered;
}
}
}
return queuedFinal;
}
export async function tryDispatchAcpReply(params: {
ctx: FinalizedMsgContext;
cfg: OpenClawConfig;
dispatcher: ReplyDispatcher;
runId?: string;
sessionKey?: string;
images?: Array<{ data: string; mimeType: string }>;
abortSignal?: AbortSignal;
inboundAudio: boolean ;
sessionTtsAuto?: TtsAutoMode;
ttsChannel?: string;
suppressUserDelivery?: boolean ;
shouldRouteToOriginating: boolean ;
originatingChannel?: string;
originatingTo?: string;
shouldSendToolSummaries: boolean ;
bypassForCommand: boolean ;
onReplyStart?: () => Promise<void > | void ;
recordProcessed: DispatchProcessedRecorder;
markIdle: (reason: string) => void ;
}): Promise<AcpDispatchAttemptResult | null > {
const sessionKey = normalizeOptionalString(params.sessionKey);
if (!sessionKey || params.bypassForCommand) {
return null ;
}
const { getAcpSessionManager } = await loadDispatchAcpManagerRuntime();
const acpManager = getAcpSessionManager();
const acpResolution = acpManager.resolveSession({
cfg: params.cfg,
sessionKey,
});
if (acpResolution.kind === "none" ) {
return null ;
}
const canonicalSessionKey = acpResolution.sessionKey;
let queuedFinal = false ;
const delivery = createAcpDispatchDeliveryCoordinator({
cfg: params.cfg,
ctx: params.ctx,
dispatcher: params.dispatcher,
inboundAudio: params.inboundAudio,
sessionKey: canonicalSessionKey,
sessionTtsAuto: params.sessionTtsAuto,
ttsChannel: params.ttsChannel,
suppressUserDelivery: params.suppressUserDelivery,
shouldRouteToOriginating: params.shouldRouteToOriginating,
originatingChannel: params.originatingChannel,
originatingTo: params.originatingTo,
onReplyStart: params.onReplyStart,
});
const identityPendingBeforeTurn = isSessionIdentityPending(
resolveSessionIdentityFromMeta(acpResolution.kind === "ready" ? acpResolution.meta : undefined),
);
const shouldEmitResolvedIdentityNotice =
!params.suppressUserDelivery &&
identityPendingBeforeTurn &&
(Boolean (
params.ctx.MessageThreadId != null &&
(normalizeOptionalString(String(params.ctx.MessageThreadId)) ?? "" ),
) ||
(await hasBoundConversationForSession({
cfg: params.cfg,
sessionKey: canonicalSessionKey,
channelRaw: params.ctx.OriginatingChannel ?? params.ctx.Surface ?? params.ctx.Provider,
accountIdRaw: params.ctx.AccountId,
})));
const resolvedAcpAgent =
acpResolution.kind === "ready"
? (normalizeOptionalString(acpResolution.meta.agent) ??
normalizeOptionalString(params.cfg.acp?.defaultAgent) ??
resolveAgentIdFromSessionKey(canonicalSessionKey))
: resolveAgentIdFromSessionKey(canonicalSessionKey);
const normalizedDispatchChannel = normalizeOptionalLowercaseString(
params.ctx.OriginatingChannel ?? params.ctx.Surface ?? params.ctx.Provider,
);
const explicitDispatchAccountId = normalizeOptionalString(params.ctx.AccountId);
const dispatchChannels = params.cfg.channels as
| Record<string, { defaultAccount?: unknown } | undefined>
| undefined;
const defaultDispatchAccount =
normalizedDispatchChannel == null
? undefined
: dispatchChannels?.[normalizedDispatchChannel]?.defaultAccount;
const effectiveDispatchAccountId =
explicitDispatchAccountId ?? normalizeOptionalString(defaultDispatchAccount);
const projector = createAcpReplyProjector({
cfg: params.cfg,
shouldSendToolSummaries: params.shouldSendToolSummaries,
deliver: delivery.deliver,
provider: params.ctx.Surface ?? params.ctx.Provider,
accountId: effectiveDispatchAccountId,
});
const acpDispatchStartedAt = Date.now();
try {
const dispatchPolicyError = resolveAcpDispatchPolicyError(params.cfg);
if (dispatchPolicyError) {
throw dispatchPolicyError;
}
if (acpResolution.kind === "stale" ) {
await maybeUnbindStaleBoundConversations({
targetSessionKey: canonicalSessionKey,
error: acpResolution.error,
});
const delivered = await delivery.deliver("final" , {
text: formatAcpRuntimeErrorText(acpResolution.error),
isError: true ,
});
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
logVerbose(
`acp-dispatch: session=${sessionKey} outcome=error code=${acpResolution.error.code} latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
);
params.recordProcessed("completed" , {
reason: `acp_error:${normalizeLowercaseStringOrEmpty(acpResolution.error.code)}`,
});
params.markIdle("message_completed" );
return { queuedFinal: delivered, counts };
}
const agentPolicyError = resolveAcpAgentPolicyError(params.cfg, resolvedAcpAgent);
if (agentPolicyError) {
throw agentPolicyError;
}
if (hasInboundMedia(params.ctx) && !params.ctx.MediaUnderstanding?.length) {
try {
const { applyMediaUnderstanding } = await loadDispatchAcpMediaRuntime();
await applyMediaUnderstanding({
ctx: params.ctx,
cfg: params.cfg,
});
} catch (err) {
logVerbose(
`dispatch-acp: media understanding failed, proceeding with raw content: ${formatErrorMessage(err)}`,
);
}
}
const promptText = resolveAcpPromptText(params.ctx);
const mediaAttachments = hasInboundMedia(params.ctx)
? await resolveAcpAttachments({ ctx: params.ctx, cfg: params.cfg })
: [];
const attachments =
mediaAttachments.length > 0
? mediaAttachments
: resolveAcpInlineImageAttachments(params.images);
if (!promptText && attachments.length === 0 ) {
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
params.recordProcessed("completed" , { reason: "acp_empty_prompt" });
params.markIdle("message_completed" );
return { queuedFinal: false , counts };
}
try {
await delivery.startReplyLifecycle();
} catch (error) {
logVerbose(`dispatch-acp: start reply lifecycle failed: ${formatErrorMessage(error)}`);
}
await acpManager.runTurn({
cfg: params.cfg,
sessionKey: canonicalSessionKey,
text: promptText,
attachments: attachments.length > 0 ? attachments : undefined,
mode: "prompt" ,
requestId: resolveAcpRequestId(params.ctx),
...(params.abortSignal ? { signal: params.abortSignal } : {}),
onEvent: async (event) => await projector.onEvent(event),
});
await projector.flush(true );
queuedFinal =
(await finalizeAcpTurnOutput({
cfg: params.cfg,
sessionKey: canonicalSessionKey,
delivery,
inboundAudio: params.inboundAudio,
sessionTtsAuto: params.sessionTtsAuto,
ttsChannel: params.ttsChannel,
shouldEmitResolvedIdentityNotice,
})) || queuedFinal;
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
const runId = normalizeOptionalString(params.runId);
if (runId) {
emitAgentEvent({
runId,
sessionKey,
stream: "lifecycle" ,
data: {
phase: "end" ,
startedAt: acpDispatchStartedAt,
endedAt: Date.now(),
},
});
}
logVerbose(
`acp-dispatch: session=${sessionKey} outcome=ok latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
);
params.recordProcessed("completed" , { reason: "acp_dispatch" });
params.markIdle("message_completed" );
return { queuedFinal, counts };
} catch (err) {
await projector.flush(true );
const acpError = toAcpRuntimeError({
error: err,
fallbackCode: "ACP_TURN_FAILED" ,
fallbackMessage: "ACP turn failed before completion." ,
});
await maybeUnbindStaleBoundConversations({
targetSessionKey: canonicalSessionKey,
error: acpError,
});
const delivered = await delivery.deliver("final" , {
text: formatAcpRuntimeErrorText(acpError),
isError: true ,
});
queuedFinal = queuedFinal || delivered;
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
const runId = normalizeOptionalString(params.runId);
if (runId) {
emitAgentEvent({
runId,
sessionKey,
stream: "lifecycle" ,
data: {
phase: "error" ,
startedAt: acpDispatchStartedAt,
endedAt: Date.now(),
error: acpError.message,
},
});
}
logVerbose(
`acp-dispatch: session=${sessionKey} outcome=error code=${acpError.code} latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
);
params.recordProcessed("completed" , {
reason: `acp_error:${normalizeLowercaseStringOrEmpty(acpError.code)}`,
});
params.markIdle("message_completed" );
return { queuedFinal, counts };
}
}
Messung V0.5 in Prozent C=97 H=97 G=96
¤ Dauer der Verarbeitung: 0.10 Sekunden
(vorverarbeitet am 2026-06-10)
¤
*© Formatika GbR, Deutschland