import { createHash } from "node:crypto"; import path from "node:path"; import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store"; import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path"; import { resolveBlueBubblesServerAccount } from "./account-resolve.js"; import { createBlueBubblesClientFromParts } from "./client.js"; import { warmupBlueBubblesInboundDedupe } from "./inbound-dedupe.js"; import { asRecord, normalizeWebhookMessage } from "./monitor-normalize.js"; import { processMessage } from "./monitor-processing.js"; import type { WebhookTarget } from "./monitor-shared.js";
// When the gateway is down, restarting, or wedged, inbound webhook POSTs from // BB Server fail with ECONNRESET/ECONNREFUSED. BB's WebhookService does not // retry, and its MessagePoller only re-fires webhooks on BB-side reconnect // events (Messages.app / APNs), not on webhook-receiver recovery. Without a // recovery pass, messages delivered during outage windows are permanently // lost. See #66721 for design discussion and experimental validation.
// Tunable limits for the catchup pass. Each DEFAULT_* value has a matching
// MAX_* ceiling where one is defined.
const DEFAULT_MAX_AGE_MINUTES = 120;
const MAX_MAX_AGE_MINUTES = 12 * 60;
const DEFAULT_PER_RUN_LIMIT = 50;
const MAX_PER_RUN_LIMIT = 500;
const DEFAULT_FIRST_RUN_LOOKBACK_MINUTES = 30;
const DEFAULT_MAX_FAILURE_RETRIES = 10;
const MAX_MAX_FAILURE_RETRIES = 1_000;
// Defense-in-depth bound: a runaway retry map (e.g., a storm of unique
// failing GUIDs) should not balloon the cursor file unboundedly. When the
// map exceeds this size, we keep only the highest-count entries (the ones
// closest to being given up) and drop the rest. Realistic backlogs stay
// well under this; the bound exists to cap pathological growth.
const MAX_FAILURE_RETRY_MAP_SIZE = 5_000;
const FETCH_TIMEOUT_MS = 15_000;
/**
 * Pick the directory under which the catchup cursor is stored.
 *
 * Resolution order:
 *  1. A non-blank `OPENCLAW_STATE_DIR` in `env` → `resolveStateDir(env)`.
 *  2. A test run (`VITEST` set, or `NODE_ENV === "test"`) → a per-pid
 *     directory under `resolvePreferredOpenClawTmpDir()`.
 *  3. Anything else → `resolveStateDir(env)`.
 *
 * @param env Environment to inspect; defaults to `process.env`.
 * @returns The state directory path.
 */
function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string {
  // Explicit OPENCLAW_STATE_DIR overrides take precedence (including
  // per-test mkdtemp dirs in this module's test suite).
  if (env.OPENCLAW_STATE_DIR?.trim()) {
    return resolveStateDir(env);
  }
  // Default test isolation: per-pid tmpdir, no bleed into real ~/.openclaw.
  // Use resolvePreferredOpenClawTmpDir + string concat (mirrors
  // inbound-dedupe) so this doesn't trip the tmpdir-path-guard test that
  // flags dynamic template-literal suffixes on os.tmpdir() paths.
  if (env.VITEST || env.NODE_ENV === "test") {
    const name = "openclaw-vitest-" + process.pid;
    return path.join(resolvePreferredOpenClawTmpDir(), name);
  }
  // Canonical OpenClaw state dir: honors `~` expansion + legacy/new
  // fallback. Sharing this resolver with inbound-dedupe is what guarantees
  // the catchup cursor and the dedupe state always live under the same
  // root, so a replayed GUID is recognized by the dedupe after catchup
  // re-feeds the message through processMessage.
  return resolveStateDir(env);
}
/**
 * Build the on-disk path of the catchup cursor file for one account.
 *
 * The file name is `<safePrefix>__<hash>.json`, where `safePrefix` is the
 * account ID with every character outside `[a-zA-Z0-9_-]` replaced by `_`
 * (or `"account"` if that leaves an empty string) and `hash` is the first
 * 12 hex characters of the SHA-256 of the raw account ID.
 *
 * @param accountId Raw account identifier.
 * @returns Path under `<stateDir>/bluebubbles/catchup/`.
 */
