@@ -284,6 +284,80 @@ fn sort_columns_from_physical_sort_exprs(
284284 . collect ( )
285285}
286286
287+ fn seed_summary_statistics ( summary_statistics : & mut Statistics , file_stats : & Statistics ) {
288+ summary_statistics. num_rows = file_stats. num_rows ;
289+ summary_statistics. total_byte_size = file_stats. total_byte_size ;
290+
291+ for ( summary_col_stats, file_col_stats) in summary_statistics
292+ . column_statistics
293+ . iter_mut ( )
294+ . zip ( file_stats. column_statistics . iter ( ) )
295+ {
296+ summary_col_stats. null_count = file_col_stats. null_count ;
297+ summary_col_stats. max_value = file_col_stats. max_value . clone ( ) ;
298+ summary_col_stats. min_value = file_col_stats. min_value . clone ( ) ;
299+ summary_col_stats. sum_value = file_col_stats. sum_value . cast_to_sum_type ( ) ;
300+ summary_col_stats. byte_size = file_col_stats. byte_size ;
301+ }
302+ }
303+
304+ fn merge_summary_statistics (
305+ summary_statistics : & mut Statistics ,
306+ file_stats : & Statistics ,
307+ ) {
308+ summary_statistics. num_rows = summary_statistics. num_rows . add ( & file_stats. num_rows ) ;
309+ summary_statistics. total_byte_size = summary_statistics
310+ . total_byte_size
311+ . add ( & file_stats. total_byte_size ) ;
312+
313+ for ( summary_col_stats, file_col_stats) in summary_statistics
314+ . column_statistics
315+ . iter_mut ( )
316+ . zip ( file_stats. column_statistics . iter ( ) )
317+ {
318+ let ColumnStatistics {
319+ null_count : file_nc,
320+ max_value : file_max,
321+ min_value : file_min,
322+ sum_value : file_sum,
323+ distinct_count : _,
324+ byte_size : file_sbs,
325+ } = file_col_stats;
326+
327+ summary_col_stats. null_count = summary_col_stats. null_count . add ( file_nc) ;
328+ summary_col_stats. max_value = summary_col_stats. max_value . max ( file_max) ;
329+ summary_col_stats. min_value = summary_col_stats. min_value . min ( file_min) ;
330+ summary_col_stats. sum_value = summary_col_stats. sum_value . add_for_sum ( file_sum) ;
331+ summary_col_stats. byte_size = summary_col_stats. byte_size . add ( file_sbs) ;
332+ }
333+ }
334+
335+ fn seed_first_file_statistics (
336+ limit_num_rows : & mut Precision < usize > ,
337+ summary_statistics : & mut Statistics ,
338+ file_stats : & Statistics ,
339+ collect_stats : bool ,
340+ ) {
341+ * limit_num_rows = file_stats. num_rows ;
342+
343+ if collect_stats {
344+ seed_summary_statistics ( summary_statistics, file_stats) ;
345+ }
346+ }
347+
348+ fn merge_file_statistics (
349+ limit_num_rows : & mut Precision < usize > ,
350+ summary_statistics : & mut Statistics ,
351+ file_stats : & Statistics ,
352+ collect_stats : bool ,
353+ ) {
354+ * limit_num_rows = limit_num_rows. add ( & file_stats. num_rows ) ;
355+
356+ if collect_stats {
357+ merge_summary_statistics ( summary_statistics, file_stats) ;
358+ }
359+ }
360+
287361/// Get all files as well as the file level summary statistics (no statistic for partition columns).
288362/// If the optional `limit` is provided, includes only sufficient files. Needed to read up to
289363/// `limit` number of rows. `collect_stats` is passed down from the configuration parameter on
@@ -308,9 +382,14 @@ pub async fn get_statistics_with_limit(
308382 // - zero for summations, and
309383 // - neutral element for extreme points.
310384 let size = file_schema. fields ( ) . len ( ) ;
311- let mut col_stats_set = vec ! [ ColumnStatistics :: default ( ) ; size] ;
312- let mut num_rows = Precision :: < usize > :: Absent ;
313- let mut total_byte_size = Precision :: < usize > :: Absent ;
385+ let mut summary_statistics = Statistics {
386+ num_rows : Precision :: Absent ,
387+ total_byte_size : Precision :: Absent ,
388+ column_statistics : vec ! [ ColumnStatistics :: default ( ) ; size] ,
389+ } ;
390+ // Keep limit pruning separate from the returned summary so `collect_stats=false`
391+ // can still stop early using known file row counts.
392+ let mut limit_num_rows = Precision :: < usize > :: Absent ;
314393
315394 // Fusing the stream allows us to call next safely even once it is finished.
316395 let mut all_files = Box :: pin ( all_files. fuse ( ) ) ;
@@ -320,23 +399,18 @@ pub async fn get_statistics_with_limit(
320399 file. statistics = Some ( Arc :: clone ( & file_stats) ) ;
321400 result_files. push ( file) ;
322401
323- // First file, we set them directly from the file statistics.
324- num_rows = file_stats. num_rows ;
325- total_byte_size = file_stats. total_byte_size ;
326- for ( index, file_column) in
327- file_stats. column_statistics . clone ( ) . into_iter ( ) . enumerate ( )
328- {
329- col_stats_set[ index] . null_count = file_column. null_count ;
330- col_stats_set[ index] . max_value = file_column. max_value ;
331- col_stats_set[ index] . min_value = file_column. min_value ;
332- col_stats_set[ index] . sum_value = file_column. sum_value . cast_to_sum_type ( ) ;
333- }
402+ seed_first_file_statistics (
403+ & mut limit_num_rows,
404+ & mut summary_statistics,
405+ & file_stats,
406+ collect_stats,
407+ ) ;
334408
335409 // If the number of rows exceeds the limit, we can stop processing
336410 // files. This only applies when we know the number of rows. It also
337411 // currently ignores tables that have no statistics regarding the
338412 // number of rows.
339- let conservative_num_rows = match num_rows {
413+ let conservative_num_rows = match limit_num_rows {
340414 Precision :: Exact ( nr) => nr,
341415 _ => usize:: MIN ,
342416 } ;
@@ -345,44 +419,18 @@ pub async fn get_statistics_with_limit(
345419 let ( mut file, file_stats) = current?;
346420 file. statistics = Some ( Arc :: clone ( & file_stats) ) ;
347421 result_files. push ( file) ;
348- if !collect_stats {
349- continue ;
350- }
351-
352- // We accumulate the number of rows, total byte size and null
353- // counts across all the files in question. If any file does not
354- // provide any information or provides an inexact value, we demote
355- // the statistic precision to inexact.
356- num_rows = num_rows. add ( & file_stats. num_rows ) ;
357-
358- total_byte_size = total_byte_size. add ( & file_stats. total_byte_size ) ;
359-
360- for ( file_col_stats, col_stats) in file_stats
361- . column_statistics
362- . iter ( )
363- . zip ( col_stats_set. iter_mut ( ) )
364- {
365- let ColumnStatistics {
366- null_count : file_nc,
367- max_value : file_max,
368- min_value : file_min,
369- sum_value : file_sum,
370- distinct_count : _,
371- byte_size : file_sbs,
372- } = file_col_stats;
373-
374- col_stats. null_count = col_stats. null_count . add ( file_nc) ;
375- col_stats. max_value = col_stats. max_value . max ( file_max) ;
376- col_stats. min_value = col_stats. min_value . min ( file_min) ;
377- col_stats. sum_value = col_stats. sum_value . add_for_sum ( file_sum) ;
378- col_stats. byte_size = col_stats. byte_size . add ( file_sbs) ;
379- }
422+ merge_file_statistics (
423+ & mut limit_num_rows,
424+ & mut summary_statistics,
425+ & file_stats,
426+ collect_stats,
427+ ) ;
380428
381429 // If the number of rows exceeds the limit, we can stop processing
382430 // files. This only applies when we know the number of rows. It also
383431 // currently ignores tables that have no statistics regarding the
384432 // number of rows.
385- if num_rows . get_value ( ) . unwrap_or ( & usize:: MIN )
433+ if limit_num_rows . get_value ( ) . unwrap_or ( & usize:: MIN )
386434 > & limit. unwrap_or ( usize:: MAX )
387435 {
388436 break ;
@@ -391,11 +439,7 @@ pub async fn get_statistics_with_limit(
391439 }
392440 } ;
393441
394- let mut statistics = Statistics {
395- num_rows,
396- total_byte_size,
397- column_statistics : col_stats_set,
398- } ;
442+ let mut statistics = summary_statistics;
399443 if all_files. next ( ) . await . is_some ( ) {
400444 // If we still have files in the stream, it means that the limit kicked
401445 // in, and the statistic could have been different had we processed the
@@ -520,6 +564,39 @@ mod tests {
520564 }
521565 }
522566
567+ fn test_schema ( ) -> SchemaRef {
568+ Arc :: new ( Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Int64 , true ) ] ) )
569+ }
570+
571+ fn make_file_stats (
572+ num_rows : usize ,
573+ total_byte_size : usize ,
574+ col_stats : ColumnStatistics ,
575+ ) -> Arc < Statistics > {
576+ Arc :: new ( Statistics {
577+ num_rows : Precision :: Exact ( num_rows) ,
578+ total_byte_size : Precision :: Exact ( total_byte_size) ,
579+ column_statistics : vec ! [ col_stats] ,
580+ } )
581+ }
582+
583+ fn rich_col_stats (
584+ null_count : usize ,
585+ min : i64 ,
586+ max : i64 ,
587+ sum : i64 ,
588+ byte_size : usize ,
589+ ) -> ColumnStatistics {
590+ ColumnStatistics {
591+ null_count : Precision :: Exact ( null_count) ,
592+ max_value : Precision :: Exact ( ScalarValue :: Int64 ( Some ( max) ) ) ,
593+ min_value : Precision :: Exact ( ScalarValue :: Int64 ( Some ( min) ) ) ,
594+ distinct_count : Precision :: Absent ,
595+ sum_value : Precision :: Exact ( ScalarValue :: Int64 ( Some ( sum) ) ) ,
596+ byte_size : Precision :: Exact ( byte_size) ,
597+ }
598+ }
599+
523600 #[ tokio:: test]
524601 #[ expect( deprecated) ]
525602 async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type ( )
@@ -533,7 +610,7 @@ mod tests {
533610 ) ) ] ) ;
534611
535612 let ( _group, stats) =
536- get_statistics_with_limit ( files, schema, None , false ) . await ?;
613+ get_statistics_with_limit ( files, schema, None , true ) . await ?;
537614
538615 assert_eq ! (
539616 stats. column_statistics[ 0 ] . sum_value,
@@ -571,4 +648,151 @@ mod tests {
571648
572649 Ok ( ( ) )
573650 }
651+
#[tokio::test]
#[expect(deprecated)]
// With `collect_stats = false`, the returned summary must stay entirely
// `Absent`, even though both input files carry fully populated exact stats.
async fn get_statistics_with_limit_collect_stats_false_returns_bare_statistics() {
    let all_files = stream::iter(vec![
        Ok((
            PartitionedFile::new("first.parquet", 10),
            make_file_stats(0, 0, rich_col_stats(1, 1, 9, 15, 64)),
        )),
        Ok((
            PartitionedFile::new("second.parquet", 20),
            make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
        )),
    ]);

    // No limit; statistics collection disabled.
    let (_files, statistics) =
        get_statistics_with_limit(all_files, test_schema(), None, false)
            .await
            .unwrap();

    // Nothing should have been aggregated: every field remains `Absent`,
    // but the column slots still exist (one per schema field).
    assert_eq!(statistics.num_rows, Precision::Absent);
    assert_eq!(statistics.total_byte_size, Precision::Absent);
    assert_eq!(statistics.column_statistics.len(), 1);
    assert_eq!(
        statistics.column_statistics[0].null_count,
        Precision::Absent
    );
    assert_eq!(statistics.column_statistics[0].max_value, Precision::Absent);
    assert_eq!(statistics.column_statistics[0].min_value, Precision::Absent);
    assert_eq!(statistics.column_statistics[0].sum_value, Precision::Absent);
    assert_eq!(statistics.column_statistics[0].byte_size, Precision::Absent);
}
683+
#[tokio::test]
#[expect(deprecated)]
// Even with `collect_stats = false`, per-file row counts must still drive
// limit pruning: with limit 4, two 3-row files (6 rows total > 4) suffice,
// so the third file is never consumed.
async fn get_statistics_with_limit_collect_stats_false_uses_row_counts_for_limit() {
    let all_files = stream::iter(vec![
        Ok((
            PartitionedFile::new("first.parquet", 10),
            make_file_stats(3, 30, rich_col_stats(1, 1, 9, 15, 64)),
        )),
        Ok((
            PartitionedFile::new("second.parquet", 20),
            make_file_stats(3, 30, rich_col_stats(2, 10, 99, 300, 128)),
        )),
        Ok((
            PartitionedFile::new("third.parquet", 30),
            make_file_stats(3, 30, rich_col_stats(0, 100, 199, 450, 256)),
        )),
    ]);

    let (files, statistics) =
        get_statistics_with_limit(all_files, test_schema(), Some(4), false)
            .await
            .unwrap();

    // Pruned after the second file; summary stays bare since collection is off.
    assert_eq!(files.len(), 2);
    assert_eq!(statistics.num_rows, Precision::Absent);
    assert_eq!(statistics.total_byte_size, Precision::Absent);
}
711+
#[tokio::test]
#[expect(deprecated)]
// With `collect_stats = true` and no limit, the whole stream is consumed and
// every statistic is the exact aggregate across both files:
// rows 5+10, bytes 50+100, nulls 1+2, min(1,10), max(9,99), sum 15+300,
// column byte size 64+128.
async fn get_statistics_with_limit_collect_stats_true_aggregates_statistics() {
    let all_files = stream::iter(vec![
        Ok((
            PartitionedFile::new("first.parquet", 10),
            make_file_stats(5, 50, rich_col_stats(1, 1, 9, 15, 64)),
        )),
        Ok((
            PartitionedFile::new("second.parquet", 20),
            make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
        )),
    ]);

    let (_files, statistics) =
        get_statistics_with_limit(all_files, test_schema(), None, true)
            .await
            .unwrap();

    // All inputs are exact and the stream was exhausted, so results are Exact.
    assert_eq!(statistics.num_rows, Precision::Exact(15));
    assert_eq!(statistics.total_byte_size, Precision::Exact(150));
    assert_eq!(
        statistics.column_statistics[0].null_count,
        Precision::Exact(3)
    );
    assert_eq!(
        statistics.column_statistics[0].min_value,
        Precision::Exact(ScalarValue::Int64(Some(1)))
    );
    assert_eq!(
        statistics.column_statistics[0].max_value,
        Precision::Exact(ScalarValue::Int64(Some(99)))
    );
    assert_eq!(
        statistics.column_statistics[0].sum_value,
        Precision::Exact(ScalarValue::Int64(Some(315)))
    );
    assert_eq!(
        statistics.column_statistics[0].byte_size,
        Precision::Exact(192)
    );
}
754+
#[tokio::test]
#[expect(deprecated)]
// When the limit prunes the stream early (limit 8; two 5-row files reach 10),
// unread files could have changed the aggregates, so every statistic for the
// two consumed files is demoted to `Inexact`.
async fn get_statistics_with_limit_collect_stats_true_limit_marks_inexact() {
    let all_files = stream::iter(vec![
        Ok((
            PartitionedFile::new("first.parquet", 10),
            make_file_stats(5, 50, rich_col_stats(0, 1, 5, 15, 64)),
        )),
        Ok((
            PartitionedFile::new("second.parquet", 20),
            make_file_stats(5, 50, rich_col_stats(1, 6, 10, 40, 64)),
        )),
        Ok((
            PartitionedFile::new("third.parquet", 20),
            make_file_stats(5, 50, rich_col_stats(2, 11, 15, 65, 64)),
        )),
    ]);

    let (files, statistics) =
        get_statistics_with_limit(all_files, test_schema(), Some(8), true)
            .await
            .unwrap();

    // Third file was never consumed; aggregates cover only the first two
    // files and are marked Inexact because the stream still had entries.
    assert_eq!(files.len(), 2);
    assert_eq!(statistics.num_rows, Precision::Inexact(10));
    assert_eq!(statistics.total_byte_size, Precision::Inexact(100));
    assert_eq!(
        statistics.column_statistics[0].min_value,
        Precision::Inexact(ScalarValue::Int64(Some(1)))
    );
    assert_eq!(
        statistics.column_statistics[0].max_value,
        Precision::Inexact(ScalarValue::Int64(Some(10)))
    );
    assert_eq!(
        statistics.column_statistics[0].sum_value,
        Precision::Inexact(ScalarValue::Int64(Some(55)))
    );
    assert_eq!(
        statistics.column_statistics[0].byte_size,
        Precision::Inexact(128)
    );
}
574798}
0 commit comments