Skip to content

Commit 16e578d

Browse files
authored
Unify cast handling by removing CastColumnExpr branches in pruning and ordering equivalence (apache#21545)
## Which issue does this PR close?

* Part of apache#20164

---

## Rationale for this change

Downstream components in DataFusion currently branch on both `CastExpr` and `CastColumnExpr`, even though they represent equivalent semantics after recent cast unification work. This duplication increases maintenance burden, introduces unnecessary complexity, and raises the risk of inconsistent behavior when evolving cast-related logic.

This PR simplifies downstream logic by collapsing these dual branches into a single `CastExpr`-based handling path, aligning with the broader cast unification effort.

---

## What changes are included in this PR?

* **Removed `CastColumnExpr` handling in downstream logic**:
  * Eliminated branching specific to `CastColumnExpr` in:
    * `equivalence/properties/mod.rs`
    * `pruning/pruning_predicate.rs`
* **Unified cast substitution logic in ordering equivalence**:
  * Replaced `substitute_cast_like_ordering` with `substitute_cast_ordering`
  * Simplified implementation to only consider `CastExpr`
  * Preserved correctness by ensuring:
    * Exact child expression match
    * Order-preserving (widening) casts only
* **Refactored pruning rewrite logic**:
  * Introduced helper: `rewrite_cast_child_to_prunable`
  * Removed duplicated validation and recursion logic across cast types
  * Updated both `CastExpr` and `TryCastExpr` handling to reuse the helper
* **Test updates and cleanup**:
  * Updated tests to use `CastExpr::new_with_target_field`
  * Simplified test cases by removing vector-based equality conditions
  * Removed obsolete test covering `CastColumnExpr`
  * Renamed pruning test to reflect unified cast handling
* **Minor code cleanups**:
  * Removed unused imports
  * Simplified iterator chains and substitution logic

---

## Are these changes tested?

Yes.

* Existing pruning and equivalence tests were updated to reflect the unified cast handling.
* Obsolete tests relying on `CastColumnExpr` were removed.
* Test cases were adjusted to ensure behavior remains unchanged, including:
  * Ordering equivalence with cast substitutions
  * Predicate pruning with cast expressions

These updates ensure no regression in behavior while validating the simplified implementation.

---

## Are there any user-facing changes?

No. This change is an internal refactor and does not modify user-facing APIs or query behavior. It preserves existing semantics while improving maintainability.

---

## LLM-generated code disclosure

This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.
1 parent 0ab78e7 commit 16e578d

3 files changed

Lines changed: 80 additions & 130 deletions

File tree

datafusion/physical-expr/src/equivalence/properties/dependency.rs

Lines changed: 18 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ mod tests {
389389
convert_to_sort_reqs, create_test_params, create_test_schema, parse_sort_expr,
390390
};
391391
use crate::equivalence::{ProjectionMapping, convert_to_sort_exprs};
392-
use crate::expressions::{BinaryExpr, CastColumnExpr, CastExpr, Column, col};
392+
use crate::expressions::{BinaryExpr, CastExpr, Column, col};
393393
use crate::projection::tests::output_schema;
394394
use crate::{ConstExpr, EquivalenceProperties, ScalarFunctionExpr};
395395

@@ -931,42 +931,43 @@ mod tests {
931931
struct TestCase {
932932
name: &'static str,
933933
constants: Vec<Arc<dyn PhysicalExpr>>,
934-
equal_conditions: Vec<[Arc<dyn PhysicalExpr>; 2]>,
935-
sort_columns: &'static [&'static str],
934+
equal_condition: [Arc<dyn PhysicalExpr>; 2],
936935
should_satisfy_ordering: bool,
937936
}
938937

939938
let col_a = col("a", schema.as_ref())?;
940939
let col_b = col("b", schema.as_ref())?;
941940
let col_c = col("c", schema.as_ref())?;
942-
let cast_c = Arc::new(CastExpr::new(col_c, DataType::Date32, None)) as _;
941+
let cast_c = Arc::new(CastExpr::new_with_target_field(
942+
col_c,
943+
Arc::new(Field::new("c", DataType::Date32, true)),
944+
None,
945+
)) as _;
946+
let required_sort = vec![PhysicalSortExpr::new_default(col("c", &schema)?)];
943947

944948
let cases = vec![
945949
TestCase {
946-
name: "(a, b, c) -> (c)",
950+
name: "cast_c = a",
947951
// b is constant, so it should be removed from the sort order
948952
constants: vec![Arc::clone(&col_b)],
949-
equal_conditions: vec![[Arc::clone(&cast_c), Arc::clone(&col_a)]],
950-
sort_columns: &["c"],
953+
equal_condition: [Arc::clone(&cast_c), Arc::clone(&col_a)],
951954
should_satisfy_ordering: true,
952955
},
953956
// Same as the test above, but with the equality order swapped.
954957
// Algorithm shouldn't depend on this order.
955958
TestCase {
956-
name: "(a, b, c) -> (c)",
959+
name: "a = cast_c",
957960
// b is constant, so it should be removed from the sort order
958961
constants: vec![col_b],
959-
equal_conditions: vec![[Arc::clone(&col_a), Arc::clone(&cast_c)]],
960-
sort_columns: &["c"],
962+
equal_condition: [Arc::clone(&col_a), Arc::clone(&cast_c)],
961963
should_satisfy_ordering: true,
962964
},
963965
TestCase {
964966
name: "not ordered because (b) is not constant",
965967
// b is not constant anymore
966968
constants: vec![],
967969
// a and c are still compatible, but this is irrelevant since the original ordering is (a, b, c)
968-
equal_conditions: vec![[Arc::clone(&cast_c), Arc::clone(&col_a)]],
969-
sort_columns: &["c"],
970+
equal_condition: [Arc::clone(&cast_c), Arc::clone(&col_a)],
970971
should_satisfy_ordering: false,
971972
},
972973
];
@@ -979,9 +980,8 @@ mod tests {
979980
// Equal conditions before constants
980981
{
981982
let mut properties = base_properties.clone();
982-
for [left, right] in case.equal_conditions.clone() {
983-
properties.add_equal_conditions(left, right)?
984-
}
983+
let [left, right] = case.equal_condition.clone();
984+
properties.add_equal_conditions(left, right)?;
985985
properties.add_constants(
986986
case.constants.iter().cloned().map(ConstExpr::from),
987987
)?;
@@ -993,20 +993,13 @@ mod tests {
993993
properties.add_constants(
994994
case.constants.iter().cloned().map(ConstExpr::from),
995995
)?;
996-
for [left, right] in case.equal_conditions {
997-
properties.add_equal_conditions(left, right)?
998-
}
996+
let [left, right] = case.equal_condition;
997+
properties.add_equal_conditions(left, right)?;
999998
properties
1000999
},
10011000
] {
1002-
let sort = case
1003-
.sort_columns
1004-
.iter()
1005-
.map(|&name| col(name, &schema).map(PhysicalSortExpr::new_default))
1006-
.collect::<Result<Vec<_>>>()?;
1007-
10081001
assert_eq!(
1009-
properties.ordering_satisfy(sort)?,
1002+
properties.ordering_satisfy(required_sort.clone())?,
10101003
case.should_satisfy_ordering,
10111004
"failed test '{}'",
10121005
case.name
@@ -1017,44 +1010,6 @@ mod tests {
10171010
Ok(())
10181011
}
10191012

1020-
#[test]
1021-
fn test_eliminate_redundant_monotonic_sorts_cast_column_expr() -> Result<()> {
1022-
let schema = Arc::new(Schema::new(vec![
1023-
Field::new("a", DataType::Date32, true),
1024-
Field::new("b", DataType::Utf8, true),
1025-
Field::new("c", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
1026-
]));
1027-
let mut properties = EquivalenceProperties::new(Arc::clone(&schema));
1028-
properties.reorder(
1029-
["a", "b", "c"]
1030-
.into_iter()
1031-
.map(|c| PhysicalSortExpr::new_default(col(c, schema.as_ref()).unwrap())),
1032-
)?;
1033-
1034-
let col_a = col("a", schema.as_ref())?;
1035-
let col_b = col("b", schema.as_ref())?;
1036-
let col_c = col("c", schema.as_ref())?;
1037-
1038-
let cast_c = Arc::new(CastColumnExpr::new(
1039-
Arc::clone(&col_c),
1040-
Arc::new(Field::new(
1041-
"c",
1042-
DataType::Timestamp(TimeUnit::Nanosecond, None),
1043-
true,
1044-
)),
1045-
Arc::new(Field::new("c", DataType::Date32, true)),
1046-
None,
1047-
)) as Arc<dyn PhysicalExpr>;
1048-
1049-
properties.add_equal_conditions(cast_c, Arc::clone(&col_a))?;
1050-
properties.add_constants(std::iter::once(ConstExpr::from(col_b)))?;
1051-
1052-
let required = vec![PhysicalSortExpr::new_default(col("c", &schema)?)];
1053-
assert!(properties.ordering_satisfy(required)?);
1054-
1055-
Ok(())
1056-
}
1057-
10581013
#[test]
10591014
fn test_ordering_equivalence_with_lex_monotonic_concat() -> Result<()> {
10601015
let schema = Arc::new(Schema::new(vec![

datafusion/physical-expr/src/equivalence/properties/mod.rs

Lines changed: 27 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use self::dependency::{
3333
use crate::equivalence::{
3434
AcrossPartitions, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
3535
};
36-
use crate::expressions::{CastColumnExpr, CastExpr, Column, Literal, with_new_schema};
36+
use crate::expressions::{CastExpr, Column, Literal, with_new_schema};
3737
use crate::{
3838
ConstExpr, LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr,
3939
PhysicalSortRequirement,
@@ -196,35 +196,23 @@ impl OrderingEquivalenceCache {
196196

197197
impl EquivalenceProperties {
198198
/// Helper used by the ordering equivalence rule when considering whether a
199-
/// cast-bearing expression can replace an existing sort key without invalidating
200-
/// the ordering.
199+
/// cast-bearing expression can replace an existing sort key without
200+
/// invalidating the ordering.
201201
///
202-
/// This function handles *both* `CastExpr` (generic cast) and
203-
/// `CastColumnExpr` (field-aware cast) because the planner may introduce either
204-
/// form during rewrite steps; the core logic is the same in both cases. The
205-
/// substitution is only allowed when the cast wraps **the very same child
206-
/// expression** that the original sort used (an exact-child-match invariant),
207-
/// and the casted type must be a widening/order-preserving conversion
208-
/// `CastExpr::check_bigger_cast(...)` ensures. Without those restrictions the
209-
/// existing sort order could be violated (e.g. a narrowing cast could collapse
210-
/// distinct values together).
211-
fn substitute_cast_like_ordering(
202+
/// The substitution is only allowed when the cast wraps the very same child
203+
/// expression that the original sort used and the casted type is a
204+
/// widening/order-preserving conversion. Without those restrictions, a
205+
/// narrowing cast could collapse distinct values and violate the existing
206+
/// sort order.
207+
fn substitute_cast_ordering(
212208
r_expr: Arc<dyn PhysicalExpr>,
213209
sort_expr: &PhysicalSortExpr,
214210
expr_type: &DataType,
215211
) -> Option<PhysicalSortExpr> {
216-
let (child_expr, cast_type) = if let Some(cast_expr) =
217-
r_expr.as_any().downcast_ref::<CastExpr>()
218-
{
219-
(cast_expr.expr(), cast_expr.cast_type())
220-
} else if let Some(cast_expr) = r_expr.as_any().downcast_ref::<CastColumnExpr>() {
221-
(cast_expr.expr(), cast_expr.target_field().data_type())
222-
} else {
223-
return None;
224-
};
212+
let cast_expr = r_expr.as_any().downcast_ref::<CastExpr>()?;
225213

226-
(child_expr.eq(&sort_expr.expr)
227-
&& CastExpr::check_bigger_cast(cast_type, expr_type))
214+
(cast_expr.expr().eq(&sort_expr.expr)
215+
&& CastExpr::check_bigger_cast(cast_expr.cast_type(), expr_type))
228216
.then(|| PhysicalSortExpr::new(r_expr, sort_expr.options))
229217
}
230218

@@ -866,25 +854,25 @@ impl EquivalenceProperties {
866854
order
867855
.into_iter()
868856
.map(|sort_expr| {
869-
let referring_exprs = mapping
870-
.iter()
871-
.map(|(source, _target)| source)
872-
.filter(|source| expr_refers(source, &sort_expr.expr))
873-
.cloned();
874-
let mut result = vec![];
875857
// The sort expression comes from this schema, so the
876858
// following call to `unwrap` is safe.
877859
let expr_type = sort_expr.expr.data_type(schema).unwrap();
860+
let original_sort_expr = sort_expr.clone();
878861
// TODO: Add one-to-one analysis for ScalarFunctions.
879-
for r_expr in referring_exprs {
880-
if let Some(substituted) = Self::substitute_cast_like_ordering(
881-
r_expr, &sort_expr, &expr_type,
882-
) {
883-
result.push(substituted);
884-
}
885-
}
886-
result.push(sort_expr);
887-
result
862+
mapping
863+
.iter()
864+
.map(|(source, _target)| source)
865+
.filter(|source| expr_refers(source, &original_sort_expr.expr))
866+
.cloned()
867+
.filter_map(|r_expr| {
868+
Self::substitute_cast_ordering(
869+
r_expr,
870+
&original_sort_expr,
871+
&expr_type,
872+
)
873+
})
874+
.chain(std::iter::once(sort_expr))
875+
.collect::<Vec<_>>()
888876
})
889877
// Generate all valid orderings given substituted expressions:
890878
.multi_cartesian_product()

datafusion/pruning/src/pruning_predicate.rs

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ use datafusion_common::{
4242
tree_node::{Transformed, TreeNode},
4343
};
4444
use datafusion_expr_common::operator::Operator;
45-
use datafusion_physical_expr::expressions::CastColumnExpr;
4645
use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
4746
use datafusion_physical_expr::{PhysicalExprRef, expressions as phys_expr};
4847
use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr_opt;
@@ -1124,40 +1123,34 @@ fn rewrite_expr_to_prunable(
11241123
Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
11251124
} else if let Some(cast) = column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
11261125
// `cast(col) op lit()`
1127-
let arrow_schema = schema.as_arrow();
1128-
let from_type = cast.expr().data_type(arrow_schema)?;
1129-
verify_support_type_for_prune(&from_type, cast.cast_type())?;
1130-
let (left, op, right) =
1131-
rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
1132-
let left = Arc::new(phys_expr::CastExpr::new(
1126+
let (left, op, right) = rewrite_cast_child_to_prunable(
1127+
cast.expr(),
1128+
cast.cast_type(),
1129+
op,
1130+
scalar_expr,
1131+
schema,
1132+
)?;
1133+
let left = Arc::new(phys_expr::CastExpr::new_with_target_field(
11331134
left,
1134-
cast.cast_type().clone(),
1135+
Arc::clone(cast.target_field()),
11351136
None,
11361137
));
1137-
Ok((left, op, right))
1138-
} else if let Some(cast_col) = column_expr_any.downcast_ref::<CastColumnExpr>() {
1139-
// `cast_column(col) op lit()` - same as CastExpr but uses CastColumnExpr
1140-
let arrow_schema = schema.as_arrow();
1141-
let from_type = cast_col.expr().data_type(arrow_schema)?;
1142-
let to_type = cast_col.target_field().data_type();
1143-
verify_support_type_for_prune(&from_type, to_type)?;
1144-
let (left, op, right) =
1145-
rewrite_expr_to_prunable(cast_col.expr(), op, scalar_expr, schema)?;
1146-
// Predicate pruning / statistics generally don't support struct columns yet.
1147-
// In the future we may want to support pruning on nested fields, in which case we probably need to
1148-
// do something more sophisticated here.
1149-
// But for now since we don't support pruning on nested fields, we can just cast to the target type directly.
1150-
let left = Arc::new(phys_expr::CastExpr::new(left, to_type.clone(), None));
1138+
// PruningPredicate does not support pruning on nested fields yet.
1139+
// End-to-end nested-field pruning also requires Parquet statistics
1140+
// extraction to agree with PruningPredicate on a stats representation
1141+
// for nested field expressions.
11511142
Ok((left, op, right))
11521143
} else if let Some(try_cast) =
11531144
column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
11541145
{
11551146
// `try_cast(col) op lit()`
1156-
let arrow_schema = schema.as_arrow();
1157-
let from_type = try_cast.expr().data_type(arrow_schema)?;
1158-
verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
1159-
let (left, op, right) =
1160-
rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?;
1147+
let (left, op, right) = rewrite_cast_child_to_prunable(
1148+
try_cast.expr(),
1149+
try_cast.cast_type(),
1150+
op,
1151+
scalar_expr,
1152+
schema,
1153+
)?;
11611154
let left = Arc::new(phys_expr::TryCastExpr::new(
11621155
left,
11631156
try_cast.cast_type().clone(),
@@ -1191,6 +1184,20 @@ fn rewrite_expr_to_prunable(
11911184
}
11921185
}
11931186

1187+
fn rewrite_cast_child_to_prunable(
1188+
cast_child_expr: &PhysicalExprRef,
1189+
cast_type: &DataType,
1190+
op: Operator,
1191+
scalar_expr: &PhysicalExprRef,
1192+
schema: DFSchema,
1193+
) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1194+
verify_support_type_for_prune(
1195+
&cast_child_expr.data_type(schema.as_arrow())?,
1196+
cast_type,
1197+
)?;
1198+
rewrite_expr_to_prunable(cast_child_expr, op, scalar_expr, schema)
1199+
}
1200+
11941201
fn is_compare_op(op: Operator) -> bool {
11951202
matches!(
11961203
op,
@@ -4192,7 +4199,7 @@ mod tests {
41924199
}
41934200

41944201
#[test]
4195-
fn prune_cast_column_scalar() {
4202+
fn prune_cast_scalar() {
41964203
// The data type of column i is INT32
41974204
let (schema, statistics) = int32_setup();
41984205
let expected_ret = &[true, true, false, true, true];

0 commit comments

Comments
 (0)