/** * Teams streaming message using the streaminfo entity protocol. * * Follows the official Teams SDK pattern: * 1. First chunk → POST a typing activity with streaminfo entity (streamType: "streaming") * 2. Subsequent chunks → POST typing activities with streaminfo + incrementing streamSequence * 3. Finalize → POST a message activity with streaminfo (streamType: "final") * * Uses the shared draft-stream-loop for throttling (avoids rate limits).
*/
import { createDraftStreamLoop, type DraftStreamLoop } from "openclaw/plugin-sdk/channel-lifecycle"; import { readStringValue } from "openclaw/plugin-sdk/text-runtime";
/** Default throttle interval between stream updates (ms).
* Teams docs recommend buffering tokens for 1.5-2s; limit is 1 req/s. */ const DEFAULT_THROTTLE_MS = 1500;
/** Minimum chars before sending the first streaming message. */ const MIN_INITIAL_CHARS = 20;
/** Teams message text limit. */ const TEAMS_MAX_CHARS = 4000;
/** * Stop streaming before Teams expires the content stream server-side. * The exact service limit is opaque, so stay comfortably under it.
*/ const MAX_STREAM_AGE_MS = 45_000;
type StreamSendFn = (activity: Record<string, unknown>) => Promise<unknown>;
export type TeamsStreamOptions = { /** Function to send an activity (POST to Bot Framework). */
sendActivity: StreamSendFn; /** Whether to enable feedback loop on the final message. */
feedbackLoopEnabled?: boolean; /** Throttle interval in ms. Default: 600. */
throttleMs?: number; /** Called on errors during streaming. */
onError?: (err: unknown) => void;
};
import { AI_GENERATED_ENTITY } from "./ai-entity.js"; import { formatUnknownError } from "./errors.js";
function extractId(response: unknown): string | undefined { if (response && typeof response === "object" && "id" in response) { return readStringValue((response as { id?: unknown }).id);
} return undefined;
}
function buildStreamInfoEntity(
streamId: string | undefined,
streamType: "informative" | "streaming" | "final",
streamSequence?: number,
): Record<string, unknown> { const entity: Record<string, unknown> = {
type: "streaminfo",
streamType,
}; // streamId is only present after the first chunk (returned by the service) if (streamId) {
entity.streamId = streamId;
} // streamSequence must be present for start/continue, but NOT for final if (streamSequence != null) {
entity.streamSequence = streamSequence;
} return entity;
}
/** * Send an informative status update (blue progress bar in Teams). * Call this immediately when a message is received, before LLM starts generating. * Establishes the stream so subsequent chunks continue from this stream ID.
*/
async sendInformativeUpdate(text: string): Promise<void> { if (this.stopped || this.finalized) { return;
}
/** * Ingest partial text from the LLM token stream. * Called by onPartialReply — accumulates text and throttles updates.
*/
update(text: string): void { if (this.stopped || this.finalized) { return;
} this.accumulatedText = text;
// Wait for minimum chars before first send (avoids push notification flicker) if (!this.streamId && this.accumulatedText.length < MIN_INITIAL_CHARS) { return;
}
// Text exceeded Teams limit — finalize immediately with what we have // so the user isn't left waiting while the LLM keeps generating. if (this.accumulatedText.length > TEAMS_MAX_CHARS) { this.streamFailed = true; voidthis.finalize(); return;
}
// Stop early before Teams expires the stream server-side. finalize() will // close the stream with the last good content, and reply-stream-controller // will deliver any remaining suffix via normal fallback delivery. if (this.streamStartedAt && Date.now() - this.streamStartedAt >= MAX_STREAM_AGE_MS) { this.streamFailed = true; voidthis.finalize(); return;
}
// Don't append cursor — Teams requires each chunk to be a prefix of subsequent chunks. // The cursor character would cause "content should contain previously streamed content" errors. this.loop.update(this.accumulatedText);
}
/** * Finalize the stream — send the final message activity.
*/
async finalize(): Promise<void> { if (this.finalized) { return;
} this.finalized = true; this.stopped = true; this.loop.stop();
await this.loop.waitForInFlight();
// If no text was streamed (e.g. agent sent a card via tool instead of // streaming text), just return. Teams auto-clears the informative progress // bar after its streaming timeout. Sending an empty final message fails // with 403. if (!this.accumulatedText.trim()) { return;
}
// If streaming failed (>4000 chars or POST errors), close the stream // with the last successfully streamed text so Teams removes the "Stop" // button and replaces the partial chunks. deliver() handles the complete // response since hasContent returns false when streamFailed is true. if (this.streamFailed) { if (this.streamId) { try {
await this.sendActivity({
type: "message",
text: this.lastStreamedText || "",
channelData: { feedbackLoopEnabled: this.feedbackLoopEnabled },
entities: [AI_GENERATED_ENTITY, buildStreamInfoEntity(this.streamId, "final")],
});
} catch { // Best effort — stream will auto-close after Teams timeout
}
} return;
}
// Send final message activity. // Per the spec: type=message, streamType=final, NO streamSequence. try { const entities: Array<Record<string, unknown>> = [AI_GENERATED_ENTITY]; if (this.streamId) {
entities.push(buildStreamInfoEntity(this.streamId, "final"));
}
/** Whether streaming successfully delivered content (at least one chunk sent, not failed). */
get hasContent(): boolean { returnthis.accumulatedText.length > 0 && !this.streamFailed;
}
/** Whether streaming failed and fallback delivery is needed. */
get isFailed(): boolean { returnthis.streamFailed;
}
/** Number of characters successfully streamed before failure. */
get streamedLength(): number { returnthis.lastStreamedText.length;
}
/** Whether the stream has been finalized. */
get isFinalized(): boolean { returnthis.finalized;
}
/** Whether streaming fell back (not used in this implementation). */
get isFallback(): boolean { returnfalse;
}
/** * Send a single streaming chunk as a typing activity with streaminfo. * Per the Teams REST API spec: * - First chunk: no streamId, streamSequence=1 → returns 201 with { id: streamId } * - Subsequent chunks: include streamId, increment streamSequence → returns 202
*/ private async pushStreamChunk(text: string): Promise<boolean> { if (this.stopped && !this.finalized) { returnfalse;
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.