-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathevents.ts
More file actions
96 lines (83 loc) · 2.07 KB
/
events.ts
File metadata and controls
96 lines (83 loc) · 2.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import type { Response } from "express";
import { insertRunEvent } from "./db.ts";
/**
 * An event that is broadcast to a run's subscribers and persisted.
 * The whole object is JSON-serialized both when it is written to a
 * subscriber and when it is stored.
 */
type RunEvent = {
// Event kind; also stored on its own as the persisted row's `type`.
type: string;
// Optional id of the run the event belongs to.
runId?: number;
// Any additional event-specific fields are allowed.
[key: string]: unknown;
};
/** One subscribed HTTP response plus the timer that keeps it alive. */
type Client = {
// The response stream that events are written to.
res: Response;
// Handle returned by setInterval for the periodic ping; cleared on cleanup.
keepalive: ReturnType<typeof setInterval>;
};
// Registry of active subscribers, keyed by run id. An entry is removed
// once its set of clients becomes empty.
const clientsByRun = new Map<number, Set<Client>>();
/**
 * Registers `res` as a subscriber for events of the given run.
 *
 * A `: ping` comment line is written to the response every 15 seconds,
 * the subscriber is added to the per-run registry, an initial `ready`
 * event is sent, and the subscriber is unregistered when the response
 * emits `close`.
 *
 * @param runId - Run whose events the response should receive.
 * @param res - Response stream that events are written to.
 */
export function subscribeRunEvents(runId: number, res: Response) {
  const sendPing = () => {
    try {
      res.write(": ping\n\n");
    } catch {
      // Nothing to do here: the "close" listener below performs the cleanup.
    }
  };
  const subscriber: Client = { res, keepalive: setInterval(sendPing, 15000) };

  // Reuse the run's subscriber set, creating and registering it on first use.
  let bucket = clientsByRun.get(runId);
  if (bucket === undefined) {
    bucket = new Set();
    clientsByRun.set(runId, bucket);
  }
  bucket.add(subscriber);

  // Tell the new subscriber that the stream is live.
  sendEvent(res, { type: "ready", runId }, "ready");

  res.on("close", () => cleanupClient(runId, subscriber));
}
/**
 * Broadcasts `event` to every subscriber of `runId` and then persists it.
 *
 * Subscribers whose write fails are unregistered. Persistence is started
 * whether or not anyone is subscribed, and is not awaited.
 *
 * @param runId - Run the event belongs to.
 * @param event - Event to deliver as a `message` and to store.
 */
export function emitRunEvent(runId: number, event: RunEvent): void {
  const clients = clientsByRun.get(runId);
  if (clients && clients.size > 0) {
    // Iterate over a snapshot because cleanupClient mutates the live set.
    for (const client of Array.from(clients)) {
      // A Client holds its stream in `res`; it has no `controller` field,
      // so passing `client.controller` made every send fail.
      if (!sendEvent(client.res, event, "message")) {
        cleanupClient(runId, client);
      }
    }
  }
  // Fire-and-forget: persistRunEvent catches and logs its own failures.
  void persistRunEvent(runId, event);
}
/**
 * Writes one event frame (`event: <name>` / `data: <json>` followed by a
 * blank line) to `res`.
 *
 * @param res - Response stream to write to.
 * @param event - Payload; serialized with JSON.stringify.
 * @param name - Value for the frame's `event:` field.
 * @returns `true` if the frame was handed to the stream; `false` if the
 *   stream is already ended or destroyed, or if serializing or writing threw.
 */
function sendEvent(
  res: Response,
  event: RunEvent,
  name: string,
): boolean {
  // Writing to a finished or destroyed stream reports its error
  // asynchronously instead of throwing, so the try/catch below cannot
  // detect it. Check the stream state up front.
  if (res.writableEnded || res.destroyed) {
    return false;
  }
  try {
    const payload = `event: ${name}\ndata: ${JSON.stringify(event)}\n\n`;
    res.write(payload);
    return true;
  } catch {
    return false;
  }
}
/**
 * Stops a subscriber's keepalive timer and removes it from the registry.
 * The run's registry entry is dropped once its last subscriber is removed.
 *
 * @param runId - Run the subscriber was registered under.
 * @param client - Subscriber to tear down.
 */
function cleanupClient(runId: number, client: Client) {
  // Always stop the ping timer, even if the run is no longer registered.
  clearInterval(client.keepalive);

  const bucket = clientsByRun.get(runId);
  if (bucket === undefined) {
    return;
  }

  bucket.delete(client);
  if (bucket.size > 0) {
    return;
  }
  // The last subscriber is gone, so forget the run entirely.
  clientsByRun.delete(runId);
}
/**
 * Stores `event` for `runId` via insertRunEvent, stamped with the current
 * time as an ISO-8601 string.
 *
 * Never rejects: any failure (including serialization) is logged with
 * console.warn and swallowed.
 *
 * @param runId - Run the event belongs to.
 * @param event - Event to store; its `type` is stored separately and the
 *   whole object is stored as JSON.
 */
async function persistRunEvent(runId: number, event: RunEvent) {
  try {
    const row = {
      run_id: runId,
      type: event.type,
      payload: JSON.stringify(event),
      created_at: new Date().toISOString(),
    };
    await insertRunEvent(row);
  } catch (error) {
    // Persistence is best-effort; report the problem without propagating it.
    const reason = error instanceof Error ? error.message : String(error);
    console.warn(
      `[events] failed to persist run event ${event.type} for ${runId}: ${reason}`,
    );
  }
}