Skip to content

Commit 043792f

Browse files
committed
fix: 2 correctness bugs — window BETWEEN parse, window sum/avg null frames; extract rowPassesFilters (-61 lines)
Correctness: - parser: consume optional BETWEEN keyword in window frame clause (ROWS BETWEEN ... AND ...) - operators: window aggregate sum/avg return null (not 0) when frame has only null values Code reduction: - Extract rowPassesFilters() in decode.ts — shared by 4 call sites (decode, FilterOperator, MaterializedExecutor, QueryDO) - Replaces 15-20 line inline filter patterns with single function calls
1 parent e961e8f commit 043792f

File tree

5 files changed

+38
-99
lines changed

5 files changed

+38
-99
lines changed

src/client.ts

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import type {
1616
WindowSpec,
1717
} from "./types.js";
1818
import type { Operator, RowBatch } from "./operators.js";
19-
import { matchesFilter } from "./decode.js";
19+
import { rowPassesFilters } from "./decode.js";
2020
import { computePartialAgg, finalizePartialAgg } from "./partial-agg.js";
2121
import { descriptorToCode } from "./descriptor-to-code.js";
2222

@@ -894,32 +894,9 @@ export class MaterializedExecutor implements QueryExecutor {
894894
});
895895
}
896896

897-
// Apply AND filters
898-
if (query.filters.length > 0) {
899-
rows = rows.filter(row =>
900-
query.filters.every(f => {
901-
const v = row[f.column];
902-
if (f.op === "is_null") return v === null || v === undefined;
903-
if (f.op === "is_not_null") return v !== null && v !== undefined;
904-
if (v === null || v === undefined) return false;
905-
return matchesFilter(v as Row[string], f);
906-
}),
907-
);
908-
}
909-
910-
// Apply OR filter groups (each group is AND-connected, groups are OR'd)
911-
if (query.filterGroups && query.filterGroups.length > 0) {
912-
rows = rows.filter(row =>
913-
query.filterGroups!.some(group =>
914-
group.every(f => {
915-
const v = row[f.column];
916-
if (f.op === "is_null") return v === null || v === undefined;
917-
if (f.op === "is_not_null") return v !== null && v !== undefined;
918-
if (v === null || v === undefined) return false;
919-
return matchesFilter(v as Row[string], f);
920-
}),
921-
),
922-
);
897+
// Apply filters (AND + OR groups)
898+
if (query.filters.length > 0 || (query.filterGroups && query.filterGroups.length > 0)) {
899+
rows = rows.filter(row => rowPassesFilters(row, query.filters, query.filterGroups));
923900
}
924901

925902
// Apply aggregation

src/decode.ts

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -138,27 +138,7 @@ export function assembleRows(
138138
row[col.name] = values ? (values[i] as Row[string]) : null;
139139
}
140140

141-
// AND filters must all pass
142-
const andPass = query.filters.every(f => {
143-
const v = row[f.column];
144-
if (f.op === "is_null") return v === null || v === undefined;
145-
if (f.op === "is_not_null") return v !== null && v !== undefined;
146-
return v !== null && matchesFilter(v, f);
147-
});
148-
if (!andPass) continue;
149-
150-
// OR groups: at least one group must pass
151-
if (query.filterGroups && query.filterGroups.length > 0) {
152-
const orPass = query.filterGroups.some(group =>
153-
group.every(f => {
154-
const v = row[f.column];
155-
if (f.op === "is_null") return v === null || v === undefined;
156-
if (f.op === "is_not_null") return v !== null && v !== undefined;
157-
return v !== null && matchesFilter(v, f);
158-
}),
159-
);
160-
if (!orPass) continue;
161-
}
141+
if (!rowPassesFilters(row, query.filters, query.filterGroups)) continue;
162142

163143
rows.push(row);
164144
}
@@ -526,6 +506,29 @@ function getInSet(values: readonly (number | bigint | string)[]): Set<number | b
526506
return set;
527507
}
528508

