Skip to content

Commit 95610c3

Browse files
committed
refactor: remove HTTP fallback handlers — RPC only
DOs (QueryDO, MasterDO, FragmentDO) no longer expose fetch() handlers. All inter-DO calls and Worker→DO calls use RPC methods directly, eliminating duplicate HTTP+RPC code paths and JSON serialization overhead. Worker.ts is now the sole HTTP↔RPC translation layer.
1 parent 34be00b commit 95610c3

7 files changed

Lines changed: 322 additions & 502 deletions

File tree

src/fragment-do.test.ts

Lines changed: 19 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { describe, it, expect, vi } from "vitest";
22
import { FragmentDO } from "./fragment-do.js";
3-
import { bigIntReplacer } from "./decode.js";
43
import type { TableMeta, ColumnMeta } from "./types.js";
54

65
// Mock WASM module import (not available in vitest)
@@ -40,41 +39,23 @@ const mockEnv = {
4039
QUERY_DO: null,
4140
} as any;
4241

43-
function makeScanRequest(url: string, body: unknown): Request {
44-
return new Request(url, {
45-
method: "POST",
46-
body: JSON.stringify(body, bigIntReplacer),
47-
headers: { "content-type": "application/json" },
48-
});
49-
}
50-
5142
describe("FragmentDO", () => {
5243
it("is constructible", () => {
5344
const fdo = new FragmentDO(mockState, mockEnv);
5445
expect(fdo).toBeDefined();
5546
});
5647

57-
it("returns 404 for unknown path", async () => {
58-
const fdo = new FragmentDO(mockState, mockEnv);
59-
const res = await fdo.fetch(new Request("http://internal/unknown"));
60-
expect(res.status).toBe(404);
61-
expect(await res.text()).toBe("Not found");
62-
});
63-
64-
it("returns valid JSON for /scan with empty fragments", async () => {
48+
it("returns valid result for scanRpc with empty fragments", async () => {
6549
const fdo = new FragmentDO(mockState, mockEnv);
66-
const req = makeScanRequest("http://internal/scan", {
67-
fragments: [],
68-
query: { table: "test", filters: [], projections: [] },
69-
});
70-
const res = await fdo.fetch(req);
71-
expect(res.status).toBe(200);
72-
const body = await res.json();
73-
expect(body.rows).toEqual([]);
74-
expect(body.rowCount).toBe(0);
75-
expect(body.bytesRead).toBe(0);
76-
expect(body.pagesSkipped).toBe(0);
77-
expect(typeof body.durationMs).toBe("number");
50+
const result = await fdo.scanRpc(
51+
[],
52+
{ table: "test", filters: [], projections: [] } as any,
53+
);
54+
expect(result.rows).toEqual([]);
55+
expect(result.rowCount).toBe(0);
56+
expect(result.bytesRead).toBe(0);
57+
expect(result.pagesSkipped).toBe(0);
58+
expect(typeof result.durationMs).toBe("number");
7859
});
7960

8061
it("handles scan with mock fragments gracefully when R2 returns null", async () => {
@@ -101,17 +82,14 @@ describe("FragmentDO", () => {
10182
};
10283

10384
const fdo = new FragmentDO(mockState, mockEnv);
104-
const req = makeScanRequest("http://internal/scan", {
105-
fragments: [{ r2Key: "users.lance", meta }],
106-
query: { table: "users", filters: [], projections: [] },
107-
});
108-
const res = await fdo.fetch(req);
109-
expect(res.status).toBe(200);
110-
const body = await res.json();
111-
// R2 returns null so no data is read, but response is still valid
112-
expect(body.rows).toEqual([]);
113-
expect(body.rowCount).toBe(0);
114-
expect(body.bytesRead).toBe(0);
115-
expect(body.columns).toEqual(["id"]);
85+
const result = await fdo.scanRpc(
86+
[{ r2Key: "users.lance", meta }],
87+
{ table: "users", filters: [], projections: [] } as any,
88+
);
89+
// R2 returns null so no data is read, but result is still valid
90+
expect(result.rows).toEqual([]);
91+
expect(result.rowCount).toBe(0);
92+
expect(result.bytesRead).toBe(0);
93+
expect(result.columns).toEqual(["id"]);
11694
});
11795
});

src/fragment-do.ts

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { Env, TableMeta, QueryResult, Row } from "./types.js";
22
import type { QueryDescriptor } from "./client.js";
3-
import { canSkipPage, bigIntReplacer } from "./decode.js";
3+
import { canSkipPage } from "./decode.js";
44
import { instantiateWasm, type WasmEngine } from "./wasm-engine.js";
55
import { coalesceRanges, fetchBounded, withRetry, withTimeout } from "./coalesce.js";
66
import { R2SpillBackend } from "./r2-spill.js";
@@ -39,14 +39,6 @@ export class FragmentDO extends DurableObject<Env> {
3939
);
4040
}
4141

42-
async fetch(request: Request): Promise<Response> {
43-
await this.ensureInitialized();
44-
45-
const url = new URL(request.url);
46-
if (url.pathname === "/scan") return this.handleScan(request);
47-
return new Response("Not found", { status: 404 });
48-
}
49-
5042
private async ensureInitialized(): Promise<void> {
5143
if (this.initialized) return;
5244
this.initialized = true;
@@ -58,27 +50,7 @@ export class FragmentDO extends DurableObject<Env> {
5850
this.wasmEngine = await instantiateWasm(wasmModule);
5951
}
6052

61-
private json(body: unknown, status = 200): Response {
62-
return new Response(JSON.stringify(body, bigIntReplacer), {
63-
status, headers: { "content-type": "application/json" },
64-
});
65-
}
66-
67-
private async handleScan(request: Request): Promise<Response> {
68-
const { fragments, query } = (await request.json()) as ScanRequest;
69-
const result = await this.executeScan(fragments, query);
70-
71-
this.log("info", "scan_complete", {
72-
fragmentCount: fragments.length, rowCount: result.rowCount,
73-
bytesRead: result.bytesRead, durationMs: result.durationMs,
74-
r2ReadMs: result.r2ReadMs, wasmExecMs: result.wasmExecMs,
75-
cacheHits: result.cacheHits, cacheMisses: result.cacheMisses,
76-
});
77-
78-
return this.json(result);
79-
}
80-
81-
/** Core scan logic shared by HTTP fetch handler and RPC. */
53+
/** Core scan logic used by RPC. */
8254
private async executeScan(fragments: ScanRequest["fragments"], query: QueryDescriptor): Promise<QueryResult> {
8355
const t0 = Date.now();
8456
let totalBytesRead = 0;

src/master-do.ts

Lines changed: 29 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import type { ColumnMeta, Env, Footer, TableMeta, DatasetMeta, AppendResult } fr
22
import { parseFooter, parseColumnMetaFromProtobuf, FOOTER_SIZE } from "./footer.js";
33
import { parseManifest } from "./manifest.js";
44
import { detectFormat, getParquetFooterLength, parseParquetFooter, parquetMetaToTableMeta } from "./parquet.js";
5-
import { bigIntReplacer } from "./decode.js";
5+
import type { QueryDORpc } from "./types.js";
66
import { instantiateWasm, type WasmEngine } from "./wasm-engine.js";
77
import wasmModule from "./wasm-module.js";
88

@@ -15,25 +15,9 @@ export class MasterDO extends DurableObject<Env> {
1515
super(ctx, env);
1616
}
1717

18-
private json(body: unknown, status = 200): Response {
19-
return new Response(JSON.stringify(body), {
20-
status, headers: { "content-type": "application/json" },
21-
});
22-
}
23-
24-
async fetch(request: Request): Promise<Response> {
25-
switch (new URL(request.url).pathname) {
26-
case "/register": return this.handleRegister(request);
27-
case "/write": return this.handleWrite(request);
28-
case "/append": return this.handleAppend(request);
29-
case "/refresh": return this.handleRefresh(request);
30-
case "/tables": return this.handleListTables();
31-
default: return new Response("Not found", { status: 404 });
32-
}
33-
}
18+
// ── RPC methods ────────────────────────────────────────────────────────
3419

35-
private async handleRegister(request: Request): Promise<Response> {
36-
const { queryDoId, region } = (await request.json()) as { queryDoId: string; region: string };
20+
async registerRpc(queryDoId: string, region: string): Promise<{ registered: boolean; region: string; tableVersions?: Record<string, { r2Key: string; updatedAt: number }> }> {
3721
const regions = (await this.ctx.storage.get<Record<string, string>>("regions")) ?? {};
3822
regions[region] = queryDoId;
3923
await this.ctx.storage.put("regions", regions);
@@ -45,22 +29,22 @@ export class MasterDO extends DurableObject<Env> {
4529
const name = key.replace("table:", "");
4630
tableVersions[name] = { r2Key: meta.r2Key ?? name, updatedAt: meta.updatedAt ?? 0 };
4731
}
48-
return this.json({ registered: true, region, tableVersions });
32+
return { registered: true, region, tableVersions };
4933
}
5034

51-
private async handleWrite(request: Request): Promise<Response> {
52-
const { r2Key } = (await request.json()) as { r2Key: string };
35+
async writeRpc(body: unknown): Promise<unknown> {
36+
const { r2Key } = body as { r2Key: string };
5337
if (!r2Key || typeof r2Key !== "string" || r2Key.includes("..")) {
54-
return this.json({ error: "Invalid r2Key" }, 400);
38+
throw new Error("Invalid r2Key");
5539
}
5640

5741
// Check if this is a dataset directory (ends with / or .lance/)
5842
if (r2Key.endsWith("/") || r2Key.endsWith(".lance/")) {
59-
return this.handleDatasetWrite(r2Key);
43+
return this.executeDatasetWrite(r2Key);
6044
}
6145

6246
const result = await this.readFooterAndColumns(r2Key);
63-
if (!result) return this.json({ error: "Failed to read footer" }, 500);
47+
if (!result) throw new Error("Failed to read footer");
6448

6549
const tableName = r2Key.replace(/\.(lance|parquet)$/, "").split("/").pop() ?? r2Key;
6650
const totalRows = result.columns[0]?.pages.reduce((s, p) => s + p.rowCount, 0) ?? 0;
@@ -70,26 +54,26 @@ export class MasterDO extends DurableObject<Env> {
7054
};
7155
await this.ctx.storage.put(`table:${tableName}`, meta);
7256
await this.broadcast(tableName, r2Key, result, { totalRows });
73-
return this.json({ success: true, table: tableName });
57+
return { success: true, table: tableName };
7458
}
7559

76-
/** Handle write notification for a multi-fragment dataset directory. */
77-
private async handleDatasetWrite(r2Prefix: string): Promise<Response> {
60+
/** Write notification for a multi-fragment dataset directory. */
61+
private async executeDatasetWrite(r2Prefix: string): Promise<unknown> {
7862
const tableName = r2Prefix.replace(/\/$/, "").replace(/\.lance$/, "").split("/").pop() ?? r2Prefix;
7963

8064
// Find latest manifest
8165
const listed = await this.env.DATA_BUCKET.list({ prefix: `${r2Prefix}_versions/`, limit: 100 });
8266
const manifestKeys = listed.objects
8367
.filter(o => o.key.endsWith(".manifest"))
8468
.sort((a, b) => a.key.localeCompare(b.key));
85-
if (manifestKeys.length === 0) return this.json({ error: "No manifests found" }, 404);
69+
if (manifestKeys.length === 0) throw new Error("No manifests found");
8670

8771
const latestKey = manifestKeys[manifestKeys.length - 1].key;
8872
const manifestObj = await this.env.DATA_BUCKET.get(latestKey);
89-
if (!manifestObj) return this.json({ error: "Failed to read manifest" }, 500);
73+
if (!manifestObj) throw new Error("Failed to read manifest");
9074

9175
const manifest = parseManifest(await manifestObj.arrayBuffer());
92-
if (!manifest) return this.json({ error: "Failed to parse manifest" }, 500);
76+
if (!manifest) throw new Error("Failed to parse manifest");
9377

9478
// Read first fragment's footer to broadcast (Query DOs will discover the rest)
9579
if (manifest.fragments.length > 0) {
@@ -110,7 +94,7 @@ export class MasterDO extends DurableObject<Env> {
11094
name: tableName, r2Prefix, manifest, totalRows: manifest.totalRows, updatedAt: Date.now(),
11195
});
11296

113-
return this.json({ success: true, table: tableName, fragments: manifest.fragments.length, totalRows: manifest.totalRows });
97+
return { success: true, table: tableName, fragments: manifest.fragments.length, totalRows: manifest.totalRows };
11498
}
11599

116100
private async getWasm(): Promise<WasmEngine> {
@@ -119,29 +103,7 @@ export class MasterDO extends DurableObject<Env> {
119103
return this.wasmEngine;
120104
}
121105

122-
/** Append rows to a table using CAS coordination.
123-
* 1. Build Lance fragment from row data via WASM
124-
* 2. PUT data file to R2 (unique name, no conflict)
125-
* 3. CAS loop: read _latest → build new manifest → PUT with ETag match
126-
*/
127-
private async handleAppend(request: Request): Promise<Response> {
128-
const { table, rows } = (await request.json()) as {
129-
table: string;
130-
rows: Record<string, unknown>[];
131-
};
132-
133-
if (!rows?.length) return this.json({ error: "No rows provided" }, 400);
134-
try {
135-
const result = await this.executeAppend(table, rows);
136-
return this.json(result);
137-
} catch (err) {
138-
const msg = err instanceof Error ? err.message : String(err);
139-
const status = msg.includes("CAS failed") ? 409 : 400;
140-
return this.json({ error: msg }, status);
141-
}
142-
}
143-
144-
/** Core append logic shared by HTTP fetch handler and RPC. */
106+
/** Core append logic. */
145107
private async executeAppend(table: string, rows: Record<string, unknown>[]): Promise<AppendResult> {
146108
if (!rows?.length) throw new Error("No rows provided");
147109

@@ -355,22 +317,22 @@ export class MasterDO extends DurableObject<Env> {
355317
return new Uint8Array(bytes);
356318
}
357319

358-
private async handleRefresh(request: Request): Promise<Response> {
359-
const { r2Key } = (await request.json()) as { r2Key: string };
320+
async refreshRpc(body: unknown): Promise<unknown> {
321+
const { r2Key } = body as { r2Key: string };
360322
if (!r2Key || typeof r2Key !== "string" || r2Key.includes("..")) {
361-
return this.json({ error: "Invalid r2Key" }, 400);
323+
throw new Error("Invalid r2Key");
362324
}
363325
const result = await this.readFooterAndColumns(r2Key);
364-
if (!result) return this.json({ error: "Failed to read footer" }, 500);
326+
if (!result) throw new Error("Failed to read footer");
365327

366328
const tableName = r2Key.replace(/\.(lance|parquet)$/, "").split("/").pop() ?? r2Key;
367329
await this.broadcast(tableName, r2Key, result);
368-
return this.json({ refreshed: true, table: tableName });
330+
return { refreshed: true, table: tableName };
369331
}
370332

371-
private async handleListTables(): Promise<Response> {
333+
async listTablesRpc(): Promise<{ tables: string[] }> {
372334
const tables = await this.ctx.storage.list<TableMeta>({ prefix: "table:" });
373-
return this.json({ tables: [...tables.keys()].map(k => k.replace("table:", "")) });
335+
return { tables: [...tables.keys()].map(k => k.replace("table:", "")) };
374336
}
375337

376338
/** Read footer + column metadata from R2 (2 range reads, done once by Master). */
@@ -427,28 +389,26 @@ export class MasterDO extends DurableObject<Env> {
427389
return { parsed, raw, fileSize, columns, format: "lance" };
428390
}
429391

430-
/** Broadcast invalidation with pre-parsed columns to all Query DOs. */
392+
/** Broadcast invalidation with pre-parsed columns to all Query DOs via RPC. */
431393
private async broadcast(
432394
table: string, r2Key: string,
433395
footer: { raw: ArrayBuffer; fileSize: bigint; columns: ColumnMeta[]; format?: "lance" | "parquet" },
434396
opts?: { totalRows?: number; r2Prefix?: string },
435397
): Promise<void> {
436398
const regions = (await this.ctx.storage.get<Record<string, string>>("regions")) ?? {};
437-
const payload = JSON.stringify({
399+
const payload = {
438400
table, r2Key, columns: footer.columns, format: footer.format ?? "lance",
439401
footerBytes: Array.from(new Uint8Array(footer.raw)),
440402
fileSize: footer.fileSize.toString(), timestamp: Date.now(),
441403
...(opts?.totalRows != null ? { totalRows: opts.totalRows } : {}),
442404
...(opts?.r2Prefix != null ? { r2Prefix: opts.r2Prefix } : {}),
443-
}, bigIntReplacer);
405+
};
444406

445407
const deadRegions: string[] = [];
446408
await Promise.allSettled(Object.entries(regions).map(async ([region, doId]) => {
447409
try {
448-
const queryDo = this.env.QUERY_DO.get(this.env.QUERY_DO.idFromString(doId));
449-
await queryDo.fetch(new Request("http://internal/invalidate", {
450-
method: "POST", body: payload, headers: { "content-type": "application/json" },
451-
}));
410+
const queryDo = this.env.QUERY_DO.get(this.env.QUERY_DO.idFromString(doId)) as unknown as QueryDORpc;
411+
await queryDo.invalidateRpc(payload);
452412
this.broadcastFailures.delete(region);
453413
} catch {
454414
const count = (this.broadcastFailures.get(region) ?? 0) + 1;

0 commit comments

Comments
 (0)