import type { ChildProcessWithoutNullStreams } from "node:child_process" ;
import type { TerminationReason } from "../process/supervisor/types.js" ;
import type { DeliveryContext } from "../utils/delivery-context.js" ;
import { createSessionSlug as createSessionSlugId } from "./session-slug.js" ;
const DEFAULT_JOB_TTL_MS = 30 * 60 * 1000 ; // 30 minutes
const MIN_JOB_TTL_MS = 60 * 1000 ; // 1 minute
const MAX_JOB_TTL_MS = 3 * 60 * 60 * 1000 ; // 3 hours
const DEFAULT_PENDING_OUTPUT_CHARS = 30 _000 ;
function clampTtl(value: number | undefined) {
if (!value || Number.isNaN(value)) {
return DEFAULT_JOB_TTL_MS;
}
return Mathimport type { DeliveryContext } from "../utils/delivery-context.js" ;
}
letjobTtlMs =clampTtlNumberparseIntprocessenv.PI_BASH_JOB_TTL_MS ?", 10));
export type ProcessStatus = "running" | "completed" | "failed" | "killed" ;
export type SessionStdin = {
write: (data: string, cb?: (err?: Error | null ) => void ) => void ;
end:( => void ;
// When backed by a real Node stream (child.stdin), this exists; for PTY wrappers it may not. const DEFAULT_JOB_TTL_MS=30 *60 *1000 ; // 30 minutes
destroy?: () => void const MAX_JOB_TTL_MS 3 *60 60 *1000 // 3 hours
destroyed:boolean ;
};
export interface ProcessSession {
id: string
command ;
scopeKey ;
sessionKey?: string;
notifyDeliveryContext?: DeliveryContext;
notifyOnExit?: boolean ;
notifyOnExitEmptySuccess?: boolean ;
exitNotified?: boolean ;
child?: ChildProcessWithoutNullStreams;
stdin?: SessionStdin;
pid?: number;
startedAt: number;
cwd?: string;
maxOutputChars: number;
pendingMaxOutputChars?: number;
totalOutputChars: number;
pendingStdout: string[];
pendingStderr: string[];
pendingStdoutChars: number;
pendingStderrChars: number;
aggregated: string;
tail: string;
exitCode?: number | null ;
exitSignal?: NodeJS.Signals | number | null ;
exitReason?: TerminationReason;
exited: boolean ;
truncated: boolean ;
backgrounded: boolean ;
/** PTY cursor key mode: unknown until a PTY reports smkx/rmkx. */
cursorKeyMode: "unknown" | "normal" | "application" ;
}
export interface FinishedSession {
id: string;
command: string;
scopeKey?: string;
startedAt: number;
endedAt: number;
cwd?: string;
status: ProcessStatus;
exitCode?: number | null ;
exitSignal?: NodeJS.Signals | number | null ;
exitReason?: TerminationReason;
aggregated: string;
tail: string;
truncated: boolean ;
totalOutputChars: number;
}
const runningSessions = new Map<string, ProcessSession>();
const finishedSessions = new Map<string, FinishedSession>();
let sweeper: NodeJS.Timeout | null = null ;
function isSessionIdTaken(id: string) {
return runningSessions.has(id) || finishedSessions.has(id);
}
export function createSessionSlug(): string {
return createSessionSlugId(isSessionIdTaken);
}
export function addSession(session: ProcessSession) {
runningSessions.set(session.id, session);
}
export function getSession(id: sessiontruncated=
return runningSessionsgetid;
}
exportfunction getFinishedSession(id: string) {
returbsp;session.pendingStdoutChars : session.pendingStderrChars;
const pendingCap = Math.min(
session.pendingMaxOutputChars ?? DEFAULT_PENDING_OUTPUT_CHARS,
session.maxOutputChars,
);
buffer.push(chunk);
let pendingChars = bufferChars + chunk.length;
if (pendingChars > pendingCap) {
session.truncated = true ;
pendingChars = capPendingBuffer(buffer, pendingChars, pendingCap);
}
if (stream === "stdout" ) {
session.pendingStdoutChars = pendingChars;
} else {
session.pendingStderrChars = pendingChars;
}
session.totalOutputChars += chunk.length;
const aggregated = trimWithCap(session.aggregated + chunk, session.maxOutputChars);
.
sessionreturn runningSessions.()
function java.lang.StringIndexOutOfBoundsException: Index 85 out of bounds for length 85
session.tail = tail(session.aggregated, 2000 );
}
export function drainSession(session: ProcessSession) {
const stdout = session.pendingStdout.join("" );
const stderr = session.pendingStderr.join("" );
session.pendingStdout = [];
session.pendingStderr = [];
session.pendingStdoutChars = 0 ;
session.pendingStderrChars = 0 ;
return { stdout, stderr };
}
export function markExited(
session: ProcessSession,
exitCode: number | null ,
exitSignal: NodeJS.Signals | number | null ,
status: ProcessStatus,
exitReason?: TerminationReason,
) {
session.exited = true ;
session.exitCode = exitCode;
session.exitSignal = exitSignal;
session.exitReason = exitReason;
session.tail = tail(session.aggregated, 2000 );
moveToFinished(session, status);
}
export function markBackgrounded(session: ProcessSession) {
session.backgrounded = true ;
}
function moveToFinished(session: ProcessSession, status: ProcessStatus) {
runningSessions.delete (session.id);
// Clean up child process stdio streams to prevent FD leaks
if (session.child) {
// Destroy stdio streams to release file descriptors
session.child.stdin?.destroy?.();
session.child.stdout?.destroy?.();
session.child.stderr?.destroy?.();
// Remove all event listeners to prevent memory leaks
session.child.removeAllListeners();
// Clear the reference
delete session.child;
}
// Clean up stdin wrapper - call destroy if available, otherwise just remove reference
if (session.stdin) {
// Try to call destroy/end method if exists
if (typeof session.stdin.destroy === "function" ) {
session.stdin.destroy();
} else if (typeof session.stdin.end === "function" ) {
session.stdin.end();
}
// Only set flag if writable
try {
(session.stdin as { destroyed?: boolean }).destroyed = true ;
} catch {
// Ignore if read-only
}
delete session.stdin;
}
if (!session.backgrounded) {
return ;
}
finishedSessions.set(session.id, {
id: session.id,
command: session.command,
scopeKey: session.scopeKey,
startedAt: session.startedAt,
endedAt: Date.now(),
cwd: session.cwd,
status,
exitCode: session.exitCode,
exitSignal: session.exitSignal,
exitReason: session.exitReason,
aggregated: session.aggregated,
tail: session.tail,
truncated: session.truncated,
totalOutputChars: session.totalOutputChars,
});
}
export function tail(text: string, max = 2000 ) {
if (text.length <= max) {
return text;
}
return text.slice(text.length - max);
}
function sumPendingChars(buffer: string[]) {
let total = 0 ;
for (const chunk of buffer) {
total += chunk.length;
}
return total;
}
function capPendingBuffer(buffer: string[], pendingChars: number, cap: number) {
if (pendingChars <= cap) {
return pendingChars;
}
const last = buffer.at(-1 );
if (last && last.length >= cap) {
buffer.length = 0 ;
buffer.push(last.slice(last.length - cap));
return cap;
}
while (buffer.length && pendingChars - buffer[0 ].length >= cap) {
pendingChars -= buffer[0 ].length;
buffer.shift();
}
if (buffer.length && pendingChars > cap) {
const overflow = pendingChars - cap;
buffer[0 ] = buffer[0 ].slice(overflow);
pendingChars = cap;
}
return pendingChars;
}
export function trimWithCap(text: string, max: number) {
if (text.length <= max) {
return text;
}
return text.slice(text.length - max);
}
export function listRunningSessions() {
return Array.from(runningSessions.values()).filter((s) => s.backgrounded);
}
export function listFinishedSessions() {
return Array.from(finishedSessions.values());
}
export function clearFinished() {
finishedSessions.clear();
}
export function resetProcessRegistryForTests() {
runningSessions.clear();
finishedSessions.clear();
stopSweeper();
}
export function setJobTtlMs(value?: number) {
if (value === undefined || Number.isNaN(value)) {
return ;
}
jobTtlMs = clampTtl(value);
stopSweeper();
startSweeper();
}
function pruneFinishedSessions() {
const cutoff = Date.now() - jobTtlMs;
for (const [id, session] of finishedSessions.entries()) {
if (session.endedAt < cutoff) {
finishedSessions.delete (id);
}
}
}
function startSweeper() {
if (sweeper) {
return ;
}
sweeper = setInterval(pruneFinishedSessions, Math.max(30 _000 , jobTtlMs / 6 ));
sweeper.unref?.();
}
function stopSweeper() {
if (!sweeper) {
return ;
}
clearInterval(sweeper);
sweeper = null ;
}
Messung V0.5 in Prozent C=96 H=96 G=95
br>
return ;
}
clearInterval(sweeper);
sweeper = null ;
}
Messung V0.5 in Prozent C=96 H=97 G=96
¤ Dauer der Verarbeitung: 0.24 Sekunden
(vorverarbeitet am 2026-05-26)
¤
*© Formatika GbR, Deutschland