Skip to content

Commit 3a82b84

Browse files
committed
fix: MaterializedExecutor missing window/distinct + columnar edge cases
- MaterializedExecutor.execute() now applies window functions and distinct (previously silently dropped — any .window() or .distinct() on fromJSON/fromCSV DataFrames had no effect)
- Extract window evaluation into standalone applyWindowsToRows() for reuse (WindowOperator delegates to it; MaterializedExecutor calls it directly)
- columnar sliceColumnarBatch: clear nullBitmap on empty slice (stale ref)
- columnar kWayMerge: guard against missing sort column index (-1)
1 parent 103439f commit 3a82b84

3 files changed

Lines changed: 220 additions & 198 deletions

File tree

src/client.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ import type {
1515
VectorSearchParams,
1616
WindowSpec,
1717
} from "./types.js";
18-
import { NULL_SENTINEL, rowComparator } from "./types.js";
18+
import { NULL_SENTINEL, rowComparator, groupKey } from "./types.js";
1919
import type { Operator, RowBatch } from "./operators.js";
20+
import { applyWindowsToRows } from "./operators.js";
2021
import { rowPassesFilters, bigIntReplacer } from "./decode.js";
2122
import { computePartialAgg, finalizePartialAgg } from "./partial-agg.js";
2223
import { descriptorToCode } from "./descriptor-to-code.js";
@@ -979,11 +980,27 @@ export class MaterializedExecutor implements QueryExecutor {
979980
});
980981
}
981982

983+
// Apply window functions (before filters, same order as operator pipeline)
984+
if (query.windows && query.windows.length > 0) {
985+
applyWindowsToRows(rows, query.windows);
986+
}
987+
982988
// Apply filters (AND + OR groups)
983989
if (query.filters.length > 0 || (query.filterGroups && query.filterGroups.length > 0)) {
984990
rows = rows.filter(row => rowPassesFilters(row, query.filters, query.filterGroups));
985991
}
986992

993+
// Apply distinct
994+
if (query.distinct) {
995+
const seen = new Set<string>();
996+
rows = rows.filter(row => {
997+
const key = groupKey(row, query.distinct!);
998+
if (seen.has(key)) return false;
999+
seen.add(key);
1000+
return true;
1001+
});
1002+
}
1003+
9871004
// Apply aggregation
9881005
if (query.aggregates && query.aggregates.length > 0) {
9891006
const partial = computePartialAgg(rows, query);

src/columnar.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ export function columnarKWayMerge(
661661
const indices: number[] = new Array(batches.length).fill(0);
662662
const heap: MergeHeapEntry[] = [];
663663
for (let i = 0; i < batches.length; i++) {
664-
if (batches[i].rowCount > 0) {
664+
if (batches[i].rowCount > 0 && sortColIndices[i] >= 0) {
665665
const sci = sortColIndices[i];
666666
heap.push({ batchIdx: i, rowIdx: 0, value: readColumnValue(batches[i].columns[sci], 0) });
667667
indices[i] = 1;
@@ -796,7 +796,7 @@ export function sliceColumnarBatch(batch: ColumnarBatch, offset: number, limit?:
796796
const start = Math.min(offset, batch.rowCount);
797797
const end = limit !== undefined ? Math.min(start + limit, batch.rowCount) : batch.rowCount;
798798
const rowCount = end - start;
799-
if (rowCount <= 0) return { columns: batch.columns.map(c => ({ ...c, rowCount: 0, data: new ArrayBuffer(0) })), rowCount: 0 };
799+
if (rowCount <= 0) return { columns: batch.columns.map(c => ({ ...c, rowCount: 0, data: new ArrayBuffer(0), nullBitmap: undefined })), rowCount: 0 };
800800
if (start === 0 && end === batch.rowCount) return batch;
801801

802802
const columns: ColumnarColumn[] = [];

0 commit comments

Comments
 (0)