Skip to content

Commit dbd9097

Browse files
committed
fix: 3 correctness bugs — empty min/max Infinity, MaterializedExecutor missing aggs; deduplicate pipeline/agg/LIKE code (-136 lines)
Correctness:
- partial-agg resolveValue: return 0 for min/max when count=0 (was Infinity/-Infinity)
- WasmAggregateOperator: return null for min/max when count=0
- MaterializedExecutor: add count_distinct, stddev, variance, median, percentile (was silently null)

Code reduction:
- Extract assemblePipeline() shared by buildPipeline/buildEdgePipeline (-60 lines)
- Extract feedAggStates()/initStates() helpers in partial-agg (-45 lines)
- Deduplicate LIKE regex: evaluator.ts reuses compileLikeRegex from decode.ts (-15 lines)
- Deduplicate identityIndices: partial-agg imports from operators (-10 lines)
1 parent 6f0a9b4 commit dbd9097

File tree

5 files changed

+100
-236
lines changed

5 files changed

+100
-236
lines changed

src/client.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -942,7 +942,12 @@ export class MaterializedExecutor implements QueryExecutor {
942942
if (agg.fn === "count") {
943943
if (agg.column === "*") { out[alias] = bucket.length; }
944944
else { let cnt = 0; for (const r of bucket) if (r[agg.column] != null) cnt++; out[alias] = cnt; }
945+
} else if (agg.fn === "count_distinct") {
946+
const seen = new Set<string>();
947+
for (const r of bucket) { const v = r[agg.column]; if (v != null) seen.add(String(v)); }
948+
out[alias] = seen.size;
945949
} else {
950+
const values: number[] = [];
946951
let sum = 0, count = 0, min = Infinity, max = -Infinity;
947952
for (const r of bucket) {
948953
const v = r[agg.column];
@@ -952,13 +957,37 @@ export class MaterializedExecutor implements QueryExecutor {
952957
sum += n; count++;
953958
if (n < min) min = n;
954959
if (n > max) max = n;
960+
if (agg.fn === "stddev" || agg.fn === "variance" || agg.fn === "median" || agg.fn === "percentile") values.push(n);
955961
}
956962
if (count === 0) { out[alias] = null; continue; }
957963
switch (agg.fn) {
958964
case "sum": out[alias] = sum; break;
959965
case "avg": out[alias] = sum / count; break;
960966
case "min": out[alias] = min; break;
961967
case "max": out[alias] = max; break;
968+
case "stddev": {
969+
const mean = sum / count;
970+
let sq = 0; for (const v of values) sq += (v - mean) ** 2;
971+
out[alias] = Math.sqrt(sq / count); break;
972+
}
973+
case "variance": {
974+
const mean = sum / count;
975+
let sq = 0; for (const v of values) sq += (v - mean) ** 2;
976+
out[alias] = sq / count; break;
977+
}
978+
case "median": {
979+
values.sort((a, b) => a - b);
980+
const mid = Math.floor(count / 2);
981+
out[alias] = count % 2 ? values[mid] : (values[mid - 1] + values[mid]) / 2; break;
982+
}
983+
case "percentile": {
984+
if (agg.percentileTarget === undefined) { out[alias] = null; break; }
985+
values.sort((a, b) => a - b);
986+
const p = agg.percentileTarget;
987+
const idx = p * (count - 1);
988+
const lo = Math.floor(idx), hi = Math.ceil(idx);
989+
out[alias] = lo === hi ? values[lo] : values[lo] + (values[hi] - values[lo]) * (idx - lo); break;
990+
}
962991
default: out[alias] = null;
963992
}
964993
}

src/decode.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ function getInSet(values: readonly (number | bigint | string)[]): Set<number | b
529529
/** Cache compiled LIKE regexes — avoids re-compilation per row. */
530530
const likeRegexCache = new Map<string, RegExp>();
531531

532-
function compileLikeRegex(pattern: string): RegExp {
532+
export function compileLikeRegex(pattern: string): RegExp {
533533
let cached = likeRegexCache.get(pattern);
534534
if (cached) return cached;
535535
// Escape regex metacharacters, then replace SQL wildcards

src/operators.ts

Lines changed: 25 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const _textEncoder = new TextEncoder();
1818

1919
/** Cached identity index arrays to avoid repeated allocations on hot paths. */
2020
const _identityCache = new Map<number, Uint32Array>();
21-
function identityIndices(n: number): Uint32Array {
21+
export function identityIndices(n: number): Uint32Array {
2222
let cached = _identityCache.get(n);
2323
if (cached) return cached;
2424
cached = new Uint32Array(n);
@@ -2056,8 +2056,8 @@ export class WasmAggregateOperator implements Operator {
20562056
switch (agg.fn) {
20572057
case "sum": row[alias] = acc[i].sum; break;
20582058
case "avg": row[alias] = acc[i].count === 0 ? 0 : acc[i].sum / acc[i].count; break;
2059-
case "min": row[alias] = acc[i].min; break;
2060-
case "max": row[alias] = acc[i].max; break;
2059+
case "min": row[alias] = acc[i].count === 0 ? null : acc[i].min; break;
2060+
case "max": row[alias] = acc[i].count === 0 ? null : acc[i].max; break;
20612061
case "count": row[alias] = acc[i].count; break;
20622062
}
20632063
}
@@ -2902,88 +2902,7 @@ export function buildPipeline(
29022902
}
29032903

29042904
const scan = new ScanOperator(fragments, query, wasm, /* applyFilters */ true);
2905-
let pipeline: Operator = scan;
2906-
const memBudget = options?.memoryBudgetBytes ?? DEFAULT_MEMORY_BUDGET;
2907-
2908-
// Filter — skip if ScanOperator already applies filters via WASM SIMD
2909-
const hasFilters = query.filters.length > 0 || (query.filterGroups && query.filterGroups.length > 0);
2910-
if (hasFilters && !scan.filtersApplied) {
2911-
pipeline = new FilterOperator(pipeline, query.filters, query.filterGroups);
2912-
}
2913-
2914-
// SubqueryIn filters (pre-computed value sets)
2915-
if (query.subqueryIn) {
2916-
for (const sq of query.subqueryIn) {
2917-
pipeline = new SubqueryInOperator(pipeline, sq.column, sq.valueSet);
2918-
}
2919-
}
2920-
2921-
// Computed columns (in-process callbacks)
2922-
if (query.computedColumns) {
2923-
pipeline = new ComputedColumnOperator(pipeline, query.computedColumns);
2924-
}
2925-
2926-
// User-injected pipe stages (inserted after filter/computed, before agg/sort)
2927-
if (query.pipeStages) {
2928-
for (const stage of query.pipeStages) {
2929-
pipeline = stage(pipeline);
2930-
}
2931-
}
2932-
2933-
// Window functions
2934-
if (query.windows && query.windows.length > 0) {
2935-
pipeline = new WindowOperator(pipeline, query.windows);
2936-
}
2937-
2938-
// Distinct
2939-
if (query.distinct) {
2940-
pipeline = new DistinctOperator(pipeline, query.distinct);
2941-
}
2942-
2943-
const hasAgg = query.aggregates && query.aggregates.length > 0;
2944-
2945-
if (hasAgg) {
2946-
// Aggregate: consumes all filtered rows, produces aggregate output
2947-
pipeline = new AggregateOperator(pipeline, query);
2948-
2949-
// Sort/limit on aggregate output (aggregate output is small — always in-memory)
2950-
if (query.sortColumn && query.limit !== undefined) {
2951-
pipeline = new TopKOperator(
2952-
pipeline, query.sortColumn, query.sortDirection === "desc",
2953-
query.limit, query.offset ?? 0,
2954-
);
2955-
} else if (query.sortColumn) {
2956-
pipeline = new InMemorySortOperator(
2957-
pipeline, query.sortColumn, query.sortDirection === "desc",
2958-
query.offset ?? 0,
2959-
);
2960-
} else if (query.offset || query.limit !== undefined) {
2961-
pipeline = new LimitOperator(pipeline, query.limit ?? Infinity, query.offset ?? 0);
2962-
}
2963-
} else if (query.sortColumn) {
2964-
// Sort path
2965-
if (query.limit !== undefined) {
2966-
pipeline = new TopKOperator(
2967-
pipeline, query.sortColumn, query.sortDirection === "desc",
2968-
query.limit, query.offset ?? 0,
2969-
);
2970-
} else {
2971-
// Full sort without limit — use external sort to handle TB+ data
2972-
pipeline = new ExternalSortOperator(
2973-
pipeline, query.sortColumn, query.sortDirection === "desc",
2974-
query.offset ?? 0, memBudget, options?.spill,
2975-
);
2976-
}
2977-
} else if (query.offset || query.limit !== undefined) {
2978-
// No sort — streaming limit with offset
2979-
pipeline = new LimitOperator(pipeline, query.limit ?? Infinity, query.offset ?? 0);
2980-
}
2981-
2982-
// Project — strip extra columns at the end
2983-
if (outputColumns.length > 0) {
2984-
pipeline = new ProjectOperator(pipeline, outputColumns);
2985-
}
2986-
2905+
const pipeline = assemblePipeline(scan, query, outputColumns, scan.filtersApplied, options);
29872906
return { pipeline, scan };
29882907
}
29892908

@@ -2997,43 +2916,51 @@ export function buildEdgePipeline(
29972916
outputColumns: string[],
29982917
options?: PipelineOptions,
29992918
): Operator {
3000-
let pipeline: Operator = scan;
2919+
const filtersApplied = "filtersApplied" in scan && (scan as { filtersApplied: boolean }).filtersApplied;
2920+
// Skip projection when aggregation is active — aggregate output columns
2921+
// (aliases like "count_*", "sum_value") don't match the original table columns,
2922+
// so projecting to table columns would strip all aggregate results.
2923+
const hasAgg = query.aggregates && query.aggregates.length > 0;
2924+
const projectCols = hasAgg ? [] : outputColumns;
2925+
return assemblePipeline(scan, query, projectCols, filtersApplied, options);
2926+
}
2927+
2928+
/** Shared pipeline assembly: filter → subquery → computed → pipe → window → distinct → agg → sort → project */
2929+
function assemblePipeline(
2930+
source: Operator,
2931+
query: QueryDescriptor,
2932+
outputColumns: string[],
2933+
filtersApplied: boolean,
2934+
options?: PipelineOptions,
2935+
): Operator {
2936+
let pipeline: Operator = source;
30012937
const memBudget = options?.memoryBudgetBytes ?? DEFAULT_MEMORY_BUDGET;
30022938

3003-
// Skip FilterOperator if the scan operator already applies filters
3004-
// (e.g. EdgeScanOperator Lance path uses WASM SQL with WHERE,
3005-
// EdgeScanOperator Parquet path now uses WASM executeQuery with filters)
3006-
const scanHandlesFilters = "filtersApplied" in scan && (scan as { filtersApplied: boolean }).filtersApplied;
3007-
const edgeHasFilters = query.filters.length > 0 || (query.filterGroups && query.filterGroups.length > 0);
3008-
if (edgeHasFilters && !scanHandlesFilters) {
2939+
const hasFilters = query.filters.length > 0 || (query.filterGroups && query.filterGroups.length > 0);
2940+
if (hasFilters && !filtersApplied) {
30092941
pipeline = new FilterOperator(pipeline, query.filters, query.filterGroups);
30102942
}
30112943

3012-
// SubqueryIn filters
30132944
if (query.subqueryIn) {
30142945
for (const sq of query.subqueryIn) {
30152946
pipeline = new SubqueryInOperator(pipeline, sq.column, sq.valueSet);
30162947
}
30172948
}
30182949

3019-
// Computed columns
30202950
if (query.computedColumns) {
30212951
pipeline = new ComputedColumnOperator(pipeline, query.computedColumns);
30222952
}
30232953

3024-
// User-injected pipe stages (inserted after filter/computed, before agg/sort)
30252954
if (query.pipeStages) {
30262955
for (const stage of query.pipeStages) {
30272956
pipeline = stage(pipeline);
30282957
}
30292958
}
30302959

3031-
// Window functions
30322960
if (query.windows && query.windows.length > 0) {
30332961
pipeline = new WindowOperator(pipeline, query.windows);
30342962
}
30352963

3036-
// Distinct
30372964
if (query.distinct) {
30382965
pipeline = new DistinctOperator(pipeline, query.distinct);
30392966
}
@@ -3071,10 +2998,7 @@ export function buildEdgePipeline(
30712998
pipeline = new LimitOperator(pipeline, query.limit ?? Infinity, query.offset ?? 0);
30722999
}
30733000

3074-
// Skip projection when aggregation is active — aggregate output columns
3075-
// (aliases like "count_*", "sum_value") don't match the original table columns,
3076-
// so projecting to table columns would strip all aggregate results.
3077-
if (!hasAgg && outputColumns.length > 0) {
3001+
if (outputColumns.length > 0) {
30783002
pipeline = new ProjectOperator(pipeline, outputColumns);
30793003
}
30803004

0 commit comments

Comments (0)