diff --git a/.github/workflows/audit.yml b/.github/workflows/audit.yml index c4acb09fe8702..c880d1bae000e 100644 --- a/.github/workflows/audit.yml +++ b/.github/workflows/audit.yml @@ -42,7 +42,7 @@ jobs: steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Install cargo-audit - uses: taiki-e/install-action@0d865d5cc6d507df4765f1f866bfae8bab4e2a73 # v2.69.7 + uses: taiki-e/install-action@6ef672efc2b5aabc787a9e94baf4989aa02a97df # v2.70.3 with: tool: cargo-audit - name: Run audit check diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index be04992378b3d..920e1e79c8540 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -45,11 +45,11 @@ jobs: persist-credentials: false - name: Initialize CodeQL - uses: github/codeql-action/init@38697555549f1db7851b81482ff19f1fa5c4fedc # v4 + uses: github/codeql-action/init@c10b8064de6f491fea524254123dbe5e09572f13 # v4 with: languages: actions - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@38697555549f1db7851b81482ff19f1fa5c4fedc # v4 + uses: github/codeql-action/analyze@c10b8064de6f491fea524254123dbe5e09572f13 # v4 with: category: "/language:actions" diff --git a/.github/workflows/docs.yaml b/.github/workflows/docs.yaml index 63add4dacc812..89bd77670c12d 100644 --- a/.github/workflows/docs.yaml +++ b/.github/workflows/docs.yaml @@ -41,7 +41,7 @@ jobs: path: asf-site - name: Setup uv - uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # v7.6.0 + uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0 - name: Install dependencies run: uv sync --package datafusion-docs diff --git a/.github/workflows/docs_pr.yaml b/.github/workflows/docs_pr.yaml index cc5b9a1e44bb5..5abf9a119d2f5 100644 --- a/.github/workflows/docs_pr.yaml +++ b/.github/workflows/docs_pr.yaml @@ -45,7 +45,7 @@ jobs: submodules: true fetch-depth: 1 - name: Setup uv - uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # v7.6.0 + uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0 - name: Install doc dependencies run: uv sync --package datafusion-docs - name: Install dependency graph tooling diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 24b988476fc14..0d12ddc375718 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -431,7 +431,7 @@ jobs: sudo apt-get update -qq sudo apt-get install -y -qq clang - name: Setup wasm-pack - uses: taiki-e/install-action@0d865d5cc6d507df4765f1f866bfae8bab4e2a73 # v2.69.7 + uses: taiki-e/install-action@6ef672efc2b5aabc787a9e94baf4989aa02a97df # v2.70.3 with: tool: wasm-pack - name: Run tests with headless mode @@ -771,7 +771,7 @@ jobs: - name: Setup Rust toolchain uses: ./.github/actions/setup-builder - name: Install cargo-msrv - uses: taiki-e/install-action@0d865d5cc6d507df4765f1f866bfae8bab4e2a73 # v2.69.7 + uses: taiki-e/install-action@6ef672efc2b5aabc787a9e94baf4989aa02a97df # v2.70.3 with: tool: cargo-msrv diff --git a/Cargo.lock b/Cargo.lock index e0cc5845c00e5..8d87c5bc5b541 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -616,7 +616,7 @@ dependencies = [ "fastrand", "hex", "http 1.4.0", - "sha1", + "sha1 0.10.6", "time", "tokio", "tracing", @@ -1040,7 +1040,7 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" dependencies = [ - "digest", + "digest 0.10.7", ] [[package]] @@ -1054,7 +1054,7 @@ dependencies = [ "cc", 
"cfg-if", "constant_time_eq", - "cpufeatures", + "cpufeatures 0.2.17", ] [[package]] @@ -1066,6 +1066,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-buffer" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdd35008169921d80bc60d3d0ab416eecb028c4cd653352907921d95084790be" +dependencies = [ + "hybrid-array", +] + [[package]] name = "bollard" version = "0.20.2" @@ -1418,6 +1427,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "const-oid" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" + [[package]] name = "const-random" version = "0.1.18" @@ -1493,6 +1508,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc" version = "3.4.0" @@ -1604,6 +1628,15 @@ dependencies = [ "typenum", ] +[[package]] +name = "crypto-common" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77727bb15fa921304124b128af125e7e3b968275d1b108b379190264f4423710" +dependencies = [ + "hybrid-array", +] + [[package]] name = "csv" version = "1.4.0" @@ -1627,9 +1660,9 @@ dependencies = [ [[package]] name = "ctor" -version = "0.6.3" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "424e0138278faeb2b401f174ad17e715c829512d74f3d1e81eb43365c2e0590e" +checksum = "352d39c2f7bef1d6ad73db6f5160efcaed66d94ef8c6c573a8410c00bf909a98" dependencies = [ "ctor-proc-macro", "dtor", @@ -1746,7 +1779,7 @@ dependencies = [ "itertools 0.14.0", "liblzma", "log", - "nix 0.31.2", + "nix", "object_store", "parking_lot", "parquet", @@ -2085,7 +2118,7 @@ dependencies = [ "insta", "log", "mimalloc", - "nix 0.31.2", + "nix", "nom", "object_store", "prost", @@ -2285,6 +2318,7 @@ dependencies = [ "itertools 0.14.0", "itoa", "log", + "memchr", "rand 0.9.2", ] @@ -2573,7 +2607,7 @@ dependencies = [ "percent-encoding", "rand 0.9.2", "serde_json", - "sha1", + "sha1 0.11.0", "sha2", "url", ] @@ -2700,11 +2734,22 @@ version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer", - "crypto-common", + "block-buffer 0.10.4", + "crypto-common 0.1.7", "subtle", ] +[[package]] +name = "digest" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4850db49bf08e663084f7fb5c87d202ef91a3907271aff24a94eb97ff039153c" +dependencies = [ + "block-buffer 0.12.0", + "const-oid", + "crypto-common 0.2.1", +] + [[package]] name = "dirs" version = "6.0.0" @@ -2756,9 +2801,9 @@ dependencies = [ [[package]] name = "dtor" -version = "0.1.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "404d02eeb088a82cfd873006cb713fe411306c7d182c344905e101fb1167d301" +checksum = "f1057d6c64987086ff8ed0fd3fbf377a6b7d205cc7715868cd401705f715cbe4" dependencies = [ "dtor-proc-macro", ] @@ -2807,9 +2852,9 @@ checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" [[package]] name = "endian-type" -version = "0.1.2" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" +checksum = "869b0adbda23651a9c5c0c3d270aac9fcb52e8622a8f2b17e57802d7791962f2" [[package]] name = "enum-ordinalize" @@ -2904,17 +2949,6 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" -[[package]] -name = "fd-lock" -version = "4.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78" -dependencies = [ - "cfg-if", - "rustix", - "windows-sys 0.59.0", -] - [[package]] name = "ferroid" version = "0.8.9" @@ -3313,7 +3347,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" dependencies = [ - "digest", + "digest 0.10.7", ] [[package]] @@ -3398,6 +3432,15 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" +[[package]] +name = "hybrid-array" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a79f2aff40c18ab8615ddc5caa9eb5b96314aef18fe5823090f204ad988e813" +dependencies = [ + "typenum", +] + [[package]] name = "hyper" version = "1.8.1" @@ -4008,7 +4051,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" dependencies = [ "cfg-if", - "digest", + "digest 0.10.7", ] [[package]] @@ -4078,18 +4121,6 @@ dependencies = [ "smallvec", ] -[[package]] -name = "nix" -version = "0.30.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" -dependencies = [ - "bitflags", - "cfg-if", - "cfg_aliases", - "libc", -] - [[package]] name = "nix" version = "0.31.2" @@ -4899,9 +4930,9 @@ checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" [[package]] name = "radix_trie" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +checksum = "3b4431027dcd37fc2a73ef740b5f233aa805897935b8bce0195e41bbf9a3289a" dependencies = [ "endian-type", "nibble_vec", @@ -5307,24 +5338,23 @@ checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] name = "rustyline" -version = "17.0.2" +version = "18.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e902948a25149d50edc1a8e0141aad50f54e22ba83ff988cf8f7c9ef07f50564" +checksum = "4a990b25f351b25139ddc7f21ee3f6f56f86d6846b74ac8fad3a719a287cd4a0" dependencies = [ "bitflags", "cfg-if", "clipboard-win", - "fd-lock", "home", "libc", "log", "memchr", - "nix 0.30.1", + "nix", "radix_trie", "unicode-segmentation", "unicode-width 0.2.2", "utf8parse", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -5585,8 +5615,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", - "cpufeatures", - "digest", + "cpufeatures 0.2.17", + "digest 0.10.7", +] + +[[package]] +name = "sha1" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aacc4cc499359472b4abe1bf11d0b12e688af9a805fa5e3016f9a386dc2d0214" 
+dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "digest 0.11.2", ] [[package]] @@ -5596,8 +5637,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", - "cpufeatures", - "digest", + "cpufeatures 0.2.17", + "digest 0.10.7", ] [[package]] @@ -5669,18 +5710,18 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" [[package]] name = "snmalloc-rs" -version = "0.3.8" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb317153089fdfa4d8a2eec059d40a5a23c3bde43995ea23b19121c3f621e74a" +checksum = "530a04ae687609072d0edd38866406fbbcd23d2f716791437e312ec4d64a355a" dependencies = [ "snmalloc-sys", ] [[package]] name = "snmalloc-sys" -version = "0.3.8" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "065fea53d32bb77bc36cca466cb191f2e5216ebfd0ed360b1d64889ee6e559ea" +checksum = "a96cbeb16d6bcc5979f80ec907582a886b7fb3b9a707678b63dd93a10d8ee858" dependencies = [ "cmake", ] diff --git a/Cargo.toml b/Cargo.toml index 1bf039845fb7f..ffdc14cc514dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -114,7 +114,7 @@ bytes = "1.11" bzip2 = "0.6.1" chrono = { version = "0.4.44", default-features = false } criterion = "0.8" -ctor = "0.6.3" +ctor = "0.8.0" dashmap = "6.0.1" datafusion = { path = "datafusion/core", version = "53.0.0", default-features = false } datafusion-catalog = { path = "datafusion/catalog", version = "53.0.0" } diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 56f7704309780..f82f1c0a03e3d 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -56,7 +56,7 @@ rand = { workspace = true } regex.workspace = true serde = { version = "1.0.228", features = ["derive"] } serde_json = { workspace = true } -snmalloc-rs = { version = "0.3", optional = true } +snmalloc-rs = { version = "0.7", optional = true } tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } tokio-util = { version = "0.7.17" } diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 3fe6be964c3f6..40e0e50dacd7a 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -65,7 +65,7 @@ object_store = { workspace = true, features = ["aws", "gcp", "http"] } parking_lot = { workspace = true } parquet = { workspace = true, default-features = false } regex = { workspace = true } -rustyline = "17.0" +rustyline = "18.0" tokio = { workspace = true, features = ["macros", "parking_lot", "rt", "rt-multi-thread", "signal", "sync"] } url = { workspace = true } diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index b10270851cc06..19aaa0371ada3 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -302,6 +302,32 @@ impl TrackedConsumer { } } +/// A point-in-time snapshot of a tracked memory consumer's state. +/// +/// Returned by [`TrackConsumersPool::metrics()`]. 
+#[derive(Debug, Clone)]
+pub struct MemoryConsumerMetrics {
+    /// The name of the memory consumer
+    pub name: String,
+    /// Whether this consumer can spill to disk
+    pub can_spill: bool,
+    /// The number of bytes currently reserved by this consumer
+    pub reserved: usize,
+    /// The peak number of bytes reserved by this consumer
+    pub peak: usize,
+}
+
+impl From<&TrackedConsumer> for MemoryConsumerMetrics {
+    fn from(tracked: &TrackedConsumer) -> Self {
+        Self {
+            name: tracked.name.clone(),
+            can_spill: tracked.can_spill,
+            reserved: tracked.reserved(),
+            peak: tracked.peak(),
+        }
+    }
+}
+
 /// A [`MemoryPool`] that tracks the consumers that have
 /// reserved memory within the inner memory pool.
 ///
@@ -381,6 +407,15 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
         }
     }
 
+    /// Returns a snapshot of all currently tracked consumers.
+    pub fn metrics(&self) -> Vec<MemoryConsumerMetrics> {
+        self.tracked_consumers
+            .lock()
+            .values()
+            .map(Into::into)
+            .collect()
+    }
+
     /// Returns a formatted string with the top memory consumers.
     pub fn report_top(&self, top: usize) -> String {
         let mut consumers = self
@@ -778,6 +813,54 @@ mod tests {
         test_per_pool_type(tracked_greedy_pool);
     }
 
+    #[test]
+    fn test_track_consumers_pool_metrics() {
+        let track_consumers_pool = Arc::new(TrackConsumersPool::new(
+            GreedyMemoryPool::new(1000),
+            NonZeroUsize::new(3).unwrap(),
+        ));
+        let memory_pool: Arc<dyn MemoryPool> = Arc::clone(&track_consumers_pool) as _;
+
+        // Empty pool has no metrics
+        assert!(track_consumers_pool.metrics().is_empty());
+
+        // Register consumers with different spill settings
+        let mut r1 = MemoryConsumer::new("spilling")
+            .with_can_spill(true)
+            .register(&memory_pool);
+        let mut r2 = MemoryConsumer::new("non-spilling").register(&memory_pool);
+
+        // Grow r1 in two steps to verify peak tracking
+        r1.grow(100);
+        r1.grow(50);
+        r1.shrink(50); // reserved=100, peak=150
+
+        r2.grow(200); // reserved=200, peak=200
+
+        let mut metrics = track_consumers_pool.metrics();
+        metrics.sort_by_key(|m| m.name.clone());
+
+        assert_eq!(metrics.len(), 2);
+
+        let m_non = &metrics[0];
+        assert_eq!(m_non.name, "non-spilling");
+        assert!(!m_non.can_spill);
+        assert_eq!(m_non.reserved, 200);
+        assert_eq!(m_non.peak, 200);
+
+        let m_spill = &metrics[1];
+        assert_eq!(m_spill.name, "spilling");
+        assert!(m_spill.can_spill);
+        assert_eq!(m_spill.reserved, 100);
+        assert_eq!(m_spill.peak, 150);
+
+        // Unregistered consumers are removed from metrics
+        drop(r2);
+        let metrics = track_consumers_pool.metrics();
+        assert_eq!(metrics.len(), 1);
+        assert_eq!(metrics[0].name, "spilling");
+    }
+
     #[test]
     fn test_tracked_consumers_pool_use_beyond_errors() {
         let setting = make_settings();
diff --git a/datafusion/expr-common/src/interval_arithmetic.rs b/datafusion/expr-common/src/interval_arithmetic.rs
index 0f88723d116f5..883c721080611 100644
--- a/datafusion/expr-common/src/interval_arithmetic.rs
+++ b/datafusion/expr-common/src/interval_arithmetic.rs
@@ -49,6 +49,8 @@ macro_rules!
get_extreme_value { DataType::Int64 => ScalarValue::Int64(Some(i64::$extreme)), DataType::Float32 => ScalarValue::Float32(Some(f32::$extreme)), DataType::Float64 => ScalarValue::Float64(Some(f64::$extreme)), + DataType::Date32 => ScalarValue::Date32(Some(i32::$extreme)), + DataType::Date64 => ScalarValue::Date64(Some(i64::$extreme)), DataType::Duration(TimeUnit::Second) => { ScalarValue::DurationSecond(Some(i64::$extreme)) } diff --git a/datafusion/functions-nested/Cargo.toml b/datafusion/functions-nested/Cargo.toml index 6e96a44fc98c4..31462d5e509ed 100644 --- a/datafusion/functions-nested/Cargo.toml +++ b/datafusion/functions-nested/Cargo.toml @@ -61,6 +61,7 @@ hashbrown = { workspace = true } itertools = { workspace = true, features = ["use_std"] } itoa = { workspace = true } log = { workspace = true } +memchr = { workspace = true } [dev-dependencies] criterion = { workspace = true, features = ["async_tokio"] } @@ -117,3 +118,7 @@ name = "array_position" [[bench]] harness = false name = "array_sort" + +[[bench]] +harness = false +name = "string_to_array" diff --git a/datafusion/functions-nested/benches/string_to_array.rs b/datafusion/functions-nested/benches/string_to_array.rs new file mode 100644 index 0000000000000..e403d5e51bac8 --- /dev/null +++ b/datafusion/functions-nested/benches/string_to_array.rs @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::array::{ArrayRef, StringArray}; +use arrow::datatypes::{DataType, Field}; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use datafusion_common::ScalarValue; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_functions_nested::string::StringToArray; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use std::hint::black_box; +use std::sync::Arc; + +const NUM_ROWS: usize = 1000; +const SEED: u64 = 42; + +fn criterion_benchmark(c: &mut Criterion) { + // Single-char delimiter + let comma = ColumnarValue::Scalar(ScalarValue::Utf8(Some(",".to_string()))); + bench_string_to_array( + c, + "string_to_array_single_char_delim", + create_csv_strings, + &comma, + None, + ); + + // Multi-char delimiter + let double_colon = ColumnarValue::Scalar(ScalarValue::Utf8(Some("::".to_string()))); + bench_string_to_array( + c, + "string_to_array_multi_char_delim", + create_multi_delim_strings, + &double_colon, + None, + ); + + // With null_str argument + let null_str = ColumnarValue::Scalar(ScalarValue::Utf8(Some("NULL".to_string()))); + bench_string_to_array( + c, + "string_to_array_with_null_str", + create_csv_strings_with_nulls, + &comma, + Some(&null_str), + ); + + // NULL delimiter + let null_delim = ColumnarValue::Scalar(ScalarValue::Utf8(None)); + bench_string_to_array( + c, + "string_to_array_null_delim", + create_short_strings, + &null_delim, + None, + ); + + // Columnar delimiter (fall-back path) + bench_string_to_array_columnar_delim(c); +} + +fn bench_string_to_array_columnar_delim(c: &mut Criterion) { + let mut group = c.benchmark_group("string_to_array_columnar_delim"); + + for &num_elements in &[5, 20, 100] { + let string_array = create_csv_strings(num_elements); + let delimiter_array: ArrayRef = + Arc::new(StringArray::from(vec![Some(","); NUM_ROWS])); + + let args = vec![ + ColumnarValue::Array(string_array.clone()), + ColumnarValue::Array(delimiter_array), + ]; + let arg_fields = vec![ + Field::new("str", DataType::Utf8, true).into(), + Field::new("delimiter", DataType::Utf8, false).into(), + ]; + + let return_field = Field::new( + "result", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + true, + ); + + group.bench_with_input( + BenchmarkId::from_parameter(num_elements), + &num_elements, + |b, _| { + let udf = StringToArray::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: NUM_ROWS, + return_field: return_field.clone().into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +fn bench_string_to_array( + c: &mut Criterion, + group_name: &str, + make_strings: fn(usize) -> ArrayRef, + delimiter: &ColumnarValue, + null_str: Option<&ColumnarValue>, +) { + let mut group = c.benchmark_group(group_name); + + for &num_elements in &[5, 20, 100] { + let string_array = make_strings(num_elements); + + let mut args = vec![ + ColumnarValue::Array(string_array.clone()), + delimiter.clone(), + ]; + let mut arg_fields = vec![ + Field::new("str", DataType::Utf8, true).into(), + Field::new("delimiter", DataType::Utf8, true).into(), + ]; + if let Some(ns) = null_str { + args.push(ns.clone()); + arg_fields.push(Field::new("null_str", DataType::Utf8, true).into()); + } + + let return_field = Field::new( + "result", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), + 
true, + ); + + group.bench_with_input( + BenchmarkId::from_parameter(num_elements), + &num_elements, + |b, _| { + let udf = StringToArray::new(); + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: NUM_ROWS, + return_field: return_field.clone().into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + + group.finish(); +} + +/// Creates strings like "val1,val2,val3,...,valN" with `num_elements` elements. +fn create_csv_strings(num_elements: usize) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let strings: StringArray = (0..NUM_ROWS) + .map(|_| { + let parts: Vec = (0..num_elements) + .map(|_| format!("val{}", rng.random_range(0..1000))) + .collect(); + Some(parts.join(",")) + }) + .collect(); + Arc::new(strings) +} + +/// Creates strings like "val1::val2::val3::...::valN". +fn create_multi_delim_strings(num_elements: usize) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let strings: StringArray = (0..NUM_ROWS) + .map(|_| { + let parts: Vec = (0..num_elements) + .map(|_| format!("val{}", rng.random_range(0..1000))) + .collect(); + Some(parts.join("::")) + }) + .collect(); + Arc::new(strings) +} + +/// Creates CSV strings where ~10% of elements are the literal "NULL". +fn create_csv_strings_with_nulls(num_elements: usize) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let strings: StringArray = (0..NUM_ROWS) + .map(|_| { + let parts: Vec = (0..num_elements) + .map(|_| { + if rng.random::() < 0.1 { + "NULL".to_string() + } else { + format!("val{}", rng.random_range(0..1000)) + } + }) + .collect(); + Some(parts.join(",")) + }) + .collect(); + Arc::new(strings) +} + +/// Creates short strings (length = `num_chars`) for the NULL-delimiter +/// (split-into-characters) benchmark. 
+fn create_short_strings(num_chars: usize) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(SEED); + let strings: StringArray = (0..NUM_ROWS) + .map(|_| { + let s: String = (0..num_chars) + .map(|_| rng.random_range(b'a'..=b'z') as char) + .collect(); + Some(s) + }) + .collect(); + Arc::new(strings) +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions-nested/src/string.rs b/datafusion/functions-nested/src/string.rs index aa2ae69bea4e5..b76736672cffa 100644 --- a/datafusion/functions-nested/src/string.rs +++ b/datafusion/functions-nested/src/string.rs @@ -26,13 +26,13 @@ use arrow::array::{ use arrow::datatypes::{DataType, Field}; use datafusion_common::utils::ListCoercion; -use datafusion_common::{DataFusionError, Result, not_impl_err}; +use datafusion_common::{DataFusionError, Result, ScalarValue, not_impl_err}; use std::fmt::{self, Write}; use crate::utils::make_scalar_function; use arrow::array::{ - GenericStringArray, StringArrayType, StringViewArray, + StringArrayType, StringViewArray, builder::{ArrayBuilder, LargeStringBuilder, StringViewBuilder}, cast::AsArray, }; @@ -43,8 +43,8 @@ use arrow::datatypes::DataType::{ use datafusion_common::cast::{ as_fixed_size_list_array, as_large_list_array, as_list_array, }; -use datafusion_common::exec_err; use datafusion_common::types::logical_string; +use datafusion_common::{exec_datafusion_err, exec_err}; use datafusion_expr::{ ArrayFunctionArgument, ArrayFunctionSignature, Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, @@ -189,11 +189,17 @@ make_udf_expr_and_func!( ) )] #[derive(Debug, PartialEq, Eq, Hash)] -pub(super) struct StringToArray { +pub struct StringToArray { signature: Signature, aliases: Vec, } +impl Default for StringToArray { + fn default() -> Self { + Self::new() + } +} + impl StringToArray { pub fn new() -> Self { Self { @@ -233,13 +239,71 @@ impl ScalarUDFImpl for StringToArray { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let args = &args.args; - match args[0].data_type() { - Utf8 | Utf8View => make_scalar_function(string_to_array_inner::)(args), - LargeUtf8 => make_scalar_function(string_to_array_inner::)(args), + let ScalarFunctionArgs { args, .. 
} = args; + + let delimiter_is_scalar = matches!(&args[1], ColumnarValue::Scalar(_)); + let null_str_is_scalar = args + .get(2) + .is_none_or(|a| matches!(a, ColumnarValue::Scalar(_))); + + if !(delimiter_is_scalar && null_str_is_scalar) { + return make_scalar_function(string_to_array_fallback)(&args); + } + + // Delimiter and null_str (if given) are scalar, so use the fast path + let delimiter = match &args[1] { + ColumnarValue::Scalar(s) => s.try_as_str().ok_or_else(|| { + exec_datafusion_err!( + "unsupported type for string_to_array delimiter: {:?}", + args[1].data_type() + ) + })?, + _ => unreachable!("delimiter must be scalar in this branch"), + }; + let null_value = match args.get(2) { + Some(ColumnarValue::Scalar(s)) => s.try_as_str().ok_or_else(|| { + exec_datafusion_err!( + "unsupported type for string_to_array null_str: {:?}", + args[2].data_type() + ) + })?, + _ => None, + }; + + let (all_scalar, string_array) = match &args[0] { + ColumnarValue::Array(a) => (false, Arc::clone(a)), + ColumnarValue::Scalar(s) => (true, s.to_array_of_size(1)?), + }; + + let result = match string_array.data_type() { + Utf8 => { + let arr = string_array.as_string::(); + let builder = + StringBuilder::with_capacity(arr.len(), arr.get_buffer_memory_size()); + string_to_array_scalar_args(&arr, delimiter, null_value, builder) + } + Utf8View => { + let arr = string_array.as_string_view(); + let builder = StringViewBuilder::with_capacity(arr.len()); + string_to_array_scalar_args(&arr, delimiter, null_value, builder) + } + LargeUtf8 => { + let arr = string_array.as_string::(); + let builder = LargeStringBuilder::with_capacity( + arr.len(), + arr.get_buffer_memory_size(), + ); + string_to_array_scalar_args(&arr, delimiter, null_value, builder) + } other => { exec_err!("unsupported type for string_to_array function as {other:?}") } + }?; + + if all_scalar { + ScalarValue::try_from_array(&result, 0).map(ColumnarValue::Scalar) + } else { + Ok(ColumnarValue::Array(result)) } } @@ -252,6 +316,201 @@ impl ScalarUDFImpl for StringToArray { } } +/// Appends `value` to the string builder, or NULL if it matches `null_value`. +#[inline(always)] +fn append_part( + builder: &mut impl StringArrayBuilderType, + value: &str, + null_value: Option<&str>, +) { + if null_value == Some(value) { + builder.append_null(); + } else { + builder.append_value(value); + } +} + +/// Optimized `string_to_array` implementation for the common case where +/// delimiter and null_value are scalar values. +fn string_to_array_scalar_args<'a, StringArrType, StringBuilderType>( + string_array: &StringArrType, + delimiter: Option<&str>, + null_value: Option<&str>, + string_builder: StringBuilderType, +) -> Result +where + StringArrType: StringArrayType<'a>, + StringBuilderType: StringArrayBuilderType, +{ + let mut list_builder = ListBuilder::new(string_builder); + + match delimiter { + Some("") => { + // Empty delimiter: each non-empty string becomes a single-element list. + // Empty strings produce an empty array (PostgreSQL compat). + for i in 0..string_array.len() { + if string_array.is_null(i) { + list_builder.append(false); + continue; + } + let string = string_array.value(i); + if !string.is_empty() { + append_part(list_builder.values(), string, null_value); + } + list_builder.append(true); + } + } + Some(delimiter) => { + // Rather than using `str::split`, do the split ourselves using + // `memmem::Finder`. This allows pre-compiling the delimiter search + // pattern once and reusing it for all rows. 
+ let finder = memchr::memmem::Finder::new(delimiter.as_bytes()); + let delim_len = delimiter.len(); + + for i in 0..string_array.len() { + if string_array.is_null(i) { + list_builder.append(false); + continue; + } + let string = string_array.value(i); + if !string.is_empty() { + let bytes = string.as_bytes(); + let mut start = 0; + for pos in finder.find_iter(bytes) { + append_part( + list_builder.values(), + &string[start..pos], + null_value, + ); + start = pos + delim_len; + } + // Trailing part after last delimiter (or entire string if no + // delimiter was found). + append_part(list_builder.values(), &string[start..], null_value); + } + list_builder.append(true); + } + } + None => { + // NULL delimiter: split into individual characters. + for i in 0..string_array.len() { + if string_array.is_null(i) { + list_builder.append(false); + continue; + } + let string = string_array.value(i); + for (pos, c) in string.char_indices() { + append_part( + list_builder.values(), + &string[pos..pos + c.len_utf8()], + null_value, + ); + } + list_builder.append(true); + } + } + } + + Ok(Arc::new(list_builder.finish()) as ArrayRef) +} + +/// Fallback path for `string_to_array` when delimiter and/or null_value +/// are array columns rather than scalars. +fn string_to_array_fallback(args: &[ArrayRef]) -> Result { + let null_value_array = args.get(2); + + match args[0].data_type() { + Utf8 => { + let arr = args[0].as_string::(); + let builder = + StringBuilder::with_capacity(arr.len(), arr.get_buffer_memory_size()); + string_to_array_column_args(&arr, &args[1], null_value_array, builder) + } + Utf8View => { + let arr = args[0].as_string_view(); + let builder = StringViewBuilder::with_capacity(arr.len()); + string_to_array_column_args(&arr, &args[1], null_value_array, builder) + } + LargeUtf8 => { + let arr = args[0].as_string::(); + let builder = LargeStringBuilder::with_capacity( + arr.len(), + arr.get_buffer_memory_size(), + ); + string_to_array_column_args(&arr, &args[1], null_value_array, builder) + } + other => exec_err!("unsupported type for string_to_array function as {other:?}"), + } +} + +fn string_to_array_column_args<'a, StringArrType, StringBuilderType>( + string_array: &StringArrType, + delimiter_array: &ArrayRef, + null_value_array: Option<&ArrayRef>, + string_builder: StringBuilderType, +) -> Result +where + StringArrType: StringArrayType<'a>, + StringBuilderType: StringArrayBuilderType, +{ + let mut list_builder = ListBuilder::new(string_builder); + + for i in 0..string_array.len() { + if string_array.is_null(i) { + list_builder.append(false); + continue; + } + + let string = string_array.value(i); + let delimiter = get_str_value(delimiter_array, i); + let null_value = null_value_array.and_then(|arr| get_str_value(arr, i)); + + match delimiter { + Some("") => { + if !string.is_empty() { + append_part(list_builder.values(), string, null_value); + } + } + Some(delimiter) => { + if !string.is_empty() { + for part in string.split(delimiter) { + append_part(list_builder.values(), part, null_value); + } + } + } + None => { + for (pos, c) in string.char_indices() { + append_part( + list_builder.values(), + &string[pos..pos + c.len_utf8()], + null_value, + ); + } + } + } + + list_builder.append(true); + } + + Ok(Arc::new(list_builder.finish()) as ArrayRef) +} + +/// Returns the string value at index `i` from a string array of any type. 
+fn get_str_value(array: &ArrayRef, i: usize) -> Option<&str> { + if array.is_null(i) { + return None; + } + match array.data_type() { + Utf8 => Some(array.as_string::().value(i)), + LargeUtf8 => Some(array.as_string::().value(i)), + Utf8View => Some(array.as_string_view().value(i)), + other => { + debug_assert!(false, "unexpected type in get_str_value: {other:?}"); + None + } + } +} + fn array_to_string_inner(args: &[ArrayRef]) -> Result { if args.len() < 2 || args.len() > 3 { return exec_err!("array_to_string expects two or three arguments"); @@ -521,275 +780,6 @@ where Ok(()) } -/// String_to_array SQL function -/// Splits string at occurrences of delimiter and returns an array of parts -/// string_to_array('abc~@~def~@~ghi', '~@~') = '["abc", "def", "ghi"]' -fn string_to_array_inner(args: &[ArrayRef]) -> Result { - if args.len() < 2 || args.len() > 3 { - return exec_err!("string_to_array expects two or three arguments"); - } - - match args[0].data_type() { - Utf8 => { - let string_array = args[0].as_string::(); - let builder = StringBuilder::with_capacity( - string_array.len(), - string_array.get_buffer_memory_size(), - ); - string_to_array_inner_2::<&GenericStringArray, StringBuilder>( - args, - &string_array, - builder, - ) - } - Utf8View => { - let string_array = args[0].as_string_view(); - let builder = StringViewBuilder::with_capacity(string_array.len()); - string_to_array_inner_2::<&StringViewArray, StringViewBuilder>( - args, - &string_array, - builder, - ) - } - LargeUtf8 => { - let string_array = args[0].as_string::(); - let builder = LargeStringBuilder::with_capacity( - string_array.len(), - string_array.get_buffer_memory_size(), - ); - string_to_array_inner_2::<&GenericStringArray, LargeStringBuilder>( - args, - &string_array, - builder, - ) - } - other => exec_err!( - "unsupported type for first argument to string_to_array function as {other:?}" - ), - } -} - -fn string_to_array_inner_2<'a, StringArrType, StringBuilderType>( - args: &'a [ArrayRef], - string_array: &StringArrType, - string_builder: StringBuilderType, -) -> Result -where - StringArrType: StringArrayType<'a>, - StringBuilderType: StringArrayBuilderType, -{ - match args[1].data_type() { - Utf8 => { - let delimiter_array = args[1].as_string::(); - if args.len() == 2 { - string_to_array_impl::< - StringArrType, - &GenericStringArray, - &StringViewArray, - StringBuilderType, - >(string_array, &delimiter_array, None, string_builder) - } else { - string_to_array_inner_3::< - StringArrType, - &GenericStringArray, - StringBuilderType, - >(args, string_array, &delimiter_array, string_builder) - } - } - Utf8View => { - let delimiter_array = args[1].as_string_view(); - - if args.len() == 2 { - string_to_array_impl::< - StringArrType, - &StringViewArray, - &StringViewArray, - StringBuilderType, - >(string_array, &delimiter_array, None, string_builder) - } else { - string_to_array_inner_3::< - StringArrType, - &StringViewArray, - StringBuilderType, - >(args, string_array, &delimiter_array, string_builder) - } - } - LargeUtf8 => { - let delimiter_array = args[1].as_string::(); - if args.len() == 2 { - string_to_array_impl::< - StringArrType, - &GenericStringArray, - &StringViewArray, - StringBuilderType, - >(string_array, &delimiter_array, None, string_builder) - } else { - string_to_array_inner_3::< - StringArrType, - &GenericStringArray, - StringBuilderType, - >(args, string_array, &delimiter_array, string_builder) - } - } - other => exec_err!( - "unsupported type for second argument to string_to_array function as 
{other:?}" - ), - } -} - -fn string_to_array_inner_3<'a, StringArrType, DelimiterArrType, StringBuilderType>( - args: &'a [ArrayRef], - string_array: &StringArrType, - delimiter_array: &DelimiterArrType, - string_builder: StringBuilderType, -) -> Result -where - StringArrType: StringArrayType<'a>, - DelimiterArrType: StringArrayType<'a>, - StringBuilderType: StringArrayBuilderType, -{ - match args[2].data_type() { - Utf8 => { - let null_type_array = Some(args[2].as_string::()); - string_to_array_impl::< - StringArrType, - DelimiterArrType, - &GenericStringArray, - StringBuilderType, - >( - string_array, - delimiter_array, - null_type_array, - string_builder, - ) - } - Utf8View => { - let null_type_array = Some(args[2].as_string_view()); - string_to_array_impl::< - StringArrType, - DelimiterArrType, - &StringViewArray, - StringBuilderType, - >( - string_array, - delimiter_array, - null_type_array, - string_builder, - ) - } - LargeUtf8 => { - let null_type_array = Some(args[2].as_string::()); - string_to_array_impl::< - StringArrType, - DelimiterArrType, - &GenericStringArray, - StringBuilderType, - >( - string_array, - delimiter_array, - null_type_array, - string_builder, - ) - } - other => { - exec_err!("unsupported type for string_to_array function as {other:?}") - } - } -} - -fn string_to_array_impl< - 'a, - StringArrType, - DelimiterArrType, - NullValueArrType, - StringBuilderType, ->( - string_array: &StringArrType, - delimiter_array: &DelimiterArrType, - null_value_array: Option, - string_builder: StringBuilderType, -) -> Result -where - StringArrType: StringArrayType<'a>, - DelimiterArrType: StringArrayType<'a>, - NullValueArrType: StringArrayType<'a>, - StringBuilderType: StringArrayBuilderType, -{ - let mut list_builder = ListBuilder::new(string_builder); - - match null_value_array { - None => string_array.iter().zip(delimiter_array.iter()).for_each( - |(string, delimiter)| match (string, delimiter) { - (Some(string), Some("")) => { - if !string.is_empty() { - list_builder.values().append_value(string); - } - list_builder.append(true); - } - (Some(string), Some(delimiter)) => { - if !string.is_empty() { - string.split(delimiter).for_each(|s| { - list_builder.values().append_value(s); - }); - } - list_builder.append(true); - } - (Some(string), None) => { - string.chars().map(|c| c.to_string()).for_each(|c| { - list_builder.values().append_value(c.as_str()); - }); - list_builder.append(true); - } - _ => list_builder.append(false), - }, - ), - Some(null_value_array) => string_array - .iter() - .zip(delimiter_array.iter()) - .zip(null_value_array.iter()) - .for_each(|((string, delimiter), null_value)| { - match (string, delimiter) { - (Some(string), Some("")) => { - if !string.is_empty() { - if Some(string) == null_value { - list_builder.values().append_null(); - } else { - list_builder.values().append_value(string); - } - } - list_builder.append(true); - } - (Some(string), Some(delimiter)) => { - if !string.is_empty() { - string.split(delimiter).for_each(|s| { - if Some(s) == null_value { - list_builder.values().append_null(); - } else { - list_builder.values().append_value(s); - } - }); - } - list_builder.append(true); - } - (Some(string), None) => { - string.chars().map(|c| c.to_string()).for_each(|c| { - if Some(c.as_str()) == null_value { - list_builder.values().append_null(); - } else { - list_builder.values().append_value(c.as_str()); - } - }); - list_builder.append(true); - } - _ => list_builder.append(false), // null value - } - }), - }; - - let list_array = 
list_builder.finish(); - Ok(Arc::new(list_array) as ArrayRef) -} - trait StringArrayBuilderType: ArrayBuilder { fn append_value(&mut self, val: &str); diff --git a/datafusion/functions/src/string/repeat.rs b/datafusion/functions/src/string/repeat.rs index 4e38ec9af3859..f100a29e309e2 100644 --- a/datafusion/functions/src/string/repeat.rs +++ b/datafusion/functions/src/string/repeat.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use crate::utils::utf8_to_str_type; use arrow::array::{ Array, ArrayRef, AsArray, GenericStringArray, GenericStringBuilder, Int64Array, - OffsetSizeTrait, StringArrayType, StringViewArray, + StringArrayType, StringLikeArrayBuilder, StringViewArray, StringViewBuilder, }; use arrow::datatypes::DataType; use arrow::datatypes::DataType::{LargeUtf8, Utf8, Utf8View}; @@ -91,6 +91,9 @@ impl ScalarUDFImpl for RepeatFunc { } fn return_type(&self, arg_types: &[DataType]) -> Result { + if arg_types[0] == Utf8View { + return Ok(Utf8View); + } utf8_to_str_type(&arg_types[0], "repeat") } @@ -126,13 +129,12 @@ impl ScalarUDFImpl for RepeatFunc { }; let result = match string_scalar { - ScalarValue::Utf8(Some(s)) | ScalarValue::Utf8View(Some(s)) => { - ScalarValue::Utf8(Some(compute_repeat( - s, - count, - i32::MAX as usize, - )?)) - } + ScalarValue::Utf8View(Some(s)) => ScalarValue::Utf8View(Some( + compute_repeat(s, count, i32::MAX as usize)?, + )), + ScalarValue::Utf8(Some(s)) => ScalarValue::Utf8(Some( + compute_repeat(s, count, i32::MAX as usize)?, + )), ScalarValue::LargeUtf8(Some(s)) => ScalarValue::LargeUtf8(Some( compute_repeat(s, count, i64::MAX as usize)?, )), @@ -183,26 +185,47 @@ fn repeat(string_array: &ArrayRef, count_array: &ArrayRef) -> Result { match string_array.data_type() { Utf8View => { let string_view_array = string_array.as_string_view(); - repeat_impl::( + let (_, max_item_capacity) = calculate_capacities( &string_view_array, number_array, i32::MAX as usize, + )?; + let builder = StringViewBuilder::with_capacity(string_array.len()); + repeat_impl::<&StringViewArray, StringViewBuilder>( + &string_view_array, + number_array, + max_item_capacity, + builder, ) } Utf8 => { let string_arr = string_array.as_string::(); - repeat_impl::>( + let (total_capacity, max_item_capacity) = + calculate_capacities(&string_arr, number_array, i32::MAX as usize)?; + let builder = GenericStringBuilder::::with_capacity( + string_array.len(), + total_capacity, + ); + repeat_impl::<&GenericStringArray, GenericStringBuilder>( &string_arr, number_array, - i32::MAX as usize, + max_item_capacity, + builder, ) } LargeUtf8 => { let string_arr = string_array.as_string::(); - repeat_impl::>( + let (total_capacity, max_item_capacity) = + calculate_capacities(&string_arr, number_array, i64::MAX as usize)?; + let builder = GenericStringBuilder::::with_capacity( + string_array.len(), + total_capacity, + ); + repeat_impl::<&GenericStringArray, GenericStringBuilder>( &string_arr, number_array, - i64::MAX as usize, + max_item_capacity, + builder, ) } other => exec_err!( @@ -212,17 +235,17 @@ fn repeat(string_array: &ArrayRef, count_array: &ArrayRef) -> Result { } } -fn repeat_impl<'a, T, S>( +fn calculate_capacities<'a, S>( string_array: &S, number_array: &Int64Array, max_str_len: usize, -) -> Result +) -> Result<(usize, usize)> where - T: OffsetSizeTrait, - S: StringArrayType<'a> + 'a, + S: StringArrayType<'a>, { let mut total_capacity = 0; let mut max_item_capacity = 0; + string_array.iter().zip(number_array.iter()).try_for_each( |(string, number)| -> Result<(), DataFusionError> { match (string, 
number) { @@ -244,9 +267,19 @@ where }, )?; - let mut builder = - GenericStringBuilder::::with_capacity(string_array.len(), total_capacity); + Ok((total_capacity, max_item_capacity)) +} +fn repeat_impl<'a, S, B>( + string_array: &S, + number_array: &Int64Array, + max_item_capacity: usize, + mut builder: B, +) -> Result +where + S: StringArrayType<'a> + 'a, + B: StringLikeArrayBuilder, +{ // Reusable buffer to avoid allocations in string.repeat() let mut buffer = Vec::::with_capacity(max_item_capacity); @@ -303,8 +336,8 @@ where #[cfg(test)] mod tests { - use arrow::array::{Array, StringArray}; - use arrow::datatypes::DataType::Utf8; + use arrow::array::{Array, LargeStringArray, StringArray, StringViewArray}; + use arrow::datatypes::DataType::{LargeUtf8, Utf8, Utf8View}; use datafusion_common::ScalarValue; use datafusion_common::{Result, exec_err}; @@ -357,8 +390,8 @@ mod tests { ], Ok(Some("PgPgPgPg")), &str, - Utf8, - StringArray + Utf8View, + StringViewArray ); test_function!( RepeatFunc::new(), @@ -368,8 +401,19 @@ mod tests { ], Ok(None), &str, - Utf8, - StringArray + Utf8View, + StringViewArray + ); + test_function!( + RepeatFunc::new(), + vec![ + ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(String::from("Pg")))), + ColumnarValue::Scalar(ScalarValue::Int64(None)), + ], + Ok(None), + &str, + LargeUtf8, + LargeStringArray ); test_function!( RepeatFunc::new(), @@ -379,8 +423,8 @@ mod tests { ], Ok(None), &str, - Utf8, - StringArray + Utf8View, + StringViewArray ); test_function!( RepeatFunc::new(), diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index e7bc62e8da097..08839b49ef4b0 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -26,7 +26,9 @@ use crate::simplify_expressions::ExprSimplifier; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, }; -use datafusion_common::{Column, DFSchemaRef, HashMap, Result, ScalarValue, plan_err}; +use datafusion_common::{ + Column, DFSchemaRef, HashMap, Result, ScalarValue, assert_or_internal_err, plan_err, +}; use datafusion_expr::expr::Alias; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::utils::{ @@ -179,7 +181,7 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr { find_join_exprs(subquery_filter_exprs)?; if let Some(in_predicate) = &self.in_predicate_opt { // in_predicate may be already included in the join filters, remove it from the join filters first. - join_filters = remove_duplicated_filter(join_filters, in_predicate); + join_filters = remove_duplicated_filter(join_filters, in_predicate)?; } let correlated_subquery_cols = collect_subquery_cols(&join_filters, subquery_schema)?; @@ -460,25 +462,39 @@ fn collect_local_correlated_cols( } } -fn remove_duplicated_filter(filters: Vec, in_predicate: &Expr) -> Vec { - filters +fn remove_duplicated_filter( + filters: Vec, + in_predicate: &Expr, +) -> Result> { + // We assume below that swapping the order of operands to an operator does + // not change behavior, which is only true if the operator is commutative. 
+ assert_or_internal_err!( + match in_predicate { + Expr::BinaryExpr(b) => b.op.swap() == Some(b.op), + _ => true, + }, + "remove_duplicated_filter: in_predicate must use a commutative operator" + ); + + Ok(filters .into_iter() .filter(|filter| { if filter == in_predicate { return false; } - // ignore the binary order + // Treat swapped operand order to a binary operator as equivalent !match (filter, in_predicate) { (Expr::BinaryExpr(a_expr), Expr::BinaryExpr(b_expr)) => { - (a_expr.op == b_expr.op) - && (a_expr.left == b_expr.left && a_expr.right == b_expr.right) - || (a_expr.left == b_expr.right && a_expr.right == b_expr.left) + a_expr.op == b_expr.op + && ((a_expr.left == b_expr.left && a_expr.right == b_expr.right) + || (a_expr.left == b_expr.right + && a_expr.right == b_expr.left)) } _ => false, } }) - .collect::>() + .collect::>()) } fn agg_exprs_evaluation_result_on_empty_batch( diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 975c234b38836..590b00098bd46 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -111,7 +111,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { // replace column references with entry in map, if it exists if let Some(map_expr) = expr .try_as_col() - .and_then(|col| expr_check_map.get(&col.name)) + .and_then(|col| expr_check_map.get(col)) { Ok(Transformed::yes(map_expr.clone())) } else { @@ -176,7 +176,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { // replace column references with entry in map, if it exists if let Some(map_expr) = expr .try_as_col() - .and_then(|col| expr_check_map.get(&col.name)) + .and_then(|col| expr_check_map.get(col)) { Ok(Transformed::yes(map_expr.clone())) } else { @@ -301,7 +301,7 @@ fn build_join( subquery: &Subquery, filter_input: &LogicalPlan, subquery_alias: &str, -) -> Result)>> { +) -> Result)>> { let subquery_plan = subquery.subquery.as_ref(); let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true); let new_plan = subquery_plan.clone().rewrite(&mut pull_up).data()?; @@ -358,14 +358,19 @@ fn build_join( // If expr always returns null when column is null, skip processing continue; } + + let indicator_col = + Column::new(Some(subquery_alias), UN_MATCHED_ROW_INDICATOR); + // Qualify with the subquery alias to avoid ambiguity when the + // outer table has a column with the same name as the aggregate. 
+ let value_col = Column::new(Some(subquery_alias), name.clone()); + let computer_expr = if let Some(filter) = &pull_up.pull_up_having_expr { Expr::Case(expr::Case { expr: None, when_then_expr: vec![ ( - Box::new(Expr::IsNull(Box::new(Expr::Column( - Column::new_unqualified(UN_MATCHED_ROW_INDICATOR), - )))), + Box::new(Expr::IsNull(Box::new(Expr::Column(indicator_col)))), Box::new(result), ), ( @@ -373,29 +378,23 @@ fn build_join( Box::new(Expr::Literal(ScalarValue::Null, None)), ), ], - else_expr: Some(Box::new(Expr::Column(Column::new_unqualified( - name.clone(), - )))), + else_expr: Some(Box::new(Expr::Column(value_col.clone()))), }) } else { Expr::Case(expr::Case { expr: None, when_then_expr: vec![( - Box::new(Expr::IsNull(Box::new(Expr::Column( - Column::new_unqualified(UN_MATCHED_ROW_INDICATOR), - )))), + Box::new(Expr::IsNull(Box::new(Expr::Column(indicator_col)))), Box::new(result), )], - else_expr: Some(Box::new(Expr::Column(Column::new_unqualified( - name.clone(), - )))), + else_expr: Some(Box::new(Expr::Column(value_col.clone()))), }) }; let mut expr_rewrite = TypeCoercionRewriter { schema: new_plan.schema(), }; computation_project_expr - .insert(name, computer_expr.rewrite(&mut expr_rewrite).data()?); + .insert(value_col, computer_expr.rewrite(&mut expr_rewrite).data()?); } } diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml index 162b6d814e804..bd5f2bb18aaec 100644 --- a/datafusion/spark/Cargo.toml +++ b/datafusion/spark/Cargo.toml @@ -60,7 +60,7 @@ log = { workspace = true } percent-encoding = "2.3.2" rand = { workspace = true } serde_json = { workspace = true } -sha1 = "0.10" +sha1 = "0.11" sha2 = { workspace = true } url = { workspace = true } diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 1216e1e0238dc..25136ca777c74 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -9358,6 +9358,74 @@ select string_to_list(e, 'm') from values; [adipiscing] NULL +# string_to_array: single-char delimiter producing multiple elements +query ? +SELECT string_to_array('a,b,c', ',') +---- +[a, b, c] + +# string_to_array: delimiter not found in input +query ? +SELECT string_to_array('abc', ',') +---- +[abc] + +# string_to_array: empty string input +query ? +SELECT string_to_array('', ',') +---- +[] + +# string_to_array: null_str matching multiple elements +query ? +SELECT string_to_array('a,NULL,b,NULL,c', ',', 'NULL') +---- +[a, NULL, b, NULL, c] + +# string_to_array: null_str matching all elements +query ? +SELECT string_to_array('x,x,x', ',', 'x') +---- +[NULL, NULL, NULL] + +# string_to_array: null_str with empty-string delimiter +query ? +SELECT string_to_array('abc', '', 'abc') +---- +[NULL] + +# string_to_array: NULL string input +query ? +SELECT string_to_array(NULL, ',') +---- +NULL + +# string_to_array: columnar delimiter +query ?? +SELECT string_to_array('a,b,c', col1), string_to_array('a::b::c', col2) + FROM (VALUES (',', '::')) AS t(col1, col2) +---- +[a, b, c] [a, b, c] + +# string_to_array: columnar null_str +query ? +SELECT string_to_array('a,NULL,b', ',', col1) + FROM (VALUES ('NULL')) AS t(col1) +---- +[a, NULL, b] + +# string_to_array: adjacent delimiters produce empty strings +query ? +SELECT string_to_array('a,,b', ',') +---- +[a, , b] + +# string_to_array: delimiter at start and end +query ? +SELECT string_to_array(',a,b,', ',') +---- +[, a, b, ] + # array_resize scalar function #1 query ? 
select array_resize(make_array(1, 2, 3), 1); diff --git a/datafusion/sqllogictest/test_files/datetime/arith_date_interval.slt b/datafusion/sqllogictest/test_files/datetime/arith_date_interval.slt index ad2e7ed496f79..01e1939996dfc 100644 --- a/datafusion/sqllogictest/test_files/datetime/arith_date_interval.slt +++ b/datafusion/sqllogictest/test_files/datetime/arith_date_interval.slt @@ -35,3 +35,15 @@ query T SELECT arrow_typeof('2001-09-28'::date - interval '25 hour') ---- Date32 + +query error Arrow error: Compute error: Date arithmetic overflow +SELECT arrow_cast('2020-01-01', 'Date32') + INTERVAL '999999' YEAR + +query error Arrow error: Compute error: Date arithmetic overflow +SELECT arrow_cast('2020-01-01', 'Date32') - INTERVAL '999999' YEAR + +query error Arrow error: Compute error: Date arithmetic overflow +SELECT arrow_cast('2020-01-01', 'Date64') + INTERVAL '999999' YEAR + +query error Arrow error: Compute error: Date arithmetic overflow +SELECT arrow_cast('2020-01-01', 'Date64') - INTERVAL '999999' YEAR diff --git a/datafusion/sqllogictest/test_files/string/string_literal.slt b/datafusion/sqllogictest/test_files/string/string_literal.slt index 569dfe0336f74..d4fe8ee178719 100644 --- a/datafusion/sqllogictest/test_files/string/string_literal.slt +++ b/datafusion/sqllogictest/test_files/string/string_literal.slt @@ -347,11 +347,35 @@ SELECT repeat('foo', 3) ---- foofoofoo +query T +SELECT repeat(arrow_cast('foo', 'LargeUtf8'), 3) +---- +foofoofoo + +query T +SELECT repeat(arrow_cast('foo', 'Utf8View'), 3) +---- +foofoofoo + query T SELECT repeat(arrow_cast('foo', 'Dictionary(Int32, Utf8)'), 3) ---- foofoofoo +query T +SELECT arrow_typeof(repeat('foo', 3)) +---- +Utf8 + +query T +SELECT arrow_typeof(repeat(arrow_cast('foo', 'LargeUtf8'), 3)) +---- +LargeUtf8 + +query T +SELECT arrow_typeof(repeat(arrow_cast('foo', 'Utf8View'), 3)) +---- +Utf8View query T SELECT replace('foobar', 'bar', 'hello') diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index e5ca9d674e19f..7f88199b3c0ef 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1671,3 +1671,71 @@ drop table employees; statement count 0 drop table project_assignments; + +# https://github.com/apache/datafusion/issues/21205 +statement ok +CREATE TABLE dup_filter_t1(id INTEGER) AS VALUES (1), (2), (3); + +statement ok +CREATE TABLE dup_filter_t2(id INTEGER) AS VALUES (1), (2), (3); + +query I +SELECT * FROM dup_filter_t1 WHERE dup_filter_t1.id IN ( + SELECT dup_filter_t2.id FROM dup_filter_t2 WHERE dup_filter_t2.id > dup_filter_t1.id +); +---- + +statement ok +DROP TABLE dup_filter_t1; + +statement ok +DROP TABLE dup_filter_t2; + +# https://github.com/apache/datafusion/issues/21206 +statement ok +CREATE TABLE sq_name_t1(id INTEGER) AS VALUES (1), (2), (3); + +statement ok +CREATE TABLE sq_name_t2(id INTEGER, outer_id INTEGER) AS VALUES (10, 1), (20, 1), (30, 2); + +query II +SELECT sq_name_t1.id, + (SELECT count(*) AS id FROM sq_name_t2 WHERE sq_name_t2.outer_id = sq_name_t1.id) AS cnt +FROM sq_name_t1 +ORDER BY sq_name_t1.id; +---- +1 2 +2 1 +3 0 + +query I +SELECT sq_name_t1.id +FROM sq_name_t1 +WHERE sq_name_t1.id > ( + SELECT count(*) AS id + FROM sq_name_t2 + WHERE sq_name_t2.outer_id = sq_name_t1.id +) +ORDER BY sq_name_t1.id; +---- +2 +3 + +query I +SELECT sq_name_t1.id * 10 + ( + SELECT count(*) AS id + FROM sq_name_t2 + WHERE sq_name_t2.outer_id = sq_name_t1.id +) AS total +FROM sq_name_t1 +ORDER BY 
sq_name_t1.id; +---- +12 +21 +30 + +statement ok +DROP TABLE sq_name_t1; + +statement ok +DROP TABLE sq_name_t2; diff --git a/dev/update_config_docs.sh b/dev/update_config_docs.sh index f39bdda3aee87..7ab998f3dad48 100755 --- a/dev/update_config_docs.sh +++ b/dev/update_config_docs.sh @@ -101,9 +101,24 @@ EOF echo "Running CLI and inserting config docs table" $PRINT_CONFIG_DOCS_COMMAND >> "$TARGET_FILE" -echo "Inserting runtime config header" +echo "Inserting reset command details and runtime config header" cat <<'EOF' >> "$TARGET_FILE" +You can also reset configuration options to default settings via SQL using the `RESET` command. For +example, to set and reset `datafusion.execution.batch_size`: + +```sql +SET datafusion.execution.batch_size = '10000'; + +SHOW datafusion.execution.batch_size; +datafusion.execution.batch_size 10000 + +RESET datafusion.execution.batch_size; + +SHOW datafusion.execution.batch_size; +datafusion.execution.batch_size 8192 +``` + # Runtime Configuration Settings DataFusion runtime configurations can be set via SQL using the `SET` command. diff --git a/docs/source/library-user-guide/upgrading/53.0.0.md b/docs/source/library-user-guide/upgrading/53.0.0.md index ef5f5743f5ea6..f616220778936 100644 --- a/docs/source/library-user-guide/upgrading/53.0.0.md +++ b/docs/source/library-user-guide/upgrading/53.0.0.md @@ -37,7 +37,7 @@ these crates. See the [Arrow 58.0.0 release notes] and the [object_store 0.13.0 upgrade guide] for details on breaking changes in those versions. [arrow 58.0.0 release notes]: https://github.com/apache/arrow-rs/releases/tag/58.0.0 -[object_store 0.13.0 upgrade guide]: https://github.com/apache/arrow-rs/releases/tag/58.0.0 +[object_store 0.13.0 upgrade guide]: https://github.com/apache/arrow-rs-object-store/blob/v0.13.0/CHANGELOG.md ### `ExecutionPlan::properties` now returns `&Arc` diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 95abb2769d287..69627e3cb9148 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -197,6 +197,21 @@ The following configuration settings are available: | datafusion.format.duration_format | pretty | Duration format. Can be either `"pretty"` or `"ISO8601"` | | datafusion.format.types_info | false | Show types in visual representation batches | +You can also reset configuration options to default settings via SQL using the `RESET` command. For +example, to set and reset `datafusion.execution.batch_size`: + +```sql +SET datafusion.execution.batch_size = '10000'; + +SHOW datafusion.execution.batch_size; +datafusion.execution.batch_size 10000 + +RESET datafusion.execution.batch_size; + +SHOW datafusion.execution.batch_size; +datafusion.execution.batch_size 8192 +``` + # Runtime Configuration Settings DataFusion runtime configurations can be set via SQL using the `SET` command. 
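For context on the `TrackConsumersPool::metrics()` API added in the memory-pool hunk earlier in this diff, a minimal usage sketch follows. It assumes the usual re-exports from `datafusion_execution::memory_pool` (`GreedyMemoryPool`, `MemoryConsumer`, `MemoryPool`, `TrackConsumersPool`); the pool size, consumer name, and reservation amount are made up for illustration.

```rust
use std::num::NonZeroUsize;
use std::sync::Arc;

use datafusion_execution::memory_pool::{
    GreedyMemoryPool, MemoryConsumer, MemoryPool, TrackConsumersPool,
};

fn main() {
    // Wrap any inner pool; the tracker records per-consumer reservations
    // and, with this change, exposes them as structured metrics.
    let pool = Arc::new(TrackConsumersPool::new(
        GreedyMemoryPool::new(1024 * 1024),
        NonZeroUsize::new(5).unwrap(),
    ));
    let dyn_pool: Arc<dyn MemoryPool> = Arc::clone(&pool) as _;

    let mut reservation = MemoryConsumer::new("sort").register(&dyn_pool);
    reservation.grow(4096);

    // New in this change: inspect tracked consumers without parsing the
    // formatted `report_top` string.
    for m in pool.metrics() {
        println!(
            "{}: reserved={} peak={} can_spill={}",
            m.name, m.reserved, m.peak, m.can_spill
        );
    }
}
```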
diff --git a/uv.lock b/uv.lock index 541fe15f43383..925d850bba42d 100644 --- a/uv.lock +++ b/uv.lock @@ -793,11 +793,11 @@ wheels = [ [[package]] name = "pygments" -version = "2.19.2" +version = "2.20.0" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" } +sdist = { url = "https://files.pythonhosted.org/packages/c3/b2/bc9c9196916376152d655522fdcebac55e66de6603a76a02bca1b6414f6c/pygments-2.20.0.tar.gz", hash = "sha256:6757cd03768053ff99f3039c1a36d6c0aa0b263438fcab17520b30a303a82b5f", size = 4955991, upload-time = "2026-03-29T13:29:33.898Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" }, + { url = "https://files.pythonhosted.org/packages/f4/7e/a72dd26f3b0f4f2bf1dd8923c85f7ceb43172af56d63c7383eb62b332364/pygments-2.20.0-py3-none-any.whl", hash = "sha256:81a9e26dd42fd28a23a2d169d86d7ac03b46e2f8b59ed4698fb4785f946d0176", size = 1231151, upload-time = "2026-03-29T13:29:30.038Z" }, ] [[package]]
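For reference, the splitting strategy used by the new `string_to_array` scalar fast path can be distilled into a small standalone sketch: the delimiter is compiled into a `memchr::memmem::Finder` once, and every row is then sliced at the byte offsets the finder reports, instead of calling `str::split` per row. The helper name `split_with_finder` and the `main` driver below are illustrative only and not part of the patch; the real implementation additionally special-cases empty inputs, an empty delimiter, and a NULL delimiter.

```rust
use memchr::memmem;

// Splits one row using a pre-compiled delimiter searcher.
fn split_with_finder<'a>(
    row: &'a str,
    finder: &memmem::Finder<'_>,
    delim_len: usize,
) -> Vec<&'a str> {
    let mut parts = Vec::new();
    let mut start = 0;
    for pos in finder.find_iter(row.as_bytes()) {
        parts.push(&row[start..pos]);
        start = pos + delim_len;
    }
    // Trailing part after the last delimiter (or the whole row if none matched).
    parts.push(&row[start..]);
    parts
}

fn main() {
    let delimiter = "::";
    // Compile the search pattern once, reuse it for every row.
    let finder = memmem::Finder::new(delimiter.as_bytes());
    for row in ["a::b::c", "abc", "::x::"] {
        println!("{:?}", split_with_finder(row, &finder, delimiter.len()));
    }
}
```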
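The `repeat.rs` changes split the work into a capacity pass (`calculate_capacities`) and a fill pass (`repeat_impl`) over a generic string builder. Below is a simplified standalone sketch of that two-pass idea, using a plain `GenericStringBuilder` and a fixed repeat count; both are simplifications of mine, not code from the patch.

```rust
use arrow::array::{GenericStringBuilder, StringArray};

// Two-pass repeat: size the output buffer first, then fill it through one
// reusable scratch buffer so no per-row String allocation is needed.
fn repeat_rows(values: &[Option<&str>], count: usize) -> StringArray {
    // Pass 1: compute the total number of output bytes.
    let total: usize = values
        .iter()
        .map(|v| v.map_or(0, |s| s.len() * count))
        .sum();
    let mut builder = GenericStringBuilder::<i32>::with_capacity(values.len(), total);

    // Pass 2: fill the builder, reusing one scratch buffer across rows.
    let mut scratch = Vec::<u8>::new();
    for value in values {
        match value {
            Some(s) => {
                scratch.clear();
                for _ in 0..count {
                    scratch.extend_from_slice(s.as_bytes());
                }
                // `scratch` is a concatenation of valid UTF-8 strings.
                builder.append_value(std::str::from_utf8(&scratch).unwrap());
            }
            None => builder.append_null(),
        }
    }
    builder.finish()
}

fn main() {
    let out = repeat_rows(&[Some("Pg"), None, Some("ab")], 3);
    assert_eq!(out.value(0), "PgPgPg");
    assert_eq!(out.value(2), "ababab");
}
```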