Skip to content

Commit 57d2e70

Browse files
committed
feat: multi-column page skip + BETWEEN filter with WASM range ops
- canSkipPageMultiCol: check ALL filter columns' min/max stats for page skip (previously only the first column was checked, so skips on non-first-column filters were missed)
- Add BETWEEN filter op end-to-end: types, client (.whereBetween()), decode (matchesFilter + canSkipPage), WASM range filters, scan-time WASM path
- Add filterFloat64Range, filterInt32Range, filterInt64Range Zig exports
- canSkipPage handles BETWEEN: skip page if max < low or min > high
- WasmAggregateOperator uses multi-column page skip
1 parent 510f9b1 commit 57d2e70

File tree

6 files changed

+158
-9
lines changed

6 files changed

+158
-9
lines changed

src/client.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,11 @@ export class DataFrame<T extends Row = Row> {
169169
return this.where(column, op, value);
170170
}
171171

172+
/**
 * Keep only rows whose `column` value lies in the closed interval
 * [`low`, `high`] (both bounds inclusive).
 *
 * Appends a `"between"` filter carrying `[low, high]` to the current filter
 * list and returns the frame produced by `derive`.
 */
whereBetween(column: string, low: number | bigint, high: number | bigint): DataFrame<T> {
  const rangeFilter = { column, op: "between" as const, value: [low, high] };
  return this.derive({ filters: [...this._filters, rangeFilter] });
}
176+
172177
/** Filter rows where a column is not null. */
173178
whereNotNull(column: string): DataFrame<T> {
174179
return this.derive({ filters: [...this._filters, { column, op: "is_not_null", value: 0 }] });

src/decode.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,15 @@ export function canSkipPage(page: PageInfo, filters: QueryDescriptor["filters"],
2525
case "lt": if (min >= val) return true; break;
2626
case "lte": if (min > val) return true; break;
2727
case "eq": if (val < min || val > max) return true; break;
28+
case "between": {
29+
if (!Array.isArray(filter.value) || filter.value.length !== 2) break;
30+
let lo = filter.value[0];
31+
let hi = filter.value[1];
32+
if (typeof min === "bigint" && typeof lo === "number") { lo = BigInt(Math.trunc(lo)); hi = BigInt(Math.trunc(hi as number)); }
33+
else if (typeof min === "number" && typeof lo === "bigint") { min = BigInt(Math.trunc(min)); max = BigInt(Math.trunc(max as number)); }
34+
if (max < lo || min > hi) return true;
35+
break;
36+
}
2837
}
2938
}
3039
return false;
@@ -383,6 +392,13 @@ export function matchesFilter(
383392
case "lt": { const [a, b] = coerceCompare(val, t); return (a as number | bigint | string) < (b as number | bigint | string); }
384393
case "lte": { const [a, b] = coerceCompare(val, t); return (a as number | bigint | string) <= (b as number | bigint | string); }
385394
case "in": return Array.isArray(t) && t.some(v => { const [a, b] = coerceCompare(val, v); return a === b; });
395+
case "between": {
396+
if (!Array.isArray(t) || t.length !== 2) return false;
397+
const [, lo] = coerceCompare(val, t[0]);
398+
const [a, hi] = coerceCompare(val, t[1]);
399+
return (a as number | bigint | string) >= (lo as number | bigint | string) &&
400+
(a as number | bigint | string) <= (hi as number | bigint | string);
401+
}
386402
default: return true;
387403
}
388404
}

src/operators.ts

Lines changed: 91 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,23 @@ export interface FragmentSource {
4141
readPage(col: ColumnMeta, page: PageInfo): Promise<ArrayBuffer>;
4242
}
4343

44+
/**
 * Decide whether the page at `pageIdx` can be skipped by consulting the
 * min/max stats of every column a filter references, not just the first one.
 *
 * Each filter is evaluated on its own column's page at `pageIdx` through
 * `canSkipPage`; the function returns `true` as soon as one filter reports
 * the page as skippable. Filters whose column is not present in `columns`,
 * or whose column has no page at `pageIdx`, are ignored.
 *
 * NOTE(review): this relies on page `pageIdx` covering the same rows in every
 * column — confirm against the fragment writer.
 *
 * @param columns Column metadata to search by name.
 * @param pageIdx Index into each column's `pages` array.
 * @param filters Filter predicates of the query.
 * @returns `true` if any filter lets the page be skipped, otherwise `false`
 *          (always `false` when there are no filters).
 */
