import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload" ;
import { isParentOwnedBackgroundAcpSession } from "../../acp/session-interaction-mode.js" ;
import {
resolveAgentConfig,
resolveAgentWorkspaceDir,
resolveSessionAgentId,
} from "../../agents/agent-scope.js" ;
import {
resolveConversationBindingRecord,
touchConversationBindingRecord,
} from "../../bindings/records.js" ;
import { normalizeChatType } from "../../channels/chat-type.js" ;
import { shouldSuppressLocalExecApprovalPrompt } from "../../channels/plugins/exec-approval-local.js" ;
import { parseSessionThreadInfoFast } from "../../config/sessions/thread-info.js" ;
import type { SessionEntry } from "../../config/sessions/types.js" ;
import type { OpenClawConfig } from "../../config/types.openclaw.js" ;
import { logVerbose } from "../../globals.js" ;
import { fireAndForgetHook } from "../../hooks/fire-and-forget.js" ;
import {
deriveInboundMessageHookContext,
toPluginInboundClaimContext,
toPluginInboundClaimEvent,
toInternalMessageReceivedContext,
toPluginMessageContext,
toPluginMessageReceivedEvent,
} from "../../hooks/message-hook-mappers.js" ;
import { isDiagnosticsEnabled } from "../../infra/diagnostic-events.js" ;
import { formatErrorMessage } from "../../infra/errors.js" ;
import {
logMessageProcessed,
logMessageQueued,
logSessionStateChange,
} from "../../logging/diagnostic.js" ;
import {
buildPluginBindingDeclinedText,
buildPluginBindingErrorText,
buildPluginBindingUnavailableText,
hasShownPluginBindingFallbackNotice,
isPluginOwnedSessionBindingRecord,
markPluginBindingFallbackNoticeShown,
toPluginConversationBinding,
} from "../../plugins/conversation-binding.js" ;
import { getGlobalHookRunner, getGlobalPluginRegistry } from "../../plugins/hook-runner-global.js" ;
import { resolveSendPolicy } from "../../sessions/send-policy.js" ;
import { normalizeLowercaseStringOrEmpty } from "../../shared/string-coerce.js" ;
import {
normalizeOptionalLowercaseString,
normalizeOptionalString,
} from "../../shared/string-coerce.js" ;
import {
normalizeTtsAutoMode,
resolveConfiguredTtsMode,
shouldAttemptTtsPayload,
} from "../../tts/tts-config.js" ;
import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js" ;
import type { BlockReplyContext } from "../get-reply-options.types.js" ;
import { getReplyPayloadMetadata, type ReplyPayload } from "../reply-payload.js" ;
import type { FinalizedMsgContext } from "../templating.js" ;
import { normalizeVerboseLevel } from "../thinking.js" ;
import {
createInternalHookEvent,
loadSessionStore,
resolveSessionStoreEntry,
resolveStorePath,
triggerInternalHook,
} from "./dispatch-from-config.runtime.js" ;
import type {
DispatchFromConfigParams,
DispatchFromConfigResult,
} from "./dispatch-from-config.types.js" ;
import { resolveEffectiveReplyRoute } from "./effective-reply-route.js" ;
import { claimInboundDedupe, commitInboundDedupe, releaseInboundDedupe } from "./inbound-dedupe.js" ;
import { resolveReplyRoutingDecision } from "./routing-policy.js" ;
import { resolveRunTypingPolicy } from "./typing-policy.js" ;
// Module-level caches for lazily loaded runtime modules. Each slot starts as
// `null` and is filled with the dynamic-import promise by the matching
// `load*` function below, so every module is requested at most once from here.
let routeReplyRuntimePromise: Promise<typeof import ("./route-reply.runtime.js" )> | null = null ;
let getReplyFromConfigRuntimePromise: Promise<
typeof import ("./get-reply-from-config.runtime.js" )
> | null = null ;
let abortRuntimePromise: Promise<typeof import ("./abort.runtime.js" )> | null = null ;
let ttsRuntimePromise: Promise<typeof import ("../../tts/tts.runtime.js" )> | null = null ;
let runtimePluginsPromise: Promise<typeof import ("../../agents/runtime-plugins.js" )> | null = null ;
let replyMediaPathsRuntimePromise: Promise<typeof import ("./reply-media-paths.runtime.js" )> | null =
null ;
/**
 * Returns the promise for the route-reply runtime module.
 * The dynamic import is started on the first call and the same promise is
 * handed back on every later call.
 */
function loadRouteReplyRuntime() {
  if (routeReplyRuntimePromise === null) {
    routeReplyRuntimePromise = import("./route-reply.runtime.js");
  }
  return routeReplyRuntimePromise;
}
/**
 * Returns the promise for the get-reply-from-config runtime module.
 * The dynamic import is started on the first call and the same promise is
 * handed back on every later call.
 */
function loadGetReplyFromConfigRuntime() {
  if (getReplyFromConfigRuntimePromise === null) {
    getReplyFromConfigRuntimePromise = import("./get-reply-from-config.runtime.js");
  }
  return getReplyFromConfigRuntimePromise;
}
/**
 * Returns the promise for the abort runtime module.
 * The dynamic import is started on the first call and the same promise is
 * handed back on every later call.
 */
function loadAbortRuntime() {
  if (abortRuntimePromise === null) {
    abortRuntimePromise = import("./abort.runtime.js");
  }
  return abortRuntimePromise;
}
/**
 * Returns the promise for the TTS runtime module.
 * The dynamic import is started on the first call and the same promise is
 * handed back on every later call.
 */
function loadTtsRuntime() {
  if (ttsRuntimePromise === null) {
    ttsRuntimePromise = import("../../tts/tts.runtime.js");
  }
  return ttsRuntimePromise;
}
/**
 * Returns the promise for the agents runtime-plugins module.
 * The dynamic import is started on the first call and the same promise is
 * handed back on every later call.
 */
function loadRuntimePlugins() {
  if (runtimePluginsPromise === null) {
    runtimePluginsPromise = import("../../agents/runtime-plugins.js");
  }
  return runtimePluginsPromise;
}
/**
 * Returns the promise for the reply-media-paths runtime module.
 * The dynamic import is started on the first call and the same promise is
 * handed back on every later call.
 */
