From 18440ea40bb79f386cdf1dc2fd5df0353de8b47b Mon Sep 17 00:00:00 2001 From: sriram Date: Thu, 13 Nov 2025 21:10:51 +0530 Subject: [PATCH 1/3] chore: Refactor with assert_or_internal_err!() in datafusion/spark. --- .../src/function/datetime/make_interval.rs | 52 +++++++++++-------- datafusion/spark/src/function/hash/sha1.rs | 10 ++-- .../spark/src/function/math/factorial.rs | 8 +-- datafusion/spark/src/function/math/hex.rs | 7 ++- datafusion/spark/src/function/math/modulus.rs | 28 +++++----- 5 files changed, 58 insertions(+), 47 deletions(-) diff --git a/datafusion/spark/src/function/datetime/make_interval.rs b/datafusion/spark/src/function/datetime/make_interval.rs index 8e3169556b95b..4972f5f8926d6 100644 --- a/datafusion/spark/src/function/datetime/make_interval.rs +++ b/datafusion/spark/src/function/datetime/make_interval.rs @@ -238,7 +238,10 @@ mod tests { use arrow::array::{Float64Array, Int32Array, IntervalMonthDayNanoArray}; use arrow::datatypes::Field; use datafusion_common::config::ConfigOptions; - use datafusion_common::{internal_datafusion_err, internal_err, Result}; + use datafusion_common::{ + assert_eq_or_internal_err, assert_or_internal_err, internal_datafusion_err, + Result, + }; use super::*; fn run_make_interval_month_day_nano(arrs: Vec) -> Result { @@ -533,36 +536,41 @@ mod tests { .ok_or_else(|| { internal_datafusion_err!("expected IntervalMonthDayNanoArray") })?; - if arr.len() != number_rows { - return internal_err!( - "expected array length {number_rows}, got {}", - arr.len() - ); - } + assert_eq_or_internal_err!( + arr.len(), + number_rows, + "expected array length {number_rows}, got {}", + arr.len() + ); for i in 0..number_rows { let iv = arr.value(i); - if (iv.months, iv.days, iv.nanoseconds) != (0, 0, 0) { - return internal_err!( - "row {i}: expected (0,0,0), got ({},{},{})", - iv.months, - iv.days, - iv.nanoseconds - ); - } - } - } - ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(iv))) => { - if (iv.months, iv.days, iv.nanoseconds) != (0, 0, 0) { - return internal_err!( - "expected scalar 0s, got ({},{},{})", + assert_eq_or_internal_err!( + (iv.months, iv.days, iv.nanoseconds), + (0, 0, 0), + "row {i}: expected (0,0,0), got ({},{},{})", iv.months, iv.days, iv.nanoseconds ); } } + ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(iv))) => { + assert_eq_or_internal_err!( + (iv.months, iv.days, iv.nanoseconds), + (0, 0, 0), + "expected scalar 0s, got ({},{},{})", + iv.months, + iv.days, + iv.nanoseconds + ); + } other => { - return internal_err!( + assert_or_internal_err!( + matches!( + other, + ColumnarValue::Array(_) + | ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(_)) + ), "expected Array or Scalar IntervalMonthDayNano, got {other:?}" ); } diff --git a/datafusion/spark/src/function/hash/sha1.rs b/datafusion/spark/src/function/hash/sha1.rs index 5b2a2653ed7cc..9f1c28347a4f4 100644 --- a/datafusion/spark/src/function/hash/sha1.rs +++ b/datafusion/spark/src/function/hash/sha1.rs @@ -24,7 +24,7 @@ use arrow::datatypes::DataType; use datafusion_common::cast::{ as_binary_array, as_binary_view_array, as_large_binary_array, }; -use datafusion_common::{exec_err, internal_err, Result}; +use datafusion_common::{assert_eq_or_internal_err, exec_err, DataFusionError, Result}; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; @@ -117,10 +117,12 @@ fn spark_sha1_impl<'a>(input: impl Iterator>) -> ArrayRe fn spark_sha1(args: &[ArrayRef]) -> Result { let [input] = args else { - return internal_err!( - "Spark `sha1` function requires 1 argument, got {}", - args.len() + assert_eq_or_internal_err!( + args.len(), + 1, + "Spark `sha1` function requires 1 argument" ); + unreachable!() }; match input.data_type() { diff --git a/datafusion/spark/src/function/math/factorial.rs b/datafusion/spark/src/function/math/factorial.rs index 4921e73d262a3..845f5c4cecbef 100644 --- a/datafusion/spark/src/function/math/factorial.rs +++ b/datafusion/spark/src/function/math/factorial.rs @@ -22,7 +22,9 @@ use arrow::array::{Array, Int64Array}; use arrow::datatypes::DataType; use arrow::datatypes::DataType::{Int32, Int64}; use datafusion_common::cast::as_int32_array; -use datafusion_common::{exec_err, internal_err, DataFusionError, Result, ScalarValue}; +use datafusion_common::{ + assert_eq_or_internal_err, exec_err, DataFusionError, Result, ScalarValue, +}; use datafusion_expr::Signature; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Volatility}; @@ -99,9 +101,7 @@ const FACTORIALS: [i64; 21] = [ ]; pub fn spark_factorial(args: &[ColumnarValue]) -> Result { - if args.len() != 1 { - return internal_err!("`factorial` expects exactly one argument"); - } + assert_eq_or_internal_err!(args.len(), 1, "`factorial` expects exactly one argument"); match &args[0] { ColumnarValue::Scalar(ScalarValue::Int32(value)) => { diff --git a/datafusion/spark/src/function/math/hex.rs b/datafusion/spark/src/function/math/hex.rs index cdd13e9033265..0b34e5e7079a3 100644 --- a/datafusion/spark/src/function/math/hex.rs +++ b/datafusion/spark/src/function/math/hex.rs @@ -29,8 +29,9 @@ use arrow::{ }; use datafusion_common::cast::as_string_view_array; use datafusion_common::{ + assert_eq_or_internal_err, cast::{as_binary_array, as_fixed_size_binary_array, as_int64_array}, - exec_err, internal_err, DataFusionError, + exec_err, DataFusionError, }; use datafusion_expr::Signature; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Volatility}; @@ -184,9 +185,7 @@ pub fn compute_hex( args: &[ColumnarValue], lowercase: bool, ) -> Result { - if args.len() != 1 { - return internal_err!("hex expects exactly one argument"); - } + assert_eq_or_internal_err!(args.len(), 1, "hex expects exactly one argument"); let input = match &args[0] { ColumnarValue::Scalar(value) => ColumnarValue::Array(value.to_array()?), diff --git a/datafusion/spark/src/function/math/modulus.rs b/datafusion/spark/src/function/math/modulus.rs index fea0297a7ae94..aa66b179e2d4c 100644 --- a/datafusion/spark/src/function/math/modulus.rs +++ b/datafusion/spark/src/function/math/modulus.rs @@ -18,7 +18,9 @@ use arrow::compute::kernels::numeric::add; use arrow::compute::kernels::{cmp::lt, numeric::rem, zip::zip}; use arrow::datatypes::DataType; -use datafusion_common::{internal_err, Result, ScalarValue}; +use datafusion_common::{ + assert_eq_or_internal_err, DataFusionError, Result, ScalarValue, +}; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; @@ -27,9 +29,7 @@ use std::any::Any; /// Spark-compatible `mod` function /// This function directly uses Arrow's arithmetic_op function for modulo operations pub fn spark_mod(args: &[ColumnarValue]) -> Result { - if args.len() != 2 { - return internal_err!("mod expects exactly two arguments"); - } + assert_eq_or_internal_err!(args.len(), 2, "mod expects exactly two arguments"); let args = ColumnarValue::values_to_arrays(args)?; let result = rem(&args[0], &args[1])?; Ok(ColumnarValue::Array(result)) @@ -38,9 +38,7 @@ pub fn spark_mod(args: &[ColumnarValue]) -> Result { /// Spark-compatible `pmod` function /// This function directly uses Arrow's arithmetic_op function for modulo operations pub fn spark_pmod(args: &[ColumnarValue]) -> Result { - if args.len() != 2 { - return internal_err!("pmod expects exactly two arguments"); - } + assert_eq_or_internal_err!(args.len(), 2, "pmod expects exactly two arguments"); let args = ColumnarValue::values_to_arrays(args)?; let left = &args[0]; let right = &args[1]; @@ -87,9 +85,11 @@ impl ScalarUDFImpl for SparkMod { } fn return_type(&self, arg_types: &[DataType]) -> Result { - if arg_types.len() != 2 { - return internal_err!("mod expects exactly two arguments"); - } + assert_eq_or_internal_err!( + arg_types.len(), + 2, + "mod expects exactly two arguments" + ); // Return the same type as the first argument for simplicity // Arrow's rem function handles type promotion internally @@ -135,9 +135,11 @@ impl ScalarUDFImpl for SparkPmod { } fn return_type(&self, arg_types: &[DataType]) -> Result { - if arg_types.len() != 2 { - return internal_err!("pmod expects exactly two arguments"); - } + assert_eq_or_internal_err!( + arg_types.len(), + 2, + "pmod expects exactly two arguments" + ); // Return the same type as the first argument for simplicity // Arrow's rem function handles type promotion internally From 790f1d0048243027cf148932ccc1d3993222bdb9 Mon Sep 17 00:00:00 2001 From: sriram Date: Thu, 13 Nov 2025 21:38:09 +0530 Subject: [PATCH 2/3] refactor cont... --- .../spark/src/function/datetime/date_add.rs | 13 +++++++---- .../spark/src/function/datetime/date_sub.rs | 22 ++++++++++++------- .../spark/src/function/datetime/last_day.rs | 13 +++++++---- datafusion/spark/src/function/hash/crc32.rs | 10 +++++---- 4 files changed, 38 insertions(+), 20 deletions(-) diff --git a/datafusion/spark/src/function/datetime/date_add.rs b/datafusion/spark/src/function/datetime/date_add.rs index a00430febcdb0..a53c78b8d61d8 100644 --- a/datafusion/spark/src/function/datetime/date_add.rs +++ b/datafusion/spark/src/function/datetime/date_add.rs @@ -25,7 +25,10 @@ use arrow::error::ArrowError; use datafusion_common::cast::{ as_date32_array, as_int16_array, as_int32_array, as_int8_array, }; -use datafusion_common::{internal_err, Result}; +use datafusion_common::{ + assert_eq_or_internal_err, assert_or_internal_err, internal_err, DataFusionError, + Result, +}; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -88,10 +91,12 @@ impl ScalarUDFImpl for SparkDateAdd { fn spark_date_add(args: &[ArrayRef]) -> Result { let [date_arg, days_arg] = args else { - return internal_err!( - "Spark `date_add` function requires 2 arguments, got {}", - args.len() + assert_eq_or_internal_err!( + args.len(), + 2, + "Spark `date_add` function requires 2 arguments" ); + unreachable!() }; let date_array = as_date32_array(date_arg)?; let result = match days_arg.data_type() { diff --git a/datafusion/spark/src/function/datetime/date_sub.rs b/datafusion/spark/src/function/datetime/date_sub.rs index a3b26661d196c..d826f7acf506d 100644 --- a/datafusion/spark/src/function/datetime/date_sub.rs +++ b/datafusion/spark/src/function/datetime/date_sub.rs @@ -25,7 +25,9 @@ use arrow::error::ArrowError; use datafusion_common::cast::{ as_date32_array, as_int16_array, as_int32_array, as_int8_array, }; -use datafusion_common::{internal_err, Result}; +use datafusion_common::{ + assert_eq_or_internal_err, assert_or_internal_err, DataFusionError, Result, +}; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -82,10 +84,12 @@ impl ScalarUDFImpl for SparkDateSub { fn spark_date_sub(args: &[ArrayRef]) -> Result { let [date_arg, days_arg] = args else { - return internal_err!( - "Spark `date_sub` function requires 2 arguments, got {}", - args.len() + assert_eq_or_internal_err!( + args.len(), + 2, + "Spark `date_sub` function requires 2 arguments" ); + unreachable!() }; let date_array = as_date32_array(date_arg)?; let result = match days_arg.data_type() { @@ -126,10 +130,12 @@ fn spark_date_sub(args: &[ArrayRef]) -> Result { )? } _ => { - return internal_err!( - "Spark `date_sub` function: argument must be int8, int16, int32, got {:?}", - days_arg.data_type() - ); + assert_or_internal_err!( + matches!(days_arg.data_type(), DataType::Int8 | DataType::Int16 | DataType::Int32), + "Spark `date_sub` function: argument must be int8, int16, int32, got {:?}", + days_arg.data_type() + ); + unreachable!() } }; Ok(Arc::new(result)) diff --git a/datafusion/spark/src/function/datetime/last_day.rs b/datafusion/spark/src/function/datetime/last_day.rs index c01a6403649c5..8700937c18faa 100644 --- a/datafusion/spark/src/function/datetime/last_day.rs +++ b/datafusion/spark/src/function/datetime/last_day.rs @@ -21,7 +21,10 @@ use std::sync::Arc; use arrow::array::{ArrayRef, AsArray, Date32Array}; use arrow::datatypes::{DataType, Date32Type}; use chrono::{Datelike, Duration, NaiveDate}; -use datafusion_common::{exec_datafusion_err, internal_err, Result, ScalarValue}; +use datafusion_common::{ + assert_eq_or_internal_err, assert_or_internal_err, exec_datafusion_err, internal_err, + DataFusionError, Result, ScalarValue, +}; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; @@ -65,10 +68,12 @@ impl ScalarUDFImpl for SparkLastDay { fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { let ScalarFunctionArgs { args, .. } = args; let [arg] = args.as_slice() else { - return internal_err!( - "Spark `last_day` function requires 1 argument, got {}", - args.len() + assert_eq_or_internal_err!( + args.len(), + 1, + "Spark `last_day` function requires 1 argument" ); + unreachable!() }; match arg { ColumnarValue::Scalar(ScalarValue::Date32(days)) => { diff --git a/datafusion/spark/src/function/hash/crc32.rs b/datafusion/spark/src/function/hash/crc32.rs index 76e31d12c6487..909f81c5c0359 100644 --- a/datafusion/spark/src/function/hash/crc32.rs +++ b/datafusion/spark/src/function/hash/crc32.rs @@ -24,7 +24,7 @@ use crc32fast::Hasher; use datafusion_common::cast::{ as_binary_array, as_binary_view_array, as_large_binary_array, }; -use datafusion_common::{exec_err, internal_err, Result}; +use datafusion_common::{assert_eq_or_internal_err, exec_err, DataFusionError, Result}; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; @@ -105,10 +105,12 @@ fn spark_crc32_impl<'a>(input: impl Iterator>) -> ArrayR fn spark_crc32(args: &[ArrayRef]) -> Result { let [input] = args else { - return internal_err!( - "Spark `crc32` function requires 1 argument, got {}", - args.len() + assert_eq_or_internal_err!( + args.len(), + 1, + "Spark `crc32` function requires 1 argument" ); + unreachable!() }; match input.data_type() { From fcd703b784d3ec4d9976c47a55b52d22c0c08757 Mon Sep 17 00:00:00 2001 From: sriram Date: Thu, 13 Nov 2025 22:15:16 +0530 Subject: [PATCH 3/3] Clippy fix --- datafusion/spark/src/function/datetime/date_add.rs | 3 +-- datafusion/spark/src/function/datetime/last_day.rs | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/datafusion/spark/src/function/datetime/date_add.rs b/datafusion/spark/src/function/datetime/date_add.rs index a53c78b8d61d8..8a90f4dc42cc2 100644 --- a/datafusion/spark/src/function/datetime/date_add.rs +++ b/datafusion/spark/src/function/datetime/date_add.rs @@ -26,8 +26,7 @@ use datafusion_common::cast::{ as_date32_array, as_int16_array, as_int32_array, as_int8_array, }; use datafusion_common::{ - assert_eq_or_internal_err, assert_or_internal_err, internal_err, DataFusionError, - Result, + assert_eq_or_internal_err, internal_err, DataFusionError, Result, }; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, diff --git a/datafusion/spark/src/function/datetime/last_day.rs b/datafusion/spark/src/function/datetime/last_day.rs index 8700937c18faa..e730c2f6e101f 100644 --- a/datafusion/spark/src/function/datetime/last_day.rs +++ b/datafusion/spark/src/function/datetime/last_day.rs @@ -22,8 +22,8 @@ use arrow::array::{ArrayRef, AsArray, Date32Array}; use arrow::datatypes::{DataType, Date32Type}; use chrono::{Datelike, Duration, NaiveDate}; use datafusion_common::{ - assert_eq_or_internal_err, assert_or_internal_err, exec_datafusion_err, internal_err, - DataFusionError, Result, ScalarValue, + assert_eq_or_internal_err, exec_datafusion_err, internal_err, DataFusionError, + Result, ScalarValue, }; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,