Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/examples/data_io/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ impl PruningStatistics for ParquetMetadataIndex {
}

/// return the row counts for each file
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
fn row_counts(&self) -> Option<ArrayRef> {
    // The per-file row counts are already materialized in the index;
    // hand back a clone of the cached array.
    let counts = self.row_counts_ref().clone();
    Some(counts)
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/query_planning/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl PruningStatistics for MyCatalog {
None
}

fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
fn row_counts(&self) -> Option<ArrayRef> {
    // This catalog tracks no per-file row-count statistics, so no
    // pruning information can be provided here.
    None
}
Expand Down
211 changes: 211 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,137 @@ config_namespace! {
}
}

/// Options for content-defined chunking (CDC) when writing parquet files.
/// See [`ParquetOptions::use_content_defined_chunking`].
///
/// Chunk boundaries are chosen by a rolling hash over the data, bounded
/// below by `min_chunk_size` and above by `max_chunk_size`.
///
/// Can be enabled with default options by setting
/// `use_content_defined_chunking` to `true`, or configured with sub-fields
/// like `use_content_defined_chunking.min_chunk_size`.
#[derive(Debug, Clone, PartialEq)]
pub struct CdcOptions {
    /// Minimum chunk size in bytes. The rolling hash will not trigger a split
    /// until this many bytes have been accumulated. Default is 256 KiB.
    pub min_chunk_size: usize,

    /// Maximum chunk size in bytes. A split is forced when the accumulated
    /// size exceeds this value. Default is 1 MiB.
    pub max_chunk_size: usize,

    /// Normalization level. Increasing this improves deduplication ratio
    /// but increases fragmentation. Recommended range is [-3, 3], default is 0.
    pub norm_level: i32,
}

// Note: `CdcOptions` intentionally does NOT implement `Default` so that the
// blanket `impl<F: ConfigField + Default> ConfigField for Option<F>` does not
// apply. This allows the specific `impl ConfigField for Option<CdcOptions>`
// below to handle "true"/"false" for enabling/disabling CDC.
// Use `CdcOptions::default()` (the inherent method) instead of `Default::default()`.
impl CdcOptions {
    /// Default minimum chunk size: 256 KiB.
    const DEFAULT_MIN_CHUNK_SIZE: usize = 256 * 1024;
    /// Default maximum chunk size: 1 MiB.
    const DEFAULT_MAX_CHUNK_SIZE: usize = 1024 * 1024;
    /// Default normalization level.
    const DEFAULT_NORM_LEVEL: i32 = 0;

    /// Returns a new `CdcOptions` with default values.
    #[expect(clippy::should_implement_trait)]
    pub fn default() -> Self {
        Self {
            min_chunk_size: Self::DEFAULT_MIN_CHUNK_SIZE,
            max_chunk_size: Self::DEFAULT_MAX_CHUNK_SIZE,
            norm_level: Self::DEFAULT_NORM_LEVEL,
        }
    }
}

impl ConfigField for CdcOptions {
    /// Sets a sub-field of `CdcOptions` by dotted key path.
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // The first path segment names the field; anything after the
        // first '.' is forwarded to that field's own `set`.
        let (field, rem) = key.split_once('.').unwrap_or((key, ""));
        match field {
            "min_chunk_size" => self.min_chunk_size.set(rem, value),
            "max_chunk_size" => self.max_chunk_size.set(rem, value),
            "norm_level" => self.norm_level.set(rem, value),
            other => _config_err!("Config value \"{}\" not found on CdcOptions", other),
        }
    }

    /// Visits each sub-field with its fully-qualified key and description.
    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
        self.min_chunk_size.visit(
            v,
            &format!("{key_prefix}.min_chunk_size"),
            "Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. Default is 256 KiB.",
        );
        self.max_chunk_size.visit(
            v,
            &format!("{key_prefix}.max_chunk_size"),
            "Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB.",
        );
        self.norm_level.visit(
            v,
            &format!("{key_prefix}.norm_level"),
            "Normalization level. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0.",
        );
    }

    /// Resets a sub-field to its default value by dotted key path.
    fn reset(&mut self, key: &str) -> Result<()> {
        let (field, rem) = key.split_once('.').unwrap_or((key, ""));
        // A bare field name restores that field's default; a longer path
        // is delegated to the field itself.
        let defaults = CdcOptions::default();
        match (field, rem.is_empty()) {
            ("min_chunk_size", true) => {
                self.min_chunk_size = defaults.min_chunk_size;
                Ok(())
            }
            ("min_chunk_size", false) => self.min_chunk_size.reset(rem),
            ("max_chunk_size", true) => {
                self.max_chunk_size = defaults.max_chunk_size;
                Ok(())
            }
            ("max_chunk_size", false) => self.max_chunk_size.reset(rem),
            ("norm_level", true) => {
                self.norm_level = defaults.norm_level;
                Ok(())
            }
            ("norm_level", false) => self.norm_level.reset(rem),
            (other, _) => _config_err!("Config value \"{}\" not found on CdcOptions", other),
        }
    }
}

