Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
64c3dd7
save
andygrove Nov 21, 2025
c6fe639
save
andygrove Nov 21, 2025
2a8dc52
save
andygrove Nov 21, 2025
e4acf93
save
andygrove Nov 21, 2025
82a552d
save
andygrove Nov 21, 2025
ba93de7
save
andygrove Nov 21, 2025
96087ef
save [skip ci]
andygrove Nov 21, 2025
e6bb5cf
delete some tests
andygrove Nov 21, 2025
8c5a6be
test passes
andygrove Nov 21, 2025
42e0c79
prep for review
andygrove Nov 21, 2025
fac5c56
improve test
andygrove Nov 21, 2025
30a503f
prep for review
andygrove Nov 21, 2025
5174ba3
remove partition id from proto
andygrove Nov 21, 2025
8748d85
prep for review
andygrove Nov 21, 2025
f1b7ba8
move test
andygrove Nov 21, 2025
3ad4d6e
clippy
andygrove Nov 21, 2025
f35196b
code cleanup
andygrove Nov 21, 2025
1ea7f31
preserve column names
andygrove Nov 21, 2025
61b6690
fix assertion
andygrove Nov 21, 2025
1c00aae
test
andygrove Nov 21, 2025
e1e357e
improve test
andygrove Nov 21, 2025
9fb2b53
refactor to use operator serde framework
andygrove Nov 21, 2025
457a9b9
skip testing for native_datafusion for now
andygrove Nov 21, 2025
757ba81
remove hard-coded compression codec
andygrove Nov 21, 2025
7b9e925
lz4 level
andygrove Nov 21, 2025
913f2ba
remove partition id from proto
andygrove Nov 21, 2025
de07d45
implement getSupportLevel
andygrove Nov 22, 2025
eb4fee4
move config to testing category
andygrove Nov 22, 2025
9a6e48c
fuzz test
andygrove Nov 22, 2025
8b1cb0b
remove snappy from filename
andygrove Nov 22, 2025
58c056d
remove snappy from filename
andygrove Nov 22, 2025
bd04274
docs
andygrove Nov 22, 2025
55d5d48
fix ci
andygrove Nov 22, 2025
cda8322
Revert "fix ci"
andygrove Nov 22, 2025
6a4726b
Merge remote-tracking branch 'apache/main' into parquet-write-poc
andygrove Nov 24, 2025
1d1d430
partially address feedback
andygrove Nov 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ jobs:
org.apache.comet.exec.DisableAQECometAsyncShuffleSuite
- name: "parquet"
value: |
org.apache.comet.parquet.CometParquetWriterSuite
org.apache.comet.parquet.ParquetReadV1Suite
org.apache.comet.parquet.ParquetReadV2Suite
org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ jobs:
org.apache.comet.exec.DisableAQECometAsyncShuffleSuite
- name: "parquet"
value: |
org.apache.comet.parquet.CometParquetWriterSuite
org.apache.comet.parquet.ParquetReadV1Suite
org.apache.comet.parquet.ParquetReadV2Suite
org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite
Expand Down
11 changes: 11 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)

val COMET_NATIVE_PARQUET_WRITE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.parquet.write.enabled")
.category(CATEGORY_TESTING)
.doc(
"Whether to enable native Parquet write through Comet. When enabled, " +
"Comet will intercept Parquet write operations and execute them natively. This " +
"feature is highly experimental and only partially implemented. It should not " +
"be used in production.")
.booleanConf
.createWithDefault(false)

