Skip to content

Commit d024b4d

Browse files
committed
fix: 6 correctness bugs — COUNT(col) nulls, merge offset, GROUP BY types, Welford variance, pipeStages
- sql/executor.ts: COUNT(column) now excludes nulls (was counting all rows) - merge.ts: apply offset in unsorted multi-fragment merge path (was ignored) - partial-agg.ts: GROUP BY keys use null sentinel to distinguish null from "" - partial-agg.ts: finalize restores numeric types for group-by column values - partial-agg.ts: Welford's online algorithm for variance/stddev (was naive sum-of-squares with catastrophic cancellation) - partial-agg.ts: parallel Welford merge (delta^2 * nA * nB / (nA + nB)) - operators.ts: buildPipeline now applies pipeStages (was only in buildEdgePipeline)
1 parent 5489350 commit d024b4d

File tree

4 files changed

+56
-30
lines changed

4 files changed

+56
-30
lines changed

src/merge.ts

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -158,19 +158,14 @@ export function mergeQueryResults(
158158
};
159159
}
160160

161-
// Unsorted: limit-aware concat with early termination
161+
// Unsorted: concat with offset + limit
162162
let rows: Row[];
163-
if (query.limit) {
164-
rows = [];
165-
for (const p of partials) {
166-
for (const row of p.rows) {
167-
rows.push(row);
168-
if (rows.length >= query.limit) break;
169-
}
170-
if (rows.length >= query.limit) break;
171-
}
163+
const allRows = partials.flatMap((p) => p.rows);
164+
const off = query.offset ?? 0;
165+
if (off > 0 || query.limit !== undefined) {
166+
rows = allRows.slice(off, query.limit !== undefined ? off + query.limit : undefined);
172167
} else {
173-
rows = partials.flatMap((p) => p.rows);
168+
rows = allRows;
174169
}
175170

176171
return {

src/operators.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2902,6 +2902,13 @@ export function buildPipeline(
29022902
pipeline = new ComputedColumnOperator(pipeline, query.computedColumns);
29032903
}
29042904

2905+
// User-injected pipe stages (inserted after filter/computed, before agg/sort)
2906+
if (query.pipeStages) {
2907+
for (const stage of query.pipeStages) {
2908+
pipeline = stage(pipeline);
2909+
}
2910+
}
2911+
29052912
// Window functions
29062913
if (query.windows && query.windows.length > 0) {
29072914
pipeline = new WindowOperator(pipeline, query.windows);

src/partial-agg.ts

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ export interface PartialAggState {
2525
count: number;
2626
min: number;
2727
max: number;
28-
/** For stddev/variance: sum of squared values (Welford's online algorithm) */
29-
sumSq?: number;
28+
/** For stddev/variance: Welford's M2 = sum of squared deviations from mean */
29+
m2?: number;
3030
/** For median/percentile: collected values (exact mode) */
3131
values?: number[];
3232
/** For count_distinct: set of seen values */
@@ -41,7 +41,7 @@ export function initPartialAggState(
4141
opts?: { percentileTarget?: number },
4242
): PartialAggState {
4343
const state: PartialAggState = { fn, column, sum: 0, count: 0, min: Infinity, max: -Infinity };
44-
if (fn === "stddev" || fn === "variance") state.sumSq = 0;
44+
if (fn === "stddev" || fn === "variance") state.m2 = 0;
4545
if (fn === "median" || fn === "percentile") {
4646
state.values = [];
4747
if (fn === "percentile" && opts?.percentileTarget !== undefined) {
@@ -57,11 +57,20 @@ export function updatePartialAgg(
5757
value: number,
5858
rawValue?: unknown,
5959
): void {
60-
state.sum += value;
61-
state.count++;
60+
if (state.m2 !== undefined) {
61+
// Welford's online algorithm — compute before updating sum/count
62+
const oldMean = state.count > 0 ? state.sum / state.count : 0;
63+
const delta = value - oldMean;
64+
state.sum += value;
65+
state.count++;
66+
const newMean = state.sum / state.count;
67+
state.m2 += delta * (value - newMean);
68+
} else {
69+
state.sum += value;
70+
state.count++;
71+
}
6272
if (value < state.min) state.min = value;
6373
if (value > state.max) state.max = value;
64-
if (state.sumSq !== undefined) state.sumSq += value * value;
6574
if (state.values !== undefined) state.values.push(value);
6675
if (state.distinctSet !== undefined) {
6776
state.distinctSet.add(rawValue !== undefined ? String(rawValue) : String(value));
@@ -84,14 +93,11 @@ function resolveValue(state: PartialAggState): number {
8493
return state.distinctSet?.size ?? 0;
8594
case "stddev": {
8695
if (state.count < 2) return 0;
87-
const mean = state.sum / state.count;
88-
const variance = (state.sumSq ?? 0) / state.count - mean * mean;
89-
return Math.sqrt(Math.max(0, variance));
96+
return Math.sqrt(Math.max(0, (state.m2 ?? 0) / state.count));
9097
}
9198
case "variance": {
9299
if (state.count < 2) return 0;
93-
const mean = state.sum / state.count;
94-
return (state.sumSq ?? 0) / state.count - mean * mean;
100+
return (state.m2 ?? 0) / state.count;
95101
}
96102
case "median": {
97103
const vals = state.values ?? [];
@@ -157,7 +163,8 @@ export function computePartialAgg(
157163
let key = "";
158164
for (let g = 0; g < groupCols.length; g++) {
159165
if (g > 0) key += "\x00";
160-
key += String(row[groupCols[g]] ?? "");
166+
const v = row[groupCols[g]];
167+
key += v === null || v === undefined ? "\x01NULL\x01" : String(v);
161168
}
162169
let states = groups.get(key);
163170
if (!states) {
@@ -226,7 +233,8 @@ export function computePartialAggColumnar(
226233
for (let g = 0; g < groupCols.length; g++) {
227234
if (g > 0) key += "\x00";
228235
const vals = batch.columns.get(groupCols[g]);
229-
key += String(vals ? (vals[idx] ?? "") : "");
236+
const v = vals ? vals[idx] : null;
237+
key += v === null || v === undefined ? "\x01NULL\x01" : String(v);
230238
}
231239
let states = groups.get(key);
232240
if (!states) {
@@ -261,14 +269,20 @@ function mergeStates(
261269
source: PartialAggState[],
262270
): void {
263271
for (let i = 0; i < target.length; i++) {
272+
// Parallel Welford merge for m2 (must be computed before sum/count merge)
273+
if (target[i].m2 !== undefined && source[i].m2 !== undefined) {
274+
const nA = target[i].count, nB = source[i].count;
275+
if (nA > 0 && nB > 0) {
276+
const delta = (source[i].sum / nB) - (target[i].sum / nA);
277+
target[i].m2 = target[i].m2! + source[i].m2! + delta * delta * nA * nB / (nA + nB);
278+
} else {
279+
target[i].m2! += source[i].m2!;
280+
}
281+
}
264282
target[i].sum += source[i].sum;
265283
target[i].count += source[i].count;
266284
if (source[i].min < target[i].min) target[i].min = source[i].min;
267285
if (source[i].max > target[i].max) target[i].max = source[i].max;
268-
// Extended aggregate state merge
269-
if (target[i].sumSq !== undefined && source[i].sumSq !== undefined) {
270-
target[i].sumSq! += source[i].sumSq!;
271-
}
272286
if (target[i].values !== undefined && source[i].values !== undefined) {
273287
target[i].values!.push(...source[i].values!);
274288
}
@@ -329,7 +343,14 @@ export function finalizePartialAgg(
329343
const row: Row = {};
330344
const keyParts = key.split("\x00");
331345
for (let i = 0; i < groupCols.length; i++) {
332-
row[groupCols[i]] = keyParts[i];
346+
const part = keyParts[i];
347+
if (part === "\x01NULL\x01") {
348+
row[groupCols[i]] = null;
349+
} else {
350+
// Attempt to restore numeric types
351+
const num = Number(part);
352+
row[groupCols[i]] = part !== "" && !isNaN(num) && String(num) === part ? num : part;
353+
}
333354
}
334355
for (let i = 0; i < states.length; i++) {
335356
const alias = aliasFor(aggregates[i]);

src/sql/executor.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,10 @@ function aggregate(rows: Row[], aggregates: AggregateOp[], groupBy?: string[]):
182182
}
183183

184184
function computeAgg(rows: Row[], agg: AggregateOp): number | bigint | string | boolean | Float32Array | null {
185-
if (agg.fn === "count") return rows.length;
185+
if (agg.fn === "count") {
186+
if (agg.column === "*") return rows.length;
187+
return rows.filter(r => r[agg.column] != null).length;
188+
}
186189

187190
const values: number[] = [];
188191
const seen = new Set<string>();

0 commit comments

Comments
 (0)