/// `ConfigField` implementation for `Option<CdcOptions>`.
///
/// The bare key accepts `"true"` (enable CDC with default options) and
/// `"false"` (disable CDC); sub-field keys such as `min_chunk_size`
/// enable CDC on first use and then delegate to [`CdcOptions`].
impl ConfigField for Option<CdcOptions> {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        if let Some(options) = self {
            options.visit(v, key, description);
        } else {
            v.none(key, description);
        }
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        if !key.is_empty() {
            // Sub-field access: make sure CDC is enabled, then delegate.
            return self.get_or_insert_with(CdcOptions::default).set(key, value);
        }
        match value.to_ascii_lowercase().as_str() {
            "true" => {
                *self = Some(CdcOptions::default());
                Ok(())
            }
            "false" => {
                *self = None;
                Ok(())
            }
            _ => _config_err!(
                "Expected 'true' or 'false' for use_content_defined_chunking, got '{value}'"
            ),
        }
    }

    fn reset(&mut self, key: &str) -> Result<()> {
        if !key.is_empty() {
            // Resetting a sub-field enables CDC (if needed) and resets
            // just that field to its default.
            return self.get_or_insert_with(CdcOptions::default).reset(key);
        }
        // Resetting the bare key disables CDC entirely.
        *self = None;
        Ok(())
    }
}

config_namespace! {
/// Options for reading and writing parquet files
///
Expand Down Expand Up @@ -872,6 +1003,12 @@ config_namespace! {
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2

/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
/// parquet files. When `Some`, CDC is enabled with the given options; when `None`
/// (the default), CDC is disabled. When CDC is enabled, parallel writing is
/// automatically disabled since the chunker state must persist across row groups.
pub use_content_defined_chunking: Option<CdcOptions>, default = None
}
}

Expand Down Expand Up @@ -1826,6 +1963,7 @@ config_field!(usize);
config_field!(f64);
config_field!(u64);
config_field!(u32);
config_field!(i32);

impl ConfigField for u8 {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
Expand Down Expand Up @@ -3579,4 +3717,77 @@ mod tests {
"Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
);
}

#[cfg(feature = "parquet")]
#[test]
fn set_cdc_option_with_boolean_true() {
    use crate::config::ConfigOptions;

    const KEY: &str = "datafusion.execution.parquet.use_content_defined_chunking";

    let mut config = ConfigOptions::default();
    // CDC starts out disabled.
    assert!(
        config
            .execution
            .parquet
            .use_content_defined_chunking
            .is_none()
    );

    // Setting the bare key to "true" enables CDC with default options.
    config.set(KEY, "true").unwrap();
    let cdc = config
        .execution
        .parquet
        .use_content_defined_chunking
        .as_ref()
        .expect("CDC should be enabled");
    assert_eq!(cdc.min_chunk_size, 256 * 1024);
    assert_eq!(cdc.max_chunk_size, 1024 * 1024);
    assert_eq!(cdc.norm_level, 0);

    // Setting it back to "false" disables CDC again.
    config.set(KEY, "false").unwrap();
    assert!(
        config
            .execution
            .parquet
            .use_content_defined_chunking
            .is_none()
    );
}

#[cfg(feature = "parquet")]
#[test]
fn set_cdc_option_with_subfields() {
    use crate::config::ConfigOptions;

    let mut config = ConfigOptions::default();

    // Writing a sub-field implicitly enables CDC.
    config
        .set(
            "datafusion.execution.parquet.use_content_defined_chunking.min_chunk_size",
            "1024",
        )
        .unwrap();

    let cdc = config
        .execution
        .parquet
        .use_content_defined_chunking
        .as_ref()
        .expect("CDC should be enabled");

    // The sub-field took the new value; the rest keep their defaults.
    assert_eq!(cdc.min_chunk_size, 1024);
    assert_eq!(cdc.max_chunk_size, 1024 * 1024);
    assert_eq!(cdc.norm_level, 0);
}
}
Loading
Loading