From 91c2e04e6e3085203d3f7e9e522a99cbdd2f64c9 Mon Sep 17 00:00:00 2001
From: Huaijin
Date: Thu, 9 Apr 2026 02:41:42 +0800
Subject: [PATCH 1/4] fix: FilterExec should drop projection when apply
projection pushdown (#21460)
## Which issue does this PR close?
- Closes #21459
## Rationale for this change
When a `ProjectionExec` sits on top of a `FilterExec` that already
carries an explicit projection, the `ProjectionPushdown` optimizer
attempts to swap them via `try_swapping_with_projection`. The swap
replaces the `FilterExec`'s input with the narrower `ProjectionExec`,
but `FilterExecBuilder::from(self)` carried over the old projection
indices (e.g. [0, 1, 2]). After the swap the new input only has the
columns selected by the `ProjectionExec` (e.g. 2 columns), so .build()
tries to validate the stale projection against the narrower schema and
panics with "project index 2 out of bounds, max field 2".
## What changes are included in this PR?
In `FilterExec::try_swapping_with_projection`, after replacing the input
with the narrower ProjectionExec, clear the FilterExec's own projection
via `.apply_projection(None)`. The ProjectionExec that is now the input
already handles column selection, so the FilterExec no longer needs its
own projection.
## Are these changes tested?
yes, add test case
## Are there any user-facing changes?
---
.../physical_optimizer/projection_pushdown.rs | 120 +++++++++++++++++-
datafusion/physical-plan/src/filter.rs | 67 ++++++++++
2 files changed, 186 insertions(+), 1 deletion(-)
diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index 95cd34b6e4553..6018d714c5955 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -46,7 +46,7 @@ use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::coop::CooperativeExec;
-use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::filter::{FilterExec, FilterExecBuilder};
use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use datafusion_physical_plan::joins::{
HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode,
@@ -1754,3 +1754,121 @@ fn test_hash_join_empty_projection_embeds() -> Result<()> {
Ok(())
}
+
+/// Regression test for <https://github.com/apache/datafusion/issues/21459>
+///
+/// When a `ProjectionExec` sits on top of a `FilterExec` that already carries
+/// an embedded projection, the `ProjectionPushdown` optimizer must not panic.
+///
+/// Before the fix, `FilterExecBuilder::from(self)` copied stale projection
+/// indices (e.g. `[0, 1, 2]`). After swapping, the new input was narrower
+/// (2 columns), so `.build()` panicked with "project index out of bounds".
+#[test]
+fn test_filter_with_embedded_projection_after_projection() -> Result<()> {
+ // DataSourceExec: [a, b, c, d, e]
+ let csv = create_simple_csv_exec();
+
+ // FilterExec: a > 0, projection=[0, 1, 2] → output: [a, b, c]
+ let predicate = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a", 0)),
+ Operator::Gt,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
+ ));
+ let filter: Arc<dyn ExecutionPlan> = Arc::new(
+ FilterExecBuilder::new(predicate, csv)
+ .apply_projection(Some(vec![0, 1, 2]))?
+ .build()?,
+ );
+
+ // ProjectionExec: narrows [a, b, c] → [a, b]
+ let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
+ vec![
+ ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
+ ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
+ ],
+ filter,
+ )?);
+
+ let initial = displayable(projection.as_ref()).indent(true).to_string();
+ let actual = initial.trim();
+ assert_snapshot!(
+ actual,
+ @r"
+ ProjectionExec: expr=[a@0 as a, b@1 as b]
+ FilterExec: a@0 > 0, projection=[a@0, b@1, c@2]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
+ "
+ );
+
+ // This must not panic
+ let after_optimize =
+ ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ let after_optimize_string = displayable(after_optimize.as_ref())
+ .indent(true)
+ .to_string();
+ let actual = after_optimize_string.trim();
+ assert_snapshot!(
+ actual,
+ @r"
+ FilterExec: a@0 > 0
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b], file_type=csv, has_header=false
+ "
+ );
+
+ Ok(())
+}
+
+/// Same as above, but the outer ProjectionExec also renames columns.
+/// Ensures the rename is preserved after the projection pushdown swap.
+#[test]
+fn test_filter_with_embedded_projection_after_renaming_projection() -> Result<()> {
+ let csv = create_simple_csv_exec();
+
+ // FilterExec: b > 10, projection=[0, 1, 2, 3] → output: [a, b, c, d]
+ let predicate = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("b", 1)),
+ Operator::Gt,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
+ ));
+ let filter: Arc<dyn ExecutionPlan> = Arc::new(
+ FilterExecBuilder::new(predicate, csv)
+ .apply_projection(Some(vec![0, 1, 2, 3]))?
+ .build()?,
+ );
+
+ // ProjectionExec: [a as x, b as y] — narrows and renames
+ let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
+ vec![
+ ProjectionExpr::new(Arc::new(Column::new("a", 0)), "x"),
+ ProjectionExpr::new(Arc::new(Column::new("b", 1)), "y"),
+ ],
+ filter,
+ )?);
+
+ let initial = displayable(projection.as_ref()).indent(true).to_string();
+ let actual = initial.trim();
+ assert_snapshot!(
+ actual,
+ @r"
+ ProjectionExec: expr=[a@0 as x, b@1 as y]
+ FilterExec: b@1 > 10, projection=[a@0, b@1, c@2, d@3]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
+ "
+ );
+
+ let after_optimize =
+ ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ let after_optimize_string = displayable(after_optimize.as_ref())
+ .indent(true)
+ .to_string();
+ let actual = after_optimize_string.trim();
+ assert_snapshot!(
+ actual,
+ @r"
+ FilterExec: y@1 > 10
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a@0 as x, b@1 as y], file_type=csv, has_header=false
+ "
+ );
+
+ Ok(())
+}
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index afe2b0ae810a3..8720d5f7d223b 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -568,6 +568,10 @@ impl ExecutionPlan for FilterExec {
return FilterExecBuilder::from(self)
.with_input(make_with_child(projection, self.input())?)
.with_predicate(new_predicate)
+ // The original FilterExec projection referenced columns from its old
+ // input. After the swap the new input is the ProjectionExec which
+ // already handles column selection, so clear the projection here.
+ .apply_projection(None)?
.build()
.map(|e| Some(Arc::new(e) as _));
}
@@ -2572,4 +2576,67 @@ mod tests {
);
Ok(())
}
+
+ /// Regression test: ProjectionExec on top of a FilterExec that already has
+ /// an explicit projection must not panic when `try_swapping_with_projection`
+ /// attempts to swap the two nodes.
+ ///
+ /// Before the fix, `FilterExecBuilder::from(self)` copied the old projection
+ /// (e.g. `[0, 1, 2]`) from the FilterExec. After `.with_input` replaced the
+ /// input with the narrower ProjectionExec (2 columns), `.build()` tried to
+ /// validate the stale `[0, 1, 2]` projection against the 2-column schema and
+ /// panicked with "project index 2 out of bounds, max field 2".
+ #[test]
+ fn test_filter_with_projection_swap_does_not_panic() -> Result<()> {
+ use crate::projection::ProjectionExpr;
+ use datafusion_physical_expr::expressions::col;
+
+ // Schema: [ts: Int64, tokens: Int64, svc: Utf8]
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("ts", DataType::Int64, false),
+ Field::new("tokens", DataType::Int64, false),
+ Field::new("svc", DataType::Utf8, false),
+ ]));
+ let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
+
+ // FilterExec: ts > 0, projection=[ts@0, tokens@1, svc@2] (all 3 cols)
+ let predicate = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("ts", 0)),
+ Operator::Gt,
+ Arc::new(Literal::new(ScalarValue::Int64(Some(0)))),
+ ));
+ let filter = Arc::new(
+ FilterExecBuilder::new(predicate, input)
+ .apply_projection(Some(vec![0, 1, 2]))?
+ .build()?,
+ );
+
+ // ProjectionExec: narrows to [ts, tokens] (drops svc)
+ let proj_exprs = vec![
+ ProjectionExpr {
+ expr: col("ts", &filter.schema())?,
+ alias: "ts".to_string(),
+ },
+ ProjectionExpr {
+ expr: col("tokens", &filter.schema())?,
+ alias: "tokens".to_string(),
+ },
+ ];
+ let projection = Arc::new(ProjectionExec::try_new(
+ proj_exprs,
+ Arc::clone(&filter) as _,
+ )?);
+
+ // This must not panic
+ let result = filter.try_swapping_with_projection(&projection)?;
+ assert!(result.is_some(), "swap should succeed");
+
+ let new_plan = result.unwrap();
+ // Output schema must still be [ts, tokens]
+ let out_schema = new_plan.schema();
+ assert_eq!(out_schema.fields().len(), 2);
+ assert_eq!(out_schema.field(0).name(), "ts");
+ assert_eq!(out_schema.field(1).name(), "tokens");
+ Ok(())
+ }
}
From 4aed81afcb8b34012f720daffe85aad680520e23 Mon Sep 17 00:00:00 2001
From: Zhen Chen
Date: Thu, 9 Apr 2026 03:34:34 +0800
Subject: [PATCH 2/4] fix: preserve duplicate GROUPING SETS rows (#21058)
## Which issue does this PR close?
- Closes #21316.
## Rationale for this change
`GROUPING SETS` with duplicate grouping lists were incorrectly collapsed
during execution. The internal grouping id only encoded the semantic
null mask, so repeated grouping sets shared the same execution key and
were merged, which caused rows to be lost compared with PostgreSQL
behavior.
For example, with:
```sql
create table duplicate_grouping_sets(deptno int, job varchar, sal int, comm int);
insert into duplicate_grouping_sets values
(10, 'CLERK', 1300, null),
(20, 'MANAGER', 3000, null);
select deptno, job, sal, sum(comm), grouping(deptno), grouping(job), grouping(sal)
from duplicate_grouping_sets
group by grouping sets ((deptno, job), (deptno, sal), (deptno, job))
order by deptno, job, sal, grouping(deptno), grouping(job), grouping(sal);
```
PostgreSQL preserves the duplicate grouping set and returns:
```text
deptno | job | sal | sum | grouping | grouping | grouping
--------+---------+------+-----+----------+----------+----------
10 | CLERK | | | 0 | 0 | 1
10 | CLERK | | | 0 | 0 | 1
10 | | 1300 | | 0 | 1 | 0
20 | MANAGER | | | 0 | 0 | 1
20 | MANAGER | | | 0 | 0 | 1
20 | | 3000 | | 0 | 1 | 0
(6 rows)
```
Before this fix, DataFusion collapsed the duplicate `(deptno, job)`
grouping set and returned only 4 rows for the same query shape.
```text
+--------+---------+------+-----------------------------------+------------------------------------------+---------------------------------------+---------------------------------------+
| deptno | job | sal | sum(duplicate_grouping_sets.comm) | grouping(duplicate_grouping_sets.deptno) | grouping(duplicate_grouping_sets.job) | grouping(duplicate_grouping_sets.sal) |
+--------+---------+------+-----------------------------------+------------------------------------------+---------------------------------------+---------------------------------------+
| 10 | CLERK | NULL | NULL | 0 | 0 | 1 |
| 10 | NULL | 1300 | NULL | 0 | 1 | 0 |
| 20 | MANAGER | NULL | NULL | 0 | 0 | 1 |
| 20 | NULL | 3000 | NULL | 0 | 1 | 0 |
+--------+---------+------+-----------------------------------+------------------------------------------+---------------------------------------+---------------------------------------+
```
## What changes are included in this PR?
- Preserve duplicate grouping sets by packing a duplicate ordinal into
the high bits of `__grouping_id`, so repeated occurrences of the same
grouping set pattern produce distinct execution keys.
- `GROUPING()` now reads the actual `__grouping_id` column type directly
from the schema (via `Aggregate::grouping_id_type` rather than inferring
bit width from the count of grouping expressions alone. This ensures
bitmask literals are correctly sized when duplicate-ordinal bits widen
the column type beyond what the expression count would imply.
- `GROUPING()` masks off the ordinal bits before returning the result,
so the duplicate-ordinal encoding is invisible to user-facing SQL and
semantics remain unchanged.
- Add regression coverage for the duplicate `GROUPING SETS` case in:
- `datafusion/core/tests/sql/aggregates/basic.rs`
- `datafusion/sqllogictest/test_files/group_by.slt`
## Are these changes tested?
- `cargo fmt --all`
- `cargo test -p datafusion duplicate_grouping_sets_are_preserved`
- `cargo test -p datafusion-physical-plan
grouping_sets_preserve_duplicate_groups`
- `cargo test -p datafusion-physical-plan
evaluate_group_by_supports_duplicate_grouping_sets_with_eight_columns`
- PostgreSQL validation against the same query/result shape
## Are there any user-facing changes?
- Yes. Queries that contain duplicate `GROUPING SETS` entries now return
the correct duplicated result rows, matching PostgreSQL behavior.
---------
Co-authored-by: Andrew Lamb
---
datafusion/expr/src/logical_plan/plan.rs | 87 ++++++++++++++----
.../src/analyzer/resolve_grouping_function.rs | 49 ++++++----
.../physical-plan/src/aggregates/mod.rs | 91 ++++++++++++++++---
.../sqllogictest/test_files/group_by.slt | 35 +++++++
4 files changed, 212 insertions(+), 50 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 07e0eb1a77aa9..4f73169ad2827 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -45,7 +45,7 @@ use crate::utils::{
grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
};
use crate::{
- BinaryExpr, CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable,
+ BinaryExpr, CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, GroupingSet,
LogicalPlanBuilder, Operator, Prepare, TableProviderFilterPushDown, TableSource,
WindowFunctionDefinition, build_join_schema, expr_vec_fmt, requalify_sides_if_needed,
};
@@ -3595,11 +3595,12 @@ impl Aggregate {
.into_iter()
.map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
.collect::<Vec<_>>();
+ let max_ordinal = max_grouping_set_duplicate_ordinal(&group_expr);
qualified_fields.push((
None,
Field::new(
Self::INTERNAL_GROUPING_ID,
- Self::grouping_id_type(qualified_fields.len()),
+ Self::grouping_id_type(qualified_fields.len(), max_ordinal),
false,
)
.into(),
@@ -3685,15 +3686,24 @@ impl Aggregate {
}
/// Returns the data type of the grouping id.
- /// The grouping ID value is a bitmask where each set bit
- /// indicates that the corresponding grouping expression is
- /// null
- pub fn grouping_id_type(group_exprs: usize) -> DataType {
- if group_exprs <= 8 {
+ ///
+ /// The grouping ID packs two pieces of information into a single integer:
+ /// - The low `group_exprs` bits are the semantic bitmask (a set bit means the
+ /// corresponding grouping expression is NULL for this grouping set).
+ /// - The bits above position `group_exprs` encode a duplicate ordinal that
+ /// distinguishes multiple occurrences of the same grouping set pattern.
+ ///
+ /// `max_ordinal` is the highest ordinal value that will appear (0 when there
+ /// are no duplicate grouping sets). The type is chosen to be the smallest
+ /// unsigned integer that can represent both parts.
+ pub fn grouping_id_type(group_exprs: usize, max_ordinal: usize) -> DataType {
+ let ordinal_bits = usize::BITS as usize - max_ordinal.leading_zeros() as usize;
+ let total_bits = group_exprs + ordinal_bits;
+ if total_bits <= 8 {
DataType::UInt8
- } else if group_exprs <= 16 {
+ } else if total_bits <= 16 {
DataType::UInt16
- } else if group_exprs <= 32 {
+ } else if total_bits <= 32 {
DataType::UInt32
} else {
DataType::UInt64
@@ -3702,21 +3712,36 @@ impl Aggregate {
/// Internal column used when the aggregation is a grouping set.
///
- /// This column contains a bitmask where each bit represents a grouping
- /// expression. The least significant bit corresponds to the rightmost
- /// grouping expression. A bit value of 0 indicates that the corresponding
- /// column is included in the grouping set, while a value of 1 means it is excluded.
+ /// This column packs two values into a single unsigned integer:
+ ///
+ /// - **Low bits (positions 0 .. n-1)**: a semantic bitmask where each bit
+ /// represents one of the `n` grouping expressions. The least significant
+ /// bit corresponds to the rightmost grouping expression. A `1` bit means
+ /// the corresponding column is replaced with `NULL` for this grouping set;
+ /// a `0` bit means it is included.
+ /// - **High bits (positions n and above)**: a *duplicate ordinal* that
+ /// distinguishes multiple occurrences of the same semantic grouping set
+ /// pattern within a single query. The ordinal is `0` for the first
+ /// occurrence, `1` for the second, and so on.
+ ///
+ /// The integer type is chosen by [`Self::grouping_id_type`] to be the
+ /// smallest `UInt8 / UInt16 / UInt32 / UInt64` that can represent both
+ /// parts.
///
- /// For example, for the grouping expressions CUBE(a, b), the grouping ID
- /// column will have the following values:
+ /// For example, for the grouping expressions CUBE(a, b) (no duplicates),
+ /// the grouping ID column will have the following values:
/// 0b00: Both `a` and `b` are included
/// 0b01: `b` is excluded
/// 0b10: `a` is excluded
/// 0b11: Both `a` and `b` are excluded
///
- /// This internal column is necessary because excluded columns are replaced
- /// with `NULL` values. To handle these cases correctly, we must distinguish
- /// between an actual `NULL` value in a column and a column being excluded from the set.
+ /// When the same set appears twice and `n = 2`, the duplicate ordinal is
+ /// packed into bit 2:
+ /// first occurrence: `0b0_01` (ordinal = 0, mask = 0b01)
+ /// second occurrence: `0b1_01` (ordinal = 1, mask = 0b01)
+ ///
+ /// The GROUPING function always masks the value with `(1 << n) - 1` before
+ /// interpreting it so the ordinal bits are invisible to user-facing SQL.
pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
}
@@ -3737,6 +3762,24 @@ impl PartialOrd for Aggregate {
}
}
+/// Returns the highest duplicate ordinal across all grouping sets in `group_expr`.
+///
+/// The ordinal for each occurrence of a grouping set pattern is its 0-based
+/// index among identical entries. For example, if the same set appears three
+/// times, the ordinals are 0, 1, 2 and this function returns 2.
+/// Returns 0 when no grouping set is duplicated.
+fn max_grouping_set_duplicate_ordinal(group_expr: &[Expr]) -> usize {
+ if let Some(Expr::GroupingSet(GroupingSet::GroupingSets(sets))) = group_expr.first() {
+ let mut counts: HashMap<&[Expr], usize> = HashMap::new();
+ for set in sets {
+ *counts.entry(set).or_insert(0) += 1;
+ }
+ counts.into_values().max().unwrap_or(0).saturating_sub(1)
+ } else {
+ 0
+ }
+}
+
/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
fn contains_grouping_set(group_expr: &[Expr]) -> bool {
group_expr
@@ -5053,6 +5096,14 @@ mod tests {
);
}
+ #[test]
+ fn grouping_id_type_accounts_for_duplicate_ordinal_bits() {
+ // 8 grouping columns fit in UInt8 when there are no duplicate ordinals,
+ // but adding one duplicate ordinal bit widens the type to UInt16.
+ assert_eq!(Aggregate::grouping_id_type(8, 0), DataType::UInt8);
+ assert_eq!(Aggregate::grouping_id_type(8, 1), DataType::UInt16);
+ }
+
#[test]
fn test_filter_is_scalar() {
// test empty placeholder
diff --git a/datafusion/optimizer/src/analyzer/resolve_grouping_function.rs b/datafusion/optimizer/src/analyzer/resolve_grouping_function.rs
index 6b8ae3e8531bc..c12d7fd2ec2f6 100644
--- a/datafusion/optimizer/src/analyzer/resolve_grouping_function.rs
+++ b/datafusion/optimizer/src/analyzer/resolve_grouping_function.rs
@@ -99,10 +99,17 @@ fn replace_grouping_exprs(
{
match expr {
Expr::AggregateFunction(ref function) if is_grouping_function(&expr) => {
+ let grouping_id_type = is_grouping_set
+ .then(|| {
+ schema
+ .field_with_name(None, Aggregate::INTERNAL_GROUPING_ID)
+ .map(|f| f.data_type().clone())
+ })
+ .transpose()?;
let grouping_expr = grouping_function_on_id(
function,
&group_expr_to_bitmap_index,
- is_grouping_set,
+ grouping_id_type,
)?;
projection_exprs.push(Expr::Alias(Alias::new(
grouping_expr,
@@ -184,40 +191,44 @@ fn validate_args(
fn grouping_function_on_id(
function: &AggregateFunction,
group_by_expr: &HashMap<&Expr, usize>,
- is_grouping_set: bool,
+ // None means not a grouping set (result is always 0).
+ grouping_id_type: Option<DataType>,
) -> Result<Expr> {
validate_args(function, group_by_expr)?;
let args = &function.params.args;
// Postgres allows grouping function for group by without grouping sets, the result is then
// always 0
- if !is_grouping_set {
+ let Some(grouping_id_type) = grouping_id_type else {
return Ok(Expr::Literal(ScalarValue::from(0i32), None));
- }
-
- let group_by_expr_count = group_by_expr.len();
- let literal = |value: usize| {
- if group_by_expr_count < 8 {
- Expr::Literal(ScalarValue::from(value as u8), None)
- } else if group_by_expr_count < 16 {
- Expr::Literal(ScalarValue::from(value as u16), None)
- } else if group_by_expr_count < 32 {
- Expr::Literal(ScalarValue::from(value as u32), None)
- } else {
- Expr::Literal(ScalarValue::from(value as u64), None)
- }
};
+ // Use the actual __grouping_id column type to size literals correctly. This
+ // accounts for duplicate-ordinal bits that `Aggregate::grouping_id_type`
+ // packs into the high bits of the column, which a simple count of grouping
+ // expressions would miss.
+ let literal = |value: usize| match &grouping_id_type {
+ DataType::UInt8 => Expr::Literal(ScalarValue::from(value as u8), None),
+ DataType::UInt16 => Expr::Literal(ScalarValue::from(value as u16), None),
+ DataType::UInt32 => Expr::Literal(ScalarValue::from(value as u32), None),
+ DataType::UInt64 => Expr::Literal(ScalarValue::from(value as u64), None),
+ other => panic!("unexpected __grouping_id type: {other}"),
+ };
let grouping_id_column = Expr::Column(Column::from(Aggregate::INTERNAL_GROUPING_ID));
- // The grouping call is exactly our internal grouping id
- if args.len() == group_by_expr_count
+ if args.len() == group_by_expr.len()
&& args
.iter()
.rev()
.enumerate()
.all(|(idx, expr)| group_by_expr.get(expr) == Some(&idx))
{
- return Ok(cast(grouping_id_column, DataType::Int32));
+ let n = group_by_expr.len();
+ // Mask the ordinal bits above position `n` so only the semantic bitmask is visible.
+ // checked_shl returns None when n >= 64 (all bits are semantic), mapping to u64::MAX.
+ let semantic_mask: u64 = 1u64.checked_shl(n as u32).map_or(u64::MAX, |m| m - 1);
+ let masked_id =
+ bitwise_and(grouping_id_column.clone(), literal(semantic_mask as usize));
+ return Ok(cast(masked_id, DataType::Int32));
}
args.iter()
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index a3f0f568616e5..41782330c39da 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -37,7 +37,7 @@ use crate::{
use datafusion_common::config::ConfigOptions;
use datafusion_physical_expr::utils::collect_columns;
use parking_lot::Mutex;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
use arrow::array::{ArrayRef, UInt8Array, UInt16Array, UInt32Array, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -396,6 +396,15 @@ impl PhysicalGroupBy {
self.expr.len() + usize::from(self.has_grouping_set)
}
+ /// Returns the Arrow data type of the `__grouping_id` column.
+ ///
+ /// The type is chosen to be wide enough to hold both the semantic bitmask
+ /// (in the low `n` bits, where `n` is the number of grouping expressions)
+ /// and the duplicate ordinal (in the high bits).
+ fn grouping_id_data_type(&self) -> DataType {
+ Aggregate::grouping_id_type(self.expr.len(), max_duplicate_ordinal(&self.groups))
+ }
+
pub fn group_schema(&self, schema: &Schema) -> Result<SchemaRef> {
Ok(Arc::new(Schema::new(self.group_fields(schema)?)))
}
@@ -420,7 +429,7 @@ impl PhysicalGroupBy {
fields.push(
Field::new(
Aggregate::INTERNAL_GROUPING_ID,
- Aggregate::grouping_id_type(self.expr.len()),
+ self.grouping_id_data_type(),
false,
)
.into(),
@@ -2039,27 +2048,72 @@ fn evaluate_optional(
.collect()
}
-fn group_id_array(group: &[bool], batch: &RecordBatch) -> Result<ArrayRef> {
- if group.len() > 64 {
+/// Builds the internal `__grouping_id` array for a single grouping set.
+///
+/// The returned array packs two values into a single integer:
+///
+/// - Low `n` bits (positions 0 .. n-1): the semantic bitmask. A `1` bit
+/// at position `i` means that the `i`-th grouping column (counting from the
+/// least significant bit, i.e. the *last* column in the `group` slice) is
+/// `NULL` for this grouping set.
+/// - High bits (positions n and above): the duplicate `ordinal`, which
+/// distinguishes multiple occurrences of the same grouping-set pattern. The
+/// ordinal is `0` for the first occurrence, `1` for the second, and so on.
+///
+/// The integer type is chosen to be the smallest `UInt8 / UInt16 / UInt32 /
+/// UInt64` that can represent both parts. It matches the type returned by
+/// [`Aggregate::grouping_id_type`].
+fn group_id_array(
+ group: &[bool],
+ ordinal: usize,
+ max_ordinal: usize,
+ batch: &RecordBatch,
+) -> Result<ArrayRef> {
+ let n = group.len();
+ if n > 64 {
return not_impl_err!(
"Grouping sets with more than 64 columns are not supported"
);
}
- let group_id = group.iter().fold(0u64, |acc, &is_null| {
+ let ordinal_bits = usize::BITS as usize - max_ordinal.leading_zeros() as usize;
+ let total_bits = n + ordinal_bits;
+ if total_bits > 64 {
+ return not_impl_err!(
+ "Grouping sets with {n} columns and a maximum duplicate ordinal of \
+ {max_ordinal} require {total_bits} bits, which exceeds 64"
+ );
+ }
+ let semantic_id = group.iter().fold(0u64, |acc, &is_null| {
(acc << 1) | if is_null { 1 } else { 0 }
});
+ let full_id = semantic_id | ((ordinal as u64) << n);
let num_rows = batch.num_rows();
- if group.len() <= 8 {
- Ok(Arc::new(UInt8Array::from(vec![group_id as u8; num_rows])))
- } else if group.len() <= 16 {
- Ok(Arc::new(UInt16Array::from(vec![group_id as u16; num_rows])))
- } else if group.len() <= 32 {
- Ok(Arc::new(UInt32Array::from(vec![group_id as u32; num_rows])))
+ if total_bits <= 8 {
+ Ok(Arc::new(UInt8Array::from(vec![full_id as u8; num_rows])))
+ } else if total_bits <= 16 {
+ Ok(Arc::new(UInt16Array::from(vec![full_id as u16; num_rows])))
+ } else if total_bits <= 32 {
+ Ok(Arc::new(UInt32Array::from(vec![full_id as u32; num_rows])))
} else {
- Ok(Arc::new(UInt64Array::from(vec![group_id; num_rows])))
+ Ok(Arc::new(UInt64Array::from(vec![full_id; num_rows])))
}
}
+/// Returns the highest duplicate ordinal across all grouping sets.
+///
+/// At the call-site, the ordinal is the 0-based index assigned to each
+/// occurrence of a repeated grouping-set pattern: the first occurrence gets
+/// ordinal 0, the second gets 1, and so on. If the same `Vec<bool>` appears
+/// three times the ordinals are 0, 1, 2 and this function returns 2.
+/// Returns 0 when no grouping set is duplicated.
+fn max_duplicate_ordinal(groups: &[Vec<bool>]) -> usize {
+ let mut counts: HashMap<&[bool], usize> = HashMap::new();
+ for group in groups {
+ *counts.entry(group).or_insert(0) += 1;
+ }
+ counts.into_values().max().unwrap_or(0).saturating_sub(1)
+}
+
/// Evaluate a group by expression against a `RecordBatch`
///
/// Arguments:
@@ -2074,6 +2128,8 @@ pub fn evaluate_group_by(
group_by: &PhysicalGroupBy,
batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
+ let max_ordinal = max_duplicate_ordinal(&group_by.groups);
+ let mut ordinal_per_pattern: HashMap<&[bool], usize> = HashMap::new();
let exprs = evaluate_expressions_to_arrays(
group_by.expr.iter().map(|(expr, _)| expr),
batch,
@@ -2087,6 +2143,10 @@ pub fn evaluate_group_by(
.groups
.iter()
.map(|group| {
+ let ordinal = ordinal_per_pattern.entry(group).or_insert(0);
+ let current_ordinal = *ordinal;
+ *ordinal += 1;
+
let mut group_values = Vec::with_capacity(group_by.num_group_exprs());
group_values.extend(group.iter().enumerate().map(|(idx, is_null)| {
if *is_null {
@@ -2096,7 +2156,12 @@ pub fn evaluate_group_by(
}
}));
if !group_by.is_single() {
- group_values.push(group_id_array(group, batch)?);
+ group_values.push(group_id_array(
+ group,
+ current_ordinal,
+ max_ordinal,
+ batch,
+ )?);
}
Ok(group_values)
})
diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt
index 59db63ba420e9..b313424951532 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -5203,6 +5203,41 @@ NULL NULL 1
statement ok
drop table t;
+# regression: duplicate grouping sets must not be collapsed into one
+statement ok
+create table duplicate_grouping_sets(deptno int, job varchar, sal int, comm int) as values
+(10, 'CLERK', 1300, null),
+(20, 'MANAGER', 3000, null);
+
+query ITIIIII
+select deptno, job, sal, sum(comm), grouping(deptno), grouping(job), grouping(sal)
+from duplicate_grouping_sets
+group by grouping sets ((deptno, job), (deptno, sal), (deptno, job))
+order by deptno, job, sal, grouping(deptno), grouping(job), grouping(sal);
+----
+10 CLERK NULL NULL 0 0 1
+10 CLERK NULL NULL 0 0 1
+10 NULL 1300 NULL 0 1 0
+20 MANAGER NULL NULL 0 0 1
+20 MANAGER NULL NULL 0 0 1
+20 NULL 3000 NULL 0 1 0
+
+query ITII
+select deptno, job, sal, grouping(deptno, job, sal)
+from duplicate_grouping_sets
+group by grouping sets ((deptno, job), (deptno, sal), (deptno, job))
+order by deptno, job, sal, grouping(deptno, job, sal);
+----
+10 CLERK NULL 1
+10 CLERK NULL 1
+10 NULL 1300 2
+20 MANAGER NULL 1
+20 MANAGER NULL 1
+20 NULL 3000 2
+
+statement ok
+drop table duplicate_grouping_sets;
+
# test multi group by for binary type without nulls
statement ok
create table t(a int, b bytea) as values (1, 0xa), (1, 0xa), (2, 0xb), (3, 0xb), (3, 0xb);
From 4b1901f592b1db054807fa9857389630f1f1a2eb Mon Sep 17 00:00:00 2001
From: Subham Singhal
Date: Thu, 9 Apr 2026 01:40:52 +0530
Subject: [PATCH 3/4] Eliminate outer joins with empty relations via
null-padded projection (#21321)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
## Which issue does this PR close?
- Closes https://github.com/apache/datafusion/issues/21320
### Rationale for this change
When one side of a LEFT/RIGHT/FULL outer join is an EmptyRelation, the
current PropagateEmptyRelation optimizer rule leaves the join untouched.
This means the engine still builds a hash table for the empty side,
probes every row from the non-empty side, finds zero matches, and pads
NULLs — all wasted work.
The TODO at lines 76-80 of propagate_empty_relation.rs explicitly called
out this gap:
```
// TODO: For LeftOut/Full Join, if the right side is empty, the Join can be eliminated
// with a Projection with left side columns + right side columns replaced with null values.
// For RightOut/Full Join, if the left side is empty, the Join can be eliminated
// with a Projection with right side columns + left side columns replaced with null values.
```
### What changes are included in this PR?
Extends the PropagateEmptyRelation rule to handle 4 previously
unoptimized cases by replacing the join with a Projection that null-pads
the empty side's columns:
### Are these changes tested?
Yes. 4 new unit tests added:
### Are there any user-facing changes?
No API changes.
---------
Co-authored-by: Subham Singhal
Co-authored-by: Dmitrii Blaginin
---
.../optimizer/src/propagate_empty_relation.rs | 204 +++++++++++++++++-
.../propagate_empty_relation_outer_join.slt | 155 +++++++++++++
.../sqllogictest/test_files/subquery.slt | 6 +-
3 files changed, 354 insertions(+), 11 deletions(-)
create mode 100644 datafusion/sqllogictest/test_files/propagate_empty_relation_outer_join.slt
diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs
index da18d9071869c..6565d4f187339 100644
--- a/datafusion/optimizer/src/propagate_empty_relation.rs
+++ b/datafusion/optimizer/src/propagate_empty_relation.rs
@@ -21,9 +21,9 @@ use std::sync::Arc;
use datafusion_common::JoinType;
use datafusion_common::tree_node::Transformed;
-use datafusion_common::{Result, plan_err};
+use datafusion_common::{Column, DFSchemaRef, Result, ScalarValue, plan_err};
use datafusion_expr::logical_plan::LogicalPlan;
-use datafusion_expr::{EmptyRelation, Projection, Union};
+use datafusion_expr::{EmptyRelation, Expr, Projection, Union, cast, lit};
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
@@ -73,12 +73,8 @@ impl OptimizerRule for PropagateEmptyRelation {
Ok(Transformed::no(plan))
}
LogicalPlan::Join(ref join) => {
- // TODO: For Join, more join type need to be careful:
- // For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side
- // columns + right side columns replaced with null values.
- // For RightOut/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side
- // columns + left side columns replaced with null values.
let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
+ let left_field_count = join.left.schema().fields().len();
match join.join_type {
// For Full Join, only both sides are empty, the Join result is empty.
@@ -88,6 +84,24 @@ impl OptimizerRule for PropagateEmptyRelation {
schema: Arc::clone(&join.schema),
}),
)),
+ // For Full Join, if one side is empty, replace with a
+ // Projection that null-pads the empty side's columns.
+ JoinType::Full if right_empty => {
+ Ok(Transformed::yes(build_null_padded_projection(
+ Arc::clone(&join.left),
+ &join.schema,
+ left_field_count,
+ true,
+ )?))
+ }
+ JoinType::Full if left_empty => {
+ Ok(Transformed::yes(build_null_padded_projection(
+ Arc::clone(&join.right),
+ &join.schema,
+ left_field_count,
+ false,
+ )?))
+ }
JoinType::Inner if left_empty || right_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
@@ -100,12 +114,32 @@ impl OptimizerRule for PropagateEmptyRelation {
schema: Arc::clone(&join.schema),
}),
)),
+ // Left Join with empty right: all left rows survive
+ // with NULLs for right columns.
+ JoinType::Left if right_empty => {
+ Ok(Transformed::yes(build_null_padded_projection(
+ Arc::clone(&join.left),
+ &join.schema,
+ left_field_count,
+ true,
+ )?))
+ }
JoinType::Right if right_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::clone(&join.schema),
}),
)),
+ // Right Join with empty left: all right rows survive
+ // with NULLs for left columns.
+ JoinType::Right if left_empty => {
+ Ok(Transformed::yes(build_null_padded_projection(
+ Arc::clone(&join.right),
+ &join.schema,
+ left_field_count,
+ false,
+ )?))
+ }
JoinType::LeftSemi if left_empty || right_empty => Ok(
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
@@ -230,6 +264,57 @@ fn empty_child(plan: &LogicalPlan) -> Result<Option<&LogicalPlan>>