function loadReplyMediaPathsRuntime() {
  if (replyMediaPathsRuntimePromise === null) {
    replyMediaPathsRuntimePromise = import("./reply-media-paths.runtime.js");
  }
  return replyMediaPathsRuntimePromise;
}
/**
 * Runs the TTS runtime's `maybeApplyTtsToPayload` on a reply payload, but only
 * when `shouldAttemptTtsPayload` accepts the given config and `ttsAuto` value.
 * Otherwise the payload is returned untouched and the TTS runtime module is
 * never loaded.
 *
 * @param params Same argument object that `maybeApplyTtsToPayload` takes.
 * @returns The original payload, or whatever `maybeApplyTtsToPayload` returns.
 */
async function maybeApplyTtsToReplyPayload(
  params: Parameters<Awaited<ReturnType<typeof loadTtsRuntime>>["maybeApplyTtsToPayload"]>[0],
) {
  const ttsWanted = shouldAttemptTtsPayload({ cfg: params.cfg, ttsAuto: params.ttsAuto });
  if (ttsWanted) {
    // Load the TTS runtime lazily; it is only needed on this branch.
    const { maybeApplyTtsToPayload: applyTts } = await loadTtsRuntime();
    return applyTts(params);
  }
  return params.payload;
}
// Matches a body that consists solely of the `<media:audio>` placeholder,
// optionally followed by whitespace and one parenthesized suffix (case-insensitive).
const AUDIO_PLACEHOLDER_RE = /^<media:audio>(\s*\([^)]*\))?$/i;
// Matches a body that begins with `[Audio` as a whole word (case-insensitive).
const AUDIO_HEADER_RE = /^\[Audio\b/i;
/**
 * Reduces a media type string to the part before the first `;`, passed
 * through `normalizeOptionalLowercaseString`. Returns an empty string when
 * that helper yields no value.
 */
const normalizeMediaType = (value: string): string => {
  const [essence] = value.split(";");
  const normalized = normalizeOptionalLowercaseString(essence);
  return normalized ?? "";
};
/**
 * Decides whether an inbound message context represents audio.
 *
 * Two signals are checked, in order:
 * 1. `ctx.MediaType` / `ctx.MediaTypes`: any entry that normalizes to `audio`
 *    or starts with `audio/`.
 * 2. The message body (first string among `BodyForCommands`, `CommandBody`,
 *    `RawBody`, `Body`): after trimming, it is either the `<media:audio>`
 *    placeholder or starts with an `[Audio` header.
 *
 * @param ctx Finalized inbound message context.
 * @returns `true` when either signal indicates audio, otherwise `false`.
 */
const isInboundAudioContext = (ctx: FinalizedMsgContext): boolean => {
  const declaredTypes: unknown[] = [
    ctx.MediaType,
    ...(Array.isArray(ctx.MediaTypes) ? ctx.MediaTypes : []),
  ];
  const hasAudioMediaType = declaredTypes.some((candidate) => {
    // Guard each entry at runtime instead of asserting `string[]`: a non-string
    // element would otherwise throw inside normalizeMediaType (`value.split`).
    if (typeof candidate !== "string" || candidate.length === 0) {
      return false;
    }
    const type = normalizeMediaType(candidate);
    return type === "audio" || type.startsWith("audio/");
  });
  if (hasAudioMediaType) {
    return true;
  }

  // No media type said "audio"; fall back to inspecting the message body.
  const bodyCandidates: unknown[] = [ctx.BodyForCommands, ctx.CommandBody, ctx.RawBody, ctx.Body];
  const body =
    bodyCandidates.find((value): value is string => typeof value === "string") ?? "";
  const trimmed = body.trim();
  if (!trimmed) {
    return false;
  }
  return AUDIO_PLACEHOLDER_RE.test(trimmed) || AUDIO_HEADER_RE.test(trimmed);
};
/**
 * Maps the context's chat type to the conversation type passed along with
 * routed replies.
 *
 * Returns `undefined` when a native command targets a session key different
 * from the context's own session key, or when the normalized chat type is
 * none of `direct`, `group`, or `channel`. `channel` is reported as `group`.
 */
const resolveRoutedPolicyConversationType = (
  ctx: FinalizedMsgContext,
): "direct" | "group" | undefined => {
  const nativeCommandTargetsOtherSession =
    ctx.CommandSource === "native" &&
    Boolean(ctx.CommandTargetSessionKey) &&
    ctx.CommandTargetSessionKey !== ctx.SessionKey;
  if (nativeCommandTargetsOtherSession) {
    return undefined;
  }
  switch (normalizeChatType(ctx.ChatType)) {
    case "direct":
      return "direct";
    case "group":
    case "channel":
      return "group";
    default:
      return undefined;
  }
};
/**
 * Resolves the session key, session store path, and stored session entry for
 * an inbound context.
 *
 * For native commands the normalized `CommandTargetSessionKey` is preferred;
 * otherwise (or when it is absent) `ctx.SessionKey` is used. When no session
 * key can be resolved an empty object is returned. When reading the store
 * throws, the key and path are still returned but `entry` is omitted.
 */
const resolveSessionStoreLookup = (
  ctx: FinalizedMsgContext,
  cfg: OpenClawConfig,
): {
  sessionKey?: string;
  storePath?: string;
  entry?: SessionEntry;
} => {
  const isNativeCommand = ctx.CommandSource === "native";
  const nativeTargetKey = isNativeCommand
    ? normalizeOptionalString(ctx.CommandTargetSessionKey)
    : undefined;
  const sessionKey = normalizeOptionalString(nativeTargetKey ?? ctx.SessionKey);
  if (!sessionKey) {
    return {};
  }

  const agentId = resolveSessionAgentId({ sessionKey, config: cfg });
  const storePath = resolveStorePath(cfg.session?.store, { agentId });
  const lookup: { sessionKey?: string; storePath?: string; entry?: SessionEntry } = {
    sessionKey,
    storePath,
  };
  try {
    const store = loadSessionStore(storePath);
    lookup.entry = resolveSessionStoreEntry({ store, sessionKey }).existing;
  } catch {
    // Store read failed: hand back the key and path without an entry.
  }
  return lookup;
};
/**
 * Builds a predicate that reports whether verbose progress should be emitted.
 *
 * Each call of the returned function re-reads the session store (when both
 * `sessionKey` and `storePath` are set) and uses the entry's normalized
 * `verboseLevel` if one is present. Otherwise, including when the store read
 * throws, it uses `fallbackLevel`. The answer is `true` unless the chosen
 * level is `"off"`.
 */
const createShouldEmitVerboseProgress = (params: {
  sessionKey?: string;
  storePath?: string;
  fallbackLevel: string;
}) => {
  // Reads the current level from the store; yields undefined when unavailable.
  const readStoredLevel = () => {
    if (!params.sessionKey || !params.storePath) {
      return undefined;
    }
    try {
      const store = loadSessionStore(params.storePath);
      const { existing } = resolveSessionStoreEntry({ store, sessionKey: params.sessionKey });
      return normalizeVerboseLevel(existing?.verboseLevel ?? "");
    } catch {
      // Ignore transient store read failures and fall back to the current dispatch snapshot.
      return undefined;
    }
  };

  return () => {
    const storedLevel = readStoredLevel();
    if (storedLevel) {
      return storedLevel !== "off";
    }
    return params.fallbackLevel !== "off";
  };
};
// Re-export the dispatch parameter/result types so they can be imported from this module.
export type {
DispatchFromConfigParams,
DispatchFromConfigResult,
} from "./dispatch-from-config.types.js" ;
export async function dispatchReplyFromConfig(
params: DispatchFromConfigParams,
): Promise<DispatchFromConfigResult> {
const { ctx, cfg, dispatcher } = params;
const diagnosticsEnabled = isDiagnosticsEnabled(cfg);
const channel = normalizeLowercaseStringOrEmpty(ctx.Surface ?? ctx.Provider ?? "unknown" );
const chatId = ctx.To ?? ctx.From;
const messageId = ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
const sessionKey = ctx.SessionKey;
const startTime = diagnosticsEnabled ? Date.now() : 0 ;
const canTrackSession = diagnosticsEnabled && Boolean (sessionKey);
const recordProcessed = (
outcome: "completed" | "skipped" | "error" ,
opts?: {
reason?: string;
error?: string;
},
) => {
if (!diagnosticsEnabled) {
return ;
}
logMessageProcessed({
channel,
chatId,
messageId,
sessionKey,
durationMs: Date.now() - startTime,
outcome,
reason: opts?.reason,
error: opts?.error,
});
};
const markProcessing = () => {
if (!canTrackSession || !sessionKey) {
return ;
}
logMessageQueued({ sessionKey, channel, source: "dispatch" });
logSessionStateChange({
sessionKey,
state: "processing" ,
reason: "message_start" ,
});
};
const markIdle = (reason: string) => {
if (!canTrackSession || !sessionKey) {
return ;
}
logSessionStateChange({
sessionKey,
state: "idle" ,
reason,
});
};
const inboundDedupeClaim = claimInboundDedupe(ctx);
if (inboundDedupeClaim.status === "duplicate" || inboundDedupeClaim.status === "inflight" ) {
recordProcessed("skipped" , { reason: "duplicate" });
return { queuedFinal: false , counts: dispatcher.getQueuedCounts() };
}
const sessionStoreEntry = resolveSessionStoreLookup(ctx, cfg);
const acpDispatchSessionKey = sessionStoreEntry.sessionKey ?? sessionKey;
const sessionAgentId = resolveSessionAgentId({ sessionKey: acpDispatchSessionKey, config: cfg });
const sessionAgentCfg = resolveAgentConfig(cfg, sessionAgentId);
const shouldEmitVerboseProgress = createShouldEmitVerboseProgress({
sessionKey: acpDispatchSessionKey,
storePath: sessionStoreEntry.storePath,
fallbackLevel:
normalizeVerboseLevel(
sessionStoreEntry.entry?.verboseLevel ??
sessionAgentCfg?.verboseDefault ??
cfg.agents?.defaults?.verboseDefault ??
"" ,
) ?? "off" ,
});
const replyRoute = resolveEffectiveReplyRoute({ ctx, entry: sessionStoreEntry.entry });
// Restore route thread context only from the active turn or the thread-scoped session key.
// Do not read thread ids from the normalised session store here: `origin.threadId` can be
// folded back into lastThreadId/deliveryContext during store normalisation and resurrect a
// stale route after thread delivery was intentionally cleared.
const routeThreadId =
ctx.MessageThreadId ?? parseSessionThreadInfoFast(acpDispatchSessionKey).threadId;
const inboundAudio = isInboundAudioContext(ctx);
const sessionTtsAuto = normalizeTtsAutoMode(sessionStoreEntry.entry?.ttsAuto);
const workspaceDir = resolveAgentWorkspaceDir(cfg, sessionAgentId);
const { ensureRuntimePluginsLoaded } = await loadRuntimePlugins();
ensureRuntimePluginsLoaded({ config: cfg, workspaceDir });
const hookRunner = getGlobalHookRunner();
// Extract message context for hooks (plugin and internal)
const timestamp =
typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) ? ctx.Timestamp : undefined;
const messageIdForHook =
ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
const hookContext = deriveInboundMessageHookContext(ctx, { messageId: messageIdForHook });
const { isGroup, groupId } = hookContext;
const inboundClaimContext = toPluginInboundClaimContext(hookContext);
const inboundClaimEvent = toPluginInboundClaimEvent(hookContext, {
commandAuthorized:
typeof ctx.CommandAuthorized === "boolean" ? ctx.CommandAuthorized : undefined,
wasMentioned: typeof ctx.WasMentioned === "boolean" ? ctx.WasMentioned : undefined,
});
// Check if we should route replies to originating channel instead of dispatcher.
// Only route when the originating channel is DIFFERENT from the current surface.
// This handles cross-provider routing (e.g., message from Telegram being processed
// by a shared session that's currently on Slack) while preserving normal dispatcher
// flow when the provider handles its own messages.
//
// Debug: `pnpm test src/auto-reply/reply/dispatch-from-config.test.ts`
const suppressAcpChildUserDelivery = isParentOwnedBackgroundAcpSession(sessionStoreEntry.entry);
const normalizedRouteReplyChannel = normalizeMessageChannel(replyRoute.channel);
const normalizedProviderChannel = normalizeMessageChannel(ctx.Provider);
const normalizedSurfaceChannel = normalizeMessageChannel(ctx.Surface);
const normalizedCurrentSurface = normalizedProviderChannel ?? normalizedSurfaceChannel;
const isInternalWebchatTurn =
normalizedCurrentSurface === INTERNAL_MESSAGE_CHANNEL &&
(normalizedSurfaceChannel === INTERNAL_MESSAGE_CHANNEL || !normalizedSurfaceChannel) &&
ctx.ExplicitDeliverRoute !== true ;
const hasRouteReplyCandidate = Boolean (
!suppressAcpChildUserDelivery &&
!isInternalWebchatTurn &&
normalizedRouteReplyChannel &&
replyRoute.to &&
normalizedRouteReplyChannel !== normalizedCurrentSurface,
);
const routeReplyRuntime = hasRouteReplyCandidate ? await loadRouteReplyRuntime() : undefined;
const {
originatingChannel: routeReplyChannel,
currentSurface,
shouldRouteToOriginating,
shouldSuppressTyping,
} = resolveReplyRoutingDecision({
provider: ctx.Provider,
surface: ctx.Surface,
explicitDeliverRoute: ctx.ExplicitDeliverRoute,
originatingChannel: replyRoute.channel,
originatingTo: replyRoute.to,
suppressDirectUserDelivery: suppressAcpChildUserDelivery,
isRoutableChannel: routeReplyRuntime?.isRoutableChannel ?? (() => false ),
});
const routeReplyTo = replyRoute.to;
const deliveryChannel = shouldRouteToOriginating ? routeReplyChannel : currentSurface;
const { createReplyMediaPathNormalizer } = await loadReplyMediaPathsRuntime();
const normalizeReplyMediaPaths = createReplyMediaPathNormalizer({
cfg,
sessionKey: acpDispatchSessionKey,
workspaceDir,
messageProvider: deliveryChannel,
accountId: replyRoute.accountId,
groupId,
groupChannel: ctx.GroupChannel,
groupSpace: ctx.GroupSpace,
requesterSenderId: ctx.SenderId,
requesterSenderName: ctx.SenderName,
requesterSenderUsername: ctx.SenderUsername,
requesterSenderE164: ctx.SenderE164,
});
const normalizeReplyMediaPayload = async (payload: ReplyPayload): Promise<ReplyPayload> => {
if (!resolveSendableOutboundReplyParts(payload).hasMedia) {
return payload;
}
return await normalizeReplyMediaPaths(payload);
};
const routeReplyToOriginating = async (
payload: ReplyPayload,
options?: { abortSignal?: AbortSignal; mirror?: boolean },
) => {
if (!shouldRouteToOriginating || !routeReplyChannel || !routeReplyTo || !routeReplyRuntime) {
return null ;
}
return await routeReplyRuntime.routeReply({
payload,
channel: routeReplyChannel,
to: routeReplyTo,
sessionKey: ctx.SessionKey,
policySessionKey:
ctx.CommandSource === "native"
? (ctx.CommandTargetSessionKey ?? ctx.SessionKey)
: ctx.SessionKey,
policyConversationType: resolveRoutedPolicyConversationType(ctx),
accountId: replyRoute.accountId,
requesterSenderId: ctx.SenderId,
requesterSenderName: ctx.SenderName,
requesterSenderUsername: ctx.SenderUsername,
requesterSenderE164: ctx.SenderE164,
threadId: routeThreadId,
cfg,
abortSignal: options?.abortSignal,
mirror: options?.mirror,
isGroup,
groupId,
});
};
/**
* Helper to send a payload via route-reply (async).
* Only used when actually routing to a different provider.
* Note: intended to be called only when shouldRouteToOriginating is true, so
* routeReplyChannel and routeReplyTo are expected to be defined.
*/
const sendPayloadAsync = async (
payload: ReplyPayload,
abortSignal?: AbortSignal,
mirror?: boolean ,
): Promise<void > => {
// Keep the runtime guard explicit because this helper is called from nested
// reply callbacks where TypeScript cannot narrow shouldRouteToOriginating.
if (!routeReplyRuntime || !routeReplyChannel || !routeReplyTo) {
return ;
}
if (abortSignal?.aborted) {
return ;
}
const result = await routeReplyToOriginating(payload, {
abortSignal,
mirror,
});
if (result && !result.ok) {
logVerbose(`dispatch-from-config: route-reply failed: ${result.error ?? "unknown error" }`);
}
};
const sendBindingNotice = async (
payload: ReplyPayload,
mode: "additive" | "terminal" ,
): Promise<boolean > => {
const result = await routeReplyToOriginating(payload);
if (result) {
if (!result.ok) {
logVerbose(
`dispatch-from-config: route-reply (plugin binding notice) failed: ${result.error ?? "unknown error" }`,
);
}
return result.ok;
}
return mode === "additive"
? dispatcher.sendToolResult(payload)
: dispatcher.sendFinalReply(payload);
};
const pluginOwnedBindingRecord =
inboundClaimContext.conversationId && inboundClaimContext.channelId
? resolveConversationBindingRecord({
channel: inboundClaimContext.channelId,
accountId:
inboundClaimContext.accountId ??
((
cfg.channels as Record<string, { defaultAccount?: unknown } | undefined> | undefined
)?.[inboundClaimContext.channelId]?.defaultAccount as string | undefined) ??
"default" ,
conversationId: inboundClaimContext.conversationId,
parentConversationId: inboundClaimContext.parentConversationId,
})
: null ;
const pluginOwnedBinding = isPluginOwnedSessionBindingRecord(pluginOwnedBindingRecord)
? toPluginConversationBinding(pluginOwnedBindingRecord)
: null ;
// Resolve sendPolicy early so every outbound path below (plugin-binding
// notices, fast-abort, normal dispatch) honors suppressDelivery. Under
// sendPolicy: "deny" the agent still processes inbound, but no outbound
// reply/notice/indicator is allowed. See #53328.
const sendPolicy = resolveSendPolicy({
cfg,
entry: sessionStoreEntry.entry,
sessionKey: sessionStoreEntry.sessionKey ?? sessionKey,
channel:
(shouldRouteToOriginating ? routeReplyChannel : undefined) ??
sessionStoreEntry.entry?.channel ??
replyRoute.channel ??
ctx.Surface ??
ctx.Provider ??
undefined,
chatType: sessionStoreEntry.entry?.chatType,
});
const suppressDelivery = sendPolicy === "deny" ;
const suppressHookUserDelivery = suppressAcpChildUserDelivery || suppressDelivery;
let pluginFallbackReason:
| "plugin-bound-fallback-missing-plugin"
| "plugin-bound-fallback-no-handler"
| undefined;
if (pluginOwnedBinding) {
touchConversationBindingRecord(pluginOwnedBinding.bindingId);
if (suppressDelivery) {
// Plugin-bound inbound handlers typically emit outbound replies we
// cannot rewind. Under deny, skip the plugin claim entirely and fall
// through to normal (suppressed) agent processing so no delivery leaks
// via the plugin path. See #53328.
logVerbose(
`plugin-bound inbound skipped under sendPolicy: deny (plugin=${pluginOwnedBinding.pluginId} session=${sessionKey ?? "unknown" }); falling through to suppressed agent processing`,
);
} else {
logVerbose(
`plugin-bound inbound routed to ${pluginOwnedBinding.pluginId} conversation=${pluginOwnedBinding.conversationId}`,
);
const targetedClaimOutcome = hookRunner?.runInboundClaimForPluginOutcome
? await hookRunner.runInboundClaimForPluginOutcome(
pluginOwnedBinding.pluginId,
inboundClaimEvent,
{ ...inboundClaimContext, pluginBinding: pluginOwnedBinding },
)
: (() => {
const pluginLoaded =
getGlobalPluginRegistry()?.plugins.some(
(plugin) => plugin.id === pluginOwnedBinding.pluginId && plugin.status === "loaded" ,
) ?? false ;
return pluginLoaded
? ({ status: "no_handler" } as const )
: ({ status: "missing_plugin" } as const );
})();
switch (targetedClaimOutcome.status) {
case "handled" : {
if (targetedClaimOutcome.result.reply) {
await sendBindingNotice(targetedClaimOutcome.result.reply, "terminal" );
}
markIdle("plugin_binding_dispatch" );
recordProcessed("completed" , { reason: "plugin-bound-handled" });
return { queuedFinal: false , counts: dispatcher.getQueuedCounts() };
}
case "missing_plugin" :
case "no_handler" : {
pluginFallbackReason =
targetedClaimOutcome.status === "missing_plugin"
? "plugin-bound-fallback-missing-plugin"
: "plugin-bound-fallback-no-handler" ;
if (!hasShownPluginBindingFallbackNotice(pluginOwnedBinding.bindingId)) {
const didSendNotice = await sendBindingNotice(
{ text: buildPluginBindingUnavailableText(pluginOwnedBinding) },
"additive" ,
);
if (didSendNotice) {
markPluginBindingFallbackNoticeShown(pluginOwnedBinding.bindingId);
}
}
break ;
}
case "declined" : {
await sendBindingNotice(
{ text: buildPluginBindingDeclinedText(pluginOwnedBinding) },
"terminal" ,
);
markIdle("plugin_binding_declined" );
recordProcessed("completed" , { reason: "plugin-bound-declined" });
return { queuedFinal: false , counts: dispatcher.getQueuedCounts() };
}
case "error" : {
logVerbose(
`plugin-bound inbound claim failed for ${pluginOwnedBinding.pluginId}: ${targetedClaimOutcome.error}`,
);
await sendBindingNotice(
{ text: buildPluginBindingErrorText(pluginOwnedBinding) },
"terminal" ,
);
markIdle("plugin_binding_error" );
recordProcessed("completed" , { reason: "plugin-bound-error" });
return { queuedFinal: false , counts: dispatcher.getQueuedCounts() };
}
}
}
}
// Trigger plugin hooks (fire-and-forget)
if (hookRunner?.hasHooks("message_received" )) {
fireAndForgetHook(
hookRunner.runMessageReceived(
toPluginMessageReceivedEvent(hookContext),
toPluginMessageContext(hookContext),
),
"dispatch-from-config: message_received plugin hook failed" ,
);
}
// Bridge to internal hooks (HOOK.md discovery system) - refs #8807
if (sessionKey) {
fireAndForgetHook(
triggerInternalHook(
createInternalHookEvent("message" , "received" , sessionKey, {
...toInternalMessageReceivedContext(hookContext),
timestamp,
}),
),
"dispatch-from-config: message_received internal hook failed" ,
);
}
markProcessing();
try {
const abortRuntime = params.fastAbortResolver ? null : await loadAbortRuntime();
const fastAbortResolver = params.fastAbortResolver ?? abortRuntime?.tryFastAbortFromMessage;
const formatAbortReplyTextResolver =
params.formatAbortReplyTextResolver ?? abortRuntime?.formatAbortReplyText;
if (!fastAbortResolver || !formatAbortReplyTextResolver) {
throw new Error("abort runtime unavailable" );
}
const fastAbort = await fastAbortResolver({ ctx, cfg });
if (fastAbort.handled) {
let queuedFinal = false ;
let routedFinalCount = 0 ;
if (!suppressDelivery) {
const payload = {
text: formatAbortReplyTextResolver(fastAbort.stoppedSubagents),
} satisfies ReplyPayload;
const result = await routeReplyToOriginating(payload);
if (result) {
queuedFinal = result.ok;
if (result.ok) {
routedFinalCount += 1 ;
}
if (!result.ok) {
logVerbose(
`dispatch-from-config: route-reply (abort) failed: ${result.error ?? "unknown error" }`,
);
}
} else {
queuedFinal = dispatcher.sendFinalReply(payload);
}
} else {
logVerbose(
`dispatch-from-config: fast_abort reply suppressed by sendPolicy: deny (session=${sessionKey ?? "unknown" })`,
);
}
const counts = dispatcher.getQueuedCounts();
counts.final += routedFinalCount;
recordProcessed("completed" , { reason: "fast_abort" });
markIdle("message_completed" );
return { queuedFinal, counts };
}
const isSlackNonDirectSurface =
(ctx.Surface === "slack" || ctx.Provider === "slack" ) && ctx.ChatType !== "direct" ;
const shouldSendVerboseProgressMessages =
!isSlackNonDirectSurface && (ctx.ChatType !== "group" || ctx.IsForum === true );
const shouldSendToolSummaries = shouldSendVerboseProgressMessages;
const shouldSendToolStartStatuses = shouldSendVerboseProgressMessages;
const sendFinalPayload = async (
payload: ReplyPayload,
): Promise<{ queuedFinal: boolean ; routedFinalCount: number }> => {
const ttsPayload = await maybeApplyTtsToReplyPayload({
payload,
cfg,
channel: deliveryChannel,
kind: "final" ,
inboundAudio,
ttsAuto: sessionTtsAuto,
});
const normalizedPayload = await normalizeReplyMediaPayload(ttsPayload);
const result = await routeReplyToOriginating(normalizedPayload);
if (result) {
if (!result.ok) {
logVerbose(
`dispatch-from-config: route-reply (final ) failed: ${result.error ?? "unknown error" }`,
);
}
return {
queuedFinal: result.ok,
routedFinalCount: result.ok ? 1 : 0 ,
};
}
return {
queuedFinal: dispatcher.sendFinalReply(normalizedPayload),
routedFinalCount: 0 ,
};
};
// Run before_dispatch hook — let plugins inspect or handle before model dispatch.
if (hookRunner?.hasHooks("before_dispatch" )) {
const beforeDispatchResult = await hookRunner.runBeforeDispatch(
{
content: hookContext.content,
body: hookContext.bodyForAgent ?? hookContext.body,
channel: hookContext.channelId,
sessionKey: sessionStoreEntry.sessionKey ?? sessionKey,
senderId: hookContext.senderId,
isGroup: hookContext.isGroup,
timestamp: hookContext.timestamp,
},
{
channelId: hookContext.channelId,
accountId: hookContext.accountId,
conversationId: inboundClaimContext.conversationId,
sessionKey: sessionStoreEntry.sessionKey ?? sessionKey,
senderId: hookContext.senderId,
},
);
if (beforeDispatchResult?.handled) {
const text = beforeDispatchResult.text;
let queuedFinal = false ;
let routedFinalCount = 0 ;
if (text && !suppressDelivery) {
const handledReply = await sendFinalPayload({ text });
queuedFinal = handledReply.queuedFinal;
routedFinalCount += handledReply.routedFinalCount;
}
const counts = dispatcher.getQueuedCounts();
counts.final += routedFinalCount;
recordProcessed("completed" , { reason: "before_dispatch_handled" });
markIdle("message_completed" );
return { queuedFinal, counts };
}
}
if (hookRunner?.hasHooks("reply_dispatch" )) {
const replyDispatchResult = await hookRunner.runReplyDispatch(
{
ctx,
runId: params.replyOptions?.runId,
sessionKey: acpDispatchSessionKey,
images: params.replyOptions?.images,
inboundAudio,
sessionTtsAuto,
ttsChannel: deliveryChannel,
suppressUserDelivery: suppressHookUserDelivery,
shouldRouteToOriginating,
originatingChannel: routeReplyChannel,
originatingTo: routeReplyTo,
shouldSendToolSummaries,
sendPolicy,
},
{
cfg,
dispatcher,
abortSignal: params.replyOptions?.abortSignal,
onReplyStart: params.replyOptions?.onReplyStart,
recordProcessed,
markIdle,
},
);
if (replyDispatchResult?.handled) {
return {
queuedFinal: replyDispatchResult.queuedFinal,
counts: replyDispatchResult.counts,
};
}
}
// When sendPolicy is "deny", we still let the agent process the inbound message
// (context, memory, tool calls) but suppress all outbound delivery.
if (suppressDelivery) {
logVerbose(
`Delivery suppressed by send policy for session ${sessionStoreEntry.sessionKey ?? sessionKey ?? "unknown" } — agent will still process the message`,
);
}
const toolStartStatusesSent = new Set<string>();
let toolStartStatusCount = 0 ;
const normalizeWorkingLabel = (label: string) => {
const collapsed = label.replace(/\s+/g, " " ).trim();
if (collapsed.length <= 80 ) {
return collapsed;
}
return `${collapsed.slice(0 , 77 ).trimEnd()}...`;
};
const formatPlanUpdateText = (payload: { explanation?: string; steps?: string[] }) => {
const explanation = payload.explanation?.replace(/\s+/g, " " ).trim();
const steps = (payload.steps ?? [])
.map((step) => step.replace(/\s+/g, " " ).trim())
.filter(Boolean );
const parts: string[] = [];
if (explanation) {
parts.push(explanation);
}
if (steps.length > 0 ) {
parts.push(steps.map((step, index) => `${index + 1 }. ${step}`).join("\n" ));
}
return parts.join("\n\n" ).trim() || "Planning next steps." ;
};
const maybeSendWorkingStatus = async (label: string): Promise<void > => {
if (suppressDelivery) {
return ;
}
const normalizedLabel = normalizeWorkingLabel(label);
if (
!shouldEmitVerboseProgress() ||
!shouldSendToolStartStatuses ||
!normalizedLabel ||
toolStartStatusCount >= 2 ||
toolStartStatusesSent.has(normalizedLabel)
) {
return ;
}
toolStartStatusesSent.add(normalizedLabel);
toolStartStatusCount += 1 ;
const payload: ReplyPayload = {
text: `Working: ${normalizedLabel}`,
};
if (shouldRouteToOriginating) {
await sendPayloadAsync(payload, undefined, false );
return ;
}
dispatcher.sendToolResult(payload);
};
const sendPlanUpdate = async (payload: {
explanation?: string;
steps?: string[];
}): Promise<void > => {
if (suppressDelivery || !shouldEmitVerboseProgress() || !shouldSendVerboseProgressMessages) {
return ;
}
const replyPayload: ReplyPayload = {
text: formatPlanUpdateText(payload),
};
if (shouldRouteToOriginating) {
await sendPayloadAsync(replyPayload, undefined, false );
return ;
}
dispatcher.sendToolResult(replyPayload);
};
const summarizeApprovalLabel = (payload: {
status?: string;
command?: string;
message?: string;
}) => {
if (payload.status === "pending" ) {
const command = normalizeOptionalString(payload.command);
if (command) {
return normalizeWorkingLabel(`awaiting approval: ${command}`);
}
return "awaiting approval" ;
}
if (payload.status === "unavailable" ) {
const message = normalizeOptionalString(payload.message);
if (message) {
return normalizeWorkingLabel(message);
}
return "approval unavailable" ;
}
return "" ;
};
const summarizePatchLabel = (payload: { summary?: string; title?: string }) => {
const summary = normalizeOptionalString(payload.summary);
if (summary) {
return normalizeWorkingLabel(summary);
}
const title = normalizeOptionalString(payload.title);
if (title) {
return normalizeWorkingLabel(title);
}
return "" ;
};
// Track accumulated block text for TTS generation after streaming completes.
// When block streaming succeeds, there's no final reply, so we need to generate
// TTS audio separately from the accumulated block content.
let accumulatedBlockText = "" ;
let blockCount = 0 ;
const resolveToolDeliveryPayload = (payload: ReplyPayload): ReplyPayload | null => {
if (
shouldSuppressLocalExecApprovalPrompt({
channel: normalizeMessageChannel(ctx.Surface ?? ctx.Provider),
cfg,
accountId: ctx.AccountId,
payload,
})
) {
return null ;
}
if (shouldSendToolSummaries) {
return payload;
}
const execApproval =
payload.channelData &&
typeof payload.channelData === "object" &&
!Array.isArray(payload.channelData)
? payload.channelData.execApproval
: undefined;
if (execApproval && typeof execApproval === "object" && !Array.isArray(execApproval)) {
return payload;
}
// Group/native flows intentionally suppress tool summary text, but media-only
// tool results (for example TTS audio) must still be delivered.
const hasMedia = resolveSendableOutboundReplyParts(payload).hasMedia;
if (!hasMedia) {
return null ;
}
return { ...payload, text: undefined };
};
const typing = resolveRunTypingPolicy({
requestedPolicy: params.replyOptions?.typingPolicy,
suppressTyping:
suppressDelivery || params.replyOptions?.suppressTyping === true || shouldSuppressTyping,
originatingChannel: routeReplyChannel,
systemEvent: shouldRouteToOriginating,
});
const suppressDefaultToolProgressMessages =
params.replyOptions?.suppressDefaultToolProgressMessages === true ;
const onToolResultFromReplyOptions = params.replyOptions?.onToolResult;
const onPlanUpdateFromReplyOptions = params.replyOptions?.onPlanUpdate;
const onApprovalEventFromReplyOptions = params.replyOptions?.onApprovalEvent;
const onPatchSummaryFromReplyOptions = params.replyOptions?.onPatchSummary;
const replyResolver =
params.replyResolver ?? (await loadGetReplyFromConfigRuntime()).getReplyFromConfig;
const replyResult = await replyResolver(
ctx,
{
...params.replyOptions,
typingPolicy: typing.typingPolicy,
suppressTyping: typing.suppressTyping,
// Tool-result callback: notifies the caller-supplied hook first, then applies
// TTS and media normalization and hands the result to either the originating
// route or the dispatcher, unless delivery is suppressed.
onToolResult: async (payload: ReplyPayload) => {
  // The caller's own hook always runs, even when channel delivery is suppressed.
  await onToolResultFromReplyOptions?.(payload);
  if (suppressDelivery) {
    return;
  }

  const spokenPayload = await maybeApplyTtsToReplyPayload({
    payload,
    cfg,
    channel: deliveryChannel,
    kind: "tool",
    inboundAudio,
    ttsAuto: sessionTtsAuto,
  });
  const toolPayload = resolveToolDeliveryPayload(await normalizeReplyMediaPayload(spokenPayload));
  if (!toolPayload) {
    return;
  }

  if (suppressDefaultToolProgressMessages) {
    // With default progress messages off, only media, exec-approval payloads
    // and error results are still forwarded.
    const { hasMedia } = resolveSendableOutboundReplyParts(toolPayload);
    const channelData = toolPayload.channelData;
    const execApproval =
      channelData && typeof channelData === "object" && !Array.isArray(channelData)
        ? channelData.execApproval
        : undefined;
    const carriesExecApproval =
      execApproval && typeof execApproval === "object" && !Array.isArray(execApproval);
    const mustDeliver = hasMedia || carriesExecApproval || toolPayload.isError === true;
    if (!mustDeliver) {
      return;
    }
  }

  if (shouldRouteToOriginating) {
    await sendPayloadAsync(toolPayload, undefined, false);
    return;
  }
  dispatcher.sendToolResult(toolPayload);
},
// Plan-update callback: forwards to the caller-supplied hook, then sends the
// plan for "update"-phase events unless default tool progress messages are
// suppressed.
onPlanUpdate: async (payload) => {
  await onPlanUpdateFromReplyOptions?.(payload);
  if (payload.phase === "update" && !suppressDefaultToolProgressMessages) {
    await sendPlanUpdate({ explanation: payload.explanation, steps: payload.steps });
  }
},
// Approval-event callback: forwards to the caller-supplied hook, then sends a
// working-status line for "requested"-phase events when a label can be built
// and default tool progress messages are not suppressed.
onApprovalEvent: async (payload) => {
  await onApprovalEventFromReplyOptions?.(payload);
  if (payload.phase === "requested" && !suppressDefaultToolProgressMessages) {
    const approvalLabel = summarizeApprovalLabel({
      status: payload.status,
      command: payload.command,
      message: payload.message,
    });
    if (approvalLabel) {
      await maybeSendWorkingStatus(approvalLabel);
    }
  }
},
// Patch-summary callback: forwards to the caller-supplied hook, then sends a
// working-status line for "end"-phase events when a label can be built and
// default tool progress messages are not suppressed.
onPatchSummary: async (payload) => {
  await onPatchSummaryFromReplyOptions?.(payload);
  if (payload.phase === "end" && !suppressDefaultToolProgressMessages) {
    const patchLabel = summarizePatchLabel({ summary: payload.summary, title: payload.title });
    if (patchLabel) {
      await maybeSendWorkingStatus(patchLabel);
    }
  }
},
// Block-reply callback: records block text for later TTS synthesis, notifies
// the caller's queued-block hook, then applies TTS and media normalization and
// delivers the block through the originating route or the dispatcher.
onBlockReply: async (payload: ReplyPayload, context?: BlockReplyContext) => {
  // Nothing is delivered when delivery is suppressed. Reasoning payloads are
  // dropped as well: channels on this generic dispatch path (WhatsApp, web,
  // etc.) have no dedicated reasoning lane, while Telegram splits reasoning
  // in its own dispatch path.
  if (suppressDelivery || payload.isReasoning === true) {
    return;
  }

  // Collect block text so TTS can be generated once streaming has finished.
  // Compaction status notices are informational UI signals and must not end
  // up in the spoken reply, so they are left out.
  if (payload.text && !payload.isCompactionNotice) {
    const separator = accumulatedBlockText.length > 0 ? "\n" : "";
    accumulatedBlockText += separator + payload.text;
    blockCount += 1;
  }

  // Channels with a live draft preview may have to rotate their preview state
  // at the logical block boundary, before the queued block delivery drains
  // asynchronously through the dispatcher.
  const assistantMessageIndex = getReplyPayloadMetadata(payload)?.assistantMessageIndex;
  const queuedContext =
    assistantMessageIndex === undefined ? context : { ...context, assistantMessageIndex };
  await params.replyOptions?.onBlockReplyQueued?.(payload, queuedContext);

  const spokenPayload = await maybeApplyTtsToReplyPayload({
    payload,
    cfg,
    channel: deliveryChannel,
    kind: "block",
    inboundAudio,
    ttsAuto: sessionTtsAuto,
  });
  const blockPayload = await normalizeReplyMediaPayload(spokenPayload);

  if (shouldRouteToOriginating) {
    await sendPayloadAsync(blockPayload, context?.abortSignal, false);
    return;
  }
  dispatcher.sendBlockReply(blockPayload);
},
},
params.configOverride,
);
if (ctx.AcpDispatchTailAfterReset === true ) {
// Command handling prepared a trailing prompt after ACP in-place reset.
// Route that tail through ACP now (same turn) instead of embedded dispatch.
ctx.AcpDispatchTailAfterReset = false ;
if (hookRunner?.hasHooks("reply_dispatch" )) {
const tailDispatchResult = await hookRunner.runReplyDispatch(
{
ctx,
runId: params.replyOptions?.runId,
sessionKey: acpDispatchSessionKey,
images: params.replyOptions?.images,
inboundAudio,
sessionTtsAuto,
ttsChannel: deliveryChannel,
suppressUserDelivery: suppressHookUserDelivery,
shouldRouteToOriginating,
originatingChannel: routeReplyChannel,
originatingTo: routeReplyTo,
shouldSendToolSummaries,
sendPolicy,
isTailDispatch: true ,
},
{
cfg,
dispatcher,
abortSignal: params.replyOptions?.abortSignal,
onReplyStart: params.replyOptions?.onReplyStart,
recordProcessed,
markIdle,
},
);
if (tailDispatchResult?.handled) {
return {
queuedFinal: tailDispatchResult.queuedFinal,
counts: tailDispatchResult.counts,
};
}
}
}
const replies = replyResult ? (Array.isArray(replyResult) ? replyResult : [replyResult]) : [];
let queuedFinal = false ;
let routedFinalCount = 0 ;
if (!suppressDelivery) {
for (const reply of replies) {
// Suppress reasoning payloads from channel delivery — channels using this
// generic dispatch path do not have a dedicated reasoning lane.
if (reply.isReasoning === true ) {
continue ;
}
const finalReply = await sendFinalPayload(reply);
queuedFinal = finalReply.queuedFinal || queuedFinal;
routedFinalCount += finalReply.routedFinalCount;
}
const ttsMode = resolveConfiguredTtsMode(cfg);
// Generate TTS-only reply after block streaming completes (when there's no final reply).
// This handles the case where block streaming succeeds and drops final payloads,
// but we still want TTS audio to be generated from the accumulated block content.
if (
ttsMode === "final" &&
replies.length === 0 &&
blockCount > 0 &&
accumulatedBlockText.trim()
) {
try {
const ttsSyntheticReply = await maybeApplyTtsToReplyPayload({
payload: { text: accumulatedBlockText },
cfg,
channel: deliveryChannel,
kind: "final" ,
inboundAudio,
ttsAuto: sessionTtsAuto,
});
// Only send if TTS was actually applied (mediaUrl exists)
if (ttsSyntheticReply.mediaUrl) {
// Send TTS-only payload (no text, just audio) so it doesn't duplicate the block content.
// Keep the spoken text only for hooks/archive consumers.
const ttsOnlyPayload: ReplyPayload = {
mediaUrl: ttsSyntheticReply.mediaUrl,
audioAsVoice: ttsSyntheticReply.audioAsVoice,
spokenText: accumulatedBlockText,
};
const result = await routeReplyToOriginating(ttsOnlyPayload);
if (result) {
queuedFinal = result.ok || queuedFinal;
if (result.ok) {
routedFinalCount += 1 ;
}
if (!result.ok) {
logVerbose(
`dispatch-from-config: route-reply (tts-only) failed: ${result.error ?? "unknown error" }`,
);
}
} else {
const didQueue = dispatcher.sendFinalReply(ttsOnlyPayload);
queuedFinal = didQueue || queuedFinal;
}
}
} catch (err) {
logVerbose(
`dispatch-from-config: accumulated block TTS failed: ${formatErrorMessage(err)}`,
);
}
}
}
const counts = dispatcher.getQueuedCounts();
counts.final += routedFinalCount;
if (inboundDedupeClaim.status === "claimed" ) {
commitInboundDedupe(inboundDedupeClaim.key);
}
recordProcessed(
"completed" ,
pluginFallbackReason ? { reason: pluginFallbackReason } : undefined,
);
markIdle("message_completed" );
return { queuedFinal, counts };
} catch (err) {
if (inboundDedupeClaim.status === "claimed" ) {
releaseInboundDedupe(inboundDedupeClaim.key);
}
recordProcessed("error" , { error: String(err) });
markIdle("message_error" );
throw err;
}
}
// Measurement V0.5 in percent: C=97 H=99 G=97
// Processing time: 0.14 seconds
// (preprocessed on 2026-06-10)
// © Formatika GbR, Germany