import { chmodSync, existsSync, mkdirSync } from "node:fs"; import type { DatabaseSync, StatementSync } from "node:sqlite"; import { requireNodeSqlite } from "../infra/node-sqlite.js"; import type { DeliveryContext } from "../utils/delivery-context.types.js"; import { resolveTaskRegistryDir, resolveTaskRegistrySqlitePath } from "./task-registry.paths.js"; import type { TaskRegistryStoreSnapshot } from "./task-registry.store.types.js"; import type { TaskDeliveryState, TaskRecord } from "./task-registry.types.js";
/**
 * Normalizes a nullable number-or-bigint value to a plain `number`.
 *
 * @param value - A `number`, a `bigint`, or `null`.
 * @returns The numeric value, or `undefined` when `value` is `null`.
 */
function normalizeNumber(value: number | bigint | null): number | undefined {
  if (typeof value === "bigint") {
    // Number() cannot represent magnitudes above Number.MAX_SAFE_INTEGER exactly.
    return Number(value);
  }
  return typeof value === "number" ? value : undefined;
}
/**
 * Serializes a value to JSON text.
 *
 * @param value - Any value; `null` and `undefined` are treated as "no value".
 * @returns `null` for `null`/`undefined`, otherwise the `JSON.stringify` output.
 */
function serializeJson(value: unknown): string | null {
  if (value === null || value === undefined) {
    return null;
  }
  return JSON.stringify(value);
}
/**
 * Parses JSON text, treating missing, blank, or malformed input as "no value".
 *
 * The result is cast to `T` without runtime validation; the caller chooses `T`.
 *
 * @param raw - JSON text, or `null`.
 * @returns The parsed value, or `undefined` when `raw` is `null`, empty,
 *   whitespace-only, or not valid JSON.
 */
// oxlint-disable-next-line typescript/no-unnecessary-type-parameters -- Persisted JSON columns are typed by the receiving field.
function parseJsonValue<T>(raw: string | null): T | undefined {
  if (!raw?.trim()) {
    return undefined;
  }
  try {
    return JSON.parse(raw) as T;
  } catch {
    // Malformed JSON is deliberately reported as "no value" rather than thrown.
    return undefined;
  }
}
/**
 * Adds the `owner_key`, `requester_session_key`, and `scope_kind` columns to
 * `task_runs` when they are missing, then normalizes those columns on every row.
 *
 * The UPDATE statements run in a fixed order because each one reads values
 * written by the previous one:
 *   1. seed `owner_key` from `requester_session_key` where it is NULL;
 *   2. trim `owner_key`, or synthesize `system:<runtime>:<source_id|task_id>`
 *      when it is blank;
 *   3. derive `scope_kind` from the existing value or the `system:` prefix;
 *   4. set `requester_session_key` to '' for system rows, otherwise to its own
 *      trimmed value, falling back to `owner_key` when blank.
 *
 * @param db - Open database that already contains a `task_runs` table.
 */
