Skip to content

Commit 2bec5fd

Browse files
committed
perf: WASM SIMD for int64 aggregates + parallel column reads
Two performance optimizations: 1. Int64 aggregates now use WASM SIMD (sumInt64Buffer, minInt64Buffer, maxInt64Buffer) instead of a JS loop over BigInt64Array. This was the single largest compute bottleneck — ~50% slower than the float64 path which already used WASM. 2. ScanOperator now reads all columns for a page in parallel via Promise.all() instead of sequential awaits. For a 5-column table, this overlaps 5 R2 range requests instead of waiting for each one.
1 parent 3c17565 commit 2bec5fd

File tree

2 files changed

+49
-11
lines changed

2 files changed

+49
-11
lines changed

src/operators.ts

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -82,19 +82,24 @@ export class ScanOperator implements Operator {
8282

8383
const scanStart = Date.now();
8484

85-
// Read + decode this page for all columns
85+
// Read all columns for this page in parallel (overlaps R2 range requests)
8686
const pageInfoMap = new Map<string, { buf: ArrayBuffer; pageInfo: PageInfo }>();
87+
const readPromises: Promise<void>[] = [];
8788
for (const col of frag.columns) {
8889
const colPage = col.pages[pi];
8990
if (!colPage) continue;
9091
if (!this.query.vectorSearch && canSkipPage(colPage, this.query.filters, col.name)) {
9192
this.pagesSkipped++;
9293
continue;
9394
}
94-
const buf = await frag.readPage(col, colPage);
95-
this.bytesRead += buf.byteLength;
96-
pageInfoMap.set(col.name, { buf, pageInfo: colPage });
95+
readPromises.push(
96+
frag.readPage(col, colPage).then(buf => {
97+
this.bytesRead += buf.byteLength;
98+
pageInfoMap.set(col.name, { buf, pageInfo: colPage });
99+
}),
100+
);
97101
}
102+
await Promise.all(readPromises);
98103

99104
// Decode columns for this single page
100105
const decoded = decodePageBatch(pageInfoMap, frag.columns, this.wasm);
@@ -1005,13 +1010,16 @@ export class WasmAggregateOperator implements Operator {
10051010
} else if (col.dtype === "int64") {
10061011
const count = buf.byteLength >> 3;
10071012
acc[ai].count += count;
1008-
// For int64 we need to read values as BigInt64Array
1009-
const view = new BigInt64Array(buf);
1010-
for (const v of view) {
1011-
const n = Number(v);
1012-
acc[ai].sum += n;
1013-
if (n < acc[ai].min) acc[ai].min = n;
1014-
if (n > acc[ai].max) acc[ai].max = n;
1013+
if (agg.fn === "sum" || agg.fn === "avg") {
1014+
acc[ai].sum += Number(this.wasm.sumInt64(buf));
1015+
}
1016+
if (agg.fn === "min") {
1017+
const v = Number(this.wasm.minInt64(buf));
1018+
if (v < acc[ai].min) acc[ai].min = v;
1019+
}
1020+
if (agg.fn === "max") {
1021+
const v = Number(this.wasm.maxInt64(buf));
1022+
if (v > acc[ai].max) acc[ai].max = v;
10151023
}
10161024
}
10171025
}

src/wasm-engine.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,36 @@ export class WasmEngine {
549549
return this.exports.avgFloat64Buffer(ptr >> 3, numElements);
550550
}
551551

552+
/** SIMD sum of Int64 buffer. Returns 0n for empty input. */
553+
sumInt64(buf: ArrayBuffer): bigint {
554+
if (buf.byteLength === 0) return 0n;
555+
const numElements = buf.byteLength >> 3;
556+
const ptr = this.exports.alloc(buf.byteLength);
557+
if (!ptr) return 0n;
558+
new Uint8Array(this.exports.memory.buffer, ptr, buf.byteLength).set(new Uint8Array(buf));
559+
return this.exports.sumInt64Buffer(ptr >> 3, numElements);
560+
}
561+
562+
/** SIMD min of Int64 buffer. Returns MAX_SAFE_INTEGER for empty input. */
563+
minInt64(buf: ArrayBuffer): bigint {
564+
if (buf.byteLength === 0) return BigInt(Number.MAX_SAFE_INTEGER);
565+
const numElements = buf.byteLength >> 3;
566+
const ptr = this.exports.alloc(buf.byteLength);
567+
if (!ptr) return BigInt(Number.MAX_SAFE_INTEGER);
568+
new Uint8Array(this.exports.memory.buffer, ptr, buf.byteLength).set(new Uint8Array(buf));
569+
return this.exports.minInt64Buffer(ptr >> 3, numElements);
570+
}
571+
572+
/** SIMD max of Int64 buffer. Returns MIN_SAFE_INTEGER for empty input. */
573+
maxInt64(buf: ArrayBuffer): bigint {
574+
if (buf.byteLength === 0) return BigInt(Number.MIN_SAFE_INTEGER);
575+
const numElements = buf.byteLength >> 3;
576+
const ptr = this.exports.alloc(buf.byteLength);
577+
if (!ptr) return BigInt(Number.MIN_SAFE_INTEGER);
578+
new Uint8Array(this.exports.memory.buffer, ptr, buf.byteLength).set(new Uint8Array(buf));
579+
return this.exports.maxInt64Buffer(ptr >> 3, numElements);
580+
}
581+
552582
/** Load a fragment file into the WASM fragment reader. */
553583
loadFragment(data: ArrayBuffer): boolean {
554584
const ptr = this.exports.alloc(data.byteLength);

0 commit comments

Comments
 (0)