import http from "node:http"; import { URL } from "node:url"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import { resolveConfiguredCapabilityProvider } from "openclaw/plugin-sdk/provider-selection-runtime"; import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; import {
createWebhookInFlightLimiter,
WEBHOOK_BODY_READ_DEFAULTS,
} from "openclaw/plugin-sdk/webhook-ingress"; import {
isRequestBodyLimitError,
readRequestBodyWithLimit,
requestBodyErrorToText,
} from "../api.js"; import { isAllowlistedCaller, normalizePhoneNumber } from "./allowlist.js"; import { normalizeVoiceCallConfig, type VoiceCallConfig } from "./config.js"; import type { CoreAgentDeps, CoreConfig } from "./core-bridge.js"; import { getHeader } from "./http-headers.js"; import type { CallManager } from "./manager.js"; import type { MediaStreamConfig } from "./media-stream.js"; import { MediaStreamHandler } from "./media-stream.js"; import type { VoiceCallProvider } from "./providers/base.js"; import { isProviderStatusTerminal } from "./providers/shared/call-status.js"; import type { TwilioProvider } from "./providers/twilio.js"; import type { CallRecord, NormalizedEvent, WebhookContext } from "./types.js"; import type { WebhookResponsePayload } from "./webhook.types.js"; import type { RealtimeCallHandler } from "./webhook/realtime-handler.js"; import { startStaleCallReaper } from "./webhook/stale-call-reaper.js";
/** Module namespace type of "./realtime-transcription.runtime.js", which is loaded via dynamic import. */
type RealtimeTranscriptionRuntime = typeof import("./realtime-transcription.runtime.js");
/** Module namespace type of "./response-generator.js", which is loaded via dynamic import. */
type ResponseGeneratorModule = typeof import("./response-generator.js");

// Cached dynamic-import promises; each stays undefined until its loader is first called.
let realtimeTranscriptionRuntimePromise: Promise<RealtimeTranscriptionRuntime> | undefined;
let responseGeneratorModulePromise: Promise<ResponseGeneratorModule> | undefined;
/**
 * Returns the promise for the realtime transcription runtime module.
 *
 * The dynamic import is started on the first call and its promise is stored in
 * `realtimeTranscriptionRuntimePromise`; later calls return that same promise.
 *
 * @returns The cached promise resolving to the module namespace.
 */
function loadRealtimeTranscriptionRuntime(): Promise<RealtimeTranscriptionRuntime> {
  // `??=` only evaluates the import when nothing has been cached yet.
  return (realtimeTranscriptionRuntimePromise ??= import("./realtime-transcription.runtime.js"));
}
/**
 * Returns the promise for the response generator module.
 *
 * The dynamic import is started on the first call and its promise is stored in
 * `responseGeneratorModulePromise`; later calls return that same promise.
 *
 * @returns The cached promise resolving to the module namespace.
 */
