import fs from
"node:fs" ;
import path from
"node:path" ;
import { describe, expect, it, vi } from
"vitest" ;
import {
enqueueDelivery,
loadPendingDeliveries,
MAX_RETRIES,
recoverPendingDeliveries,
} from
"./delivery-queue.js" ;
import {
asDeliverFn,
createRecoveryLog,
installDeliveryQueueTmpDirHooks,
setQueuedEntryState,
} from
"./delivery-queue.test-helpers.js" ;
describe(
"delivery-queue recovery" , () => {
const { tmpDir } = installDeliveryQueueTmpDirHooks();
const baseCfg = {};
const enqueueCrashRecoveryEntries = async () => {
await enqueueDelivery(
{ channel:
"demo-channel-a" , to:
"+1" , payloads: [{ text:
"a" }] },
tmpDir(),
);
await enqueueDelivery(
{ channel:
"demo-channel-b" , to:
"2" , payloads: [{ text:
"b" }] },
tmpDir(),
);
};
const runRecovery = async ({
deliver,
log = createRecoveryLog(),
maxRecoveryMs,
}: {
deliver: ReturnType<
typeof vi.fn>;
log?: ReturnType<
typeof createRecoveryLog>;
maxRecoveryMs?: number;
}) => {
const result = await recoverPendingDeliveries({
deliver: asDeliverFn(deliver),
log,
cfg: baseCfg,
stateDir: tmpDir(),
...(maxRecoveryMs === undefined ? {} : { maxRecoveryMs }),
});
return { result, log };
};
it(
"recovers entries from a simulated crash" , async () => {
await enqueueCrashRecoveryEntries();
const deliver = vi.fn().mockResolvedValue([]);
const { result } = await runRecovery({ deliver });
expect(deliver).toHaveBeenCalledTimes(
2 );
expect(result).toEqual({
recovered:
2 ,
failed:
0 ,
skippedMaxRetries:
0 ,
deferredBackoff:
0 ,
});
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(
0 );
});
it(
"moves entries that exceeded max retries to failed/" , async () => {
const id = await enqueueDelivery(
{ channel:
"demo-channel-a" , to:
"+1" , payloads: [{ text:
"a" }] },
tmpDir(),
);
setQueuedEntryState(tmpDir(), id, { retryCount: MAX_RETRIES });
const deliver = vi.fn();
const { result } = await runRecovery({ deliver });
expect(deliver).not.toHaveBeenCalled();
expect(result.skippedMaxRetries).toBe(
1 );
expect(result.deferredBackoff).toBe(
0 );
expect(fs.existsSync(path.join(tmpDir(),
"delivery-queue" ,
"failed" , `${id}.json`))
).toBe(true );
});
it("increments retryCount on failed recovery attempt" , async () => {
await enqueueDelivery(
{ channel: "demo-channel-c" , to: "#ch" , payloads: [{ text: "x" }] },
tmpDir(),
);
const deliver = vi.fn().mockRejectedValue(new Error("network down" ));
const { result } = await runRecovery({ deliver });
expect(result.failed).toBe(1 );
expect(result.recovered).toBe(0 );
const entries = await loadPendingDeliveries(tmpDir());
expect(entries).toHaveLength(1 );
expect(entries[0 ]?.retryCount).toBe(1 );
expect(entries[0 ]?.lastError).toBe("network down" );
});
it("moves entries to failed/ immediately on permanent delivery errors" , async () => {
const id = await enqueueDelivery(
{ channel: "demo-channel" , to: "user:abc" , payloads: [{ text: "hi" }] },
tmpDir(),
);
const deliver = vi
.fn()
.mockRejectedValue(new Error("No conversation reference found for user:abc" ));
const log = createRecoveryLog();
const { result } = await runRecovery({ deliver, log });
expect(result.failed).toBe(1 );
expect(result.recovered).toBe(0 );
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0 );
expect(fs.existsSync(path.join(tmpDir(), "delivery-queue" , "failed" , `${id}.json`))).toBe(true );
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("permanent error" ));
});
it("treats Matrix 'User not in room' as a permanent error" , async () => {
const id = await enqueueDelivery(
{ channel: "matrix" , to: "!lowercased:matrix.example.com" , payloads: [{ text: "hi" }] },
tmpDir(),
);
const deliver = vi
.fn()
.mockRejectedValue(
new Error(
"MatrixError: [403] User @bot:matrix.example.com not in room !lowercased:matrix.example.com" ,
),
);
const log = createRecoveryLog();
const { result } = await runRecovery({ deliver, log });
expect(result.failed).toBe(1 );
expect(result.recovered).toBe(0 );
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0 );
expect(fs.existsSync(path.join(tmpDir(), "delivery-queue" , "failed" , `${id}.json`))).toBe(true );
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("permanent error" ));
});
it("passes skipQueue: true to prevent re-enqueueing during recovery" , async () => {
await enqueueDelivery(
{ channel: "demo-channel-a" , to: "+1" , payloads: [{ text: "a" }] },
tmpDir(),
);
const deliver = vi.fn().mockResolvedValue([]);
await runRecovery({ deliver });
expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true }));
});
it("replays stored delivery options during recovery" , async () => {
await enqueueDelivery(
{
channel: "demo-channel-a" ,
to: "+1" ,
payloads: [{ text: "a" }],
replyToId: "root-message" ,
replyToMode: "first" ,
formatting: {
textLimit: 1234 ,
maxLinesPerMessage: 7 ,
tableMode: "off" ,
chunkMode: "newline" ,
},
bestEffort: true ,
gifPlayback: true ,
silent: true ,
gatewayClientScopes: ["operator.write" ],
mirror: {
sessionKey: "agent:main:main" ,
text: "a" ,
mediaUrls: ["https://example.com/a.png "],
},
session: {
key: "agent:main:main" ,
agentId: "agent-main" ,
requesterAccountId: "acct-1" ,
requesterSenderId: "sender-1" ,
requesterSenderName: "Sender One" ,
requesterSenderUsername: "sender.one" ,
requesterSenderE164: "+15551234567" ,
},
},
tmpDir(),
);
const deliver = vi.fn().mockResolvedValue([]);
await runRecovery({ deliver });
expect(deliver).toHaveBeenCalledWith(
expect.objectContaining({
bestEffort: true ,
gifPlayback: true ,
silent: true ,
replyToId: "root-message" ,
replyToMode: "first" ,
formatting: {
textLimit: 1234 ,
maxLinesPerMessage: 7 ,
tableMode: "off" ,
chunkMode: "newline" ,
},
gatewayClientScopes: ["operator.write" ],
mirror: {
sessionKey: "agent:main:main" ,
text: "a" ,
mediaUrls: ["https://example.com/a.png "],
},
session: {
key: "agent:main:main" ,
agentId: "agent-main" ,
requesterAccountId: "acct-1" ,
requesterSenderId: "sender-1" ,
requesterSenderName: "Sender One" ,
requesterSenderUsername: "sender.one" ,
requesterSenderE164: "+15551234567" ,
},
}),
);
});
it("respects maxRecoveryMs time budget and bumps deferred retries" , async () => {
await enqueueCrashRecoveryEntries();
await enqueueDelivery(
{ channel: "demo-channel-c" , to: "#c" , payloads: [{ text: "c" }] },
tmpDir(),
);
const deliver = vi.fn().mockResolvedValue([]);
const { result, log } = await runRecovery({
deliver,
maxRecoveryMs: 0 ,
});
expect(deliver).not.toHaveBeenCalled();
expect(result).toEqual({
recovered: 0 ,
failed: 0 ,
skippedMaxRetries: 0 ,
deferredBackoff: 0 ,
});
const remaining = await loadPendingDeliveries(tmpDir());
expect(remaining).toHaveLength(3 );
expect(remaining.every((entry) => entry.retryCount === 1 )).toBe(true );
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next startup" ));
});
it("defers entries until backoff becomes eligible" , async () => {
const id = await enqueueDelivery(
{ channel: "demo-channel-a" , to: "+1" , payloads: [{ text: "a" }] },
tmpDir(),
);
setQueuedEntryState(tmpDir(), id, { retryCount: 3 , lastAttemptAt: Date.now() });
const deliver = vi.fn().mockResolvedValue([]);
const { result, log } = await runRecovery({
deliver,
maxRecoveryMs: 60 _000 ,
});
expect(deliver).not.toHaveBeenCalled();
expect(result).toEqual({
recovered: 0 ,
failed: 0 ,
skippedMaxRetries: 0 ,
deferredBackoff: 1 ,
});
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(1 );
expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet" ));
});
it("continues past high-backoff entries and recovers ready entries behind them" , async () => {
const now = Date.now();
const blockedId = await enqueueDelivery(
{ channel: "demo-channel-a" , to: "+1" , payloads: [{ text: "blocked" }] },
tmpDir(),
);
const readyId = await enqueueDelivery(
{ channel: "demo-channel-b" , to: "2" , payloads: [{ text: "ready" }] },
tmpDir(),
);
setQueuedEntryState(tmpDir(), blockedId, {
retryCount: 3 ,
lastAttemptAt: now,
enqueuedAt: now - 30 _000 ,
});
setQueuedEntryState(tmpDir(), readyId, { retryCount: 0 , enqueuedAt: now - 10 _000 });
const deliver = vi.fn().mockResolvedValue([]);
const { result } = await runRecovery({ deliver, maxRecoveryMs: 60 _000 });
expect(result).toEqual({
recovered: 1 ,
failed: 0 ,
skippedMaxRetries: 0 ,
deferredBackoff: 1 ,
});
expect(deliver).toHaveBeenCalledTimes(1 );
expect(deliver).toHaveBeenCalledWith(
expect.objectContaining({ channel: "demo-channel-b" , to: "2" , skipQueue: true }),
);
const remaining = await loadPendingDeliveries(tmpDir());
expect(remaining).toHaveLength(1 );
expect(remaining[0 ]?.id).toBe(blockedId);
});
it("recovers deferred entries on a later restart once backoff elapsed" , async () => {
vi.useFakeTimers();
const start = new Date("2026-01-01T00:00:00.000Z" );
vi.setSystemTime(start);
const id = await enqueueDelivery(
{ channel: "demo-channel-a" , to: "+1" , payloads: [{ text: "later" }] },
tmpDir(),
);
setQueuedEntryState(tmpDir(), id, { retryCount: 3 , lastAttemptAt: start.getTime() });
const firstDeliver = vi.fn().mockResolvedValue([]);
const firstRun = await runRecovery({ deliver: firstDeliver, maxRecoveryMs: 60 _000 });
expect(firstRun.result).toEqual({
recovered: 0 ,
failed: 0 ,
skippedMaxRetries: 0 ,
deferredBackoff: 1 ,
});
expect(firstDeliver).not.toHaveBeenCalled();
vi.setSystemTime(new Date(start.getTime() + 600 _000 + 1 ));
const secondDeliver = vi.fn().mockResolvedValue([]);
const secondRun = await runRecovery({ deliver: secondDeliver, maxRecoveryMs: 60 _000 });
expect(secondRun.result).toEqual({
recovered: 1 ,
failed: 0 ,
skippedMaxRetries: 0 ,
deferredBackoff: 0 ,
});
expect(secondDeliver).toHaveBeenCalledTimes(1 );
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0 );
vi.useRealTimers();
});
it("returns zeros when queue is empty" , async () => {
const deliver = vi.fn();
const { result } = await runRecovery({ deliver });
expect(result).toEqual({
recovered: 0 ,
failed: 0 ,
skippedMaxRetries: 0 ,
deferredBackoff: 0 ,
});
expect(deliver).not.toHaveBeenCalled();
});
});
Messung V0.5 in Prozent C=100 H=98 G=98
¤ Dauer der Verarbeitung: 0.10 Sekunden
(vorverarbeitet am 2026-06-10)
¤
*© Formatika GbR, Deutschland