Skip to content

Commit c1cc632

Browse files
committed
feat: BETWEEN in WasmAggregateOperator + two-phase aggregate I/O
- WasmAggregateOperator handles BETWEEN via filterFloat64Range/Int32/Int64Range - canUseWasmAggregate allows "between" filter op on numeric columns - Two-phase I/O in WasmAggregateOperator: read only filter columns first, skip aggregate column I/O when filter eliminates all rows on a page
1 parent ebc0bbb commit c1cc632

File tree

1 file changed

+20
-8
lines changed

1 file changed

+20
-8
lines changed

src/operators.ts

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1373,26 +1373,26 @@ export class WasmAggregateOperator implements Operator {
13731373
for (const frag of this.fragments) {
13741374
const colMap = new Map(frag.columns.map(c => [c.name, c]));
13751375

1376-
// Collect all columns needed (aggregate + filter)
1377-
const neededCols = new Set<string>();
1378-
for (const agg of aggregates) if (agg.column !== "*") neededCols.add(agg.column);
1379-
for (const f of filters) neededCols.add(f.column);
1376+
// Separate filter columns from aggregate-only columns
1377+
const filterColNames = new Set(filters.map(f => f.column));
1378+
const aggOnlyColNames = new Set<string>();
1379+
for (const agg of aggregates) {
1380+
if (agg.column !== "*" && !filterColNames.has(agg.column)) aggOnlyColNames.add(agg.column);
1381+
}
13801382

1381-
// Process page by page
13821383
const firstCol = frag.columns[0];
13831384
if (!firstCol) continue;
13841385
const pageCount = firstCol.pages.length;
13851386

13861387
for (let pi = 0; pi < pageCount; pi++) {
1387-
// Page-level skip via min/max stats — check all filter columns
13881388
if (canSkipPageMultiCol(frag.columns, pi, filters)) {
13891389
this.pagesSkipped++;
13901390
continue;
13911391
}
13921392

1393-
// Read needed columns for this page
1393+
// Phase 1: Read filter columns (+ any aggregate columns that are also filter columns)
13941394
const pageBuffers = new Map<string, ArrayBuffer>();
1395-
for (const colName of neededCols) {
1395+
for (const colName of filterColNames) {
13961396
const col = colMap.get(colName);
13971397
if (!col || !col.pages[pi]) continue;
13981398
const buf = await frag.readPage(col, col.pages[pi]);
@@ -1468,6 +1468,18 @@ export class WasmAggregateOperator implements Operator {
14681468
matchCount = currentIndices.length;
14691469
}
14701470

1471+
// Phase 2: Read aggregate-only columns (skipped if filter eliminated all rows)
1472+
if (!hasFilters || matchCount > 0) {
1473+
for (const colName of aggOnlyColNames) {
1474+
if (pageBuffers.has(colName)) continue; // already read (also a filter column)
1475+
const col = colMap.get(colName);
1476+
if (!col || !col.pages[pi]) continue;
1477+
const buf = await frag.readPage(col, col.pages[pi]);
1478+
this.bytesRead += buf.byteLength;
1479+
pageBuffers.set(colName, buf);
1480+
}
1481+
}
1482+
14711483
// Aggregate per column
14721484
for (let ai = 0; ai < aggregates.length; ai++) {
14731485
const agg = aggregates[ai];

0 commit comments

Comments
 (0)