Skip to content

Commit 411e940

Browse files
author
Bert Vermeiren
committed
Fix COPY TO does not produce an output file for the empty set (comments added + multi-output file addressed)
1 parent 18fbd63 commit 411e940

File tree

1 file changed

+12
-19
lines changed

1 file changed

+12
-19
lines changed

datafusion/datasource/src/write/demux.rs

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,12 @@ use datafusion_common::cast::{
3838
as_int8_array, as_string_array, as_string_view_array, as_uint16_array,
3939
as_uint32_array, as_uint64_array, as_uint8_array,
4040
};
41+
use datafusion_common::config::ExecutionOptions;
4142
use datafusion_common::{exec_datafusion_err, internal_datafusion_err, not_impl_err};
4243
use datafusion_common_runtime::SpawnedTask;
43-
use datafusion_execution::TaskContext;
4444

4545
use chrono::NaiveDate;
46+
use datafusion_execution::TaskContext;
4647
use futures::StreamExt;
4748
use object_store::path::Path;
4849
use rand::distr::SampleString;
@@ -68,6 +69,11 @@ pub type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
6869
/// be written with the extension from the path. Otherwise the default extension
6970
/// will be used and the output will be split into multiple files.
7071
///
72+
/// Output file guarantees:
73+
/// - Partitioned files: Files are created only for non-empty partitions.
74+
/// - Single-file output: At least one file is always written, even when the stream is empty.
75+
/// - Multi-file output: At least the minimum number of parallel files specified in [`ExecutionOptions`] are always written, even when the stream is empty.
76+
///
7177
/// Examples of `base_output_path`
7278
/// * `tmp/dataset/` -> is a folder since it ends in `/`
7379
/// * `tmp/dataset` -> is still a folder since it does not end in `/` but has no valid file extension
@@ -171,10 +177,9 @@ async fn row_count_demuxer(
171177
max_rows_per_file
172178
};
173179

174-
// Single-file output requires creating at least one file stream in advance.
175-
// If no record batches are present in the input stream,
176-
// the file stream must still be created to produce a valid output file.
177-
if single_file_output {
180+
// ensure we have at least minimum_parallel_files open, even when
181+
// the input stream is empty
182+
for _ in 0..minimum_parallel_files {
178183
open_file_streams.push(create_new_file_stream(
179184
&base_output_path,
180185
&write_id,
@@ -189,20 +194,8 @@ async fn row_count_demuxer(
189194
}
190195

191196
while let Some(rb) = input.next().await.transpose()? {
192-
// ensure we have at least minimum_parallel_files open
193-
if open_file_streams.len() < minimum_parallel_files {
194-
open_file_streams.push(create_new_file_stream(
195-
&base_output_path,
196-
&write_id,
197-
part_idx,
198-
&file_extension,
199-
single_file_output,
200-
max_buffered_batches,
201-
&mut tx,
202-
)?);
203-
row_counts.push(0);
204-
part_idx += 1;
205-
} else if row_counts[next_send_steam] >= max_rows_per_file {
197+
// spill over to new file output if max rows reached
198+
if row_counts[next_send_steam] >= max_rows_per_file {
206199
row_counts[next_send_steam] = 0;
207200
open_file_streams[next_send_steam] = create_new_file_stream(
208201
&base_output_path,

0 commit comments

Comments
 (0)