Files
pi-config/extensions/pi-intercom/broker/client.ts

536 lines
15 KiB
TypeScript

import { EventEmitter } from "events";
import net from "net";
import { randomUUID } from "crypto";
import { writeMessage, createMessageReader } from "./framing.js";
import { getBrokerSocketPath } from "./paths.js";
import type { SessionInfo, Message, Attachment } from "../types.js";
const BROKER_SOCKET = getBrokerSocketPath();
interface SendOptions {
text: string;
attachments?: Attachment[];
replyTo?: string;
expectsReply?: boolean;
messageId?: string;
}
interface SendResult {
id: string;
delivered: boolean;
reason?: string;
}
function toError(error: unknown): Error {
return error instanceof Error ? error : new Error(String(error));
}
function isAttachment(value: unknown): value is Attachment {
if (typeof value !== "object" || value === null) {
return false;
}
const attachment = value as Record<string, unknown>;
if (
attachment.type !== "file"
&& attachment.type !== "snippet"
&& attachment.type !== "context"
) {
return false;
}
if (typeof attachment.name !== "string" || typeof attachment.content !== "string") {
return false;
}
return attachment.language === undefined || typeof attachment.language === "string";
}
function isMessage(value: unknown): value is Message {
if (typeof value !== "object" || value === null) {
return false;
}
const message = value as Record<string, unknown>;
if (typeof message.id !== "string" || typeof message.timestamp !== "number") {
return false;
}
if (message.replyTo !== undefined && typeof message.replyTo !== "string") {
return false;
}
if (message.expectsReply !== undefined && typeof message.expectsReply !== "boolean") {
return false;
}
if (typeof message.content !== "object" || message.content === null) {
return false;
}
const content = message.content as Record<string, unknown>;
if (typeof content.text !== "string") {
return false;
}
return content.attachments === undefined
|| (Array.isArray(content.attachments) && content.attachments.every(isAttachment));
}
function isSessionInfo(value: unknown): value is SessionInfo {
if (typeof value !== "object" || value === null) {
return false;
}
const session = value as Record<string, unknown>;
if (
typeof session.id !== "string"
|| typeof session.cwd !== "string"
|| typeof session.model !== "string"
|| typeof session.pid !== "number"
|| typeof session.startedAt !== "number"
|| typeof session.lastActivity !== "number"
) {
return false;
}
if (session.name !== undefined && typeof session.name !== "string") {
return false;
}
return session.status === undefined || typeof session.status === "string";
}
export class IntercomClient extends EventEmitter {
private socket: net.Socket | null = null;
private _sessionId: string | null = null;
private pendingSends = new Map<string, { resolve: (r: SendResult) => void; reject: (e: Error) => void }>();
private pendingLists = new Map<string, { resolve: (sessions: SessionInfo[]) => void; reject: (e: Error) => void }>();
private disconnecting = false;
private disconnectError: Error | null = null;
private failPending(error: Error): void {
for (const pending of this.pendingSends.values()) {
pending.reject(error);
}
this.pendingSends.clear();
for (const pending of this.pendingLists.values()) {
pending.reject(error);
}
this.pendingLists.clear();
}
get sessionId(): string | null {
return this._sessionId;
}
isConnected(): boolean {
const socket = this.socket;
return Boolean(socket && this._sessionId && !this.disconnecting && !socket.destroyed && !socket.writableEnded && socket.writable);
}
private requireActiveSocket(): net.Socket {
if (this.disconnecting) {
throw new Error("Client disconnecting");
}
const socket = this.socket;
if (!socket || !this._sessionId) {
throw new Error("Not connected");
}
if (socket.destroyed || socket.writableEnded || !socket.writable) {
throw new Error("Client disconnected");
}
return socket;
}
connect(session: Omit<SessionInfo, "id">): Promise<void> {
if (this.socket) {
return Promise.reject(new Error("Already connected"));
}
return new Promise((resolve, reject) => {
const socket = net.connect(BROKER_SOCKET);
this.socket = socket;
this.disconnectError = null;
let settled = false;
const timeout = setTimeout(() => {
if (!this._sessionId) {
cleanupConnectionAttempt();
cleanupSocketListeners();
if (this.socket === socket) {
this.socket = null;
}
socket.destroy();
reject(new Error("Connection timeout"));
}
}, 10000);
let connectionEstablished = false;
const onRegistered = () => {
settled = true;
connectionEstablished = true;
cleanupConnectionAttempt();
resolve();
};
const onError = (err: Error) => {
settled = true;
cleanupConnectionAttempt();
cleanupSocketListeners();
if (this.socket === socket) {
this.socket = null;
}
socket.destroy();
reject(err);
};
const onClose = () => {
const wasConnecting = !settled && !this._sessionId;
const wasDisconnecting = this.disconnecting;
const disconnectError = this.disconnectError ?? new Error("Client disconnected");
this.disconnecting = false;
cleanupConnectionAttempt();
cleanupSocketListeners();
this.failPending(disconnectError);
if (this.socket === socket) {
this.socket = null;
}
this._sessionId = null;
this.disconnectError = null;
if (connectionEstablished && !wasDisconnecting) {
this.emit("disconnected", disconnectError);
}
if (wasConnecting) {
reject(new Error("Connection closed before registration"));
}
};
const onSocketError = (err: Error) => {
if (connectionEstablished) {
this.disconnectError = err;
this.emit("error", err);
}
};
const onReaderError = (error: Error) => {
const protocolError = new Error(`Intercom protocol error: ${error.message}`, { cause: error });
if (!connectionEstablished) {
onError(protocolError);
return;
}
this.disconnectError = protocolError;
this.emit("error", protocolError);
socket.destroy();
};
const reader = createMessageReader((msg) => {
this.handleBrokerMessage(msg);
}, onReaderError);
const cleanupConnectionAttempt = () => {
this.off("_registered", onRegistered);
socket.off("error", onError);
clearTimeout(timeout);
};
const cleanupSocketListeners = () => {
socket.off("data", reader);
socket.off("error", onSocketError);
socket.off("close", onClose);
};
socket.on("data", reader);
socket.on("error", onError);
socket.on("close", onClose);
socket.on("error", onSocketError);
this.once("_registered", onRegistered);
try {
writeMessage(socket, { type: "register", session });
} catch (error) {
cleanupConnectionAttempt();
cleanupSocketListeners();
if (this.socket === socket) {
this.socket = null;
}
socket.destroy();
reject(toError(error));
}
});
}
private handleBrokerMessage(msg: unknown): void {
if (typeof msg !== "object" || msg === null || !("type" in msg) || typeof msg.type !== "string") {
throw new Error("Invalid broker message");
}
const brokerMessage = msg as { type: string } & Record<string, unknown>;
if (this._sessionId === null && brokerMessage.type !== "registered") {
throw new Error(`Received ${brokerMessage.type} before registered`);
}
switch (brokerMessage.type) {
case "registered": {
if (typeof brokerMessage.sessionId !== "string") {
throw new Error("Invalid registered message");
}
if (this._sessionId !== null) {
throw new Error("Received duplicate registered message");
}
this._sessionId = brokerMessage.sessionId;
this.emit("_registered", { type: "registered", sessionId: brokerMessage.sessionId });
break;
}
case "sessions": {
const { requestId, sessions } = brokerMessage;
if (typeof requestId !== "string" || !Array.isArray(sessions) || !sessions.every(isSessionInfo)) {
throw new Error("Invalid sessions message");
}
const pending = this.pendingLists.get(requestId);
if (!pending) {
// Late list responses can still arrive after the caller has already timed out.
return;
}
this.pendingLists.delete(requestId);
pending.resolve(sessions);
break;
}
case "message": {
const { from, message } = brokerMessage;
if (!isSessionInfo(from) || !isMessage(message)) {
throw new Error("Invalid message event");
}
this.emit("message", from, message);
break;
}
case "delivered": {
const { messageId } = brokerMessage;
if (typeof messageId !== "string") {
throw new Error("Invalid delivered message");
}
const pending = this.pendingSends.get(messageId);
if (!pending) {
// Late send responses are harmless once the caller has already timed out.
return;
}
this.pendingSends.delete(messageId);
pending.resolve({ id: messageId, delivered: true });
break;
}
case "delivery_failed": {
const { messageId, reason } = brokerMessage;
if (typeof messageId !== "string" || typeof reason !== "string") {
throw new Error("Invalid delivery_failed message");
}
const pending = this.pendingSends.get(messageId);
if (!pending) {
// Late send responses are harmless once the caller has already timed out.
return;
}
this.pendingSends.delete(messageId);
pending.resolve({ id: messageId, delivered: false, reason });
break;
}
case "session_joined": {
if (!isSessionInfo(brokerMessage.session)) {
throw new Error("Invalid session_joined message");
}
this.emit("session_joined", brokerMessage.session);
break;
}
case "session_left": {
if (typeof brokerMessage.sessionId !== "string") {
throw new Error("Invalid session_left message");
}
this.emit("session_left", brokerMessage.sessionId);
break;
}
case "presence_update": {
if (!isSessionInfo(brokerMessage.session)) {
throw new Error("Invalid presence_update message");
}
this.emit("presence_update", brokerMessage.session);
break;
}
case "error": {
if (typeof brokerMessage.error !== "string") {
throw new Error("Invalid error message");
}
this.emit("error", new Error(brokerMessage.error));
break;
}
default:
throw new Error(`Unknown broker message type: ${brokerMessage.type}`);
}
}
async disconnect(): Promise<void> {
const socket = this.socket;
if (!socket) {
return;
}
this.disconnecting = true;
this.disconnectError = null;
this.failPending(new Error("Client disconnected"));
await new Promise<void>((resolve) => {
let settled = false;
const finish = () => {
if (settled) {
return;
}
settled = true;
clearTimeout(timeout);
socket.off("close", onClose);
socket.off("error", onError);
resolve();
};
const onClose = () => finish();
const onError = () => {
socket.destroy();
};
const timeout = setTimeout(() => {
socket.destroy();
}, 2000);
socket.once("close", onClose);
socket.once("error", onError);
try {
writeMessage(socket, { type: "unregister" });
socket.end();
} catch {
// Disconnect should still finish even if the unregister write fails.
socket.destroy();
}
});
}
listSessions(): Promise<SessionInfo[]> {
let socket: net.Socket;
try {
socket = this.requireActiveSocket();
} catch (error) {
return Promise.reject(toError(error));
}
return new Promise((resolve, reject) => {
const requestId = randomUUID();
const wrappedResolve = (sessions: SessionInfo[]) => {
clearTimeout(timeout);
resolve(sessions);
};
const wrappedReject = (error: Error) => {
clearTimeout(timeout);
reject(error);
};
const timeout = setTimeout(() => {
if (this.pendingLists.has(requestId)) {
this.pendingLists.delete(requestId);
wrappedReject(new Error("List sessions timeout"));
}
}, 5000);
this.pendingLists.set(requestId, { resolve: wrappedResolve, reject: wrappedReject });
try {
writeMessage(socket, { type: "list", requestId });
} catch (error) {
clearTimeout(timeout);
this.pendingLists.delete(requestId);
reject(toError(error));
}
});
}
send(to: string, options: SendOptions): Promise<SendResult> {
let socket: net.Socket;
try {
socket = this.requireActiveSocket();
} catch (error) {
return Promise.reject(toError(error));
}
const messageId = options.messageId ?? randomUUID();
const message: Message = {
id: messageId,
timestamp: Date.now(),
replyTo: options.replyTo,
expectsReply: options.expectsReply,
content: {
text: options.text,
attachments: options.attachments,
},
};
return new Promise((resolve, reject) => {
const wrappedResolve = (result: SendResult) => {
clearTimeout(timeout);
resolve(result);
};
const wrappedReject = (error: Error) => {
clearTimeout(timeout);
reject(error);
};
const timeout = setTimeout(() => {
if (this.pendingSends.has(messageId)) {
this.pendingSends.delete(messageId);
wrappedReject(new Error("Send timeout"));
}
}, 10000);
this.pendingSends.set(messageId, { resolve: wrappedResolve, reject: wrappedReject });
try {
writeMessage(socket, { type: "send", to, message });
} catch (error) {
clearTimeout(timeout);
this.pendingSends.delete(messageId);
reject(toError(error));
}
});
}
updatePresence(updates: { name?: string; status?: string; model?: string }): void {
if (this.disconnecting) {
return;
}
const socket = this.socket;
if (!socket || !this._sessionId || socket.destroyed || socket.writableEnded || !socket.writable) {
return;
}
writeMessage(socket, { type: "presence", ...updates });
}
}