function loadResponseGeneratorModule(): Promise<ResponseGeneratorModule> {
  // `??=` only evaluates the import when nothing has been cached yet.
  return (responseGeneratorModulePromise ??= import("./response-generator.js"));
}
// Suppress only while the initial greeting is actively being played. // If playback fails and the call leaves "speaking", do not block auto-response. if (call.state !== "speaking") { returnfalse;
}
const mode = (call.metadata?.mode as string | undefined) ?? "conversation"; if (mode !== "conversation") { returnfalse;
}
/** * Initialize media streaming with the selected realtime transcription provider.
*/ private async initializeMediaStreaming(): Promise<void> { const streaming = this.config.streaming; const pluginConfig = this.fullConfig ?? (this.coreConfig as unknown as OpenClawConfig | undefined); const { getRealtimeTranscriptionProvider, listRealtimeTranscriptionProviders } =
await loadRealtimeTranscriptionRuntime(); const resolution = resolveConfiguredCapabilityProvider({
configuredProviderId: streaming.provider,
providerConfigs: streaming.providers,
cfg: pluginConfig,
cfgForResolve: pluginConfig ?? ({} as OpenClawConfig),
getConfiguredProvider: (providerId) =>
getRealtimeTranscriptionProvider(providerId, pluginConfig),
listProviders: () => listRealtimeTranscriptionProviders(pluginConfig),
resolveProviderConfig: ({ provider, cfg, rawConfig }) =>
provider.resolveConfig?.({ cfg, rawConfig }) ?? rawConfig,
isProviderConfigured: ({ provider, cfg, providerConfig }) =>
provider.isConfigured({ cfg, providerConfig }),
}); if (!resolution.ok && resolution.code === "missing-configured-provider") {
console.warn(
`[voice-call] Streaming enabled but realtime transcription provider "${resolution.configuredProviderId}" is not registered`,
); return;
} if (!resolution.ok && resolution.code === "no-registered-provider") {
console.warn( "[voice-call] Streaming enabled but no realtime transcription provider is registered",
); return;
} if (!resolution.ok) {
console.warn(
`[voice-call] Streaming enabled but provider "${resolution.provider?.id}" is not configured`,
); return;
} const provider = resolution.provider; const providerConfig = resolution.providerConfig;
const streamConfig: MediaStreamConfig = {
transcriptionProvider: provider,
providerConfig,
preStartTimeoutMs: streaming.preStartTimeoutMs,
maxPendingConnections: streaming.maxPendingConnections,
maxPendingConnectionsPerIp: streaming.maxPendingConnectionsPerIp,
maxConnections: streaming.maxConnections,
resolveClientIp: (request) => this.resolveMediaStreamClientIp(request),
shouldAcceptStream: ({ callId, token }) => { const call = this.manager.getCallByProviderCallId(callId); if (!call) { returnfalse;
} if (this.provider.name === "twilio") { const twilio = this.provider as TwilioProvider; if (!twilio.isValidStreamToken(callId, token)) {
console.warn(`[voice-call] Rejecting media stream: invalid token for ${callId}`); returnfalse;
}
} returntrue;
},
onTranscript: (providerCallId, transcript) => { const safeTranscript = sanitizeTranscriptForLog(transcript);
console.log(
`[voice-call] Transcript for ${providerCallId}: ${safeTranscript} (chars=${transcript.length})`,
); const call = this.manager.getCallByProviderCallId(providerCallId); if (!call) {
console.warn(`[voice-call] No active call found for provider ID: ${providerCallId}`); return;
} const suppressBargeIn = this.shouldSuppressBargeInForInitialMessage(call); if (suppressBargeIn) {
console.log(
`[voice-call] Ignoring barge transcript while initial message is still playing (${providerCallId})`,
); return;
}
// Clear TTS queue on barge-in (user started speaking, interrupt current playback) if (this.provider.name === "twilio") {
(this.provider as TwilioProvider).clearTtsQueue(providerCallId);
}
// Create a speech event and process it through the manager const event: NormalizedEvent = {
id: `stream-transcript-${Date.now()}`,
type: "call.speech",
callId: call.callId,
providerCallId,
timestamp: Date.now(),
transcript,
isFinal: true,
}; this.manager.processEvent(event);
// Auto-respond in conversation mode (inbound always, outbound if mode is conversation) const callMode = call.metadata?.mode as string | undefined; const shouldRespond = call.direction === "inbound" || callMode === "conversation"; if (shouldRespond) { this.handleInboundResponse(call.callId, transcript).catch((err) => {
console.warn(`[voice-call] Failed to auto-respond:`, err);
});
}
},
onSpeechStart: (providerCallId) => { if (this.provider.name !== "twilio") { return;
} const call = this.manager.getCallByProviderCallId(providerCallId); if (this.shouldSuppressBargeInForInitialMessage(call)) { return;
}
(this.provider as TwilioProvider).clearTtsQueue(providerCallId);
},
onPartialTranscript: (callId, partial) => { const safePartial = sanitizeTranscriptForLog(partial);
console.log(`[voice-call] Partial for ${callId}: ${safePartial} (chars=${partial.length})`);
},
onConnect: (callId, streamSid) => {
console.log(`[voice-call] Media stream connected: ${callId} -> ${streamSid}`); this.clearPendingDisconnectHangup(callId);
// Register stream with provider for TTS routing if (this.provider.name === "twilio") {
(this.provider as TwilioProvider).registerCallStream(callId, streamSid);
}
// Speak initial message immediately (no delay) to avoid stream timeout this.manager.speakInitialMessage(callId).catch((err) => {
console.warn(`[voice-call] Failed to speak initial message:`, err);
});
},
onDisconnect: (callId, streamSid) => {
console.log(`[voice-call] Media stream disconnected: ${callId} (${streamSid})`); if (this.provider.name === "twilio") {
(this.provider as TwilioProvider).unregisterCallStream(callId, streamSid);
}
// Guard: if a server is already listening, return the existing URL. // This prevents EADDRINUSE when start() is called more than once on the // same instance (e.g. during config hot-reload or concurrent ensureRuntime). if (this.server?.listening) { returnthis.listeningUrl ?? this.resolveListeningUrl(bind, webhookPath);
}
if (this.config.streaming.enabled && !this.mediaStreamHandler) {
await this.initializeMediaStreaming();
}
if (this.startPromise) { returnthis.startPromise;
}
if (req.method !== "POST") { return { statusCode: 405, body: "Method Not Allowed" };
}
const headerGate = this.verifyPreAuthWebhookHeaders(req.headers); if (!headerGate.ok) {
console.warn(`[voice-call] Webhook rejected before body read: ${headerGate.reason}`); return { statusCode: 401, body: "Unauthorized" };
}
const inFlightKey = req.socket.remoteAddress ?? ""; if (!this.webhookInFlightLimiter.tryAcquire(inFlightKey)) {
console.warn(`[voice-call] Webhook rejected before body read: too many in-flight requests`); return { statusCode: 429, body: "Too Many Requests" };
}
try {
let body = ""; try {
body = await this.readBody(req, MAX_WEBHOOK_BODY_BYTES, WEBHOOK_BODY_TIMEOUT_MS);
} catch (err) { if (isRequestBodyLimitError(err, "PAYLOAD_TOO_LARGE")) { return { statusCode: 413, body: "Payload Too Large" };
} if (isRequestBodyLimitError(err, "REQUEST_BODY_TIMEOUT")) { return { statusCode: 408, body: requestBodyErrorToText("REQUEST_BODY_TIMEOUT") };
} throw err;
}
// Replays must return the same TwiML body so Twilio retries reconnect cleanly. // The one-time token still changes, but the behavior stays identical. return !params.get("SpeechResult") && !params.get("Digits") ? params : null;
}
// Get call context for conversation history const call = this.manager.getCall(callId); if (!call) {
console.warn(`[voice-call] Call ${callId} not found for auto-response`); return;
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.