import fs from "node:fs"; import path from "node:path"; import type { DatabaseSync } from "node:sqlite"; import { requireNodeSqlite } from "../infra/node-sqlite.js"; import { readCaptureBlobText, writeCaptureBlob } from "./blob-store.js"; import type {
CaptureBlobRecord,
CaptureEventRecord,
CaptureObservedDimension,
CaptureQueryPreset,
CaptureQueryRow,
CaptureSessionCoverageSummary,
CaptureSessionRecord,
CaptureSessionSummary,
} from "./types.js";
function ensureParentDir(filePath: string) {
fs.mkdirSync(path.dirname(filePath), { recursive: true });
}
function openDatabase(dbPath: string): DatabaseSync {
ensureParentDir(dbPath); const { DatabaseSync } = requireNodeSqlite(); const db = new DatabaseSync(dbPath);
db.exec("PRAGMA journal_mode = WAL");
db.exec("PRAGMA busy_timeout = 5000");
db.exec(`
CREATE TABLE IF NOT EXISTS capture_sessions (
id TEXT PRIMARY KEY,
started_at INTEGER NOT NULL,
ended_at INTEGER,
mode TEXT NOT NULL,
source_scope TEXT NOT NULL,
source_process TEXT NOT NULL,
proxy_url TEXT,
db_path TEXT NOT NULL,
blob_dir TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS capture_events (
id INTEGER PRIMARY KEY,
session_id TEXT NOT NULL,
ts INTEGER NOT NULL,
source_scope TEXT NOT NULL,
source_process TEXT NOT NULL,
protocol TEXT NOT NULL,
direction TEXT NOT NULL,
kind TEXT NOT NULL,
flow_id TEXT NOT NULL,
method TEXT,
host TEXT,
path TEXT,
status INTEGER,
close_code INTEGER,
content_type TEXT,
headers_json TEXT,
data_text TEXT,
data_blob_id TEXT,
data_sha256 TEXT,
error_text TEXT,
meta_json TEXT
);
CREATE INDEX IF NOT EXISTS capture_events_session_ts_idx ON capture_events(session_id, ts);
CREATE INDEX IF NOT EXISTS capture_events_flow_idx ON capture_events(flow_id, ts);
`); return db;
}
function serializeJson(value: unknown): string | null { return value == null ? null : JSON.stringify(value);
}
listSessions(limit = 50): CaptureSessionSummary[] { returnthis.db
.prepare(
`SELECT
s.id,
s.started_at AS startedAt,
s.ended_at AS endedAt,
s.mode,
s.source_process AS sourceProcess,
s.proxy_url AS proxyUrl,
COUNT(e.id) AS eventCount
FROM capture_sessions s
LEFT JOIN capture_events e ON e.session_id = s.id
GROUP BY s.id
ORDER BY s.started_at DESC
LIMIT ?`,
)
.all(limit) as CaptureSessionSummary[];
}
getSessionEvents(sessionId: string, limit = 500): Array<Record<string, unknown>> { returnthis.db
.prepare(
`SELECT
id, session_id AS sessionId, ts, source_scope AS sourceScope, source_process AS sourceProcess,
protocol, direction, kind, flow_id AS flowId, method, host, path, status, close_code AS closeCode,
content_type AS contentType, headers_json AS headersJson, data_text AS dataText,
data_blob_id AS dataBlobId, data_sha256 AS dataSha256, error_text AS errorText, meta_json AS metaJson
FROM capture_events
WHERE session_id = ?
ORDER BY ts DESC, id DESC
LIMIT ?`,
)
.all(sessionId, limit) as Array<Record<string, unknown>>;
}
summarizeSessionCoverage(sessionId: string): CaptureSessionCoverageSummary { const rows = this.db
.prepare(
`SELECT host, meta_json AS metaJson
FROM capture_events
WHERE session_id = ?`,
)
.all(sessionId) as Array<{ host?: string | null; metaJson?: string | null }>; const providers = new Map<string, number>(); const apis = new Map<string, number>(); const models = new Map<string, number>(); const hosts = new Map<string, number>(); const localPeers = new Map<string, number>();
let unlabeledEventCount = 0; for (const row of rows) { const meta = parseMetaJson(row.metaJson); const provider = normalizeObservedValue(meta?.provider); const api = normalizeObservedValue(meta?.api); const model = normalizeObservedValue(meta?.model); const host = normalizeObservedValue(row.host); if (!provider && !api && !model) {
unlabeledEventCount += 1;
} if (provider) {
providers.set(provider, (providers.get(provider) ?? 0) + 1);
} if (api) {
apis.set(api, (apis.get(api) ?? 0) + 1);
} if (model) {
models.set(model, (models.get(model) ?? 0) + 1);
} if (host) {
hosts.set(host, (hosts.get(host) ?? 0) + 1); if (
host === "127.0.0.1:11434" ||
host.startsWith("127.0.0.1:") ||
host.startsWith("localhost:")
) {
localPeers.set(host, (localPeers.get(host) ?? 0) + 1);
}
}
} return {
sessionId,
totalEvents: rows.length,
unlabeledEventCount,
providers: sortObservedCounts(providers),
apis: sortObservedCounts(apis),
models: sortObservedCounts(models),
hosts: sortObservedCounts(hosts),
localPeers: sortObservedCounts(localPeers),
};
}
readBlob(blobId: string): string | null { const row = this.db
.prepare(`SELECT data_blob_id AS blobId FROM capture_events WHERE data_blob_id = ? LIMIT 1`)
.get(blobId) as { blobId?: string } | undefined; if (!row?.blobId) { returnnull;
} const blobPath = path.join(this.blobDir, `${row.blobId}.bin.gz`); return fs.existsSync(blobPath) ? readCaptureBlobText(blobPath) : null;
}
queryPreset(preset: CaptureQueryPreset, sessionId?: string): CaptureQueryRow[] { const sessionWhere = sessionId ? "AND session_id = ?" : ""; const args = sessionId ? [sessionId] : []; switch (preset) { case"double-sends": returnthis.db
.prepare(
`SELECT host, path, method, COUNT(*) AS duplicateCount
FROM capture_events
WHERE kind = 'request' ${sessionWhere}
GROUP BY host, path, method, data_sha256
HAVING COUNT(*) > 1
ORDER BY duplicateCount DESC, host ASC`,
)
.all(...args) as CaptureQueryRow[]; case"retry-storms": returnthis.db
.prepare(
`SELECT host, path, COUNT(*) AS errorCount
FROM capture_events
WHERE kind = 'response' AND status >= 429 ${sessionWhere}
GROUP BY host, path
HAVING COUNT(*) > 1
ORDER BY errorCount DESC, host ASC`,
)
.all(...args) as CaptureQueryRow[]; case"cache-busting": returnthis.db
.prepare(
`SELECT host, path, COUNT(*) AS variantCount
FROM capture_events
WHERE kind = 'request'
AND (path LIKE '%?%' OR headers_json LIKE '%cache-control%' OR headers_json LIKE '%pragma%')
${sessionWhere}
GROUP BY host, path
ORDER BY variantCount DESC, host ASC`,
)
.all(...args) as CaptureQueryRow[]; case"ws-duplicate-frames": returnthis.db
.prepare(
`SELECT host, path, COUNT(*) AS duplicateFrames
FROM capture_events
WHERE kind = 'ws-frame' AND direction = 'outbound' ${sessionWhere}
GROUP BY host, path, data_sha256
HAVING COUNT(*) > 1
ORDER BY duplicateFrames DESC, host ASC`,
)
.all(...args) as CaptureQueryRow[]; case"missing-ack": returnthis.db
.prepare(
`SELECT flow_id AS flowId, host, path, COUNT(*) AS outboundFrames
FROM capture_events
WHERE kind = 'ws-frame' AND direction = 'outbound' ${sessionWhere}
AND flow_id NOT IN (
SELECT flow_id FROM capture_events
WHERE kind = 'ws-frame' AND direction = 'inbound' ${sessionId ? "AND session_id = ?" : ""}
)
GROUP BY flow_id, host, path
ORDER BY outboundFrames DESC`,
)
.all(...(sessionId ? [sessionId, sessionId] : [])) as CaptureQueryRow[]; case"error-bursts": returnthis.db
.prepare(
`SELECT host, path, COUNT(*) AS errorCount
FROM capture_events
WHERE kind = 'error' ${sessionWhere}
GROUP BY host, path
ORDER BY errorCount DESC, host ASC`,
)
.all(...args) as CaptureQueryRow[]; default: return [];
}
}
purgeAll(): { sessions: number; events: number; blobs: number } { const sessionCount =
(this.db.prepare(`SELECT COUNT(*) AS count FROM capture_sessions`).get() as { count: number })
.count ?? 0; const eventCount =
(this.db.prepare(`SELECT COUNT(*) AS count FROM capture_events`).get() as { count: number })
.count ?? 0; this.db.exec(`DELETE FROM capture_events; DELETE FROM capture_sessions;`);
let blobs = 0; if (fs.existsSync(this.blobDir)) { for (const entry of fs.readdirSync(this.blobDir)) {
fs.rmSync(path.join(this.blobDir, entry), { force: true });
blobs += 1;
}
} return { sessions: sessionCount, events: eventCount, blobs };
}
deleteSessions(sessionIds: string[]): { sessions: number; events: number; blobs: number } { const uniqueSessionIds = [...new Set(sessionIds.map((id) => id.trim()).filter(Boolean))]; if (uniqueSessionIds.length === 0) { return { sessions: 0, events: 0, blobs: 0 };
} const placeholders = uniqueSessionIds.map(() => "?").join(", "); const blobRows = this.db
.prepare(
`SELECT DISTINCT data_blob_id AS blobId
FROM capture_events
WHERE session_id IN (${placeholders})
AND data_blob_id IS NOT NULL`,
)
.all(...uniqueSessionIds) as Array<{ blobId?: string | null }>; const eventCount =
( this.db
.prepare(
`SELECT COUNT(*) AS count
FROM capture_events
WHERE session_id IN (${placeholders})`,
)
.get(...uniqueSessionIds) as { count: number }
).count ?? 0; const sessionCount =
( this.db
.prepare(
`SELECT COUNT(*) AS count
FROM capture_sessions
WHERE id IN (${placeholders})`,
)
.get(...uniqueSessionIds) as { count: number }
).count ?? 0; this.db
.prepare(`DELETE FROM capture_events WHERE session_id IN (${placeholders})`)
.run(...uniqueSessionIds); this.db
.prepare(`DELETE FROM capture_sessions WHERE id IN (${placeholders})`)
.run(...uniqueSessionIds); const candidateBlobIds = blobRows
.map((row) => row.blobId?.trim())
.filter((blobId): blobId is string => Boolean(blobId)); const remainingBlobRefs =
candidateBlobIds.length > 0
? new Set(
( this.db
.prepare(
`SELECT DISTINCT data_blob_id AS blobId
FROM capture_events
WHERE data_blob_id IN (${candidateBlobIds.map(() => "?").join(", ")})
AND data_blob_id IS NOT NULL`,
)
.all(...candidateBlobIds) as Array<{ blobId?: string | null }>
)
.map((row) => row.blobId?.trim())
.filter((blobId): blobId is string => Boolean(blobId)),
)
: new Set<string>();
let blobs = 0; for (const row of blobRows) { const blobId = row.blobId?.trim(); if (!blobId || remainingBlobRefs.has(blobId)) { continue;
} const blobPath = path.join(this.blobDir, `${blobId}.bin.gz`); if (fs.existsSync(blobPath)) {
fs.rmSync(blobPath, { force: true });
blobs += 1;
}
} return { sessions: sessionCount, events: eventCount, blobs };
}
}
let cachedStore: DebugProxyCaptureStore | null = null;
let cachedKey = "";
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.