val SCAN_NATIVE_COMET = "native_comet"
val SCAN_NATIVE_DATAFUSION = "native_datafusion"
val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. It can be overridden by the environment variable `ENABLE_COMET_ONHEAP`. | false |
| `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared |
| `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB |
| `spark.comet.parquet.write.enabled` | Whether to enable native Parquet write through Comet. When enabled, Comet will intercept Parquet write operations and execute them natively. This feature is highly experimental and only partially implemented. It should not be used in production. | false |
| `spark.comet.sparkToColumnar.enabled` | Whether to enable Spark to Arrow columnar conversion. When this is turned on, Comet will convert operators in `spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before processing. This is an experimental feature and has known issues with non-UTC timezones. | false |
| `spark.comet.sparkToColumnar.supportedOperatorList` | A comma-separated list of operators that will be converted to Arrow columnar format when `spark.comet.sparkToColumnar.enabled` is true. | Range,InMemoryTableScan,RDDScan |
| `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. It can be overridden by the environment variable `ENABLE_COMET_STRICT_TESTING`. | false |
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ not supported by Comet will fall back to regular Spark execution.
| BatchScanExec | Yes | Supports Parquet files and Apache Iceberg Parquet scans. See the [Comet Compatibility Guide] for more information. |
| BroadcastExchangeExec | Yes | |
| BroadcastHashJoinExec | Yes | |
| DataWritingCommandExec | No | Experimental support for native Parquet writes. Disabled by default. |
| ExpandExec | Yes | |
| FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. |
| FilterExec | Yes | |
Expand Down
2 changes: 2 additions & 0 deletions native/core/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ mod copy;
mod expand;
pub use expand::ExpandExec;
mod iceberg_scan;
mod parquet_writer;
pub use parquet_writer::ParquetWriterExec;
mod scan;

/// Error returned during executing operators.
Expand Down
282 changes: 282 additions & 0 deletions native/core/src/execution/operators/parquet_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Parquet writer operator for writing RecordBatches to Parquet files

use std::{
any::Any,
fmt,
fmt::{Debug, Formatter},
fs::File,
sync::Arc,
};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion::{
error::{DataFusionError, Result},
execution::context::TaskContext,
physical_expr::EquivalenceProperties,
physical_plan::{
execution_plan::{Boundedness, EmissionType},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
SendableRecordBatchStream, Statistics,
},
};
use futures::TryStreamExt;
use parquet::{
arrow::ArrowWriter,
basic::{Compression, ZstdLevel},
file::properties::WriterProperties,
};

use crate::execution::shuffle::CompressionCodec;

/// Parquet writer operator that writes input batches to a Parquet file
#[derive(Debug)]
pub struct ParquetWriterExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
/// Output file path
output_path: String,
/// Compression codec
compression: CompressionCodec,
/// Partition ID (from Spark TaskContext)
partition_id: i32,
/// Column names to use in the output Parquet file
column_names: Vec<String>,
/// Metrics
metrics: ExecutionPlanMetricsSet,
/// Cache for plan properties
cache: PlanProperties,
}

impl ParquetWriterExec {
/// Create a new ParquetWriterExec
pub fn try_new(
input: Arc<dyn ExecutionPlan>,
output_path: String,
compression: CompressionCodec,
partition_id: i32,
column_names: Vec<String>,
) -> Result<Self> {
// Preserve the input's partitioning so each partition writes its own file
let input_partitioning = input.output_partitioning().clone();

let cache = PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&input.schema())),
input_partitioning,
EmissionType::Final,
Boundedness::Bounded,
);

Ok(ParquetWriterExec {
input,
output_path,
compression,
partition_id,
column_names,
metrics: ExecutionPlanMetricsSet::new(),
cache,
})
}

fn compression_to_parquet(&self) -> Result<Compression> {
match self.compression {
CompressionCodec::None => Ok(Compression::UNCOMPRESSED),
CompressionCodec::Zstd(level) => Ok(Compression::ZSTD(ZstdLevel::try_new(level)?)),
CompressionCodec::Lz4Frame => Ok(Compression::LZ4),
CompressionCodec::Snappy => Ok(Compression::SNAPPY),
}
}
}

impl DisplayAs for ParquetWriterExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"ParquetWriterExec: path={}, compression={:?}",
self.output_path, self.compression
)
}
DisplayFormatType::TreeRender => unimplemented!(),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

unimplemented!() will panic at runtime.

If TreeRender format is ever requested, this will crash. Consider returning a reasonable default or the same output as Default.

-            DisplayFormatType::TreeRender => unimplemented!(),
+            DisplayFormatType::TreeRender => {
+                write!(f, "ParquetWriterExec: path={}", self.output_path)
+            }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
DisplayFormatType::TreeRender => unimplemented!(),
DisplayFormatType::TreeRender => {
write!(f, "ParquetWriterExec: path={}", self.output_path)
}
🤖 Prompt for AI Agents
In native/core/src/execution/operators/parquet_writer.rs around line 119, the
match arm for DisplayFormatType::TreeRender currently calls unimplemented!()
which will panic at runtime; change this arm to return a non-panicking, sensible
output (for example, reuse the same rendering path as DisplayFormatType::Default
or return the default formatted string) so TreeRender is handled safely; ensure
the function compiles by matching the expected return type and include any
necessary imports or helper calls already used by the Default branch.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:annoying; category:bug; feedback: The CodeRabbit AI reviewer is not correct ! It is better to fail early on invalid/unsupported config value then pretending that it is supported. Prevents confusion in the developer that something works as intended.

}
}
}