509+
/** Test whether a row passes AND filters + OR filter groups. */
510+
export function rowPassesFilters(row: Row, filters: FilterOp[], filterGroups?: FilterOp[][]): boolean {
511+
for (const f of filters) {
512+
const v = row[f.column];
513+
if (f.op === "is_null") { if (v !== null && v !== undefined) return false; continue; }
514+
if (f.op === "is_not_null") { if (v === null || v === undefined) return false; continue; }
515+
if (v === null || v === undefined) return false;
516+
if (!matchesFilter(v, f)) return false;
517+
}
518+
if (filterGroups && filterGroups.length > 0) {
519+
return filterGroups.some(group =>
520+
group.every(f => {
521+
const v = row[f.column];
522+
if (f.op === "is_null") return v === null || v === undefined;
523+
if (f.op === "is_not_null") return v !== null && v !== undefined;
524+
if (v === null || v === undefined) return false;
525+
return matchesFilter(v, f);
526+
}),
527+
);
528+
}
529+
return true;
530+
}
531+
529532
/** Cache compiled LIKE regexes — avoids re-compilation per row. */
530533
const likeRegexCache = new Map<string, RegExp>();
531534

src/operators.ts

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import type { ColumnMeta, FilterOp, PageInfo, Row } from "./types.js";
1111
import { NULL_SENTINEL } from "./types.js";
1212
import type { QueryDescriptor } from "./client.js";
1313
import type { WasmEngine } from "./wasm-engine.js";
14-
import { canSkipPage, matchesFilter, decodePage } from "./decode.js";
14+
import { canSkipPage, matchesFilter, rowPassesFilters, decodePage } from "./decode.js";
1515
import { decodeParquetColumnChunk } from "./parquet-decode.js";
1616

