Skip to content
5 changes: 5 additions & 0 deletions .cursor/rules.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
The AI agents should never link to any issue or a pull request
in any GitHub repository in the code reviews!

The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md!

35 changes: 3 additions & 32 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -1,34 +1,5 @@
# Agent Guidelines for Apache DataFusion
The AI agents should never link to any issue or a pull request
in any GitHub repository in the code reviews!

## Developer Documentation
The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md!

- [Contributor Guide](docs/source/contributor-guide/index.md)
- [Architecture Guide](docs/source/contributor-guide/architecture.md)

## Before Committing

Before committing any changes, you **must** run the following checks and fix any issues:

```bash
cargo fmt --all
cargo clippy --all-targets --all-features -- -D warnings
```

- `cargo fmt` ensures consistent code formatting across the project.
- `cargo clippy` catches common mistakes and enforces idiomatic Rust patterns. All warnings must be resolved (treated as errors via `-D warnings`).

Do not commit code that fails either of these checks.

## Testing

Run relevant tests before submitting changes:

```bash
cargo test --all-features
```

For SQL logic tests:

```bash
cargo test -p datafusion-sqllogictest
```
100 changes: 97 additions & 3 deletions datafusion-examples/examples/dataframe/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ use datafusion::common::config::CsvOptions;
use datafusion::common::parsers::CompressionTypeVariant;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg;
use datafusion::functions_aggregate::min_max::max;
use datafusion::functions_aggregate::average::{self, avg};
use datafusion::functions_aggregate::min_max::{self, max};
use datafusion::prelude::*;
use datafusion_examples::utils::{datasets::ExampleDataset, write_csv_to_parquet};
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
use tempfile::{TempDir, tempdir};
use tokio::fs::create_dir_all;

