diff --git a/datafusion-examples/examples/data_io/parquet_index.rs b/datafusion-examples/examples/data_io/parquet_index.rs index e11a303f442a4..515dad7a51e17 100644 --- a/datafusion-examples/examples/data_io/parquet_index.rs +++ b/datafusion-examples/examples/data_io/parquet_index.rs @@ -462,7 +462,7 @@ impl PruningStatistics for ParquetMetadataIndex { } /// return the row counts for each file - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { Some(self.row_counts_ref().clone()) } diff --git a/datafusion-examples/examples/query_planning/pruning.rs b/datafusion-examples/examples/query_planning/pruning.rs index 33f3f8428a77f..7fdc4a7952d68 100644 --- a/datafusion-examples/examples/query_planning/pruning.rs +++ b/datafusion-examples/examples/query_planning/pruning.rs @@ -174,7 +174,7 @@ impl PruningStatistics for MyCatalog { None } - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { // In this example, we know nothing about the number of rows in each file None } diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0551cbbb15ae1..38b18c06fe930 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -687,6 +687,137 @@ config_namespace! { } } +/// Options for content-defined chunking (CDC) when writing parquet files. +/// See [`ParquetOptions::use_content_defined_chunking`]. +/// +/// Can be enabled with default options by setting +/// `use_content_defined_chunking` to `true`, or configured with sub-fields +/// like `use_content_defined_chunking.min_chunk_size`. +#[derive(Debug, Clone, PartialEq)] +pub struct CdcOptions { + /// Minimum chunk size in bytes. The rolling hash will not trigger a split + /// until this many bytes have been accumulated. Default is 256 KiB. + pub min_chunk_size: usize, + + /// Maximum chunk size in bytes. A split is forced when the accumulated + /// size exceeds this value. Default is 1 MiB. + pub max_chunk_size: usize, + + /// Normalization level. Increasing this improves deduplication ratio + /// but increases fragmentation. Recommended range is [-3, 3], default is 0. + pub norm_level: i32, +} + +// Note: `CdcOptions` intentionally does NOT implement `Default` so that the +// blanket `impl ConfigField for Option` does not +// apply. This allows the specific `impl ConfigField for Option` +// below to handle "true"/"false" for enabling/disabling CDC. +// Use `CdcOptions::default()` (the inherent method) instead of `Default::default()`. +impl CdcOptions { + /// Returns a new `CdcOptions` with default values. + #[expect(clippy::should_implement_trait)] + pub fn default() -> Self { + Self { + min_chunk_size: 256 * 1024, + max_chunk_size: 1024 * 1024, + norm_level: 0, + } + } +} + +impl ConfigField for CdcOptions { + fn set(&mut self, key: &str, value: &str) -> Result<()> { + let (key, rem) = key.split_once('.').unwrap_or((key, "")); + match key { + "min_chunk_size" => self.min_chunk_size.set(rem, value), + "max_chunk_size" => self.max_chunk_size.set(rem, value), + "norm_level" => self.norm_level.set(rem, value), + _ => _config_err!("Config value \"{}\" not found on CdcOptions", key), + } + } + + fn visit(&self, v: &mut V, key_prefix: &str, _description: &'static str) { + let key = format!("{key_prefix}.min_chunk_size"); + self.min_chunk_size.visit(v, &key, "Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. 
Default is 256 KiB."); + let key = format!("{key_prefix}.max_chunk_size"); + self.max_chunk_size.visit(v, &key, "Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB."); + let key = format!("{key_prefix}.norm_level"); + self.norm_level.visit(v, &key, "Normalization level. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0."); + } + + fn reset(&mut self, key: &str) -> Result<()> { + let (key, rem) = key.split_once('.').unwrap_or((key, "")); + match key { + "min_chunk_size" => { + if rem.is_empty() { + self.min_chunk_size = CdcOptions::default().min_chunk_size; + Ok(()) + } else { + self.min_chunk_size.reset(rem) + } + } + "max_chunk_size" => { + if rem.is_empty() { + self.max_chunk_size = CdcOptions::default().max_chunk_size; + Ok(()) + } else { + self.max_chunk_size.reset(rem) + } + } + "norm_level" => { + if rem.is_empty() { + self.norm_level = CdcOptions::default().norm_level; + Ok(()) + } else { + self.norm_level.reset(rem) + } + } + _ => _config_err!("Config value \"{}\" not found on CdcOptions", key), + } + } +} + +/// `ConfigField` for `Option` — allows setting the option to +/// `"true"` (enable with defaults) or `"false"` (disable), in addition to +/// setting individual sub-fields like `min_chunk_size`. +impl ConfigField for Option { + fn visit(&self, v: &mut V, key: &str, description: &'static str) { + match self { + Some(s) => s.visit(v, key, description), + None => v.none(key, description), + } + } + + fn set(&mut self, key: &str, value: &str) -> Result<()> { + if key.is_empty() { + match value.to_ascii_lowercase().as_str() { + "true" => { + *self = Some(CdcOptions::default()); + Ok(()) + } + "false" => { + *self = None; + Ok(()) + } + _ => _config_err!( + "Expected 'true' or 'false' for use_content_defined_chunking, got '{value}'" + ), + } + } else { + self.get_or_insert_with(CdcOptions::default).set(key, value) + } + } + + fn reset(&mut self, key: &str) -> Result<()> { + if key.is_empty() { + *self = None; + Ok(()) + } else { + self.get_or_insert_with(CdcOptions::default).reset(key) + } + } +} + config_namespace! { /// Options for reading and writing parquet files /// @@ -872,6 +1003,12 @@ config_namespace! { /// writing out already in-memory data, such as from a cached /// data frame. pub maximum_buffered_record_batches_per_stream: usize, default = 2 + + /// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing + /// parquet files. When `Some`, CDC is enabled with the given options; when `None` + /// (the default), CDC is disabled. When CDC is enabled, parallel writing is + /// automatically disabled since the chunker state must persist across row groups. + pub use_content_defined_chunking: Option, default = None } } @@ -1826,6 +1963,7 @@ config_field!(usize); config_field!(f64); config_field!(u64); config_field!(u32); +config_field!(i32); impl ConfigField for u8 { fn visit(&self, v: &mut V, key: &str, description: &'static str) { @@ -3579,4 +3717,77 @@ mod tests { "Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. 
Expected one of: 1.0, 2.0" ); } + + #[cfg(feature = "parquet")] + #[test] + fn set_cdc_option_with_boolean_true() { + use crate::config::ConfigOptions; + + let mut config = ConfigOptions::default(); + assert!( + config + .execution + .parquet + .use_content_defined_chunking + .is_none() + ); + + // Setting to "true" should enable CDC with default options + config + .set( + "datafusion.execution.parquet.use_content_defined_chunking", + "true", + ) + .unwrap(); + let cdc = config + .execution + .parquet + .use_content_defined_chunking + .as_ref() + .expect("CDC should be enabled"); + assert_eq!(cdc.min_chunk_size, 256 * 1024); + assert_eq!(cdc.max_chunk_size, 1024 * 1024); + assert_eq!(cdc.norm_level, 0); + + // Setting to "false" should disable CDC + config + .set( + "datafusion.execution.parquet.use_content_defined_chunking", + "false", + ) + .unwrap(); + assert!( + config + .execution + .parquet + .use_content_defined_chunking + .is_none() + ); + } + + #[cfg(feature = "parquet")] + #[test] + fn set_cdc_option_with_subfields() { + use crate::config::ConfigOptions; + + let mut config = ConfigOptions::default(); + + // Setting sub-fields should also enable CDC + config + .set( + "datafusion.execution.parquet.use_content_defined_chunking.min_chunk_size", + "1024", + ) + .unwrap(); + let cdc = config + .execution + .parquet + .use_content_defined_chunking + .as_ref() + .expect("CDC should be enabled"); + assert_eq!(cdc.min_chunk_size, 1024); + // Other fields should be defaults + assert_eq!(cdc.max_chunk_size, 1024 * 1024); + assert_eq!(cdc.norm_level, 0); + } } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index a7a1fc6d0bb66..eaf5a1642e8e2 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -95,7 +95,7 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { global, column_specific_options, key_value_metadata, - crypto: _, + .. 
} = table_parquet_options; let mut builder = global.into_writer_properties_builder()?; @@ -191,6 +191,7 @@ impl ParquetOptions { bloom_filter_on_write, bloom_filter_fpp, bloom_filter_ndv, + use_content_defined_chunking, // not in WriterProperties enable_page_index: _, @@ -247,6 +248,26 @@ impl ParquetOptions { if let Some(encoding) = encoding { builder = builder.set_encoding(parse_encoding_string(encoding)?); } + if let Some(cdc) = use_content_defined_chunking { + if cdc.min_chunk_size == 0 { + return Err(DataFusionError::Configuration( + "CDC min_chunk_size must be greater than 0".to_string(), + )); + } + if cdc.max_chunk_size <= cdc.min_chunk_size { + return Err(DataFusionError::Configuration(format!( + "CDC max_chunk_size ({}) must be greater than min_chunk_size ({})", + cdc.max_chunk_size, cdc.min_chunk_size + ))); + } + builder = builder.set_content_defined_chunking(Some( + parquet::file::properties::CdcOptions { + min_chunk_size: cdc.min_chunk_size, + max_chunk_size: cdc.max_chunk_size, + norm_level: cdc.norm_level, + }, + )); + } Ok(builder) } @@ -388,7 +409,9 @@ mod tests { use super::*; #[cfg(feature = "parquet_encryption")] use crate::config::ConfigFileEncryptionProperties; - use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions}; + use crate::config::{ + CdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions, + }; use crate::parquet_config::DFParquetWriterVersion; use parquet::basic::Compression; use parquet::file::properties::{ @@ -460,6 +483,7 @@ mod tests { skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, max_predicate_cache_size: defaults.max_predicate_cache_size, + use_content_defined_chunking: defaults.use_content_defined_chunking.clone(), } } @@ -576,6 +600,13 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, + use_content_defined_chunking: props.content_defined_chunking().map(|c| { + CdcOptions { + min_chunk_size: c.min_chunk_size, + max_chunk_size: c.max_chunk_size, + norm_level: c.norm_level, + } + }), }, column_specific_options, key_value_metadata, @@ -786,6 +817,74 @@ mod tests { ); } + #[test] + fn test_cdc_enabled_with_custom_options() { + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 128 * 1024, + max_chunk_size: 512 * 1024, + norm_level: 2, + }); + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + let cdc = props.content_defined_chunking().expect("CDC should be set"); + assert_eq!(cdc.min_chunk_size, 128 * 1024); + assert_eq!(cdc.max_chunk_size, 512 * 1024); + assert_eq!(cdc.norm_level, 2); + } + + #[test] + fn test_cdc_disabled_by_default() { + let mut opts = TableParquetOptions::default(); + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + assert!(props.content_defined_chunking().is_none()); + } + + #[test] + fn test_cdc_round_trip_through_writer_props() { + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 64 * 1024, + max_chunk_size: 2 * 1024 * 1024, + norm_level: -1, + }); + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + let recovered = session_config_from_writer_props(&props); + + let cdc = 
recovered.global.use_content_defined_chunking.unwrap(); + assert_eq!(cdc.min_chunk_size, 64 * 1024); + assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024); + assert_eq!(cdc.norm_level, -1); + } + + #[test] + fn test_cdc_validation_zero_min_chunk_size() { + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 0, + ..CdcOptions::default() + }); + opts.arrow_schema(&Arc::new(Schema::empty())); + assert!(WriterPropertiesBuilder::try_from(&opts).is_err()); + } + + #[test] + fn test_cdc_validation_max_not_greater_than_min() { + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 512 * 1024, + max_chunk_size: 256 * 1024, + ..CdcOptions::default() + }); + opts.arrow_schema(&Arc::new(Schema::empty())); + assert!(WriterPropertiesBuilder::try_from(&opts).is_err()); + } + #[test] fn test_bloom_filter_set_ndv_only() { // the TableParquetOptions::default, with only ndv set diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 27148de59a544..ebae23f0723a1 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -95,15 +95,17 @@ pub trait PruningStatistics { /// [`UInt64Array`]: arrow::array::UInt64Array fn null_counts(&self, column: &Column) -> Option; - /// Return the number of rows for the named column in each container - /// as an [`UInt64Array`]. + /// Return the number of rows in each container as an [`UInt64Array`]. + /// + /// Row counts are container-level (not column-specific) — the value + /// is the same regardless of which column is being considered. /// /// See [`Self::min_values`] for when to return `None` and null values. /// /// Note: the returned array must contain [`Self::num_containers`] rows /// /// [`UInt64Array`]: arrow::array::UInt64Array - fn row_counts(&self, column: &Column) -> Option; + fn row_counts(&self) -> Option; /// Returns [`BooleanArray`] where each row represents information known /// about specific literal `values` in a column. 
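
For downstream implementors of `PruningStatistics`, the signature change above means `row_counts` no longer receives a column. A minimal sketch of an adapted implementation, assuming a hypothetical `MyFileIndex` type that tracks one row count per container (it is not part of this patch):

    use std::sync::Arc;
    use arrow::array::{ArrayRef, UInt64Array};

    struct MyFileIndex {
        // one entry per container (file or row group)
        rows_per_container: Vec<u64>,
    }

    impl MyFileIndex {
        // Before this change: fn row_counts(&self, _column: &Column) -> Option<ArrayRef>
        // After: no column argument, since row counts are container-level.
        fn row_counts(&self) -> Option<ArrayRef> {
            Some(Arc::new(UInt64Array::from(self.rows_per_container.clone())) as ArrayRef)
        }
    }
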
@@ -265,7 +267,7 @@ impl PruningStatistics for PartitionPruningStatistics { None } - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { None } @@ -398,11 +400,7 @@ impl PruningStatistics for PrunableStatistics { } } - fn row_counts(&self, column: &Column) -> Option { - // If the column does not exist in the schema, return None - if self.schema.index_of(column.name()).is_err() { - return None; - } + fn row_counts(&self) -> Option { if self .statistics .iter() @@ -502,9 +500,9 @@ impl PruningStatistics for CompositePruningStatistics { None } - fn row_counts(&self, column: &Column) -> Option { + fn row_counts(&self) -> Option { for stats in &self.statistics { - if let Some(array) = stats.row_counts(column) { + if let Some(array) = stats.row_counts() { return Some(array); } } @@ -566,9 +564,9 @@ mod tests { // Partition values don't know anything about nulls or row counts assert!(partition_stats.null_counts(&column_a).is_none()); - assert!(partition_stats.row_counts(&column_a).is_none()); + assert!(partition_stats.row_counts().is_none()); assert!(partition_stats.null_counts(&column_b).is_none()); - assert!(partition_stats.row_counts(&column_b).is_none()); + assert!(partition_stats.row_counts().is_none()); // Min/max values are the same as the partition values let min_values_a = @@ -709,9 +707,9 @@ mod tests { // Partition values don't know anything about nulls or row counts assert!(partition_stats.null_counts(&column_a).is_none()); - assert!(partition_stats.row_counts(&column_a).is_none()); + assert!(partition_stats.row_counts().is_none()); assert!(partition_stats.null_counts(&column_b).is_none()); - assert!(partition_stats.row_counts(&column_b).is_none()); + assert!(partition_stats.row_counts().is_none()); // Min/max values are all missing assert!(partition_stats.min_values(&column_a).is_none()); @@ -814,13 +812,13 @@ mod tests { assert_eq!(null_counts_b, expected_null_counts_b); // Row counts are the same as the statistics - let row_counts_a = as_uint64_array(&pruning_stats.row_counts(&column_a).unwrap()) + let row_counts_a = as_uint64_array(&pruning_stats.row_counts().unwrap()) .unwrap() .into_iter() .collect::>(); let expected_row_counts_a = vec![Some(100), Some(200)]; assert_eq!(row_counts_a, expected_row_counts_a); - let row_counts_b = as_uint64_array(&pruning_stats.row_counts(&column_b).unwrap()) + let row_counts_b = as_uint64_array(&pruning_stats.row_counts().unwrap()) .unwrap() .into_iter() .collect::>(); @@ -845,7 +843,7 @@ mod tests { // This is debatable, personally I think `row_count` should not take a `Column` as an argument // at all since all columns should have the same number of rows. // But for now we just document the current behavior in this test. 
- let row_counts_c = as_uint64_array(&pruning_stats.row_counts(&column_c).unwrap()) + let row_counts_c = as_uint64_array(&pruning_stats.row_counts().unwrap()) .unwrap() .into_iter() .collect::>(); @@ -853,12 +851,13 @@ mod tests { assert_eq!(row_counts_c, expected_row_counts_c); assert!(pruning_stats.contained(&column_c, &values).is_none()); - // Test with a column that doesn't exist + // Test with a column that doesn't exist — column-specific stats + // return None, but row_counts is container-level and still available let column_d = Column::new_unqualified("d"); assert!(pruning_stats.min_values(&column_d).is_none()); assert!(pruning_stats.max_values(&column_d).is_none()); assert!(pruning_stats.null_counts(&column_d).is_none()); - assert!(pruning_stats.row_counts(&column_d).is_none()); + assert!(pruning_stats.row_counts().is_some()); assert!(pruning_stats.contained(&column_d, &values).is_none()); } @@ -886,8 +885,8 @@ mod tests { assert!(pruning_stats.null_counts(&column_b).is_none()); // Row counts are all missing - assert!(pruning_stats.row_counts(&column_a).is_none()); - assert!(pruning_stats.row_counts(&column_b).is_none()); + assert!(pruning_stats.row_counts().is_none()); + assert!(pruning_stats.row_counts().is_none()); // Contained values are all empty let values = HashSet::from([ScalarValue::from(1i32)]); @@ -1027,13 +1026,11 @@ mod tests { let expected_null_counts_col_x = vec![Some(0), Some(10)]; assert_eq!(null_counts_col_x, expected_null_counts_col_x); - // Test row counts - only available from file statistics - assert!(composite_stats.row_counts(&part_a).is_none()); - let row_counts_col_x = - as_uint64_array(&composite_stats.row_counts(&col_x).unwrap()) - .unwrap() - .into_iter() - .collect::>(); + // Test row counts — container-level, available from file statistics + let row_counts_col_x = as_uint64_array(&composite_stats.row_counts().unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_row_counts = vec![Some(100), Some(200)]; assert_eq!(row_counts_col_x, expected_row_counts); @@ -1046,12 +1043,13 @@ mod tests { // File statistics don't implement contained assert!(composite_stats.contained(&col_x, &values).is_none()); - // Non-existent column should return None for everything + // Non-existent column should return None for column-specific stats, + // but row_counts is container-level and still available let non_existent = Column::new_unqualified("non_existent"); assert!(composite_stats.min_values(&non_existent).is_none()); assert!(composite_stats.max_values(&non_existent).is_none()); assert!(composite_stats.null_counts(&non_existent).is_none()); - assert!(composite_stats.row_counts(&non_existent).is_none()); + assert!(composite_stats.row_counts().is_some()); assert!(composite_stats.contained(&non_existent, &values).is_none()); // Verify num_containers matches @@ -1155,7 +1153,7 @@ mod tests { let expected_null_counts = vec![Some(0), Some(5)]; assert_eq!(null_counts, expected_null_counts); - let row_counts = as_uint64_array(&composite_stats.row_counts(&col_a).unwrap()) + let row_counts = as_uint64_array(&composite_stats.row_counts().unwrap()) .unwrap() .into_iter() .collect::>(); @@ -1195,11 +1193,10 @@ mod tests { let expected_null_counts = vec![Some(10), Some(20)]; assert_eq!(null_counts, expected_null_counts); - let row_counts = - as_uint64_array(&composite_stats_reversed.row_counts(&col_a).unwrap()) - .unwrap() - .into_iter() - .collect::>(); + let row_counts = as_uint64_array(&composite_stats_reversed.row_counts().unwrap()) + .unwrap() + .into_iter() + 
.collect::>(); let expected_row_counts = vec![Some(1000), Some(2000)]; assert_eq!(row_counts, expected_row_counts); } diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index c66d20c52af56..17dd22fef3b31 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -295,8 +295,11 @@ impl DataFrame { self.session_state.create_logical_expr(sql, df_schema) } - /// Consume the DataFrame and produce a physical plan - pub async fn create_physical_plan(self) -> Result> { + /// Create a physical plan from this DataFrame. + /// + /// The `DataFrame` remains accessible after this call, so you can inspect + /// the plan and still call [`DataFrame::collect`] or other execution methods. + pub async fn create_physical_plan(&self) -> Result> { self.session_state.create_physical_plan(&self.plan).await } @@ -2398,7 +2401,7 @@ impl DataFrame { } else { let context = SessionContext::new_with_state((*self.session_state).clone()); // The schema is consistent with the output - let plan = self.clone().create_physical_plan().await?; + let plan = self.create_physical_plan().await?; let schema = plan.schema(); let task_ctx = Arc::new(self.task_ctx()); let partitions = collect_partitioned(plan, task_ctx).await?; diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 1884002630631..ccd5766f0a24d 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -610,4 +610,25 @@ mod tests { Ok(()) } + + /// Test that `create_physical_plan` does not consume the `DataFrame`, so + /// callers can inspect (e.g. log) the physical plan and then still call + /// `write_parquet` or any other execution method on the same `DataFrame`. + #[tokio::test] + async fn create_physical_plan_does_not_consume_dataframe() -> Result<()> { + use crate::prelude::CsvReadOptions; + let ctx = SessionContext::new(); + let df = ctx + .read_csv("tests/data/example.csv", CsvReadOptions::new()) + .await?; + + // Obtain the physical plan for inspection without consuming `df`. + let _physical_plan = df.create_physical_plan().await?; + + // `df` is still usable — collect the results. + let batches = df.collect().await?; + assert!(!batches.is_empty()); + + Ok(()) + } } diff --git a/datafusion/core/tests/parquet/content_defined_chunking.rs b/datafusion/core/tests/parquet/content_defined_chunking.rs new file mode 100644 index 0000000000000..6a98ded1bd4cf --- /dev/null +++ b/datafusion/core/tests/parquet/content_defined_chunking.rs @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for parquet content-defined chunking (CDC). +//! +//! These tests verify that CDC options are correctly wired through to the +//! 
parquet writer by inspecting file metadata (compressed sizes, page +//! boundaries) on the written files. + +use arrow::array::{AsArray, Int32Array, StringArray}; +use arrow::datatypes::{DataType, Field, Int32Type, Int64Type, Schema}; +use arrow::record_batch::RecordBatch; +use datafusion::prelude::{ParquetReadOptions, SessionContext}; +use datafusion_common::config::{CdcOptions, TableParquetOptions}; +use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_reader::ArrowReaderMetadata; +use parquet::file::properties::WriterProperties; +use std::fs::File; +use std::sync::Arc; +use tempfile::NamedTempFile; + +/// Create a RecordBatch with enough data to exercise CDC chunking. +fn make_test_batch(num_rows: usize) -> RecordBatch { + let ids: Vec = (0..num_rows as i32).collect(); + // ~100 bytes per row to generate enough data for CDC page splits + let payloads: Vec = (0..num_rows) + .map(|i| format!("row-{i:06}-payload-{}", "x".repeat(80))) + .collect(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("payload", DataType::Utf8, false), + ])); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(ids)), + Arc::new(StringArray::from(payloads)), + ], + ) + .unwrap() +} + +/// Build WriterProperties from TableParquetOptions, exercising the same +/// code path that DataFusion's parquet sink uses. +fn writer_props( + opts: &mut TableParquetOptions, + schema: &Arc, +) -> WriterProperties { + opts.arrow_schema(schema); + parquet::file::properties::WriterPropertiesBuilder::try_from( + opts as &TableParquetOptions, + ) + .unwrap() + .build() +} + +/// Write a batch to a temp parquet file and return the file handle. +fn write_parquet_file(batch: &RecordBatch, props: WriterProperties) -> NamedTempFile { + let tmp = tempfile::Builder::new() + .suffix(".parquet") + .tempfile() + .unwrap(); + let mut writer = + ArrowWriter::try_new(tmp.reopen().unwrap(), batch.schema(), Some(props)).unwrap(); + writer.write(batch).unwrap(); + writer.close().unwrap(); + tmp +} + +/// Read parquet metadata from a file. +fn read_metadata(file: &NamedTempFile) -> parquet::file::metadata::ParquetMetaData { + let f = File::open(file.path()).unwrap(); + let reader_meta = ArrowReaderMetadata::load(&f, Default::default()).unwrap(); + reader_meta.metadata().as_ref().clone() +} + +/// Write parquet with CDC enabled, read it back via DataFusion, and verify +/// the data round-trips correctly. +#[tokio::test] +async fn cdc_data_round_trip() { + let batch = make_test_batch(5000); + + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions::default()); + let props = writer_props(&mut opts, &batch.schema()); + + let tmp = write_parquet_file(&batch, props); + + // Read back via DataFusion and verify row count + let ctx = SessionContext::new(); + ctx.register_parquet( + "data", + tmp.path().to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + let result = ctx + .sql("SELECT COUNT(*), MIN(id), MAX(id) FROM data") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let row = &result[0]; + let count = row.column(0).as_primitive::().value(0); + let min_id = row.column(1).as_primitive::().value(0); + let max_id = row.column(2).as_primitive::().value(0); + + assert_eq!(count, 5000); + assert_eq!(min_id, 0); + assert_eq!(max_id, 4999); +} + +/// Verify that CDC options are reflected in the parquet file metadata. 
+/// With small chunk sizes, CDC should produce different page boundaries +/// compared to default (no CDC) writing. +#[tokio::test] +async fn cdc_affects_page_boundaries() { + let batch = make_test_batch(5000); + + // Write WITHOUT CDC + let mut no_cdc_opts = TableParquetOptions::default(); + let no_cdc_file = + write_parquet_file(&batch, writer_props(&mut no_cdc_opts, &batch.schema())); + let no_cdc_meta = read_metadata(&no_cdc_file); + + // Write WITH CDC using small chunk sizes to maximize effect + let mut cdc_opts = TableParquetOptions::default(); + cdc_opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 512, + max_chunk_size: 2048, + norm_level: 0, + }); + let cdc_file = + write_parquet_file(&batch, writer_props(&mut cdc_opts, &batch.schema())); + let cdc_meta = read_metadata(&cdc_file); + + // Both files should have the same number of rows + assert_eq!( + no_cdc_meta.file_metadata().num_rows(), + cdc_meta.file_metadata().num_rows(), + ); + + // Compare the uncompressed sizes of columns across all row groups. + // CDC with small chunk sizes should produce different page boundaries. + let no_cdc_sizes: Vec = no_cdc_meta + .row_groups() + .iter() + .flat_map(|rg| rg.columns().iter().map(|c| c.uncompressed_size())) + .collect(); + + let cdc_sizes: Vec = cdc_meta + .row_groups() + .iter() + .flat_map(|rg| rg.columns().iter().map(|c| c.uncompressed_size())) + .collect(); + + assert_ne!( + no_cdc_sizes, cdc_sizes, + "CDC with small chunk sizes should produce different page layouts \ + than default writing. no_cdc={no_cdc_sizes:?}, cdc={cdc_sizes:?}" + ); +} diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 0535ddd9247d4..e96bd49b9ace9 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -44,6 +44,7 @@ use parquet::file::properties::{EnabledStatistics, WriterProperties}; use std::sync::Arc; use tempfile::NamedTempFile; +mod content_defined_chunking; mod custom_reader; #[cfg(feature = "parquet_encryption")] mod encryption; diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index da35a1a34d441..c4faedf571f6d 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1370,7 +1370,11 @@ impl FileSink for ParquetSink { while let Some((path, mut rx)) = file_stream_rx.recv().await { let parquet_props = self.create_writer_props(&runtime, &path).await?; - if !parquet_opts.global.allow_single_file_parallelism { + // CDC requires the sequential writer: the chunker state lives in ArrowWriter + // and persists across row groups. The parallel path bypasses ArrowWriter entirely. 
+ if !parquet_opts.global.allow_single_file_parallelism + || parquet_opts.global.use_content_defined_chunking.is_some() + { let mut writer = self .create_async_arrow_writer( &path, diff --git a/datafusion/datasource-parquet/src/page_filter.rs b/datafusion/datasource-parquet/src/page_filter.rs index 194e6e94fba3a..baef36ce147d4 100644 --- a/datafusion/datasource-parquet/src/page_filter.rs +++ b/datafusion/datasource-parquet/src/page_filter.rs @@ -509,7 +509,7 @@ impl PruningStatistics for PagesPruningStatistics<'_> { } } - fn row_counts(&self, _column: &datafusion_common::Column) -> Option { + fn row_counts(&self) -> Option { match self.converter.data_page_row_counts( self.offset_index, self.row_group_metadatas, diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index 7a2ed8f2777e3..3f254c9f55282 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -19,7 +19,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use super::{ParquetAccessPlan, ParquetFileMetrics}; -use arrow::array::{ArrayRef, BooleanArray}; +use arrow::array::{ArrayRef, BooleanArray, UInt64Array}; use arrow::datatypes::Schema; use datafusion_common::pruning::PruningStatistics; use datafusion_common::{Column, Result, ScalarValue}; @@ -536,7 +536,7 @@ impl PruningStatistics for BloomFilterStatistics { None } - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { None } @@ -626,13 +626,13 @@ impl PruningStatistics for RowGroupPruningStatistics<'_> { .map(|counts| Arc::new(counts) as ArrayRef) } - fn row_counts(&self, column: &Column) -> Option { - // row counts are the same for all columns in a row group - self.statistics_converter(column) - .and_then(|c| Ok(c.row_group_row_counts(self.metadata_iter())?)) - .ok() - .flatten() - .map(|counts| Arc::new(counts) as ArrayRef) + fn row_counts(&self) -> Option { + // Row counts are container-level — read directly from row group metadata. + let counts: UInt64Array = self + .metadata_iter() + .map(|rg| Some(rg.num_rows() as u64)) + .collect(); + Some(Arc::new(counts) as ArrayRef) } fn contained( diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 6dd55f1d7e4be..9439d8c590acc 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -382,30 +382,39 @@ fn get_exprs_except_skipped( } } -/// For each column specified in the USING JOIN condition, the JOIN plan outputs it twice -/// (once for each join side), but an unqualified wildcard should include it only once. -/// This function returns the columns that should be excluded. +/// When a JOIN has a USING clause, the join columns appear in the output +/// schema once per side (for inner/outer joins) or once total (for semi/anti +/// joins). An unqualified wildcard should include each USING column only once. +/// This function returns the duplicate columns that should be excluded. 
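+/// For example, `t1 JOIN t2 USING (id)` outputs both `t1.id` and `t2.id`; one of
+/// them (chosen deterministically by sorting the columns) is returned here so that
+/// a wildcard expands `id` only once.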
fn exclude_using_columns(plan: &LogicalPlan) -> Result> { - let using_columns = plan.using_columns()?; - let excluded = using_columns - .into_iter() - // For each USING JOIN condition, only expand to one of each join column in projection - .flat_map(|cols| { - let mut cols = cols.into_iter().collect::>(); - // sort join columns to make sure we consistently keep the same - // qualified column - cols.sort(); - let mut out_column_names: HashSet = HashSet::new(); - cols.into_iter().filter_map(move |c| { - if out_column_names.contains(&c.name) { - Some(c) - } else { - out_column_names.insert(c.name); - None - } - }) - }) - .collect::>(); + let output_columns: HashSet<_> = plan.schema().columns().iter().cloned().collect(); + let mut excluded = HashSet::new(); + for cols in plan.using_columns()? { + // `using_columns()` returns join columns from both sides regardless of + // the join type. For semi/anti joins, only one side's columns appear in + // the output schema. Filter to output columns so that columns from the + // non-output side don't participate in the deduplication process below + // and displace real output columns. + let mut cols: Vec<_> = cols + .into_iter() + .filter(|c| output_columns.contains(c)) + .collect(); + + // Sort so we keep the same qualified column, regardless of HashSet + // iteration order. + cols.sort(); + + // Keep only one column per name from the columns set, adding any + // duplicates to the excluded set. + let mut seen_names = HashSet::new(); + for col in cols { + if seen_names.contains(col.name.as_str()) { + excluded.insert(col); // exclude columns with already seen name + } else { + seen_names.insert(col.name.clone()); // mark column name as seen + } + } + } Ok(excluded) } diff --git a/datafusion/functions/benches/split_part.rs b/datafusion/functions/benches/split_part.rs index 72ca6f66a00d4..0f4998effc2ac 100644 --- a/datafusion/functions/benches/split_part.rs +++ b/datafusion/functions/benches/split_part.rs @@ -18,6 +18,7 @@ use arrow::array::{ArrayRef, Int64Array, StringArray, StringViewArray}; use arrow::datatypes::{DataType, Field}; use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use datafusion_common::ScalarValue; use datafusion_common::config::ConfigOptions; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDF}; use datafusion_functions::string::split_part; @@ -29,15 +30,15 @@ use std::sync::Arc; const N_ROWS: usize = 8192; -/// Creates strings with `num_parts` random alphanumeric segments of `part_len` -/// bytes each, joined by `delimiter`. -fn gen_split_part_data( +/// Creates an array of strings with `num_parts` random alphanumeric segments +/// of `part_len` bytes each, joined by `delimiter`. 
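+/// Each generated string therefore contains `num_parts - 1` delimiters.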
+fn gen_string_array( n_rows: usize, num_parts: usize, part_len: usize, delimiter: &str, use_string_view: bool, -) -> (ColumnarValue, ColumnarValue) { +) -> ColumnarValue { let mut rng = StdRng::seed_from_u64(42); let mut strings: Vec = Vec::with_capacity(n_rows); @@ -54,22 +55,12 @@ fn gen_split_part_data( strings.push(parts.join(delimiter)); } - let delimiters: Vec = vec![delimiter.to_string(); n_rows]; - if use_string_view { let string_array: StringViewArray = strings.into_iter().map(Some).collect(); - let delimiter_array: StringViewArray = delimiters.into_iter().map(Some).collect(); - ( - ColumnarValue::Array(Arc::new(string_array) as ArrayRef), - ColumnarValue::Array(Arc::new(delimiter_array) as ArrayRef), - ) + ColumnarValue::Array(Arc::new(string_array) as ArrayRef) } else { let string_array: StringArray = strings.into_iter().map(Some).collect(); - let delimiter_array: StringArray = delimiters.into_iter().map(Some).collect(); - ( - ColumnarValue::Array(Arc::new(string_array) as ArrayRef), - ColumnarValue::Array(Arc::new(delimiter_array) as ArrayRef), - ) + ColumnarValue::Array(Arc::new(string_array) as ArrayRef) } } @@ -81,12 +72,10 @@ fn bench_split_part( name: &str, tag: &str, strings: ColumnarValue, - delimiters: ColumnarValue, - position: i64, + delimiter: ColumnarValue, + position: ColumnarValue, ) { - let positions: ColumnarValue = - ColumnarValue::Array(Arc::new(Int64Array::from(vec![position; N_ROWS]))); - let args = vec![strings, delimiters, positions]; + let args = vec![strings, delimiter, position]; let arg_fields: Vec<_> = args .iter() .enumerate() @@ -119,108 +108,143 @@ fn criterion_benchmark(c: &mut Criterion) { let config_options = Arc::new(ConfigOptions::default()); let mut group = c.benchmark_group("split_part"); - // Utf8, single-char delimiter, first position + // ── Scalar delimiter and position ──────────────── + + // Utf8, single-char delimiter, scalar args { - let (strings, delimiters) = gen_split_part_data(N_ROWS, 10, 8, ".", false); + let strings = gen_string_array(N_ROWS, 10, 8, ".", false); + let delimiter = ColumnarValue::Scalar(ScalarValue::Utf8(Some(".".into()))); + let position = ColumnarValue::Scalar(ScalarValue::Int64(Some(1))); bench_split_part( &mut group, &split_part_func, &config_options, - "utf8_single_char", + "scalar_utf8_single_char", "pos_first", strings, - delimiters, - 1, + delimiter, + position, ); } - // Utf8, single-char delimiter, middle position { - let (strings, delimiters) = gen_split_part_data(N_ROWS, 10, 8, ".", false); + let strings = gen_string_array(N_ROWS, 10, 8, ".", false); + let delimiter = ColumnarValue::Scalar(ScalarValue::Utf8(Some(".".into()))); + let position = ColumnarValue::Scalar(ScalarValue::Int64(Some(5))); bench_split_part( &mut group, &split_part_func, &config_options, - "utf8_single_char", + "scalar_utf8_single_char", "pos_middle", strings, - delimiters, - 5, + delimiter, + position, ); } - // Utf8, single-char delimiter, negative position { - let (strings, delimiters) = gen_split_part_data(N_ROWS, 10, 8, ".", false); + let strings = gen_string_array(N_ROWS, 10, 8, ".", false); + let delimiter = ColumnarValue::Scalar(ScalarValue::Utf8(Some(".".into()))); + let position = ColumnarValue::Scalar(ScalarValue::Int64(Some(-1))); bench_split_part( &mut group, &split_part_func, &config_options, - "utf8_single_char", + "scalar_utf8_single_char", "pos_negative", strings, - delimiters, - -1, + delimiter, + position, ); } - // Utf8, multi-char delimiter, middle position + // Utf8, multi-char delimiter, scalar args { - let 
(strings, delimiters) = gen_split_part_data(N_ROWS, 10, 8, "~@~", false); + let strings = gen_string_array(N_ROWS, 10, 8, "~@~", false); + let delimiter = ColumnarValue::Scalar(ScalarValue::Utf8(Some("~@~".into()))); + let position = ColumnarValue::Scalar(ScalarValue::Int64(Some(5))); bench_split_part( &mut group, &split_part_func, &config_options, - "utf8_multi_char", + "scalar_utf8_multi_char", "pos_middle", strings, - delimiters, - 5, + delimiter, + position, ); } - // Utf8View, single-char delimiter, first position + // Utf8, long strings, scalar args { - let (strings, delimiters) = gen_split_part_data(N_ROWS, 10, 8, ".", true); + let strings = gen_string_array(N_ROWS, 50, 16, ".", false); + let delimiter = ColumnarValue::Scalar(ScalarValue::Utf8(Some(".".into()))); + let position = ColumnarValue::Scalar(ScalarValue::Int64(Some(25))); bench_split_part( &mut group, &split_part_func, &config_options, - "utf8view_single_char", - "pos_first", + "scalar_utf8_long_strings", + "pos_middle", strings, - delimiters, - 1, + delimiter, + position, ); } - // Utf8, single-char delimiter, many long parts + // Utf8View, long parts, scalar args + { + let strings = gen_string_array(N_ROWS, 10, 32, ".", true); + let delimiter = ColumnarValue::Scalar(ScalarValue::Utf8View(Some(".".into()))); + let position = ColumnarValue::Scalar(ScalarValue::Int64(Some(5))); + bench_split_part( + &mut group, + &split_part_func, + &config_options, + "scalar_utf8view_long_parts", + "pos_middle", + strings, + delimiter, + position, + ); + } + + // ── Array delimiter and position ───────────────── + + // Utf8, single-char delimiter, array args { - let (strings, delimiters) = gen_split_part_data(N_ROWS, 50, 16, ".", false); + let strings = gen_string_array(N_ROWS, 10, 8, ".", false); + let delimiters: StringArray = vec![Some("."); N_ROWS].into_iter().collect(); + let delimiter = ColumnarValue::Array(Arc::new(delimiters) as ArrayRef); + let positions = ColumnarValue::Array(Arc::new(Int64Array::from(vec![5; N_ROWS]))); bench_split_part( &mut group, &split_part_func, &config_options, - "utf8_long_strings", + "array_utf8_single_char", "pos_middle", strings, - delimiters, - 25, + delimiter, + positions, ); } - // Utf8View, single-char delimiter, middle position, long parts + // Utf8, multi-char delimiter, array args { - let (strings, delimiters) = gen_split_part_data(N_ROWS, 10, 32, ".", true); + let strings = gen_string_array(N_ROWS, 10, 8, "~@~", false); + let delimiters: StringArray = vec![Some("~@~"); N_ROWS].into_iter().collect(); + let delimiter = ColumnarValue::Array(Arc::new(delimiters) as ArrayRef); + let positions = ColumnarValue::Array(Arc::new(Int64Array::from(vec![5; N_ROWS]))); bench_split_part( &mut group, &split_part_func, &config_options, - "utf8view_long_parts", + "array_utf8_multi_char", "pos_middle", strings, - delimiters, - 5, + delimiter, + positions, ); } diff --git a/datafusion/functions/src/string/split_part.rs b/datafusion/functions/src/string/split_part.rs index 87beacabe8491..972a10c26474e 100644 --- a/datafusion/functions/src/string/split_part.rs +++ b/datafusion/functions/src/string/split_part.rs @@ -17,8 +17,8 @@ use crate::utils::utf8_to_str_type; use arrow::array::{ - ArrayRef, AsArray, GenericStringBuilder, Int64Array, StringArrayType, - StringLikeArrayBuilder, StringViewBuilder, + Array, ArrayRef, AsArray, GenericStringBuilder, Int64Array, StringArrayType, + StringLikeArrayBuilder, StringViewBuilder, new_null_array, }; use arrow::datatypes::DataType; use datafusion_common::ScalarValue; @@ -30,6 
+30,7 @@ use datafusion_expr::{ }; use datafusion_expr::{ScalarFunctionArgs, ScalarUDFImpl, Signature}; use datafusion_macros::user_doc; +use memchr::memmem; use std::sync::Arc; #[user_doc( @@ -101,6 +102,16 @@ impl ScalarUDFImpl for SplitPartFunc { fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { let ScalarFunctionArgs { args, .. } = args; + // Fast path: array string, scalar delimiter and position. + if let ( + ColumnarValue::Array(string_array), + ColumnarValue::Scalar(delim_scalar), + ColumnarValue::Scalar(pos_scalar), + ) = (&args[0], &args[1], &args[2]) + { + return split_part_scalar(string_array, delim_scalar, pos_scalar); + } + // First, determine if any of the arguments is an Array let len = args.iter().find_map(|arg| match arg { ColumnarValue::Array(a) => Some(a.len()), @@ -158,21 +169,25 @@ impl ScalarUDFImpl for SplitPartFunc { ), DataType::Utf8 => { let str_arr = &args[0].as_string::(); + // Conservative under-estimate for data capacity: split_part + // output is typically much smaller than the input, so avoid + // pre-allocating the full input data size. split_part_for_delimiter_type!( str_arr, GenericStringBuilder::::with_capacity( inferred_length, - str_arr.value_data().len(), + inferred_length, ) ) } DataType::LargeUtf8 => { let str_arr = &args[0].as_string::(); + // Conservative under-estimate; see Utf8 comment above. split_part_for_delimiter_type!( str_arr, GenericStringBuilder::::with_capacity( inferred_length, - str_arr.value_data().len(), + inferred_length, ) ) } @@ -192,7 +207,7 @@ impl ScalarUDFImpl for SplitPartFunc { } } -/// Finds the nth split part of `string` by `delimiter`. +/// Finds the `n`th (0-based) split part of `string` by `delimiter`. #[inline] fn split_nth<'a>(string: &'a str, delimiter: &str, n: usize) -> Option<&'a str> { if delimiter.len() == 1 { @@ -206,7 +221,7 @@ fn split_nth<'a>(string: &'a str, delimiter: &str, n: usize) -> Option<&'a str> } } -/// Like `split_nth` but splits from the right. +/// Like `split_nth` but splits from the right (`n` is 0-based from the end). #[inline] fn rsplit_nth<'a>(string: &'a str, delimiter: &str, n: usize) -> Option<&'a str> { if delimiter.len() == 1 { @@ -220,6 +235,196 @@ fn rsplit_nth<'a>(string: &'a str, delimiter: &str, n: usize) -> Option<&'a str> } } +/// Fast path for `split_part(array, scalar_delimiter, scalar_position)`. +fn split_part_scalar( + string_array: &ArrayRef, + delim_scalar: &ScalarValue, + pos_scalar: &ScalarValue, +) -> Result { + // Empty input array → empty result. + if string_array.is_empty() { + return Ok(ColumnarValue::Array(new_null_array( + string_array.data_type(), + 0, + ))); + } + + let delimiter = delim_scalar.try_as_str().ok_or_else(|| { + exec_datafusion_err!( + "Unsupported delimiter type {:?} for split_part", + delim_scalar.data_type() + ) + })?; + + let position = match pos_scalar { + ScalarValue::Int64(v) => *v, + other => { + return exec_err!( + "Unsupported position type {:?} for split_part", + other.data_type() + ); + } + }; + + // Null delimiter or position → every row is null. 
+ let (Some(delimiter), Some(position)) = (delimiter, position) else { + return Ok(ColumnarValue::Array(new_null_array( + string_array.data_type(), + string_array.len(), + ))); + }; + + if position == 0 { + return exec_err!("field position must not be zero"); + } + + let result = match string_array.data_type() { + DataType::Utf8View => split_part_scalar_impl( + string_array.as_string_view(), + delimiter, + position, + StringViewBuilder::with_capacity(string_array.len()), + ), + DataType::Utf8 => { + let arr = string_array.as_string::(); + // Conservative under-estimate for data capacity: split_part output + // is typically much smaller than the input, so avoid pre-allocating + // the full input data size. + split_part_scalar_impl( + arr, + delimiter, + position, + GenericStringBuilder::::with_capacity(arr.len(), arr.len()), + ) + } + DataType::LargeUtf8 => { + let arr = string_array.as_string::(); + // Conservative under-estimate; see Utf8 comment above. + split_part_scalar_impl( + arr, + delimiter, + position, + GenericStringBuilder::::with_capacity(arr.len(), arr.len()), + ) + } + other => exec_err!("Unsupported string type {other:?} for split_part"), + }?; + + Ok(ColumnarValue::Array(result)) +} + +/// Inner implementation for the scalar-delimiter, scalar-position fast path. +/// Constructing a `memmem::Finder` is somewhat expensive but it's a win when +/// done once and amortized over the entire batch. +fn split_part_scalar_impl<'a, S, B>( + string_array: S, + delimiter: &str, + position: i64, + builder: B, +) -> Result +where + S: StringArrayType<'a> + Copy, + B: StringLikeArrayBuilder, +{ + if delimiter.is_empty() { + // PostgreSQL: empty delimiter treats input as a single field, + // so only position 1 or -1 returns the input string. + return if position == 1 || position == -1 { + map_strings(string_array, builder, Some) + } else { + map_strings(string_array, builder, |_| None) + }; + } + + let delim_bytes = delimiter.as_bytes(); + let delim_len = delimiter.len(); + + if position > 0 { + let idx: usize = (position - 1).try_into().map_err(|_| { + exec_datafusion_err!( + "split_part index {position} exceeds maximum supported value" + ) + })?; + let finder = memmem::Finder::new(delim_bytes); + map_strings(string_array, builder, |s| { + split_nth_finder(s, &finder, delim_len, idx) + }) + } else { + let idx: usize = (position.unsigned_abs() - 1).try_into().map_err(|_| { + exec_datafusion_err!( + "split_part index {position} exceeds minimum supported value" + ) + })?; + let finder_rev = memmem::FinderRev::new(delim_bytes); + map_strings(string_array, builder, |s| { + rsplit_nth_finder(s, &finder_rev, delim_len, idx) + }) + } +} + +/// Applies `f` to each non-null string in `string_array`, appending the +/// result (or `""` when `f` returns `None`) to `builder`. +#[inline] +fn map_strings<'a, S, B, F>(string_array: S, mut builder: B, f: F) -> Result +where + S: StringArrayType<'a> + Copy, + B: StringLikeArrayBuilder, + F: Fn(&'a str) -> Option<&'a str>, +{ + for string in string_array.iter() { + match string { + Some(s) => builder.append_value(f(s).unwrap_or("")), + None => builder.append_null(), + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) +} + +/// Finds the `n`th (0-based) split part using a pre-built `memmem::Finder`. 
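+/// Returns `None` when `string` contains fewer than `n + 1` delimiter-separated parts.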
+#[inline] +fn split_nth_finder<'a>( + string: &'a str, + finder: &memmem::Finder, + delim_len: usize, + n: usize, +) -> Option<&'a str> { + let bytes = string.as_bytes(); + let mut start = 0; + for _ in 0..n { + match finder.find(&bytes[start..]) { + Some(pos) => start += pos + delim_len, + None => return None, + } + } + match finder.find(&bytes[start..]) { + Some(pos) => Some(&string[start..start + pos]), + None => Some(&string[start..]), + } +} + +/// Like `split_nth_finder` but splits from the right (`n` is 0-based from +/// the end). +#[inline] +fn rsplit_nth_finder<'a>( + string: &'a str, + finder: &memmem::FinderRev, + delim_len: usize, + n: usize, +) -> Option<&'a str> { + let bytes = string.as_bytes(); + let mut end = bytes.len(); + for _ in 0..n { + match finder.rfind(&bytes[..end]) { + Some(pos) => end = pos, + None => return None, + } + } + match finder.rfind(&bytes[..end]) { + Some(pos) => Some(&string[pos + delim_len..end]), + None => Some(&string[..end]), + } +} + fn split_part_impl<'a, StringArrType, DelimiterArrType, B>( string_array: &StringArrType, delimiter_array: &DelimiterArrType, diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 62c6bbe85612a..31ece63577b4f 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -603,6 +603,14 @@ message ParquetOptions { oneof max_predicate_cache_size_opt { uint64 max_predicate_cache_size = 33; } + + CdcOptions content_defined_chunking = 35; +} + +message CdcOptions { + uint64 min_chunk_size = 1; + uint64 max_chunk_size = 2; + int32 norm_level = 3; } enum JoinSide { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index c616eadf295f2..4b7a91f38c201 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -39,7 +39,7 @@ use datafusion_common::{ DataFusionError, JoinSide, ScalarValue, Statistics, TableReference, arrow_datafusion_err, config::{ - CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions, + CdcOptions, CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions, }, file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions}, @@ -1089,6 +1089,17 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + use_content_defined_chunking: value.content_defined_chunking.map(|cdc| { + let defaults = CdcOptions::default(); + CdcOptions { + // proto3 uses 0 as the wire default for uint64; a zero chunk size is + // invalid, so treat it as "field not set" and fall back to the default. + min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, + max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, + // norm_level = 0 is a valid value (and the default), so pass it through directly. 
+ norm_level: cdc.norm_level, + } + }), }) } } @@ -1151,7 +1162,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions { column_specific_options.insert(column_name.clone(), options.try_into()?); } } - Ok(TableParquetOptions { + let opts = TableParquetOptions { global: value .global .as_ref() @@ -1159,9 +1170,9 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions { .unwrap() .unwrap(), column_specific_options, - key_value_metadata: Default::default(), - crypto: Default::default(), - }) + ..Default::default() + }; + Ok(opts) } } @@ -1261,3 +1272,87 @@ pub(crate) fn csv_writer_options_from_proto( .with_null(writer_options.null_value.clone()) .with_double_quote(writer_options.double_quote)) } + +#[cfg(test)] +mod tests { + use datafusion_common::config::{CdcOptions, ParquetOptions, TableParquetOptions}; + + fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions { + let proto: crate::protobuf_common::ParquetOptions = + (&opts).try_into().expect("to_proto"); + ParquetOptions::try_from(&proto).expect("from_proto") + } + + fn table_parquet_options_proto_round_trip( + opts: TableParquetOptions, + ) -> TableParquetOptions { + let proto: crate::protobuf_common::TableParquetOptions = + (&opts).try_into().expect("to_proto"); + TableParquetOptions::try_from(&proto).expect("from_proto") + } + + #[test] + fn test_parquet_options_cdc_disabled_round_trip() { + let opts = ParquetOptions::default(); + assert!(opts.use_content_defined_chunking.is_none()); + let recovered = parquet_options_proto_round_trip(opts.clone()); + assert_eq!(opts, recovered); + } + + #[test] + fn test_parquet_options_cdc_enabled_round_trip() { + let opts = ParquetOptions { + use_content_defined_chunking: Some(CdcOptions { + min_chunk_size: 128 * 1024, + max_chunk_size: 512 * 1024, + norm_level: 2, + }), + ..ParquetOptions::default() + }; + let recovered = parquet_options_proto_round_trip(opts.clone()); + let cdc = recovered.use_content_defined_chunking.unwrap(); + assert_eq!(cdc.min_chunk_size, 128 * 1024); + assert_eq!(cdc.max_chunk_size, 512 * 1024); + assert_eq!(cdc.norm_level, 2); + } + + #[test] + fn test_parquet_options_cdc_negative_norm_level_round_trip() { + let opts = ParquetOptions { + use_content_defined_chunking: Some(CdcOptions { + norm_level: -3, + ..CdcOptions::default() + }), + ..ParquetOptions::default() + }; + let recovered = parquet_options_proto_round_trip(opts); + assert_eq!( + recovered.use_content_defined_chunking.unwrap().norm_level, + -3 + ); + } + + #[test] + fn test_table_parquet_options_cdc_round_trip() { + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 64 * 1024, + max_chunk_size: 2 * 1024 * 1024, + norm_level: -1, + }); + + let recovered = table_parquet_options_proto_round_trip(opts.clone()); + let cdc = recovered.global.use_content_defined_chunking.unwrap(); + assert_eq!(cdc.min_chunk_size, 64 * 1024); + assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024); + assert_eq!(cdc.norm_level, -1); + } + + #[test] + fn test_table_parquet_options_cdc_disabled_round_trip() { + let opts = TableParquetOptions::default(); + assert!(opts.global.use_content_defined_chunking.is_none()); + let recovered = table_parquet_options_proto_round_trip(opts.clone()); + assert!(recovered.global.use_content_defined_chunking.is_none()); + } +} diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index b00e7546bba20..77a3b71488ece 100644 --- 
a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -883,6 +883,144 @@ impl<'de> serde::Deserialize<'de> for AvroOptions { deserializer.deserialize_struct("datafusion_common.AvroOptions", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for CdcOptions { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.min_chunk_size != 0 { + len += 1; + } + if self.max_chunk_size != 0 { + len += 1; + } + if self.norm_level != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion_common.CdcOptions", len)?; + if self.min_chunk_size != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("minChunkSize", ToString::to_string(&self.min_chunk_size).as_str())?; + } + if self.max_chunk_size != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("maxChunkSize", ToString::to_string(&self.max_chunk_size).as_str())?; + } + if self.norm_level != 0 { + struct_ser.serialize_field("normLevel", &self.norm_level)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for CdcOptions { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "min_chunk_size", + "minChunkSize", + "max_chunk_size", + "maxChunkSize", + "norm_level", + "normLevel", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + MinChunkSize, + MaxChunkSize, + NormLevel, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "minChunkSize" | "min_chunk_size" => Ok(GeneratedField::MinChunkSize), + "maxChunkSize" | "max_chunk_size" => Ok(GeneratedField::MaxChunkSize), + "normLevel" | "norm_level" => Ok(GeneratedField::NormLevel), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = CdcOptions; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion_common.CdcOptions") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut min_chunk_size__ = None; + let mut max_chunk_size__ = None; + let mut norm_level__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::MinChunkSize => { + if min_chunk_size__.is_some() { + return Err(serde::de::Error::duplicate_field("minChunkSize")); + } + min_chunk_size__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::MaxChunkSize => { + if max_chunk_size__.is_some() { + return Err(serde::de::Error::duplicate_field("maxChunkSize")); + } + max_chunk_size__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::NormLevel => { + if norm_level__.is_some() { + return Err(serde::de::Error::duplicate_field("normLevel")); + } + norm_level__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(CdcOptions { + min_chunk_size: min_chunk_size__.unwrap_or_default(), + max_chunk_size: max_chunk_size__.unwrap_or_default(), + norm_level: norm_level__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion_common.CdcOptions", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for Column { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -5695,6 +5833,9 @@ impl serde::Serialize for ParquetOptions { if !self.created_by.is_empty() { len += 1; } + if self.content_defined_chunking.is_some() { + len += 1; + } if self.metadata_size_hint_opt.is_some() { len += 1; } @@ -5806,6 +5947,9 @@ impl serde::Serialize for ParquetOptions { if !self.created_by.is_empty() { struct_ser.serialize_field("createdBy", &self.created_by)?; } + if let Some(v) = self.content_defined_chunking.as_ref() { + struct_ser.serialize_field("contentDefinedChunking", v)?; + } if let Some(v) = self.metadata_size_hint_opt.as_ref() { match v { parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => { @@ -5944,6 +6088,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "maxRowGroupSize", "created_by", "createdBy", + "content_defined_chunking", + "contentDefinedChunking", "metadata_size_hint", "metadataSizeHint", "compression", @@ -5989,6 +6135,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { DataPageRowCountLimit, MaxRowGroupSize, CreatedBy, + ContentDefinedChunking, MetadataSizeHint, Compression, DictionaryEnabled, @@ -6042,6 +6189,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), "createdBy" | "created_by" => Ok(GeneratedField::CreatedBy), + "contentDefinedChunking" | "content_defined_chunking" => Ok(GeneratedField::ContentDefinedChunking), "metadataSizeHint" | "metadata_size_hint" => Ok(GeneratedField::MetadataSizeHint), "compression" => Ok(GeneratedField::Compression), "dictionaryEnabled" | "dictionary_enabled" => Ok(GeneratedField::DictionaryEnabled), @@ -6093,6 +6241,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = None; let mut created_by__ = None; + let mut content_defined_chunking__ = None; let mut metadata_size_hint_opt__ = None; let mut compression_opt__ = None; let mut dictionary_enabled_opt__ = None; @@ -6246,6 +6395,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } created_by__ = Some(map_.next_value()?); } + GeneratedField::ContentDefinedChunking => { + if content_defined_chunking__.is_some() { + return Err(serde::de::Error::duplicate_field("contentDefinedChunking")); + } + content_defined_chunking__ = map_.next_value()?; + } 
GeneratedField::MetadataSizeHint => { if metadata_size_hint_opt__.is_some() { return Err(serde::de::Error::duplicate_field("metadataSizeHint")); @@ -6336,6 +6491,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), created_by: created_by__.unwrap_or_default(), + content_defined_chunking: content_defined_chunking__, metadata_size_hint_opt: metadata_size_hint_opt__, compression_opt: compression_opt__, dictionary_enabled_opt: dictionary_enabled_opt__, diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index a09826a29be52..1251a51ab0983 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -838,6 +838,8 @@ pub struct ParquetOptions { pub max_row_group_size: u64, #[prost(string, tag = "16")] pub created_by: ::prost::alloc::string::String, + #[prost(message, optional, tag = "35")] + pub content_defined_chunking: ::core::option::Option, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] pub metadata_size_hint_opt: ::core::option::Option< parquet_options::MetadataSizeHintOpt, @@ -931,6 +933,15 @@ pub mod parquet_options { MaxPredicateCacheSize(u64), } } +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct CdcOptions { + #[prost(uint64, tag = "1")] + pub min_chunk_size: u64, + #[prost(uint64, tag = "2")] + pub max_chunk_size: u64, + #[prost(int32, tag = "3")] + pub norm_level: i32, +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { #[prost(enumeration = "PrecisionInfo", tag = "1")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 79e3306a4df1b..65089f029b866 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -904,6 +904,13 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), + content_defined_chunking: value.use_content_defined_chunking.as_ref().map(|cdc| + protobuf::CdcOptions { + min_chunk_size: cdc.min_chunk_size as u64, + max_chunk_size: cdc.max_chunk_size as u64, + norm_level: cdc.norm_level, + } + ), }) } } @@ -963,8 +970,11 @@ impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions { .iter() .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone()))) .collect::>(); + + let global: protobuf::ParquetOptions = (&value.global).try_into()?; + Ok(protobuf::TableParquetOptions { - global: Some((&value.global).try_into()?), + global: Some(global), column_specific_options, key_value_metadata, }) diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index a09826a29be52..1251a51ab0983 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -838,6 +838,8 @@ pub struct ParquetOptions { pub max_row_group_size: u64, #[prost(string, tag = "16")] pub created_by: ::prost::alloc::string::String, + #[prost(message, optional, tag = "35")] + pub content_defined_chunking: 
::core::option::Option, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] pub metadata_size_hint_opt: ::core::option::Option< parquet_options::MetadataSizeHintOpt, @@ -931,6 +933,15 @@ pub mod parquet_options { MaxPredicateCacheSize(u64), } } +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct CdcOptions { + #[prost(uint64, tag = "1")] + pub min_chunk_size: u64, + #[prost(uint64, tag = "2")] + pub max_chunk_size: u64, + #[prost(int32, tag = "3")] + pub norm_level: i32, +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { #[prost(enumeration = "PrecisionInfo", tag = "1")] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 08f42b0af7290..8df0b3f1d9705 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -351,13 +351,13 @@ mod parquet { use super::*; use crate::protobuf::{ - ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions, - ParquetOptions as ParquetOptionsProto, + CdcOptions as CdcOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto, + ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto, TableParquetOptions as TableParquetOptionsProto, parquet_column_options, parquet_options, }; use datafusion_common::config::{ - ParquetColumnOptions, ParquetOptions, TableParquetOptions, + CdcOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions, }; use datafusion_datasource_parquet::file_format::ParquetFormatFactory; @@ -426,6 +426,13 @@ mod parquet { max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) }), + content_defined_chunking: global_options.global.use_content_defined_chunking.as_ref().map(|cdc| { + CdcOptionsProto { + min_chunk_size: cdc.min_chunk_size as u64, + max_chunk_size: cdc.max_chunk_size as u64, + norm_level: cdc.norm_level, + } + }), }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -525,6 +532,17 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), + use_content_defined_chunking: proto.content_defined_chunking.map(|cdc| { + let defaults = CdcOptions::default(); + CdcOptions { + // proto3 uses 0 as the wire default for uint64; a zero chunk size is + // invalid, so treat it as "field not set" and fall back to the default. + min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, + max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, + // norm_level = 0 is a valid value (and the default), so pass it through directly. 
+ norm_level: cdc.norm_level, + } + }), } } } @@ -585,7 +603,7 @@ mod parquet { .iter() .map(|(k, v)| (k.clone(), Some(v.clone()))) .collect(), - crypto: Default::default(), + ..Default::default() } } } diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 978d79b1f2fb0..873e82a97303e 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -929,7 +929,7 @@ fn build_statistics_record_batch( StatisticsType::Min => statistics.min_values(&column), StatisticsType::Max => statistics.max_values(&column), StatisticsType::NullCount => statistics.null_counts(&column), - StatisticsType::RowCount => statistics.row_counts(&column), + StatisticsType::RowCount => statistics.row_counts(), }; let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers)); @@ -2300,11 +2300,10 @@ mod tests { .unwrap_or(None) } - fn row_counts(&self, column: &Column) -> Option { + fn row_counts(&self) -> Option { self.stats - .get(column) - .map(|container_stats| container_stats.row_counts()) - .unwrap_or(None) + .values() + .find_map(|container_stats| container_stats.row_counts()) } fn contained( @@ -2342,7 +2341,7 @@ mod tests { None } - fn row_counts(&self, _column: &Column) -> Option { + fn row_counts(&self) -> Option { None } diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 950a79ddb0b5e..fd606af3a6af0 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -5004,6 +5004,71 @@ fn test_using_join_wildcard_schema() { ); } +#[test] +fn test_using_join_wildcard_schema_semi_anti() { + let s_columns = &["s.x1", "s.x2", "s.x3"]; + let t_columns = &["t.x1", "t.x2", "t.x3"]; + + let sql = "WITH + s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3), + t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3) + SELECT * FROM s LEFT SEMI JOIN t USING (x1)"; + let plan = logical_plan(sql).unwrap(); + assert_eq!(plan.schema().field_names(), s_columns); + + let sql = "WITH + s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3), + t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3) + SELECT * FROM t RIGHT SEMI JOIN s USING (x1)"; + let plan = logical_plan(sql).unwrap(); + assert_eq!(plan.schema().field_names(), s_columns); + + let sql = "WITH + s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3), + t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3) + SELECT * FROM s LEFT ANTI JOIN t USING (x1)"; + let plan = logical_plan(sql).unwrap(); + assert_eq!(plan.schema().field_names(), s_columns); + + let sql = "WITH + s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3), + t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3) + SELECT * FROM t RIGHT ANTI JOIN s USING (x1)"; + let plan = logical_plan(sql).unwrap(); + assert_eq!(plan.schema().field_names(), s_columns); + + // Same as above, but with swapped s and t sides. + // Tests the issue fixed with #20990. 
+ + let sql = "WITH + s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3), + t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3) + SELECT * FROM t LEFT SEMI JOIN s USING (x1)"; + let plan = logical_plan(sql).unwrap(); + assert_eq!(plan.schema().field_names(), t_columns); + + let sql = "WITH + s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3), + t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3) + SELECT * FROM s RIGHT SEMI JOIN t USING (x1)"; + let plan = logical_plan(sql).unwrap(); + assert_eq!(plan.schema().field_names(), t_columns); + + let sql = "WITH + s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3), + t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3) + SELECT * FROM t LEFT ANTI JOIN s USING (x1)"; + let plan = logical_plan(sql).unwrap(); + assert_eq!(plan.schema().field_names(), t_columns); + + let sql = "WITH + s AS (SELECT 1 AS x1, 2 AS x2, 3 AS x3), + t AS (SELECT 1 AS x1, 4 AS x2, 5 AS x3) + SELECT * FROM s RIGHT ANTI JOIN t USING (x1)"; + let plan = logical_plan(sql).unwrap(); + assert_eq!(plan.schema().field_names(), t_columns); +} + #[test] fn test_2_nested_lateral_join_with_the_deepest_join_referencing_the_outer_most_relation() { diff --git a/datafusion/sqllogictest/test_files/expr.slt b/datafusion/sqllogictest/test_files/expr.slt index 9365f3896b618..163730baae9e8 100644 --- a/datafusion/sqllogictest/test_files/expr.slt +++ b/datafusion/sqllogictest/test_files/expr.slt @@ -724,6 +724,87 @@ SELECT split_part('a,b', '', -2) statement error DataFusion error: Execution error: field position must not be zero SELECT split_part('abc~@~def~@~ghi', '~@~', 0) +# Position 0 with column input errors even for empty/null inputs +statement error DataFusion error: Execution error: field position must not be zero +SELECT split_part(column1, '.', 0) FROM (VALUES (NULL::text)) AS t(column1) + +# NULL delimiter with position 0 returns null (not an error), matching the +# slow path where null args short-circuit before the position check. 
+query T +SELECT split_part(column1, NULL, 0) FROM (VALUES ('a.b.c')) AS t(column1) +---- +NULL + +# split_part with column input (exercises the scalar-delimiter fast path) +query TTT +SELECT + split_part(column1, '.', 1), + split_part(column1, '.', 2), + split_part(column1, '.', 3) +FROM (VALUES ('a.b.c'), ('d.e.f'), ('x.y')) AS t(column1) +---- +a b c +d e f +x y (empty) + +# Multi-char delimiter with column input +query TT +SELECT + split_part(column1, '~@~', 2), + split_part(column1, '~@~', 3) +FROM (VALUES ('abc~@~def~@~ghi'), ('one~@~two')) AS t(column1) +---- +def ghi +two (empty) + +# Negative position with column input +query TT +SELECT + split_part(column1, '.', -1), + split_part(column1, '.', -2) +FROM (VALUES ('a.b.c'), ('x.y')) AS t(column1) +---- +c b +y x + +# Empty delimiter with column input +query TT +SELECT + split_part(column1, '', 1), + split_part(column1, '', 2) +FROM (VALUES ('abc'), ('xyz')) AS t(column1) +---- +abc (empty) +xyz (empty) + +# NULL column values with scalar delimiter +query T +SELECT split_part(column1, '.', 2) +FROM (VALUES ('a.b'), (NULL), ('c.d')) AS t(column1) +---- +b +NULL +d + +# Utf8View column with scalar delimiter +query TT +SELECT + split_part(column1, '.', 1), + split_part(column1, '.', 2) +FROM (SELECT arrow_cast(column1, 'Utf8View') AS column1 + FROM (VALUES ('a.b.c'), ('x.y.z')) AS t(column1)) +---- +a b +x y + +# LargeUtf8 column with scalar delimiter +query T +SELECT split_part(arrow_cast(column1, 'LargeUtf8'), '.', 2) +FROM (VALUES ('a.b.c'), (NULL)) AS t(column1) +---- +b +NULL + query B SELECT starts_with('alphabet', 'alph') ---- diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index a334bd25b0ce3..77ae1d335fb8d 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -259,6 +259,7 @@ datafusion.execution.parquet.skip_arrow_metadata false datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page datafusion.execution.parquet.statistics_truncate_length 64 +datafusion.execution.parquet.use_content_defined_chunking NULL datafusion.execution.parquet.write_batch_size 1024 datafusion.execution.parquet.writer_version 1.0 datafusion.execution.perfect_hash_join_min_key_density 0.15 @@ -401,6 +402,7 @@ datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding t datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting +datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. 
datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0" datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. diff --git a/datafusion/sqllogictest/test_files/parquet_cdc.slt b/datafusion/sqllogictest/test_files/parquet_cdc.slt new file mode 100644 index 0000000000000..f87f05af74a0c --- /dev/null +++ b/datafusion/sqllogictest/test_files/parquet_cdc.slt @@ -0,0 +1,231 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Test parquet content-defined chunking (CDC) end-to-end: +# write parquet files with CDC enabled, then read them back and verify correctness. 
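+#
+# The tests below enable CDC per statement through the `format.*` write options.
+# The same knob is also exposed as the session config
+# `datafusion.execution.parquet.use_content_defined_chunking`; a session-wide
+# setup would look roughly like the commented statements below (illustrative
+# only, with an arbitrary chunk size; this file uses per-statement options):
+#
+#   statement ok
+#   SET datafusion.execution.parquet.use_content_defined_chunking = true
+#
+#   statement ok
+#   SET datafusion.execution.parquet.use_content_defined_chunking.min_chunk_size = 65536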
+ +# Create source data +statement ok +CREATE TABLE cdc_source AS VALUES + (1, 'alice', 100.50), + (2, 'bob', 200.75), + (3, 'charlie', 300.25), + (4, 'diana', 400.00), + (5, 'eve', 500.99) + +# +# Test 1: Enable CDC with 'true' (uses default options) +# + +query I +COPY cdc_source TO 'test_files/scratch/parquet_cdc/enabled_true/' +STORED AS PARQUET +OPTIONS ( + 'format.use_content_defined_chunking' 'true' +) +---- +5 + +statement ok +CREATE EXTERNAL TABLE cdc_enabled_true_read +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/enabled_true/' + +query ITR rowsort +SELECT * FROM cdc_enabled_true_read +---- +1 alice 100.5 +2 bob 200.75 +3 charlie 300.25 +4 diana 400 +5 eve 500.99 + +# Verify filtering works on CDC-written files +query ITR +SELECT * FROM cdc_enabled_true_read WHERE column1 > 3 ORDER BY column1 +---- +4 diana 400 +5 eve 500.99 + +# Verify aggregation works on CDC-written files +query R +SELECT SUM(column3) FROM cdc_enabled_true_read +---- +1502.49 + +# +# Test 2: Disable CDC with 'false' (same as default behavior) +# + +query I +COPY cdc_source TO 'test_files/scratch/parquet_cdc/disabled_false/' +STORED AS PARQUET +OPTIONS ( + 'format.use_content_defined_chunking' 'false' +) +---- +5 + +statement ok +CREATE EXTERNAL TABLE cdc_disabled_false_read +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/disabled_false/' + +query ITR rowsort +SELECT * FROM cdc_disabled_false_read +---- +1 alice 100.5 +2 bob 200.75 +3 charlie 300.25 +4 diana 400 +5 eve 500.99 + +# +# Test 3: Enable CDC with custom sub-field options +# + +query I +COPY cdc_source TO 'test_files/scratch/parquet_cdc/custom_chunks/' +STORED AS PARQUET +OPTIONS ( + 'format.use_content_defined_chunking.min_chunk_size' '1024', + 'format.use_content_defined_chunking.max_chunk_size' '4096', + 'format.use_content_defined_chunking.norm_level' '1' +) +---- +5 + +statement ok +CREATE EXTERNAL TABLE cdc_custom_read +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/custom_chunks/' + +query ITR rowsort +SELECT * FROM cdc_custom_read +---- +1 alice 100.5 +2 bob 200.75 +3 charlie 300.25 +4 diana 400 +5 eve 500.99 + +# +# Test 4: Write via external table with CDC enabled +# + +statement ok +CREATE EXTERNAL TABLE cdc_external_write ( + id INT, + name VARCHAR, + value DOUBLE +) +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/external_table/' OPTIONS ( + 'format.use_content_defined_chunking' 'true' +) + +query I +INSERT INTO cdc_external_write SELECT * FROM cdc_source +---- +5 + +query ITR rowsort +SELECT * FROM cdc_external_write +---- +1 alice 100.5 +2 bob 200.75 +3 charlie 300.25 +4 diana 400 +5 eve 500.99 + +# +# Test 5: Write larger dataset to exercise CDC chunking logic +# + +statement ok +CREATE TABLE cdc_large_source AS + SELECT + value as id, + CONCAT('name_', CAST(value AS VARCHAR)) as name, + CAST(value AS DOUBLE) * 1.5 as amount, + CASE WHEN value % 2 = 0 THEN true ELSE false END as flag + FROM generate_series(1, 1000) t + +query I +COPY cdc_large_source TO 'test_files/scratch/parquet_cdc/large/' +STORED AS PARQUET +OPTIONS ( + 'format.use_content_defined_chunking' 'true' +) +---- +1000 + +statement ok +CREATE EXTERNAL TABLE cdc_large_read +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/large/' + +query I +SELECT COUNT(*) FROM cdc_large_read +---- +1000 + +query IR +SELECT MIN(id), MIN(amount) FROM cdc_large_read +---- +1 1.5 + +query IR +SELECT MAX(id), MAX(amount) FROM cdc_large_read +---- +1000 1500 + +query I +SELECT COUNT(*) FROM cdc_large_read WHERE flag = true +---- 
+500
+
+#
+# Test 6: CDC with different data types including NULLs
+#
+
+statement ok
+CREATE TABLE cdc_types_source AS VALUES
+  (1::INT, 'text'::VARCHAR, 3.14::DOUBLE, true::BOOLEAN, DATE '2024-01-15', TIMESTAMP '2024-01-15 10:30:00'),
+  (2::INT, 'more'::VARCHAR, 2.72::DOUBLE, false::BOOLEAN, DATE '2024-06-20', TIMESTAMP '2024-06-20 14:45:00'),
+  (3::INT, NULL::VARCHAR, NULL::DOUBLE, NULL::BOOLEAN, NULL::DATE, NULL::TIMESTAMP)
+
+query I
+COPY cdc_types_source TO 'test_files/scratch/parquet_cdc/types/'
+STORED AS PARQUET
+OPTIONS (
+    'format.use_content_defined_chunking' 'true'
+)
+----
+3
+
+statement ok
+CREATE EXTERNAL TABLE cdc_types_read
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_cdc/types/'
+
+query ITRBDP rowsort
+SELECT * FROM cdc_types_read
+----
+1 text 3.14 true 2024-01-15 2024-01-15T10:30:00
+2 more 2.72 false 2024-06-20 2024-06-20T14:45:00
+3 NULL NULL NULL NULL NULL
diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md
index 6dc08cc344e5f..4e6178345bcce 100644
--- a/docs/source/library-user-guide/upgrading/54.0.0.md
+++ b/docs/source/library-user-guide/upgrading/54.0.0.md
@@ -289,6 +289,41 @@ auto-derefs through the `Arc`).
> always return `None`. Use the `downcast_ref` method above instead, or
> dereference through the `Arc` first with `plan.as_ref() as &dyn Any`.

+### `PruningStatistics::row_counts` no longer takes a `column` parameter
+
+The `row_counts` method on the `PruningStatistics` trait no longer takes a
+`&Column` argument, since row counts are a container-level property (the same
+for every column).
+
+**Before:**
+
+```rust,ignore
+fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
+    // ...
+}
+```
+
+**After:**
+
+```rust,ignore
+fn row_counts(&self) -> Option<ArrayRef> {
+    // ...
+}
+```
+
+**Who is affected:**
+
+- Users who implement the `PruningStatistics` trait
+
+**Migration guide:**
+
+Remove the `column: &Column` parameter from your `row_counts` implementation
+and any corresponding call sites. If your implementation was using the column
+argument, note that row counts are identical for all columns in a container, so
+the parameter was unnecessary.
+
+See [PR #21369](https://github.com/apache/datafusion/pull/21369) for details.
+
### Avro API and timestamp decoding changes

DataFusion has switched to use `arrow-avro` (see [#17861]) when reading avro files
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 69627e3cb9148..be42f4a0becb8 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -112,6 +112,7 @@ The following configuration settings are available:
| datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. |
| datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.
| | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | +| datafusion.execution.parquet.use_content_defined_chunking | NULL | (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | | datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | | datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. |
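For library users, the new option can also be set programmatically when writing a `DataFrame` to parquet. The following is a minimal sketch, not part of the diff above: it assumes `CdcOptions` and `TableParquetOptions` are imported from `datafusion_common::config` (as in the proto round-trip tests), and the input/output paths are placeholders.

```rust
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::config::{CdcOptions, TableParquetOptions};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Placeholder input; any DataFrame works the same way.
    let df = ctx.read_csv("input.csv", CsvReadOptions::new()).await?;

    // Enable content-defined chunking with explicit chunk-size bounds.
    let mut parquet_opts = TableParquetOptions::default();
    parquet_opts.global.use_content_defined_chunking = Some(CdcOptions {
        min_chunk_size: 256 * 1024,  // minimum chunk size in bytes
        max_chunk_size: 1024 * 1024, // maximum chunk size in bytes
        norm_level: 0,               // normalization level
    });

    // Write with the customized writer options.
    df.write_parquet(
        "output/",
        DataFrameWriteOptions::new(),
        Some(parquet_opts),
    )
    .await?;

    Ok(())
}
```

Because the chunker state must persist across row groups, enabling the option also disables the parallel parquet writer for that write, as noted in the `use_content_defined_chunking` description above.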