Skip to content

Commit 845538d

Browse files
committed
fix: 11 correctness bugs — page skip alignment, WASM bounds, filterGroups, cache ordering
Critical fixes:
- Per-column page skip misaligned rows across columns (query-do, fragment-do). Now uses a uniform page-level skip: the same decision is applied to all columns per page.
- parseWasmResult bounds check compared a relative dp against an absolute end (wasm-engine).
- queryToSql silently dropped OR filterGroups (wasm-engine).
- Fragment-do page data could arrive out of order with a partially warm cache. Now tracks results by index and assembles them in the original order.
- Missing filter/sort/groupBy/aggregate columns when projections were set (query-do). Now computes neededNames from all query references.
- Grace hash partition path allowed NULL=NULL matches (operators). Now skips NULL keys in both build and probe, matching the in-memory path.
- rewriteAggregatesAsColumns missed BETWEEN/IN/CASE/CAST nodes (evaluator); HAVING with these constructs silently dropped all groups.
- Partial filter pushdown left stale filters on rollback (compiler).
- BigInt multi-column ORDER BY used string comparison (executor).
- count() fast path ignored join/vectorSearch/setOperation (local-executor).
- Cache key missing distinct/windows/computedColumns/setOp fields (query-do, local-executor).
1 parent cafdacd commit 845538d

8 files changed

Lines changed: 213 additions & 91 deletions

File tree

src/fragment-do.ts

Lines changed: 51 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -85,40 +85,57 @@ export class FragmentDO extends DurableObject<Env> {
8585
}
8686
}
8787

