@@ -68,15 +68,16 @@ use datafusion_common::{
6868 TableReference , UnnestOptions ,
6969} ;
7070use datafusion_common_runtime:: SpawnedTask ;
71+ use datafusion_datasource:: file_format:: format_as_file_type;
7172use datafusion_execution:: config:: SessionConfig ;
7273use datafusion_execution:: runtime_env:: RuntimeEnv ;
7374use datafusion_expr:: expr:: { FieldMetadata , GroupingSet , Sort , WindowFunction } ;
7475use datafusion_expr:: var_provider:: { VarProvider , VarType } ;
7576use datafusion_expr:: {
7677 cast, col, create_udf, exists, in_subquery, lit, out_ref_col, placeholder,
7778 scalar_subquery, when, wildcard, Expr , ExprFunctionExt , ExprSchemable , LogicalPlan ,
78- ScalarFunctionImplementation , WindowFrame , WindowFrameBound , WindowFrameUnits ,
79- WindowFunctionDefinition ,
79+ LogicalPlanBuilder , ScalarFunctionImplementation , SortExpr , WindowFrame ,
80+ WindowFrameBound , WindowFrameUnits , WindowFunctionDefinition ,
8081} ;
8182use datafusion_physical_expr:: expressions:: Column ;
8283use datafusion_physical_expr:: Partitioning ;
@@ -6193,3 +6194,59 @@ async fn test_copy_schema() -> Result<()> {
61936194 assert_logical_expr_schema_eq_physical_expr_schema ( result) . await ?;
61946195 Ok ( ( ) )
61956196}
6197+
6198+ #[ tokio:: test]
6199+ async fn test_copy_to_preserves_order ( ) -> Result < ( ) > {
6200+ let tmp_dir = TempDir :: new ( ) ?;
6201+
6202+ let session_state = SessionStateBuilder :: new_with_default_features ( ) . build ( ) ;
6203+ let session_ctx = SessionContext :: new_with_state ( session_state) ;
6204+
6205+ let target_path = tmp_dir. path ( ) . join ( "target_ordered.csv" ) ;
6206+ let csv_file_format = session_ctx
6207+ . state ( )
6208+ . get_file_format_factory ( "csv" )
6209+ . map ( |file_format_factory| format_as_file_type ( file_format_factory) )
6210+ . unwrap ( ) ;
6211+
6212+ let ordered_select_plan = LogicalPlanBuilder :: values ( vec ! [
6213+ vec![ lit( 1u64 ) ] ,
6214+ vec![ lit( 10u64 ) ] ,
6215+ vec![ lit( 20u64 ) ] ,
6216+ vec![ lit( 100u64 ) ] ,
6217+ ] ) ?
6218+ . sort ( vec ! [ SortExpr :: new( col( "column1" ) , false , true ) ] ) ?
6219+ . build ( ) ?;
6220+
6221+ let copy_to_plan = LogicalPlanBuilder :: copy_to (
6222+ ordered_select_plan,
6223+ target_path. to_str ( ) . unwrap ( ) . to_string ( ) ,
6224+ csv_file_format,
6225+ HashMap :: new ( ) ,
6226+ vec ! [ ] ,
6227+ ) ?
6228+ . build ( ) ?;
6229+
6230+ let union_side_branch = LogicalPlanBuilder :: values ( vec ! [ vec![ lit( 1u64 ) ] ] ) ?. build ( ) ?;
6231+ let union_plan = LogicalPlanBuilder :: from ( copy_to_plan)
6232+ . union ( union_side_branch) ?
6233+ . build ( ) ?;
6234+
6235+ let frame = session_ctx. execute_logical_plan ( union_plan) . await ?;
6236+ let physical_plan = frame. create_physical_plan ( ) . await ?;
6237+
6238+ let physical_plan_format =
6239+ displayable ( physical_plan. as_ref ( ) ) . indent ( true ) . to_string ( ) ;
6240+
6241+ assert_snapshot ! (
6242+ physical_plan_format,
6243+ @r###"
6244+ UnionExec
6245+ DataSinkExec: sink=CsvSink(file_groups=[])
6246+ SortExec: expr=[column1@0 DESC], preserve_partitioning=[false]
6247+ DataSourceExec: partitions=1, partition_sizes=[1]
6248+ DataSourceExec: partitions=1, partition_sizes=[1]
6249+ "###
6250+ ) ;
6251+ Ok ( ( ) )
6252+ }
0 commit comments