function migrateLegacyOwnerColumns(db: DatabaseSync) {
  if (!hasTaskRunsColumn(db, "owner_key")) {
    db.exec(`ALTER TABLE task_runs ADD COLUMN owner_key TEXT;`);
  }
  if (!hasTaskRunsColumn(db, "requester_session_key")) {
    db.exec(`ALTER TABLE task_runs ADD COLUMN requester_session_key TEXT;`);
  }
  if (!hasTaskRunsColumn(db, "scope_kind")) {
    // A NOT NULL column added via ALTER TABLE needs a non-NULL default; the
    // keywords must be separate tokens (`NOT NULL DEFAULT`), otherwise SQLite
    // rejects the statement.
    db.exec(`ALTER TABLE task_runs ADD COLUMN scope_kind TEXT NOT NULL DEFAULT 'session';`);
  }
  // NOTE(review): the column was ensured just above, so this check is expected
  // to be true here — kept as a guard; confirm against hasTaskRunsColumn.
  if (hasTaskRunsColumn(db, "requester_session_key")) {
    db.exec(`
UPDATE task_runs
SET owner_key = requester_session_key
WHERE owner_key IS NULL
`);
  }
  // Blank or still-NULL owners get a synthetic `system:` owner key.
  db.exec(`
UPDATE task_runs
SET owner_key = CASE
WHEN trim(COALESCE(owner_key, '')) <> '' THEN trim(owner_key) ELSE'system:' || runtime || ':' || COALESCE(NULLIF(source_id, ''), task_id)
END
`);
  // Collapse scope_kind to exactly 'system' or 'session'.
  db.exec(`
UPDATE task_runs
SET scope_kind = CASE
WHEN scope_kind = 'system' THEN 'system'
WHEN owner_key LIKE 'system:%' THEN 'system' ELSE'session'
END
`);
  // System-scoped rows carry an empty requester session key.
  db.exec(`
UPDATE task_runs
SET requester_session_key = CASE
WHEN scope_kind = 'system' THEN ''
WHEN trim(COALESCE(requester_session_key, '')) <> '' THEN trim(requester_session_key) ELSE owner_key
END
`);
}
/**
 * Creates the `task_runs` and `task_delivery_state` tables and their indexes
 * when absent, and upgrades an existing `task_runs` table in place.
 *
 * Order of operations: create `task_runs`, run the owner-column migration,
 * add the remaining late columns, create `task_delivery_state`, then create
 * the indexes (all of which are `IF NOT EXISTS`).
 *
 * @param db - Open database to prepare.
 */
function ensureSchema(db: DatabaseSync) {
  db.exec(`
CREATE TABLE IF NOT EXISTS task_runs (
task_id TEXT PRIMARY KEY,
runtime TEXT NOT NULL,
task_kind TEXT,
source_id TEXT,
requester_session_key TEXT,
owner_key TEXT NOT NULL,
scope_kind TEXT NOT NULL,
child_session_key TEXT,
parent_flow_id TEXT,
parent_task_id TEXT,
agent_id TEXT,
run_id TEXT,
label TEXT,
task TEXT NOT NULL,
status TEXT NOT NULL,
delivery_status TEXT NOT NULL,
notify_policy TEXT NOT NULL,
created_at INTEGER NOT NULL,
started_at INTEGER,
ended_at INTEGER,
last_event_at INTEGER,
cleanup_after INTEGER,
error TEXT,
progress_summary TEXT,
terminal_summary TEXT,
terminal_outcome TEXT
);
`);

  migrateLegacyOwnerColumns(db);

  // Columns that a pre-existing table may lack, paired with the DDL adding them.
  const lateColumns = [
    ["task_kind", `ALTER TABLE task_runs ADD COLUMN task_kind TEXT;`],
    ["parent_flow_id", `ALTER TABLE task_runs ADD COLUMN parent_flow_id TEXT;`],
  ] as const;
  for (const [columnName, addColumnSql] of lateColumns) {
    if (hasTaskRunsColumn(db, columnName)) {
      continue;
    }
    db.exec(addColumnSql);
  }

  db.exec(`
CREATE TABLE IF NOT EXISTS task_delivery_state (
task_id TEXT PRIMARY KEY,
requester_origin_json TEXT,
last_notified_event_at INTEGER
);
`);

  // Executed in this order, after every indexed column is known to exist.
  const indexStatements = [
    `CREATE INDEX IF NOT EXISTS idx_task_runs_run_id ON task_runs(run_id);`,
    `CREATE INDEX IF NOT EXISTS idx_task_runs_status ON task_runs(status);`,
    `CREATE INDEX IF NOT EXISTS idx_task_runs_runtime_status ON task_runs(runtime, status);`,
    `CREATE INDEX IF NOT EXISTS idx_task_runs_cleanup_after ON task_runs(cleanup_after);`,
    `CREATE INDEX IF NOT EXISTS idx_task_runs_last_event_at ON task_runs(last_event_at);`,
    `CREATE INDEX IF NOT EXISTS idx_task_runs_owner_key ON task_runs(owner_key);`,
    `CREATE INDEX IF NOT EXISTS idx_task_runs_parent_flow_id ON task_runs(parent_flow_id);`,
    `CREATE INDEX IF NOT EXISTS idx_task_runs_child_session_key ON task_runs(child_session_key);`,
  ];
  for (const createIndexSql of indexStatements) {
    db.exec(createIndexSql);
  }
}
/**
 * Ensures the task registry directory exists with `TASK_REGISTRY_DIR_MODE` and
 * applies `TASK_REGISTRY_FILE_MODE` to each existing file at
 * `pathname + suffix`, for every suffix in `TASK_REGISTRY_SIDECAR_SUFFIXES`.
 *
 * @param pathname - Base path that each sidecar suffix is appended to.
 */
