Skip to content

Commit c6a84b5

Browse files
committed
fix: int64 precision loss in WasmAggregateOperator + Welford merge
WasmAggregateOperator: SUM/MIN/MAX on int64 columns previously converted the WASM bigint results with Number(), silently losing precision above 2^53. The operator now uses native bigint accumulators (bigSum/bigMin/bigMax) and preserves bigint in the output row. AVG still returns a Number, since its result is floating-point. Welford merge (partial-agg.ts): cross-shard STDDEV/VARIANCE on int64 columns used state.sum to compute each side's mean, but bigint columns only update state.bigSum (sum stays 0). The delta between the two means was therefore always 0, making the correction term vanish. The merge now computes the means from Number(bigSum) when bigSum is available.
1 parent 2c0a461 commit c6a84b5

File tree

2 files changed

+17
-13
lines changed

2 files changed

+17
-13
lines changed

src/operators.ts

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1962,7 +1962,7 @@ export class WasmAggregateOperator implements Operator {
19621962
const filterGroups = this.query.filterGroups;
19631963
const hasFilters = filters.length > 0 || (filterGroups && filterGroups.length > 0);
19641964
// Accumulator per aggregate
1965-
const acc: { sum: number; count: number; min: number; max: number }[] =
1965+
const acc: { sum: number; count: number; min: number; max: number; bigSum?: bigint; bigMin?: bigint; bigMax?: bigint }[] =
19661966
aggregates.map(() => ({ sum: 0, count: 0, min: Infinity, max: -Infinity }));
19671967

19681968
const scanStart = Date.now();
@@ -2067,9 +2067,9 @@ export class WasmAggregateOperator implements Operator {
20672067
if (agg.fn === "min") { const v = this.wasm.minFloat64(buf); if (v < acc[ai].min) acc[ai].min = v; }
20682068
if (agg.fn === "max") { const v = this.wasm.maxFloat64(buf); if (v > acc[ai].max) acc[ai].max = v; }
20692069
} else if (col.dtype === "int64") {
2070-
if (agg.fn === "sum" || agg.fn === "avg") acc[ai].sum += Number(this.wasm.sumInt64(buf));
2071-
if (agg.fn === "min") { const v = Number(this.wasm.minInt64(buf)); if (v < acc[ai].min) acc[ai].min = v; }
2072-
if (agg.fn === "max") { const v = Number(this.wasm.maxInt64(buf)); if (v > acc[ai].max) acc[ai].max = v; }
2070+
if (agg.fn === "sum" || agg.fn === "avg") acc[ai].bigSum = (acc[ai].bigSum ?? 0n) + this.wasm.sumInt64(buf);
2071+
if (agg.fn === "min") { const v = this.wasm.minInt64(buf); if (acc[ai].bigMin === undefined || v < acc[ai].bigMin!) acc[ai].bigMin = v; }
2072+
if (agg.fn === "max") { const v = this.wasm.maxInt64(buf); if (acc[ai].bigMax === undefined || v > acc[ai].bigMax!) acc[ai].bigMax = v; }
20732073
}
20742074
} else {
20752075
// Filtered: use indexed aggregates on matching rows only
@@ -2083,9 +2083,9 @@ export class WasmAggregateOperator implements Operator {
20832083
if (agg.fn === "min") { const v = this.wasm.exports.minFloat64Indexed(dataPtr, indicesPtr, matchCount); if (v < acc[ai].min) acc[ai].min = v; }
20842084
if (agg.fn === "max") { const v = this.wasm.exports.maxFloat64Indexed(dataPtr, indicesPtr, matchCount); if (v > acc[ai].max) acc[ai].max = v; }
20852085
} else if (col.dtype === "int64") {
2086-
if (agg.fn === "sum" || agg.fn === "avg") acc[ai].sum += Number(this.wasm.exports.sumInt64Indexed(dataPtr, indicesPtr, matchCount));
2087-
if (agg.fn === "min") { const v = Number(this.wasm.exports.minInt64Indexed(dataPtr, indicesPtr, matchCount)); if (v < acc[ai].min) acc[ai].min = v; }
2088-
if (agg.fn === "max") { const v = Number(this.wasm.exports.maxInt64Indexed(dataPtr, indicesPtr, matchCount)); if (v > acc[ai].max) acc[ai].max = v; }
2086+
if (agg.fn === "sum" || agg.fn === "avg") acc[ai].bigSum = (acc[ai].bigSum ?? 0n) + this.wasm.exports.sumInt64Indexed(dataPtr, indicesPtr, matchCount);
2087+
if (agg.fn === "min") { const v = this.wasm.exports.minInt64Indexed(dataPtr, indicesPtr, matchCount); if (acc[ai].bigMin === undefined || v < acc[ai].bigMin!) acc[ai].bigMin = v; }
2088+
if (agg.fn === "max") { const v = this.wasm.exports.maxInt64Indexed(dataPtr, indicesPtr, matchCount); if (acc[ai].bigMax === undefined || v > acc[ai].bigMax!) acc[ai].bigMax = v; }
20892089
}
20902090
}
20912091
}
@@ -2098,12 +2098,14 @@ export class WasmAggregateOperator implements Operator {
20982098
for (let i = 0; i < aggregates.length; i++) {
20992099
const agg = aggregates[i];
21002100
const alias = agg.alias ?? `${agg.fn}_${agg.column}`;
2101+
const a = acc[i];
2102+
const hasBig = a.bigSum !== undefined || a.bigMin !== undefined || a.bigMax !== undefined;
21012103
switch (agg.fn) {
2102-
case "sum": row[alias] = acc[i].count === 0 ? null : acc[i].sum; break;
2103-
case "avg": row[alias] = acc[i].count === 0 ? null : acc[i].sum / acc[i].count; break;
2104-
case "min": row[alias] = acc[i].count === 0 ? null : acc[i].min; break;
2105-
case "max": row[alias] = acc[i].count === 0 ? null : acc[i].max; break;
2106-
case "count": row[alias] = acc[i].count; break;
2104+
case "sum": row[alias] = a.count === 0 ? null : (hasBig ? a.bigSum! : a.sum); break;
2105+
case "avg": row[alias] = a.count === 0 ? null : (hasBig ? Number(a.bigSum!) / a.count : a.sum / a.count); break;
2106+
case "min": row[alias] = a.count === 0 ? null : (a.bigMin !== undefined ? a.bigMin : a.min); break;
2107+
case "max": row[alias] = a.count === 0 ? null : (a.bigMax !== undefined ? a.bigMax : a.max); break;
2108+
case "count": row[alias] = a.count; break;
21072109
}
21082110
}
21092111

src/partial-agg.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,9 @@ export function mergeStates(
222222
if (target[i].m2 !== undefined && source[i].m2 !== undefined) {
223223
const nA = target[i].count, nB = source[i].count;
224224
if (nA > 0 && nB > 0) {
225-
const delta = (source[i].sum / nB) - (target[i].sum / nA);
225+
const meanA = target[i].bigSum !== undefined ? Number(target[i].bigSum!) / nA : target[i].sum / nA;
226+
const meanB = source[i].bigSum !== undefined ? Number(source[i].bigSum!) / nB : source[i].sum / nB;
227+
const delta = meanB - meanA;
226228
target[i].m2 = target[i].m2! + source[i].m2! + delta * delta * nA * nB / (nA + nB);
227229
} else {
228230
target[i].m2! += source[i].m2!;

0 commit comments

Comments
 (0)