import type { TypingCallbacks } from "../../channels/typing.js"; import { resolveSilentReplySettings } from "../../config/silent-reply.js"; import type { HumanDelayConfig } from "../../config/types.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { generateSecureInt } from "../../infra/secure-random.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; import {
resolveSilentReplyRewriteText,
type SilentReplyConversationType,
} from "../../shared/silent-reply-policy.js"; import { sleep } from "../../utils.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import { registerDispatcher } from "./dispatcher-registry.js"; import { normalizeReplyPayload, type NormalizeReplySkipReason } from "./normalize-reply.js"; import type { ReplyDispatchKind, ReplyDispatcher } from "./reply-dispatcher.types.js"; import type { ResponsePrefixContext } from "./response-prefix-template.js"; import type { TypingController } from "./typing.js";
export type { ReplyDispatchKind, ReplyDispatcher } from "./reply-dispatcher.types.js";
/**
 * Generate a random delay (in milliseconds) within the configured range.
 *
 * @param config - Human-delay settings; `undefined` is treated like mode "off".
 * @returns `0` when the mode is "off". Otherwise `min` when `max <= min`, or
 *   `min + generateSecureInt(max - min + 1)`.
 */
function getHumanDelay(config: HumanDelayConfig | undefined): number {
  const mode = config?.mode ?? "off";
  if (mode === "off") {
    return 0;
  }
  // Only "custom" mode honors the configured bounds; any other enabled mode
  // falls back to the default range.
  const min =
    mode === "custom" ? (config?.minMs ?? DEFAULT_HUMAN_DELAY_MIN_MS) : DEFAULT_HUMAN_DELAY_MIN_MS;
  const max =
    mode === "custom" ? (config?.maxMs ?? DEFAULT_HUMAN_DELAY_MAX_MS) : DEFAULT_HUMAN_DELAY_MAX_MS;
  // Degenerate or inverted range: nothing to randomize, use the lower bound.
  if (max <= min) {
    return min;
  }
  // NOTE(review): presumably generateSecureInt(n) yields an integer in [0, n),
  // which would make `max` an inclusive upper bound — confirm against the helper.
  return min + generateSecureInt(max - min + 1);
}
/** Options accepted by `createReplyDispatcher`. */
export type ReplyDispatcherOptions = {
  /** Delivery callback; the dispatcher calls it with the payload and `{ kind }`. */
  deliver: ReplyDispatchDeliverer;
  // NOTE(review): presumably feeds the silent-reply policy helpers imported by
  // this module; the consuming code is not visible here — confirm.
  silentReplyContext?: {
    cfg?: OpenClawConfig;
    sessionKey?: string;
    surface?: string;
    conversationType?: SilentReplyConversationType;
  };
  responsePrefix?: string;
  transformReplyPayload?: (payload: ReplyPayload) => ReplyPayload | null;
  /** Static context for response prefix template interpolation. */
  responsePrefixContext?: ResponsePrefixContext;
  /**
   * Dynamic context provider for response prefix template interpolation.
   * Called at normalization time, after model selection is complete.
   */
  responsePrefixContextProvider?: () => ResponsePrefixContext;
  onHeartbeatStrip?: () => void;
  /** Invoked when the dispatcher's pending counter drops to zero. */
  onIdle?: () => void;
  /** Invoked with the error and `{ kind }` when a queued delivery rejects. */
  onError?: ReplyDispatchErrorHandler;
  // AIDEV-NOTE: onSkip lets channels detect silent/empty drops (e.g. Telegram empty-response fallback).
  onSkip?: ReplyDispatchSkipHandler;
  /** Human-like delay between block replies for natural rhythm. */
  humanDelay?: HumanDelayConfig;
  /**
   * Hook awaited just before `deliver`; its result replaces the payload, and a
   * falsy result cancels that delivery.
   */
  beforeDeliver?: ReplyDispatchBeforeDeliver;
};
/**
 * Dispatcher options extended with typing-indicator hooks. `onIdle` is omitted
 * from the base options and re-declared here with the same signature.
 */
export type ReplyDispatcherWithTypingOptions = Omit<ReplyDispatcherOptions, "onIdle"> & {
  typingCallbacks?: TypingCallbacks;
  onReplyStart?: () => Promise<void> | void;
  onIdle?: () => void;
  /** Called when the typing controller is cleaned up (e.g., on NO_REPLY). */
  onCleanup?: () => void;
};
/**
 * Bundle of a dispatcher, the reply options to pass along with it, and two
 * lifecycle signals.
 */
type ReplyDispatcherWithTypingResult = {
  dispatcher: ReplyDispatcher;
  /** The typing-related subset of `GetReplyOptions`. */
  replyOptions: Pick<GetReplyOptions, "onReplyStart" | "onTypingController" | "onTypingCleanup">;
  markDispatchIdle: () => void;
  /** Signal that the model run is complete so the typing controller can stop. */
  markRunComplete: () => void;
};
export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDispatcher {
let sendChain: Promise<void> = Promise.resolve(); // Track in-flight deliveries so we can emit a reliable "idle" signal. // Start with pending=1 as a "reservation" to prevent premature gateway restart. // This is decremented when markComplete() is called to signal no more replies will come.
let pending = 1;
let completeCalled = false; // Track whether we've sent a block reply (for human delay - skip delay on first block).
let sentFirstBlock = false; // Serialize outbound replies to preserve tool/block/final order. const queuedCounts: Record<ReplyDispatchKind, number> = {
tool: 0,
block: 0, final: 0,
}; const failedCounts: Record<ReplyDispatchKind, number> = {
tool: 0,
block: 0, final: 0,
}; const cancelledCounts: Record<ReplyDispatchKind, number> = {
tool: 0,
block: 0, final: 0,
};
// Determine if we should add human-like delay (only for block replies after the first). const shouldDelay = kind === "block" && sentFirstBlock; if (kind === "block") {
sentFirstBlock = true;
}
sendChain = sendChain
.then(async () => { // Add human-like delay between block replies for natural rhythm. if (shouldDelay) { const delayMs = getHumanDelay(options.humanDelay); if (delayMs > 0) {
await sleep(delayMs);
}
}
let deliverPayload: ReplyPayload | null = normalized; if (options.beforeDeliver) {
deliverPayload = await options.beforeDeliver(normalized, { kind }); if (!deliverPayload) {
cancelledCounts[kind] += 1; return;
}
}
await options.deliver(deliverPayload, { kind });
})
.catch((err) => {
failedCounts[kind] += 1;
options.onError?.(err, { kind });
})
.finally(() => {
pending -= 1; // Clear reservation if: // 1. pending is now 1 (just the reservation left) // 2. markComplete has been called // 3. No more replies will be enqueued if (pending === 1 && completeCalled) {
pending -= 1; // Clear the reservation
} if (pending === 0) { // Unregister from global tracking when idle.
unregister();
options.onIdle?.();
}
}); returntrue;
};
/**
 * Signal that no further replies will be enqueued. Only the first call has
 * an effect; later calls return immediately.
 */
const markComplete = () => {
  if (completeCalled) {
    return;
  }
  completeCalled = true;
  // If no replies were enqueued (pending is still 1 = just the reservation),
  // schedule clearing the reservation after current microtasks complete.
  // This gives any in-flight enqueue() calls a chance to increment pending.
  void Promise.resolve().then(() => {
    if (pending === 1 && completeCalled) {
      // Still just the reservation, no replies were enqueued.
      pending -= 1;
      if (pending === 0) {
        unregister();
        options.onIdle?.();
      }
    }
  });
};
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.