import { defaultRuntime } from "../../../runtime.js"; import { resolveGlobalMap } from "../../../shared/global-singleton.js"; import {
buildCollectPrompt,
beginQueueDrain,
clearQueueSummaryState,
drainCollectQueueStep,
drainNextQueueItem,
hasCrossChannelItems,
previewQueueSummaryPrompt,
waitForQueueDebounce,
} from "../../../utils/queue-helpers.js"; import { isRoutableChannel } from "../route-reply.js"; import { FOLLOWUP_QUEUES } from "./state.js"; import type { FollowupRun } from "./types.js";
// Persists the most recent runFollowup callback per queue key so that // enqueueFollowupRun can restart a drain that finished and deleted the queue. const FOLLOWUP_DRAIN_CALLBACKS_KEY = Symbol.for("openclaw.followupDrainCallbacks");
export function clearFollowupDrainCallback(key: string): void {
FOLLOWUP_RUN_CALLBACKS.delete(key);
}
/** Restart the drain for `key` if it is currently idle, using the stored callback. */
export function kickFollowupDrainIfIdle(key: string): void { const cb = FOLLOWUP_RUN_CALLBACKS.get(key); if (!cb) { return;
}
scheduleFollowupDrain(key, cb);
}
function resolveOriginRoutingMetadata(items: FollowupRun[]): OriginRoutingMetadata { return {
originatingChannel: items.find((item) => item.originatingChannel)?.originatingChannel,
originatingTo: items.find((item) => item.originatingTo)?.originatingTo,
originatingAccountId: items.find((item) => item.originatingAccountId)?.originatingAccountId, // Support both number (Telegram topic) and string (Slack thread_ts) thread IDs.
originatingThreadId: items.find(
(item) => item.originatingThreadId != null && item.originatingThreadId !== "",
)?.originatingThreadId,
};
}
// Keep this key aligned with the fields that affect per-message authorization or // exec-context propagation in collect-mode batching. Display-only sender fields // stay out of the key so profile/name drift does not force conservative splits. // Fields like authProfileId, elevatedLevel, ownerNumbers, and config are // intentionally excluded because they are session-level or not consulted in // per-message authorization checks.
export function resolveFollowupAuthorizationKey(run: FollowupRun["run"]): string { return JSON.stringify([
run.senderId ?? "",
run.senderE164 ?? "",
run.senderIsOwner === true,
run.execOverrides?.host ?? "",
run.execOverrides?.security ?? "",
run.execOverrides?.ask ?? "",
run.execOverrides?.node ?? "",
run.bashElevated?.enabled === true,
run.bashElevated?.allowed === true,
run.bashElevated?.defaultLevel ?? "",
]);
}
function splitCollectItemsByAuthorization(items: FollowupRun[]): FollowupRun[][] { if (items.length <= 1) { return items.length === 0 ? [] : [items];
}
const groups: FollowupRun[][] = [];
let currentGroup: FollowupRun[] = [];
let currentKey: string | undefined;
for (const item of items) { const itemKey = resolveFollowupAuthorizationKey(item.run); if (currentGroup.length === 0 || itemKey === currentKey) {
currentGroup.push(item);
currentKey = itemKey; continue;
}
export function scheduleFollowupDrain(
key: string,
runFollowup: (run: FollowupRun) => Promise<void>,
): void { const queue = beginQueueDrain(FOLLOWUP_QUEUES, key); if (!queue) { return;
} const effectiveRunFollowup = FOLLOWUP_RUN_CALLBACKS.get(key) ?? runFollowup; // Cache callback only when a drain actually starts. Avoid keeping stale // callbacks around from finalize calls where no queue work is pending.
rememberFollowupDrainCallback(key, effectiveRunFollowup); void (async () => { try { const collectState = { forceIndividualCollect: false }; while (queue.items.length > 0 || queue.droppedCount > 0) {
await waitForQueueDebounce(queue); if (queue.mode === "collect") { // Once the batch is mixed, never collect again within this drain. // Prevents “collect after shift” collapsing different targets. // // Debug: `pnpm test src/auto-reply/reply/reply-flow.test.ts` // Check if messages span multiple channels. // If so, process individually to preserve per-message routing. const isCrossChannel = hasCrossChannelItems(queue.items, resolveCrossChannelKey);
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.