import { chmodSync, existsSync, mkdirSync } from "node:fs"; import type { DatabaseSync, StatementSync } from "node:sqlite"; import { requireNodeSqlite } from "../infra/node-sqlite.js"; import type { DeliveryContext } from "../utils/delivery-context.types.js"; import {
resolveTaskFlowRegistryDir,
resolveTaskFlowRegistrySqlitePath,
} from "./task-flow-registry.paths.js"; import type { TaskFlowRegistryStoreSnapshot } from "./task-flow-registry.store.types.js"; import type { TaskFlowRecord, TaskFlowSyncMode, JsonValue } from "./task-flow-registry.types.js";
/**
 * Normalizes a numeric value that may arrive as a `number`, a `bigint`, or `null`.
 *
 * @param value - The raw value to normalize.
 * @returns The value as a plain `number`, or `undefined` when it is `null`.
 *   Bigints are converted with `Number()`, so magnitudes beyond
 *   `Number.MAX_SAFE_INTEGER` lose precision.
 */
function normalizeNumber(value: number | bigint | null): number | undefined {
  if (typeof value === "bigint") {
    return Number(value);
  }
  return typeof value === "number" ? value : undefined;
}
/**
 * Encodes a value as JSON text.
 *
 * @param value - Any value; `undefined` means "nothing to store".
 * @returns `null` when `value` is `undefined`, otherwise the result of
 *   `JSON.stringify(value)`.
 */
function serializeJson(value: unknown): string | null {
  if (value === undefined) {
    return null;
  }
  return JSON.stringify(value);
}
/**
 * Parses JSON text into a value typed by the caller.
 *
 * The result is cast to `T` without runtime validation.
 *
 * @param raw - JSON text, or `null`.
 * @returns The parsed value, or `undefined` when `raw` is `null`, empty,
 *   whitespace-only, or not valid JSON.
 */
// oxlint-disable-next-line typescript/no-unnecessary-type-parameters -- Persisted JSON columns are typed by the receiving field.
function parseJsonValue<T>(raw: string | null): T | undefined {
  if (!raw?.trim()) {
    return undefined;
  }
  try {
    return JSON.parse(raw) as T;
  } catch {
    // Malformed JSON is treated the same as an absent value.
    return undefined;
  }
}
/**
 * Creates the `flow_runs` table when it does not exist, then upgrades older
 * table layouts in place and makes sure the lookup indexes exist.
 *
 * Each `ALTER TABLE ... ADD COLUMN` is guarded by `hasFlowRunsColumn`
 * (defined elsewhere in this file; presumably a column-existence check), so
 * columns are only added when missing. Statement order matters: later
 * backfills read columns that earlier steps add.
 *
 * @param db - Open database handle the statements are executed on.
 */
