import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process" ;
import { createInterface, type Interface } from "node:readline" ;
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime" ;
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env" ;
import { normalizeLowercaseStringOrEmpty, resolveUserPath } from "openclaw/plugin-sdk/text-runtime" ;
import { DEFAULT_IMESSAGE_PROBE_TIMEOUT_MS } from "./constants.js" ;
export type IMessageRpcError = {
code?: number;
message?: string;
data?: unknown;
};
export type IMessageRpcResponse<T> = {
jsonrpc?: string;
id?: string | number | null ;
result?: T;
error?: IMessageRpcError;
method?: string;
params?: unknown;
};
export type IMessageRpcNotification = {
method: string;
params?: unknown;
};
export type IMessageRpcClientOptions = {
cliPath?: string;
dbPath?: string;
runtime?: RuntimeEnv;
onNotification?: (msg: IMessageRpcNotification) => void ;
};
type PendingRequest = {
resolve: (value: unknown) => void ;
reject: (error: Error) => void ;
timer?: NodeJS.Timeout;
};
function isTestEnv(): boolean {
if (process.env.NODE_ENV === "test" ) {
return true ;
}
const vitest = normalizeLowercaseStringOrEmpty(process.env.VITEST);
return Boolean (vitest);
}
export class IMessageRpcClient {
private readonly cliPath: string;
private readonly dbPath?: string;
private readonly runtime?: RuntimeEnv;
private readonly onNotification?: (msg: IMessageRpcNotification) => void ;
private readonly pending = new Map<string, PendingRequest>();
private readonly closed: Promise<void >;
private closedResolve: (() => void ) | null = null ;
private child: ChildProcessWithoutNullStreams | null = null ;
private reader: Interface | null = null ;
private nextId = 1 ;
constructor(opts: IMessageRpcClientOptions = {}) {
this .cliPath = opts.cliPath?.trim() || "imsg" ;
this .dbPath = opts.dbPath?.trim() ? resolveUserPath(opts.dbPath) : undefined;
this .runtime = opts.runtime;
this .onNotification = opts.onNotification;
this .closed = new Promise((resolve) => {
this .closedResolve = resolve;
});
}
async start(): Promise<void > {
if (this .child) {
return ;
}
if (isTestEnv()) {
throw new Error("Refusing to start imsg rpc in test environment; mock iMessage RPC client" );
}
const args = ["rpc" ];
if (this .dbPath) {
args.push("--db" , this .dbPath);
}
const child = spawn(this .cliPath, args, {
stdio: ["pipe" , "pipe" , "pipe" ],
});
this .child = child;
this .reader = createInterface({ input: child.stdout });
this .reader.on("line" , (line) => {
const trimmed = line.trim();
if (!trimmed) {
return ;
}
this .handleLine(trimmed);
});
child.stderr?.on("data" , (chunk) => {
const lines = chunk.toString().split(/\r?\n/);
for (const line of lines) {
if (!line.trim()) {
continue ;
}
this .runtime?.error?.(`imsg rpc: ${line.trim()}`);
}
});
child.on("error" , (err) => {
this .failAll(err instanceof Error ? err : new Error(String(err)));
this .closedResolve?.();
});
child.on("close" , (code, signal) => {
if (code !== 0 && code !== null ) {
const reason = signal ? `signal ${signal}` : `code ${code}`;
this .failAll(new Error(`imsg rpc exited (${reason})`));
} else {
this .failAll(new Error("imsg rpc closed" ));
}
this .closedResolve?.();
});
}
async stop(): Promise<void > {
if (!this .child) {
return ;
}
this .reader?.close();
this .reader = null ;
this .child.stdin?.end();
const child = this .child;
this .child = null ;
await Promise.race([
this .closed,
new Promise<void >((resolve) => {
setTimeout(() => {
if (!child.killed) {
child.kill("SIGTERM" );
}
resolve();
}, 500 );
}),
]);
}
async waitForClose(): Promise<void > {
await this .closed;
}
async request<T = unknown>(
method: string,
params?: Record<string, unknown>,
opts?: { timeoutMs?: number },
): Promise<T> {
if (!this .child || !this .child.stdin) {
throw new Error("imsg rpc not running" );
}
const id = this .nextId++;
const payload = {
jsonrpc: "2.0" ,
id,
method,
params: params ?? {},
};
const line = `${JSON.stringify(payload)}\n`;
const timeoutMs = opts?.timeoutMs ?? DEFAULT_IMESSAGE_PROBE_TIMEOUT_MS;
const response = new Promise<T>((resolve, reject) => {
const key = String(id);
const timer =
timeoutMs > 0
? setTimeout(() => {
this .pending.delete (key);
reject(new Error(`imsg rpc timeout (${method})`));
}, timeoutMs)
: undefined;
this .pending.set(key, {
resolve: (value) => resolve(value as T),
reject,
timer,
});
});
this .child.stdin.write(line);
return await response;
}
private handleLine(line: string) {
let parsed: IMessageRpcResponse<unknown>;
try {
parsed = JSON.parse(line) as IMessageRpcResponse<unknown>;
} catch (err) {
const detail = formatErrorMessage(err);
this .runtime?.error?.(`imsg rpc: failed to parse ${line}: ${detail}`);
return ;
}
if (parsed.id !== undefined && parsed.id !== null ) {
const key = String(parsed.id);
const pending = this .pending.get(key);
if (!pending) {
return ;
}
if (pending.timer) {
clearTimeout(pending.timer);
}
this .pending.delete (key);
if (parsed.error) {
const baseMessage = parsed.error.message ?? "imsg rpc error" ;
const details = parsed.error.data;
const code = parsed.error.code;
const suffixes = [] as string[];
if (typeof code === "number" ) {
suffixes.push(`code=${code}`);
}
if (details !== undefined) {
const detailText =
typeof details === "string" ? details : JSON.stringify(details, null , 2 );
if (detailText) {
suffixes.push(detailText);
}
}
const msg = suffixes.length > 0 ? `${baseMessage}: ${suffixes.join(" " )}` : baseMessage;
pending.reject(new Error(msg));
return ;
}
pending.resolve(parsed.result);
return ;
}
if (parsed.method) {
this .onNotification?.({
method: parsed.method,
params: parsed.params,
});
}
}
private failAll(err: Error) {
for (const [key, pending] of this .pending.entries()) {
if (pending.timer) {
clearTimeout(pending.timer);
}
pending.reject(err);
this .pending.delete (key);
}
}
}
export async function createIMessageRpcClient(
opts: IMessageRpcClientOptions = {},
): Promise<IMessageRpcClient> {
const client = new IMessageRpcClient(opts);
await client.start();
return client;
}
Messung V0.5 in Prozent C=100 H=98 G=98
¤ Dauer der Verarbeitung: 0.27 Sekunden
(vorverarbeitet am 2026-06-07)
¤
*© Formatika GbR, Deutschland