-
Notifications
You must be signed in to change notification settings - Fork 0
21149: fix(datasource): keep stats absent when collect_stats is false #295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -320,16 +320,19 @@ pub async fn get_statistics_with_limit( | |||||
| file.statistics = Some(Arc::clone(&file_stats)); | ||||||
| result_files.push(file); | ||||||
|
|
||||||
| // First file, we set them directly from the file statistics. | ||||||
| num_rows = file_stats.num_rows; | ||||||
| total_byte_size = file_stats.total_byte_size; | ||||||
| for (index, file_column) in | ||||||
| file_stats.column_statistics.clone().into_iter().enumerate() | ||||||
| { | ||||||
| col_stats_set[index].null_count = file_column.null_count; | ||||||
| col_stats_set[index].max_value = file_column.max_value; | ||||||
| col_stats_set[index].min_value = file_column.min_value; | ||||||
| col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type(); | ||||||
| if collect_stats { | ||||||
| // First file, we set them directly from the file statistics. | ||||||
| num_rows = file_stats.num_rows; | ||||||
| total_byte_size = file_stats.total_byte_size; | ||||||
| for (index, file_column) in | ||||||
| file_stats.column_statistics.clone().into_iter().enumerate() | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| { | ||||||
| col_stats_set[index].null_count = file_column.null_count; | ||||||
| col_stats_set[index].max_value = file_column.max_value; | ||||||
| col_stats_set[index].min_value = file_column.min_value; | ||||||
| col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type(); | ||||||
| col_stats_set[index].byte_size = file_column.byte_size; | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| // If the number of rows exceeds the limit, we can stop processing | ||||||
|
|
@@ -520,6 +523,39 @@ mod tests { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| fn test_schema() -> SchemaRef { | ||||||
| Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])) | ||||||
| } | ||||||
|
|
||||||
| fn make_file_stats( | ||||||
| num_rows: usize, | ||||||
| total_byte_size: usize, | ||||||
| col_stats: ColumnStatistics, | ||||||
| ) -> Arc<Statistics> { | ||||||
| Arc::new(Statistics { | ||||||
| num_rows: Precision::Exact(num_rows), | ||||||
| total_byte_size: Precision::Exact(total_byte_size), | ||||||
| column_statistics: vec![col_stats], | ||||||
| }) | ||||||
| } | ||||||
|
|
||||||
| fn rich_col_stats( | ||||||
| null_count: usize, | ||||||
| min: i64, | ||||||
| max: i64, | ||||||
| sum: i64, | ||||||
| byte_size: usize, | ||||||
| ) -> ColumnStatistics { | ||||||
| ColumnStatistics { | ||||||
| null_count: Precision::Exact(null_count), | ||||||
| max_value: Precision::Exact(ScalarValue::Int64(Some(max))), | ||||||
| min_value: Precision::Exact(ScalarValue::Int64(Some(min))), | ||||||
| distinct_count: Precision::Absent, | ||||||
| sum_value: Precision::Exact(ScalarValue::Int64(Some(sum))), | ||||||
| byte_size: Precision::Exact(byte_size), | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| #[tokio::test] | ||||||
| #[expect(deprecated)] | ||||||
| async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type() | ||||||
|
|
@@ -533,7 +569,7 @@ mod tests { | |||||
| ))]); | ||||||
|
|
||||||
| let (_group, stats) = | ||||||
| get_statistics_with_limit(files, schema, None, false).await?; | ||||||
| get_statistics_with_limit(files, schema, None, true).await?; | ||||||
|
|
||||||
| assert_eq!( | ||||||
| stats.column_statistics[0].sum_value, | ||||||
|
|
@@ -571,4 +607,123 @@ mod tests { | |||||
|
|
||||||
| Ok(()) | ||||||
| } | ||||||
|
|
||||||
| #[tokio::test] | ||||||
| #[expect(deprecated)] | ||||||
| async fn get_statistics_with_limit_collect_stats_false_returns_bare_statistics() { | ||||||
| let all_files = stream::iter(vec![ | ||||||
| Ok(( | ||||||
| PartitionedFile::new("first.parquet", 10), | ||||||
| make_file_stats(0, 0, rich_col_stats(1, 1, 9, 15, 64)), | ||||||
| )), | ||||||
| Ok(( | ||||||
| PartitionedFile::new("second.parquet", 20), | ||||||
| make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)), | ||||||
| )), | ||||||
| ]); | ||||||
|
|
||||||
| let (_files, statistics) = | ||||||
| get_statistics_with_limit(all_files, test_schema(), None, false) | ||||||
| .await | ||||||
| .unwrap(); | ||||||
|
|
||||||
| assert_eq!(statistics.num_rows, Precision::Absent); | ||||||
| assert_eq!(statistics.total_byte_size, Precision::Absent); | ||||||
| assert_eq!(statistics.column_statistics.len(), 1); | ||||||
| assert_eq!( | ||||||
| statistics.column_statistics[0].null_count, | ||||||
| Precision::Absent | ||||||
| ); | ||||||
| assert_eq!(statistics.column_statistics[0].max_value, Precision::Absent); | ||||||
| assert_eq!(statistics.column_statistics[0].min_value, Precision::Absent); | ||||||
| assert_eq!(statistics.column_statistics[0].sum_value, Precision::Absent); | ||||||
| assert_eq!(statistics.column_statistics[0].byte_size, Precision::Absent); | ||||||
| } | ||||||
|
|
||||||
| #[tokio::test] | ||||||
| #[expect(deprecated)] | ||||||
| async fn get_statistics_with_limit_collect_stats_true_aggregates_statistics() { | ||||||
| let all_files = stream::iter(vec![ | ||||||
| Ok(( | ||||||
| PartitionedFile::new("first.parquet", 10), | ||||||
| make_file_stats(5, 50, rich_col_stats(1, 1, 9, 15, 64)), | ||||||
| )), | ||||||
| Ok(( | ||||||
| PartitionedFile::new("second.parquet", 20), | ||||||
| make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)), | ||||||
| )), | ||||||
| ]); | ||||||
|
|
||||||
| let (_files, statistics) = | ||||||
| get_statistics_with_limit(all_files, test_schema(), None, true) | ||||||
| .await | ||||||
| .unwrap(); | ||||||
|
|
||||||
| assert_eq!(statistics.num_rows, Precision::Exact(15)); | ||||||
| assert_eq!(statistics.total_byte_size, Precision::Exact(150)); | ||||||
| assert_eq!( | ||||||
| statistics.column_statistics[0].null_count, | ||||||
| Precision::Exact(3) | ||||||
| ); | ||||||
| assert_eq!( | ||||||
| statistics.column_statistics[0].min_value, | ||||||
| Precision::Exact(ScalarValue::Int64(Some(1))) | ||||||
| ); | ||||||
| assert_eq!( | ||||||
| statistics.column_statistics[0].max_value, | ||||||
| Precision::Exact(ScalarValue::Int64(Some(99))) | ||||||
| ); | ||||||
| assert_eq!( | ||||||
| statistics.column_statistics[0].sum_value, | ||||||
| Precision::Exact(ScalarValue::Int64(Some(315))) | ||||||
| ); | ||||||
| assert_eq!( | ||||||
| statistics.column_statistics[0].byte_size, | ||||||
| Precision::Exact(192) | ||||||
| ); | ||||||
| } | ||||||
|
|
||||||
| #[tokio::test] | ||||||
| #[expect(deprecated)] | ||||||
| async fn get_statistics_with_limit_collect_stats_true_limit_marks_inexact() { | ||||||
| let all_files = stream::iter(vec![ | ||||||
| Ok(( | ||||||
| PartitionedFile::new("first.parquet", 10), | ||||||
| make_file_stats(5, 50, rich_col_stats(0, 1, 5, 15, 64)), | ||||||
| )), | ||||||
| Ok(( | ||||||
| PartitionedFile::new("second.parquet", 20), | ||||||
| make_file_stats(5, 50, rich_col_stats(1, 6, 10, 40, 64)), | ||||||
| )), | ||||||
| Ok(( | ||||||
| PartitionedFile::new("third.parquet", 20), | ||||||
| make_file_stats(5, 50, rich_col_stats(2, 11, 15, 65, 64)), | ||||||
| )), | ||||||
| ]); | ||||||
|
|
||||||
| let (files, statistics) = | ||||||
| get_statistics_with_limit(all_files, test_schema(), Some(8), true) | ||||||
| .await | ||||||
| .unwrap(); | ||||||
|
|
||||||
| assert_eq!(files.len(), 2); | ||||||
| assert_eq!(statistics.num_rows, Precision::Inexact(10)); | ||||||
| assert_eq!(statistics.total_byte_size, Precision::Inexact(100)); | ||||||
| assert_eq!( | ||||||
| statistics.column_statistics[0].min_value, | ||||||
| Precision::Inexact(ScalarValue::Int64(Some(1))) | ||||||
| ); | ||||||
| assert_eq!( | ||||||
| statistics.column_statistics[0].max_value, | ||||||
| Precision::Inexact(ScalarValue::Int64(Some(10))) | ||||||
| ); | ||||||
| assert_eq!( | ||||||
| statistics.column_statistics[0].sum_value, | ||||||
| Precision::Inexact(ScalarValue::Int64(Some(55))) | ||||||
| ); | ||||||
| assert_eq!( | ||||||
| statistics.column_statistics[0].byte_size, | ||||||
| Precision::Inexact(128) | ||||||
| ); | ||||||
| } | ||||||
| } | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With
collect_stats=false,num_rowsstaysPrecision::Absent, so the limit logic will treat it as 0 and may end up iterating all files (even when the first file’sfile_stats.num_rowsalready exceedslimit). Iflimitis still intended to constrain returnedFileGroupindependently of summary-stat aggregation, this change looks like it could regress that behavior (a targeted test forcollect_stats=false+limitwould catch it).Severity: medium
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.