import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { formatErrorMessage } from "../errors.js"; import {
ackDelivery,
failDelivery,
loadPendingDelivery,
loadPendingDeliveries,
moveToFailed,
type QueuedDelivery,
type QueuedDeliveryPayload,
} from "./delivery-queue-storage.js";
const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [
/no conversation reference found/i,
/chat not found/i,
/user not found/i,
/bot.*not.*member/i,
/bot was blocked by the user/i,
/forbidden: bot was kicked/i,
/chat_id is empty/i,
/recipient is not a valid/i,
/outbound not configured for channel/i,
/ambiguous .* recipient/i,
/User .* not in room/i,
];
const drainInProgress = new Map<string, boolean>(); const entriesInProgress = new Set<string>();
function getErrnoCode(err: unknown): string | null { return err && typeof err === "object" && "code" in err
? String((err as { code?: unknown }).code)
: null;
}
async function deferRemainingEntriesForBudget(
entries: readonly QueuedDelivery[],
stateDir: string | undefined,
): Promise<void> { // Increment retryCount so entries that are repeatedly deferred by the // recovery budget eventually hit MAX_RETRIES and get pruned.
await Promise.allSettled(
entries.map((entry) => failDelivery(entry.id, "recovery time budget exceeded", stateDir)),
);
}
/** Compute the backoff delay in ms for a given retry count. */
export function computeBackoffMs(retryCount: number): number { if (retryCount <= 0) { return0;
} return BACKOFF_MS[Math.min(retryCount - 1, BACKOFF_MS.length - 1)] ?? BACKOFF_MS.at(-1) ?? 0;
}
for (const entry of matchingEntries) { if (!claimRecoveryEntry(entry.id)) {
opts.log.info(`${opts.logLabel}: entry ${entry.id} is already being recovered`); continue;
}
try { // Re-read after claim so the queue file remains the source of truth. // This prevents stale startup/reconnect snapshots from re-sending an // entry that another recovery path already acked. const currentEntry = await loadPendingDelivery(entry.id, opts.stateDir); if (!currentEntry) {
opts.log.info(`${opts.logLabel}: entry ${entry.id} already gone, skipping`); continue;
}
const currentDecision = opts.selectEntry(currentEntry, Date.now()); if (!currentDecision.match) {
opts.log.info(`${opts.logLabel}: entry ${currentEntry.id} no longer matches, skipping`); continue;
}
if (currentEntry.retryCount >= MAX_RETRIES) { try {
await moveToFailed(currentEntry.id, opts.stateDir);
} catch (err) { if (getErrnoCode(err) === "ENOENT") {
opts.log.info(`${opts.logLabel}: entry ${currentEntry.id} already gone, skipping`); continue;
} throw err;
}
opts.log.warn(
`${opts.logLabel}: entry ${currentEntry.id} exceeded max retries and was moved to failed/`,
); continue;
}
if (!currentDecision.bypassBackoff) { const retryEligibility = isEntryEligibleForRecoveryRetry(currentEntry, Date.now()); if (!retryEligibility.eligible) {
opts.log.info(
`${opts.logLabel}: entry ${currentEntry.id} not ready for retry yet — backoff ${retryEligibility.remainingBackoffMs}ms remaining`,
); continue;
}
}
const result = await drainQueuedEntry({
entry: currentEntry,
cfg: opts.cfg,
deliver,
stateDir: opts.stateDir,
onFailed: (failedEntry, errMsg) => { if (isPermanentDeliveryError(errMsg)) {
opts.log.warn(
`${opts.logLabel}: entry ${failedEntry.id} hit permanent error — moving to failed/: ${errMsg}`,
); return;
}
opts.log.warn(`${opts.logLabel}: retry failed for entry ${failedEntry.id}: ${errMsg}`);
},
}); if (result === "recovered") {
opts.log.info(
`${opts.logLabel}: drained delivery ${currentEntry.id} on ${currentEntry.channel}`,
);
}
} finally {
releaseRecoveryEntry(entry.id);
}
}
} finally {
drainInProgress.delete(opts.drainKey);
}
}
/** *Ongatewaystartup,scanthedeliveryqueueandretryanypendingentries. *UsesexponentialbackoffandmovesentriesthatexceedMAX_RETRIEStofailed/.
*/
export async function recoverPendingDeliveries(opts: {
deliver: DeliverFn;
log: RecoveryLogger;
cfg: OpenClawConfig;
stateDir?: string; /** Maximum wall-clock time for recovery in ms. Remaining entries are deferred to next startup. Default: 60 000. */
maxRecoveryMs?: number;
}): Promise<RecoverySummary> { const pending = await loadPendingDeliveries(opts.stateDir); if (pending.length === 0) { return createEmptyRecoverySummary();
}
for (let i = 0; i < pending.length; i++) { const entry = pending[i]; const now = Date.now(); if (now >= deadline) {
opts.log.warn(`Recovery time budget exceeded — remaining entries deferred to next startup`);
await deferRemainingEntriesForBudget(pending.slice(i), opts.stateDir); break;
}
if (!claimRecoveryEntry(entry.id)) {
opts.log.info(`Recovery skipped for delivery ${entry.id}: already being processed`); continue;
}
try { const currentEntry = await loadPendingDelivery(entry.id, opts.stateDir); if (!currentEntry) {
opts.log.info(`Recovery skipped for delivery ${entry.id}: already gone`); continue;
}
if (currentEntry.retryCount >= MAX_RETRIES) {
opts.log.warn(
`Delivery ${currentEntry.id} exceeded max retries (${currentEntry.retryCount}/${MAX_RETRIES}) — moving to failed/`,
);
await moveEntryToFailedWithLogging(currentEntry.id, opts.log, opts.stateDir);
summary.skippedMaxRetries += 1; continue;
}
const currentRetryEligibility = isEntryEligibleForRecoveryRetry(currentEntry, Date.now()); if (!currentRetryEligibility.eligible) {
summary.deferredBackoff += 1;
opts.log.info(
`Delivery ${currentEntry.id} not ready for retry yet — backoff ${currentRetryEligibility.remainingBackoffMs}ms remaining`,
); continue;
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.