function canSkipPageMultiCol(
  columns: ColumnMeta[], pageIdx: number, filters: QueryDescriptor["filters"],
): boolean {
  for (const filter of filters) {
    // Resolve the column by name directly instead of building a Map of all
    // columns on every call (this runs once per page). The last column with a
    // matching name wins, which is what the Map-based lookup produced.
    let col: ColumnMeta | undefined;
    for (const candidate of columns) {
      if (candidate.name === filter.column) col = candidate;
    }

    const page = col?.pages[pageIdx];
    if (!page) continue;

    if (canSkipPage(page, [filter], filter.column)) return true;
  }
  return false;
}
60+
4461
// ---------------------------------------------------------------------------
4562
// ScanOperator — reads pages from fragments, decodes, yields batches
4663
// ---------------------------------------------------------------------------
@@ -110,8 +127,7 @@ export class ScanOperator implements Operator {
110127
const firstCol = frag.columns[0];
111128
if (!firstCol) return -1;
112129
for (let i = start; i < firstCol.pages.length; i++) {
113-
const page = firstCol.pages[i];
114-
if (this.query.vectorSearch || !canSkipPage(page, this.query.filters, firstCol.name)) {
130+
if (this.query.vectorSearch || !canSkipPageMultiCol(frag.columns, i, this.query.filters)) {
115131
return i;
116132
}
117133
}
@@ -156,9 +172,8 @@ export class ScanOperator implements Operator {
156172
const pi = this.pageIdx;
157173
this.pageIdx++;
158174

159-
// Page-level skip via min/max stats on first column
160-
const page = firstCol.pages[pi];
161-
if (!this.query.vectorSearch && canSkipPage(page, this.query.filters, firstCol.name)) {
175+
// Page-level skip via min/max stats — check ALL filter columns, not just first
176+
if (!this.query.vectorSearch && canSkipPageMultiCol(frag.columns, pi, this.query.filters)) {
162177
this.pagesSkipped += frag.columns.length;
163178
continue;
164179
}
@@ -318,7 +333,7 @@ function scanFilterIndices(
318333
return new Uint32Array(0);
319334
}
320335

321-
const wasmOp = filter.op !== "in" ? filterOpToWasm(filter.op) : -1;
336+
const wasmOp = filter.op !== "in" && filter.op !== "between" ? filterOpToWasm(filter.op) : -1;
322337

323338
// Try WASM SIMD path for numeric scalar filters
324339
if (wasmOp >= 0 && typeof filter.value === "number" &&
@@ -333,6 +348,20 @@ function scanFilterIndices(
333348
}
334349
}
335350

351+
// Try WASM BETWEEN path for numeric range filters
352+
if (filter.op === "between" && Array.isArray(filter.value) && filter.value.length === 2 &&
353+
typeof filter.value[0] === "number" && typeof filter.value[1] === "number" &&
354+
(dtype === "float64" || dtype === "float32" || dtype === "int32" || dtype === "int64")) {
355+
const filterResult = wasmFilterRange(
356+
values, dtype, filter.value[0], filter.value[1], rowCount, wasm,
357+
);
358+
if (filterResult) {
359+
indices = indices ? wasmIntersect(indices, filterResult, wasm) : filterResult;
360+
if (indices.length === 0) return indices;
361+
continue;
362+
}
363+
}
364+
336365
// JS fallback: evaluate filter on current index set
337366
const src = indices ?? Uint32Array.from({ length: rowCount }, (_, i) => i);
338367
const kept: number[] = [];
@@ -413,6 +442,60 @@ function wasmFilterNumeric(
413442
}
414443
}
415444

445+
/**
 * Run a WASM BETWEEN (inclusive range) filter over decoded numeric values.
 *
 * The first `rowCount` entries of `values` are copied into WASM memory in the
 * layout matching `dtype` (float32/float64 → f64, int64 → i64, int32 → i32),
 * the matching `filter*Range` export is invoked, and a copy of the resulting
 * row indices is returned.
 *
 * Bounds are normalised before they cross the JS→WASM boundary:
 * - a NaN bound or `low > high` can never match, so an empty result is
 *   returned without touching WASM;
 * - for integer dtypes `low` is rounded up and `high` rounded down, then both
 *   are clamped to the dtype's range. Passing the raw numbers would let the
 *   i32 conversion truncate fractions and wrap out-of-range values (e.g. a
 *   high bound of 3e9 turning negative), and `BigInt()` would throw on
 *   fractional or infinite bounds.
 *
 * NOTE(review): null/undefined entries are written as 0 and can therefore
 * match a range that contains 0 — confirm this agrees with the JS fallback.
 *
 * @param values   Decoded column values.
 * @param dtype    Column dtype; only float64, float32, int64 and int32 are handled.
 * @param low      Inclusive lower bound.
 * @param high     Inclusive upper bound.
 * @param rowCount Number of rows to filter.
 * @param wasm     WASM engine providing the allocator and filter exports.
 * @returns Matching row indices, or `null` when the WASM path cannot be used
 *          (unsupported dtype, failed allocation, or any thrown error) so the
 *          caller can fall back to its JS evaluation.
 */
function wasmFilterRange(
  values: DecodedValue[],
  dtype: string,
  low: number,
  high: number,
  rowCount: number,
  wasm: WasmEngine,
): Uint32Array | null {
  // Comparisons against NaN are always false and an inverted range is empty.
  if (Number.isNaN(low) || Number.isNaN(high) || low > high) return new Uint32Array(0);

  try {
    const api = wasm.exports;
    api.resetHeap();

    // Copy `count` u32 indices out of WASM memory so the result survives heap reuse.
    const copyIndices = (outPtr: number, count: number): Uint32Array =>
      new Uint32Array(api.memory.buffer.slice(outPtr, outPtr + count * 4));

    if (dtype === "float64" || dtype === "float32") {
      const dataPtr = api.alloc(rowCount * 8);
      if (!dataPtr) return null;
      const dst = new Float64Array(api.memory.buffer, dataPtr, rowCount);
      for (let i = 0; i < rowCount; i++) dst[i] = (values[i] as number) ?? 0;
      const outPtr = api.alloc(rowCount * 4);
      if (!outPtr) return null;
      const count = api.filterFloat64Range(dataPtr, rowCount, low, high, outPtr, rowCount);
      return copyIndices(outPtr, count);
    }

    if (dtype === "int64") {
      const TWO_POW_63 = 2 ** 63;
      // Range lies entirely above or below what an i64 can hold.
      if (low >= TWO_POW_63 || high < -TWO_POW_63) return new Uint32Array(0);
      const lo = low <= -TWO_POW_63 ? BigInt("-9223372036854775808") : BigInt(Math.ceil(low));
      const hi = high >= TWO_POW_63 ? BigInt("9223372036854775807") : BigInt(Math.floor(high));
      // Rounding inward can empty the range (e.g. 1.2 .. 1.8).
      if (lo > hi) return new Uint32Array(0);

      const dataPtr = api.alloc(rowCount * 8);
      if (!dataPtr) return null;
      const dst = new BigInt64Array(api.memory.buffer, dataPtr, rowCount);
      for (let i = 0; i < rowCount; i++) {
        const v = values[i];
        dst[i] = typeof v === "bigint" ? v : BigInt((v as number) ?? 0);
      }
      const outPtr = api.alloc(rowCount * 4);
      if (!outPtr) return null;
      const count = api.filterInt64Range(dataPtr, rowCount, lo, hi, outPtr, rowCount);
      return copyIndices(outPtr, count);
    }

    if (dtype === "int32") {
      const INT32_MIN = -2147483648;
      const INT32_MAX = 2147483647;
      const lo = Math.max(Math.ceil(low), INT32_MIN);
      const hi = Math.min(Math.floor(high), INT32_MAX);
      // Empty after rounding/clamping: range is outside int32 or contains no integer.
      if (lo > hi) return new Uint32Array(0);

      const dataPtr = api.alloc(rowCount * 4);
      if (!dataPtr) return null;
      const dst = new Int32Array(api.memory.buffer, dataPtr, rowCount);
      for (let i = 0; i < rowCount; i++) dst[i] = (values[i] as number) ?? 0;
      const outPtr = api.alloc(rowCount * 4);
      if (!outPtr) return null;
      const count = api.filterInt32Range(dataPtr, rowCount, lo, hi, outPtr, rowCount);
      return copyIndices(outPtr, count);
    }

    return null;
  } catch {
    // Best effort: any WASM/conversion failure defers to the caller's JS path.
    return null;
  }
}
498+
416499
/** Intersect two sorted index arrays using WASM. */
417500
function wasmIntersect(a: Uint32Array, b: Uint32Array, wasm: WasmEngine): Uint32Array {
418501
try {
@@ -1296,8 +1379,8 @@ export class WasmAggregateOperator implements Operator {
12961379
const pageCount = firstCol.pages.length;
12971380

12981381
for (let pi = 0; pi < pageCount; pi++) {
1299-
// Page-level skip via min/max stats
1300-
if (canSkipPage(firstCol.pages[pi], filters, firstCol.name)) {
1382+
// Page-level skip via min/max stats — check all filter columns
1383+
if (canSkipPageMultiCol(frag.columns, pi, filters)) {
13011384
this.pagesSkipped++;
13021385
continue;
13031386
}

src/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ export interface TableMeta {
8585
/** A filter predicate that can be pushed down to page-level skipping */
8686
export interface FilterOp {
8787
column: string;
88-
op: "eq" | "neq" | "gt" | "gte" | "lt" | "lte" | "in" | "is_null" | "is_not_null";
88+
op: "eq" | "neq" | "gt" | "gte" | "lt" | "lte" | "in" | "between" | "is_null" | "is_not_null";
8989
value: number | bigint | string | (number | bigint | string)[];
9090
}
9191

src/wasm-engine.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ export interface WasmExports {
6767
filterFloat64Buffer(dataPtr: number, len: number, op: number, value: number, outPtr: number, maxOut: number): number;
6868
filterInt32Buffer(dataPtr: number, len: number, op: number, value: number, outPtr: number, maxOut: number): number;
6969
filterInt64Buffer(dataPtr: number, len: number, op: number, value: bigint, outPtr: number, maxOut: number): number;
70+
filterFloat64Range(dataPtr: number, len: number, low: number, high: number, outPtr: number, maxOut: number): number;
71+
filterInt32Range(dataPtr: number, len: number, low: number, high: number, outPtr: number, maxOut: number): number;
72+
filterInt64Range(dataPtr: number, len: number, low: bigint, high: bigint, outPtr: number, maxOut: number): number;
7073
intersectIndices(aPtr: number, aLen: number, bPtr: number, bLen: number, outPtr: number, maxOut: number): number;
7174
unionIndices(aPtr: number, aLen: number, bPtr: number, bLen: number, outPtr: number, maxOut: number): number;
7275

wasm/src/wasm/aggregates.zig

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,48 @@ export fn filterFloat64Range(
805805
return out_count;
806806
}
807807

808+
/// BETWEEN filter over an i32 buffer.
/// Writes the index of every element `v` with `low <= v <= high` into
/// `out_indices` in ascending order, stopping once `max_indices` entries have
/// been written, and returns the number of indices written.
export fn filterInt32Range(
    data_ptr: [*]const i32,
    len: usize,
    low: i32,
    high: i32,
    out_indices: [*]u32,
    max_indices: usize,
) usize {
    var written: usize = 0;
    var idx: usize = 0;
    while (idx < len and written < max_indices) : (idx += 1) {
        const value = data_ptr[idx];
        const in_range = low <= value and value <= high;
        if (!in_range) continue;
        out_indices[written] = @intCast(idx);
        written += 1;
    }
    return written;
}
828+
829+
/// BETWEEN filter over an i64 buffer.
/// Writes the index of every element `v` with `low <= v <= high` into
/// `out_indices` in ascending order, stopping once `max_indices` entries have
/// been written, and returns the number of indices written.
export fn filterInt64Range(
    data_ptr: [*]const i64,
    len: usize,
    low: i64,
    high: i64,
    out_indices: [*]u32,
    max_indices: usize,
) usize {
    var written: usize = 0;
    var idx: usize = 0;
    while (idx < len and written < max_indices) : (idx += 1) {
        const value = data_ptr[idx];
        const in_range = low <= value and value <= high;
        if (!in_range) continue;
        out_indices[written] = @intCast(idx);
        written += 1;
    }
    return written;
}
849+
808850
/// AND two index arrays (intersection)
809851
/// Uses O(n+m) sorted merge since filter outputs are always in ascending order
810852
export fn intersectIndices(

0 commit comments

Comments
 (0)