Skip to content

Commit bdc737d

Browse files
author
Bert Vermeiren
committed
Fix: Preserve sorting for the COPY statement
1 parent 8674454 commit bdc737d

File tree

2 files changed

+75
-1
lines changed

2 files changed

+75
-1
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ use datafusion_sql::TableReference;
9797
use sqlparser::ast::NullTreatment;
9898

9999
use async_trait::async_trait;
100+
use datafusion_physical_expr_common::sort_expr::LexRequirement;
100101
use datafusion_physical_plan::async_func::{AsyncFuncExec, AsyncMapper};
101102
use futures::{StreamExt, TryStreamExt};
102103
use itertools::{multiunzip, Itertools};
@@ -556,8 +557,17 @@ impl DefaultPhysicalPlanner {
556557
file_extension,
557558
};
558559

560+
let ordering = input_exec.properties().output_ordering().cloned();
561+
let sort_req: Option<LexRequirement> =
562+
ordering.map(|o| LexRequirement::from(o));
563+
559564
sink_format
560-
.create_writer_physical_plan(input_exec, session_state, config, None)
565+
.create_writer_physical_plan(
566+
input_exec,
567+
session_state,
568+
config,
569+
sort_req,
570+
)
561571
.await?
562572
}
563573
LogicalPlan::Dml(DmlStatement {

datafusion/sqllogictest/test_files/copy.slt

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,70 @@ select * from validate_parquet_single;
427427
1 Foo
428428
2 Bar
429429

430+
431+
# copy with sorting requirement
432+
433+
statement ok
434+
create table to_order_by_table(col1 integer) as values (1), (4), (3), (2);
435+
436+
query TT
437+
EXPLAIN COPY (SELECT * FROM to_order_by_table ORDER BY col1 ASC NULLS LAST) to 'test_files/scratch/copy/sorted_table.parquet/';
438+
----
439+
logical_plan
440+
01)CopyTo: format=parquet output_url=test_files/scratch/copy/sorted_table.parquet/ options: ()
441+
02)--Sort: to_order_by_table.col1 ASC NULLS LAST
442+
03)----TableScan: to_order_by_table projection=[col1]
443+
physical_plan
444+
01)DataSinkExec: sink=ParquetSink(file_groups=[])
445+
02)--SortExec: expr=[col1@0 ASC NULLS LAST], preserve_partitioning=[false]
446+
03)----DataSourceExec: partitions=1, partition_sizes=[1]
447+
448+
449+
query I
450+
COPY (SELECT * FROM to_order_by_table ORDER BY col1 ASC NULLS LAST) to 'test_files/scratch/copy/sorted_table.parquet';
451+
----
452+
4
453+
454+
statement ok
455+
drop table to_order_by_table;
456+
457+
statement ok
458+
CREATE EXTERNAL TABLE validate_sorted_table STORED AS PARQUET LOCATION 'test_files/scratch/copy/sorted_table.parquet';
459+
460+
# verify col1 is sorted ASC within the just written parquet file
461+
query I
462+
WITH
463+
numbered_rows AS (
464+
SELECT
465+
ROW_NUMBER() OVER () as row_num,
466+
col1
467+
FROM validate_sorted_table
468+
),
469+
adjacent_check AS (
470+
SELECT
471+
a.row_num,
472+
a.col1 as current_value,
473+
b.col1 as next_value
474+
FROM numbered_rows a
475+
JOIN numbered_rows b ON a.row_num + 1 = b.row_num
476+
),
477+
violations AS (
478+
SELECT
479+
row_num,
480+
current_value,
481+
next_value,
482+
'VIOLATION: ' || current_value || ' > ' || next_value as issue
483+
FROM adjacent_check
484+
WHERE current_value > next_value
485+
)
486+
SELECT COUNT(*) FROM violations;
487+
----
488+
0
489+
490+
statement ok
491+
drop table validate_sorted_table;
492+
493+
430494
# copy from table to folder of compressed json files
431495
query I
432496
COPY source_table to 'test_files/scratch/copy/table_json_gz' STORED AS JSON OPTIONS ('format.compression' gzip);

0 commit comments

Comments
 (0)