1717
const _textEncoder = new TextEncoder();
@@ -1053,8 +1053,8 @@ export class WindowOperator implements Operator {
10531053
if (n > runMax) runMax = n;
10541054
}
10551055
switch (fn) {
1056-
case "sum": rows[indices[i]][alias] = runSum; break;
1057-
case "avg": rows[indices[i]][alias] = runCount === 0 ? 0 : runSum / runCount; break;
1056+
case "sum": rows[indices[i]][alias] = runCount === 0 ? null : runSum; break;
1057+
case "avg": rows[indices[i]][alias] = runCount === 0 ? null : runSum / runCount; break;
10581058
case "min": rows[indices[i]][alias] = runMin === Infinity ? null : runMin; break;
10591059
case "max": rows[indices[i]][alias] = runMax === -Infinity ? null : runMax; break;
10601060
case "count": rows[indices[i]][alias] = runCount; break;
@@ -1087,8 +1087,8 @@ export class WindowOperator implements Operator {
10871087
}
10881088

10891089
switch (fn) {
1090-
case "sum": rows[indices[i]][alias] = sum; break;
1091-
case "avg": rows[indices[i]][alias] = count === 0 ? 0 : sum / count; break;
1090+
case "sum": rows[indices[i]][alias] = count === 0 ? null : sum; break;
1091+
case "avg": rows[indices[i]][alias] = count === 0 ? null : sum / count; break;
10921092
case "min": rows[indices[i]][alias] = min === Infinity ? null : min; break;
10931093
case "max": rows[indices[i]][alias] = max === -Infinity ? null : max; break;
10941094
case "count": rows[indices[i]][alias] = count; break;
@@ -1354,28 +1354,7 @@ export class FilterOperator implements Operator {
13541354
}
13551355

13561356
private matchesRow(row: Row): boolean {
1357-
// AND filters must all pass
1358-
const andPass = this.filters.every(f => {
1359-
const v = row[f.column];
1360-
if (f.op === "is_null") return v === null || v === undefined;
1361-
if (f.op === "is_not_null") return v !== null && v !== undefined;
1362-
return v !== null && matchesFilter(v, f);
1363-
});
1364-
if (!andPass) return false;
1365-
1366-
// OR groups: at least one group must pass (each group is AND-connected)
1367-
if (this.filterGroups && this.filterGroups.length > 0) {
1368-
return this.filterGroups.some(group =>
1369-
group.every(f => {
1370-
const v = row[f.column];
1371-
if (f.op === "is_null") return v === null || v === undefined;
1372-
if (f.op === "is_not_null") return v !== null && v !== undefined;
1373-
return v !== null && matchesFilter(v, f);
1374-
}),
1375-
);
1376-
}
1377-
1378-
return true;
1357+
return rowPassesFilters(row, this.filters, this.filterGroups);
13791358
}
13801359

13811360
async nextColumnar(): Promise<ColumnarBatch | null> {

src/query-do.ts

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { parseFooter, parseColumnMetaFromProtobuf } from "./footer.js";
55
import { parseManifest, logicalTypeToDataType } from "./manifest.js";
66
import { detectFormat, getParquetFooterLength, parseParquetFooter, parquetMetaToTableMeta } from "./parquet.js";
77
import { parseIcebergMetadata, extractParquetPathsFromManifest } from "./iceberg.js";
8-
import { canSkipPage, canSkipFragment, matchesFilter } from "./decode.js";
8+
import { canSkipPage, canSkipFragment, rowPassesFilters } from "./decode.js";
99
import { decodeParquetColumnChunk } from "./parquet-decode.js";
1010
import { instantiateWasm, type WasmEngine } from "./wasm-engine.js";
1111
import { mergeQueryResults } from "./merge.js";
@@ -433,29 +433,8 @@ export class QueryDO extends DurableObject<Env> {
433433
/** Apply filters, sort, and limit in JS. Handles bigint↔number coercion for cross-type comparison. */
434434
private applyJsPostProcessing(rows: Row[], query: QueryDescriptor): Row[] {
435435
let result = rows;
436-
if (query.filters.length > 0) {
437-
result = result.filter(row =>
438-
query.filters.every(f => {
439-
const v = row[f.column];
440-
if (f.op === "is_null") return v === null || v === undefined;
441-
if (f.op === "is_not_null") return v !== null && v !== undefined;
442-
if (v === null || v === undefined) return false;
443-
return matchesFilter(v as number | bigint | string | boolean, f);
444-
}),
445-
);
446-
}
447-
if (query.filterGroups && query.filterGroups.length > 0) {
448-
result = result.filter(row =>
449-
query.filterGroups!.some(group =>
450-
group.every(f => {
451-
const v = row[f.column];
452-
if (f.op === "is_null") return v === null || v === undefined;
453-
if (f.op === "is_not_null") return v !== null && v !== undefined;
454-
if (v === null || v === undefined) return false;
455-
return matchesFilter(v as number | bigint | string | boolean, f);
456-
}),
457-
),
458-
);
436+
if (query.filters.length > 0 || (query.filterGroups && query.filterGroups.length > 0)) {
437+
result = result.filter(row => rowPassesFilters(row, query.filters, query.filterGroups));
459438
}
460439
if (query.sortColumn) {
461440
const dir = query.sortDirection === "desc" ? -1 : 1;

src/sql/parser.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,7 @@ class Parser {
732732
// Frame clause
733733
if (this.check(TokenType.ROWS) || this.check(TokenType.RANGE)) {
734734
const frameType = this.match(TokenType.ROWS) ? "rows" as const : (this.advance(), "range" as const);
735+
this.match(TokenType.BETWEEN); // consume optional BETWEEN keyword
735736
const start = this.parseFrameBound();
736737
let end: NonNullable<SqlWindowSpec["frame"]>["end"];
737738
if (this.match(TokenType.AND)) {

0 commit comments

Comments
 (0)