function resolveCursorFilePath(accountId: string): string {
  // Match inbound-dedupe's file layout: readable prefix + short hash so
  // account IDs that only differ by filesystem-unsafe characters do not
  // collapse onto the same file.
  const safePrefix = accountId.replace(/[^a-zA-Z0-9_-]/g, "_") || "account";
  const hash = createHash("sha256").update(accountId, "utf8").digest("hex").slice(0, 12);
  return path.join(
    resolveStateDirFromEnv(),
    "bluebubbles",
    "catchup",
    `${safePrefix}__${hash}.json`,
  );
}
/**
 * Normalize an untrusted `failureRetries` value into a clean GUID → count map.
 *
 * Older cursor files don't carry this field, and the JSON may have been
 * hand-edited or drifted in shape, so the input is treated as `unknown`.
 *
 * @param raw Value read from the cursor file or passed in by a caller.
 * @returns A fresh plain object containing only entries with a non-empty key
 *   and a finite numeric count whose floor is at least 1. Counts are stored
 *   floored. Non-object input (including arrays) yields `{}`.
 */
function sanitizeFailureRetriesInput(raw: unknown): Record<string, number> {
  // Arrays are objects too, but an array is never a valid GUID → count map;
  // without this guard its indices would be accepted as GUIDs.
  if (!raw || typeof raw !== "object" || Array.isArray(raw)) {
    return {};
  }
  const out: Record<string, number> = {};
  for (const [guid, count] of Object.entries(raw as Record<string, unknown>)) {
    if (!guid) {
      continue;
    }
    if (typeof count !== "number" || !Number.isFinite(count)) {
      continue;
    }
    // Floor BEFORE the positivity check: a fractional count such as 0.5 is
    // greater than zero but floors to 0, which must not be persisted as a
    // retry count.
    const whole = Math.floor(count);
    if (whole < 1) {
      continue;
    }
    out[guid] = whole;
  }
  return out;
}
/**
 * Read and validate the persisted catchup cursor for an account.
 *
 * @param accountId Account whose cursor file should be read.
 * @returns The cursor, or `null` when the stored value is missing, is not an
 *   object, or has no finite numeric `lastSeenMs`. `updatedAt` falls back to
 *   `0` when it is not a number. `failureRetries` is included only when at
 *   least one entry survives sanitization.
 */
export async function loadBlueBubblesCatchupCursor(
  accountId: string,
): Promise<BlueBubblesCatchupCursor | null> {
  const filePath = resolveCursorFilePath(accountId);
  const { value } = await readJsonFileWithFallback<BlueBubblesCatchupCursor | null>(filePath, null);
  if (!value || typeof value !== "object") {
    return null;
  }
  if (typeof value.lastSeenMs !== "number" || !Number.isFinite(value.lastSeenMs)) {
    return null;
  }
  const failureRetries = sanitizeFailureRetriesInput(value.failureRetries);
  const hasRetries = Object.keys(failureRetries).length > 0;
  // Keep the shape consistent with what the writer emits: only carry the
  // `failureRetries` key when there's something to retry. Old cursor files
  // without the field continue to round-trip to the same shape.
  return {
    lastSeenMs: value.lastSeenMs,
    updatedAt: typeof value.updatedAt === "number" ? value.updatedAt : 0,
    ...(hasRetries ? { failureRetries } : {}),
  };
}
/**
 * Persist the catchup cursor for an account.
 *
 * @param accountId Account whose cursor file is written.
 * @param lastSeenMs Timestamp to store as the new cursor position.
 * @param failureRetries Optional GUID → failure-count map; it is sanitized
 *   before writing and omitted from the file when nothing survives.
 */
