import { createHash, randomUUID } from "node:crypto" ;
import type { StreamFn } from "@mariozechner/pi-agent-core" ;
import type {
AssistantMessage,
AssistantMessageEvent,
AssistantMessageEventStream,
StopReason,
} from "@mariozechner/pi-ai" ;
import * as piAi from "@mariozechner/pi-ai" ;
/**
* OpenAI WebSocket StreamFn Integration
*
* Wraps `OpenAIWebSocketManager` in a `StreamFn` that can be plugged into the
* pi-embedded-runner agent in place of the default `streamSimple` HTTP function.
*
* Key behaviours:
* - Per-session `OpenAIWebSocketManager` (keyed by sessionId)
* - Tracks `previous_response_id` to send only incremental tool-result inputs
* - Falls back to `streamSimple` (HTTP) if the WebSocket connection fails
* - Cleanup helpers for releasing sessions after the run completes
*
* Complexity budget & risk mitigation:
* - **Transport aware**: respects `transport` (`auto` | `websocket` | `sse`)
* - **Transparent fallback in `auto` mode**: connect/send failures fall back to
* the existing HTTP `streamSimple`; forced `websocket` mode surfaces WS errors
* - **Zero shared state**: per-session registry; session cleanup on dispose prevents leaks
* - **Full parity**: all generation options (temperature, top_p, max_output_tokens,
* tool_choice, reasoning) forwarded identically to the HTTP path
*
* @see src/agents/openai-ws-connection.ts for the connection manager
*/
import { formatErrorMessage } from "../infra/errors.js" ;
import type { ProviderRuntimeModel } from "../plugins/provider-runtime-model.types.js" ;
import {
resolveProviderTransportTurnStateWithPlugin,
resolveProviderWebSocketSessionPolicyWithPlugin,
} from "../plugins/provider-runtime.js" ;
import type { ProviderTransportTurnState } from "../plugins/types.js" ;
import {
encodeAssistantTextSignature,
normalizeAssistantPhase,
} from "../shared/chat-message-content.js" ;
import { resolveOpenAIStrictToolSetting } from "./openai-strict-tool-setting.js" ;
import {
getOpenAIWebSocketErrorDetails,
OpenAIWebSocketManager,
type FunctionToolDefinition,
type OpenAIResponsesAssistantPhase,
type OpenAIWebSocketManagerOptions,
} from "./openai-ws-connection.js" ;
import {
buildAssistantMessageFromResponse,
convertMessagesToInputItems,
convertResponseToInputItems,
convertTools,
planTurnInput,
} from "./openai-ws-message-conversion.js" ;
import {
buildOpenAIWebSocketResponseCreatePayload,
planOpenAIWebSocketRequestPayload,
} from "./openai-ws-request.js" ;
import type { ResponseCreateEvent } from "./openai-ws-types.js" ;
import { log } from "./pi-embedded-runner/logger.js" ;
import { resolveProviderEndpoint } from "./provider-attribution.js" ;
import { normalizeProviderId } from "./provider-id.js" ;
import { createBoundaryAwareStreamFnForModel } from "./provider-transport-stream.js" ;
import {
buildAssistantMessageWithZeroUsage,
buildStreamErrorAssistantMessage,
} from "./stream-message-shared.js" ;
import { stripSystemPromptCacheBoundary } from "./system-prompt-cache-boundary.js" ;
import { mergeTransportMetadata } from "./transport-stream-shared.js" ;
// ─────────────────────────────────────────────────────────────────────────────
// Per-session state
// ─────────────────────────────────────────────────────────────────────────────
/**
 * Mutable per-session WebSocket state. One instance is stored per sessionId in
 * the module-level registry and is updated in place across stream calls.
 */
interface WsSession {
/** Connection manager currently backing this session; replaced with a fresh instance when the session is reset. */
manager: OpenAIWebSocketManager;
/** Stable signature of the manager options and session headers; a mismatch on a later call triggers a session reset. */
managerConfigSignature: string;
/** SHA-256 hex digest of the API key associated with the session; a mismatch on a later call triggers a session reset. */
authSignature: string;
/** Number of messages that were in context.messages at the END of the last streamFn call. */
lastContextLength: number;
/** Last full canonical request, before any incremental previous_response_id delta rewrite. */
lastRequestPayload?: ResponseCreateEvent;
/** Last response output converted to the same replay form used by future full-context sends. */
lastResponseInputItems: ReturnType<typeof convertResponseToInputItems>;
/** True if the connection has been established at least once. */
everConnected: boolean ;
/** True once a best-effort warm-up attempt has run for this session. */
warmUpAttempted: boolean ;
/** True if the session is permanently broken (no more reconnect). */
broken: boolean ;
/** Pending idle release timer when disabled-by-default pooling retains a session. */
idleTimer?: ReturnType<typeof setTimeout>;
/** Epoch milliseconds at which the pooled session's idle timer is scheduled to fire; cleared together with idleTimer. */
pooledUntil?: number;
/** Session-scoped cool-down after repeated websocket failures. */
// Epoch milliseconds until which the session counts as degraded, or null when no cool-down is active.
degradedUntil: number | null ;
/** Cool-down duration in milliseconds added to the current time when the session is marked degraded. */
degradeCooldownMs: number;
}
/**
 * Resolves the strict-tool setting to use for the WebSocket transport.
 *
 * `supportsStrictMode` is taken from `model.compat` when that value is an
 * object (otherwise it is left undefined), and the final decision is
 * delegated to `resolveOpenAIStrictToolSetting`.
 *
 * @param model Model passed to the stream function.
 * @returns Whatever `resolveOpenAIStrictToolSetting` decides for this model.
 */
function resolveOpenAIWebSocketStrictToolSetting(
  model: Parameters<StreamFn>[0],
): boolean | undefined {
  let supportsStrictMode: boolean | undefined;
  if (model.compat && typeof model.compat === "object") {
    supportsStrictMode = (model.compat as { supportsStrictMode?: boolean }).supportsStrictMode;
  }
  return resolveOpenAIStrictToolSetting(model, {
    transport: "websocket",
    supportsStrictMode,
  });
}
/** Module-level registry: sessionId → WsSession */
const wsRegistry = new Map<string, WsSession>();
/**
 * Injectable collaborators used by this module: a factory for connection
 * managers, a factory for the HTTP fallback stream function, and the
 * `streamSimple` implementation.
 */
type OpenAIWsStreamDeps = {
createManager: (options?: OpenAIWebSocketManagerOptions) => OpenAIWebSocketManager;
createHttpFallbackStreamFn: (model: ProviderRuntimeModel) => StreamFn | undefined;
streamSimple: typeof piAi.streamSimple;
};
/** Assistant message extended with the optional Responses API output phase. */
type AssistantMessageWithPhase = AssistantMessage & { phase?: OpenAIResponsesAssistantPhase };
/**
 * Default wiring: real `OpenAIWebSocketManager`, the boundary-aware HTTP
 * stream function, and `piAi.streamSimple` (looked up at call time via the
 * wrapper arrow function rather than captured once).
 */
const defaultOpenAIWsStreamDeps: OpenAIWsStreamDeps = {
createManager: (options) => new OpenAIWebSocketManager(options),
createHttpFallbackStreamFn: (model) => createBoundaryAwareStreamFnForModel(model),
streamSimple: (...args) => piAi.streamSimple(...args),
};
// Active dependency set. Declared with `let` so it can be swapped; no
// reassignment is visible in this part of the file — presumably a test hook
// defined further down (TODO confirm).
let openAIWsStreamDeps: OpenAIWsStreamDeps = defaultOpenAIWsStreamDeps;
/**
 * Structural contract implemented by the local fallback event stream:
 * producers `push` events and may `end` the stream, consumers iterate it
 * asynchronously and can await the final assistant message via `result()`.
 */
type AssistantMessageEventStreamLike = {
/** Enqueue an event for consumers. */
push(event: AssistantMessageEvent): void ;
/** Mark the stream finished, optionally supplying the final message. */
end(result?: AssistantMessage): void ;
/** Promise for the final assistant message. */
result(): Promise<AssistantMessage>;
[Symbol.asyncIterator](): AsyncIterator<AssistantMessageEvent>;
};
/**
 * In-process event stream used when the `pi-ai` package does not expose
 * `createAssistantMessageEventStream`.
 *
 * Events are buffered in a FIFO queue until a consumer asks for them; a
 * consumer that asks while the queue is empty is parked in `waiting` and
 * resolved by the next `push`/`end`. A `done` or `error` event is terminal:
 * it resolves `result()` and every later `push` is ignored.
 */
class LocalAssistantMessageEventStream implements AssistantMessageEventStreamLike {
  private readonly queue: AssistantMessageEvent[] = [];
  private readonly waiting: Array<(value: IteratorResult<AssistantMessageEvent>) => void> = [];
  private done = false;
  private readonly finalResultPromise: Promise<AssistantMessage>;
  private resolveFinalResult!: (result: AssistantMessage) => void;

  constructor() {
    this.finalResultPromise = new Promise((resolve) => {
      this.resolveFinalResult = resolve;
    });
  }

  /**
   * Delivers an event to the oldest parked consumer, or queues it when nobody
   * is waiting. Ignored once the stream is done.
   */
  push(event: AssistantMessageEvent): void {
    if (this.done) {
      return;
    }
    if (event.type === "done") {
      this.done = true;
      this.resolveFinalResult(event.message);
    } else if (event.type === "error") {
      this.done = true;
      this.resolveFinalResult(event.error);
    }
    const waiter = this.waiting.shift();
    if (waiter) {
      waiter({ value: event, done: false });
    } else {
      this.queue.push(event);
    }
    if (this.done) {
      // A terminal event was just handed to (at most) one consumer. Nothing
      // can be pushed after this point, so any other parked consumer would
      // otherwise wait forever; tell them the stream is finished.
      this.releaseWaiters();
    }
  }

  /**
   * Marks the stream as finished and wakes every parked consumer.
   *
   * @param result Optional final message; when provided it resolves `result()`.
   */
  end(result?: AssistantMessage): void {
    this.done = true;
    if (result) {
      this.resolveFinalResult(result);
    }
    this.releaseWaiters();
  }

  /** Yields queued events in order and completes once the stream is done and drained. */
  async *[Symbol.asyncIterator](): AsyncIterator<AssistantMessageEvent> {
    while (true) {
      if (this.queue.length > 0) {
        yield this.queue.shift()!;
        continue;
      }
      if (this.done) {
        return;
      }
      const result = await new Promise<IteratorResult<AssistantMessageEvent>>((resolve) => {
        this.waiting.push(resolve);
      });
      if (result.done) {
        return;
      }
      yield result.value;
    }
  }

  /** Promise for the final assistant message (from a terminal event or `end(result)`). */
  result(): Promise<AssistantMessage> {
    return this.finalResultPromise;
  }

  /** Resolves every parked consumer with a "done" iterator result. */
  private releaseWaiters(): void {
    let waiter = this.waiting.shift();
    while (waiter) {
      waiter({ value: undefined, done: true });
      waiter = this.waiting.shift();
    }
  }
}
/**
 * Creates the event stream returned to callers of the stream function.
 *
 * Uses `piAi.createAssistantMessageEventStream` when the installed package
 * exposes it as a function; otherwise falls back to the local implementation.
 */
function createEventStream(): AssistantMessageEventStream {
  if (typeof piAi.createAssistantMessageEventStream === "function") {
    return piAi.createAssistantMessageEventStream();
  }
  const localStream = new LocalAssistantMessageEventStream();
  return localStream as unknown as AssistantMessageEventStream;
}
// ─────────────────────────────────────────────────────────────────────────────
// Public registry helpers
// ─────────────────────────────────────────────────────────────────────────────
/** Options controlling how `releaseWsSession` disposes of a session. */
type ReleaseWsSessionOptions = {
/** When `true`, a healthy connected session may be kept pooled (if pooling is enabled via env) instead of being closed. */
allowPool?: boolean ;
/** Environment used to read the pooling configuration; `process.env` is used when omitted. */
env?: NodeJS.ProcessEnv;
};
/**
 * Reads the WebSocket session pooling configuration from the environment.
 *
 * - Pooling is enabled when `OPENCLAW_OPENAI_WS_POOL` or
 *   `OPENCLAW_OPENAI_WS_SESSION_POOL` is exactly `"1"`.
 * - `OPENCLAW_OPENAI_WS_SESSION_POOL_IDLE_MS` sets the idle retention time. It
 *   is truncated to an integer and clamped to [1s, 300s]. Unset, blank, or
 *   non-numeric values use the 30s default.
 *
 * @param env Environment to read from; defaults to `process.env`.
 * @returns The enabled flag and the idle timeout in milliseconds.
 */
function resolveWsSessionPoolConfig(env: NodeJS.ProcessEnv = process.env): {
  enabled: boolean;
  idleMs: number;
} {
  const minIdleMs = 1_000;
  const maxIdleMs = 300_000;
  const defaultIdleMs = 30_000;
  const enabled =
    env.OPENCLAW_OPENAI_WS_POOL === "1" || env.OPENCLAW_OPENAI_WS_SESSION_POOL === "1";
  // Number("") and Number("   ") are 0, which would be clamped to the minimum
  // instead of falling back to the default. Treat blank values as unset.
  const rawIdleMs = env.OPENCLAW_OPENAI_WS_SESSION_POOL_IDLE_MS?.trim();
  const parsedIdleMs = rawIdleMs ? Number(rawIdleMs) : Number.NaN;
  const idleMs = Number.isFinite(parsedIdleMs)
    ? Math.min(maxIdleMs, Math.max(minIdleMs, Math.trunc(parsedIdleMs)))
    : defaultIdleMs;
  return { enabled, idleMs };
}
/**
 * Cancels a pending idle-release timer, if any, and clears the pooling
 * bookkeeping (`idleTimer`, `pooledUntil`). Does nothing when no timer is set.
 */
function clearWsSessionIdleTimer(session: WsSession): void {
  const pendingTimer = session.idleTimer;
  if (pendingTimer) {
    clearTimeout(pendingTimer);
    session.idleTimer = undefined;
    session.pooledUntil = undefined;
  }
}
/**
 * Tears a session down: cancels its idle timer, closes the manager on a
 * best-effort basis, and removes the session from the registry.
 */
function closeWsSession(sessionId: string, session: WsSession): void {
  clearWsSessionIdleTimer(session);
  try {
    session.manager.close();
  } catch {
    // Ignore close errors — connection may already be gone.
  } finally {
    // Always drop the registry entry, whether or not close() succeeded.
    wsRegistry.delete(sessionId);
  }
}
/**
 * Releases the WebSocket session registered under `sessionId`.
 *
 * By default the session is closed and removed from the registry. When
 * `options.allowPool` is `true`, pooling is enabled in the environment, and
 * the session is healthy and connected, the session is instead kept alive and
 * an idle timer is scheduled to close it after the configured idle period.
 * Unknown session ids are ignored.
 *
 * @param sessionId Registry key of the session to release.
 * @param options Pooling permission and the environment to read config from.
 */
export function releaseWsSession(sessionId: string, options: ReleaseWsSessionOptions = {}): void {
  const session = wsRegistry.get(sessionId);
  if (!session) {
    return;
  }
  const pool = resolveWsSessionPoolConfig(options.env);
  const keepPooled =
    options.allowPool === true &&
    pool.enabled &&
    !session.broken &&
    session.manager.isConnected();
  if (!keepPooled) {
    closeWsSession(sessionId, session);
    return;
  }
  // Restart the idle window: drop any earlier timer before arming a new one.
  clearWsSessionIdleTimer(session);
  session.pooledUntil = Date.now() + pool.idleMs;
  const idleTimer = setTimeout(() => {
    // Only close if the registry still points at this exact session object.
    if (wsRegistry.get(sessionId) === session) {
      closeWsSession(sessionId, session);
    }
  }, pool.idleMs);
  session.idleTimer = idleTimer;
  // Do not let the idle timer keep the process alive.
  idleTimer.unref?.();
  log.debug(`[ws-stream] pooled websocket session=${sessionId} idleMs=${pool.idleMs}`);
}
/**
 * Reports whether a usable WebSocket session exists for `sessionId`: it must
 * be registered, not marked broken, and its manager must report connected.
 */
export function hasWsSession(sessionId: string): boolean {
  const session = wsRegistry.get(sessionId);
  if (!session || session.broken) {
    return false;
  }
  return Boolean(session.manager.isConnected());
}
export {
buildAssistantMessageFromResponse,
convertMessagesToInputItems,
convertTools,
planTurnInput,
} from "./openai-ws-message-conversion.js" ;
// ─────────────────────────────────────────────────────────────────────────────
// StreamFn factory
// ─────────────────────────────────────────────────────────────────────────────
/** Options accepted by `createOpenAIWebSocketStreamFn`. */
export interface OpenAIWebSocketStreamOptions {
/** Manager options (url override, retry counts, etc.) */
managerOptions?: OpenAIWebSocketManagerOptions;
/** Abort signal forwarded from the run. When set, it is used in preference to a signal on the per-call stream options. */
signal?: AbortSignal;
}
/** Transport modes understood by this module; anything else is treated as "auto". */
type WsTransport = "sse" | "websocket" | "auto" ;
/** Maximum time, in milliseconds, a warm-up request may take before it is rejected as timed out. */
const WARM_UP_TIMEOUT_MS = 8 _000 ;
/** Number of websocket retries allowed after a send failure when the transport is not forced to "websocket". */
const MAX_AUTO_WS_RUNTIME_RETRIES = 1 ;
/** Cool-down, in milliseconds, used when neither the override nor a provider policy supplies one. */
const DEFAULT_WS_DEGRADE_COOLDOWN_MS = 60 _000 ;
// When defined, takes precedence over the provider policy and the default
// cool-down. It is not assigned in the visible part of this file — presumably
// set by a test hook further down (TODO confirm).
let wsDegradeCooldownMsOverride: number | undefined;
/**
 * Error raised for failures on the WebSocket path.
 *
 * Carries the failure category (`kind`), whether the failure is worth
 * retrying, and — where available — the socket close code and reason.
 */
class OpenAIWebSocketRuntimeError extends Error {
  /** Failure category: socket disconnect, send failure, or server-side error. */
  readonly kind: "disconnect" | "send" | "server";
  /** Whether the caller may retry the turn. */
  readonly retryable: boolean;
  /** WebSocket close code, when the failure came from a close event. */
  readonly closeCode?: number;
  /** WebSocket close reason, when the failure came from a close event. */
  readonly closeReason?: string;

  constructor(
    message: string,
    params: {
      kind: "disconnect" | "send" | "server";
      retryable: boolean;
      closeCode?: number;
      closeReason?: string;
    },
  ) {
    super(message);
    const { kind, retryable, closeCode, closeReason } = params;
    this.name = "OpenAIWebSocketRuntimeError";
    this.kind = kind;
    this.retryable = retryable;
    this.closeCode = closeCode;
    this.closeReason = closeReason;
  }
}
/**
 * Reads the requested transport from the stream options.
 *
 * @param options Per-call stream options (may be undefined).
 * @returns The `transport` option when it is one of the known modes,
 *   otherwise `"auto"`.
 */
function resolveWsTransport(options: Parameters<StreamFn>[2]): WsTransport {
  const requested = (options as { transport?: unknown } | undefined)?.transport;
  const knownTransports: readonly WsTransport[] = ["sse", "websocket", "auto"];
  return knownTransports.find((candidate) => candidate === requested) ?? "auto";
}
/** Stream options extended with the WebSocket-specific fields this module reads. */
type WsOptions = Parameters<StreamFn>[2] & { openaiWsWarmup?: unknown; signal?: AbortSignal };

/**
 * Tells whether the caller opted in to a warm-up request.
 *
 * @returns `true` only when `options.openaiWsWarmup` is exactly `true`.
 */
function resolveWsWarmup(options: Parameters<StreamFn>[2]): boolean {
  const wsOptions = options as WsOptions | undefined;
  return wsOptions?.openaiWsWarmup === true;
}
/**
 * Resets a session in place so the next turn starts from a clean connection.
 *
 * Cancels the idle timer, closes the current manager (ignoring close errors),
 * installs a manager from `createManager`, and clears all per-connection
 * state. The degraded cool-down is cleared as well unless
 * `preserveDegradeUntil` is set.
 */
function resetWsSession(params: {
  session: WsSession;
  createManager: () => OpenAIWebSocketManager;
  preserveDegradeUntil?: boolean;
}): void {
  const { session } = params;
  clearWsSessionIdleTimer(session);
  try {
    session.manager.close();
  } catch {
    /* ignore */
  }
  session.manager = params.createManager();
  // Connection-scoped flags.
  session.everConnected = false;
  session.warmUpAttempted = false;
  session.broken = false;
  // Incremental-send bookkeeping is tied to the old connection.
  session.lastContextLength = 0;
  session.lastRequestPayload = undefined;
  session.lastResponseInputItems = [];
  if (params.preserveDegradeUntil) {
    return;
  }
  session.degradedUntil = null;
}
/** Starts (or restarts) the session's cool-down: degraded until now + `degradeCooldownMs`. */
function markWsSessionDegraded(session: WsSession): void {
  const now = Date.now();
  session.degradedUntil = now + session.degradeCooldownMs;
}
/**
 * Tells whether the session is still inside its websocket cool-down.
 *
 * An expired cool-down is cleared (set to `null`) as a side effect.
 */
function isWsSessionDegraded(session: WsSession): boolean {
  const deadline = session.degradedUntil;
  if (!deadline) {
    return false;
  }
  if (deadline > Date.now()) {
    return true;
  }
  // Cool-down elapsed: forget it so later checks take the fast path.
  session.degradedUntil = null;
  return false;
}
/**
 * Builds a connection manager through the injected `createManager` factory.
 *
 * When session headers are supplied they are merged over
 * `managerOptions.headers` (session headers win on conflicts); otherwise the
 * manager options are passed through as a shallow copy.
 */
function createWsManager(
  managerOptions: OpenAIWebSocketManagerOptions | undefined,
  sessionHeaders?: Record<string, string>,
): OpenAIWebSocketManager {
  if (!sessionHeaders) {
    return openAIWsStreamDeps.createManager({ ...managerOptions });
  }
  const headers = {
    ...managerOptions?.headers,
    ...sessionHeaders,
  };
  return openAIWsStreamDeps.createManager({ ...managerOptions, headers });
}
/**
 * Serialises a value to a canonical JSON-like string in which object keys are
 * emitted in sorted order, so structurally equal values produce equal strings
 * regardless of key insertion order.
 *
 * The output is used as a comparison signature, not parsed back:
 * - `undefined`, functions, and symbols are rendered as the literal text
 *   `undefined`, so the function always returns a string and `[undefined]`
 *   does not collide with `[]`.
 * - Keys are ordered by UTF-16 code unit, so the order does not depend on the
 *   runtime's locale collation.
 *
 * @param value Any JSON-compatible value (nested objects/arrays allowed).
 * @returns The canonical string form of `value`.
 */
function stringifyStable(value: unknown): string {
  if (value === null || typeof value !== "object") {
    // JSON.stringify returns undefined (despite its declared type) for
    // undefined, functions, and symbols.
    return JSON.stringify(value) ?? "undefined";
  }
  if (Array.isArray(value)) {
    return `[${value.map((entry) => stringifyStable(entry)).join(",")}]`;
  }
  // Object.entries returns a fresh array, so sorting in place does not touch
  // the caller's object.
  const entries = Object.entries(value).sort(([left], [right]) => {
    if (left < right) {
      return -1;
    }
    return left > right ? 1 : 0;
  });
  return `{${entries
    .map(([key, entry]) => `${JSON.stringify(key)}:${stringifyStable(entry)}`)
    .join(",")}}`;
}
/**
 * Produces a comparison signature for the configuration a manager is built
 * from: the session headers plus the `request` and `url` manager options.
 * Two calls with structurally equal inputs yield the same string.
 */
function resolveWsManagerConfigSignature(
  managerOptions: OpenAIWebSocketManagerOptions | undefined,
  sessionHeaders?: Record<string, string>,
): string {
  const signatureSource = {
    headers: sessionHeaders,
    request: managerOptions?.request,
    url: managerOptions?.url,
  };
  return stringifyStable(signatureSource);
}
/**
 * Derives a comparison signature for an API key so the key itself is not kept
 * on the session.
 *
 * @returns Lower-case hex SHA-256 digest of `apiKey`.
 */
function resolveWsAuthSignature(apiKey: string): string {
  const hasher = createHash("sha256");
  hasher.update(apiKey);
  return hasher.digest("hex");
}
/** Normalized provider ids that are treated as Azure OpenAI when deciding whether a route is native. */
const AZURE_OPENAI_PROVIDER_IDS = new Set(["azure-openai" , "azure-openai-responses" ]);
/** Normalized provider id of the OpenAI Codex provider. */
const OPENAI_CODEX_PROVIDER_ID = "openai-codex" ;
/**
 * Normalises an identity value (session id, turn id) for use in transport
 * headers and metadata.
 *
 * The value is trimmed, each run of CR/LF characters is collapsed to a single
 * space, and the result is truncated to `maxLength` UTF-16 code units. If the
 * cut would fall in the middle of a surrogate pair, the dangling high
 * surrogate is dropped so the result never ends with half a character.
 *
 * @param value Raw identity value.
 * @param maxLength Maximum length of the result in UTF-16 code units.
 * @returns The single-line, length-limited value (possibly empty).
 */
function normalizeTransportIdentityValue(value: string, maxLength = 160): string {
  const singleLine = value.trim().replace(/[\r\n]+/gu, " ");
  if (singleLine.length <= maxLength) {
    return singleLine;
  }
  const truncated = singleLine.slice(0, maxLength);
  // charCodeAt yields NaN for an empty string, which fails both comparisons.
  const lastUnit = truncated.charCodeAt(truncated.length - 1);
  const endsWithHighSurrogate = lastUnit >= 0xd800 && lastUnit <= 0xdbff;
  return endsWithHighSurrogate ? truncated.slice(0, -1) : truncated;
}
/**
 * Decides whether a provider/base-URL pair points at a native OpenAI-family
 * endpoint (OpenAI, Azure OpenAI, or OpenAI Codex on one of their own
 * endpoint classes, or on the default endpoint).
 *
 * @param provider Provider id as configured; normalized before matching.
 * @param baseUrl Optional base URL used to classify the endpoint.
 * @returns `true` only for the recognised provider/endpoint combinations.
 */
function usesNativeOpenAIRoute(provider: string, baseUrl?: string): boolean {
  const { endpointClass } = resolveProviderEndpoint(baseUrl);
  const providerId = normalizeProviderId(provider);
  if (!providerId) {
    return false;
  }
  const isDefaultEndpoint = endpointClass === "default";
  const isOpenAIPublicEndpoint = endpointClass === "openai-public";
  if (providerId === "openai") {
    return isDefaultEndpoint || isOpenAIPublicEndpoint;
  }
  if (AZURE_OPENAI_PROVIDER_IDS.has(providerId)) {
    return isDefaultEndpoint || endpointClass === "azure-openai";
  }
  if (providerId === OPENAI_CODEX_PROVIDER_ID) {
    return isDefaultEndpoint || isOpenAIPublicEndpoint || endpointClass === "openai-codex";
  }
  return false;
}
/**
 * Builds the session-identifying headers for native OpenAI-family routes.
 *
 * @returns The `x-client-request-id` / `x-openclaw-session-id` header pair,
 *   or `undefined` when there is no session id, the route is not native, or
 *   the normalized session id is empty.
 */
function resolveNativeOpenAISessionHeaders(params: {
  provider: string;
  baseUrl?: string;
  sessionId?: string;
}): Record<string, string> | undefined {
  const { provider, baseUrl, sessionId: rawSessionId } = params;
  if (!rawSessionId || !usesNativeOpenAIRoute(provider, baseUrl)) {
    return undefined;
  }
  const sessionId = normalizeTransportIdentityValue(rawSessionId);
  return sessionId
    ? {
        "x-client-request-id": sessionId,
        "x-openclaw-session-id": sessionId,
      }
    : undefined;
}
/**
 * Builds per-turn transport state (headers and metadata) for native
 * OpenAI-family routes.
 *
 * The session headers are extended with the normalized turn id and the
 * attempt number (never below 1); the same values are mirrored into the
 * metadata together with the transport name.
 *
 * @returns The turn state, or `undefined` when no session headers apply.
 */
function resolveNativeOpenAITransportTurnState(params: {
  provider: string;
  baseUrl?: string;
  sessionId?: string;
  turnId: string;
  attempt: number;
  transport: "stream" | "websocket";
}): ProviderTransportTurnState | undefined {
  const { provider, baseUrl, sessionId, transport } = params;
  const sessionHeaders = resolveNativeOpenAISessionHeaders({ provider, baseUrl, sessionId });
  if (!sessionHeaders) {
    return undefined;
  }
  const turnId = normalizeTransportIdentityValue(params.turnId);
  const attempt = String(Math.max(1, params.attempt));
  const headers = {
    ...sessionHeaders,
    "x-openclaw-turn-id": turnId,
    "x-openclaw-turn-attempt": attempt,
  };
  const metadata = {
    openclaw_session_id: sessionHeaders["x-openclaw-session-id"] ?? "",
    openclaw_turn_id: turnId,
    openclaw_turn_attempt: attempt,
    openclaw_transport: transport,
  };
  return { headers, metadata };
}
/**
 * Resolves the per-turn transport state for a model.
 *
 * Native OpenAI-family routes are handled locally; every other provider is
 * delegated to the provider plugin hook. A missing plugin result is reported
 * as `undefined`.
 */
function resolveProviderTransportTurnState(
  model: Parameters<StreamFn>[0],
  params: {
    sessionId?: string;
    turnId: string;
    attempt: number;
    transport: "stream" | "websocket";
  },
): ProviderTransportTurnState | undefined {
  const { sessionId, turnId, attempt, transport } = params;
  const baseUrl = (model as { baseUrl?: string }).baseUrl;
  if (usesNativeOpenAIRoute(model.provider, baseUrl)) {
    return resolveNativeOpenAITransportTurnState({
      provider: model.provider,
      baseUrl,
      sessionId,
      turnId,
      attempt,
      transport,
    });
  }
  const pluginTurnState = resolveProviderTransportTurnStateWithPlugin({
    provider: model.provider,
    context: {
      provider: model.provider,
      modelId: model.id,
      model: model as ProviderRuntimeModel,
      sessionId,
      turnId,
      attempt,
      transport,
    },
  });
  return pluginTurnState ?? undefined;
}
/**
 * Resolves the session-level WebSocket policy for a model: the headers to
 * attach to the connection and the cool-down applied after failures.
 *
 * Cool-down precedence: module override, then (for non-native providers) the
 * plugin policy, then the default. The result is never negative.
 */
function resolveWebSocketSessionPolicy(
  model: Parameters<StreamFn>[0],
  sessionId: string,
): { headers?: Record<string, string>; degradeCooldownMs: number } {
  const baseUrl = (model as { baseUrl?: string }).baseUrl;
  if (usesNativeOpenAIRoute(model.provider, baseUrl)) {
    const headers = resolveNativeOpenAISessionHeaders({
      provider: model.provider,
      baseUrl,
      sessionId,
    });
    const cooldownMs = wsDegradeCooldownMsOverride ?? DEFAULT_WS_DEGRADE_COOLDOWN_MS;
    return { headers, degradeCooldownMs: Math.max(0, cooldownMs) };
  }
  const policy = resolveProviderWebSocketSessionPolicyWithPlugin({
    provider: model.provider,
    context: {
      provider: model.provider,
      modelId: model.id,
      model: model as ProviderRuntimeModel,
      sessionId,
    },
  });
  const cooldownMs =
    wsDegradeCooldownMsOverride ?? policy?.degradeCooldownMs ?? DEFAULT_WS_DEGRADE_COOLDOWN_MS;
  return {
    headers: policy?.headers,
    degradeCooldownMs: Math.max(0, cooldownMs),
  };
}
/**
 * Formats a WebSocket `error` event as a single-line message.
 *
 * The result is `<message> (code=<code>)`; when available, `status`, `type`
 * and `param` are appended after the code, separated by commas.
 */
function formatOpenAIWebSocketError(
  event: Parameters<OpenAIWebSocketManager["onMessage"]>[0] extends (arg: infer T) => void
    ? Extract<T, { type: "error" }>
    : never,
): string {
  const details = getOpenAIWebSocketErrorDetails(event);
  const code = details.code ?? "unknown";
  const message = details.message ?? "Unknown error";
  const extras: string[] = [];
  if (typeof details.status === "number") {
    extras.push(`status=${details.status}`);
  }
  if (details.type) {
    extras.push(`type=${details.type}`);
  }
  if (details.param) {
    extras.push(`param=${details.param}`);
  }
  if (extras.length === 0) {
    return `${message} (code=${code})`;
  }
  return `${message} (code=${code}; ${extras.join(", ")})`;
}
/**
 * Describes why a response failed.
 *
 * Preference order: the response's `error` (code and message, with
 * placeholders for empty parts), then the `incomplete_details.reason`, then a
 * generic fallback.
 */
function formatOpenAIWebSocketResponseFailure(response: {
  error?: { code?: string; message?: string };
  incomplete_details?: { reason?: string };
}): string {
  const { error, incomplete_details: incompleteDetails } = response;
  if (error) {
    const code = error.code || "unknown";
    const message = error.message || "no message";
    return `${code}: ${message}`;
  }
  const reason = incompleteDetails?.reason;
  return reason ? `incomplete: ${reason}` : "Unknown error (no error details in response)";
}
/**
 * Coerces any thrown value into an `OpenAIWebSocketRuntimeError`.
 *
 * Existing runtime errors are returned unchanged; anything else is wrapped as
 * a non-retryable `server` error carrying the formatted message.
 */
function normalizeWsRunError(err: unknown): OpenAIWebSocketRuntimeError {
  return err instanceof OpenAIWebSocketRuntimeError
    ? err
    : new OpenAIWebSocketRuntimeError(formatErrorMessage(err), {
        kind: "server",
        retryable: false,
      });
}
/**
 * Wraps a failure from `manager.send` as a retryable `send` runtime error.
 *
 * `Error` instances keep their message; other thrown values are stringified
 * behind a "WebSocket send failed" prefix.
 */
function buildRetryableSendError(err: unknown): OpenAIWebSocketRuntimeError {
  const message = err instanceof Error ? err.message : `WebSocket send failed: ${String(err)}`;
  return new OpenAIWebSocketRuntimeError(message, {
    kind: "send",
    retryable: true,
  });
}
/**
 * Sends a warm-up request on the given manager and waits for its outcome.
 *
 * Resolves when a `response.completed` event arrives. Rejects when:
 * - the signal is (or becomes) aborted — `Error("aborted")`;
 * - the socket closes before completion;
 * - the server reports `response.failed` or an `error` event;
 * - no outcome arrives within `WARM_UP_TIMEOUT_MS`;
 * - subscribing to the manager or sending the warm-up throws synchronously.
 *
 * On every path the timer and all listeners are removed before the promise
 * settles, so nothing is left attached to the manager or the signal.
 *
 * @param params.manager Connected manager to warm up.
 * @param params.modelId Model id sent with the warm-up.
 * @param params.tools Tool definitions; omitted from the request when empty.
 * @param params.instructions Optional instructions forwarded to the warm-up.
 * @param params.metadata Optional metadata attached to the request.
 * @param params.signal Optional abort signal.
 */
async function runWarmUp(params: {
  manager: OpenAIWebSocketManager;
  modelId: string;
  tools: FunctionToolDefinition[];
  instructions?: string;
  metadata?: Record<string, string>;
  signal?: AbortSignal;
}): Promise<void> {
  const { manager, signal } = params;
  if (signal?.aborted) {
    throw new Error("aborted");
  }
  await new Promise<void>((resolve, reject) => {
    // Assigned once the message subscription exists. Kept optional so that
    // cleanup stays safe even if subscribing itself throws.
    let unsubscribe: (() => void) | undefined;
    const cleanup = () => {
      clearTimeout(timeout);
      signal?.removeEventListener("abort", abortHandler);
      manager.off("close", closeHandler);
      unsubscribe?.();
    };
    const timeout = setTimeout(() => {
      cleanup();
      reject(new Error(`warm-up timed out after ${WARM_UP_TIMEOUT_MS}ms`));
    }, WARM_UP_TIMEOUT_MS);
    const abortHandler = () => {
      cleanup();
      reject(new Error("aborted"));
    };
    const closeHandler = (code: number, reason: string) => {
      cleanup();
      reject(new Error(`warm-up closed (code=${code}, reason=${reason || "unknown"})`));
    };
    try {
      unsubscribe = manager.onMessage((event) => {
        if (event.type === "response.completed") {
          cleanup();
          resolve();
        } else if (event.type === "response.failed") {
          cleanup();
          reject(
            new Error(`warm-up failed: ${formatOpenAIWebSocketResponseFailure(event.response)}`),
          );
        } else if (event.type === "error") {
          cleanup();
          reject(new Error(`warm-up error: ${formatOpenAIWebSocketError(event)}`));
        }
      });
      signal?.addEventListener("abort", abortHandler, { once: true });
      manager.on("close", closeHandler);
      manager.warmUp({
        model: params.modelId,
        tools: params.tools.length > 0 ? params.tools : undefined,
        instructions: params.instructions,
        ...(params.metadata ? { metadata: params.metadata } : {}),
      });
    } catch (err) {
      // A synchronous throw inside a Promise executor rejects the promise but
      // would otherwise leave the timer and listeners registered.
      cleanup();
      reject(err);
    }
  });
}
/**
* Creates a `StreamFn` backed by a persistent WebSocket connection to the
* OpenAI Responses API. The first call for a given `sessionId` opens the
* connection; subsequent calls reuse it, sending only incremental tool-result
* inputs with `previous_response_id`.
*
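* In `auto` transport mode, if the WebSocket connection is unavailable the
* function falls back to the standard `streamSimple` HTTP path and logs a
* warning. Forced `websocket` mode surfaces the error instead, and `sse` mode
* always uses the HTTP path.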
*
* @param apiKey OpenAI API key
* @param sessionId Agent session ID (used as the registry key)
* @param opts Optional manager + abort signal overrides
*/
export function createOpenAIWebSocketStreamFn(
apiKey: string,
sessionId: string,
opts: OpenAIWebSocketStreamOptions = {},
): StreamFn {
return (model, context, options) => {
const eventStream = createEventStream();
const run = async () => {
const transport = resolveWsTransport(options);
if (transport === "sse" ) {
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal);
}
const signal = opts.signal ?? (options as WsOptions | undefined)?.signal;
let emittedStart = false ;
let runtimeRetries = 0 ;
const turnId = randomUUID();
let turnAttempt = 0 ;
const wsSessionPolicy = resolveWebSocketSessionPolicy(model, sessionId);
const sessionHeaders = wsSessionPolicy.headers;
while (true ) {
let session = wsRegistry.get(sessionId);
const authSignature = resolveWsAuthSignature(apiKey);
const managerConfigSignature = resolveWsManagerConfigSignature(
opts.managerOptions,
sessionHeaders,
);
if (!session) {
const manager = createWsManager(opts.managerOptions, sessionHeaders);
session = {
manager,
managerConfigSignature,
authSignature,
lastContextLength: 0 ,
lastResponseInputItems: [],
everConnected: false ,
warmUpAttempted: false ,
broken: false ,
degradedUntil: null ,
degradeCooldownMs: wsSessionPolicy.degradeCooldownMs,
};
wsRegistry.set(sessionId, session);
} else if (
session.managerConfigSignature !== managerConfigSignature ||
session.authSignature !== authSignature
) {
clearWsSessionIdleTimer(session);
resetWsSession({
session,
createManager: () => createWsManager(opts.managerOptions, sessionHeaders),
});
session.managerConfigSignature = managerConfigSignature;
session.authSignature = authSignature;
session.degradeCooldownMs = wsSessionPolicy.degradeCooldownMs;
} else {
clearWsSessionIdleTimer(session);
}
if (transport !== "websocket" && isWsSessionDegraded(session)) {
log.debug(
`[ws-stream] session=${sessionId} in websocket cool-down; using HTTP fallback until ${new Date(session.degradedUntil!).toISOString()}`,
);
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, {
suppressStart: emittedStart,
turnState: resolveProviderTransportTurnState(model, {
sessionId,
turnId,
attempt: Math.max(1 , turnAttempt),
transport: "stream" ,
}),
});
}
if (!session.manager.isConnected() && !session.broken) {
try {
await session.manager.connect(apiKey);
session.everConnected = true ;
session.degradedUntil = null ;
log.debug(`[ws-stream] connected for session=${sessionId}`);
} catch (connErr) {
markWsSessionDegraded(session);
resetWsSession({
session,
createManager: () => createWsManager(opts.managerOptions, sessionHeaders),
preserveDegradeUntil: true ,
});
if (transport === "websocket" ) {
throw connErr instanceof Error ? connErr : new Error(String(connErr));
}
log.warn(
`[ws-stream] WebSocket connect failed for session=${sessionId}; falling back to HTTP. error=${String(connErr)}`,
);
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, {
suppressStart: emittedStart,
turnState: resolveProviderTransportTurnState(model, {
sessionId,
turnId,
attempt: Math.max(1 , turnAttempt),
transport: "stream" ,
}),
});
}
}
if (session.broken || !session.manager.isConnected()) {
if (transport === "websocket" ) {
throw new Error("WebSocket session disconnected" );
}
log.warn(`[ws-stream] session=${sessionId} broken/disconnected; falling back to HTTP`);
markWsSessionDegraded(session);
resetWsSession({
session,
createManager: () => createWsManager(opts.managerOptions, sessionHeaders),
preserveDegradeUntil: true ,
});
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, {
suppressStart: emittedStart,
turnState: resolveProviderTransportTurnState(model, {
sessionId,
turnId,
attempt: Math.max(1 , turnAttempt),
transport: "stream" ,
}),
});
}
if (resolveWsWarmup(options) && !session.warmUpAttempted) {
session.warmUpAttempted = true ;
let warmupFailed = false ;
try {
await runWarmUp({
manager: session.manager,
modelId: model.id,
tools: convertTools(context.tools, {
strict: resolveOpenAIWebSocketStrictToolSetting(model),
}),
instructions: context.systemPrompt
? stripSystemPromptCacheBoundary(context.systemPrompt)
: undefined,
metadata: resolveProviderTransportTurnState(model, {
sessionId,
turnId,
attempt: Math.max(1 , turnAttempt),
transport: "websocket" ,
})?.metadata,
signal,
});
log.debug(`[ws-stream] warm-up completed for session=${sessionId}`);
} catch (warmErr) {
if (signal?.aborted) {
throw warmErr instanceof Error ? warmErr : new Error(String(warmErr));
}
warmupFailed = true ;
log.warn(
`[ws-stream] warm-up failed for session=${sessionId}; continuing without warm-up. error=${String(warmErr)}`,
);
}
if (warmupFailed && !session.manager.isConnected()) {
try {
session.manager.close();
} catch {
/* ignore */
}
try {
session.manager = createWsManager(opts.managerOptions, sessionHeaders);
await session.manager.connect(apiKey);
session.everConnected = true ;
session.degradedUntil = null ;
log.debug(`[ws-stream] reconnected after warm-up failure for session=${sessionId}`);
} catch (reconnectErr) {
markWsSessionDegraded(session);
resetWsSession({
session,
createManager: () => createWsManager(opts.managerOptions, sessionHeaders),
preserveDegradeUntil: true ,
});
if (transport === "websocket" ) {
throw reconnectErr instanceof Error
? reconnectErr
: new Error(String(reconnectErr));
}
log.warn(
`[ws-stream] reconnect after warm-up failed for session=${sessionId}; falling back to HTTP. error=${String(reconnectErr)}`,
);
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, {
suppressStart: emittedStart,
turnState: resolveProviderTransportTurnState(model, {
sessionId,
turnId,
attempt: Math.max(1 , turnAttempt),
transport: "stream" ,
}),
});
}
}
}
turnAttempt++;
const turnState = resolveProviderTransportTurnState(model, {
sessionId,
turnId,
attempt: turnAttempt,
transport: "websocket" ,
});
const fullTurnInput = {
inputItems: convertMessagesToInputItems(context.messages, model),
};
let fullPayload = buildOpenAIWebSocketResponseCreatePayload({
model,
context,
options: options as WsOptions | undefined,
turnInput: fullTurnInput,
tools: convertTools(context.tools, {
strict: resolveOpenAIWebSocketStrictToolSetting(model),
}),
metadata: turnState?.metadata,
}) as Record<string, unknown>;
const nextPayload = await options?.onPayload?.(fullPayload, model);
fullPayload = mergeTransportMetadata(
(nextPayload ?? fullPayload) as Record<string, unknown>,
turnState?.metadata,
);
const plannedPayload = planOpenAIWebSocketRequestPayload({
fullPayload: fullPayload as ResponseCreateEvent,
previousRequestPayload: session.lastRequestPayload,
previousResponseId: session.manager.previousResponseId,
previousResponseInputItems: session.lastResponseInputItems,
});
const plannedInputItems = Array.isArray(plannedPayload.payload.input)
? plannedPayload.payload.input
: [];
if (plannedPayload.mode === "incremental" ) {
log.debug(
`[ws-stream] session=${sessionId}: incremental send (${plannedInputItems.length} items) previous_response_id=${plannedPayload.payload.previous_response_id}`,
);
} else {
log.debug(
`[ws-stream] session=${sessionId}: full context send (${plannedInputItems.length} items)`,
);
}
const requestPayload = plannedPayload.payload as Parameters<
OpenAIWebSocketManager["send" ]
>[0 ];
try {
session.manager.send(requestPayload);
} catch (sendErr) {
const normalizedErr = buildRetryableSendError(sendErr);
if (
transport !== "websocket" &&
!signal?.aborted &&
runtimeRetries < MAX_AUTO_WS_RUNTIME_RETRIES
) {
runtimeRetries++;
log.warn(
`[ws-stream] retrying websocket turn after send failure for session=${sessionId} (${runtimeRetries}/${MAX_AUTO_WS_RUNTIME_RETRIES}). error=${normalizedErr.message}`,
);
resetWsSession({
session,
createManager: () => createWsManager(opts.managerOptions, sessionHeaders),
});
continue ;
}
if (transport !== "websocket" ) {
log.warn(
`[ws-stream] send failed for session=${sessionId}; falling back to HTTP. error=${normalizedErr.message}`,
);
markWsSessionDegraded(session);
resetWsSession({
session,
createManager: () => createWsManager(opts.managerOptions, sessionHeaders),
preserveDegradeUntil: true ,
});
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, {
suppressStart: emittedStart,
turnState: resolveProviderTransportTurnState(model, {
sessionId,
turnId,
attempt: turnAttempt,
transport: "stream" ,
}),
});
}
throw normalizedErr;
}
if (!emittedStart) {
eventStream.push({
type: "start" ,
partial: buildAssistantMessageWithZeroUsage({
model,
content: [],
stopReason: "stop" ,
}),
});
emittedStart = true ;
}
const outputItemPhaseById = new Map<string, OpenAIResponsesAssistantPhase | undefined>();
const outputTextByPart = new Map<string, string>();
const emittedTextByPart = new Map<string, string>();
const getOutputTextKey = (itemId: string, contentIndex: number) =>
`${itemId}:${contentIndex}`;
const emitTextDelta = (params: {
fullText: string;
deltaText: string;
itemId?: string;
contentIndex?: number;
}) => {
const resolvedItemId = params.itemId;
const contentIndex = params.contentIndex ?? 0 ;
const itemPhase = resolvedItemId
? normalizeAssistantPhase(outputItemPhaseById.get(resolvedItemId))
: undefined;
const partialBase = buildAssistantMessageWithZeroUsage({
model,
content: [
{
type: "text" ,
text: params.fullText,
...(resolvedItemId
? {
textSignature: encodeAssistantTextSignature({
id: resolvedItemId,
...(itemPhase ? { phase: itemPhase } : {}),
}),
}
: {}),
},
],
stopReason: "stop" ,
});
const partialMsg: AssistantMessageWithPhase = itemPhase
? ({ ...partialBase, phase: itemPhase } as AssistantMessageWithPhase)
: partialBase;
eventStream.push({
type: "text_delta" ,
contentIndex,
delta: params.deltaText,
partial: partialMsg,
});
};
const emitBufferedTextDelta = (params: { itemId: string; contentIndex: number }) => {
const key = getOutputTextKey(params.itemId, params.contentIndex);
const fullText = outputTextByPart.get(key) ?? "" ;
const emittedText = emittedTextByPart.get(key) ?? "" ;
if (!fullText || fullText === emittedText) {
return ;
}
const deltaText = fullText.startsWith(emittedText)
? fullText.slice(emittedText.length)
: fullText;
emittedTextByPart.set(key, fullText);
emitTextDelta({
fullText,
deltaText,
itemId: params.itemId,
contentIndex: params.contentIndex,
});
};
const capturedContextLength = context.messages.length;
let sawWsOutput = false ;
try {
await new Promise<void >((resolve, reject) => {
const abortHandler = () => {
outputItemPhaseById.clear();
outputTextByPart.clear();
emittedTextByPart.clear();
cleanup();
reject(new Error("aborted" ));
};
if (signal?.aborted) {
reject(new Error("aborted" ));
return ;
}
signal?.addEventListener("abort" , abortHandler, { once: true });
const closeHandler = (code: number, reason: string) => {
outputItemPhaseById.clear();
outputTextByPart.clear();
emittedTextByPart.clear();
cleanup();
const closeInfo = session.manager.lastCloseInfo;
reject(
new OpenAIWebSocketRuntimeError(
`WebSocket closed mid-request (code=${code}, reason=${reason || "unknown" })`,
{
kind: "disconnect" ,
retryable: closeInfo?.retryable ?? true ,
closeCode: closeInfo?.code ?? code,
closeReason: closeInfo?.reason ?? reason,
},
),
);
};
session.manager.on("close" , closeHandler);
const cleanup = () => {
signal?.removeEventListener("abort" , abortHandler);
session.manager.off("close" , closeHandler);
unsubscribe();
};
const unsubscribe = session.manager.onMessage((event) => {
if (
event.type === "response.output_item.added" ||
event.type === "response.output_item.done" ||
event.type === "response.content_part.added" ||
event.type === "response.content_part.done" ||
event.type === "response.output_text.delta" ||
event.type === "response.output_text.done" ||
event.type === "response.function_call_arguments.delta" ||
event.type === "response.function_call_arguments.done"
) {
sawWsOutput = true ;
}
if (
event.type === "response.output_item.added" ||
event.type === "response.output_item.done"
) {
if (typeof event.item.id === "string" ) {
const itemPhase =
event.item.type === "message"
? normalizeAssistantPhase((event.item as { phase?: unknown }).phase)
: undefined;
outputItemPhaseById.set(event.item.id, itemPhase);
if (itemPhase !== undefined) {
for (const key of outputTextByPart.keys()) {
if (key.startsWith(`${event.item.id}:`)) {
const [, contentIndexText] = key.split(":" );
emitBufferedTextDelta({
itemId: event.item.id,
contentIndex: Number.parseInt(contentIndexText ?? "0" , 10 ) || 0 ,
});
}
}
}
}
return ;
}
if (event.type === "response.output_text.delta" ) {
const key = getOutputTextKey(event.item_id, event.content_index);
const nextText = `${outputTextByPart.get(key) ?? "" }${event.delta}`;
outputTextByPart.set(key, nextText);
if (outputItemPhaseById.get(event.item_id) !== undefined) {
emitBufferedTextDelta({
itemId: event.item_id,
contentIndex: event.content_index,
});
}
return ;
}
if (event.type === "response.output_text.done" ) {
const key = getOutputTextKey(event.item_id, event.content_index);
if (event.text && event.text !== outputTextByPart.get(key)) {
outputTextByPart.set(key, event.text);
}
if (outputItemPhaseById.get(event.item_id) !== undefined) {
emitBufferedTextDelta({
itemId: event.item_id,
contentIndex: event.content_index,
});
}
return ;
}
if (event.type === "response.completed" ) {
outputItemPhaseById.clear();
outputTextByPart.clear();
emittedTextByPart.clear();
cleanup();
session.lastContextLength = capturedContextLength;
session.lastRequestPayload = fullPayload as ResponseCreateEvent;
session.lastResponseInputItems = convertResponseToInputItems(event.response, {
api: model.api,
provider: model.provider,
id: model.id,
input: model.input,
});
const assistantMsg = buildAssistantMessageFromResponse(event.response, {
api: model.api,
provider: model.provider,
id: model.id,
});
const reason: Extract<StopReason, "stop" | "length" | "toolUse" > =
assistantMsg.stopReason === "toolUse" ? "toolUse" : "stop" ;
eventStream.push({ type: "done" , reason, message: assistantMsg });
resolve();
} else if (event.type === "response.failed" ) {
outputItemPhaseById.clear();
outputTextByPart.clear();
emittedTextByPart.clear();
cleanup();
reject(
new OpenAIWebSocketRuntimeError(
`OpenAI WebSocket response failed: ${formatOpenAIWebSocketResponseFailure(event.response)}`,
{
kind: "server" ,
retryable: false ,
},
),
);
} else if (event.type === "error" ) {
outputItemPhaseById.clear();
outputTextByPart.clear();
emittedTextByPart.clear();
cleanup();
reject(
new OpenAIWebSocketRuntimeError(
`OpenAI WebSocket error: ${formatOpenAIWebSocketError(event)}`,
{
kind: "server" ,
retryable: false ,
},
),
);
}
});
});
return ;
} catch (wsRunErr) {
const normalizedErr = normalizeWsRunError(wsRunErr);
if (
transport !== "websocket" &&
!signal?.aborted &&
normalizedErr.retryable &&
!sawWsOutput &&
runtimeRetries < MAX_AUTO_WS_RUNTIME_RETRIES
) {
runtimeRetries++;
log.warn(
`[ws-stream] retrying websocket turn after retryable runtime failure for session=${sessionId} (${runtimeRetries}/${MAX_AUTO_WS_RUNTIME_RETRIES}). error=${normalizedErr.message}`,
);
resetWsSession({
session,
createManager: () => createWsManager(opts.managerOptions, sessionHeaders),
});
continue ;
}
if (transport !== "websocket" && !signal?.aborted && !sawWsOutput) {
log.warn(
`[ws-stream] session=${sessionId} runtime failure before output; falling back to HTTP. error=${normalizedErr.message}`,
);
markWsSessionDegraded(session);
resetWsSession({
session,
createManager: () => createWsManager(opts.managerOptions, sessionHeaders),
preserveDegradeUntil: true ,
});
return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal, {
suppressStart: true ,
turnState: resolveProviderTransportTurnState(model, {
sessionId,
turnId,
attempt: turnAttempt,
transport: "stream" ,
}),
});
}
throw normalizedErr;
}
}
};
queueMicrotask(() =>
run().catch ((err) => {
const errorMessage = formatErrorMessage(err);
log.warn(`[ws-stream] session=${sessionId} run error: ${errorMessage}`);
eventStream.push({
type: "error" ,
reason: "error" ,
error: buildStreamErrorAssistantMessage({
model,
errorMessage,
}),
});
eventStream.end();
}),
);
return eventStream;
};
}
/**
 * Runs the turn over the HTTP transport and forwards every resulting event
 * into an event stream that is already open.
 *
 * The effective options are layered in this order (later entries win):
 * the caller's `streamOptions`, `apiKey`, turn-state headers merged on top of
 * the caller's headers, an `onPayload` wrapper that merges turn-state metadata
 * into the payload, and finally `signal`.
 *
 * This function only pushes events; it never calls `eventStream.end()`, and
 * any error raised while creating or iterating the HTTP stream propagates to
 * the caller.
 *
 * @param model - Model forwarded unchanged to the HTTP stream function.
 * @param context - Conversation context forwarded unchanged.
 * @param streamOptions - Caller-supplied stream options used as the base.
 * @param apiKey - API key written into the merged options.
 * @param eventStream - Destination stream that receives the HTTP events.
 * @param signal - Optional abort signal; when present it overrides any
 *   `signal` carried by `streamOptions`.
 * @param fallbackOptions - `suppressStart` drops `start` events from the HTTP
 *   stream; `turnState` supplies extra headers and payload metadata.
 */
async function fallbackToHttp(
  model: Parameters<StreamFn>[0],
  context: Parameters<StreamFn>[1],
  streamOptions: Parameters<StreamFn>[2],
  apiKey: string,
  eventStream: AssistantMessageEventStreamLike,
  signal?: AbortSignal,
  fallbackOptions?: {
    suppressStart?: boolean;
    turnState?: ProviderTransportTurnState;
  },
): Promise<void> {
  const callerOnPayload = streamOptions?.onPayload;
  const turnHeaders = fallbackOptions?.turnState?.headers;
  const turnMetadata = fallbackOptions?.turnState?.metadata;

  // Turn-state headers win over caller headers on key collisions.
  const headerOverride = turnHeaders
    ? { headers: { ...streamOptions?.headers, ...turnHeaders } }
    : {};

  // Only wrap onPayload when there is metadata to merge; otherwise the
  // caller's own onPayload (if any) is inherited from the base spread.
  const payloadOverride = turnMetadata
    ? {
        onPayload: async (
          payload: unknown,
          payloadModel: Parameters<NonNullable<typeof callerOnPayload>>[1],
        ) => {
          // Let the caller's hook transform the payload first; fall back to
          // the untouched payload when it returns nothing.
          const transformed = await callerOnPayload?.(payload, payloadModel);
          const effectivePayload = (transformed ?? payload) as Record<string, unknown>;
          return mergeTransportMetadata(effectivePayload, turnMetadata);
        },
      }
    : {};

  const signalOverride = signal ? { signal } : {};

  const mergedOptions = {
    ...streamOptions,
    apiKey,
    ...headerOverride,
    ...payloadOverride,
    ...signalOverride,
  };

  // Prefer a provider-specific HTTP stream function; use streamSimple when
  // the factory yields null/undefined.
  const providerStreamFn = openAIWsStreamDeps.createHttpFallbackStreamFn(
    model as ProviderRuntimeModel,
  );
  const httpStreamFn = providerStreamFn ?? openAIWsStreamDeps.streamSimple;
  const httpStream = await httpStreamFn(model, context, mergedOptions);

  const dropStartEvents = Boolean(fallbackOptions?.suppressStart);
  for await (const httpEvent of httpStream) {
    const isSuppressedStart = dropStartEvents && httpEvent.type === "start";
    if (!isSuppressedStart) {
      eventStream.push(httpEvent);
    }
  }
}
/**
 * Test-only hooks for swapping this module's injectable dependencies and
 * overriding the WebSocket degrade cooldown.
 */
export const __testing = {
  /**
   * Replaces the active dependency set with the defaults overlaid by
   * `overrides`. Calling without an argument restores the defaults.
   */
  setDepsForTest(overrides?: Partial<OpenAIWsStreamDeps>) {
    if (!overrides) {
      openAIWsStreamDeps = defaultOpenAIWsStreamDeps;
      return;
    }
    openAIWsStreamDeps = { ...defaultOpenAIWsStreamDeps, ...overrides };
  },
  /**
   * Stores `nextMs` as the degrade-cooldown override; passing nothing stores
   * `undefined`.
   */
  setWsDegradeCooldownMsForTest(nextMs?: number) {
    wsDegradeCooldownMsOverride = nextMs;
  },
};
Messung V0.5 in Prozent C=98 H=95 G=96
¤ Dauer der Verarbeitung: 0.6 Sekunden
(vorverarbeitet am 2026-05-26)
¤
*© Formatika GbR, Deutschland