Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2458,9 +2458,8 @@ async fn cache_producer_test() -> Result<()> {
@r"
CacheNode
Projection: aggregate_test_100.c2, aggregate_test_100.c3, CAST(CAST(aggregate_test_100.c2 AS Int64) + CAST(aggregate_test_100.c3 AS Int64) AS Int64) AS sum
Projection: aggregate_test_100.c2, aggregate_test_100.c3
Limit: skip=0, fetch=1
TableScan: aggregate_test_100, fetch=1
Limit: skip=0, fetch=1
TableScan: aggregate_test_100 projection=[c2, c3], fetch=1
"
);
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,10 @@ mod test {
num_rows: Precision::Exact(0),
total_byte_size: Precision::Absent,
column_statistics: vec![
ColumnStatistics::new_unknown(),
ColumnStatistics {
distinct_count: Precision::Exact(0),
..ColumnStatistics::new_unknown()
},
ColumnStatistics::new_unknown(),
ColumnStatistics::new_unknown(),
],
Expand Down
4 changes: 3 additions & 1 deletion datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ pub use udaf::{
udaf_default_schema_name, udaf_default_window_function_display_name,
udaf_default_window_function_schema_name,
};
pub use udf::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl};
pub use udf::{
ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, StructFieldMapping,
};
pub use udwf::{LimitEffect, ReversedUDWF, WindowUDF, WindowUDFImpl};
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};

Expand Down
53 changes: 53 additions & 0 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,25 @@ use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

/// Describes how a struct-producing UDF's output fields correspond to its
/// input arguments. This enables the optimizer to propagate orderings
/// through struct projections (e.g., so that sorting by a struct field
/// can be recognized as equivalent to sorting by the source column).
///
/// See [`ScalarUDFImpl::struct_field_mapping`] for details.
//
// `Debug` and `Clone` are derived so this type follows the same conventions
// as other public DataFusion API types (`ScalarUDF` itself is Debug + Clone,
// and `ScalarValue` is Debug + Clone, so the derives are satisfiable).
#[derive(Debug, Clone)]
pub struct StructFieldMapping {
    /// The UDF used to construct field access expressions on the output.
    /// For example, the `get_field` UDF for accessing struct fields.
    pub field_accessor: Arc<ScalarUDF>,
    /// For each output field: the literal arguments to pass to the
    /// `field_accessor` UDF (after the base expression), and the index
    /// of the corresponding input argument that produces the field's value.
    ///
    /// For `named_struct('a', col1, 'b', col2)`, this would be:
    /// `[(["a"], 1), (["b"], 3)]` — field `"a"` comes from arg index 1.
    pub fields: Vec<(Vec<ScalarValue>, usize)>,
}

/// Logical representation of a Scalar User Defined Function.
///
/// A scalar function produces a single row output for each row of input. This
Expand Down Expand Up @@ -305,6 +324,14 @@ impl ScalarUDF {
self.inner.evaluate_bounds(inputs)
}

    /// Returns how this function's struct-typed output fields map back to
    /// its input arguments, or `None` if the function does not produce a
    /// struct or does not support this mapping.
    ///
    /// `literal_args[i]` is `Some(value)` when argument `i` is a known
    /// literal, and `None` otherwise. Delegates to the wrapped
    /// implementation.
    ///
    /// See [`ScalarUDFImpl::struct_field_mapping`] for more details.
    pub fn struct_field_mapping(
        &self,
        literal_args: &[Option<ScalarValue>],
    ) -> Option<StructFieldMapping> {
        self.inner.struct_field_mapping(literal_args)
    }