export async function saveBlueBubblesCatchupCursor(
  accountId: string,
  lastSeenMs: number,
  failureRetries?: Record<string, number>,
): Promise<void> {
  const target = resolveCursorFilePath(accountId);
  const retries = sanitizeFailureRetriesInput(failureRetries);
  const updatedAt = Date.now();
  // Leave the key out entirely when there is nothing to retry, so ordinary
  // happy-path writes don't add `"failureRetries": {}` to the cursor file.
  const payload: BlueBubblesCatchupCursor =
    Object.keys(retries).length === 0
      ? { lastSeenMs, updatedAt }
      : { lastSeenMs, updatedAt, failureRetries: retries };
  await writeJsonFileAtomically(target, payload);
}
/**
 * Bound the retry map so a pathological storm of unique failing GUIDs
 * cannot grow the cursor file without limit. When the map holds more than
 * `maxSize` entries, only the `maxSize` entries with the highest counts
 * (the ones closest to give-up) are kept.
 *
 * The map is already scoped to "currently failing, still-retrying" GUIDs
 * and prunes on every run (entries not observed in the fetched window are
 * dropped), so this is a defense-in-depth cap, not the primary pruning
 * mechanism.
 *
 * @param map GUID → failure-count map.
 * @param maxSize Maximum number of entries to retain.
 * @returns `map` itself when it is within the bound; otherwise a new object
 *   holding the retained entries.
 */
