Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/audit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ jobs:
- name: Run audit check
# Note: you can ignore specific RUSTSEC issues using the `--ignore` flag, for example:
# run: cargo audit --ignore RUSTSEC-2026-0001
run: cargo audit --ignore RUSTSEC-2024-0014
run: cargo audit
15 changes: 7 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl PhysicalExprAdapter for CustomCastsPhysicalExprAdapter {
// kernel / expression. For example, DataFusion Comet has a custom cast
// kernel in its native Spark expression implementation.
expr.transform(|expr| {
if let Some(cast) = expr.as_any().downcast_ref::<CastExpr>() {
if let Some(cast) = expr.downcast_ref::<CastExpr>() {
let input_data_type =
cast.expr().data_type(&self.physical_file_schema)?;
let output_data_type = cast.target_field().data_type();
Expand Down
10 changes: 3 additions & 7 deletions datafusion-examples/examples/data_io/json_shredding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,20 +311,16 @@ impl ShreddedJsonRewriter {
expr: Arc<dyn PhysicalExpr>,
physical_file_schema: &Schema,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
if let Some(func) = expr.as_any().downcast_ref::<ScalarFunctionExpr>()
if let Some(func) = expr.downcast_ref::<ScalarFunctionExpr>()
&& func.name() == "json_get_str"
&& func.args().len() == 2
{
// Get the key from the first argument
if let Some(literal) = func.args()[0]
.as_any()
.downcast_ref::<expressions::Literal>()
if let Some(literal) = func.args()[0].downcast_ref::<expressions::Literal>()
&& let ScalarValue::Utf8(Some(field_name)) = literal.value()
{
// Get the column from the second argument
if let Some(column) = func.args()[1]
.as_any()
.downcast_ref::<expressions::Column>()
if let Some(column) = func.args()[1].downcast_ref::<expressions::Column>()
{
let column_name = column.name();
// Check if there's a flat column with underscore prefix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ pub async fn expression_deduplication() -> Result<()> {
};
let predicate = Arc::clone(filter_exec.predicate());
let binary_expr = predicate
.as_any()
.downcast_ref::<BinaryExpr>()
.expect("Predicate is not a BinaryExpr");
let left = &binary_expr.left();
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ bytes = { workspace = true }
env_logger = { workspace = true }
glob = { workspace = true }
insta = { workspace = true }
pretty_assertions = "1.0"
rand = { workspace = true, features = ["small_rng"] }
rand_distr = "0.5"
recursive = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ mod tests {
impl PhysicalExprAdapter for TestPhysicalExprAdapter {
fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
expr.transform(|e| {
if let Some(column) = e.as_any().downcast_ref::<Column>() {
if let Some(column) = e.downcast_ref::<Column>() {
// If column is "extra_column" and missing from physical schema, inject "foo"
if column.name() == "extra_column"
&& self.physical_file_schema.index_of("extra_column").is_err()
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/tests/fuzz_cases/equivalence/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub fn output_schema(
let data_type = source.data_type(input_schema)?;
let nullable = source.nullable(input_schema)?;
for (target, _) in targets.iter() {
let Some(column) = target.as_any().downcast_ref::<Column>() else {
let Some(column) = target.downcast_ref::<Column>() else {
return plan_err!("Expects to have column");
};
fields.push(Field::new(column.name(), data_type.clone(), nullable));
Expand Down Expand Up @@ -282,7 +282,7 @@ fn get_representative_arr(
schema: SchemaRef,
) -> Option<ArrayRef> {
for expr in eq_group.iter() {
let col = expr.as_any().downcast_ref::<Column>().unwrap();
let col = expr.downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
if let Some(res) = &existing_vec[idx] {
return Some(Arc::clone(res));
Expand Down Expand Up @@ -370,7 +370,7 @@ pub fn generate_table_for_eq_properties(

// Fill constant columns
for constant in eq_properties.constants() {
let col = constant.expr.as_any().downcast_ref::<Column>().unwrap();
let col = constant.expr.downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
let arr =
Arc::new(Float64Array::from_iter_values(vec![0 as f64; n_elem])) as ArrayRef;
Expand All @@ -382,7 +382,7 @@ pub fn generate_table_for_eq_properties(
let (sort_columns, indices): (Vec<_>, Vec<_>) = ordering
.iter()
.map(|PhysicalSortExpr { expr, options }| {
let col = expr.as_any().downcast_ref::<Column>().unwrap();
let col = expr.downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
let arr = generate_random_array(n_elem, n_distinct);
(
Expand All @@ -408,7 +408,7 @@ pub fn generate_table_for_eq_properties(
.unwrap_or_else(|| generate_random_array(n_elem, n_distinct));

for expr in eq_group.iter() {
let col = expr.as_any().downcast_ref::<Column>().unwrap();
let col = expr.downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
schema_vec[idx] = Some(Arc::clone(&representative_array));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ mod sp_repartition_fuzz_tests {
schema: SchemaRef,
) -> Option<ArrayRef> {
for expr in eq_group.iter() {
let col = expr.as_any().downcast_ref::<Column>().unwrap();
let col = expr.downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
if let Some(res) = &existing_vec[idx] {
return Some(res.clone());
Expand Down Expand Up @@ -149,7 +149,7 @@ mod sp_repartition_fuzz_tests {

// Fill constant columns
for constant in eq_properties.constants() {
let col = constant.expr.as_any().downcast_ref::<Column>().unwrap();
let col = constant.expr.downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
let arr =
Arc::new(UInt64Array::from_iter_values(vec![0; n_elem])) as ArrayRef;
Expand All @@ -161,7 +161,7 @@ mod sp_repartition_fuzz_tests {
let (sort_columns, indices): (Vec<_>, Vec<_>) = ordering
.iter()
.map(|PhysicalSortExpr { expr, options }| {
let col = expr.as_any().downcast_ref::<Column>().unwrap();
let col = expr.downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
let arr = generate_random_array(n_elem, n_distinct);
(
Expand All @@ -187,7 +187,7 @@ mod sp_repartition_fuzz_tests {
.unwrap_or_else(|| generate_random_array(n_elem, n_distinct));

for expr in eq_group.iter() {
let col = expr.as_any().downcast_ref::<Column>().unwrap();
let col = expr.downcast_ref::<Column>().unwrap();
let (idx, _field) = schema.column_with_name(col.name()).unwrap();
schema_vec[idx] = Some(representative_array.clone());
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/parquet/expr_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ impl PhysicalExprAdapter for CustomPhysicalExprAdapter {
fn rewrite(&self, mut expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
expr = expr
.transform(|expr| {
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
if let Some(column) = expr.downcast_ref::<Column>() {
let field_name = column.name();
if self
.physical_file_schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2959,7 +2959,7 @@ async fn test_discover_dynamic_filters_via_expressions_api() {

// Check expressions from this node using apply_expressions
let _ = plan.apply_expressions(&mut |expr| {
if let Some(_df) = expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {
if let Some(_df) = expr.downcast_ref::<DynamicFilterPhysicalExpr>() {
count += 1;
}
Ok(TreeNodeRecursion::Continue)
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/tests/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,6 @@ async fn test_hash_join_swap_on_joins_with_projections(

fn assert_col_expr(expr: &Arc<dyn PhysicalExpr>, name: &str, index: usize) {
let col = expr
.as_any()
.downcast_ref::<Column>()
.expect("Projection items should be Column expression");
assert_eq!(col.name(), name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1127,8 +1127,8 @@ fn hash_join_exec(
) -> Arc<dyn ExecutionPlan> {
let left_on = col("c", &left.schema()).unwrap();
let right_on = col("c", &right.schema()).unwrap();
let left_col = left_on.as_any().downcast_ref::<Column>().unwrap();
let right_col = right_on.as_any().downcast_ref::<Column>().unwrap();
let left_col = left_on.downcast_ref::<Column>().unwrap();
let right_col = right_on.downcast_ref::<Column>().unwrap();
Arc::new(
HashJoinExec::try_new(
left,
Expand Down
16 changes: 6 additions & 10 deletions datafusion/datasource-parquet/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,17 +713,13 @@ impl FileMetadata for CachedParquetMetaData {
pub(crate) fn sort_expr_to_sorting_column(
sort_expr: &PhysicalSortExpr,
) -> Result<SortingColumn> {
let column = sort_expr
.expr
.as_any()
.downcast_ref::<Column>()
.ok_or_else(|| {
DataFusionError::Plan(format!(
"Parquet sorting_columns only supports simple column references, \
let column = sort_expr.expr.downcast_ref::<Column>().ok_or_else(|| {
DataFusionError::Plan(format!(
"Parquet sorting_columns only supports simple column references, \
but got expression: {}",
sort_expr.expr
))
})?;
sort_expr.expr
))
})?;

let column_idx: i32 = column.index().try_into().map_err(|_| {
DataFusionError::Plan(format!(
Expand Down
6 changes: 3 additions & 3 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1916,8 +1916,8 @@ mod test {
.try_map_exprs(|expr| replace_columns_with_literals(expr, &constants))
.unwrap();
let exprs = rewritten.as_ref();
assert!(exprs[0].expr.as_any().downcast_ref::<Literal>().is_some());
assert!(exprs[1].expr.as_any().downcast_ref::<Column>().is_some());
assert!(exprs[0].expr.downcast_ref::<Literal>().is_some());
assert!(exprs[1].expr.downcast_ref::<Column>().is_some());

// Only column `b` should remain in the projection mask
assert_eq!(rewritten.column_indices(), vec![1]);
Expand All @@ -1930,7 +1930,7 @@ mod test {
let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));

let rewritten = replace_columns_with_literals(expr, &constants).unwrap();
assert!(rewritten.as_any().downcast_ref::<Literal>().is_some());
assert!(rewritten.downcast_ref::<Literal>().is_some());
}

async fn count_batches_and_rows(
Expand Down
15 changes: 5 additions & 10 deletions datafusion/datasource-parquet/src/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,10 +420,7 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
{
let args = func.args();

if let Some(column) = args
.first()
.and_then(|a| a.as_any().downcast_ref::<Column>())
{
if let Some(column) = args.first().and_then(|a| a.downcast_ref::<Column>()) {
// for Map columns, get_field performs a runtime key lookup rather than a
// schema-level field access so the entire Map column must be read,
// we skip the struct field optimization and defer to normal Column traversal
Expand Down Expand Up @@ -451,7 +448,7 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
let field_path = args[1..]
.iter()
.map(|arg| {
arg.as_any().downcast_ref::<Literal>().and_then(|lit| {
arg.downcast_ref::<Literal>().and_then(|lit| {
lit.value().try_as_str().flatten().map(|s| s.to_string())
})
})
Expand Down Expand Up @@ -481,7 +478,7 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
}
}

if let Some(column) = node.as_any().downcast_ref::<Column>()
if let Some(column) = node.downcast_ref::<Column>()
&& let Some(recursion) = self.check_single_column(column.name())
{
return Ok(recursion);
Expand Down Expand Up @@ -611,14 +608,12 @@ pub(crate) fn build_projection_read_plan(
// fast path: if every expression is a plain Column reference, skip all
// struct analysis and use root-level projection directly
let exprs = exprs.into_iter().collect::<Vec<_>>();
let all_plain_columns = exprs
.iter()
.all(|e| e.as_any().downcast_ref::<Column>().is_some());
let all_plain_columns = exprs.iter().all(|e| e.downcast_ref::<Column>().is_some());

if all_plain_columns {
let mut root_indices: Vec<usize> = exprs
.iter()
.map(|e| e.as_any().downcast_ref::<Column>().unwrap().index())
.map(|e| e.downcast_ref::<Column>().unwrap().index())
.collect();
root_indices.sort_unstable();
root_indices.dedup();
Expand Down
7 changes: 3 additions & 4 deletions datafusion/datasource-parquet/src/supported_predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ impl SupportsListPushdown for dyn PhysicalExpr {
///
/// These checks are universally supported for all column types.
fn is_null_check(expr: &dyn PhysicalExpr) -> bool {
expr.as_any().downcast_ref::<IsNullExpr>().is_some()
|| expr.as_any().downcast_ref::<IsNotNullExpr>().is_some()
expr.downcast_ref::<IsNullExpr>().is_some()
|| expr.downcast_ref::<IsNotNullExpr>().is_some()
}

/// Checks if an expression is a scalar function registered for list pushdown.
Expand All @@ -86,8 +86,7 @@ fn is_supported_scalar_function(expr: &dyn PhysicalExpr) -> bool {
}

fn scalar_function_name(expr: &dyn PhysicalExpr) -> Option<&str> {
expr.as_any()
.downcast_ref::<ScalarFunctionExpr>()
expr.downcast_ref::<ScalarFunctionExpr>()
.map(ScalarFunctionExpr::name)
}

Expand Down
Loading
Loading