import { beforeEach, describe, expect, it, vi } from
"vitest" ;
/**
 * Shape of the fake streaming session objects collected in `streamingInstances`:
 * an `active` flag plus one `vi.fn` mock per session method.
 */
type StreamingSessionStub = { active: boolean } & Record<
  "start" | "update" | "close" | "isActive",
  ReturnType<typeof vi.fn>
>;
// Mock handles shared by the vi.mock factories and the tests in this file.
// Each one is created inside a vi.hoisted callback.
// NOTE(review): presumably so the handles exist before the hoisted vi.mock
// factories run — confirm against the Vitest documentation.
const resolveFeishuAccountMock = vi.hoisted(() => vi.fn());
const getFeishuRuntimeMock = vi.hoisted(() => vi.fn());
const sendMessageFeishuMock = vi.hoisted(() => vi.fn());
const sendMarkdownCardFeishuMock = vi.hoisted(() => vi.fn());
const sendStructuredCardFeishuMock = vi.hoisted(() => vi.fn());
const sendMediaFeishuMock = vi.hoisted(() => vi.fn());
const createFeishuClientMock = vi.hoisted(() => vi.fn());
const resolveReceiveIdTypeMock = vi.hoisted(() => vi.fn());
const createReplyDispatcherWithTypingMock = vi.hoisted(() => vi.fn());
// Async mock resolving to an object with a fixed messageId.
const addTypingIndicatorMock = vi.hoisted(() => vi.fn(async () => ({ messageId:
"om_msg" })))
;
// Async mock resolving to undefined.
const removeTypingIndicatorMock = vi.hoisted(() => vi.fn(async () => {}));
// Collects every fake streaming session; the fake class pushes itself here on construction.
const streamingInstances = vi.hoisted((): StreamingSessionStub[] => []);
/**
 * Combines the text accumulated so far with a newly received piece of text.
 *
 * Non-string inputs count as empty. When one text contains the other, the
 * containing text wins. Otherwise the longest suffix of the previous text that
 * equals a prefix of the next text is emitted only once; with no such overlap
 * the two texts are simply concatenated.
 *
 * @param previousText - Text accumulated so far.
 * @param nextText - Newly received text.
 * @returns The merged text.
 */