/// Updates bounds for child expressions, given a known interval for this
/// function. This is used to propagate constraints down through an expression
/// tree.
Expand Down Expand Up @@ -961,6 +988,25 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync + Any {
not_impl_err!("Function {} does not implement coerce_types", self.name())
}

    /// For struct-producing functions, return how output fields map to input
    /// arguments. This enables the optimizer to propagate orderings through
    /// struct projections.
    ///
    /// `literal_args[i]` is `Some(value)` if argument `i` is a known literal,
    /// allowing extraction of field names from arguments like
    /// `named_struct('field_name', value, ...)`.
    ///
    /// For example, `named_struct('a', col1, 'b', col2)` would return a
    /// mapping indicating that output field `'a'` (accessed via
    /// `get_field(output, 'a')`) corresponds to input argument `col1` at
    /// index 1, and field `'b'` corresponds to `col2` at index 3.
    ///
    /// Implementations that cannot (or choose not to) describe their output
    /// should return `None`, which simply disables this optimization.
    fn struct_field_mapping(
        &self,
        _literal_args: &[Option<ScalarValue>],
    ) -> Option<StructFieldMapping> {
        // Default: no mapping is known. This is always safe — it only means
        // the optimizer cannot propagate orderings through this function.
        None
    }

/// Returns the documentation for this Scalar UDF.
///
/// Documentation can be accessed programmatically as well as generating
Expand Down Expand Up @@ -1109,6 +1155,13 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
self.inner.propagate_constraints(interval, inputs)
}

    // An aliased UDF only changes the function's name, so the struct-field
    // mapping of the wrapped implementation is forwarded unchanged.
    fn struct_field_mapping(
        &self,
        literal_args: &[Option<ScalarValue>],
    ) -> Option<StructFieldMapping> {
        self.inner.struct_field_mapping(literal_args)
    }

fn output_ordering(&self, inputs: &[ExprProperties]) -> Result<SortProperties> {
self.inner.output_ordering(inputs)
}
Expand Down
17 changes: 17 additions & 0 deletions datafusion/functions/benches/split_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,23 @@ fn criterion_benchmark(c: &mut Criterion) {
);
}

// Utf8View, very long parts (256 bytes), position 1
{
let strings = gen_string_array(N_ROWS, 5, 256, ".", true);
let delimiter = ColumnarValue::Scalar(ScalarValue::Utf8View(Some(".".into())));
let position = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
bench_split_part(
&mut group,
&split_part_func,
&config_options,
"scalar_utf8view_very_long_parts",
"pos_first",
strings,
delimiter,
position,
);
}

// ── Array delimiter and position ─────────────────

// Utf8, single-char delimiter, array args
Expand Down
33 changes: 31 additions & 2 deletions datafusion/functions/src/core/named_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use super::getfield::GetFieldFunc;
use arrow::array::StructArray;
use arrow::datatypes::{DataType, Field, FieldRef, Fields};
use datafusion_common::{Result, exec_err, internal_err};
use datafusion_common::{Result, ScalarValue, exec_err, internal_err};
use datafusion_expr::{
ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs,
ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF,
StructFieldMapping,
};
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
use datafusion_macros::user_doc;
Expand Down Expand Up @@ -174,4 +176,31 @@ impl ScalarUDFImpl for NamedStructFunc {
fn documentation(&self) -> Option<&Documentation> {
self.doc()
}

fn struct_field_mapping(
&self,
literal_args: &[Option<ScalarValue>],
) -> Option<StructFieldMapping> {
if literal_args.is_empty() || !literal_args.len().is_multiple_of(2) {
return None;
}

let mut fields = Vec::with_capacity(literal_args.len() / 2);
for (i, chunk) in literal_args.chunks(2).enumerate() {
match chunk {
[Some(ScalarValue::Utf8(Some(name))), _] => {
fields.push((
vec![ScalarValue::Utf8(Some(name.clone()))],
i * 2 + 1, // index of the value argument
));
}
_ => return None,
}
}

Some(StructFieldMapping {
field_accessor: Arc::new(ScalarUDF::from(GetFieldFunc::new())),
fields,
})
}
}
154 changes: 145 additions & 9 deletions datafusion/functions/src/string/split_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

