Skip to content

Commit 43bc6df

Browse files
committed
fix: neededNames missing distinct/windows/join/subqueryIn columns across all scan paths
All four scan sites (query-do x2, fragment-do, local-executor x2) now include window partition/order/target columns, distinct columns, join leftKey, and subqueryIn columns when computing which columns to fetch. Previously these were only covered when projections were empty (all columns). With explicit projections, these referenced columns could be missing from the scan, causing silent data loss or incorrect results.
1 parent 4caffaa commit 43bc6df

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

src/fragment-do.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,14 @@ export class FragmentDO extends DurableObject<Env> {
8282
if (query.sortColumn) neededNames.add(query.sortColumn);
8383
if (query.groupBy) for (const g of query.groupBy) neededNames.add(g);
8484
if (query.aggregates) for (const a of query.aggregates) if (a.column !== "*") neededNames.add(a.column);
85+
if (query.distinct) for (const d of query.distinct) neededNames.add(d);
86+
if (query.windows) for (const w of query.windows) {
87+
if (w.column) neededNames.add(w.column);
88+
for (const p of w.partitionBy) neededNames.add(p);
89+
for (const o of w.orderBy) neededNames.add(o.column);
90+
}
91+
if (query.join) { neededNames.add(query.join.leftKey); }
92+
if (query.subqueryIn) for (const sq of query.subqueryIn) neededNames.add(sq.column);
8593
} else {
8694
neededNames = new Set(effectiveMeta.columns.map(c => c.name));
8795
}

src/local-executor.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,13 +472,21 @@ export class LocalExecutor implements QueryExecutor {
472472
}
473473
}
474474

475-
// Step 2: Determine columns to fetch (projection + filter + sort + groupBy + aggregate columns)
475+
// Step 2: Determine columns to fetch (projection + all referenced columns)
476476
const neededColumns = new Set(query.projections.length > 0 ? query.projections : columns.map(c => c.name));
477477
for (const f of query.filters) neededColumns.add(f.column);
478478
if (query.filterGroups) for (const g of query.filterGroups) for (const f of g) neededColumns.add(f.column);
479479
if (query.sortColumn) neededColumns.add(query.sortColumn);
480480
if (query.groupBy) for (const g of query.groupBy) neededColumns.add(g);
481481
if (query.aggregates) for (const a of query.aggregates) if (a.column !== "*") neededColumns.add(a.column);
482+
if (query.distinct) for (const d of query.distinct) neededColumns.add(d);
483+
if (query.windows) for (const w of query.windows) {
484+
if (w.column) neededColumns.add(w.column);
485+
for (const p of w.partitionBy) neededColumns.add(p);
486+
for (const o of w.orderBy) neededColumns.add(o.column);
487+
}
488+
if (query.join) { neededColumns.add(query.join.leftKey); }
489+
if (query.subqueryIn) for (const sq of query.subqueryIn) neededColumns.add(sq.column);
482490
if (query.vectorSearch) neededColumns.add(query.vectorSearch.column);
483491
const projectedColumns = columns.filter(c => neededColumns.has(c.name));
484492

@@ -928,6 +936,14 @@ export class LocalExecutor implements QueryExecutor {
928936
if (query.sortColumn) neededCols.add(query.sortColumn);
929937
if (query.groupBy) for (const g of query.groupBy) neededCols.add(g);
930938
if (query.aggregates) for (const a of query.aggregates) if (a.column !== "*") neededCols.add(a.column);
939+
if (query.distinct) for (const d of query.distinct) neededCols.add(d);
940+
if (query.windows) for (const w of query.windows) {
941+
if (w.column) neededCols.add(w.column);
942+
for (const p of w.partitionBy) neededCols.add(p);
943+
for (const o of w.orderBy) neededCols.add(o.column);
944+
}
945+
if (query.join) { neededCols.add(query.join.leftKey); }
946+
if (query.subqueryIn) for (const sq of query.subqueryIn) neededCols.add(sq.column);
931947
if (query.vectorSearch) neededCols.add(query.vectorSearch.column);
932948
const projectedColumns = meta.columns.filter(c => neededCols.has(c.name));
933949
const filePath = meta.r2Key;

src/query-do.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,14 @@ class EdgeScanOperator implements Operator {
117117
if (query.sortColumn) neededNames.add(query.sortColumn);
118118
if (query.groupBy) for (const g of query.groupBy) neededNames.add(g);
119119
if (query.aggregates) for (const a of query.aggregates) if (a.column !== "*") neededNames.add(a.column);
120+
if (query.distinct) for (const d of query.distinct) neededNames.add(d);
121+
if (query.windows) for (const w of query.windows) {
122+
if (w.column) neededNames.add(w.column);
123+
for (const p of w.partitionBy) neededNames.add(p);
124+
for (const o of w.orderBy) neededNames.add(o.column);
125+
}
126+
if (query.join) { neededNames.add(query.join.leftKey); }
127+
if (query.subqueryIn) for (const sq of query.subqueryIn) neededNames.add(sq.column);
120128
} else {
121129
neededNames = new Set(meta.columns.map(c => c.name));
122130
}
@@ -971,7 +979,7 @@ export class QueryDO extends DurableObject<Env> {
971979

972980
/** Scan only the needed pages from R2 via coalesced Range reads, with cache-before-fetch. */
973981
private async scanPages(query: QueryDescriptor, meta: TableMeta, t0: number): Promise<QueryResult> {
974-
// Determine columns to fetch: projections + all referenced by filters/sort/groupBy/aggregates
982+
// Determine columns to fetch: projections + all referenced by filters/sort/groupBy/aggregates/windows/distinct/join
975983
let neededNames: Set<string>;
976984
if (query.projections.length > 0) {
977985
neededNames = new Set(query.projections);
@@ -980,6 +988,14 @@ export class QueryDO extends DurableObject<Env> {
980988
if (query.sortColumn) neededNames.add(query.sortColumn);
981989
if (query.groupBy) for (const g of query.groupBy) neededNames.add(g);
982990
if (query.aggregates) for (const a of query.aggregates) if (a.column !== "*") neededNames.add(a.column);
991+
if (query.distinct) for (const d of query.distinct) neededNames.add(d);
992+
if (query.windows) for (const w of query.windows) {
993+
if (w.column) neededNames.add(w.column);
994+
for (const p of w.partitionBy) neededNames.add(p);
995+
for (const o of w.orderBy) neededNames.add(o.column);
996+
}
997+
if (query.join) { neededNames.add(query.join.leftKey); }
998+
if (query.subqueryIn) for (const sq of query.subqueryIn) neededNames.add(sq.column);
983999
} else {
9841000
neededNames = new Set(meta.columns.map(c => c.name));
9851001
}

0 commit comments

Comments
 (0)