Skip to content

Commit 0e04196

Browse files
grichaclaude
andcommitted
Add OpenCode server API integration for real-time streaming
Instead of running opencode as a subprocess, this starts an OpenCode server inside the container and connects to its SSE /event endpoint for real-time streaming of text deltas. - Create opencode-server.ts with server lifecycle management - Connect to SSE endpoint for streaming events with delta support - Send messages via HTTP POST to /session/{id}/message API - Auto-start server on first use, reuse for subsequent requests 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 5b2db2c commit 0e04196

File tree

2 files changed

+337
-2
lines changed

2 files changed

+337
-2
lines changed

src/chat/opencode-server.ts

Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
import type { Subprocess } from 'bun';
2+
import type { ChatMessage } from './handler';
3+
import { execInContainer } from '../docker';
4+
5+
interface OpenCodeServerEvent {
6+
type: string;
7+
properties: {
8+
info?: {
9+
id: string;
10+
sessionID?: string;
11+
role?: string;
12+
};
13+
part?: {
14+
id: string;
15+
sessionID: string;
16+
messageID: string;
17+
type: string;
18+
text?: string;
19+
tool?: string;
20+
state?: {
21+
input?: Record<string, unknown>;
22+
output?: string;
23+
title?: string;
24+
};
25+
};
26+
delta?: string;
27+
sessionID?: string;
28+
status?: { type: string };
29+
};
30+
}
31+
32+
export interface OpenCodeServerOptions {
33+
containerName: string;
34+
workDir?: string;
35+
sessionId?: string;
36+
}
37+
38+
const serverPorts = new Map<string, number>();
39+
const serverStarting = new Map<string, Promise<number>>();
40+
41+
async function findAvailablePort(containerName: string): Promise<number> {
42+
const script = `import socket; s=socket.socket(); s.bind(('', 0)); print(s.getsockname()[1]); s.close()`;
43+
const result = await execInContainer(containerName, ['python3', '-c', script], {
44+
user: 'workspace',
45+
});
46+
return parseInt(result.stdout.trim(), 10);
47+
}
48+
49+
async function isServerRunning(containerName: string, port: number): Promise<boolean> {
50+
try {
51+
const result = await execInContainer(
52+
containerName,
53+
['curl', '-s', '-o', '/dev/null', '-w', '%{http_code}', `http://localhost:${port}/session`],
54+
{ user: 'workspace' }
55+
);
56+
return result.stdout.trim() === '200';
57+
} catch {
58+
return false;
59+
}
60+
}
61+
62+
async function startServer(containerName: string): Promise<number> {
63+
const existing = serverPorts.get(containerName);
64+
if (existing && (await isServerRunning(containerName, existing))) {
65+
return existing;
66+
}
67+
68+
const starting = serverStarting.get(containerName);
69+
if (starting) {
70+
return starting;
71+
}
72+
73+
const startPromise = (async () => {
74+
const port = await findAvailablePort(containerName);
75+
console.log(`[opencode-server] Starting server on port ${port} in ${containerName}`);
76+
77+
await execInContainer(
78+
containerName,
79+
[
80+
'sh',
81+
'-c',
82+
`nohup opencode serve --port ${port} --hostname 127.0.0.1 > /tmp/opencode-server.log 2>&1 &`,
83+
],
84+
{ user: 'workspace' }
85+
);
86+
87+
for (let i = 0; i < 30; i++) {
88+
await new Promise((resolve) => setTimeout(resolve, 500));
89+
if (await isServerRunning(containerName, port)) {
90+
console.log(`[opencode-server] Server started on port ${port}`);
91+
serverPorts.set(containerName, port);
92+
serverStarting.delete(containerName);
93+
return port;
94+
}
95+
}
96+
97+
serverStarting.delete(containerName);
98+
throw new Error('Failed to start OpenCode server');
99+
})();
100+
101+
serverStarting.set(containerName, startPromise);
102+
return startPromise;
103+
}
104+
105+
export class OpenCodeServerSession {
106+
private containerName: string;
107+
private workDir: string;
108+
private sessionId?: string;
109+
private onMessage: (message: ChatMessage) => void;
110+
private sseProcess: Subprocess<'ignore', 'pipe', 'pipe'> | null = null;
111+
private responseComplete = false;
112+
113+
constructor(options: OpenCodeServerOptions, onMessage: (message: ChatMessage) => void) {
114+
this.containerName = options.containerName;
115+
this.workDir = options.workDir || '/home/workspace';
116+
this.sessionId = options.sessionId;
117+
this.onMessage = onMessage;
118+
}
119+
120+
async sendMessage(userMessage: string): Promise<void> {
121+
const port = await startServer(this.containerName);
122+
const baseUrl = `http://localhost:${port}`;
123+
124+
this.onMessage({
125+
type: 'system',
126+
content: 'Processing your message...',
127+
timestamp: new Date().toISOString(),
128+
});
129+
130+
try {
131+
if (!this.sessionId) {
132+
const createResult = await execInContainer(
133+
this.containerName,
134+
[
135+
'curl',
136+
'-s',
137+
'-X',
138+
'POST',
139+
`${baseUrl}/session`,
140+
'-H',
141+
'Content-Type: application/json',
142+
'-d',
143+
'{}',
144+
],
145+
{ user: 'workspace' }
146+
);
147+
const session = JSON.parse(createResult.stdout);
148+
this.sessionId = session.id;
149+
this.onMessage({
150+
type: 'system',
151+
content: `Session started ${this.sessionId}`,
152+
timestamp: new Date().toISOString(),
153+
});
154+
}
155+
156+
this.responseComplete = false;
157+
const ssePromise = this.startSSEStream(port);
158+
159+
await new Promise((resolve) => setTimeout(resolve, 100));
160+
161+
const messagePayload = JSON.stringify({
162+
parts: [{ type: 'text', text: userMessage }],
163+
});
164+
165+
execInContainer(
166+
this.containerName,
167+
[
168+
'curl',
169+
'-s',
170+
'-X',
171+
'POST',
172+
`${baseUrl}/session/${this.sessionId}/message`,
173+
'-H',
174+
'Content-Type: application/json',
175+
'-d',
176+
messagePayload,
177+
],
178+
{ user: 'workspace' }
179+
).catch((err) => {
180+
console.error('[opencode-server] Send error:', err);
181+
});
182+
183+
await ssePromise;
184+
185+
this.onMessage({
186+
type: 'done',
187+
content: 'Response complete',
188+
timestamp: new Date().toISOString(),
189+
});
190+
} catch (err) {
191+
console.error('[opencode-server] Error:', err);
192+
this.onMessage({
193+
type: 'error',
194+
content: (err as Error).message,
195+
timestamp: new Date().toISOString(),
196+
});
197+
}
198+
}
199+
200+
private async startSSEStream(port: number): Promise<void> {
201+
return new Promise((resolve) => {
202+
const proc = Bun.spawn(
203+
[
204+
'docker',
205+
'exec',
206+
'-i',
207+
this.containerName,
208+
'curl',
209+
'-s',
210+
'-N',
211+
'--max-time',
212+
'120',
213+
`http://localhost:${port}/event`,
214+
],
215+
{
216+
stdin: 'ignore',
217+
stdout: 'pipe',
218+
stderr: 'pipe',
219+
}
220+
);
221+
222+
this.sseProcess = proc;
223+
224+
const decoder = new TextDecoder();
225+
let buffer = '';
226+
227+
const processChunk = (chunk: Uint8Array) => {
228+
buffer += decoder.decode(chunk);
229+
const lines = buffer.split('\n');
230+
buffer = lines.pop() || '';
231+
232+
for (const line of lines) {
233+
if (!line.startsWith('data: ')) continue;
234+
const data = line.slice(6).trim();
235+
if (!data) continue;
236+
237+
try {
238+
const event: OpenCodeServerEvent = JSON.parse(data);
239+
this.handleEvent(event);
240+
241+
if (
242+
event.type === 'message.part.updated' &&
243+
event.properties.part?.type === 'step-finish'
244+
) {
245+
this.responseComplete = true;
246+
proc.kill();
247+
resolve();
248+
return;
249+
}
250+
} catch {
251+
continue;
252+
}
253+
}
254+
};
255+
256+
(async () => {
257+
if (!proc.stdout) {
258+
resolve();
259+
return;
260+
}
261+
262+
for await (const chunk of proc.stdout) {
263+
processChunk(chunk);
264+
if (this.responseComplete) break;
265+
}
266+
267+
resolve();
268+
})();
269+
270+
setTimeout(() => {
271+
if (!this.responseComplete) {
272+
proc.kill();
273+
resolve();
274+
}
275+
}, 120000);
276+
});
277+
}
278+
279+
private handleEvent(event: OpenCodeServerEvent): void {
280+
const timestamp = new Date().toISOString();
281+
282+
if (event.type === 'message.part.updated' && event.properties.part) {
283+
const part = event.properties.part;
284+
285+
if (part.type === 'text' && event.properties.delta) {
286+
this.onMessage({
287+
type: 'assistant',
288+
content: event.properties.delta,
289+
timestamp,
290+
});
291+
} else if (part.type === 'tool-use' && part.tool) {
292+
const input = part.state?.input;
293+
this.onMessage({
294+
type: 'tool_use',
295+
content: JSON.stringify(input, null, 2),
296+
toolName: part.state?.title || part.tool,
297+
toolId: part.id,
298+
timestamp,
299+
});
300+
} else if (part.type === 'tool-result' && part.state?.output) {
301+
this.onMessage({
302+
type: 'tool_result',
303+
content: part.state.output,
304+
toolId: part.id,
305+
timestamp,
306+
});
307+
}
308+
}
309+
}
310+
311+
async interrupt(): Promise<void> {
312+
if (this.sseProcess) {
313+
this.sseProcess.kill();
314+
this.sseProcess = null;
315+
this.onMessage({
316+
type: 'system',
317+
content: 'Chat interrupted',
318+
timestamp: new Date().toISOString(),
319+
});
320+
}
321+
}
322+
323+
getSessionId(): string | undefined {
324+
return this.sessionId;
325+
}
326+
}
327+
328+
export function createOpenCodeServerSession(
329+
options: OpenCodeServerOptions,
330+
onMessage: (message: ChatMessage) => void
331+
): OpenCodeServerSession {
332+
return new OpenCodeServerSession(options, onMessage);
333+
}

src/chat/opencode-websocket.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ import {
66
} from './base-chat-websocket';
77
import { createOpencodeSession } from './opencode-handler';
88
import { createHostOpencodeSession } from './host-opencode-handler';
9+
import { createOpenCodeServerSession } from './opencode-server';
910
import type { ChatMessage } from './handler';
1011

1112
type AnyOpencodeSession =
1213
| ReturnType<typeof createOpencodeSession>
13-
| ReturnType<typeof createHostOpencodeSession>;
14+
| ReturnType<typeof createHostOpencodeSession>
15+
| ReturnType<typeof createOpenCodeServerSession>;
1416

1517
interface OpencodeConnection extends BaseChatConnection {
1618
session: AnyOpencodeSession | null;
@@ -39,7 +41,7 @@ export class OpencodeWebSocketServer extends BaseChatWebSocketServer<OpencodeCon
3941
sessionId: string | undefined,
4042
onMessage: (message: ChatMessage) => void
4143
): ChatSessionInterface {
42-
return createOpencodeSession(
44+
return createOpenCodeServerSession(
4345
{
4446
containerName,
4547
workDir: '/home/workspace',

0 commit comments

Comments
 (0)