use crate::utils::utf8_to_str_type;
use arrow::array::{
Array, ArrayRef, AsArray, GenericStringBuilder, Int64Array, StringArrayType,
StringLikeArrayBuilder, StringViewBuilder, new_null_array,
Array, ArrayRef, AsArray, ByteView, GenericStringBuilder, Int64Array,
StringArrayType, StringLikeArrayBuilder, StringViewArray, StringViewBuilder,
make_view, new_null_array,
};
use arrow::buffer::ScalarBuffer;
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use datafusion_common::cast::as_int64_array;
Expand Down Expand Up @@ -279,12 +281,9 @@ fn split_part_scalar(
}

let result = match string_array.data_type() {
DataType::Utf8View => split_part_scalar_impl(
string_array.as_string_view(),
delimiter,
position,
StringViewBuilder::with_capacity(string_array.len()),
),
DataType::Utf8View => {
split_part_scalar_view(string_array.as_string_view(), delimiter, position)
}
DataType::Utf8 => {
let arr = string_array.as_string::<i32>();
// Conservative under-estimate for data capacity: split_part output
Expand Down Expand Up @@ -425,6 +424,116 @@ fn rsplit_nth_finder<'a>(
}
}

/// Zero-copy scalar fast path for `StringViewArray` inputs.
///
/// Instead of copying substring bytes into a new buffer, constructs
/// `StringView` entries that point back into the original array's data
/// buffers.
///
/// NOTE(review): `position.unsigned_abs() - 1` underflows when
/// `position == 0` — this assumes the caller (`split_part_scalar`) has
/// already rejected a zero position before reaching here. TODO confirm
/// that validation happens upstream.
fn split_part_scalar_view(
    string_view_array: &StringViewArray,
    delimiter: &str,
    position: i64,
) -> Result<ArrayRef> {
    let len = string_view_array.len();
    let mut views_buf = Vec::with_capacity(len);
    // `views()` on a sliced array is already adjusted to the slice, so
    // indexing it with `i` stays aligned with `is_null(i)` below.
    let views = string_view_array.views();

    if delimiter.is_empty() {
        // PostgreSQL: empty delimiter treats input as a single field.
        let empty_view = make_view(b"", 0, 0);
        let return_input = position == 1 || position == -1;
        for i in 0..len {
            if string_view_array.is_null(i) {
                // Placeholder view for null slots; the null bitmap (reused
                // from the input below) is what marks the slot as null.
                views_buf.push(0);
            } else if return_input {
                // Whole input is field 1 (or -1): reuse its view verbatim.
                views_buf.push(views[i]);
            } else {
                views_buf.push(empty_view);
            }
        }
    } else if position > 0 {
        // Positive position: 1-based from the front, converted to 0-based.
        let idx: usize = (position - 1).try_into().map_err(|_| {
            exec_datafusion_err!(
                "split_part index {position} exceeds maximum supported value"
            )
        })?;
        let finder = memmem::Finder::new(delimiter.as_bytes());
        split_view_loop(string_view_array, views, &mut views_buf, |s| {
            split_nth_finder(s, &finder, delimiter.len(), idx)
        });
    } else {
        // Negative position: 1-based from the back, searched in reverse.
        let idx: usize = (position.unsigned_abs() - 1).try_into().map_err(|_| {
            exec_datafusion_err!(
                "split_part index {position} exceeds minimum supported value"
            )
        })?;
        let finder_rev = memmem::FinderRev::new(delimiter.as_bytes());
        split_view_loop(string_view_array, views, &mut views_buf, |s| {
            rsplit_nth_finder(s, &finder_rev, delimiter.len(), idx)
        });
    }

    let views_buf = ScalarBuffer::from(views_buf);

    // Nulls pass through unchanged, so we can use the input's null array.
    let nulls = string_view_array.nulls().cloned();

    // Safety: each view is either copied unchanged from the input, or built
    // by `substr_view` from a substring that is a contiguous sub-range of the
    // original string value stored in the input's data buffers.
    unsafe {
        Ok(Arc::new(StringViewArray::new_unchecked(
            views_buf,
            string_view_array.data_buffers().to_vec(),
            nulls,
        )) as ArrayRef)
    }
}

