import { createServer, type IncomingMessage, type Server, type ServerResponse } from
"node:http";
import { formatErrorMessage } from
"openclaw/plugin-sdk/error-runtime";
import { normalizeAccountId } from
"./bus-queries.js";
import type { QaBusState } from
"./bus-state.js";
import type {
QaBusCreateThreadInput,
QaBusDeleteMessageInput,
QaBusEditMessageInput,
QaBusInboundMessageInput,
QaBusOutboundMessageInput,
QaBusPollInput,
QaBusReactToMessageInput,
QaBusReadMessageInput,
QaBusSearchMessagesInput,
QaBusWaitForInput,
} from
"./runtime-api.js";
async
function readJson(req: IncomingMessage): Promise<unknown> {
const chunks: Buffer[] = [];
for await (
const chunk of req) {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
}
const text = Buffer.concat(chunks).toString(
"utf8").trim();
return text ? (JSON.parse(text) as unknown) : {};
}
export
function writeJson(res: ServerResponse, statusCode: number, body: unknown) {
const payload = JSON.stringify(body);
res.writeHead(statusCode, {
"content-type":
"application/json; charset=utf-8",
"content-length": Buffer.byteLength(payload),
});
res.end(payload);
}
export
function writeError(res: ServerResponse, statusCode: number, error: unknown) {
writeJson(res, statusCode, {
error: formatErrorMessage(error),
});
}
export async
function closeQaHttpServer(server: Server): Promise<
void> {
let forceCloseTimer: NodeJS.Timeout | undefined;
try {
await
new Promise<
void>((resolve, reject) => {
server.close((error) => (error ? reject(error) : resolve()));
server.closeIdleConnections?.();
forceCloseTimer = setTimeout(() => {
server.closeAllConnections?.();
},
250);
forceCloseTimer.unref();
});
}
finally {
if (forceCloseTimer) {
clearTimeout(forceCloseTimer);
}
}
}
export async
function handleQaBusRequest(params: {
req: IncomingMessage;
res: ServerResponse;
state: QaBusState;
}): Promise<
boolean> {
const method = params.req.method ??
"GET";
const url =
new URL(params.req.url ??
"/",
"http://127.0.0.1");
if (method ===
"GET" && url.pathname ===
"/health") {
writeJson(params.res,
200, { ok:
true });
return true;
}
if (method ===
"GET" && url.pathname ===
"/v1/state") {
writeJson(params.res,
200, params.state.getSnapshot());
return true;
}
if (!url.pathname.startsWith(
"/v1/")) {
return false;
}
if (method !==
"POST") {
writeError(params.res,
405,
"method not allowed");
return true;
}
const body = (await readJson(params.req)) as Record<string, unknown>;
try {
switch (url.pathname) {
case "/v1/reset":
params.state.reset();
writeJson(params.res,
200, { ok:
true });
return true;
case "/v1/inbound/message":
writeJson(params.res,
200, {
message: params.state.addInboundMessage(body as unknown as QaBusInboundMessageInput),
});
return true;
case "/v1/outbound/message":
writeJson(params.res,
200, {
message: params.state.addOutboundMessage(body as unknown as QaBusOutboundMessageInput
),
});
return true;
case "/v1/actions/thread-create":
writeJson(params.res, 200, {
thread: params.state.createThread(body as unknown as QaBusCreateThreadInput),
});
return true;
case "/v1/actions/react":
writeJson(params.res, 200, {
message: params.state.reactToMessage(body as unknown as QaBusReactToMessageInput),
});
return true;
case "/v1/actions/edit":
writeJson(params.res, 200, {
message: params.state.editMessage(body as unknown as QaBusEditMessageInput),
});
return true;
case "/v1/actions/delete":
writeJson(params.res, 200, {
message: params.state.deleteMessage(body as unknown as QaBusDeleteMessageInput),
});
return true;
case "/v1/actions/read":
writeJson(params.res, 200, {
message: params.state.readMessage(body as unknown as QaBusReadMessageInput),
});
return true;
case "/v1/actions/search":
writeJson(params.res, 200, {
messages: params.state.searchMessages(body as unknown as QaBusSearchMessagesInput),
});
return true;
case "/v1/poll": {
const input = body as unknown as QaBusPollInput;
const timeoutMs = Math.max(0, Math.min(input.timeoutMs ?? 0, 30_000));
const accountId = normalizeAccountId(input.accountId);
const initial = params.state.poll(input);
if (initial.events.length > 0 || timeoutMs === 0) {
writeJson(params.res, 200, initial);
return true;
}
try {
await params.state.waitForCursorAdvance(input.cursor ?? 0, timeoutMs, (snapshot) => {
return snapshot.events.some(
(event) => event.accountId === accountId && event.cursor > (input.cursor ?? 0),
);
});
} catch {
// timeout ok for long-poll
}
writeJson(params.res, 200, params.state.poll(input));
return true;
}
case "/v1/wait":
writeJson(params.res, 200, {
match: await params.state.waitFor(body as unknown as QaBusWaitForInput),
});
return true;
default:
writeError(params.res, 404, "not found");
return true;
}
} catch (error) {
writeError(params.res, 400, error);
return true;
}
}
export function createQaBusServer(state: QaBusState): Server {
return createServer(async (req, res) => {
const handled = await handleQaBusRequest({ req, res, state });
if (!handled) {
writeError(res, 404, "not found");
}
});
}
export async function startQaBusServer(params: { state: QaBusState; port?: number }) {
const server = createQaBusServer(params.state);
await new Promise<void>((resolve, reject) => {
server.once("error", reject);
server.listen(params.port ?? 0, "127.0.0.1", () => resolve());
});
const address = server.address();
if (!address || typeof address === "string") {
throw new Error("qa-bus failed to bind");
}
return {
server,
port: address.port,
baseUrl: `http://127.0.0.1:${address.port}`,
async stop() {
await closeQaHttpServer(server);
},
};
}