function mergeStreamingText(
  previousText: string | undefined,
  nextText: string | undefined,
): string {
  const before = typeof previousText === "string" ? previousText : "";
  const incoming = typeof nextText === "string" ? nextText : "";

  if (incoming === "") {
    return before;
  }
  // Covers an empty `before`, equality, and `incoming` being a superset of `before`.
  if (before === "" || incoming.includes(before)) {
    return incoming;
  }
  if (before.includes(incoming)) {
    return before;
  }

  // Shrink the candidate overlap until the tail of `before` matches the head of `incoming`.
  let shared = Math.min(before.length, incoming.length);
  while (shared > 0 && !before.endsWith(incoming.slice(0, shared))) {
    shared -= 1;
  }
  return before + incoming.slice(shared);
}
// Replace the dispatcher's collaborator modules with the mock handles declared above.
// Both account resolvers share a single mock, so one mockReturnValue covers either export.
vi.mock("./accounts.js" , () => ({
resolveFeishuAccount: resolveFeishuAccountMock,
resolveFeishuRuntimeAccount: resolveFeishuAccountMock,
}));
vi.mock("./runtime.js" , () => ({ getFeishuRuntime: getFeishuRuntimeMock }));
vi.mock("./send.js" , () => ({
sendMessageFeishu: sendMessageFeishuMock,
sendMarkdownCardFeishu: sendMarkdownCardFeishuMock,
sendStructuredCardFeishu: sendStructuredCardFeishuMock,
}));
vi.mock("./media.js" , () => ({ sendMediaFeishu: sendMediaFeishuMock }));
vi.mock("./client.js" , () => ({ createFeishuClient: createFeishuClientMock }));
vi.mock("./targets.js" , () => ({ resolveReceiveIdType: resolveReceiveIdTypeMock }));
vi.mock("./typing.js" , () => ({
addTypingIndicator: addTypingIndicatorMock,
removeTypingIndicator: removeTypingIndicatorMock,
}));
// Replace the streaming card module: export the local mergeStreamingText and a fake
// session class whose instances register themselves in `streamingInstances`.
vi.mock("./streaming-card.js", () => ({
  mergeStreamingText,
  FeishuStreamingSession: class {
    active = false;

    constructor() {
      streamingInstances.push(this);
    }

    // start/close flip the `active` flag that isActive reports.
    start = vi.fn(async () => {
      this.active = true;
    });
    update = vi.fn(async () => {});
    close = vi.fn(async () => {
      this.active = false;
    });
    isActive = vi.fn(() => this.active);
  },
}));
import {
clearFeishuStreamingStartBackoffForTests,
createFeishuReplyDispatcher,
} from "./reply-dispatcher.js" ;
describe("createFeishuReplyDispatcher streaming behavior" , () => {
// Type of the first parameter of createFeishuReplyDispatcher.
type ReplyDispatcherArgs = Parameters<typeof createFeishuReplyDispatcher>[0 ];
// Reset mocks and shared state, then install the default account and runtime fixtures.
beforeEach(() => {
  vi.clearAllMocks();
  clearFeishuStreamingStartBackoffForTests();
  streamingInstances.length = 0;

  sendMediaFeishuMock.mockResolvedValue(undefined);
  sendStructuredCardFeishuMock.mockResolvedValue(undefined);

  // Default account: auto render mode with streaming enabled.
  const defaultAccount = {
    accountId: "main",
    appId: "app_id",
    appSecret: "app_secret",
    domain: "feishu",
    config: {
      renderMode: "auto",
      streaming: true,
    },
  };
  resolveFeishuAccountMock.mockReturnValue(defaultAccount);
  resolveReceiveIdTypeMock.mockReturnValue("chat_id");
  createFeishuClientMock.mockReturnValue({});

  // Echo the received options back as `_opts` alongside stub dispatcher parts.
  createReplyDispatcherWithTypingMock.mockImplementation((opts) => {
    return {
      dispatcher: {},
      replyOptions: {},
      markDispatchIdle: vi.fn(),
      _opts: opts,
    };
  });

  const textHelpers = {
    resolveTextChunkLimit: vi.fn(() => 4000),
    resolveChunkMode: vi.fn(() => "line"),
    resolveMarkdownTableMode: vi.fn(() => "preserve"),
    convertMarkdownTables: vi.fn((text) => text),
    chunkTextWithMode: vi.fn((text) => [text]),
  };
  const replyHelpers = {
    createReplyDispatcherWithTyping: createReplyDispatcherWithTypingMock,
    resolveHumanDelayConfig: vi.fn(() => undefined),
  };
  getFeishuRuntimeMock.mockReturnValue({
    channel: { text: textHelpers, reply: replyHelpers },
  });
});
/**
 * Switches the account fixture to auto render mode with streaming disabled, creates a
 * dispatcher, and returns the options passed to the first
 * createReplyDispatcherWithTyping call.
 */
function setupNonStreamingAutoDispatcher() {
  const nonStreamingAccount = {
    accountId: "main",
    appId: "app_id",
    appSecret: "app_secret",
    domain: "feishu",
    config: {
      renderMode: "auto",
      streaming: false,
    },
  };
  resolveFeishuAccountMock.mockReturnValue(nonStreamingAccount);

  createFeishuReplyDispatcher({
    cfg: {} as never,
    agentId: "agent",
    runtime: { log: vi.fn(), error: vi.fn() } as never,
    chatId: "oc_chat",
  });

  const [firstCall] = createReplyDispatcherWithTypingMock.mock.calls;
  return firstCall?.[0];
}
/** Builds a runtime stub that exposes mocked `log` and `error` functions. */
function createRuntimeLogger() {
  const logger = { log: vi.fn(), error: vi.fn() };
  return logger as never;
}
/**
 * Creates a dispatcher with default arguments (optionally overridden) and returns it
 * together with the options passed to the most recent
 * createReplyDispatcherWithTyping call.
 *
 * @param overrides - Dispatcher arguments that replace the defaults.
 */
function createDispatcherHarness(overrides: Partial<ReplyDispatcherArgs> = {}) {
  const result = createFeishuReplyDispatcher({
    cfg: {} as never,
    agentId: "agent",
    runtime: {} as never,
    chatId: "oc_chat",
    ...overrides,
  });
  const latestCall = createReplyDispatcherWithTypingMock.mock.calls.at(-1);
  return { result, options: latestCall?.[0] };
}
// With typingIndicator disabled in the account config, onReplyStart must not add one.
it("skips typing indicator when account typingIndicator is disabled", async () => {
  const accountWithoutTyping = {
    accountId: "main",
    appId: "app_id",
    appSecret: "app_secret",
    domain: "feishu",
    config: {
      renderMode: "auto",
      streaming: true,
      typingIndicator: false,
    },
  };
  resolveFeishuAccountMock.mockReturnValue(accountWithoutTyping);

  createFeishuReplyDispatcher({
    cfg: {} as never,
    agentId: "agent",
    runtime: {} as never,
    chatId: "oc_chat",
    replyToMessageId: "om_parent",
  });

  const [firstCall] = createReplyDispatcherWithTypingMock.mock.calls;
  const options = firstCall?.[0];
  await options.onReplyStart?.();

  expect(addTypingIndicatorMock).not.toHaveBeenCalled();
});
// A message created three minutes ago is treated as stale: no typing indicator is added.
it("skips typing indicator for stale replayed messages", async () => {
  // Three minutes, in milliseconds.
  const staleAgeMs = 3 * 60_000;
  createFeishuReplyDispatcher({
    cfg: {} as never,
    agentId: "agent",
    runtime: {} as never,
    chatId: "oc_chat",
    replyToMessageId: "om_parent",
    messageCreateTimeMs: Date.now() - staleAgeMs,
  });

  const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
  await options.onReplyStart?.();

  expect(addTypingIndicatorMock).not.toHaveBeenCalled();
});
// The creation time is supplied in seconds rather than milliseconds; a three-minute-old
// message must still suppress the typing indicator.
it("treats second-based timestamps as stale for typing suppression", async () => {
  // Three minutes, in milliseconds.
  const staleAgeMs = 3 * 60_000;
  createFeishuReplyDispatcher({
    cfg: {} as never,
    agentId: "agent",
    runtime: {} as never,
    chatId: "oc_chat",
    replyToMessageId: "om_parent",
    messageCreateTimeMs: Math.floor((Date.now() - staleAgeMs) / 1000),
  });

  const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
  await options.onReplyStart?.();

  expect(addTypingIndicatorMock).not.toHaveBeenCalled();
});
// A message created 30 seconds ago is fresh: the typing indicator is added once,
// targeting the replied-to message.
it("keeps typing indicator for fresh messages", async () => {
  createFeishuReplyDispatcher({
    cfg: {} as never,
    agentId: "agent",
    runtime: {} as never,
    chatId: "oc_chat",
    replyToMessageId: "om_parent",
    messageCreateTimeMs: Date.now() - 30_000,
  });

  const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
  await options.onReplyStart?.();

  expect(addTypingIndicatorMock).toHaveBeenCalledTimes(1);
  expect(addTypingIndicatorMock).toHaveBeenCalledWith(
    expect.objectContaining({
      messageId: "om_parent",
    }),
  );
});
// Plain text in auto mode goes out as a regular message: no streaming session, no markdown card.
it("keeps auto mode plain text on non-streaming send path", async () => {
  const harness = createDispatcherHarness();

  await harness.options.deliver({ text: "plain text" }, { kind: "final" });

  expect(streamingInstances).toHaveLength(0);
  expect(sendMessageFeishuMock).toHaveBeenCalledTimes(1);
  expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
});
// A plain-text "block" payload must not reach any send path or open a streaming session.
it("suppresses internal block payload delivery", async () => {
  const { options: dispatcherOptions } = createDispatcherHarness();
  const blockPayload = { text: "internal reasoning chunk" };

  await dispatcherOptions.deliver(blockPayload, { kind: "block" });

  expect(streamingInstances).toHaveLength(0);
  const senders = [sendMessageFeishuMock, sendMarkdownCardFeishuMock, sendMediaFeishuMock];
  for (const sender of senders) {
    expect(sender).not.toHaveBeenCalled();
  }
});
// The returned replyOptions must carry disableBlockStreaming: true.
it("sets disableBlockStreaming in replyOptions to prevent silent reply drops", async () => {
  const dispatcher = createFeishuReplyDispatcher({
    cfg: {} as never,
    agentId: "agent",
    runtime: {} as never,
    chatId: "oc_chat",
  });

  expect(dispatcher.replyOptions).toHaveProperty("disableBlockStreaming", true);
});
// A markdown (fenced code) final payload in auto mode is delivered through one streaming
// session that is started with the chat target and card metadata, then closed.
it("uses streaming session for auto mode markdown payloads", async () => {
  const { options } = createDispatcherHarness({
    runtime: createRuntimeLogger(),
    rootId: "om_root_topic",
  });

  await options.deliver({ text: "```ts\nconst x = 1\n```" }, { kind: "final" });

  expect(streamingInstances).toHaveLength(1);
  const [session] = streamingInstances;
  expect(session.start).toHaveBeenCalledTimes(1);
  const expectedStartOptions = expect.objectContaining({
    replyToMessageId: undefined,
    replyInThread: undefined,
    rootId: "om_root_topic",
    header: { title: "agent", template: "blue" },
    note: "Agent: agent",
  });
  expect(session.start).toHaveBeenCalledWith("oc_chat", "chat_id", expectedStartOptions);
  expect(session.close).toHaveBeenCalledTimes(1);
  expect(sendMessageFeishuMock).not.toHaveBeenCalled();
  expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
});
// When only a block payload arrived, going idle closes the session with that block text.
it("closes streaming with block text when final reply is missing", async () => {
  const { options } = createDispatcherHarness({
    runtime: createRuntimeLogger(),
  });
  const partialAnswer = "```md\npartial answer\n```";

  await options.deliver({ text: partialAnswer }, { kind: "block" });
  await options.onIdle?.();

  expect(streamingInstances).toHaveLength(1);
  const [session] = streamingInstances;
  expect(session.start).toHaveBeenCalledTimes(1);
  expect(session.close).toHaveBeenCalledTimes(1);
  expect(session.close).toHaveBeenCalledWith(partialAnswer, {
    note: "Agent: agent",
  });
});
// Two different final payloads each get their own streaming session, closed with its own text.
it("delivers distinct final payloads after streaming close", async () => {
  const { options } = createDispatcherHarness({
    runtime: createRuntimeLogger(),
  });
  const firstReply = "```md\n完整回复第一段\n```";
  const secondReply = "```md\n完整回复第一段 + 第二段\n```";
  const expectedCloseOptions = { note: "Agent: agent" };

  await options.deliver({ text: firstReply }, { kind: "final" });
  await options.deliver({ text: secondReply }, { kind: "final" });

  expect(streamingInstances).toHaveLength(2);
  const [firstSession, secondSession] = streamingInstances;
  expect(firstSession.close).toHaveBeenCalledTimes(1);
  expect(firstSession.close).toHaveBeenCalledWith(firstReply, expectedCloseOptions);
  expect(secondSession.close).toHaveBeenCalledTimes(1);
  expect(secondSession.close).toHaveBeenCalledWith(secondReply, expectedCloseOptions);
  expect(sendMessageFeishuMock).not.toHaveBeenCalled();
  expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
});
// Delivering the identical final text twice must not open a second session or send anything else.
it("skips exact duplicate final text after streaming close", async () => {
  const { options } = createDispatcherHarness({
    runtime: createRuntimeLogger(),
  });
  const reply = "```md\n同一条回复\n```";

  await options.deliver({ text: reply }, { kind: "final" });
  await options.deliver({ text: reply }, { kind: "final" });

  expect(streamingInstances).toHaveLength(1);
  const [session] = streamingInstances;
  expect(session.close).toHaveBeenCalledTimes(1);
  expect(session.close).toHaveBeenCalledWith(reply, {
    note: "Agent: agent",
  });
  expect(sendMessageFeishuMock).not.toHaveBeenCalled();
  expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
});
// Text that was streamed as a partial reply and closed on idle is not delivered again
// when the same text arrives as the final payload.
it("skips final text already closed by idle streaming", async () => {
  const { result, options } = createDispatcherHarness({
    runtime: createRuntimeLogger(),
  });
  const streamedReply = "```md\nidle streamed reply\n```";

  await options.onReplyStart?.();
  result.replyOptions.onPartialReply?.({ text: streamedReply });
  await options.onIdle?.();
  await options.deliver({ text: streamedReply }, { kind: "final" });

  expect(streamingInstances).toHaveLength(1);
  const [session] = streamingInstances;
  expect(session.close).toHaveBeenCalledTimes(1);
  expect(session.close).toHaveBeenCalledWith(streamedReply, {
    note: "Agent: agent",
  });
  expect(sendMessageFeishuMock).not.toHaveBeenCalled();
  expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
  expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled();
});
// A repeated final text is sent only once, but media attached to the repeat is still sent.
it("suppresses duplicate final text while still sending media", async () => {
  const options = setupNonStreamingAutoDispatcher();
  const finalText = "plain final";
  const attachmentUrl = "https://example.com/a.png ";

  await options.deliver({ text: finalText }, { kind: "final" });
  await options.deliver({ text: finalText, mediaUrl: attachmentUrl }, { kind: "final" });

  expect(sendMessageFeishuMock).toHaveBeenCalledTimes(1);
  expect(sendMessageFeishuMock).toHaveBeenLastCalledWith(
    expect.objectContaining({
      text: finalText,
    }),
  );
  expect(sendMediaFeishuMock).toHaveBeenCalledTimes(1);
  expect(sendMediaFeishuMock).toHaveBeenCalledWith(
    expect.objectContaining({
      mediaUrl: attachmentUrl,
    }),
  );
});
// Two different final texts on the non-streaming path are both sent, in order.
it("keeps distinct non-streaming final payloads", async () => {
  const options = setupNonStreamingAutoDispatcher();
  const texts = ["notice header", "actual answer body"];

  for (const text of texts) {
    await options.deliver({ text }, { kind: "final" });
  }

  expect(sendMessageFeishuMock).toHaveBeenCalledTimes(2);
  texts.forEach((text, index) => {
    expect(sendMessageFeishuMock).toHaveBeenNthCalledWith(
      index + 1,
      expect.objectContaining({ text }),
    );
  });
});
// In card mode a block payload is appended to the partial text as a delta chunk:
// "hello" followed by "lo world" closes with "hellolo world".
it("treats block updates as delta chunks", async () => {
  const cardStreamingAccount = {
    accountId: "main",
    appId: "app_id",
    appSecret: "app_secret",
    domain: "feishu",
    config: {
      renderMode: "card",
      streaming: true,
    },
  };
  resolveFeishuAccountMock.mockReturnValue(cardStreamingAccount);
  const { result, options } = createDispatcherHarness({
    runtime: createRuntimeLogger(),
  });

  await options.onReplyStart?.();
  result.replyOptions.onPartialReply?.({ text: "hello" });
  await options.deliver({ text: "lo world" }, { kind: "block" });
  await options.onIdle?.();

  expect(streamingInstances).toHaveLength(1);
  const [session] = streamingInstances;
  expect(session.close).toHaveBeenCalledTimes(1);
  expect(session.close).toHaveBeenCalledWith("hellolo world", {
    note: "Agent: agent",
  });
});
// A payload with only a mediaUrl is sent as an attachment; no text or card is sent.
it("sends media-only payloads as attachments", async () => {
  const { options } = createDispatcherHarness();
  const attachmentUrl = "https://example.com/a.png ";

  await options.deliver({ mediaUrl: attachmentUrl }, { kind: "final" });

  expect(sendMediaFeishuMock).toHaveBeenCalledTimes(1);
  expect(sendMediaFeishuMock).toHaveBeenCalledWith(
    expect.objectContaining({
      to: "oc_chat",
      mediaUrl: attachmentUrl,
    }),
  );
  expect(sendMessageFeishuMock).not.toHaveBeenCalled();
  expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
});
// An empty mediaUrls array does not hide the legacy single mediaUrl field.
it("falls back to legacy mediaUrl when mediaUrls is an empty array", async () => {
  const { options } = createDispatcherHarness();
  const attachmentUrl = "https://example.com/a.png ";
  const payload = { text: "caption", mediaUrl: attachmentUrl, mediaUrls: [] };

  await options.deliver(payload, { kind: "final" });

  expect(sendMessageFeishuMock).toHaveBeenCalledTimes(1);
  expect(sendMediaFeishuMock).toHaveBeenCalledTimes(1);
  expect(sendMediaFeishuMock).toHaveBeenCalledWith(
    expect.objectContaining({
      mediaUrl: attachmentUrl,
    }),
  );
});
// A markdown final reply with mediaUrls is streamed once and its attachment is sent as well.
it("sends attachments after streaming final markdown replies", async () => {
  const { options } = createDispatcherHarness({
    runtime: createRuntimeLogger(),
  });
  const attachmentUrl = "https://example.com/a.png ";

  await options.deliver(
    { text: "```ts\nconst x = 1\n```", mediaUrls: [attachmentUrl] },
    { kind: "final" },
  );

  expect(streamingInstances).toHaveLength(1);
  const [session] = streamingInstances;
  expect(session.start).toHaveBeenCalledTimes(1);
  expect(session.close).toHaveBeenCalledTimes(1);
  expect(sendMediaFeishuMock).toHaveBeenCalledTimes(1);
  expect(sendMediaFeishuMock).toHaveBeenCalledWith(
    expect.objectContaining({
      mediaUrl: attachmentUrl,
    }),
  );
});
// Reply metadata given to the dispatcher is forwarded to the plain-text send call.
it("passes replyInThread to sendMessageFeishu for plain text", async () => {
  const harness = createDispatcherHarness({
    replyToMessageId: "om_msg",
    replyInThread: true,
  });

  await harness.options.deliver({ text: "plain text" }, { kind: "final" });

  const expectedReplyMetadata = expect.objectContaining({
    replyToMessageId: "om_msg",
    replyInThread: true,
  });
  expect(sendMessageFeishuMock).toHaveBeenCalledWith(expectedReplyMetadata);
});
// In card mode without streaming, reply metadata is forwarded to the structured card send call.
it("passes replyInThread to sendStructuredCardFeishu for card text", async () => {
  const cardAccount = {
    accountId: "main",
    appId: "app_id",
    appSecret: "app_secret",
    domain: "feishu",
    config: {
      renderMode: "card",
      streaming: false,
    },
  };
  resolveFeishuAccountMock.mockReturnValue(cardAccount);
  const harness = createDispatcherHarness({
    replyToMessageId: "om_msg",
    replyInThread: true,
  });

  await harness.options.deliver({ text: "card text" }, { kind: "final" });

  const expectedReplyMetadata = expect.objectContaining({
    replyToMessageId: "om_msg",
    replyInThread: true,
  });
  expect(sendStructuredCardFeishuMock).toHaveBeenCalledWith(expectedReplyMetadata);
});
// Reasoning text is shown as a "Thinking" blockquote, separated from the answer by "---",
// both in streaming updates and in the text the session is closed with.
it("streams reasoning content as blockquote before answer", async () => {
  const { result, options } = createDispatcherHarness({
    runtime: createRuntimeLogger(),
    allowReasoningPreview: true,
  });
  const { replyOptions } = result;

  await options.onReplyStart?.();
  // Core agent sends pre-formatted text from formatReasoningMessage
  replyOptions.onReasoningStream?.({ text: "Reasoning:\n_thinking step 1_" });
  replyOptions.onReasoningStream?.({
    text: "Reasoning:\n_thinking step 1_\n_step 2_",
  });
  replyOptions.onPartialReply?.({ text: "answer part" });
  replyOptions.onReasoningEnd?.();
  await options.deliver({ text: "answer part final" }, { kind: "final" });

  expect(streamingInstances).toHaveLength(1);
  const [session] = streamingInstances;
  // Keep only the first argument of each update call; non-string arguments become "".
  const updateCalls = session.update.mock.calls.map((c: unknown[]) =>
    typeof c[0] === "string" ? c[0] : "",
  );

  const reasoningUpdate = updateCalls.find((c) => c.includes("Thinking"));
  expect(reasoningUpdate).toContain("> **Thinking**");
  // formatReasoningPrefix strips "Reasoning:" prefix and italic markers
  expect(reasoningUpdate).toContain("> thinking step");
  expect(reasoningUpdate).not.toContain("Reasoning:");
  expect(reasoningUpdate).not.toMatch(/> _.*_/);

  const combinedUpdate = updateCalls.find((c) => c.includes("Thinking") && c.includes("---"));
  expect(combinedUpdate).toBeDefined();

  expect(session.close).toHaveBeenCalledTimes(1);
  const closeArg = session.close.mock.calls[0][0] as string;
  for (const fragment of ["> **Thinking**", "---", "answer part final"]) {
    expect(closeArg).toContain(fragment);
  }
});
// With allowReasoningPreview set, both reasoning callbacks are exposed as functions.
it("provides onReasoningStream and onReasoningEnd when reasoning previews are allowed", () => {
  const harness = createDispatcherHarness({
    runtime: createRuntimeLogger(),
    allowReasoningPreview: true,
  });
  const { replyOptions } = harness.result;

  expect(replyOptions.onReasoningStream).toBeTypeOf("function");
  expect(replyOptions.onReasoningEnd).toBeTypeOf("function");
});
// Without allowReasoningPreview, neither reasoning callback is exposed.
it("omits reasoning callbacks unless reasoning previews are allowed", () => {
  const harness = createDispatcherHarness({
    runtime: createRuntimeLogger(),
  });
  const { replyOptions } = harness.result;

  expect(replyOptions.onReasoningStream).toBeUndefined();
  expect(replyOptions.onReasoningEnd).toBeUndefined();
});
// With streaming disabled in the account config, neither reasoning callback is exposed.
it("omits reasoning callbacks when streaming is disabled", () => {
  const nonStreamingAccount = {
    accountId: "main",
    appId: "app_id",
    appSecret: "app_secret",
    domain: "feishu",
    config: {
      renderMode: "auto",
      streaming: false,
    },
  };
  resolveFeishuAccountMock.mockReturnValue(nonStreamingAccount);

  const harness = createDispatcherHarness({
    runtime: createRuntimeLogger(),
  });
  const { replyOptions } = harness.result;

  expect(replyOptions.onReasoningStream).toBeUndefined();
  expect(replyOptions.onReasoningEnd).toBeUndefined();
});
// When only reasoning arrived, the card is closed with the "Thinking" blockquote alone:
// no "Reasoning:" prefix and no "---" separator.
it("renders reasoning-only card when no answer text arrives", async () => {
  const { result, options } = createDispatcherHarness({
    runtime: createRuntimeLogger(),
    allowReasoningPreview: true,
  });
  const { replyOptions } = result;

  await options.onReplyStart?.();
  replyOptions.onReasoningStream?.({ text: "Reasoning:\n_deep thought_" });
  replyOptions.onReasoningEnd?.();
  await options.onIdle?.();

  expect(streamingInstances).toHaveLength(1);
  const [session] = streamingInstances;
  expect(session.close).toHaveBeenCalledTimes(1);
  const closeArg = session.close.mock.calls[0][0] as string;
  expect(closeArg).toContain("> **Thinking**");
  expect(closeArg).toContain("> deep thought");
  expect(closeArg).not.toContain("Reasoning:");
  expect(closeArg).not.toContain("---");
});
// An empty reasoning payload adds no "Thinking" section; the card closes with the answer only.
it("ignores empty reasoning payloads", async () => {
  const { result, options } = createDispatcherHarness({
    runtime: createRuntimeLogger(),
    allowReasoningPreview: true,
  });
  const answer = "```ts\ncode\n```";

  await options.onReplyStart?.();
  result.replyOptions.onReasoningStream?.({ text: "" });
  result.replyOptions.onPartialReply?.({ text: answer });
  await options.deliver({ text: answer }, { kind: "final" });

  expect(streamingInstances).toHaveLength(1);
  const [session] = streamingInstances;
  const closeArg = session.close.mock.calls[0][0] as string;
  expect(closeArg).not.toContain("Thinking");
  expect(closeArg).toBe("```ts\ncode\n```");
});
// Duplicate detection compares the raw answer text, so a repeated answer is skipped even
// though reasoning was streamed before the first delivery.
it("deduplicates final text by raw answer payload, not combined card text", async () => {
  const { result, options } = createDispatcherHarness({
    runtime: createRuntimeLogger(),
    allowReasoningPreview: true,
  });
  const answer = "```ts\nfinal answer\n```";

  await options.onReplyStart?.();
  result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_thought_" });
  result.replyOptions.onReasoningEnd?.();
  await options.deliver({ text: answer }, { kind: "final" });

  expect(streamingInstances).toHaveLength(1);
  expect(streamingInstances[0].close).toHaveBeenCalledTimes(1);

  // Deliver the same raw answer text again — should be deduped
  await options.deliver({ text: answer }, { kind: "final" });

  // No second streaming session since the raw answer text matches
  expect(streamingInstances).toHaveLength(1);
});
// Reply metadata given to the dispatcher is forwarded to the streaming session's start call.
it("passes replyToMessageId and replyInThread to streaming.start()", async () => {
  const { options } = createDispatcherHarness({
    runtime: createRuntimeLogger(),
    replyToMessageId: "om_msg",
    replyInThread: true,
  });

  await options.deliver({ text: "```ts\nconst x = 1\n```" }, { kind: "final" });

  expect(streamingInstances).toHaveLength(1);
  const [session] = streamingInstances;
  const expectedStartOptions = expect.objectContaining({
    replyToMessageId: "om_msg",
    replyInThread: true,
    header: { title: "agent", template: "blue" },
    note: "Agent: agent",
  });
  expect(session.start).toHaveBeenCalledWith("oc_chat", "chat_id", expectedStartOptions);
});
// With threadReply set, no streaming session is created; the structured card is sent
// with replyInThread: true even though the caller passed replyInThread: false.
it("disables streaming for thread replies and keeps reply metadata", async () => {
  const { options } = createDispatcherHarness({
    runtime: createRuntimeLogger(),
    replyToMessageId: "om_msg",
    replyInThread: false,
    threadReply: true,
    rootId: "om_root_topic",
  });

  await options.deliver({ text: "```ts\nconst x = 1\n```" }, { kind: "final" });

  expect(streamingInstances).toHaveLength(0);
  const expectedReplyMetadata = expect.objectContaining({
    replyToMessageId: "om_msg",
    replyInThread: true,
  });
  expect(sendStructuredCardFeishuMock).toHaveBeenCalledWith(expectedReplyMetadata);
});
// Reply metadata given to the dispatcher is forwarded to the media send call.
it("passes replyInThread to media attachments", async () => {
  const harness = createDispatcherHarness({
    replyToMessageId: "om_msg",
    replyInThread: true,
  });

  await harness.options.deliver({ mediaUrl: "https://example.com/a.png " }, { kind: "final" });

  const expectedReplyMetadata = expect.objectContaining({
    replyToMessageId: "om_msg",
    replyInThread: true,
  });
  expect(sendMediaFeishuMock).toHaveBeenCalledWith(expectedReplyMetadata);
});
// After a streaming start() failure, the next markdown reply falls back to a structured
// card without creating a new session; once the mocked clock is advanced, streaming is
// attempted again.
it("backs off streaming retries after start() throws (HTTP 400)", async () => {
  const errorMock = vi.fn();
  let shouldFailStart = true;
  // Pin Date.now so the test controls when the backoff window has elapsed.
  const nowSpy = vi.spyOn(Date, "now").mockReturnValue(1_000);
  // Intercept streaming instance creation to make first start() reject
  const origPush = streamingInstances.push.bind(streamingInstances);
  streamingInstances.push = (...args: StreamingSessionStub[]) => {
    if (shouldFailStart) {
      args[0].start = vi
        .fn()
        .mockRejectedValue(new Error("Create card request failed with HTTP 400"));
      shouldFailStart = false;
    }
    return origPush(...args);
  };
  try {
    createFeishuReplyDispatcher({
      cfg: {} as never,
      agentId: "agent",
      runtime: { log: vi.fn(), error: errorMock } as never,
      chatId: "oc_chat",
    });
    const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];

    // First deliver with markdown triggers startStreaming - which will fail
    await options.deliver({ text: "```ts\nconst x = 1\n```" }, { kind: "final" });
    // Wait for the async error to propagate
    await vi.waitFor(() => {
      expect(errorMock).toHaveBeenCalledWith(expect.stringContaining("streaming start failed"));
    });
    expect(streamingInstances).toHaveLength(1);
    expect(sendStructuredCardFeishuMock).toHaveBeenCalledTimes(1);

    // Immediate next markdown reply should skip a new streaming start and
    // fall back directly to a normal card instead of paying the 400 latency.
    await options.deliver({ text: "```ts\nconst y = 2\n```" }, { kind: "final" });
    expect(streamingInstances).toHaveLength(1);
    expect(sendStructuredCardFeishuMock).toHaveBeenCalledTimes(2);

    // After the short backoff expires, retry streaming so fixed permissions
    // or transient Feishu failures recover without a process restart.
    nowSpy.mockReturnValue(62_000);
    await options.deliver({ text: "```ts\nconst z = 3\n```" }, { kind: "final" });
    expect(streamingInstances).toHaveLength(2);
    expect(streamingInstances[1].start).toHaveBeenCalled();
    expect(streamingInstances[1].close).toHaveBeenCalled();
  } finally {
    // Always undo the push interception and the Date.now spy, even if an assertion failed.
    streamingInstances.push = origPush;
    nowSpy.mockRestore();
  }
});
});
// NOTE: trailing footer emitted by an external formatting tool; not part of the test suite.
// Measurement V0.5 in percent: C=98 H=99 G=98
// Processing time: 0.8 seconds
// © Formatika GbR, Germany