#[async_trait]
impl ExecutionPlan for ParquetWriterExec {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"ParquetWriterExec"
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Result<Statistics> {
self.input.partition_statistics(None)
}
Comment on lines +138 to +140
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

Verify partition_statistics vs Statistics::new_unknown().

Using self.input.partition_statistics(None) may not be semantically correct for a writer operator. Consider returning Ok(Statistics::new_unknown(input_schema)) instead since writers typically don't produce statistics.


🌐 Web query:

What should ExecutionPlan::statistics return for a sink/writer operator in DataFusion?

💡 Result:

Use unknown stats — return Statistics::new_unknown (i.e. don’t error). The ExecutionPlan docs say nodes that have no statistics should return Statistics::new_unknown; likewise TableProvider::insert_into describes the insert plan behaviour (it should produce a single-row "count" result), but does not require concrete statistics for writer/sink plans. [1][2]

Sources:
[1] ExecutionPlan::statistics / partition_statistics docs — DataFusion. [2] TableProvider::insert_into (returned plan should emit a single UInt64 "count") — DataFusion.


Return Statistics::new_unknown() for writer operators instead of forwarding input statistics.

Per DataFusion documentation, sink/writer operators should return Statistics::new_unknown() rather than partition_statistics. The statistics() method should not forward input statistics since writer operators don't produce statistical information about their output—they only emit row counts.

Change line 139 from self.input.partition_statistics(None) to return unknown statistics.

🤖 Prompt for AI Agents
In native/core/src/execution/operators/parquet_writer.rs around lines 138 to
140, the statistics() implementation incorrectly forwards input partition
statistics; for a writer/sink operator replace that call and return
Statistics::new_unknown() instead so the writer reports unknown statistics
rather than propagating input stats.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct that the output stats should not be the input stats. It is better to return unknown stats.


fn properties(&self) -> &PlanProperties {
&self.cache
}

fn schema(&self) -> SchemaRef {
self.input.schema()
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
match children.len() {
1 => Ok(Arc::new(ParquetWriterExec::try_new(
Arc::clone(&children[0]),
self.output_path.clone(),
self.compression.clone(),
self.partition_id,
self.column_names.clone(),
)?)),
_ => Err(DataFusionError::Internal(
"ParquetWriterExec requires exactly one child".to_string(),
)),
}
}

fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, context)?;
Comment on lines +172 to +177
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Partition ID mismatch: uses self.partition_id instead of partition parameter.

The execute method receives a partition: usize parameter, but the part file name uses self.partition_id (line 220). This could cause issues if the execution partition differs from the stored partition ID. Consider using the partition parameter instead.

-        let part_file = format!("{}/part-{:05}.parquet", local_path, self.partition_id);
+        let part_file = format!("{}/part-{:05}.parquet", local_path, partition);

Also applies to: 220-228

🤖 Prompt for AI Agents
In native/core/src/execution/operators/parquet_writer.rs around lines 172-177
(and also 220-228), the execute method is using self.partition_id when
constructing the part file name instead of the partition parameter passed into
execute; change usages of self.partition_id in this method (and the part
filename construction at lines ~220-228) to use the local partition parameter so
the runtime execution partition is used consistently (replace self.partition_id
with partition, preserving any necessary type conversion/formatting).

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:incorrect-but-reasonable; category:bug; feedback:The CodeRabbit AI reviewer is not correct! The same value for partition id is passed when the writer is created and as a parameter to the execute() method. Both will work the same.

let input_schema = self.schema();
let output_path = self.output_path.clone();
let compression = self.compression_to_parquet()?;
let column_names = self.column_names.clone();

// Create output schema with correct column names
let output_schema = if !column_names.is_empty() {
// Replace the generic column names (col_0, col_1, etc.) with the actual names
let fields: Vec<_> = input_schema
.fields()
.iter()
.enumerate()
.map(|(i, field)| {
if i < column_names.len() {
Arc::new(field.as_ref().clone().with_name(&column_names[i]))
} else {
Arc::clone(field)
}
})
.collect();
Arc::new(arrow::datatypes::Schema::new(fields))
} else {
// No column names provided, use input schema as-is
Arc::clone(&input_schema)
};