function ensureSchema(db: DatabaseSync) {
  db.exec(`
    CREATE TABLE IF NOT EXISTS flow_runs (
      flow_id TEXT PRIMARY KEY,
      shape TEXT,
      sync_mode TEXT NOT NULL DEFAULT 'managed',
      owner_key TEXT NOT NULL,
      requester_origin_json TEXT,
      controller_id TEXT,
      revision INTEGER NOT NULL DEFAULT 0,
      status TEXT NOT NULL,
      notify_policy TEXT NOT NULL,
      goal TEXT NOT NULL,
      current_step TEXT,
      blocked_task_id TEXT,
      blocked_summary TEXT,
      state_json TEXT,
      wait_json TEXT,
      cancel_requested_at INTEGER,
      created_at INTEGER NOT NULL,
      updated_at INTEGER NOT NULL,
      ended_at INTEGER
    );
  `);

  // Tables that only have owner_session_key get an owner_key column
  // backfilled from it.
  if (!hasFlowRunsColumn(db, "owner_key") && hasFlowRunsColumn(db, "owner_session_key")) {
    db.exec(`ALTER TABLE flow_runs ADD COLUMN owner_key TEXT;`);
    db.exec(`
      UPDATE flow_runs
      SET owner_key = owner_session_key
      WHERE owner_key IS NULL
    `);
  }

  if (!hasFlowRunsColumn(db, "shape")) {
    db.exec(`ALTER TABLE flow_runs ADD COLUMN shape TEXT;`);
  }

  if (!hasFlowRunsColumn(db, "sync_mode")) {
    db.exec(`ALTER TABLE flow_runs ADD COLUMN sync_mode TEXT;`);
    // NOTE(review): shape is added just above when missing, so the else
    // branch below looks unreachable; kept to preserve existing behavior.
    if (hasFlowRunsColumn(db, "shape")) {
      // Derive sync_mode from shape: single_task rows become task_mirrored.
      db.exec(`
        UPDATE flow_runs
        SET sync_mode = CASE
          WHEN shape = 'single_task' THEN 'task_mirrored'
          ELSE 'managed'
        END
        WHERE sync_mode IS NULL
      `);
    } else {
      db.exec(`
        UPDATE flow_runs
        SET sync_mode = 'managed'
        WHERE sync_mode IS NULL
      `);
    }
  }

  if (!hasFlowRunsColumn(db, "controller_id")) {
    db.exec(`ALTER TABLE flow_runs ADD COLUMN controller_id TEXT;`);
  }
  // Runs on every call, not only right after the column is added: managed
  // rows with a NULL or blank controller_id get a placeholder controller.
  db.exec(`
    UPDATE flow_runs
    SET controller_id = 'core/legacy-restored'
    WHERE sync_mode = 'managed'
      AND (controller_id IS NULL OR trim(controller_id) = '')
  `);

  if (!hasFlowRunsColumn(db, "revision")) {
    db.exec(`ALTER TABLE flow_runs ADD COLUMN revision INTEGER;`);
    db.exec(`
      UPDATE flow_runs
      SET revision = 0
      WHERE revision IS NULL
    `);
  }

  if (!hasFlowRunsColumn(db, "blocked_task_id")) {
    db.exec(`ALTER TABLE flow_runs ADD COLUMN blocked_task_id TEXT;`);
  }
  if (!hasFlowRunsColumn(db, "blocked_summary")) {
    db.exec(`ALTER TABLE flow_runs ADD COLUMN blocked_summary TEXT;`);
  }
  if (!hasFlowRunsColumn(db, "state_json")) {
    db.exec(`ALTER TABLE flow_runs ADD COLUMN state_json TEXT;`);
  }
  if (!hasFlowRunsColumn(db, "wait_json")) {
    db.exec(`ALTER TABLE flow_runs ADD COLUMN wait_json TEXT;`);
  }
  if (!hasFlowRunsColumn(db, "cancel_requested_at")) {
    db.exec(`ALTER TABLE flow_runs ADD COLUMN cancel_requested_at INTEGER;`);
  }

  db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`);
  db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_owner_key ON flow_runs(owner_key);`);
  db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_updated_at ON flow_runs(updated_at);`);
}
/**
 * Applies the registry's directory and file modes on disk.
 *
 * Creates the registry directory (resolved from `process.env`) if needed and
 * chmods it to `FLOW_REGISTRY_DIR_MODE`. Then, for every suffix in
 * `FLOW_REGISTRY_SIDECAR_SUFFIXES`, chmods `pathname + suffix` to
 * `FLOW_REGISTRY_FILE_MODE` when that file exists.
 *
 * @param pathname - Base path that each sidecar suffix is appended to.
 */
function ensureFlowRegistryPermissions(pathname: string) {
  const registryDir = resolveTaskFlowRegistryDir(process.env);
  mkdirSync(registryDir, { recursive: true, mode: FLOW_REGISTRY_DIR_MODE });
  // chmod explicitly after mkdir so the mode is applied even if the
  // directory already existed.
  chmodSync(registryDir, FLOW_REGISTRY_DIR_MODE);

  for (const sidecarSuffix of FLOW_REGISTRY_SIDECAR_SUFFIXES) {
    const sidecarPath = `${pathname}${sidecarSuffix}`;
    if (existsSync(sidecarPath)) {
      chmodSync(sidecarPath, FLOW_REGISTRY_FILE_MODE);
    }
  }
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.