/// Builds a `StringView` (as a raw `u128`) for `substr`, a substring of the
/// string described by `original_view`.
///
/// Substrings of at most 12 bytes are stored inline in the view itself;
/// longer ones reference the same data buffer as the original view, shifted
/// by `start_offset` bytes from the original string's start.
#[inline]
fn substr_view(original_view: &u128, substr: &str, start_offset: u32) -> u128 {
    let bytes = substr.as_bytes();
    if bytes.len() <= 12 {
        // Short enough to inline: buffer index/offset are unused.
        return make_view(bytes, 0, 0);
    }
    // A substring longer than 12 bytes implies the original string was also
    // longer than 12 bytes, so `original_view` is buffer-backed and carries
    // a valid buffer index and offset.
    let parent = ByteView::from(*original_view);
    make_view(bytes, parent.buffer_index, parent.offset + start_offset)
}

/// Applies `split_fn` to each non-null string and appends the resulting view to
/// `views_buf`.
///
/// Invariant: `split_fn` must return either `None` or a subslice of the
/// `&str` it was given — the byte-offset computation below subtracts the
/// two strings' data pointers, which is only meaningful when the returned
/// substring lives inside the input string's bytes. Both callers pass
/// `split_nth_finder` / `rsplit_nth_finder`, which return such subslices.
///
/// `views` must be the view buffer of `string_view_array` (index-aligned
/// with its null bitmap).
#[inline(always)]
fn split_view_loop<F>(
    string_view_array: &StringViewArray,
    views: &[u128],
    views_buf: &mut Vec<u128>,
    split_fn: F,
) where
    F: Fn(&str) -> Option<&str>,
{
    let empty_view = make_view(b"", 0, 0);
    for (i, raw_view) in views.iter().enumerate() {
        if string_view_array.is_null(i) {
            // Placeholder for null slots; the caller reuses the input's
            // null bitmap, so the view value itself is never read.
            views_buf.push(0);
            continue;
        }
        let string = string_view_array.value(i);
        match split_fn(string) {
            Some(substr) => {
                // Byte offset of the substring within the original string;
                // valid because `substr` is a subslice of `string` (see
                // invariant above). View offsets are u32, so the cast fits.
                let start_offset = substr.as_ptr() as usize - string.as_ptr() as usize;
                views_buf.push(substr_view(raw_view, substr, start_offset as u32));
            }
            // No such field: PostgreSQL semantics return an empty string.
            None => views_buf.push(empty_view),
        }
    }
}

fn split_part_impl<'a, StringArrType, DelimiterArrType, B>(
string_array: &StringArrType,
delimiter_array: &DelimiterArrType,
Expand Down Expand Up @@ -490,7 +599,7 @@ where

#[cfg(test)]
mod tests {
use arrow::array::{Array, StringArray};
use arrow::array::{Array, AsArray, StringArray, StringViewArray};
use arrow::datatypes::DataType::Utf8;

use datafusion_common::ScalarValue;
Expand Down Expand Up @@ -686,4 +795,31 @@ mod tests {

Ok(())
}

#[test]
fn test_split_part_stringview_sliced() -> Result<()> {
use super::split_part_scalar_view;

let strings: StringViewArray = vec![
Some("skip_this.value"),
Some("this_is_a_long_prefix.suffix"),
Some("short.val"),
Some("another_long_result.rest"),
None,
]
.into_iter()
.collect();

// Slice off the first element to get a non-zero offset array.
let sliced = strings.slice(1, 4);
let result = split_part_scalar_view(&sliced, ".", 1)?;
let result = result.as_string_view();
assert_eq!(result.len(), 4);
assert_eq!(result.value(0), "this_is_a_long_prefix");
assert_eq!(result.value(1), "short");
assert_eq!(result.value(2), "another_long_result");
assert!(result.is_null(3));

Ok(())
}
}
Loading
Loading