import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import {
fetchBlueBubblesMessagesSince,
loadBlueBubblesCatchupCursor,
runBlueBubblesCatchup,
saveBlueBubblesCatchupCursor,
} from "./catchup.js"; import type { NormalizedWebhookMessage } from "./monitor-normalize.js"; import type { WebhookTarget } from "./monitor-shared.js";
function makeStateDir(): string { const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-catchup-test-"));
process.env.OPENCLAW_STATE_DIR = dir; return dir;
}
it("treats filesystem-unsafe account IDs as distinct", async () => { // Different account IDs that happen to map to the same safePrefix must // not collide on disk.
await saveBlueBubblesCatchupCursor("acct/a", 111);
await saveBlueBubblesCatchupCursor("acct:a", 222);
expect((await loadBlueBubblesCatchupCursor("acct/a"))?.lastSeenMs).toBe(111);
expect((await loadBlueBubblesCatchupCursor("acct:a"))?.lastSeenMs).toBe(222);
});
});
it("coalesces concurrent runs for the same accountId via in-process singleflight", async () => { // Two calls firing simultaneously must share one run, one fetch, one // set of processMessage calls, one cursor write. Without singleflight, // both calls would read the same cursor, both would process the same // messages twice (caught by #66816 dedupe, but wasteful), and the // second writer could regress the cursor if its nowMs is stale. const now = 10 * 60 * 1000;
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000);
let fetchCount = 0;
let processCount = 0;
let releaseFetch: (() => void) | undefined;
let fetchStartedResolve: (() => void) | undefined; const fetchStarted = new Promise<void>((resolve) => {
fetchStartedResolve = resolve;
});
const call1 = runBlueBubblesCatchup(makeTarget(), {
now: () => now,
fetchMessages: async () => {
fetchCount++;
fetchStartedResolve?.(); // Block until we fire the second call, so we can verify it // coalesces rather than starting a new run.
await new Promise<void>((resolve) => {
releaseFetch = resolve;
}); return {
resolved: true,
messages: [makeBbMessage({ guid: "g1", dateCreated: 6 * 60 * 1000 })],
};
},
processMessageFn: async () => {
processCount++;
},
});
// Wait for call1 to enter fetchMessages, then fire call2. A fixed // sleep is load-sensitive and can leave call1 permanently blocked.
await fetchStarted; const call2 = runBlueBubblesCatchup(makeTarget(), {
now: () => now,
fetchMessages: async () => {
fetchCount++; return { resolved: true, messages: [makeBbMessage({ guid: "g2" })] };
},
processMessageFn: async () => {
processCount++;
},
});
expect(fetchCount).toBe(1); // second call coalesced, didn't re-fetch
expect(processCount).toBe(1);
expect(r1).toBe(r2); // same summary object returned to both callers
});
it("runs catchup even on rapid restarts (no min-interval gate)", async () => { // Catchup runs once per gateway startup, so a quick restart MUST run // it again — otherwise messages dropped between the two startups // (gateway down → BB ECONNREFUSED → gateway up <30s later) are lost // permanently. Bounded by perRunLimit/maxAge + dedupe-protected. const now = 10_000;
await saveBlueBubblesCatchupCursor("test-account", now - 5_000);
let fetched = false; const summary = await runBlueBubblesCatchup(makeTarget(), {
now: () => now,
fetchMessages: async () => {
fetched = true; return { resolved: true, messages: [] };
},
processMessageFn: async () => {},
});
expect(fetched).toBe(true);
expect(summary).not.toBeNull();
});
it("advances cursor only to last fetched ts when result is truncated (perRunLimit hit)", async () => { // Long-outage scenario: 4 messages arrived during downtime but // perRunLimit=2. Sort:ASC means we get the 2 oldest. Cursor must // advance to the 2nd's timestamp (not nowMs) so the next startup // picks up the remaining 2. const now = 100 * 60 * 1000;
await saveBlueBubblesCatchupCursor("test-account", 50 * 60 * 1000); const summary = await runBlueBubblesCatchup(
makeTarget({
account: {
accountId: "test-account",
enabled: true,
configured: true,
baseUrl: "http://127.0.0.1:1234",
config: {
serverUrl: "http://127.0.0.1:1234",
password: "x",
network: { dangerouslyAllowPrivateNetwork: true },
catchup: { perRunLimit: 2 },
} as unknown as WebhookTarget["account"]["config"],
},
}),
{
now: () => now,
fetchMessages: async () => ({
resolved: true, // Only the 2 the cap allows BB to return (oldest first via ASC).
messages: [
makeBbMessage({ guid: "p1", dateCreated: 60 * 60 * 1000 }),
makeBbMessage({ guid: "p2", dateCreated: 70 * 60 * 1000 }),
],
}),
processMessageFn: async () => {},
},
);
expect(summary?.replayed).toBe(2);
expect(summary?.fetchedCount).toBe(2);
expect(summary?.cursorAfter).toBe(70 * 60 * 1000); // page boundary, not nowMs const cursor = await loadBlueBubblesCatchupCursor("test-account");
expect(cursor?.lastSeenMs).toBe(70 * 60 * 1000);
});
it("leaves cursor unchanged when the query fails", async () => { // Use timestamps well past MIN_INTERVAL_MS (30s) so the rate-limit skip // doesn't short-circuit the run before the fetch path fires. const now = 10 * 60 * 1000;
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000); const summary = await runBlueBubblesCatchup(makeTarget(), {
now: () => now,
fetchMessages: async () => ({ resolved: false, messages: [] }),
processMessageFn: async () => {},
});
expect(summary?.querySucceeded).toBe(false); const cursor = await loadBlueBubblesCatchupCursor("test-account");
expect(cursor?.lastSeenMs).toBe(5 * 60 * 1000); // unchanged
});
it("does NOT advance cursor past a processMessage failure (retryable)", async () => { const cursorBefore = 5 * 60 * 1000; const now = 10 * 60 * 1000;
await saveBlueBubblesCatchupCursor("test-account", cursorBefore); const summary = await runBlueBubblesCatchup(makeTarget(), {
now: () => now,
fetchMessages: async () => ({
resolved: true,
messages: [
makeBbMessage({ guid: "ok1", dateCreated: 6 * 60 * 1000 }),
makeBbMessage({ guid: "bad", dateCreated: 7 * 60 * 1000 }),
makeBbMessage({ guid: "ok2", dateCreated: 8 * 60 * 1000 }),
],
}),
processMessageFn: async (m) => { if (m.messageId === "bad") { thrownew Error("transient");
}
},
}); // Cursor is held just before the bad message's timestamp so the next // sweep retries it (and re-queries ok1 which dedupe will drop).
expect(summary?.failed).toBe(1);
expect(summary?.givenUp).toBe(0);
expect(summary?.cursorAfter).toBe(7 * 60 * 1000 - 1); const cursorAfter = await loadBlueBubblesCatchupCursor("test-account");
expect(cursorAfter?.lastSeenMs).toBe(7 * 60 * 1000 - 1); // Retry counter is persisted so subsequent sweeps know how close we // are to the give-up ceiling.
expect(cursorAfter?.failureRetries?.bad).toBe(1);
});
it("clamps held cursor to previous cursor when failure ts is below it", async () => { // Pathological: failure timestamp is at or below the previous cursor // (shouldn't happen with server-side `after:` but defense in depth). // We must never regress the cursor. const cursorBefore = 9 * 60 * 1000; const now = 10 * 60 * 1000;
await saveBlueBubblesCatchupCursor("test-account", cursorBefore); const summary = await runBlueBubblesCatchup(makeTarget(), {
now: () => now,
fetchMessages: async () => ({
resolved: true,
messages: [makeBbMessage({ guid: "bad", dateCreated: 1_000 })],
}),
processMessageFn: async () => { thrownew Error("transient");
},
}); // skippedPreCursor catches the bad record before processMessage runs, // so no failure is recorded and cursor advances to nowMs normally.
expect(summary?.failed).toBe(0);
expect(summary?.skippedPreCursor).toBe(1);
expect(summary?.cursorAfter).toBe(now);
});
it("recovers from a future-dated cursor by falling through to firstRunLookback", async () => { // Clock-skew scenario: cursor was written with a wall time that is now // ahead of the corrected clock. Catchup must NOT pass `after=future` // to BB (which would return zero), and must NOT save cursor=nowMs // without first replaying the [earliestAllowed, nowMs] window. const now = 1_000_000; const futureCursor = now + 60_000;
await saveBlueBubblesCatchupCursor("test-account", futureCursor);
let seenSince = -1; const summary = await runBlueBubblesCatchup(makeTarget(), {
now: () => now,
fetchMessages: async (sinceMs) => {
seenSince = sinceMs; return { resolved: true, messages: [] };
},
processMessageFn: async () => {},
}); // Should fall through to firstRunLookback (default 30 min), clamped // to maxAge (default 120 min) — i.e. nowMs - 30min, NOT nowMs.
expect(seenSince).toBe(now - 30 * 60_000);
expect(summary).not.toBeNull(); // Cursor should be repaired to nowMs so subsequent runs are normal. const repaired = await loadBlueBubblesCatchupCursor("test-account");
expect(repaired?.lastSeenMs).toBe(now);
});
it("increments retry counter on each consecutive failure and holds cursor", async () => { // Three sweeps, all fail on the same GUID. Counter accumulates and // cursor stays pinned below the failing message so every sweep // retries it. maxFailureRetries: 5 so we don't give up inside this // test. const now1 = 10 * 60 * 1000; const now2 = now1 + 60 * 1000; const now3 = now2 + 60 * 1000;
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000);
expect(s1?.failed).toBe(1);
expect(s1?.givenUp).toBe(0);
expect(s2?.givenUp).toBe(0);
expect(s3?.givenUp).toBe(0); const cursor = await loadBlueBubblesCatchupCursor("test-account");
expect(cursor?.failureRetries?.wedge).toBe(3); // Cursor still held just below the wedge message's timestamp.
expect(cursor?.lastSeenMs).toBe(7 * 60 * 1000 - 1);
});
it("gives up on the Nth consecutive failure and records count >= max", async () => { const now = 10 * 60 * 1000;
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000); // Pre-seed a cursor with retries at the one-before-give-up threshold // so a single run trips the ceiling. This mirrors what would happen // after many runs through the incremental-retry path above.
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { wedge: 2 });
expect(summary?.failed).toBe(1);
expect(summary?.givenUp).toBe(1); // Give-up no longer holds the cursor: it advances to nowMs so the // wedge message falls out of the next query window entirely.
expect(summary?.cursorAfter).toBe(now);
const persisted = await loadBlueBubblesCatchupCursor("test-account");
expect(persisted?.lastSeenMs).toBe(now); // Counter is persisted at the give-up value so a later sweep that // still sees the message (e.g., because a different GUID is holding // the cursor) will recognize the GUID as given up and skip it.
expect(persisted?.failureRetries?.wedge).toBe(3);
// Distinct WARN log line fired on the give-up transition. const giveUpWarnings = warnings.filter((w) => w.includes("giving up on guid="));
expect(giveUpWarnings).toHaveLength(1);
expect(giveUpWarnings[0]).toContain("guid=wedge");
expect(giveUpWarnings[0]).toContain("3 consecutive failures");
});
it("skips an already-given-up GUID without re-attempting processMessage", async () => { // Setup: the cursor file was written with wedge already at the // give-up threshold from a prior run. On this run, the cursor is // held by a different, still-retrying GUID (`held`), so wedge's // timestamp falls back into the query window. Catchup must skip // wedge without invoking processMessage on it. const now = 10 * 60 * 1000;
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { wedge: 3 });
// processMessage never runs for wedge.
expect(attempted).toEqual(["held"]);
expect(summary?.skippedGivenUp).toBe(1);
expect(summary?.failed).toBe(1);
expect(summary?.givenUp).toBe(0); // Cursor held at `held` so held keeps retrying next sweep.
expect(summary?.cursorAfter).toBe(6 * 60 * 1000 - 1);
const cursor = await loadBlueBubblesCatchupCursor("test-account"); // Both entries preserved: held at count 1 (still retrying), // wedge at count 3 (given up, sticky).
expect(cursor?.failureRetries?.held).toBe(1);
expect(cursor?.failureRetries?.wedge).toBe(3);
});
it("clears the retry counter on successful processing", async () => { // GUID recovered after a transient failure. The counter must drop // so the next failure starts fresh (not carrying forward stale // retry history). const now = 10 * 60 * 1000;
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { flaky: 4 });
expect(summary?.replayed).toBe(1); const cursor = await loadBlueBubblesCatchupCursor("test-account");
expect(cursor?.failureRetries?.flaky).toBeUndefined(); // When the map is empty, the field itself is omitted from the file.
expect(cursor?.failureRetries).toBeUndefined();
expect(cursor?.lastSeenMs).toBe(now);
});
it("resolves 'earlier retry + later give-up' by holding cursor at earlier and skipping later", async () => { // This is the key scenario issue #66870 exists to solve. GUID A at // t=6min is still retrying (count=1). GUID B at t=7min has been // failing for many runs and crosses the ceiling on this run. The // wrong answer is "advance cursor past B to t=7min" — that would // lose A. The right answer is "hold cursor below A, record B as // given-up, skip B on sight next run". const now = 10 * 60 * 1000;
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { giveUpHere: 2 });
expect(summary?.failed).toBe(2);
expect(summary?.givenUp).toBe(1); // Cursor held at (earlier message ts - 1) so retryEarlier keeps retrying.
expect(summary?.cursorAfter).toBe(6 * 60 * 1000 - 1);
const cursor = await loadBlueBubblesCatchupCursor("test-account");
expect(cursor?.failureRetries?.retryEarlier).toBe(1); // Give-up counter preserved at or above the threshold.
expect(cursor?.failureRetries?.giveUpHere).toBe(3);
});
it("uses the default retry cap when maxFailureRetries is omitted from config", async () => { // Boot-strap: record 9 failures, then a 10th should trigger give-up // at the default threshold. We pre-seed the counter at 9 so this // single-run test doesn't need to iterate the whole sequence. const now = 10 * 60 * 1000;
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { wedge: 9 });
it("clamps maxFailureRetries to >= 1 when configured to zero or negative", async () => { // With clamp floor of 1, the first failure already meets count >= 1 // so catchup gives up immediately on first attempt. const now = 10 * 60 * 1000;
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000);
it("loads cleanly from a legacy cursor file without a failureRetries field", async () => { // Older cursor files (written before this field existed) must still // parse. Round-trip: save without the field (legacy path), then // run catchup and confirm a normal sweep proceeds.
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000); const loaded = await loadBlueBubblesCatchupCursor("test-account");
expect(loaded?.lastSeenMs).toBe(5 * 60 * 1000);
expect(loaded?.failureRetries).toBeUndefined();
it("drops retry entries for GUIDs that are no longer in the query window", async () => { // A stale entry carried in the cursor file (e.g., from an older // run whose cursor has since advanced past its timestamp) should // NOT be carried forward if the GUID does not appear in the // current fetch. Otherwise the map grows without bound over time. const now = 10 * 60 * 1000;
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, {
staleGuid: 2,
alsoStale: 5,
});
const summary = await runBlueBubblesCatchup(makeTarget(), {
now: () => now,
fetchMessages: async () => ({
resolved: true, // Fetch returns entirely different GUIDs from the stored map.
messages: [makeBbMessage({ guid: "fresh", dateCreated: 6 * 60 * 1000 })],
}),
processMessageFn: async () => {},
});
expect(summary?.replayed).toBe(1); const cursor = await loadBlueBubblesCatchupCursor("test-account"); // Both stale entries dropped; no new entries since the fresh message // succeeded.
expect(cursor?.failureRetries).toBeUndefined();
});
it("preserves stickiness when a given-up GUID reappears and fails again", async () => { // Setup: cursor advanced, but held by a newer still-retrying GUID // `held`. The wedge GUID is already given up from a prior run and // still appears because `held` is holding the cursor below it. // Catchup must continue to skip wedge on sight across many runs // without ever calling processMessage on it. const now = 10 * 60 * 1000;
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, {
wedge: 10,
held: 1,
});
for (let i = 0; i < 3; i++) {
await runBlueBubblesCatchup(target, {
now: () => now + i,
fetchMessages,
processMessageFn: async (m) => {
attempted.push(m.messageId ?? "?"); return processMessageFn();
},
});
} // wedge is NEVER attempted despite reappearing every sweep.
expect(attempted.filter((g) => g === "wedge")).toHaveLength(0); // held is attempted every sweep.
expect(attempted.filter((g) => g === "held")).toHaveLength(3);
});
it("round-trips an empty retry map by omitting the field from the persisted shape", async () => {
await saveBlueBubblesCatchupCursor("acct", 100, {}); const loaded = await loadBlueBubblesCatchupCursor("acct");
expect(loaded?.lastSeenMs).toBe(100);
expect(loaded?.failureRetries).toBeUndefined();
});
it("filters malformed retry entries during load (zero, negative, non-numeric)", async () => { // Use the public save to produce the on-disk file, then overwrite // its contents with a hand-crafted payload to exercise the loader's // sanitization independently of what the saver would emit.
await saveBlueBubblesCatchupCursor("acct", 100); const stateRoot = process.env.OPENCLAW_STATE_DIR; if (!stateRoot) { thrownew Error("OPENCLAW_STATE_DIR must be set by the test harness");
} const dir = path.join(stateRoot, "bluebubbles", "catchup"); const files = fs.readdirSync(dir);
expect(files).toHaveLength(1); const firstFile = files[0]; if (!firstFile) { thrownew Error("expected a cursor file to exist after save");
} const badCursor = {
lastSeenMs: 100,
updatedAt: 0,
failureRetries: {
good: 3,
zero: 0,
negative: -1,
notANumber: "oops",
infinite: Number.POSITIVE_INFINITY,
nan: Number.NaN,
},
};
fs.writeFileSync(path.join(dir, firstFile), JSON.stringify(badCursor));
describe("fetchBlueBubblesMessagesSince", () => {
it("returns resolved:false when the network call throws", async () => { // Point at a port nothing is listening on so fetch fails fast. const result = await fetchBlueBubblesMessagesSince(0, 10, {
baseUrl: "http://127.0.0.1:1",
password: "x",
allowPrivateNetwork: true,
timeoutMs: 200,
});
expect(result.resolved).toBe(false);
expect(result.messages).toEqual([]);
});
});
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.