// Strip file:// or file: prefix if present
let local_path = output_path
.strip_prefix("file://")
.or_else(|| output_path.strip_prefix("file:"))
.unwrap_or(&output_path)
.to_string();

// Create output directory
std::fs::create_dir_all(&local_path).map_err(|e| {
DataFusionError::Execution(format!(
"Failed to create output directory '{}': {}",
local_path, e
))
})?;

// Generate part file name for this partition
let part_file = format!("{}/part-{:05}.parquet", local_path, self.partition_id);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using only partition_id in the output filename (e.g., part-xxxxx.parquet) means task retries or speculative execution for the same Spark partition will write to the same path, risking overwrite and inconsistent results.

🤖 Was this useful? React with 👍 or 👎

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback: The Augment AI reviewer is correct that writing directly to the final destination may lead to problems if the write fails and is re-executed. It would be better to write to a temporary location first and then move the files/folder, a.k.a. atomic write.


// Create the Parquet file
let file = File::create(&part_file).map_err(|e| {
DataFusionError::Execution(format!(
"Failed to create output file '{}': {}",
part_file, e
))
})?;

// Configure writer properties
let props = WriterProperties::builder()
.set_compression(compression)
.build();

let mut writer = ArrowWriter::try_new(file, Arc::clone(&output_schema), Some(props))
.map_err(|e| DataFusionError::Execution(format!("Failed to create writer: {}", e)))?;

// Clone schema for use in async closure
let schema_for_write = Arc::clone(&output_schema);

// Write batches
let write_task = async move {
let mut stream = input;

while let Some(batch_result) = stream.try_next().await.transpose() {
let batch = batch_result?;

// Rename columns in the batch to match output schema
let renamed_batch = if !column_names.is_empty() {
use arrow::record_batch::RecordBatch;
RecordBatch::try_new(Arc::clone(&schema_for_write), batch.columns().to_vec())
.map_err(|e| {
DataFusionError::Execution(format!(
"Failed to rename batch columns: {}",
e
))
})?
} else {
batch
};

writer.write(&renamed_batch).map_err(|e| {
DataFusionError::Execution(format!("Failed to write batch: {}", e))
})?;
}

writer.close().map_err(|e| {
DataFusionError::Execution(format!("Failed to close writer: {}", e))
})?;

// Return empty stream to indicate completion
Ok::<_, DataFusionError>(futures::stream::empty())
};

// Execute the write task and convert to a stream
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
Ok(Box::pin(RecordBatchStreamAdapter::new(
output_schema,
futures::stream::once(write_task).try_flatten(),
)))
}
}
34 changes: 33 additions & 1 deletion native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
errors::ExpressionError,
execution::{
expressions::subquery::Subquery,
operators::{ExecutionError, ExpandExec, ScanExec},
operators::{ExecutionError, ExpandExec, ParquetWriterExec, ScanExec},
serde::to_arrow_datatype,
shuffle::ShuffleWriterExec,
},
Expand Down Expand Up @@ -1448,6 +1448,38 @@ impl PhysicalPlanner {
)),
))
}
OpStruct::ParquetWriter(writer) => {
assert_eq!(children.len(), 1);
let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;

let codec = match writer.compression.try_into() {
Ok(SparkCompressionCodec::None) => Ok(CompressionCodec::None),
Ok(SparkCompressionCodec::Snappy) => Ok(CompressionCodec::Snappy),
Ok(SparkCompressionCodec::Zstd) => Ok(CompressionCodec::Zstd(3)),
Ok(SparkCompressionCodec::Lz4) => Ok(CompressionCodec::Lz4Frame),
_ => Err(GeneralError(format!(
"Unsupported parquet compression codec: {:?}",
writer.compression
))),
}?;

let parquet_writer = Arc::new(ParquetWriterExec::try_new(
Arc::clone(&child.native_plan),
writer.output_path.clone(),
codec,
self.partition,
writer.column_names.clone(),
)?);

Ok((
scans,
Arc::new(SparkPlan::new(
spark_plan.plan_id,
parquet_writer,
vec![Arc::clone(&child)],
)),
))
}
OpStruct::Expand(expand) => {
assert_eq!(children.len(), 1);
let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;
Expand Down
Loading