Skip to content

Commit d1094d6

Browse files
committed
fix: window function wrong column, partition catalog false negatives
- Add optional `column` field to WindowSpec for the lag/lead/aggregate target column. Falls back to `orderBy[0].column` for backwards compatibility.
- Fix partition catalog eq/in lookups on range data (min≠max): these could falsely prune fragments containing matching rows. The lookup now returns null (skip catalog) for non-exact-partition data, falling back to min/max pruning, which is always correct.
- Persist the `exactPartition` flag in catalog serialization.
- Add 3 tests for range-data catalog safety.
1 parent 900d44f commit d1094d6

File tree

4 files changed

+61
-8
lines changed

4 files changed

+61
-8
lines changed

src/operators.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -973,10 +973,11 @@ export class WindowOperator implements Operator {
973973
case "lag": {
974974
const offset = win.args?.offset ?? 1;
975975
const defaultVal = win.args?.default_ ?? null;
976+
const lagCol = win.column ?? (orderBy.length > 0 ? orderBy[0].column : "");
976977
for (let i = 0; i < indices.length; i++) {
977978
const srcIdx = i - offset;
978-
if (srcIdx >= 0 && srcIdx < indices.length && orderBy.length > 0) {
979-
rows[indices[i]][alias] = rows[indices[srcIdx]][orderBy[0].column] as Row[string];
979+
if (srcIdx >= 0 && srcIdx < indices.length && lagCol) {
980+
rows[indices[i]][alias] = rows[indices[srcIdx]][lagCol] as Row[string];
980981
} else {
981982
rows[indices[i]][alias] = defaultVal as Row[string];
982983
}
@@ -987,10 +988,11 @@ export class WindowOperator implements Operator {
987988
case "lead": {
988989
const offset = win.args?.offset ?? 1;
989990
const defaultVal = win.args?.default_ ?? null;
991+
const leadCol = win.column ?? (orderBy.length > 0 ? orderBy[0].column : "");
990992
for (let i = 0; i < indices.length; i++) {
991993
const srcIdx = i + offset;
992-
if (srcIdx >= 0 && srcIdx < indices.length && orderBy.length > 0) {
993-
rows[indices[i]][alias] = rows[indices[srcIdx]][orderBy[0].column] as Row[string];
994+
if (srcIdx >= 0 && srcIdx < indices.length && leadCol) {
995+
rows[indices[i]][alias] = rows[indices[srcIdx]][leadCol] as Row[string];
994996
} else {
995997
rows[indices[i]][alias] = defaultVal as Row[string];
996998
}
@@ -999,7 +1001,7 @@ export class WindowOperator implements Operator {
9991001
}
10001002

10011003
case "sum": case "avg": case "min": case "max": case "count": {
1002-
const col = orderBy.length > 0 ? orderBy[0].column : "";
1004+
const col = win.column ?? (orderBy.length > 0 ? orderBy[0].column : "");
10031005
this.applyAggregateWindow(rows, indices, win, col);
10041006
break;
10051007
}

src/partition-catalog.test.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,41 @@ describe("PartitionCatalog", () => {
161161
});
162162
});
163163

164+
describe("prune — non-exact partition (range data)", () => {
165+
it("returns null for eq on range data (prevents false negatives)", () => {
166+
// min=1, max=100 — value 50 exists but isn't indexed
167+
const fragments = new Map<number, TableMeta>([
168+
makeFragmentMeta(1, "id", 1, 100),
169+
makeFragmentMeta(2, "id", 101, 200),
170+
]);
171+
const catalog = PartitionCatalog.fromFragments("id", fragments);
172+
const result = catalog.prune([{ column: "id", op: "eq", value: 50 }]);
173+
// Must return null (can't safely prune), not [] (no fragments)
174+
expect(result).toBeNull();
175+
});
176+
177+
it("returns null for in on range data", () => {
178+
const fragments = new Map<number, TableMeta>([
179+
makeFragmentMeta(1, "id", 1, 100),
180+
makeFragmentMeta(2, "id", 101, 200),
181+
]);
182+
const catalog = PartitionCatalog.fromFragments("id", fragments);
183+
const result = catalog.prune([{ column: "id", op: "in", value: [50, 150] }]);
184+
expect(result).toBeNull();
185+
});
186+
187+
it("neq still works on range data (conservative — includes all non-excluded)", () => {
188+
const fragments = new Map<number, TableMeta>([
189+
makeFragmentMeta(1, "id", 1, 100),
190+
makeFragmentMeta(2, "id", 101, 200),
191+
]);
192+
const catalog = PartitionCatalog.fromFragments("id", fragments);
193+
// neq is safe even for range data — worst case is slightly over-inclusive
194+
const result = catalog.prune([{ column: "id", op: "neq", value: 1 }]);
195+
expect(result).not.toBeNull();
196+
});
197+
});
198+
164199
describe("prune — non-partition column", () => {
165200
it("returns null when filter is on a different column", () => {
166201
const catalog = buildCatalog();

src/partition-catalog.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ export class PartitionCatalog {
3333
private allFragmentIds: number[] = [];
3434
/** O(1) dedup during construction/registration. */
3535
private allFragmentIdSet = new Set<number>();
36+
/** True when every indexed page has min===max (Hive-style partitioning).
37+
* Only exact-partition catalogs are safe for eq/in lookups. */
38+
private exactPartition = true;
3639

3740
constructor(column: string) {
3841
this.column = column;
@@ -53,6 +56,11 @@ export class PartitionCatalog {
5356
for (const page of col.pages) {
5457
if (page.minValue !== undefined) values.add(String(page.minValue));
5558
if (page.maxValue !== undefined) values.add(String(page.maxValue));
59+
// If min !== max on any page, this isn't exact-partition data
60+
if (page.minValue !== undefined && page.maxValue !== undefined &&
61+
String(page.minValue) !== String(page.maxValue)) {
62+
catalog.exactPartition = false;
63+
}
5664
}
5765

5866
for (const v of values) {
@@ -116,10 +124,14 @@ export class PartitionCatalog {
116124
private matchFilter(filter: FilterOp): number[] | null {
117125
switch (filter.op) {
118126
case "eq": {
127+
// Only safe for exact-partition data (min===max per page).
128+
// For range data, a value between min and max wouldn't be indexed.
129+
if (!this.exactPartition) return null;
119130
const entry = this.index.get(String(filter.value));
120131
return entry ?? [];
121132
}
122133
case "in": {
134+
if (!this.exactPartition) return null;
123135
if (!Array.isArray(filter.value)) return null;
124136
const ids = new Set<number>();
125137
for (const v of filter.value) {
@@ -149,19 +161,21 @@ export class PartitionCatalog {
149161
}
150162

151163
/** Serialize to a plain object for DO durable storage. */
152-
serialize(): { column: string; index: Record<string, number[]>; allFragmentIds: number[] } {
164+
serialize(): { column: string; index: Record<string, number[]>; allFragmentIds: number[]; exactPartition: boolean } {
153165
const index: Record<string, number[]> = {};
154166
for (const [key, ids] of this.index) index[key] = ids;
155-
return { column: this.column, index, allFragmentIds: this.allFragmentIds };
167+
return { column: this.column, index, allFragmentIds: this.allFragmentIds, exactPartition: this.exactPartition };
156168
}
157169

158170
/** Restore from serialized form. */
159-
static deserialize(data: { column: string; index: Record<string, number[]>; allFragmentIds: number[] }): PartitionCatalog {
171+
static deserialize(data: { column: string; index: Record<string, number[]>; allFragmentIds: number[]; exactPartition?: boolean }): PartitionCatalog {
160172
const catalog = new PartitionCatalog(data.column);
161173
for (const [key, ids] of Object.entries(data.index)) {
162174
catalog.index.set(key, ids);
163175
}
164176
catalog.allFragmentIds = data.allFragmentIds;
177+
catalog.allFragmentIdSet = new Set(data.allFragmentIds);
178+
catalog.exactPartition = data.exactPartition ?? true; // backwards-compat: old catalogs assumed exact
165179
return catalog;
166180
}
167181

src/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ export interface AggregateOp {
102102
/** Window function specification (serializable — no callbacks) */
103103
export interface WindowSpec {
104104
fn: "row_number" | "rank" | "dense_rank" | "lag" | "lead" | "sum" | "avg" | "min" | "max" | "count";
105+
/** Target column for lag/lead/sum/avg/min/max/count. Falls back to orderBy[0].column if not set. */
106+
column?: string;
105107
args?: { offset?: number; default_?: unknown };
106108
partitionBy: string[];
107109
orderBy: { column: string; direction: "asc" | "desc" }[];

0 commit comments

Comments
 (0)