21021: DataFrame API: allow aggregate functions in select() (#17874) #288
martin-augment wants to merge 5 commits into main
Summary of Changes
Gemini Code Assist: This pull request enhances the DataFusion DataFrame API by enabling the direct use of aggregate functions within select().
Code Review
This PR adds useful functionality: automatically detecting and handling aggregate functions in select().
Bug: Intra-expression mixing of aggregates and non-aggregates
A single expression that combines an aggregate with a bare (non-aggregated) column reference is not rejected. Consider adding an explicit check and returning a descriptive error: "Cannot mix aggregate and non-aggregate column references in select(); use aggregate() with explicit GROUP BY instead."
Missing test cases
The test suite covers the happy path well, but several important edge cases are missing.
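The intra-expression check suggested above could look roughly like the following. This is a minimal sketch on a toy expression tree, not DataFusion's `Expr`: `ToyExpr`, `col`, `sum`, `add`, `contains_aggregate`, `has_bare_column`, and `check_no_mixing` are all hypothetical names, and `sum(a) + b` is assumed as a representative mixed expression.

```rust
// Toy expression tree for illustration only; this is NOT DataFusion's `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum ToyExpr {
    Column(String),
    Literal(i64),
    /// An aggregate call such as `sum(arg)`.
    Aggregate(Box<ToyExpr>),
    /// A binary operation such as `lhs + rhs`.
    Add(Box<ToyExpr>, Box<ToyExpr>),
}

// Small constructors (hypothetical) to keep the examples readable.
fn col(name: &str) -> ToyExpr {
    ToyExpr::Column(name.to_string())
}
fn sum(arg: ToyExpr) -> ToyExpr {
    ToyExpr::Aggregate(Box::new(arg))
}
fn add(lhs: ToyExpr, rhs: ToyExpr) -> ToyExpr {
    ToyExpr::Add(Box::new(lhs), Box::new(rhs))
}

/// True if the tree contains at least one aggregate call.
fn contains_aggregate(e: &ToyExpr) -> bool {
    match e {
        ToyExpr::Aggregate(_) => true,
        ToyExpr::Add(lhs, rhs) => contains_aggregate(lhs) || contains_aggregate(rhs),
        ToyExpr::Column(_) | ToyExpr::Literal(_) => false,
    }
}

/// True if a column is referenced outside of any aggregate call.
fn has_bare_column(e: &ToyExpr) -> bool {
    match e {
        ToyExpr::Column(_) => true,
        // Columns inside an aggregate's argument are fine, so do not descend.
        ToyExpr::Aggregate(_) | ToyExpr::Literal(_) => false,
        ToyExpr::Add(lhs, rhs) => has_bare_column(lhs) || has_bare_column(rhs),
    }
}

/// Rejects an expression that mixes an aggregate with a bare column.
fn check_no_mixing(e: &ToyExpr) -> Result<(), String> {
    if contains_aggregate(e) && has_bare_column(e) {
        Err("Cannot mix aggregate and non-aggregate column references in select(); \
             use aggregate() with explicit GROUP BY instead."
            .to_string())
    } else {
        Ok(())
    }
}
```

Under these assumptions, `check_no_mixing(&add(sum(col("a")), col("b")))` returns an error, while `sum(a) + 1` and a plain `b` both pass, because neither combines an aggregate with a bare column in the same expression.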
Code Review
This pull request introduces the capability to use aggregate functions directly within the select() method of the DataFrame API. The implementation correctly handles the interplay between window functions, aggregate expressions, and non-aggregate expressions, ensuring that the logical plan is constructed appropriately. New test cases have been added to validate this functionality, covering various scenarios including multiple aggregates, duplicate aggregate aliases, and wildcards. The changes are well-implemented and enhance the flexibility of the DataFrame API.
🧹 Nitpick comments (4)
datafusion/core/src/dataframe/mod.rs (2)
502-504: Consider documenting wildcard behavior with aggregates.
When a `SelectExpr::Wildcard` is passed alongside aggregate expressions, it passes through unchanged and will expand to the aggregate plan's schema (which contains only the computed aggregate columns). This might be surprising to users.
For example, `df.select(vec![sum(col("a")), Expr::Wildcard {...}])` would have the wildcard expand to the aggregate output columns, not the original table columns.
This edge case may be intentional, but it could benefit from documentation in the method's doc comment or a check that returns an error when wildcards are mixed with aggregate expressions.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@datafusion/core/src/dataframe/mod.rs` around lines 502 - 504, The SelectExpr::Wildcard branch currently passes through unchanged and later expands to the aggregate plan's schema (only computed aggregates), which is surprising when mixed with aggregate expressions; update the doc comment for the function handling select expression rewriting to explicitly document this behavior (mention SelectExpr::Wildcard and that it expands to aggregate output when aggregates are present) and/or add a validation in the same function to detect mixing wildcard with aggregate expressions and return a clear error instead; reference the SelectExpr::Wildcard match arm and the rewritten_exprs collection when implementing the documentation or the new check.
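The validation option mentioned above (returning an error when a wildcard is mixed with aggregates) might be sketched as follows. The types here are hypothetical stand-ins, not DataFusion's `SelectExpr`; the `is_aggregate` flag is assumed to have been computed by an earlier pass, and the error text is illustrative.

```rust
// Hypothetical stand-in for select-list items; NOT DataFusion's `SelectExpr`.
#[derive(Debug, Clone, PartialEq)]
enum ToySelectItem {
    Wildcard,
    // `is_aggregate` is assumed to be precomputed: true if the expression
    // contains an aggregate call.
    Expression { name: String, is_aggregate: bool },
}

/// Returns an error if the select list combines a wildcard with an aggregate.
fn reject_wildcard_with_aggregates(items: &[ToySelectItem]) -> Result<(), String> {
    let has_wildcard = items
        .iter()
        .any(|item| matches!(item, ToySelectItem::Wildcard));
    let has_aggregate = items.iter().any(|item| {
        matches!(item, ToySelectItem::Expression { is_aggregate: true, .. })
    });
    if has_wildcard && has_aggregate {
        return Err(
            "wildcard cannot be combined with aggregate expressions in select()".to_string(),
        );
    }
    Ok(())
}
```

Failing early keeps the wildcard from silently resolving against the aggregate output schema; whether an error or documented pass-through is preferable is a design decision for the PR author.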
495-505: Redundant aliasing may occur for already-aliased expressions.
When the original expression already has an alias (e.g., `sum(col("a")).alias("total")`), the rewriting preserves the inner alias through `transform`, and then line 501 wraps it in another alias. This creates nested `Alias` nodes.
While DataFusion's planner may flatten these, consider avoiding the redundant alias:
♻️ Suggested improvement
 SelectExpr::Expression(expr) => {
-    let rewritten = rewrite_expr(expr.clone(), &aggr_map)?;
     let alias = expr.name_for_alias()?;
-    rewritten_exprs.push(SelectExpr::Expression(rewritten.alias(alias)));
+    let rewritten = rewrite_expr(expr, &aggr_map)?;
+    // Only add alias if the rewritten expression doesn't already have one
+    let final_expr = match &rewritten {
+        Expr::Alias(_) => rewritten,
+        _ => rewritten.alias(alias),
+    };
+    rewritten_exprs.push(SelectExpr::Expression(final_expr));
 }
This also removes the unnecessary `expr.clone()` since `alias` can be computed first.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@datafusion/core/src/dataframe/mod.rs` around lines 495 - 505, In the SelectExpr::Expression branch avoid producing nested Alias nodes by computing alias = expr.name_for_alias() before rewriting, then call rewrite_expr with the original expr (no clone) and only wrap with rewritten.alias(alias) if the rewritten expression is not already an Alias (i.e., detect existing Alias on the rewritten expression or if the original expression already carried an alias) so rewritten_exprs.push either the rewritten as-is or rewritten.alias(alias); refer to SelectExpr::Expression, rewrite_expr, expr.name_for_alias(), and rewritten.alias().
datafusion/core/tests/dataframe/mod.rs (2)
6913-6916: Make wildcard assertions batch-agnostic.
Asserting on `batches[0]` assumes a single output batch. Safer checks should validate total rows and consistent column counts across all returned batches.
Batch-agnostic assertion pattern
- let batches = res.collect().await?;
- assert_eq!(batches[0].num_rows(), 100);
- assert_eq!(batches[0].num_columns(), 14);
+ let batches = res.collect().await?;
+ assert!(!batches.is_empty());
+ assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 100);
+ assert!(batches.iter().all(|b| b.num_columns() == 14));
@@
- let batches = res.collect().await?;
- assert_eq!(batches[0].num_rows(), 100);
- assert_eq!(batches[0].num_columns(), 14);
+ let batches = res.collect().await?;
+ assert!(!batches.is_empty());
+ assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 100);
+ assert!(batches.iter().all(|b| b.num_columns() == 14));
Also applies to: 6925-6927
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@datafusion/core/tests/dataframe/mod.rs` around lines 6913 - 6916, The assertions currently index into batches[0], which assumes a single output batch; change the checks after let batches = res.collect().await? to be batch-agnostic by summing rows across all batches (e.g., total_rows == 100) and verifying every batch has the expected number of columns (batch.num_columns() == 14) rather than only checking batches[0]; apply the same replacement for the second occurrence around the lines referenced (6925-6927). Use the local variable name batches to iterate and perform these aggregated/consistent checks.
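Because the same batch-agnostic check is needed at two places in the test file, it could be factored into a small helper. The sketch below uses a hypothetical `ToyBatch` in place of a real record batch (only `num_rows()` and `num_columns()` are modelled), and `result_shape` is an invented name.

```rust
// Hypothetical stand-in for a record batch; only its shape matters here.
struct ToyBatch {
    rows: usize,
    columns: usize,
}

impl ToyBatch {
    fn num_rows(&self) -> usize {
        self.rows
    }
    fn num_columns(&self) -> usize {
        self.columns
    }
}

/// Returns `(total_rows, column_count)` across all batches.
/// Fails if no batch was returned or if batches disagree on column count.
fn result_shape(batches: &[ToyBatch]) -> Result<(usize, usize), String> {
    let first = batches
        .first()
        .ok_or_else(|| "no batches returned".to_string())?;
    let columns = first.num_columns();
    if batches.iter().any(|b| b.num_columns() != columns) {
        return Err("batches disagree on column count".to_string());
    }
    // Sum rows over every batch instead of reading only the first one.
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    Ok((total_rows, columns))
}
```

With such a helper, each call site reduces to a single comparison against the expected `(100, 14)` shape, regardless of how the 100 rows are split across batches.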
6887-6891: Align the “duplicate aliases” case with actual behavior.
Line 6889 and Line 6890 use different aliases (`count_c9` and `count_c9_2`), so this does not test alias-collision behavior despite the comment.
Suggested adjustment
- // Test duplicate aggregate aliases
+ // Test repeated aggregate expressions with distinct aliases
If the intent is to validate duplicate alias handling, use the same alias on both expressions and assert the expected output naming behavior.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@datafusion/core/tests/dataframe/mod.rs` around lines 6887 - 6891, The test comment says it should check duplicate aggregate aliases but the select uses different aliases; update the test so both aggregate expressions use the same alias (e.g., replace count(col("c9")).alias("count_c9_2") with count(col("c9")).alias("count_c9")) and then assert the expected naming/behavior for duplicate aliases (adjust assertions on the resulting DataFrame / variable res accordingly) so the test actually validates alias-collision handling in the df.select/count/alias flow.
📒 Files selected for processing (2)
datafusion/core/src/dataframe/mod.rs
datafusion/core/tests/dataframe/mod.rs
🤖 Augment PR Summary
Summary: This PR updates the DataFrame API so aggregate functions can be used directly in select().
    expr_list.into_iter().map(|e| e.into()).collect::<Vec<_>>();
    expr_list.into_iter().map(|e| e.into()).collect();

    // Extract expressions
`expressions` only includes `SelectExpr::Expression`, so `SelectExpr::Wildcard` / `QualifiedWildcard` aren’t considered when deciding to build the implicit global `Aggregate`. This can misclassify a `select()` that mixes a wildcard with an aggregate (e.g. `[*, count(..)]`) and change semantics by aggregating anyway.
Severity: medium
    let aggr_exprs = find_aggregate_exprs(expressions.clone());

    // Check for non-aggregate expressions
    let has_non_aggregate_expr = expressions
The `has_non_aggregate_expr` check treats any expression with no aggregate sub-expression (including pure scalars like `lit(1)`) as “non-aggregate”, which prevents the new aggregate-in-`select()` path from running. This likely makes queries like `select([count(..), lit(1)])` fall back to a projection containing an aggregate and then error later.
Severity: low
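One way to address this is to let only bare column references, rather than every aggregate-free expression, block the implicit aggregate path. The following is a sketch under that assumption: `ExprFacts` and `can_use_implicit_aggregate` are hypothetical, and the two flags are assumed to come from an earlier walk over each expression tree.

```rust
/// Hypothetical per-expression facts, assumed to be computed by an earlier
/// walk over the expression tree. NOT part of DataFusion.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ExprFacts {
    /// The expression contains at least one aggregate call.
    has_aggregate: bool,
    /// The expression references a column outside any aggregate call.
    has_bare_column: bool,
}

/// Decides whether the select list can be planned as an implicit global
/// aggregate. A pure scalar such as `lit(1)` has both flags false, so it
/// does not block the aggregate path; only a bare column reference does.
fn can_use_implicit_aggregate(exprs: &[ExprFacts]) -> bool {
    let any_aggregate = exprs.iter().any(|e| e.has_aggregate);
    let any_bare_column = exprs.iter().any(|e| e.has_bare_column);
    any_aggregate && !any_bare_column
}
```

Under this rule, a list such as `[count(..), lit(1)]` takes the aggregate path, `[count(..), col]` does not, and a list without any aggregate keeps the ordinary projection behavior.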
Cursor Bugbot has reviewed your changes and found 2 potential issues.
    // - no aggregates
    if matches!(plan, LogicalPlan::Aggregate(_))
        || has_non_aggregate_expr
        || aggr_exprs.is_empty()
Wildcard with aggregates incorrectly enters aggregate path
Medium Severity
The `has_non_aggregate_expr` check only inspects `SelectExpr::Expression` items (via the `expressions` iterator), completely ignoring `SelectExpr::Wildcard` and `SelectExpr::QualifiedWildcard`. When a wildcard is combined with aggregate expressions (e.g., `vec![wildcard(), count(col("a")).into()]`), the check finds no non-aggregate expressions, `aggr_exprs` is non-empty, and the code incorrectly enters the aggregate path. The wildcard is then passed through unmodified at `other => rewritten_exprs.push(other)` and resolves against the aggregate output schema (which only contains aggregate columns) instead of the original table schema, producing incorrect results.
    SelectExpr::Expression(expr) => {
        let rewritten = rewrite_expr(expr.clone(), &aggr_map)?;
        let alias = expr.name_for_alias()?;
        rewritten_exprs.push(SelectExpr::Expression(rewritten.alias(alias)));
Unconditional alias creates redundant double-wrapping on aliased expressions
Low Severity
When the original expression already has an alias (e.g., `count(col("c9")).alias("count_c9")`), `rewrite_expr` preserves the inner `Alias` node, and then `.alias(alias)` unconditionally wraps it again, producing `Alias(Alias(Column(...), "count_c9"), "count_c9")`. The existing `alias_if_changed` method on `Expr` already handles this case by only adding an alias when the name differs, which would avoid the unnecessary double wrapping.
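The "alias only when the name differs" idea can be illustrated on a toy expression type. This is a sketch, not DataFusion's implementation of `alias_if_changed`: `ToyExpr`, `output_name`, and `alias_if_name_differs` are hypothetical names chosen for the example.

```rust
// Toy expression type for illustration only; NOT DataFusion's `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum ToyExpr {
    Column(String),
    Alias(Box<ToyExpr>, String),
}

impl ToyExpr {
    /// Output name: the alias if present, otherwise the column name.
    fn output_name(&self) -> &str {
        match self {
            ToyExpr::Column(name) => name.as_str(),
            ToyExpr::Alias(_, name) => name.as_str(),
        }
    }

    /// Unconditionally wraps the expression in an alias node.
    fn alias(self, name: &str) -> ToyExpr {
        ToyExpr::Alias(Box::new(self), name.to_string())
    }

    /// Wraps in an alias only if the output name would actually change,
    /// which avoids producing Alias(Alias(..)) with the same name twice.
    fn alias_if_name_differs(self, name: &str) -> ToyExpr {
        if self.output_name() == name {
            self
        } else {
            self.alias(name)
        }
    }
}
```

In this model, an expression already aliased as `count_c9` is returned unchanged when asked for the name `count_c9`, whereas the unconditional `alias` call would nest a second `Alias` node around it.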


21021: To review by AI