function capFailureRetriesMap(
  map: Record<string, number>,
  maxSize: number,
): Record<string, number> {
  const ranked = Object.entries(map);
  if (!(ranked.length > maxSize)) {
    return map;
  }
  // Highest count first; equal counts fall back to GUID order so the
  // retained set is deterministic across runs (helps when diffing cursor
  // files during debugging).
  const byCountThenGuid = (left: [string, number], right: [string, number]): number => {
    const countDelta = right[1] - left[1];
    return countDelta !== 0 ? countDelta : left[0].localeCompare(right[0]);
  };
  ranked.sort(byCountThenGuid);
  const kept: Record<string, number> = {};
  let taken = 0;
  for (const [guid, count] of ranked) {
    if (!(taken < maxSize)) {
      break;
    }
    kept[guid] = count;
    taken += 1;
  }
  return kept;
}
// Catchup runs once per gateway startup (called from monitor.ts after // webhook target registration). We deliberately do NOT short-circuit on // a "ran recently" gate, because catchup is the only mechanism that // recovers messages dropped during the gateway-down window. A short // gap (e.g. <30s) between two startups can still have lost messages in // the middle, and skipping the second startup's catchup would lose // them permanently. The bounded query (perRunLimit, maxAge) and the // inbound-dedupe cache from #66230 cap the cost of running the query // every startup.
const earliestAllowed = nowMs - maxAgeMs; // A future-dated cursor (clock rollback via NTP correction or manual // adjust) is unusable: querying with `after` set to a future timestamp // would return zero records, and saving `nowMs` as the new cursor would // permanently skip any real messages missed in the // [earliestAllowed, nowMs] window. Treat it as if no cursor exists and // fall through to the firstRun lookback path; the inbound-dedupe cache // from #66230 handles any overlap with already-processed messages, and // saving cursor = nowMs at the end of the run repairs the cursor. const cursorIsUsable = existing !== null && existing.lastSeenMs <= nowMs; // First-run (and recovered-future-cursor) lookback is also clamped to // the maxAge ceiling so a config with `maxAgeMinutes: 5, // firstRunLookbackMinutes: 30` doesn't silently exceed the operator's // stated lookback cap on first startup. const windowStartMs = cursorIsUsable
? Math.max(existing.lastSeenMs, earliestAllowed)
: Math.max(nowMs - firstRunLookbackMs, earliestAllowed);
// Ensure legacy→hashed dedupe file migration runs and the on-disk store // is warm before we replay. Without this, an upgrade from a version that // used the old `${safe}.json` naming to the current `${safe}__${hash}.json` // would start with an empty dedupe cache and re-dispatch every message in // the catchup window — producing duplicate replies.
await warmupBlueBubblesInboundDedupe(accountId).catch((err) => {
error?.(`[${accountId}] BlueBubbles catchup: dedupe warmup failed: ${String(err)}`);
});
if (!resolved) { // Leave cursor unchanged so the next run retries the same window.
error?.(`[${accountId}] BlueBubbles catchup: message-query failed; cursor unchanged`); return summary;
}
// Track the earliest timestamp where `processMessage` threw *and* the // failing message has not yet crossed the per-GUID retry ceiling, so we // never advance the cursor past a retryable failure. Normalize failures // (the record didn't yield a usable NormalizedWebhookMessage) are // treated as permanent skips and do NOT block cursor advance — those // payloads are unlikely to ever normalize on retry, and blocking on // them would wedge catchup forever. Given-up messages (count >= max) // also do NOT contribute here; see `skippedGivenUp` below.
let earliestProcessFailureTs: number | null = null; // Track the latest fetched message timestamp regardless of fate, so a // truncated query (fetchedCount === perRunLimit) can advance the cursor // exactly to the page boundary. Without this, the unfetched tail past // the cap is permanently unreachable.
let latestFetchedTs = windowStartMs; // Next-run retry map. Built from scratch each run so entries for GUIDs // that didn't appear in this fetch are dropped (the cursor has // advanced past them and they will never be queried again). Entries we // do carry forward encode two states via the stored count: // - `1 <= count < maxFailureRetries`: still-retrying, holds cursor. // - `count >= maxFailureRetries`: given-up, skipped on sight without // another `processMessage` attempt. Preserving the count is what // keeps the give-up state sticky across runs when an earlier // still-retrying failure is holding the cursor and the given-up // message keeps reappearing in the query window. const nextRetries: Record<string, number> = {};
for (const rec of messages) { // Defense in depth: the server-side `after:` filter should already // exclude pre-cursor messages, but guard here against BB API variants // that return inclusive-of-boundary data. const ts = typeof rec.dateCreated === "number" ? rec.dateCreated : 0; if (ts > 0 && ts > latestFetchedTs) {
latestFetchedTs = ts;
} if (ts > 0 && ts <= windowStartMs) {
summary.skippedPreCursor++; continue;
}
// Filter fromMe early so BB's record of our own outbound sends cannot // enter the inbound pipeline even if normalization would accept them. if (rec.isFromMe === true || rec.is_from_me === true) {
summary.skippedFromMe++; continue;
}
// Skip tapback/reaction/balloon events. These carry an // `associatedMessageGuid` pointing at the parent text message and // have a different `guid` of their own. The live webhook path handles // balloons via the debouncer, which coalesces them with their parent. // Without debouncing here, replaying a balloon would dispatch it as a // standalone message — producing a duplicate reply to the parent. // // Guard: only skip when `associatedMessageType` is set (tapbacks and // reactions — e.g., "like", 2000) OR `balloonBundleId` is set (URL // previews, stickers). iMessage threaded replies use a separate // `threadOriginatorGuid` field and do NOT set either of these, so // they pass through for correct catchup replay. const assocGuid = typeof rec.associatedMessageGuid === "string"
? rec.associatedMessageGuid.trim()
: typeof rec.associated_message_guid === "string"
? rec.associated_message_guid.trim()
: ""; const assocType = rec.associatedMessageType ?? rec.associated_message_type; const balloonId = typeof rec.balloonBundleId === "string" ? rec.balloonBundleId.trim() : ""; if (assocGuid && (assocType != null || balloonId)) { continue;
}
// Prefer the normalized messageId (what the dedupe cache uses) so the // retry counter and downstream dedupe key agree on identity. Fall // back to the raw BB `guid` only when normalization didn't supply one. const retryKey = normalized.messageId ?? (typeof rec.guid === "string" ? rec.guid : "");
// Already-given-up GUIDs are skipped without another `processMessage` // attempt. This is what lets catchup make forward progress through an // earlier, still-retrying failure while not burning cycles re-running // a permanently broken message every sweep. const prevCount = retryKey ? (prevRetries[retryKey] ?? 0) : 0; if (retryKey && prevCount >= maxFailureRetries) {
summary.skippedGivenUp++; // Preserve the count so give-up stickiness survives this run.
nextRetries[retryKey] = prevCount; continue;
}
try {
await procFn(normalized, target);
summary.replayed++; // Success clears any accumulated retries for this GUID. Since we // build `nextRetries` from scratch rather than mutating // `prevRetries`, simply NOT copying the entry is the clear. (We // still need this branch so readers understand the lifecycle.)
} catch (err) {
summary.failed++; const nextCount = prevCount + 1; if (retryKey && nextCount >= maxFailureRetries) { // Crossing the ceiling this run: log WARN once and record the // give-up in the persisted map. Don't contribute to // `earliestProcessFailureTs` — we're intentionally letting the // cursor advance past this GUID on the next sweep.
summary.givenUp++;
nextRetries[retryKey] = nextCount;
error?.(
`[${accountId}] BlueBubbles catchup: giving up on guid=${retryKey} ` +
`after ${nextCount} consecutive failures; future sweeps will skip ` +
`this message. timestamp=${ts}: ${String(err)}`,
);
} else { // Still retrying: count this failure and hold the cursor so the // next sweep retries the same window. (retryKey may be empty in // the unusual case where neither normalizer nor raw payload // carried a GUID — in that case we hold the cursor but cannot // increment a counter, matching pre-retry-cap behavior.) if (retryKey) {
nextRetries[retryKey] = nextCount;
} if (ts > 0 && (earliestProcessFailureTs === null || ts < earliestProcessFailureTs)) {
earliestProcessFailureTs = ts;
}
error?.(
`[${accountId}] BlueBubbles catchup: processMessage failed (retry ` +
`${nextCount}/${maxFailureRetries}): ${String(err)}`,
);
}
}
}
// Compute the new cursor. // // - Default: advance to `nowMs` so subsequent runs start from the moment // this sweep finished (avoiding stuck rescans of a message with // `dateCreated > nowMs` from minor clock skew between BB host and // gateway host). // - On retryable failure (any still-retrying `processMessage` throw, // where the GUID has NOT crossed `maxFailureRetries`): hold the // cursor just before the earliest still-retrying failed timestamp so // the next run retries from there. The inbound-dedupe cache from // #66230 keeps successfully replayed messages from being re-processed. // - On give-up (failures that crossed `maxFailureRetries`): the GUID // is recorded in the persisted retry map with `count >= max` and // skipped on sight in subsequent runs (without another processMessage // attempt). Give-up GUIDs intentionally do NOT hold the cursor, so // the cursor can advance past them naturally — this is what unwedges // catchup from a permanently malformed message (issue #66870). // - On truncation (fetched === perRunLimit): advance only to the latest // fetched timestamp so the next run picks up from the page boundary. // Otherwise the unfetched tail past the cap (which can be substantial // during long outages) would be permanently unreachable. const isTruncated = summary.fetchedCount >= perRunLimit;
let nextCursorMs = nowMs; if (earliestProcessFailureTs !== null) { const heldCursor = Math.max(earliestProcessFailureTs - 1, cursorBefore ?? windowStartMs);
nextCursorMs = Math.min(heldCursor, nowMs);
} elseif (isTruncated) { // Use latestFetchedTs (clamped to >= prior cursor and <= nowMs) so the // next run starts where this page ended.
nextCursorMs = Math.min(Math.max(latestFetchedTs, cursorBefore ?? windowStartMs), nowMs);
}
summary.cursorAfter = nextCursorMs; // Cap the retry map before writing — defense in depth against a storm // of unique failing GUIDs ballooning the cursor file. const retriesToPersist = capFailureRetriesMap(nextRetries, MAX_FAILURE_RETRY_MAP_SIZE);
await saveBlueBubblesCatchupCursor(accountId, nextCursorMs, retriesToPersist).catch((err) => {
error?.(`[${accountId}] BlueBubbles catchup: cursor save failed: ${String(err)}`);
});
// Distinct WARNING when the BB result hits perRunLimit so operators // know a single startup didn't drain the full backlog. The cursor was // advanced only to the page boundary above, so the unfetched tail will // be picked up on the next gateway startup — but if startups are // infrequent, raising perRunLimit drains larger backlogs in one pass. if (isTruncated) {
error?.(
`[${accountId}] BlueBubbles catchup: WARNING fetched=${summary.fetchedCount} ` +
`hit perRunLimit=${perRunLimit}; cursor advanced only to page boundary, ` +
`remaining messages will be picked up on next startup. Raise ` +
`channels.bluebubbles...catchup.perRunLimit to drain larger backlogs ` +
`in a single pass.`,
);
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.