88-
// Build byte ranges for each page, skipping via min/max stats
88+
// Build byte ranges for each page, skipping uniformly across all columns.
89+
// A page is skipped only if any AND filter eliminates it — same for all columns.
90+
const maxPages = cols.reduce((m, c) => Math.max(m, c.pages.length), 0);
91+
const keptPageIndices: number[] = [];
92+
for (let pi = 0; pi < maxPages; pi++) {
93+
let skip = false;
94+
if (!query.vectorSearch) {
95+
for (const f of query.filters) {
96+
const col = cols.find(c => c.name === f.column);
97+
if (!col) continue;
98+
const page = col.pages[pi];
99+
if (!page) continue;
100+
if (canSkipPage(page, [f], f.column)) { skip = true; break; }
101+
}
102+
}
103+
if (skip) { totalPagesSkipped += cols.length; continue; }
104+
keptPageIndices.push(pi);
105+
}
106+
89107
const ranges: { column: string; offset: number; length: number }[] = [];
90108
for (const col of cols) {
91-
for (const page of col.pages) {
92-
if (!query.vectorSearch && canSkipPage(page, query.filters, col.name)) {
93-
totalPagesSkipped++;
94-
continue;
95-
}
109+
for (const pi of keptPageIndices) {
110+
const page = col.pages[pi];
111+
if (!page) continue;
96112
ranges.push({ column: col.name, offset: Number(page.byteOffset), length: page.byteLength });
97113
}
98114
}
99115

100-
// Cache-before-fetch: check WASM buffer pool for each range
101-
const columnData = new Map<string, ArrayBuffer[]>();
102-
const uncachedRanges: typeof ranges = [];
116+
// Cache-before-fetch: check WASM buffer pool for each range.
117+
// Track results by index to preserve page ordering when cache is partially warm.
118+
const rangeResults = new Array<ArrayBuffer | null>(ranges.length).fill(null);
119+
const uncachedRanges: { range: typeof ranges[0]; idx: number }[] = [];
103120

104-
for (const r of ranges) {
121+
for (let ri = 0; ri < ranges.length; ri++) {
122+
const r = ranges[ri];
105123
const cacheKey = `${r2Key}:${r.offset}:${r.length}`;
106124
const cached = this.wasmEngine.cacheGet(cacheKey);
107125
if (cached) {
108126
totalCacheHits++;
109-
const arr = columnData.get(r.column) ?? [];
110-
arr.push(cached);
111-
columnData.set(r.column, arr);
127+
rangeResults[ri] = cached;
112128
} else {
113129
totalCacheMisses++;
114-
uncachedRanges.push(r);
130+
uncachedRanges.push({ range: r, idx: ri });
115131
}
116132
}
117133

118134
// Fetch uncached ranges from R2 with retry + timeout
119135
const r2Start = Date.now();
120136
if (uncachedRanges.length > 0) {
121-
const coalesced = coalesceRanges(uncachedRanges, autoCoalesceGap(uncachedRanges));
137+
const fetchRanges = uncachedRanges.map(u => u.range);
138+
const coalesced = coalesceRanges(fetchRanges, autoCoalesceGap(fetchRanges));
122139
const fetched = await fetchBounded(
123140
coalesced.map(c => () =>
124141
withRetry(() =>
@@ -135,20 +152,33 @@ export class FragmentDO extends DurableObject<Env> {
135152
),
136153
8,
137154
);
155+
// Build a lookup from (column, offset, length) → fetched slice
156+
const fetchedMap = new Map<string, ArrayBuffer>();
138157
for (const f of fetched) {
139158
if (!f) continue;
140159
totalBytesRead += f.data.byteLength;
141160
for (const sub of f.ranges) {
142161
const slice = f.data.slice(sub.offset - f.offset, sub.offset - f.offset + sub.length);
143-
const arr = columnData.get(sub.column) ?? [];
144-
arr.push(slice);
145-
columnData.set(sub.column, arr);
146-
147-
// Populate cache for next time
162+
fetchedMap.set(`${sub.column}:${sub.offset}:${sub.length}`, slice);
148163
const cacheKey = `${r2Key}:${sub.offset}:${sub.length}`;
149164
this.wasmEngine.cacheSet(cacheKey, slice);
150165
}
151166
}
167+
// Fill in uncached slots in order
168+
for (const u of uncachedRanges) {
169+
const key = `${u.range.column}:${u.range.offset}:${u.range.length}`;
170+
rangeResults[u.idx] = fetchedMap.get(key) ?? null;
171+
}
172+
}
173+
174+
// Assemble columnData in correct page order
175+
const columnData = new Map<string, ArrayBuffer[]>();
176+
for (let ri = 0; ri < ranges.length; ri++) {
177+
const buf = rangeResults[ri];
178+
if (!buf) continue;
179+
const arr = columnData.get(ranges[ri].column) ?? [];
180+
arr.push(buf);
181+
columnData.set(ranges[ri].column, arr);
152182
}
153183
totalR2ReadMs += Date.now() - r2Start;
154184

@@ -161,7 +191,7 @@ export class FragmentDO extends DurableObject<Env> {
161191
.map(col => ({
162192
name: col.name, dtype: col.dtype, listDim: col.listDimension,
163193
pages: columnData.get(col.name)!,
164-
pageInfos: col.pages,
194+
pageInfos: keptPageIndices.map(pi => col.pages[pi]).filter(Boolean),
165195
}));
166196
if (!this.wasmEngine.registerColumns(fragTable, fragColEntries)) {
167197
throw new Error(`WASM OOM: failed to register columns for fragment "${r2Key}"`);

src/local-executor.ts

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -182,7 +182,7 @@ export class LocalExecutor implements QueryExecutor {
182182
/** Count matching rows. No-filter case uses metadata only (zero I/O). */
183183
async count(query: QueryDescriptor): Promise<number> {
184184
const meta = await this.getOrLoadMeta(query.table);
185-
if (query.filters.length === 0 && !query.filterGroups?.length && !query.aggregates?.length && !query.groupBy?.length && !query.distinct) {
185+
if (query.filters.length === 0 && !query.filterGroups?.length && !query.aggregates?.length && !query.groupBy?.length && !query.distinct && !query.join && !query.vectorSearch && !query.setOperation && !query.subqueryIn && !query.computedColumns?.length) {
186186
return meta.columns[0]?.pages.reduce((s, p) => s + p.rowCount, 0) ?? 0;
187187
}
188188
// With filters: fall through to aggregate path
@@ -387,6 +387,10 @@ export class LocalExecutor implements QueryExecutor {
387387
if (query.groupBy) for (const g of query.groupBy) feed(g);
388388
if (query.distinct) feed("distinct");
389389
if (query.version !== undefined) feed(`v${query.version}`);
390+
if (query.windows) for (const w of query.windows) { feed(w.fn); feed(w.alias); feed(w.column ?? ""); }
391+
if (query.computedColumns) for (const cc of query.computedColumns) feed(cc.name);
392+
if (query.setOperation) { feed(query.setOperation.type); feed(query.setOperation.table); }
393+
if (query.subqueryIn) { feed(query.subqueryIn.column); feed(query.subqueryIn.table); }
390394
return `qr:${query.table}:${(h >>> 0).toString(36)}`;
391395
}
392396

src/operators.ts

Lines changed: 17 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -2368,10 +2368,13 @@ export class HashJoinOperator implements Operator {
23682368

23692369
/** Process one partition: load right → build map → probe left → emit matches. */
23702370
private async processPartition(partIdx: number): Promise<Row[]> {
2371-
// Load right partition into hash map
2371+
// Load right partition into hash map (skip NULL keys — SQL NULL != NULL)
23722372
const rightMap = new Map<string, Row[]>();
2373+
const rightNullRows: Row[] = [];
23732374
for await (const row of this.streamPartition(this.rightPartitionIds[partIdx])) {
2374-
const key = this.toJoinKey(row[this.rightKey]);
2375+
const val = row[this.rightKey];
2376+
if (val === null || val === undefined) { rightNullRows.push(row); continue; }
2377+
const key = this.toJoinKey(val);
23752378
const bucket = rightMap.get(key);
23762379
if (bucket) bucket.push(row);
23772380
else rightMap.set(key, [row]);
@@ -2380,10 +2383,15 @@ export class HashJoinOperator implements Operator {
23802383
// Track matched right rows for right/full joins
23812384
const matched = (this.joinType === "right" || this.joinType === "full") ? new Set<string>() : null;
23822385

2383-
// Probe with left partition
2386+
// Probe with left partition (skip NULL keys)
23842387
const result: Row[] = [];
23852388
for await (const leftRow of this.streamPartition(this.leftPartitionIds[partIdx])) {
2386-
const key = this.toJoinKey(leftRow[this.leftKey]);
2389+
const leftVal = leftRow[this.leftKey];
2390+
if (leftVal === null || leftVal === undefined) {
2391+
if (this.joinType === "left" || this.joinType === "full") result.push({ ...leftRow });
2392+
continue;
2393+
}
2394+
const key = this.toJoinKey(leftVal);
23872395
const rightRows = rightMap.get(key);
23882396
if (rightRows) {
23892397
for (let i = 0; i < rightRows.length; i++) {
@@ -2395,7 +2403,7 @@ export class HashJoinOperator implements Operator {
23952403
}
23962404
}
23972405

2398-
// Emit unmatched right rows for right/full joins
2406+
// Emit unmatched right rows for right/full joins (including null-keyed right rows)
23992407
if (matched) {
24002408
for (const [key, rows] of rightMap) {
24012409
for (let i = 0; i < rows.length; i++) {
@@ -2404,6 +2412,10 @@ export class HashJoinOperator implements Operator {
24042412
}
24052413
}
24062414
}
2415+
// NULL-keyed right rows never match — always unmatched
2416+
for (const row of rightNullRows) {
2417+
result.push({ ...row });
2418+
}
24072419
}
24082420

24092421
return result;

src/query-do.ts

Lines changed: 75 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -95,37 +95,46 @@ class EdgeScanOperator implements Operator {
9595
const query = this.query;
9696
const meta = this.meta;
9797

98-
let cols = query.projections.length > 0
99-
? meta.columns.filter(c => query.projections.includes(c.name))
100-
: meta.columns;
101-
102-
if (query.vectorSearch) {
103-
const vc = query.vectorSearch.column;
104-
if (!cols.some(c => c.name === vc)) {
105-
const ec = meta.columns.find(c => c.name === vc);
106-
if (ec) cols = [...cols, ec];
107-
}
98+
// Determine columns to fetch: projections + all columns referenced by filters/sort/groupBy/aggregates
99+
let neededNames: Set<string>;
100+
if (query.projections.length > 0) {
101+
neededNames = new Set(query.projections);
102+
for (const f of query.filters) neededNames.add(f.column);
103+
if (query.filterGroups) for (const g of query.filterGroups) for (const f of g) neededNames.add(f.column);
104+
if (query.sortColumn) neededNames.add(query.sortColumn);
105+
if (query.groupBy) for (const g of query.groupBy) neededNames.add(g);
106+
if (query.aggregates) for (const a of query.aggregates) if (a.column !== "*") neededNames.add(a.column);
107+
} else {
108+
neededNames = new Set(meta.columns.map(c => c.name));
108109
}
110+
if (query.vectorSearch) neededNames.add(query.vectorSearch.column);
111+
112+
let cols = meta.columns.filter(c => neededNames.has(c.name));
109113
this.cols = cols;
110114

111-
for (const col of cols) {
112-
const keptPages: typeof col.pages = [];
113-
for (const page of col.pages) {
114-
if (!query.vectorSearch && canSkipPage(page, query.filters, col.name)) {
115-
this.pagesSkipped++;
116-
continue;
115+
// Determine which pages to keep — must be uniform across all columns to avoid row misalignment.
116+
// A page is skipped only if any AND filter eliminates it (same logic as canSkipPageMultiCol).
117+
const maxPages = cols.reduce((m, c) => Math.max(m, c.pages.length), 0);
118+
const keptPageIndices: number[] = [];
119+
for (let pi = 0; pi < maxPages; pi++) {
120+
let skip = false;
121+
if (!query.vectorSearch) {
122+
for (const f of query.filters) {
123+
const col = cols.find(c => c.name === f.column);
124+
if (!col) continue;
125+
const page = col.pages[pi];
126+
if (!page) continue;
127+
if (canSkipPage(page, [f], f.column)) { skip = true; break; }
117128
}
118-
keptPages.push(page);
119129
}
120-
this.columnPageInfos.set(col.name, keptPages);
130+
if (skip) { this.pagesSkipped += cols.length; continue; }
131+
keptPageIndices.push(pi);
121132
}
122133

123-
// Count total pages from first column
124-
const firstCol = cols[0];
125-
if (firstCol) {
126-
const keptPages = this.columnPageInfos.get(firstCol.name) ?? firstCol.pages;
127-
this.pageCount = keptPages.length;
134+
for (const col of cols) {
135+
this.columnPageInfos.set(col.name, keptPageIndices.map(pi => col.pages[pi]).filter(Boolean));
128136
}
137+
this.pageCount = keptPageIndices.length;
129138
}
130139

131140
/**
@@ -670,6 +679,11 @@ export class QueryDO extends DurableObject<Env> {
670679
if (query.offset !== undefined) feed(String(query.offset));
671680
if (query.aggregates) for (const a of query.aggregates) { feed(a.fn); feed(a.column); if (a.alias) feed(a.alias); }
672681
if (query.groupBy) for (const g of query.groupBy) feed(g);
682+
if (query.distinct) feed("distinct");
683+
if (query.windows) for (const w of query.windows) { feed(w.fn); feed(w.alias); feed(w.column ?? ""); }
684+
if (query.computedColumns) for (const cc of query.computedColumns) feed(cc.name);
685+
if (query.setOperation) { feed(query.setOperation.type); feed(query.setOperation.table); }
686+
if (query.subqueryIn) { feed(query.subqueryIn.column); feed(query.subqueryIn.table); }
673687
return `qr:${query.table}:${(h >>> 0).toString(36)}`;
674688
}
675689

@@ -922,31 +936,52 @@ export class QueryDO extends DurableObject<Env> {
922936

923937
/** Scan only the needed pages from R2 via coalesced Range reads, with cache-before-fetch. */
924938
private async scanPages(query: QueryDescriptor, meta: TableMeta, t0: number): Promise<QueryResult> {
925-
let cols = query.projections.length > 0
926-
? meta.columns.filter(c => query.projections.includes(c.name))
927-
: meta.columns;
928-
929-
if (query.vectorSearch) {
930-
const vc = query.vectorSearch.column;
931-
if (!cols.some(c => c.name === vc)) {
932-
const ec = meta.columns.find(c => c.name === vc);
933-
if (ec) cols = [...cols, ec];
934-
}
939+
// Determine columns to fetch: projections + all referenced by filters/sort/groupBy/aggregates
940+
let neededNames: Set<string>;
941+
if (query.projections.length > 0) {
942+
neededNames = new Set(query.projections);
943+
for (const f of query.filters) neededNames.add(f.column);
944+
if (query.filterGroups) for (const g of query.filterGroups) for (const f of g) neededNames.add(f.column);
945+
if (query.sortColumn) neededNames.add(query.sortColumn);
946+
if (query.groupBy) for (const g of query.groupBy) neededNames.add(g);
947+
if (query.aggregates) for (const a of query.aggregates) if (a.column !== "*") neededNames.add(a.column);
948+
} else {
949+
neededNames = new Set(meta.columns.map(c => c.name));
935950
}
951+
if (query.vectorSearch) neededNames.add(query.vectorSearch.column);
952+
953+
let cols = meta.columns.filter(c => neededNames.has(c.name));
936954

937-
// Build per-page ranges, applying page-level skip.
938-
// Track non-skipped page infos per column so buffer indices stay aligned.
955+
// Build per-page ranges, applying page-level skip uniformly across all columns.
956+
// A page is skipped only if any AND filter eliminates it — same decision for all columns
957+
// to prevent row misalignment when different columns have different page counts.
939958
const ranges: { column: string; offset: number; length: number }[] = [];
940959
const columnPageInfos = new Map<string, typeof cols[0]["pages"]>();
941960
let pagesSkipped = 0;
961+
962+
const maxPages = cols.reduce((m, c) => Math.max(m, c.pages.length), 0);
963+
const keptPageIndices: number[] = [];
964+
for (let pi = 0; pi < maxPages; pi++) {
965+
let skip = false;
966+
if (!query.vectorSearch) {
967+
for (const f of query.filters) {
968+
const col = cols.find(c => c.name === f.column);
969+
if (!col) continue;
970+
const page = col.pages[pi];
971+
if (!page) continue;
972+
if (canSkipPage(page, [f], f.column)) { skip = true; break; }
973+
}
974+
}
975+
if (skip) { pagesSkipped += cols.length; continue; }
976+
keptPageIndices.push(pi);
977+
}
978+
942979
for (const col of cols) {
943-
const keptPages: typeof col.pages = [];
944-
for (const page of col.pages) {
945-
if (!query.vectorSearch && canSkipPage(page, query.filters, col.name)) { pagesSkipped++; continue; }
946-
keptPages.push(page);
980+
const keptPages = keptPageIndices.map(pi => col.pages[pi]).filter(Boolean);
981+
columnPageInfos.set(col.name, keptPages);
982+
for (const page of keptPages) {
947983
ranges.push({ column: col.name, offset: Number(page.byteOffset), length: page.byteLength });
948984
}
949-
columnPageInfos.set(col.name, keptPages);
950985
}
951986

952987
// 3-tier cache hierarchy:

src/sql/compiler.ts

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -71,8 +71,11 @@ export function compileFull(stmt: SelectStmt): SqlCompileResult {
7171
let whereExpr: SqlExpr | undefined;
7272
let filterGroups: FilterOp[][] | undefined;
7373
if (stmt.where) {
74+
const savedLen = filters.length;
7475
const fullyFlattened = tryFlattenFilters(stmt.where, filters);
7576
if (!fullyFlattened) {
77+
// Roll back any partially-pushed filters to avoid double-filtering
78+
filters.length = savedLen;
7679
// Try OR decomposition: if top-level is OR, flatten each branch independently
7780
const orGroups = tryFlattenOrGroups(stmt.where);
7881
if (orGroups) {

src/sql/evaluator.ts

Lines changed: 25 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -222,6 +222,31 @@ export function rewriteAggregatesAsColumns(expr: SqlExpr): SqlExpr {
222222
};
223223
case "unary":
224224
return { kind: "unary", op: expr.op, operand: rewriteAggregatesAsColumns(expr.operand) };
225+
case "between":
226+
return {
227+
kind: "between",
228+
expr: rewriteAggregatesAsColumns(expr.expr),
229+
low: rewriteAggregatesAsColumns(expr.low),
230+
high: rewriteAggregatesAsColumns(expr.high),
231+
};
232+
case "in_list":
233+
return {
234+
kind: "in_list", negated: expr.negated,
235+
expr: rewriteAggregatesAsColumns(expr.expr),
236+
values: expr.values.map(rewriteAggregatesAsColumns),
237+
};
238+
case "case_expr":
239+
return {
240+
kind: "case_expr",
241+
operand: expr.operand ? rewriteAggregatesAsColumns(expr.operand) : undefined,
242+
whenClauses: expr.whenClauses.map(w => ({
243+
condition: rewriteAggregatesAsColumns(w.condition),
244+
result: rewriteAggregatesAsColumns(w.result),
245+
})),
246+
elseResult: expr.elseResult ? rewriteAggregatesAsColumns(expr.elseResult) : undefined,
247+
};
248+
case "cast":
249+
return { kind: "cast", expr: rewriteAggregatesAsColumns(expr.expr), targetType: expr.targetType };
225250
default:
226251
return expr;
227252
}

0 commit comments

Comments (0)