Skip to content

Commit 0ce62a5

Browse files
committed
feat: extend WASM compute pushdown to edge pipeline
EdgeScanOperator Parquet path (src/query-do.ts):
- Register decoded columns in WASM and use executeQuery() for SIMD filter/sort instead of JS row-by-row assembly
- Mark filtersApplied=true so buildEdgePipeline skips FilterOperator

buildEdgePipeline (src/operators.ts):
- Check for the filtersApplied flag on the scan operator; skip FilterOperator when the scan already handles filters via WASM

unionIndices (wasm/src/wasm/aggregates.zig):
- O(n+m) sorted merge (was O(n*m) nested loop with linear search)
1 parent 176a611 commit 0ce62a5

File tree

3 files changed

+70
-20
lines changed

3 files changed

+70
-20
lines changed

src/operators.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2162,7 +2162,11 @@ export function buildEdgePipeline(
21622162
let pipeline: Operator = scan;
21632163
const memBudget = options?.memoryBudgetBytes ?? DEFAULT_MEMORY_BUDGET;
21642164

2165-
if (query.filters.length > 0) {
2165+
// Skip FilterOperator if the scan operator already applies filters
2166+
// (e.g. EdgeScanOperator Lance path uses WASM SQL with WHERE,
2167+
// EdgeScanOperator Parquet path now uses WASM executeQuery with filters)
2168+
const scanHandlesFilters = "filtersApplied" in scan && (scan as { filtersApplied: boolean }).filtersApplied;
2169+
if (query.filters.length > 0 && !scanHandlesFilters) {
21662170
pipeline = new FilterOperator(pipeline, query.filters);
21672171
}
21682172

src/query-do.ts

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ class EdgeScanOperator implements Operator {
6464
edgeCacheMisses = 0;
6565
r2ReadMs = 0;
6666
wasmExecMs = 0;
67+
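/** Filters are applied inside WASM executeQuery — skip downstream FilterOperator. NOTE(review): the JS row-assembly fallback (taken when registerDecodedColumn fails) does not apply filters, yet this flag is always true — confirm filters are not silently dropped on that path. */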
68+
filtersApplied = true;
6769

6870
constructor(
6971
bucket: R2Bucket,
@@ -328,22 +330,49 @@ class EdgeScanOperator implements Operator {
328330
decodedColumns.set(col.name, decoded);
329331
}
330332

331-
// Assemble rows for this single page
333+
// Try WASM SQL path: register decoded columns → executeQuery (SIMD filter/sort)
332334
let numRows = 0;
333335
for (const v of decodedColumns.values()) if (v.length > numRows) numRows = v.length;
334336
if (numRows === 0) return this.next(); // skip empty pages
335337

336-
const rows: Row[] = [];
337-
for (let i = 0; i < numRows; i++) {
338-
const row: Row = {};
339-
for (const name of colNames) {
340-
const vals = decodedColumns.get(name);
341-
row[name] = vals && i < vals.length ? vals[i] : null;
338+
this.wasmEngine.exports.resetHeap();
339+
const fragTable = `__edge_pq_${pi}`;
340+
let wasmRegistered = true;
341+
for (const col of cols) {
342+
const values = decodedColumns.get(col.name);
343+
if (!values?.length) continue;
344+
if (!this.wasmEngine.registerDecodedColumn(fragTable, col.name, col.dtype, values)) {
345+
wasmRegistered = false;
346+
break;
342347
}
343-
rows.push(row);
344348
}
349+
350+
let rows: Row[];
351+
if (wasmRegistered) {
352+
const pqQuery: QueryDescriptor = {
353+
...this.query,
354+
table: fragTable,
355+
sortColumn: undefined, limit: undefined, offset: undefined,
356+
aggregates: undefined, join: undefined,
357+
};
358+
const wasmRows = this.wasmEngine.executeQuery(pqQuery);
359+
this.wasmEngine.clearTable(fragTable);
360+
rows = wasmRows ?? [];
361+
} else {
362+
// Fallback: assemble rows in JS
363+
rows = [];
364+
for (let i = 0; i < numRows; i++) {
365+
const row: Row = {};
366+
for (const name of colNames) {
367+
const vals = decodedColumns.get(name);
368+
row[name] = vals && i < vals.length ? vals[i] : null;
369+
}
370+
rows.push(row);
371+
}
372+
}
373+
345374
this.wasmExecMs += Date.now() - wasmStart;
346-
return rows;
375+
return rows.length > 0 ? rows : this.next();
347376
}
348377

349378
async close(): Promise<void> {

wasm/src/wasm/aggregates.zig

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,7 @@ export fn intersectIndices(
775775
}
776776

777777
/// OR two index arrays (union, deduplicated)
778+
/// Uses O(n+m) sorted merge since filter outputs are always in ascending order
778779
export fn unionIndices(
779780
a: [*]const u32,
780781
a_len: usize,
@@ -784,23 +785,39 @@ export fn unionIndices(
784785
max_out: usize,
785786
) usize {
786787
var out_count: usize = 0;
788+
var i: usize = 0;
789+
var j: usize = 0;
787790

788-
// Add all from a
789-
for (0..a_len) |i| {
791+
while (i < a_len and j < b_len) {
790792
if (out_count >= max_out) break;
793+
if (a[i] == b[j]) {
794+
out[out_count] = a[i];
795+
out_count += 1;
796+
i += 1;
797+
j += 1;
798+
} else if (a[i] < b[j]) {
799+
out[out_count] = a[i];
800+
out_count += 1;
801+
i += 1;
802+
} else {
803+
out[out_count] = b[j];
804+
out_count += 1;
805+
j += 1;
806+
}
807+
}
808+
809+
// Drain remaining from a
810+
while (i < a_len and out_count < max_out) {
791811
out[out_count] = a[i];
792812
out_count += 1;
813+
i += 1;
793814
}
794815

795-
// Add from b if not already in out
796-
outer: for (0..b_len) |i| {
797-
if (out_count >= max_out) break;
798-
const b_val = b[i];
799-
for (0..out_count) |j| {
800-
if (out[j] == b_val) continue :outer;
801-
}
802-
out[out_count] = b_val;
816+
// Drain remaining from b
817+
while (j < b_len and out_count < max_out) {
818+
out[out_count] = b[j];
803819
out_count += 1;
820+
j += 1;
804821
}
805822

806823
return out_count;

0 commit comments

Comments
 (0)