Skip to content

Commit 1f4be91

Browse files
author
Bert Vermeiren
committed
Fix: COPY TO does not produce an output file for the empty set (comments added; multi-file output case also addressed)
1 parent 18fbd63 commit 1f4be91

File tree

1 file changed

+11
-19
lines changed

1 file changed

+11
-19
lines changed

datafusion/datasource/src/write/demux.rs

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ use datafusion_common::cast::{
4040
};
4141
use datafusion_common::{exec_datafusion_err, internal_datafusion_err, not_impl_err};
4242
use datafusion_common_runtime::SpawnedTask;
43-
use datafusion_execution::TaskContext;
4443

4544
use chrono::NaiveDate;
45+
use datafusion_execution::TaskContext;
4646
use futures::StreamExt;
4747
use object_store::path::Path;
4848
use rand::distr::SampleString;
@@ -68,6 +68,11 @@ pub type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
6868
/// be written with the extension from the path. Otherwise the default extension
6969
/// will be used and the output will be split into multiple files.
7070
///
71+
/// Output file guarantees:
72+
/// - Partitioned files: Files are created only for non-empty partitions.
73+
/// - Single-file output: At least one file is always written, even when the stream is empty.
74+
/// - Multi-file output: At least the minimum number of parallel files specified in [`datafusion_common::config::ExecutionOptions`] is always written, even when the stream is empty.
75+
///
7176
/// Examples of `base_output_path`
7277
/// * `tmp/dataset/` -> is a folder since it ends in `/`
7378
/// * `tmp/dataset` -> is still a folder since it does not end in `/` but has no valid file extension
@@ -171,10 +176,9 @@ async fn row_count_demuxer(
171176
max_rows_per_file
172177
};
173178

174-
// Single-file output requires creating at least one file stream in advance.
175-
// If no record batches are present in the input stream,
176-
// the file stream must still be created to produce a valid output file.
177-
if single_file_output {
179+
// ensure we have at least minimum_parallel_files open, even when
180+
// the input stream is empty
181+
for _ in 0..minimum_parallel_files {
178182
open_file_streams.push(create_new_file_stream(
179183
&base_output_path,
180184
&write_id,
@@ -189,20 +193,8 @@ async fn row_count_demuxer(
189193
}
190194

191195
while let Some(rb) = input.next().await.transpose()? {
192-
// ensure we have at least minimum_parallel_files open
193-
if open_file_streams.len() < minimum_parallel_files {
194-
open_file_streams.push(create_new_file_stream(
195-
&base_output_path,
196-
&write_id,
197-
part_idx,
198-
&file_extension,
199-
single_file_output,
200-
max_buffered_batches,
201-
&mut tx,
202-
)?);
203-
row_counts.push(0);
204-
part_idx += 1;
205-
} else if row_counts[next_send_steam] >= max_rows_per_file {
196+
// spill over to new file output if max rows reached
197+
if row_counts[next_send_steam] >= max_rows_per_file {
206198
row_counts[next_send_steam] = 0;
207199
open_file_streams[next_send_steam] = create_new_file_stream(
208200
&base_output_path,

0 commit comments

Comments (0)