Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions datafusion/spark/src/function/datetime/date_add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use arrow::error::ArrowError;
use datafusion_common::cast::{
as_date32_array, as_int16_array, as_int32_array, as_int8_array,
};
use datafusion_common::{internal_err, Result};
use datafusion_common::{
assert_eq_or_internal_err, internal_err, DataFusionError, Result,
};
use datafusion_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
Volatility,
Expand Down Expand Up @@ -88,10 +90,12 @@ impl ScalarUDFImpl for SparkDateAdd {

fn spark_date_add(args: &[ArrayRef]) -> Result<ArrayRef> {
let [date_arg, days_arg] = args else {
return internal_err!(
"Spark `date_add` function requires 2 arguments, got {}",
args.len()
assert_eq_or_internal_err!(
args.len(),
2,
"Spark `date_add` function requires 2 arguments"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider preserving the actual argument count in the error message (e.g., include got {} with args.len()), as the previous version provided this detail; this regression also appears in similar checks (crc32, sha1, last_day).

🤖 Was this useful? React with 👍 or 👎

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback:The Augment AI reviewer is correct that useful debug information is lost in this change and it should be reverted. Losing the actual argument count makes debugging harder when the error occurs.

);
unreachable!()
};
let date_array = as_date32_array(date_arg)?;
let result = match days_arg.data_type() {
Expand Down
22 changes: 14 additions & 8 deletions datafusion/spark/src/function/datetime/date_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use arrow::error::ArrowError;
use datafusion_common::cast::{
as_date32_array, as_int16_array, as_int32_array, as_int8_array,
};
use datafusion_common::{internal_err, Result};
use datafusion_common::{
assert_eq_or_internal_err, assert_or_internal_err, DataFusionError, Result,
};
use datafusion_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
Volatility,
Expand Down Expand Up @@ -82,10 +84,12 @@ impl ScalarUDFImpl for SparkDateSub {

fn spark_date_sub(args: &[ArrayRef]) -> Result<ArrayRef> {
let [date_arg, days_arg] = args else {
return internal_err!(
"Spark `date_sub` function requires 2 arguments, got {}",
args.len()
assert_eq_or_internal_err!(
args.len(),
2,
"Spark `date_sub` function requires 2 arguments"
);
unreachable!()
};
let date_array = as_date32_array(date_arg)?;
let result = match days_arg.data_type() {
Expand Down Expand Up @@ -126,10 +130,12 @@ fn spark_date_sub(args: &[ArrayRef]) -> Result<ArrayRef> {
)?
}
_ => {
return internal_err!(
"Spark `date_sub` function: argument must be int8, int16, int32, got {:?}",
days_arg.data_type()
);
assert_or_internal_err!(
matches!(days_arg.data_type(), DataType::Int8 | DataType::Int16 | DataType::Int32),
"Spark `date_sub` function: argument must be int8, int16, int32, got {:?}",
days_arg.data_type()
);
unreachable!()
}
};
Ok(Arc::new(result))
Expand Down
13 changes: 9 additions & 4 deletions datafusion/spark/src/function/datetime/last_day.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use std::sync::Arc;
use arrow::array::{ArrayRef, AsArray, Date32Array};
use arrow::datatypes::{DataType, Date32Type};
use chrono::{Datelike, Duration, NaiveDate};
use datafusion_common::{exec_datafusion_err, internal_err, Result, ScalarValue};
use datafusion_common::{
assert_eq_or_internal_err, exec_datafusion_err, internal_err, DataFusionError,
Result, ScalarValue,
};
use datafusion_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
Expand Down Expand Up @@ -65,10 +68,12 @@ impl ScalarUDFImpl for SparkLastDay {
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let ScalarFunctionArgs { args, .. } = args;
let [arg] = args.as_slice() else {
return internal_err!(
"Spark `last_day` function requires 1 argument, got {}",
args.len()
assert_eq_or_internal_err!(
args.len(),
1,
"Spark `last_day` function requires 1 argument"
);
unreachable!()
};
match arg {
ColumnarValue::Scalar(ScalarValue::Date32(days)) => {
Expand Down
52 changes: 30 additions & 22 deletions datafusion/spark/src/function/datetime/make_interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ mod tests {
use arrow::array::{Float64Array, Int32Array, IntervalMonthDayNanoArray};
use arrow::datatypes::Field;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{internal_datafusion_err, internal_err, Result};
use datafusion_common::{
assert_eq_or_internal_err, assert_or_internal_err, internal_datafusion_err,
Result,
};

use super::*;
fn run_make_interval_month_day_nano(arrs: Vec<ArrayRef>) -> Result<ArrayRef> {
Expand Down Expand Up @@ -533,36 +536,41 @@ mod tests {
.ok_or_else(|| {
internal_datafusion_err!("expected IntervalMonthDayNanoArray")
})?;
if arr.len() != number_rows {
return internal_err!(
"expected array length {number_rows}, got {}",
arr.len()
);
}
assert_eq_or_internal_err!(
arr.len(),
number_rows,
"expected array length {number_rows}, got {}",
arr.len()
);
for i in 0..number_rows {
let iv = arr.value(i);
if (iv.months, iv.days, iv.nanoseconds) != (0, 0, 0) {
return internal_err!(
"row {i}: expected (0,0,0), got ({},{},{})",
iv.months,
iv.days,
iv.nanoseconds
);
}
}
}
ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(iv))) => {
if (iv.months, iv.days, iv.nanoseconds) != (0, 0, 0) {
return internal_err!(
"expected scalar 0s, got ({},{},{})",
assert_eq_or_internal_err!(
(iv.months, iv.days, iv.nanoseconds),
(0, 0, 0),
"row {i}: expected (0,0,0), got ({},{},{})",
iv.months,
iv.days,
iv.nanoseconds
);
}
}
ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(iv))) => {
assert_eq_or_internal_err!(
(iv.months, iv.days, iv.nanoseconds),
(0, 0, 0),
"expected scalar 0s, got ({},{},{})",
iv.months,
iv.days,
iv.nanoseconds
);
}
other => {
return internal_err!(
assert_or_internal_err!(
matches!(
other,
ColumnarValue::Array(_)
| ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(_))
),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Match Arm Logic Inversion: Incorrect Control Flow

The assertion logic is inverted in the match arm. The old code unconditionally returned an error for any value in the other arm. The new assertion checks if other matches expected types and returns an error only if it doesn't match. However, if the assertion passes (value matches expected types), execution continues without producing a value for the match arm or returning from the function, which is incorrect. The assertion should likely check that other does NOT match the expected patterns, or this should revert to an unconditional error return.

Fix in Cursor Fix in Web

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback:The Bugbot AI reviewer is correct! The check seems wrong for two reasons: 1) ColumnarValue::Array(arr) is already covered earlier in the first arm of the match; 2) ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(_)) could match only for ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(None)) because the case for Some(iv) is also handled in an earlier arm. It would be cleaner if the None case was handled as the other match arms, and other always returns Err, as before.

"expected Array or Scalar IntervalMonthDayNano, got {other:?}"
);
}
Expand Down
10 changes: 6 additions & 4 deletions datafusion/spark/src/function/hash/crc32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crc32fast::Hasher;
use datafusion_common::cast::{
as_binary_array, as_binary_view_array, as_large_binary_array,
};
use datafusion_common::{exec_err, internal_err, Result};
use datafusion_common::{assert_eq_or_internal_err, exec_err, DataFusionError, Result};
use datafusion_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
Expand Down Expand Up @@ -105,10 +105,12 @@ fn spark_crc32_impl<'a>(input: impl Iterator<Item = Option<&'a [u8]>>) -> ArrayR

fn spark_crc32(args: &[ArrayRef]) -> Result<ArrayRef> {
let [input] = args else {
return internal_err!(
"Spark `crc32` function requires 1 argument, got {}",
args.len()
assert_eq_or_internal_err!(
args.len(),
1,
"Spark `crc32` function requires 1 argument"
);
unreachable!()
};

match input.data_type() {
Expand Down
10 changes: 6 additions & 4 deletions datafusion/spark/src/function/hash/sha1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow::datatypes::DataType;
use datafusion_common::cast::{
as_binary_array, as_binary_view_array, as_large_binary_array,
};
use datafusion_common::{exec_err, internal_err, Result};
use datafusion_common::{assert_eq_or_internal_err, exec_err, DataFusionError, Result};
use datafusion_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
Expand Down Expand Up @@ -117,10 +117,12 @@ fn spark_sha1_impl<'a>(input: impl Iterator<Item = Option<&'a [u8]>>) -> ArrayRe

fn spark_sha1(args: &[ArrayRef]) -> Result<ArrayRef> {
let [input] = args else {
return internal_err!(
"Spark `sha1` function requires 1 argument, got {}",
args.len()
assert_eq_or_internal_err!(
args.len(),
1,
"Spark `sha1` function requires 1 argument"
);
unreachable!()
};

match input.data_type() {
Expand Down
8 changes: 4 additions & 4 deletions datafusion/spark/src/function/math/factorial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use arrow::array::{Array, Int64Array};
use arrow::datatypes::DataType;
use arrow::datatypes::DataType::{Int32, Int64};
use datafusion_common::cast::as_int32_array;
use datafusion_common::{exec_err, internal_err, DataFusionError, Result, ScalarValue};
use datafusion_common::{
assert_eq_or_internal_err, exec_err, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::Signature;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Volatility};

Expand Down Expand Up @@ -99,9 +101,7 @@ const FACTORIALS: [i64; 21] = [
];

pub fn spark_factorial(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
if args.len() != 1 {
return internal_err!("`factorial` expects exactly one argument");
}
assert_eq_or_internal_err!(args.len(), 1, "`factorial` expects exactly one argument");

match &args[0] {
ColumnarValue::Scalar(ScalarValue::Int32(value)) => {
Expand Down
7 changes: 3 additions & 4 deletions datafusion/spark/src/function/math/hex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ use arrow::{
};
use datafusion_common::cast::as_string_view_array;
use datafusion_common::{
assert_eq_or_internal_err,
cast::{as_binary_array, as_fixed_size_binary_array, as_int64_array},
exec_err, internal_err, DataFusionError,
exec_err, DataFusionError,
};
use datafusion_expr::Signature;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Volatility};
Expand Down Expand Up @@ -184,9 +185,7 @@ pub fn compute_hex(
args: &[ColumnarValue],
lowercase: bool,
) -> Result<ColumnarValue, DataFusionError> {
if args.len() != 1 {
return internal_err!("hex expects exactly one argument");
}
assert_eq_or_internal_err!(args.len(), 1, "hex expects exactly one argument");

let input = match &args[0] {
ColumnarValue::Scalar(value) => ColumnarValue::Array(value.to_array()?),
Expand Down
28 changes: 15 additions & 13 deletions datafusion/spark/src/function/math/modulus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
use arrow::compute::kernels::numeric::add;
use arrow::compute::kernels::{cmp::lt, numeric::rem, zip::zip};
use arrow::datatypes::DataType;
use datafusion_common::{internal_err, Result, ScalarValue};
use datafusion_common::{
assert_eq_or_internal_err, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
Expand All @@ -27,9 +29,7 @@ use std::any::Any;
/// Spark-compatible `mod` function
/// This function directly uses Arrow's arithmetic_op function for modulo operations
pub fn spark_mod(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
return internal_err!("mod expects exactly two arguments");
}
assert_eq_or_internal_err!(args.len(), 2, "mod expects exactly two arguments");
let args = ColumnarValue::values_to_arrays(args)?;
let result = rem(&args[0], &args[1])?;
Ok(ColumnarValue::Array(result))
Expand All @@ -38,9 +38,7 @@ pub fn spark_mod(args: &[ColumnarValue]) -> Result<ColumnarValue> {
/// Spark-compatible `pmod` function
/// This function directly uses Arrow's arithmetic_op function for modulo operations
pub fn spark_pmod(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
return internal_err!("pmod expects exactly two arguments");
}
assert_eq_or_internal_err!(args.len(), 2, "pmod expects exactly two arguments");
let args = ColumnarValue::values_to_arrays(args)?;
let left = &args[0];
let right = &args[1];
Expand Down Expand Up @@ -87,9 +85,11 @@ impl ScalarUDFImpl for SparkMod {
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
if arg_types.len() != 2 {
return internal_err!("mod expects exactly two arguments");
}
assert_eq_or_internal_err!(
arg_types.len(),
2,
"mod expects exactly two arguments"
);

// Return the same type as the first argument for simplicity
// Arrow's rem function handles type promotion internally
Expand Down Expand Up @@ -135,9 +135,11 @@ impl ScalarUDFImpl for SparkPmod {
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
if arg_types.len() != 2 {
return internal_err!("pmod expects exactly two arguments");
}
assert_eq_or_internal_err!(
arg_types.len(),
2,
"pmod expects exactly two arguments"
);

// Return the same type as the first argument for simplicity
// Arrow's rem function handles type promotion internally
Expand Down
Loading