Expand All @@ -53,8 +55,10 @@ use tokio::fs::create_dir_all;
///
/// * [write_out]: write out a DataFrame to a table, parquet file, csv file, or json file
///
/// # Executing subqueries
/// # Querying data
///
/// * [aggregate_global_and_grouped]: global vs grouped aggregation (`select` vs `aggregate`)
/// * [window_vs_grouped_aggregation]: GROUP BY vs window functions (`aggregate`, `window`, `select`)
/// * [where_scalar_subquery]: execute a scalar subquery
/// * [where_in_subquery]: execute a subquery with an IN clause
/// * [where_exist_subquery]: execute a subquery with an EXISTS clause
Expand All @@ -69,6 +73,8 @@ pub async fn dataframe_example() -> Result<()> {
write_out(&ctx).await?;
register_cars_test_data("t1", &ctx).await?;
register_cars_test_data("t2", &ctx).await?;
aggregate_global_and_grouped(&ctx).await?;
window_vs_grouped_aggregation(&ctx).await?;
where_scalar_subquery(&ctx).await?;
where_in_subquery(&ctx).await?;
where_exist_subquery(&ctx).await?;
Expand Down Expand Up @@ -269,6 +275,94 @@ async fn write_out(ctx: &SessionContext) -> Result<()> {
Ok(())
}

/// Global vs grouped aggregation using `select` and `aggregate`
async fn aggregate_global_and_grouped(ctx: &SessionContext) -> Result<()> {
let df = ctx.table("t1").await?;

// SELECT AVG(speed) FROM t1
df.clone()
.aggregate(vec![], vec![avg(col("speed"))])?
.show()
.await?;

// SELECT AVG(speed) FROM t1 (same result via `select`)
df.clone().select(vec![avg(col("speed"))])?.show().await?;

// SELECT car, AVG(speed) FROM t1 GROUP BY car
df.aggregate(vec![col("car")], vec![avg(col("speed"))])?
.show()
.await?;

Ok(())
}

/// GROUP BY vs window functions using `aggregate`, `window`, and `select`
async fn window_vs_grouped_aggregation(ctx: &SessionContext) -> Result<()> {
let df = ctx.table("t1").await?;

// SELECT car,
// AVG(speed),
// MAX(speed)
// FROM t1
// GROUP BY car
df.clone()
.aggregate(
vec![col("car")],
vec![
avg(col("speed")).alias("avg_speed"),
max(col("speed")).alias("max_speed"),
],
)?
.show()
.await?;

// SELECT car, speed,
// AVG(speed) OVER (PARTITION BY car),
// MAX(speed) OVER (PARTITION BY car)
// FROM t1

// Window expressions:
let avg_win = Expr::WindowFunction(Box::new(WindowFunction::new(
WindowFunctionDefinition::AggregateUDF(average::avg_udaf()),
vec![col("speed")],
)))
.partition_by(vec![col("car")])
.order_by(vec![])
.window_frame(WindowFrame::new(None))
.build()?;

let max_win = Expr::WindowFunction(Box::new(WindowFunction::new(
WindowFunctionDefinition::AggregateUDF(min_max::max_udaf()),
vec![col("speed")],
)))
.partition_by(vec![col("car")])
.order_by(vec![])
.window_frame(WindowFrame::new(None))
.build()?;

// Two equivalent ways to compute window expressions:
// Using `window` then selecting columns
let res = df
.clone()
.window(vec![
avg_win.clone().alias("avg_speed"),
max_win.clone().alias("max_speed"),
])?
.select_columns(&["car", "speed", "avg_speed", "max_speed"])?;
res.show().await?;

// Using window expressions directly in `select`
let res = df.select(vec![
col("car"),
col("speed"),
avg_win.alias("avg_speed"),
max_win.alias("max_speed"),
])?;
res.show().await?;

Ok(())
}

/// Use the DataFrame API to execute the following subquery:
/// select car, speed from t1 where (select avg(t2.speed) from t2 where t1.car = t2.car) > 0 limit 3;
async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> {
Expand Down
130 changes: 124 additions & 6 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow_schema::FieldRef;
use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{
Column, DFSchema, DataFusionError, ParamValues, ScalarValue, SchemaError,
TableReference, UnnestOptions, exec_err, internal_datafusion_err, not_impl_err,
plan_datafusion_err, plan_err, unqualified_field_not_found,
};
use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::utils::find_aggregate_exprs;
use datafusion_expr::{
ExplainOption, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE, case,
dml::InsertOp,
Expand Down Expand Up @@ -387,21 +389,35 @@ impl DataFrame {
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # use datafusion_common::assert_batches_sorted_eq;
/// # use datafusion_functions_aggregate::expr_fn::{count, sum};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx
/// .read_csv("tests/data/example.csv", CsvReadOptions::new())
/// .await?;
/// let df = df.select(vec![col("a"), col("b") * col("c")])?;
///
/// // Expressions are evaluated per row
/// let res = df.clone().select(vec![col("a"), col("b") * col("c")])?;
/// let expected = vec![
/// "+---+-----------------------+",
/// "| a | ?table?.b * ?table?.c |",
/// "+---+-----------------------+",
/// "| 1 | 6 |",
/// "+---+-----------------------+",
/// ];
/// # assert_batches_sorted_eq!(expected, &df.collect().await?);
/// # assert_batches_sorted_eq!(expected, &res.collect().await?);
///
/// // Aggregate expressions are also supported
/// let res = df.select(vec![count(col("a")), sum(col("b"))])?;
/// let expected = vec![
/// "+----------+--------+",
/// "| count(a) | sum(b) |",
/// "+----------+--------+",
/// "| 1 | 2 |",
/// "+----------+--------+",
/// ];
/// # assert_batches_sorted_eq!(expected, &res.collect().await?);
/// # Ok(())
/// # }
/// ```
Expand All @@ -410,21 +426,123 @@ impl DataFrame {
expr_list: impl IntoIterator<Item = impl Into<SelectExpr>>,
) -> Result<DataFrame> {
let expr_list: Vec<SelectExpr> =
expr_list.into_iter().map(|e| e.into()).collect::<Vec<_>>();
expr_list.into_iter().map(|e| e.into()).collect();

// Extract expressions
let expressions = expr_list.iter().filter_map(|e| match e {
SelectExpr::Expression(expr) => Some(expr),
_ => None,
});

let window_func_exprs = find_window_exprs(expressions);
let plan = if window_func_exprs.is_empty() {
// Apply window functions first
let window_func_exprs = find_window_exprs(expressions.clone());

let mut plan = if window_func_exprs.is_empty() {
self.plan
} else {
LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
};

let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
// Collect aggregate expressions
let aggr_exprs = find_aggregate_exprs(expressions.clone());

// Check for non-aggregate expressions
let has_non_aggregate_expr = expr_list.iter().any(|e| match e {
SelectExpr::Expression(expr) => {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datafusion/core/src/dataframe/mod.rs:451: The has_non_aggregate_expr check treats any expression that contains an aggregate anywhere as “aggregate”, which can let expressions like sum(x) + col("y") slip into the aggregate-only path and then fail later with a less clear error. This seems like it could violate the intended “non-aggregate columns must be grouped” semantics for subexpressions.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

find_aggregate_exprs(std::iter::once(expr)).is_empty()
}
SelectExpr::Wildcard(_) | SelectExpr::QualifiedWildcard(_, _) => true,
});
Comment on lines +449 to +455
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The check for non-aggregate expressions is incomplete. It only verifies if an expression contains an aggregate function, but it doesn't ensure that all column references within that expression are properly aggregated. For example, select(vec![sum(col("a")) + col("b")]) would incorrectly pass this check because the expression contains an aggregate, even though col("b") is unaggregated. This will lead to a "Column not found" error later in the planning process instead of a proper "must be in GROUP BY" error.

Additionally, calling find_aggregate_exprs in a loop is inefficient as it traverses the expression tree multiple times.


if has_non_aggregate_expr && !aggr_exprs.is_empty() {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datafusion/core/src/dataframe/mod.rs:457: This mixed aggregate/non-aggregate SELECT error is triggered before the LogicalPlan::Aggregate(_) fallback path, so df.aggregate(...).select(vec![col("g"), min(col("x"))])-style projections can now error even though the input is already grouped. That looks like a potential behavior regression for selecting/aliasing aggregate outputs on an already-aggregated DataFrame.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

return plan_err!(
"Column in SELECT must be in GROUP BY or an aggregate function"
);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Premature validation breaks aggregate-then-select chaining pattern

Medium Severity

The mixed aggregate/non-aggregate validation at has_non_aggregate_expr && !aggr_exprs.is_empty() fires before the matches!(plan, LogicalPlan::Aggregate(_)) fallback check. This means .aggregate().select() chains that re-use aggregate expressions alongside plain column references now error out, even though the plan is already aggregated and the select is just a projection. The old pattern df.aggregate(vec![col("c1")], vec![min(col("c2"))]).select(vec![col("c1"), min(col("c2")).alias("result")]) was previously valid but now rejects with a spurious "must be in GROUP BY" error. This is a public API regression — the Aggregate plan check needs to be evaluated before the mixed-expression validation.

Additional Locations (1)
Fix in Cursor Fix in Web


// Fallback to projection
if matches!(plan, LogicalPlan::Aggregate(_))
|| has_non_aggregate_expr
|| aggr_exprs.is_empty()
{
let project_plan =
LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;

return Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
projection_requires_validation: false,
});
}

// Unique name generator
let make_unique_name =
|base: String, used: &mut HashSet<String>, start: usize| {
let mut name = base.clone();
let mut counter = start;
while used.contains(&name) {
name = format!("{base}_{counter}");
counter += 1;
}
used.insert(name.clone());

name
};

// Aggregate stage
let mut aggr_map: HashMap<Expr, Expr> = HashMap::new();
let mut aggr_used_names = HashSet::new();
let aggr_exprs_with_alias: Vec<Expr> = aggr_exprs
.into_iter()
.map(|expr| {
let base_name = expr.name_for_alias()?;
let name = make_unique_name(base_name, &mut aggr_used_names, 1);
let aliased = expr.clone().alias(name.clone());
let col = Expr::Column(Column::from_name(name));
aggr_map.insert(expr, col);

Ok(aliased)
})
.collect::<Result<Vec<_>>>()?;

// Build aggregate plan
plan = LogicalPlanBuilder::from(plan)
.aggregate(Vec::<Expr>::new(), aggr_exprs_with_alias)?
.build()?;

// Rewrite expressions
let rewrite_expr = |expr: Expr, aggr_map: &HashMap<Expr, Expr>| -> Result<Expr> {
expr.transform(|e| {
Ok(match aggr_map.get(&e) {
Some(replacement) => Transformed::yes(replacement.clone()),
None => Transformed::no(e),
})
})
.map(|t| t.data)
};

// Projection stage
let mut rewritten_exprs = Vec::with_capacity(expr_list.len());
let mut projection_used_names = HashSet::new();
for select_expr in expr_list.into_iter() {
match select_expr {
SelectExpr::Expression(expr) => {
let base_alias = expr.name_for_alias()?;
let rewritten = rewrite_expr(expr, &aggr_map)?;
let name =
make_unique_name(base_alias, &mut projection_used_names, 1);
let final_expr = rewritten.alias(name);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This line can create redundant Alias nodes if the rewritten expression already has the desired name (e.g., if the original expression was already an Alias). While functionally correct, it adds unnecessary depth to the expression tree. Consider checking if the name already matches before applying the alias.

                    let final_expr = if rewritten.name_for_alias().as_ref() == Ok(&name) {
                        rewritten
                    } else {
                        rewritten.alias(name)
                    };

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback: The Gemini AI reviewer is correct! If the expression already has the same alias then there is no need to set it second time. Another option would be to delete any previous alias and set only the new one.


rewritten_exprs.push(SelectExpr::Expression(final_expr));
}
other => rewritten_exprs.push(other),
}
}

// Final projection
let project_plan = LogicalPlanBuilder::from(plan)
.project(rewritten_exprs)?
.build()?;

Ok(DataFrame {
session_state: self.session_state,
Expand Down
Loading
Loading