Skip to content

Commit 34be00b

Browse files
committed
refactor: replace NDJSON spill/streaming with columnar binary format
Data was being serialized as row-based JSON (NDJSON) for spill storage and streaming despite the source data being columnar (Lance/Parquet). This introduced unnecessary overhead: column name repetition per row, text encoding, and BigInt workarounds (__bigint__ prefix hack). New columnar binary format (QMCB): header + per-column typed arrays (Float64Array, BigInt64Array, packed bools, offset+data strings, dimensioned float vectors) with null bitmaps. Applied to: - R2SpillBackend (edge spill) - FsSpillBackend (local spill) - streamRpc + handleQueryStream (DO → client streaming) - executeStream client-side parsing (frame reassembly)
1 parent d7d1d38 commit 34be00b

File tree

5 files changed

+510

-138

lines changed

src/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ export class TableQuery<T extends Row = Row> {
185185
return this._executor.execute(this.toDescriptor()) as Promise<QueryResult<T>>;
186186
}
187187

188-
/** Execute and return an NDJSON stream of rows. Only works with RemoteExecutor. */
188+
/** Execute and return a columnar binary stream of rows. Only works with RemoteExecutor. */
189189
async execStream(): Promise<ReadableStream<Row>> {
190190
if (!this._executor.executeStream) {
191191
throw new Error("execStream() requires a remote executor with streaming support");

src/index.ts

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -139,36 +139,47 @@ class RemoteExecutor implements QueryExecutor {
139139
return masterRpc.appendRpc(table, rows);
140140
}
141141

142-
/** Stream query results via RPC — native ReadableStream, no HTTP wrapper. */
142+
/** Stream query results via RPC — columnar binary framed stream. */
143143
async executeStream(query: QueryDescriptor): Promise<ReadableStream<Row>> {
144+
const { decodeColumnarRun } = await import("./r2-spill.js");
144145
const rpc = this.getQueryHandle();
145146
const byteStream = await rpc.streamRpc(query);
146147

147-
// Parse NDJSON byte stream into Row objects
148-
const decoder = new TextDecoder();
149-
const MAX_STREAM_BUFFER = 10 * 1024 * 1024;
150-
let buffer = "";
151-
148+
// Parse length-prefixed columnar binary frames into Row objects
152149
return new ReadableStream<Row>({
153150
async start(controller) {
154151
const reader = byteStream.getReader();
152+
let pending = new Uint8Array(0);
153+
154+
const concat = (a: Uint8Array, b: Uint8Array): Uint8Array => {
155+
const out = new Uint8Array(a.length + b.length);
156+
out.set(a, 0);
157+
out.set(b, a.length);
158+
return out;
159+
};
160+
155161
try {
156162
while (true) {
157163
const { done, value } = await reader.read();
158164
if (done) break;
159-
buffer += decoder.decode(value, { stream: true });
160-
if (buffer.length > MAX_STREAM_BUFFER) {
161-
controller.error(new Error("Stream buffer exceeded 10MB"));
162-
reader.cancel();
163-
return;
164-
}
165-
const lines = buffer.split("\n");
166-
buffer = lines.pop()!;
167-
for (const line of lines) {
168-
if (line.trim()) controller.enqueue(JSON.parse(line) as Row);
165+
pending = pending.length > 0 ? concat(pending, value) : value;
166+
167+
// Process complete frames
168+
while (pending.length >= 4) {
169+
const frameLen = new DataView(pending.buffer, pending.byteOffset).getUint32(0, true);
170+
if (pending.length < 4 + frameLen) break; // wait for more data
171+
172+
const frameBuf = pending.buffer.slice(
173+
pending.byteOffset + 4,
174+
pending.byteOffset + 4 + frameLen,
175+
);
176+
pending = pending.subarray(4 + frameLen);
177+
178+
for (const row of decodeColumnarRun(frameBuf)) {
179+
controller.enqueue(row);
180+
}
169181
}
170182
}
171-
if (buffer.trim()) controller.enqueue(JSON.parse(buffer) as Row);
172183
controller.close();
173184
} catch (err) {
174185
controller.error(err);

src/operators.ts

Lines changed: 13 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -891,25 +891,12 @@ export function estimateRowSize(row: Row): number {
891891
return size;
892892
}
893893

894-
/** JSON replacer for bigint values in NDJSON runs. */
895-
function bigIntJsonReplacer(_key: string, value: unknown): unknown {
896-
return typeof value === "bigint" ? `__bigint__${value.toString()}` : value;
897-
}
898-
899-
/** Parse a single NDJSON line, restoring bigint values. */
900-
function parseNdjsonRow(line: string): Row {
901-
return JSON.parse(line, (_key, value) => {
902-
if (typeof value === "string" && value.startsWith("__bigint__")) {
903-
return BigInt(value.slice(10));
904-
}
905-
return value;
906-
}) as Row;
907-
}
908-
909-
// Re-export SpillBackend from r2-spill so consumers can import from either place
894+
// Re-export SpillBackend + columnar codec from r2-spill
910895
export type { SpillBackend } from "./r2-spill.js";
896+
export { encodeColumnarRun, decodeColumnarRun } from "./r2-spill.js";
897+
import { encodeColumnarRun, decodeColumnarRun } from "./r2-spill.js";
911898

912-
/** Filesystem-backed spill for Node/Bun environments. */
899+
/** Filesystem-backed spill for Node/Bun environments using columnar binary format. */
913900
export class FsSpillBackend {
914901
private runFiles: string[] = [];
915902
private tmpDir: string | null = null;
@@ -925,30 +912,20 @@ export class FsSpillBackend {
925912
}
926913
const path = await import("node:path");
927914
const fs = await import("node:fs/promises");
928-
const runPath = path.join(this.tmpDir!, `run_${this.runFiles.length}.ndjson`);
929-
const lines = rows.map(row => JSON.stringify(row, bigIntJsonReplacer));
930-
const body = lines.join("\n") + "\n";
931-
this.bytesWritten += Buffer.byteLength(body, "utf8");
932-
await fs.writeFile(runPath, body);
915+
const runPath = path.join(this.tmpDir!, `run_${this.runFiles.length}.bin`);
916+
const buf = encodeColumnarRun(rows);
917+
this.bytesWritten += buf.byteLength;
918+
await fs.writeFile(runPath, new Uint8Array(buf));
933919
this.runFiles.push(runPath);
934920
return runPath;
935921
}
936922

937923
async *streamRun(spillId: string): AsyncGenerator<Row> {
938-
const nodeFs = await import("node:fs");
939-
const readline = await import("node:readline");
940-
const stream = nodeFs.createReadStream(spillId);
941-
const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
942-
try {
943-
for await (const line of rl) {
944-
if (line.length === 0) continue;
945-
this.bytesRead += Buffer.byteLength(line, "utf8") + 1; // +1 for newline
946-
yield parseNdjsonRow(line);
947-
}
948-
} finally {
949-
rl.close();
950-
stream.destroy();
951-
}
924+
const fs = await import("node:fs/promises");
925+
const fileData = await fs.readFile(spillId);
926+
const buf = fileData.buffer.slice(fileData.byteOffset, fileData.byteOffset + fileData.byteLength);
927+
this.bytesRead += buf.byteLength;
928+
yield* decodeColumnarRun(buf);
952929
}
953930

954931
async cleanup(): Promise<void> {

src/query-do.ts

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { decodeParquetColumnChunk } from "./parquet-decode.js";
99
import { instantiateWasm, type WasmEngine } from "./wasm-engine.js";
1010
import { mergeQueryResults } from "./merge.js";
1111
import { coalesceRanges, fetchBounded, withRetry, withTimeout } from "./coalesce.js";
12-
import { R2SpillBackend } from "./r2-spill.js";
12+
import { R2SpillBackend, encodeColumnarRun } from "./r2-spill.js";
1313
import {
1414
type Operator, type RowBatch,
1515
buildEdgePipeline, drainPipeline, estimateRowSize,
@@ -1806,30 +1806,35 @@ export class QueryDO extends DurableObject<Env> {
18061806
}
18071807
}
18081808

1809-
/** Stream query results as NDJSON. */
1809+
/** Stream query results as columnar binary batches. */
18101810
private async handleQueryStream(request: Request): Promise<Response> {
18111811
const body = await request.json();
18121812
let query: QueryDescriptor;
18131813
try { query = this.parseQuery(body); } catch (err) { return this.json({ error: this.errMsg(err) }, 400); }
18141814
let result: QueryResult;
18151815
try { result = await this.executeQuery(query); } catch (err) { return this.json({ error: this.errMsg(err) }, 500); }
18161816

1817-
const { readable, writable } = new TransformStream<Uint8Array>();
1818-
const writer = writable.getWriter();
1819-
const encoder = new TextEncoder();
1820-
1821-
(async () => {
1822-
try {
1823-
for (const row of result.rows) {
1824-
await writer.write(encoder.encode(JSON.stringify(row, bigIntReplacer) + "\n"));
1825-
}
1826-
} finally {
1827-
await writer.close();
1828-
}
1829-
})();
1817+
// Encode as columnar binary with length-prefixed batches for streaming
1818+
const STREAM_BATCH_SIZE = 4096;
1819+
const rows = result.rows;
1820+
let idx = 0;
1821+
const stream = new ReadableStream<Uint8Array>({
1822+
pull(controller) {
1823+
if (idx >= rows.length) { controller.close(); return; }
1824+
const end = Math.min(idx + STREAM_BATCH_SIZE, rows.length);
1825+
const batch = rows.slice(idx, end);
1826+
idx = end;
1827+
const buf = encodeColumnarRun(batch);
1828+
// Length-prefix each batch: 4 bytes little-endian uint32 + columnar payload
1829+
const frame = new Uint8Array(4 + buf.byteLength);
1830+
new DataView(frame.buffer).setUint32(0, buf.byteLength, true);
1831+
frame.set(new Uint8Array(buf), 4);
1832+
controller.enqueue(frame);
1833+
},
1834+
});
18301835

1831-
return new Response(readable, {
1832-
headers: { "content-type": "application/x-ndjson" },
1836+
return new Response(stream, {
1837+
headers: { "content-type": "application/x-querymode-columnar" },
18331838
});
18341839
}
18351840

@@ -1873,13 +1878,21 @@ export class QueryDO extends DurableObject<Env> {
18731878

18741879
async streamRpc(descriptor: unknown): Promise<ReadableStream<Uint8Array>> {
18751880
const result = await this.executeQuery(await this.rpcParseQuery(descriptor));
1876-
const encoder = new TextEncoder();
18771881
const rows = result.rows;
1882+
const STREAM_BATCH_SIZE = 4096;
18781883
let idx = 0;
18791884
return new ReadableStream<Uint8Array>({
18801885
pull(controller) {
18811886
if (idx >= rows.length) { controller.close(); return; }
1882-
controller.enqueue(encoder.encode(JSON.stringify(rows[idx++], bigIntReplacer) + "\n"));
1887+
const end = Math.min(idx + STREAM_BATCH_SIZE, rows.length);
1888+
const batch = rows.slice(idx, end);
1889+
idx = end;
1890+
const buf = encodeColumnarRun(batch);
1891+
// Length-prefix: 4 bytes LE uint32 + payload
1892+
const frame = new Uint8Array(4 + buf.byteLength);
1893+
new DataView(frame.buffer).setUint32(0, buf.byteLength, true);
1894+
frame.set(new Uint8Array(buf), 4);
1895+
controller.enqueue(frame);
18831896
},
18841897
});
18851898
}

0 commit comments

Comments (0)