function ensureTaskRegistryPermissions(pathname: string) {
  const registryDir = resolveTaskRegistryDir(process.env);
  mkdirSync(registryDir, { recursive: true, mode: TASK_REGISTRY_DIR_MODE });
  // chmod runs unconditionally, so the mode is also applied to a directory
  // that already existed before this call.
  chmodSync(registryDir, TASK_REGISTRY_DIR_MODE);

  for (const sidecarSuffix of TASK_REGISTRY_SIDECAR_SUFFIXES) {
    const sidecarPath = `${pathname}${sidecarSuffix}`;
    // Files that are not present are skipped.
    if (existsSync(sidecarPath)) {
      chmodSync(sidecarPath, TASK_REGISTRY_FILE_MODE);
    }
  }
}
/**
 * Reads every task row and every delivery-state row from the task registry
 * database and returns them as maps keyed by `task_id`.
 *
 * @returns A snapshot holding the `tasks` and `deliveryStates` maps.
 */
export function loadTaskRegistryStateFromSqlite(): TaskRegistryStoreSnapshot {
  const { selectAll, selectAllDeliveryStates } = openTaskRegistryDatabase().statements;

  // Both queries run before any row is converted.
  const taskRows = selectAll.all() as TaskRegistryRow[];
  const deliveryRows = selectAllDeliveryStates.all() as TaskDeliveryStateRow[];

  const tasks = new Map<string, TaskRecord>();
  for (const taskRow of taskRows) {
    tasks.set(taskRow.task_id, rowToTaskRecord(taskRow));
  }

  const deliveryStates = new Map<string, TaskDeliveryState>();
  for (const deliveryRow of deliveryRows) {
    deliveryStates.set(deliveryRow.task_id, rowToTaskDeliveryState(deliveryRow));
  }

  return { tasks, deliveryStates };
}
/**
 * Replaces the stored registry contents with `snapshot`.
 *
 * Inside a single `withWriteTransaction` callback: clears all delivery states,
 * clears all task rows, then writes every task followed by every delivery
 * state from the snapshot.
 *
 * @param snapshot - Tasks and delivery states to persist.
 */
export function saveTaskRegistryStateToSqlite(snapshot: TaskRegistryStoreSnapshot) {
  withWriteTransaction(({ clearDeliveryStates, clearRows, upsertRow, replaceDeliveryState }) => {
    // Delivery states are cleared before task rows.
    clearDeliveryStates.run();
    clearRows.run();

    snapshot.tasks.forEach((taskRecord) => {
      upsertRow.run(bindTaskRecordBase(taskRecord));
    });
    snapshot.deliveryStates.forEach((deliveryState) => {
      replaceDeliveryState.run(bindTaskDeliveryState(deliveryState));
    });
  });
}
/**
 * Writes a single task record through the `upsertRow` statement.
 *
 * @param task - Task record to persist.
 */
export function upsertTaskRegistryRecordToSqlite(task: TaskRecord) {
  const { upsertRow } = openTaskRegistryDatabase().statements;
  const boundParams = bindTaskRecordBase(task);
  upsertRow.run(boundParams);
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.