21021: DataFrame API: allow aggregate functions in select() (#17874) #299
martin-augment wants to merge 11 commits into main from
Conversation
Walkthrough

This pull request adds support for selecting aggregate expressions directly in DataFusion's DataFrame API and establishes repository rules for AI agent behavior during code reviews.
Code Review
This pull request enables aggregate functions to be used directly within the DataFrame::select method, which now automatically constructs a global aggregation plan when aggregates are detected. The changes include logic for detecting aggregate expressions, rewriting them to reference an internal aggregate stage, and validating that non-aggregate columns are not misused. Feedback points out that the current validation is incomplete and could allow unaggregated columns to pass if they are part of a larger expression. Additionally, an optimization was suggested to avoid redundant alias nodes, and the inefficiency of repeated expression tree traversals was noted.
```rust
// Check for non-aggregate expressions
let has_non_aggregate_expr = expr_list.iter().any(|e| match e {
    SelectExpr::Expression(expr) => {
        find_aggregate_exprs(std::iter::once(expr)).is_empty()
    }
    SelectExpr::Wildcard(_) | SelectExpr::QualifiedWildcard(_, _) => true,
});
```
The check for non-aggregate expressions is incomplete. It only verifies if an expression contains an aggregate function, but it doesn't ensure that all column references within that expression are properly aggregated. For example, select(vec![sum(col("a")) + col("b")]) would incorrectly pass this check because the expression contains an aggregate, even though col("b") is unaggregated. This will lead to a "Column not found" error later in the planning process instead of a proper "must be in GROUP BY" error.
Additionally, calling find_aggregate_exprs in a loop is inefficient as it traverses the expression tree multiple times.
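The validation the reviewer is asking for has to walk the whole expression tree and prove that every column reference sits under an aggregate node, rather than testing whether an aggregate exists anywhere in the expression. A minimal, self-contained sketch of that recursion, where `MiniExpr` and `is_valid_in_ungrouped_select` are illustrative stand-ins and not DataFusion's actual `Expr` API:

```rust
// Toy expression type standing in for DataFusion's Expr (assumption:
// the names and shapes here are illustrative only).
enum MiniExpr {
    Column(String),
    Literal(i64),
    Aggregate(String, Box<MiniExpr>),       // e.g. sum(inner)
    BinaryOp(Box<MiniExpr>, Box<MiniExpr>), // e.g. left + right
}

/// Valid in an ungrouped aggregate SELECT only if every column
/// reference is wrapped by some aggregate call.
fn is_valid_in_ungrouped_select(expr: &MiniExpr) -> bool {
    match expr {
        MiniExpr::Column(_) => false,      // bare column: the error case
        MiniExpr::Literal(_) => true,
        MiniExpr::Aggregate(_, _) => true, // columns under an aggregate are fine
        MiniExpr::BinaryOp(l, r) => {
            is_valid_in_ungrouped_select(l) && is_valid_in_ungrouped_select(r)
        }
    }
}

// sum(a) + b: contains an aggregate, yet b is unaggregated.
fn example_bad() -> MiniExpr {
    MiniExpr::BinaryOp(
        Box::new(MiniExpr::Aggregate(
            "sum".into(),
            Box::new(MiniExpr::Column("a".into())),
        )),
        Box::new(MiniExpr::Column("b".into())),
    )
}

// sum(a) + 1: every column reference is under the aggregate.
fn example_good() -> MiniExpr {
    MiniExpr::BinaryOp(
        Box::new(MiniExpr::Aggregate(
            "sum".into(),
            Box::new(MiniExpr::Column("a".into())),
        )),
        Box::new(MiniExpr::Literal(1)),
    )
}

fn main() {
    println!("bad: {}", is_valid_in_ungrouped_select(&example_bad()));   // false
    println!("good: {}", is_valid_in_ungrouped_select(&example_good())); // true
}
```

Note that a containment check in the style of `find_aggregate_exprs(...).is_empty()` would classify `example_bad()` as aggregate-only, and that a single recursion per expression also avoids the repeated-traversal cost the review mentions.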
```rust
let rewritten = rewrite_expr(expr, &aggr_map)?;
let name =
    make_unique_name(base_alias, &mut projection_used_names, 1);
let final_expr = rewritten.alias(name);
```
This line can create redundant Alias nodes if the rewritten expression already has the desired name (e.g., if the original expression was already an Alias). While functionally correct, it adds unnecessary depth to the expression tree. Consider checking if the name already matches before applying the alias.
```rust
let final_expr = if rewritten.name_for_alias().as_ref() == Ok(&name) {
    rewritten
} else {
    rewritten.alias(name)
};
```
value:useful; category:bug; feedback: The Gemini AI reviewer is correct! If the expression already has the same alias then there is no need to set it a second time. Another option would be to delete any previous alias and set only the new one.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
```rust
return plan_err!(
    "Column in SELECT must be in GROUP BY or an aggregate function"
);
```
Premature validation breaks aggregate-then-select chaining pattern
Medium Severity
The mixed aggregate/non-aggregate validation at has_non_aggregate_expr && !aggr_exprs.is_empty() fires before the matches!(plan, LogicalPlan::Aggregate(_)) fallback check. This means .aggregate().select() chains that re-use aggregate expressions alongside plain column references now error out, even though the plan is already aggregated and the select is just a projection. The old pattern df.aggregate(vec![col("c1")], vec![min(col("c2"))]).select(vec![col("c1"), min(col("c2")).alias("result")]) was previously valid but now rejects with a spurious "must be in GROUP BY" error. This is a public API regression — the Aggregate plan check needs to be evaluated before the mixed-expression validation.
Additional Locations (1)
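The fix Bugbot describes is an ordering change: consult the input plan's variant before running the mixed-expression validation, so a `select()` over an already-aggregated plan is treated as a plain projection. A hedged sketch of that control flow, where `MiniPlan` and the two booleans are stand-ins for `LogicalPlan` and the PR's `has_non_aggregate_expr` / `aggr_exprs` checks rather than real DataFusion types:

```rust
// Stand-in for LogicalPlan: only the variant matters for this check.
enum MiniPlan {
    Aggregate,
    TableScan,
}

fn validate_select(plan: &MiniPlan, has_non_agg: bool, has_agg: bool) -> Result<(), String> {
    // Evaluate the already-aggregated fallback FIRST: a projection over
    // an Aggregate plan may freely mix grouping columns and aggregates.
    if matches!(plan, MiniPlan::Aggregate) {
        return Ok(());
    }
    // Only then reject mixed aggregate/non-aggregate selections.
    if has_non_agg && has_agg {
        return Err("Column in SELECT must be in GROUP BY or an aggregate function".into());
    }
    Ok(())
}

fn main() {
    // .aggregate(...).select(col, min(col)) style chain: must stay valid.
    assert!(validate_select(&MiniPlan::Aggregate, true, true).is_ok());
    // Ungrouped scan with mixed expressions: must be rejected.
    assert!(validate_select(&MiniPlan::TableScan, true, true).is_err());
    println!("ok");
}
```

With the checks in the reported order (mixed-expression validation first), the first assertion would fail, which is exactly the regression described above.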
Actionable comments posted: 3
🧹 Nitpick comments (3)
datafusion/proto/tests/cases/roundtrip_logical_plan.rs (1)
1197-1197: Move `grouping()` into a valid grouped roundtrip case instead of leaving it disabled.

This drops proto coverage for `grouping()` entirely. The failure here is because `grouping(lit(1))` is invalid without grouping columns, so this should live in a small grouped aggregate test instead of as a permanent TODO. I can sketch that case if helpful.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@datafusion/proto/tests/cases/roundtrip_logical_plan.rs` at line 1197, The test disabled grouping() because grouping(lit(1)) is invalid without grouping columns; fix by moving the grouping() case into a new small grouped-aggregate roundtrip test that includes at least one grouping column so resolve_grouping_function can succeed. Create a new case in roundtrip_logical_plan.rs that builds a grouped aggregate plan (e.g., an aggregate with grouping columns) and includes grouping(lit(1)) in the grouping set, mirror the expected proto/roundtrip assertions from other aggregate cases, and remove the commented-out grouping() line; reference grouping(), grouping(lit(1)), and resolve_grouping_function when locating where to add the grouped test.

datafusion-examples/examples/dataframe/dataframe.rs (1)
282-294: Sort the displayed results in the new examples.

These examples are meant to compare grouped and windowed aggregation, but the printed row order is planner-dependent right now. Adding an explicit sort by `car` (and `speed` for the windowed forms) would make the output reproducible and much easier to compare.

Also applies to: 308-361

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@datafusion-examples/examples/dataframe/dataframe.rs` around lines 282 - 294, The examples currently rely on planner-dependent row order; modify the DataFrame chains to append an explicit sort before show() so output is reproducible: for grouped aggregations (using df.aggregate(...) and df.select(...)) call sort(vec![col("car")]) before .show(), and for the windowed examples add sort(vec![col("car"), col("speed")]) (or equivalent sort by car then speed) before .show(); update all occurrences around the aggregate/select/avg(col("speed")) usages (and the windowed aggregation blocks) to include these sort(...) calls.

datafusion/core/src/dataframe/mod.rs (1)
527-540: Double aliasing when user provides explicit alias.

When a user provides an explicit alias (e.g., `count(col("a")).alias("my_count")`), the rewrite logic adds another alias on top of the already-aliased expression. The `rewrite_expr` preserves the inner alias structure, then line 534 wraps it again with `rewritten.alias(name)`.

This creates a nested `Alias { Alias { ... } }` structure that, while functionally correct, adds unnecessary plan complexity.

💡 Consider stripping existing alias before re-aliasing

```diff
 for select_expr in expr_list.into_iter() {
     match select_expr {
         SelectExpr::Expression(expr) => {
             let base_alias = expr.name_for_alias()?;
             let rewritten = rewrite_expr(expr, &aggr_map)?;
             let name =
                 make_unique_name(base_alias, &mut projection_used_names, 1);
-            let final_expr = rewritten.alias(name);
+            // Strip existing alias if present to avoid double-aliasing
+            let final_expr = match rewritten {
+                Expr::Alias(Alias { expr: inner, .. }) => (*inner).alias(name),
+                other => other.alias(name),
+            };
             rewritten_exprs.push(SelectExpr::Expression(final_expr));
         }
         other => rewritten_exprs.push(other),
     }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@datafusion/core/src/dataframe/mod.rs` around lines 527 - 540, The loop over SelectExpr::Expression double-wraps already-aliased expressions by calling rewrite_expr(...) which preserves an inner Alias and then always calling rewritten.alias(name); update the logic in this block to detect and unwrap an existing Alias on the rewritten expression (or otherwise preserve the user-provided alias) before applying the unique name via make_unique_name and rewritten.alias; specifically adjust handling around rewrite_expr, rewritten.alias and projection_used_names so you only add an outer alias when there is no existing alias (or reuse the inner alias name when present) to avoid creating nested Alias{Alias{...}} nodes.
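For context on the `make_unique_name(base_alias, &mut projection_used_names, 1)` calls quoted in this comment: a plausible reading is a helper that keeps projection output names unique by suffixing on collision. The sketch below is inferred from the call site only; the `:N` suffix format and the exact signature are assumptions, not DataFusion's actual implementation:

```rust
use std::collections::HashSet;

// Hypothetical uniquifier: return `base` if unused, otherwise append
// ":N" counters starting at `start` until an unused name is found.
// Every returned name is recorded in `used`.
fn make_unique_name(base: &str, used: &mut HashSet<String>, start: usize) -> String {
    if used.insert(base.to_string()) {
        return base.to_string();
    }
    let mut n = start;
    loop {
        let candidate = format!("{base}:{n}");
        if used.insert(candidate.clone()) {
            return candidate;
        }
        n += 1;
    }
}

fn main() {
    let mut used = HashSet::new();
    // The first sum(a) keeps its name; the duplicate gets a suffix.
    println!("{}", make_unique_name("sum(a)", &mut used, 1)); // sum(a)
    println!("{}", make_unique_name("sum(a)", &mut used, 1)); // sum(a):1
}
```

Whatever the real format, the double-aliasing issue above is independent of it: stripping an existing `Alias` before applying the unique name avoids the nested structure without changing which name is chosen.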
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@datafusion/core/tests/dataframe/mod.rs`:
- Around line 6928-6939: The test is using df with a single relation so
SelectExpr::QualifiedWildcard("aggregate_test_100", ...) is identical to an
unqualified *; modify the test to make the qualifier meaningful by producing a
plan with two relations (e.g., perform a self-join or alias the dataframe and
join it to itself) so that selecting SelectExpr::QualifiedWildcard for one alias
yields a different column set than an unqualified wildcard; update the setup to
create aliased relations (via df.alias or similar) and use those aliases in the
QualifiedWildcard call and assertions (change the expected num_columns and total
rows accordingly) so a resolver that ignores qualifiers will fail.
- Around line 6985-7019: The test samples fixed offsets from batches[0] after
sorting only by c1, which assumes a single RecordBatch and unspecified
intra-partition order; update the pipeline so the final sort is deterministic
(e.g., sort by col("c1") plus a deterministic secondary key like "rn" or "c9" in
the select/sort chain where count_window_function, sum_window_function,
avg_window_function are used) and replace the hard-coded slices of batches[0]
with assertions over the full collected output (use res.collect() result: either
concatenate/flatten all RecordBatches or iterate all rows across batches to
locate group rows by c1 and assert cnt/sum_c9/avg_c9 values) instead of relying
on batch.slice indices. Ensure references to df.select(...), .sort(...),
collect(), and assert_batches_eq! are updated accordingly.
In `@datafusion/proto/tests/cases/roundtrip_logical_plan.rs`:
- Around line 1236-1260: The test currently only round-trips aggregates via
table.aggregate(...) and thus misses exercising the new aggregate-in-select
rewrite; modify the test to also build a DataFrame that selects expr_list (or
otherwise calls DataFrame::select() including agg_exprs) so the select-path
rewrite runs, then generate an optimized plan (e.g. call into_optimized_plan()
on that select result), serialize/deserialize it using logical_plan_to_bytes and
logical_plan_from_bytes with ctx.task_ctx(), and assert the round-tripped plan
string equals the original (similar to the existing non_agg and agg assertions).
Ensure you keep the existing plan_non_agg and plan_agg checks and add a new
plan_select_with_agg (or similarly named) check that references expr_list,
agg_exprs, table.select(), logical_plan_to_bytes, logical_plan_from_bytes, and
ctx.task_ctx().
---
Nitpick comments:
In `@datafusion-examples/examples/dataframe/dataframe.rs`:
- Around line 282-294: The examples currently rely on planner-dependent row
order; modify the DataFrame chains to append an explicit sort before show() so
output is reproducible: for grouped aggregations (using df.aggregate(...) and
df.select(...)) call sort(vec![col("car")]) before .show(), and for the windowed
examples add sort(vec![col("car"), col("speed")]) (or equivalent sort by car
then speed) before .show(); update all occurrences around the
aggregate/select/avg(col("speed")) usages (and the windowed aggregation blocks)
to include these sort(...) calls.
In `@datafusion/core/src/dataframe/mod.rs`:
- Around line 527-540: The loop over SelectExpr::Expression double-wraps
already-aliased expressions by calling rewrite_expr(...) which preserves an
inner Alias and then always calling rewritten.alias(name); update the logic in
this block to detect and unwrap an existing Alias on the rewritten expression
(or otherwise preserve the user-provided alias) before applying the unique name
via make_unique_name and rewritten.alias; specifically adjust handling around
rewrite_expr, rewritten.alias and projection_used_names so you only add an outer
alias when there is no existing alias (or reuse the inner alias name when
present) to avoid creating nested Alias{Alias{...}} nodes.
In `@datafusion/proto/tests/cases/roundtrip_logical_plan.rs`:
- Line 1197: The test disabled grouping() because grouping(lit(1)) is invalid
without grouping columns; fix by moving the grouping() case into a new small
grouped-aggregate roundtrip test that includes at least one grouping column so
resolve_grouping_function can succeed. Create a new case in
roundtrip_logical_plan.rs that builds a grouped aggregate plan (e.g., an
aggregate with grouping columns) and includes grouping(lit(1)) in the grouping
set, mirror the expected proto/roundtrip assertions from other aggregate cases,
and remove the commented-out grouping() line; reference grouping(),
grouping(lit(1)), and resolve_grouping_function when locating where to add the
grouped test.
📒 Files selected for processing (6)
- .cursor/rules.md
- AGENTS.md
- datafusion-examples/examples/dataframe/dataframe.rs
- datafusion/core/src/dataframe/mod.rs
- datafusion/core/tests/dataframe/mod.rs
- datafusion/proto/tests/cases/roundtrip_logical_plan.rs
```rust
let res = df.clone().select(vec![
    SelectExpr::QualifiedWildcard(
        "aggregate_test_100".into(),
        WildcardOptions::default(),
    ),
    lit(42).into(),
])?;

let batches = res.collect().await?;
assert!(!batches.is_empty());
assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 100);
assert!(batches.iter().all(|b| b.num_columns() == 14));
```
The qualified-wildcard case is indistinguishable from * here.
Because df only has one relation, aggregate_test_100.* and unqualified * produce the same shape. A resolver bug that ignores the qualifier would still pass this test. Use a join or aliased self-join so the qualifier changes the projected columns.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@datafusion/core/tests/dataframe/mod.rs` around lines 6928 - 6939, The test is
using df with a single relation so
SelectExpr::QualifiedWildcard("aggregate_test_100", ...) is identical to an
unqualified *; modify the test to make the qualifier meaningful by producing a
plan with two relations (e.g., perform a self-join or alias the dataframe and
join it to itself) so that selecting SelectExpr::QualifiedWildcard for one alias
yields a different column set than an unqualified wildcard; update the setup to
create aliased relations (via df.alias or similar) and use those aliases in the
QualifiedWildcard call and assertions (change the expected num_columns and total
rows accordingly) so a resolver that ignores qualifiers will fail.
value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! If there is no JOIN/UNION between two or more tables then there is no difference between qualified and non-qualified wildcard and it could be "optimized" by the planning/display optimizers.
```rust
let res = df
    .clone()
    .select(vec![
        col("c1"),
        count_window_function.alias("cnt"),
        sum_window_function.alias("sum_c9"),
        avg_window_function.alias("avg_c9"),
    ])?
    .sort(vec![col("c1").sort(true, true)])?;

let batches = res.collect().await?;
assert!(!batches.is_empty());
assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 100);
assert!(batches.iter().all(|b| b.num_columns() == 4));
let batch = &batches[0];
assert_batches_eq!(
    &[
        "+----+-----+-------------+--------------------+",
        "| c1 | cnt | sum_c9 | avg_c9 |",
        "+----+-----+-------------+--------------------+",
        "| a | 21 | 42619217323 | 2029486539.1904762 |",
        "| b | 19 | 42365566310 | 2229766647.894737 |",
        "| c | 21 | 46381998762 | 2208666607.714286 |",
        "| d | 18 | 39910269981 | 2217237221.1666665 |",
        "| e | 21 | 50812717684 | 2419653223.047619 |",
        "+----+-----+-------------+--------------------+",
    ],
    &[
        batch.slice(0, 1),  // a
        batch.slice(21, 1), // b
        batch.slice(40, 1), // c
        batch.slice(61, 1), // d
        batch.slice(79, 1)  // e
    ]
);
```
These window assertions are tied to batch layout and unspecified row order.
The checks below sample batches[0] at hard-coded offsets, which assumes the sorted output stays in a single RecordBatch. In the ROW_NUMBER case, sorting only by c1 also does not guarantee rn = 1..n ordering inside each partition. Sort on c1 plus a deterministic secondary key (rn or c9) and assert over the full collected output instead of fixed slices.
Also applies to: 7039-7156, 7178-7207, 7238-7262
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@datafusion/core/tests/dataframe/mod.rs` around lines 6985 - 7019, The test
samples fixed offsets from batches[0] after sorting only by c1, which assumes a
single RecordBatch and unspecified intra-partition order; update the pipeline so
the final sort is deterministic (e.g., sort by col("c1") plus a deterministic
secondary key like "rn" or "c9" in the select/sort chain where
count_window_function, sum_window_function, avg_window_function are used) and
replace the hard-coded slices of batches[0] with assertions over the full
collected output (use res.collect() result: either concatenate/flatten all
RecordBatches or iterate all rows across batches to locate group rows by c1 and
assert cnt/sum_c9/avg_c9 values) instead of relying on batch.slice indices.
Ensure references to df.select(...), .sort(...), collect(), and
assert_batches_eq! are updated accordingly.
value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! By explicitly requesting an order, the test guarantees that results arrive in an order that can be asserted, which prevents future test failures due to changed ordering.
```rust
let agg_exprs: Vec<Expr> = expr_list
    .iter()
    .filter(|e| !find_aggregate_exprs(vec![*e]).is_empty())
    .cloned()
    .collect();

let non_agg_exprs: Vec<Expr> = expr_list
    .iter()
    .filter(|e| find_aggregate_exprs(vec![*e]).is_empty())
    .cloned()
    .collect();

let plan_non_agg = table.clone().select(non_agg_exprs)?.into_optimized_plan()?;
let bytes_non_agg = logical_plan_to_bytes(&plan_non_agg)?;
let logical_round_trip_non_agg =
    logical_plan_from_bytes(&bytes_non_agg, &ctx.task_ctx())?;
assert_eq!(
    format!("{plan_non_agg}"),
    format!("{logical_round_trip_non_agg}")
);

let plan_agg = table.aggregate(vec![], agg_exprs)?.into_optimized_plan()?;
let bytes_agg = logical_plan_to_bytes(&plan_agg)?;
let logical_round_trip_agg = logical_plan_from_bytes(&bytes_agg, &ctx.task_ctx())?;
assert_eq!(format!("{plan_agg}"), format!("{logical_round_trip_agg}"));
```
Exercise the new aggregate-in-select() path in this test.
The aggregate half now roundtrips table.aggregate(...), so it no longer covers the DataFrame::select() rewrite this PR is adding. Regressions in aggregate detection, alias/unique-name handling, or the aggregate→project plan shape can slip through while this test still passes.
Suggested adjustment
```diff
- let agg_exprs: Vec<Expr> = expr_list
-     .iter()
-     .filter(|e| !find_aggregate_exprs(vec![*e]).is_empty())
-     .cloned()
-     .collect();
-
- let non_agg_exprs: Vec<Expr> = expr_list
-     .iter()
-     .filter(|e| find_aggregate_exprs(vec![*e]).is_empty())
-     .cloned()
-     .collect();
+ let (agg_exprs, non_agg_exprs): (Vec<Expr>, Vec<Expr>) = expr_list
+     .iter()
+     .cloned()
+     .partition(|e| !find_aggregate_exprs(vec![e]).is_empty());
@@
- let plan_agg = table.aggregate(vec![], agg_exprs)?.into_optimized_plan()?;
+ let plan_agg = table.select(agg_exprs)?.into_optimized_plan()?;
```

🤖 Prompt for AI Agents
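The single-pass split in the suggested diff relies on `Iterator::partition`. A self-contained demo with plain strings standing in for expressions (the `contains('(')` predicate is only an illustration of what `find_aggregate_exprs` decides, not its real logic):

```rust
// One traversal splits the list into aggregate-like and plain items,
// replacing the two separate filter passes over expr_list.
fn split(exprs: Vec<&str>) -> (Vec<&str>, Vec<&str>) {
    exprs.into_iter().partition(|e| e.contains('('))
}

fn main() {
    let (agg, non_agg) = split(vec!["sum(a)", "b", "count(c)", "d"]);
    println!("{agg:?} {non_agg:?}"); // ["sum(a)", "count(c)"] ["b", "d"]
}
```

Besides halving the traversals, `partition` guarantees the two halves are disjoint and together cover the input, which the two independent `filter` calls only achieve by keeping their predicates exactly negated.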
Verify each finding against the current code and only fix it if needed.
In `@datafusion/proto/tests/cases/roundtrip_logical_plan.rs` around lines 1236 -
1260, The test currently only round-trips aggregates via table.aggregate(...)
and thus misses exercising the new aggregate-in-select rewrite; modify the test
to also build a DataFrame that selects expr_list (or otherwise calls
DataFrame::select() including agg_exprs) so the select-path rewrite runs, then
generate an optimized plan (e.g. call into_optimized_plan() on that select
result), serialize/deserialize it using logical_plan_to_bytes and
logical_plan_from_bytes with ctx.task_ctx(), and assert the round-tripped plan
string equals the original (similar to the existing non_agg and agg assertions).
Ensure you keep the existing plan_non_agg and plan_agg checks and add a new
plan_select_with_agg (or similarly named) check that references expr_list,
agg_exprs, table.select(), logical_plan_to_bytes, logical_plan_from_bytes, and
ctx.task_ctx().
value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! An additional test that does df.aggregate(...).select(...) should be added to prevent regressions.
🤖 Augment PR Summary

Summary: This PR extends the DataFrame API so aggregate functions can be used directly in select(). Changes:

Technical Notes: The new behavior enforces SQL-like rules by rejecting mixed aggregate/non-aggregate selections (including wildcards) unless the query is explicitly grouped via aggregate().
```rust
// Check for non-aggregate expressions
let has_non_aggregate_expr = expr_list.iter().any(|e| match e {
    SelectExpr::Expression(expr) => {
```
datafusion/core/src/dataframe/mod.rs:451: The has_non_aggregate_expr check treats any expression that contains an aggregate anywhere as “aggregate”, which can let expressions like sum(x) + col("y") slip into the aggregate-only path and then fail later with a less clear error. This seems like it could violate the intended “non-aggregate columns must be grouped” semantics for subexpressions.
Severity: medium
```rust
    SelectExpr::Wildcard(_) | SelectExpr::QualifiedWildcard(_, _) => true,
});
```
```rust
if has_non_aggregate_expr && !aggr_exprs.is_empty() {
```
datafusion/core/src/dataframe/mod.rs:457: This mixed aggregate/non-aggregate SELECT error is triggered before the LogicalPlan::Aggregate(_) fallback path, so df.aggregate(...).select(vec![col("g"), min(col("x"))])-style projections can now error even though the input is already grouped. That looks like a potential behavior regression for selecting/aliasing aggregate outputs on an already-aggregated DataFrame.
Severity: medium


21021: To review by AI