2812: feat: Add experimental support for native Parquet writes #30

Open
martin-augment wants to merge 36 commits into main from pr-2812-2025-11-25-14-26-09

Conversation

@martin-augment
Owner

2812: To review by AI

@coderabbitai

coderabbitai bot commented Nov 25, 2025

Walkthrough

This PR implements native Parquet write support for Comet. It introduces a new configuration option spark.comet.parquet.write.enabled (disabled by default), adds a ParquetWriterExec operator in Rust to handle writing RecordBatches to partitioned Parquet files, integrates this operator into the physical plan through a new CometDataWritingCommand serde handler, and provides a CometNativeWriteExec wrapper for execution. The implementation includes protobuf definitions, planner integration, configuration documentation, and a test suite to verify basic Parquet write functionality with local filesystem targets.


@claude

claude bot commented Nov 25, 2025

Code Review: Native Parquet Write Support

Summary

This PR adds experimental support for native Parquet writes in Comet. The implementation includes a new ParquetWriterExec operator in Rust, Spark integration via CometNativeWriteExec, and appropriate configuration and testing infrastructure. Overall, this is a solid initial implementation, though there are several areas for improvement before production readiness.


🟢 Strengths

Architecture & Design

  • Clean separation of concerns: The implementation correctly separates the native Rust writer from Spark integration code
  • Proper use of existing patterns: Follows the established operator serde framework and Comet conventions
  • Appropriate safety guards: Feature is behind a configuration flag (spark.comet.parquet.write.enabled, default false) with clear warnings about experimental status

Code Quality

  • Good error handling: Comprehensive error messages in both Rust and Scala code
  • Proper resource management: Writer is properly closed in the async task
  • Explicit limitations: getSupportLevel correctly identifies unsupported scenarios (bucketing, partitioning, complex types)

🟡 Issues & Concerns

1. Critical: Synchronous I/O in Async Context

Location: native/core/src/execution/operators/parquet_writer.rs:172-280

The execute() method creates file handles and performs directory operations synchronously within an async context. This can block the async runtime:

// Lines 211-228: Blocking filesystem operations
std::fs::create_dir_all(&local_path).map_err(...)?;
let file = File::create(&part_file).map_err(...)?;
let mut writer = ArrowWriter::try_new(file, ...).map_err(...)?;

Recommendation: Use tokio::fs or spawn blocking operations:

let file = tokio::task::spawn_blocking(move || {
    std::fs::create_dir_all(&local_path)?;
    File::create(&part_file)
}).await??;

2. Path Injection Vulnerability

Location: native/core/src/execution/operators/parquet_writer.rs:204-220

The code strips URL prefixes and uses the path directly without validation:

let local_path = output_path
    .strip_prefix("file://")
    .or_else(|| output_path.strip_prefix("file:"))
    .unwrap_or(&output_path)
    .to_string();

Concern: Malicious paths like ../../../etc/passwd or paths with null bytes could be exploited.

Recommendation:

  • Validate paths are within expected boundaries
  • Use Path::canonicalize() to resolve relative paths
  • Check for path traversal attempts
  • Validate against null bytes and other problematic characters
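A minimal std-only sketch of such a check (the validate_local_path helper below is hypothetical and not part of this PR; a production version would also canonicalize against an allowed base directory):

```rust
use std::path::{Component, Path};

/// Hypothetical validation helper: rejects embedded null bytes and
/// parent-directory components before the path is used for writing.
fn validate_local_path(path: &str) -> Result<(), String> {
    if path.contains('\0') {
        return Err("path contains a null byte".to_string());
    }
    if Path::new(path)
        .components()
        .any(|c| matches!(c, Component::ParentDir))
    {
        return Err("path contains a parent-directory component".to_string());
    }
    Ok(())
}
```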

3. Inconsistent Column Name Handling

Location: native/core/src/execution/operators/parquet_writer.rs:183-202, 248-260

Column names are renamed twice - once when creating the output schema and again when writing each batch. This is redundant and could lead to inconsistencies.

Recommendation: Rename columns once when creating the schema, then use that schema consistently.

4. Missing Metrics

Location: native/core/src/execution/operators/parquet_writer.rs:64

The metrics field is initialized but never populated with actual metrics (rows written, bytes written, write time, etc.).

Recommendation: Add metrics tracking:

  • Number of rows written
  • Number of batches written
  • Bytes written
  • Write duration
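A hypothetical accumulator illustrating the counters involved (the real operator would register these with DataFusion's metrics framework rather than use a bare struct):

```rust
/// Hypothetical per-writer metrics accumulator, updated once per batch.
#[derive(Default, Debug)]
struct WriteMetrics {
    rows_written: u64,
    batches_written: u64,
    bytes_written: u64,
}

impl WriteMetrics {
    /// Record one written batch: its row count and encoded size in bytes.
    fn record_batch(&mut self, num_rows: u64, encoded_bytes: u64) {
        self.rows_written += num_rows;
        self.batches_written += 1;
        self.bytes_written += encoded_bytes;
    }
}
```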

5. Test Coverage Gaps

Location: spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala

The test suite only has one test case covering the happy path. Missing tests for:

  • Different compression codecs (only default is tested)
  • Error scenarios (write failures, invalid paths, permission errors)
  • Edge cases (empty dataframes, single row, very large batches)
  • Different data types (the test uses fuzz-generated schema but doesn't systematically test each type)
  • Timezone handling (config mentions different timezones but doesn't verify correctness)

Recommendation: Add comprehensive test coverage for error paths and edge cases.

6. Hardcoded Zstd Compression Level

Location: native/core/src/execution/planner.rs:1458

Ok(SparkCompressionCodec::Zstd) => Ok(CompressionCodec::Zstd(3)),

The ZSTD compression level is hardcoded to 3. Spark's spark.sql.parquet.compression.codec setting only selects the codec; if the level is meant to be tunable, it would need to come from a separate configuration option that Comet plumbs through to the writer.

Recommendation: Parse and respect the compression level from Spark configuration.
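One possible shape for this, sketched with a hypothetical zstd_level_from_conf helper that falls back to the current default of 3 when the configured value is missing, unparseable, or out of range:

```rust
/// Hypothetical parsing of a zstd level from a config string value.
/// Falls back to the currently hardcoded default of 3.
fn zstd_level_from_conf(conf: Option<&str>) -> i32 {
    conf.and_then(|s| s.parse::<i32>().ok())
        // zstd accepts roughly levels 1..=22; reject anything outside
        .filter(|lvl| (1..=22).contains(lvl))
        .unwrap_or(3)
}
```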

7. Unsafe Cast Without Validation

Location: spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala:88, 151

val cmd = op.cmd.asInstanceOf[InsertIntoHadoopFsRelationCommand]

The code uses asInstanceOf without verifying the type first, which could throw ClassCastException.

Recommendation: Pattern match or check type before casting:

op.cmd match {
  case cmd: InsertIntoHadoopFsRelationCommand => // use cmd
  case _ => // handle error
}

8. Incomplete Error Recovery

Location: native/core/src/execution/operators/parquet_writer.rs:242-273

If writing fails mid-stream, partial files may be left on disk without cleanup.

Recommendation: Implement cleanup on failure or use atomic writes (write to temp file, then rename).
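A std-only sketch of the temp-file-then-rename approach (atomic_write is a hypothetical helper; the real operator streams batches through ArrowWriter rather than writing one in-memory buffer):

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

/// Hypothetical atomic-write sketch: write to a temporary sibling file,
/// then rename it into place so readers never observe a partial file.
fn atomic_write(final_path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    let tmp_path = final_path.with_extension("parquet.tmp");
    {
        let mut f = fs::File::create(&tmp_path)?;
        f.write_all(bytes)?;
        f.sync_all()?; // flush to disk before the rename makes it visible
    }
    // Within one filesystem, rename is atomic on POSIX systems.
    fs::rename(&tmp_path, final_path)
}
```

On failure before the rename, only the `.tmp` file is left behind, which a cleanup pass can safely delete.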

9. Missing Documentation

Several areas lack sufficient documentation:

  • The relationship between partition_id and Spark's TaskContext.getPartitionId()
  • Why arrow_ffi_safe is set to false in the scan operator
  • The expected format of output paths (local vs distributed filesystems)

🔵 Performance Considerations

1. Single-threaded Write per Partition

Each partition writes sequentially. For large partitions, this could be slow.

Consideration: Document the expected use case and performance characteristics. Consider allowing configuration for write batch size or buffering strategy.

2. Schema Cloning

Multiple Arc::clone operations on schema could be optimized.

Minor: Not a major issue but could be streamlined.


🟣 Code Style & Best Practices

1. TODO Comments

Location: spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala:80-81

// TODO this could delegate to CometRowToColumnar, but maybe Comet
// does not need to support this case?

This TODO should be tracked as an issue or removed if the decision is final.

2. Magic Numbers

Location: spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala:54, 132

Magic numbers like 1000 rows and 42 for random seed should be constants with meaningful names.

3. Commit History

The PR has 34 commits with messages like "save", "prep for review", "test", etc. Consider squashing into logical commits with descriptive messages before merge.


🔒 Security Assessment

High Priority

  1. Path injection vulnerability (detailed above) - MUST be fixed before merge
  2. Missing input validation on compression codec enum conversion

Medium Priority

  1. Resource exhaustion: No limits on file size or number of batches written
  2. File permission handling: No explicit permission setting on created files/directories

✅ Recommendations Summary

Before Merge (Required)

  1. ✅ Fix path injection vulnerability with proper validation
  2. ✅ Replace blocking I/O with async-safe alternatives
  3. ✅ Add error handling and cleanup for partial writes
  4. ✅ Fix unsafe asInstanceOf casts
  5. ✅ Add tests for error scenarios and different codecs

For Production Readiness (Future)

  1. Implement proper metrics collection
  2. Add support for distributed filesystems (HDFS, S3, etc.)
  3. Support partitioned and bucketed writes
  4. Handle row-based input operators
  5. Make ZSTD compression level configurable
  6. Add comprehensive documentation

Nice to Have

  1. Optimize schema handling to avoid redundant operations
  2. Add performance benchmarks comparing to Spark native writes
  3. Document performance characteristics and tuning guidelines

📊 Test Coverage Assessment

Current Coverage: Basic happy path only
Recommended Coverage:

  • ✅ Basic write (covered)
  • ❌ All compression codecs (snappy, lz4, zstd levels, none)
  • ❌ Error scenarios (invalid paths, permission denied, disk full)
  • ❌ Edge cases (empty df, single row, null values, large batches)
  • ❌ Data type coverage (systematic testing of all supported types)
  • ❌ Concurrent writes from multiple partitions
  • ❌ Write failures and partial file cleanup

Final Assessment

This is a solid proof-of-concept implementation that demonstrates native Parquet writes can work in Comet. The architecture is sound and follows good patterns. However, several critical issues (path validation, blocking I/O) must be addressed before this can be safely merged, even as an experimental feature.

The experimental status and default-disabled configuration are appropriate given the current implementation state.

Recommendation: Address critical security and correctness issues, add error scenario tests, then merge as experimental. Plan follow-up work for production readiness.

Great work on getting this initial implementation done! 🚀


@augmentcode augmentcode bot left a comment


Review completed. 1 suggestion posted.


})?;

// Generate part file name for this partition
let part_file = format!("{}/part-{:05}.parquet", local_path, self.partition_id);


Using only partition_id in the output filename (e.g., part-xxxxx.parquet) means task retries or speculative execution for the same Spark partition will write to the same path, risking overwrite and inconsistent results.
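One way to avoid the collision would be to include a per-attempt component in the name (part_file_name below is a hypothetical helper; Spark's TaskContext exposes a task attempt id that could supply the second argument):

```rust
/// Hypothetical naming scheme that includes the task attempt id so
/// retries and speculative tasks write to distinct files instead of
/// overwriting each other.
fn part_file_name(partition_id: u32, attempt_id: u64) -> String {
    format!("part-{:05}-{}.parquet", partition_id, attempt_id)
}
```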


Owner Author


value:useful; category:bug; feedback: The Augment AI reviewer is correct that writing directly to the final destination may lead to problems if the write fails and is re-executed. It would be better to write to a temporary location first and then move the files/folder, a.k.a. atomic write.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (7)
spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala (1)

35-36: Verify DataWritingCommandExec gating and simplify writeExecs lookup

The new DataWritingCommandExec handling is structurally sound: it uses the existing CometOperatorSerde pipeline, respects enabledConfig, and cleanly falls back to the original exec if conversion fails.

Two minor points worth revisiting:

  1. Lookup key for writeExecs

    You currently do:

    CometExecRule.writeExecs.get(classOf[DataWritingCommandExec]) match {

    Since writeExecs is keyed by plan class (like allExecs), using exec.getClass keeps this consistent and is more robust if you ever add additional write handlers:

    -    CometExecRule.writeExecs.get(classOf[DataWritingCommandExec]) match {
    +    CometExecRule.writeExecs.get(exec.getClass) match {

  2. Double gating for native Parquet writes

    From CometDataWritingCommand.getSupportLevel (in spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala), supported Parquet writes currently return Incompatible(Some("Parquet write support is highly experimental")). Combined with isOperatorEnabled, this means native writes only activate when:

    • spark.comet.parquet.write.enabled = true and
    • spark.comet.operator.DataWritingCommandExec.allowIncompatible = true.

    If the intent is that the new spark.comet.parquet.write.enabled flag alone is the opt‑in for this experimental path, you may want to switch the support level for the supported case to Compatible(...) (still with a strong warning in the notes), or explicitly document the additional .allowIncompatible toggle.

Please confirm whether this double gating is intentional and, if so, whether you want to surface it in the user docs or config guide.

Also applies to: 51-52, 74-80, 229-245

spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala (2)

94-103: Polling loop with Thread.sleep may cause flaky tests.

The busy-wait pattern is generally discouraged. Consider using spark.sparkContext.listenerBus.waitUntilEmpty(timeoutMs) or Eventually from ScalaTest for more reliable synchronization.


141-148: Prefer idiomatic Scala over null initialization.

Using var sparkDf: DataFrame = null is not idiomatic Scala. Consider restructuring to avoid nulls, for example by collecting results directly into vals within each withSQLConf block.

-          var sparkDf: DataFrame = null
-          var cometDf: DataFrame = null
-          withSQLConf(CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false") {
-            sparkDf = spark.read.parquet(outputPath)
-          }
-          withSQLConf(CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true") {
-            cometDf = spark.read.parquet(outputPath)
-          }
-          checkAnswer(sparkDf, cometDf)
+          val sparkDf = withSQLConf(CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false") {
+            spark.read.parquet(outputPath).collect()
+          }
+          val cometDf = withSQLConf(CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true") {
+            spark.read.parquet(outputPath).collect()
+          }
+          assert(sparkDf.toSeq == cometDf.toSeq)
native/core/src/execution/planner.rs (1)

1451-1464: Zstd compression level is hardcoded to 3, unlike ShuffleWriter.

The ShuffleWriter path (line 1424) uses writer.compression_level from the protobuf message, but ParquetWriter hardcodes level 3. Consider making this configurable via the protobuf message for consistency.

-                    Ok(SparkCompressionCodec::Zstd) => Ok(CompressionCodec::Zstd(3)),
+                    Ok(SparkCompressionCodec::Zstd) => Ok(CompressionCodec::Zstd(writer.compression_level)),
spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala (1)

42-42: Minor typo: "Codes" should be "Codecs".

-  private val supportedCompressionCodes = Set("none", "snappy", "lz4", "zstd")
+  private val supportedCompressionCodecs = Set("none", "snappy", "lz4", "zstd")
spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala (1)

91-97: Plan bytes are serialized twice - once in serializedPlanOpt and again here.

Consider caching the serialized bytes to avoid redundant serialization. You could capture serializedPlanOpt.plan.get before the transformation instead of re-serializing inside mapPartitionsInternal.

+    // Cache serialized plan bytes
+    val planBytes = serializedPlanOpt.plan.get
+
     // Execute native write operation
     childRDD.mapPartitionsInternal { iter =>
       val nativeMetrics = CometMetricNode.fromCometPlan(this)

-      val outputStream = new java.io.ByteArrayOutputStream()
-      nativeOp.writeTo(outputStream)
-      outputStream.close()
-      val planBytes = outputStream.toByteArray
-
       new CometExecIterator(
native/core/src/execution/operators/parquet_writer.rs (1)

222-236: Synchronous file I/O inside async execution context.

File::create and ArrowWriter::try_new are blocking operations that could stall the async executor. For an experimental feature this is acceptable, but consider using tokio::task::spawn_blocking or async file APIs in the future for better scalability.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 510d5b6 and 1d1d430.

📒 Files selected for processing (13)
  • .github/workflows/pr_build_linux.yml (1 hunks)
  • .github/workflows/pr_build_macos.yml (1 hunks)
  • common/src/main/scala/org/apache/comet/CometConf.scala (1 hunks)
  • docs/source/user-guide/latest/configs.md (1 hunks)
  • docs/source/user-guide/latest/operators.md (1 hunks)
  • native/core/src/execution/operators/mod.rs (1 hunks)
  • native/core/src/execution/operators/parquet_writer.rs (1 hunks)
  • native/core/src/execution/planner.rs (2 hunks)
  • native/proto/src/proto/operator.proto (2 hunks)
  • spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala (4 hunks)
  • spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala (1 hunks)
  • spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala (1 hunks)
  • spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: martin-augment
Repo: martin-augment/datafusion-comet PR: 14
File: common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:998-1012
Timestamp: 2025-11-10T07:10:16.261Z
Learning: In NativeBatchReader.java (Java, Apache Comet Parquet reader), when matching Spark field names to Parquet field names, always respect the `isCaseSensitive` configuration flag. Use case-sensitive comparison (`equals`) when `isCaseSensitive == true`, and case-insensitive comparison (`equalsIgnoreCase`) when `isCaseSensitive == false`. This prevents field lookup failures in Iceberg/Hive workloads that run in the default case-insensitive mode.
📚 Learning: 2025-11-11T15:01:48.203Z
Learnt from: martin-augment
Repo: martin-augment/datafusion-comet PR: 17
File: docs/source/contributor-guide/adding_a_new_operator.md:349-354
Timestamp: 2025-11-11T15:01:48.203Z
Learning: For Apache DataFusion Comet debugging documentation, the correct configuration keys are `spark.comet.explain.format=verbose` for verbose explain plans and `spark.comet.logFallbackReasons.enabled=true` for logging fallback reasons (not `spark.comet.explain.verbose` or `spark.comet.logFallbackReasons` without `.enabled`).

Applied to files:

  • docs/source/user-guide/latest/configs.md
  • common/src/main/scala/org/apache/comet/CometConf.scala
📚 Learning: 2025-11-10T07:10:16.261Z
Learnt from: martin-augment
Repo: martin-augment/datafusion-comet PR: 14
File: common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:998-1012
Timestamp: 2025-11-10T07:10:16.261Z
Learning: In NativeBatchReader.java (Java, Apache Comet Parquet reader), when matching Spark field names to Parquet field names, always respect the `isCaseSensitive` configuration flag. Use case-sensitive comparison (`equals`) when `isCaseSensitive == true`, and case-insensitive comparison (`equalsIgnoreCase`) when `isCaseSensitive == false`. This prevents field lookup failures in Iceberg/Hive workloads that run in the default case-insensitive mode.

Applied to files:

  • common/src/main/scala/org/apache/comet/CometConf.scala
🧬 Code graph analysis (5)
spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala (1)
spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala (3)
  • CometDataWritingCommand (40-175)
  • convert (82-148)
  • createExec (150-165)
spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala (4)
spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala (1)
  • CometTestBase (55-1300)
spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala (1)
  • CometNativeWriteExec (43-112)
common/src/main/scala/org/apache/comet/CometConf.scala (2)
  • internal (1035-1038)
  • CometConf (46-789)
spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala (3)
  • DataGenOptions (262-268)
  • FuzzDataGenerator (35-236)
  • generateSchema (50-89)
spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala (4)
spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala (1)
  • CometNativeWriteExec (43-112)
spark/src/main/scala/org/apache/comet/DataTypeSupport.scala (2)
  • DataTypeSupport (73-82)
  • isComplexType (78-81)
spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala (2)
  • Incompatible (39-39)
  • Unsupported (42-42)
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala (2)
  • QueryPlanSerde (45-717)
  • serializeDataType (274-368)
spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala (3)
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala (2)
  • SerializedPlan (482-486)
  • newIterId (113-113)
spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala (1)
  • executeColumnar (224-231)
spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala (1)
  • fromCometPlan (262-265)
native/core/src/execution/planner.rs (1)
native/core/src/execution/operators/parquet_writer.rs (2)
  • children (150-152)
  • try_new (71-97)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Cursor Bugbot
  • GitHub Check: claude-review
🔇 Additional comments (19)
docs/source/user-guide/latest/operators.md (1)

25-31: Docs entry correctly reflects experimental native Parquet writes

The new DataWritingCommandExec row accurately describes the feature as experimental and disabled by default; no changes needed here.

docs/source/user-guide/latest/configs.md (1)

136-149: Config docs match implementation for spark.comet.parquet.write.enabled

The description, key name, and default false value are consistent with CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED and clearly mark the feature as highly experimental and non‑production.

.github/workflows/pr_build_macos.yml (1)

84-89: Add CometParquetWriterSuite to macOS parquet CI suite

Including org.apache.comet.parquet.CometParquetWriterSuite in the parquet suite keeps macOS CI in sync with Linux and ensures the new native writer path is exercised.

.github/workflows/pr_build_linux.yml (1)

120-129: Linux parquet CI also runs CometParquetWriterSuite

Adding org.apache.comet.parquet.CometParquetWriterSuite to the Linux parquet test suite gives CI coverage for the new writer on all Linux profiles.

native/core/src/execution/operators/mod.rs (1)

28-35: ParquetWriterExec correctly wired into operators module

Declaring mod parquet_writer; and pub use parquet_writer::ParquetWriterExec; follows the existing pattern for operators and exposes the writer exec cleanly for planner/use sites.

native/proto/src/proto/operator.proto (1)

38-53: ParquetWriter proto variant and message are consistent with usage

The new ParquetWriter message and corresponding parquet_writer = 113 oneof arm integrate cleanly:

  • Reuses CompressionCodec for consistency with ShuffleWriter.
  • Encodes just the essentials (output_path, compression, column_names) expected from CometDataWritingCommand.

The gap in field numbering (no field 3) is valid in proto3 and does not introduce correctness issues.

Also applies to: 240-245

common/src/main/scala/org/apache/comet/CometConf.scala (1)

103-112: New COMET_NATIVE_PARQUET_WRITE_ENABLED flag is well-scoped

spark.comet.parquet.write.enabled is correctly categorized under testing, default‑off, and clearly documented as highly experimental and non‑production, which aligns with its use in CometDataWritingCommand.enabledConfig.

spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala (2)

37-39: Good feature guard using assume.

Skipping when native DataFusion scan is in use makes sense since this test targets the non-native scan path.


112-124: Plan verification logic is correct.

The check for CometNativeWriteExec either at top level or nested inside DataWritingCommandExec properly covers expected plan structures.

native/core/src/execution/planner.rs (2)

25-25: Import addition looks correct.

Adding ParquetWriterExec to the operators import aligns with the new operator.


1466-1482: ParquetWriter operator construction follows established patterns.

The implementation correctly builds the child plan recursively, constructs ParquetWriterExec with appropriate parameters, and wraps it in a SparkPlan. This mirrors the ShuffleWriter pattern.

spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala (3)

47-80: getSupportLevel validation is thorough.

The method correctly checks for ParquetFileFormat, local filesystem paths, absence of bucketing/partitioning, primitive data types, and supported compression codecs. Returning Incompatible for the experimental feature is appropriate.


150-165: createExec properly unwraps WriteFilesExec.

The logic to extract the actual child plan from WriteFilesExec when present is correct for proper plan tree construction.


115-123: Same case-sensitivity issue in convert method.

The match on compression codec strings should handle case insensitivity to match how Spark handles compression options.

-      val codec = parseCompressionCodec(cmd) match {
+      val codec = parseCompressionCodec(cmd).toLowerCase match {
⛔ Skipped due to learnings
Learnt from: martin-augment
Repo: martin-augment/datafusion-comet PR: 14
File: common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:998-1012
Timestamp: 2025-11-10T07:10:16.261Z
Learning: In NativeBatchReader.java (Java, Apache Comet Parquet reader), when matching Spark field names to Parquet field names, always respect the `isCaseSensitive` configuration flag. Use case-sensitive comparison (`equals`) when `isCaseSensitive == true`, and case-insensitive comparison (`equalsIgnoreCase`) when `isCaseSensitive == false`. This prevents field lookup failures in Iceberg/Hive workloads that run in the default case-insensitive mode.
spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala (3)

43-45: Class declaration looks correct.

The case class properly extends CometNativeExec and UnaryExecNode, following the pattern of other Comet execution operators.


76-84: Row-based fallback throws instead of converting.

The TODO comment suggests delegating to CometRowToColumnar. For now, throwing UnsupportedOperationException is acceptable for an experimental feature, but consider logging a warning with context about which operator triggered this.


61-70: doExecute correctly delegates to columnar path.

The implementation properly executes the columnar write and converts the result to an empty InternalRow RDD, which is the expected behavior for write operations.

native/core/src/execution/operators/parquet_writer.rs (2)

99-106: Compression codec mapping looks correct.

The conversion from CompressionCodec to Parquet Compression correctly handles all variants including Zstd with configurable level.


245-265: Batch processing and column renaming logic is correct.

The write task properly iterates through input batches, renames columns when needed, writes to Parquet, and closes the writer. Error handling with descriptive messages is good.

self.output_path, self.compression
)
}
DisplayFormatType::TreeRender => unimplemented!(),


⚠️ Potential issue | 🟡 Minor

unimplemented!() will panic at runtime.

If TreeRender format is ever requested, this will crash. Consider returning a reasonable default or the same output as Default.

-            DisplayFormatType::TreeRender => unimplemented!(),
+            DisplayFormatType::TreeRender => {
+                write!(f, "ParquetWriterExec: path={}", self.output_path)
+            }
🤖 Prompt for AI Agents
In native/core/src/execution/operators/parquet_writer.rs around line 119, the
match arm for DisplayFormatType::TreeRender currently calls unimplemented!()
which will panic at runtime; change this arm to return a non-panicking, sensible
output (for example, reuse the same rendering path as DisplayFormatType::Default
or return the default formatted string) so TreeRender is handled safely; ensure
the function compiles by matching the expected return type and include any
necessary imports or helper calls already used by the Default branch.

Owner Author


value:annoying; category:bug; feedback: The CodeRabbit AI reviewer is not correct! It is better to fail early on an invalid/unsupported config value than to pretend that it is supported. This prevents developers from mistakenly believing that something works as intended.

Comment on lines +138 to +140
fn statistics(&self) -> Result<Statistics> {
self.input.partition_statistics(None)
}


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

Verify partition_statistics vs Statistics::new_unknown().

Using self.input.partition_statistics(None) may not be semantically correct for a writer operator. Consider returning Ok(Statistics::new_unknown(input_schema)) instead since writers typically don't produce statistics.


🌐 Web query:

What should ExecutionPlan::statistics return for a sink/writer operator in DataFusion?

💡 Result:

Use unknown stats — return Statistics::new_unknown (i.e. don’t error). The ExecutionPlan docs say nodes that have no statistics should return Statistics::new_unknown; likewise TableProvider::insert_into describes the insert plan behaviour (it should produce a single-row "count" result), but does not require concrete statistics for writer/sink plans. [1][2]

Sources:
[1] ExecutionPlan::statistics / partition_statistics docs — DataFusion. [2] TableProvider::insert_into (returned plan should emit a single UInt64 "count") — DataFusion.


Return Statistics::new_unknown() for writer operators instead of forwarding input statistics.

Per DataFusion documentation, sink/writer operators should return Statistics::new_unknown() rather than partition_statistics. The statistics() method should not forward input statistics since writer operators don't produce statistical information about their output—they only emit row counts.

Change line 139 from self.input.partition_statistics(None) to return unknown statistics.

🤖 Prompt for AI Agents
In native/core/src/execution/operators/parquet_writer.rs around lines 138 to
140, the statistics() implementation incorrectly forwards input partition
statistics; for a writer/sink operator replace that call and return
Statistics::new_unknown() instead so the writer reports unknown statistics
rather than propagating input stats.


value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct that the output stats should not be the input stats. It is better to return unknown stats.

Comment on lines +172 to +177
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, context)?;

⚠️ Potential issue | 🟠 Major

Partition ID mismatch: uses self.partition_id instead of partition parameter.

The execute method receives a partition: usize parameter, but the part file name uses self.partition_id (line 220). This could cause issues if the execution partition differs from the stored partition ID. Consider using the partition parameter instead.

-        let part_file = format!("{}/part-{:05}.parquet", local_path, self.partition_id);
+        let part_file = format!("{}/part-{:05}.parquet", local_path, partition);

Also applies to: 220-228

🤖 Prompt for AI Agents
In native/core/src/execution/operators/parquet_writer.rs around lines 172-177
(and also 220-228), the execute method is using self.partition_id when
constructing the part file name instead of the partition parameter passed into
execute; change usages of self.partition_id in this method (and the part
filename construction at lines ~220-228) to use the local partition parameter so
the runtime execution partition is used consistently (replace self.partition_id
with partition, preserving any necessary type conversion/formatting).


value:incorrect-but-reasonable; category:bug; feedback:The CodeRabbit AI reviewer is not correct! The same value for partition id is passed when the writer is created and as a parameter to the execute() method. Both will work the same.
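For illustration, the zero-padded naming that both code paths produce can be sketched as a standalone function (a hypothetical helper, not the actual Comet code):

```rust
// Hypothetical helper mirroring the writer's part-file naming. Whether the
// value comes from `self.partition_id` or from execute()'s `partition`
// argument, the resulting name is identical because both carry the same value.
fn part_file_name(dir: &str, partition: usize) -> String {
    format!("{}/part-{:05}.parquet", dir, partition)
}
```

Using the `partition` argument would still be the more conventional choice, since it makes the dependency on the runtime partition explicit.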

Comment on lines +68 to +71
val codec = parseCompressionCodec(cmd)
if (!supportedCompressionCodes.contains(codec)) {
return Unsupported(Some(s"Unsupported compression codec: $codec"))
}

⚠️ Potential issue | 🟡 Minor

Compression codec comparison may be case-sensitive.

The parseCompressionCodec returns the raw config value which could be uppercase (e.g., "SNAPPY"). Consider normalizing to lowercase before comparison.

-            val codec = parseCompressionCodec(cmd)
-            if (!supportedCompressionCodes.contains(codec)) {
+            val codec = parseCompressionCodec(cmd).toLowerCase
+            if (!supportedCompressionCodecs.contains(codec)) {

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
around lines 68 to 71, the code compares the raw codec string returned by
parseCompressionCodec against supportedCompressionCodes in a case-sensitive way;
normalize the parsed codec (e.g., call .trim.toLowerCase(Locale.ROOT) or
otherwise standardize casing) before checking membership (or perform a
case-insensitive membership check such as comparing against a lowercased set) so
values like "SNAPPY" match "snappy" and avoid false Unsupported results.


value:good-to-have; category:bug; feedback:The CodeRabbit AI reviewer is correct that the parsing of the codec names could be relaxed to accept any case. This would make it more user-friendly.
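A minimal sketch of the case-insensitive check, written here in Rust rather than the Scala used in Comet (the function and list names are hypothetical):

```rust
// Normalize the configured codec before membership testing so that
// values such as "SNAPPY" or " Snappy " match "snappy".
fn is_supported_codec(codec: &str, supported: &[&str]) -> bool {
    let normalized = codec.trim().to_lowercase();
    supported.iter().any(|c| *c == normalized)
}
```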

@martin-augment

1. Critical: Synchronous I/O in Async Context

Location: native/core/src/execution/operators/parquet_writer.rs:172-280

The execute() method creates file handles and performs directory operations synchronously within an async context. This can block the async runtime:

// Lines 211-228: Blocking filesystem operations
std::fs::create_dir_all(&local_path).map_err(...)?;
let file = File::create(&part_file).map_err(...)?;
let mut writer = ArrowWriter::try_new(file, ...).map_err(...)?;

Recommendation: Use tokio::fs or spawn blocking operations:

value:annoying; category:bug; feedback:The Claude AI reviewer is not correct. The trait is annotated with #[async_trait], but the execute() method this code snippet is taken from does not have async in its signature, so it is not executed in an async environment.

@martin-augment

2. Path Injection Vulnerability

Location: native/core/src/execution/operators/parquet_writer.rs:204-220

The code strips URL prefixes and uses the path directly without validation:

let local_path = output_path
    .strip_prefix("file://")
    .or_else(|| output_path.strip_prefix("file:"))
    .unwrap_or(&output_path)
    .to_string();

Concern: Malicious paths like ../../../etc/passwd or paths with null bytes could be exploited.

value:annoying; category:bug; feedback:The Claude AI reviewer is not correct! The paths are intentionally not validated; it is the responsibility of the user to make sure they are safe. The user could pass "/etc/passwd" directly and the code would try to create a directory with that name, not a file. It would fail if a file with that name already exists or if the user has no permission to create it.
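The prefix stripping quoted above behaves as follows when extracted into a standalone function (a sketch for illustration only):

```rust
// Strip an optional "file://" or "file:" scheme; anything else is passed
// through unchanged, which is why no path validation happens at this point.
fn to_local_path(output_path: &str) -> String {
    output_path
        .strip_prefix("file://")
        .or_else(|| output_path.strip_prefix("file:"))
        .unwrap_or(output_path)
        .to_string()
}
```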

@martin-augment

3. Inconsistent Column Name Handling

Location: native/core/src/execution/operators/parquet_writer.rs:183-202, 248-260

Column names are renamed twice - once when creating the output schema and again when writing each batch. This is redundant and could lead to inconsistencies.

Recommendation: Rename columns once when creating the schema, then use that schema consistently.

value:good-to-have; category:bug; feedback:The Claude AI reviewer is correct that the columns are renamed twice and it should be optimized. Prevents wasting CPU resources.

@martin-augment

4. Missing Metrics

Location: native/core/src/execution/operators/parquet_writer.rs:64

The metrics field is initialized but never populated with actual metrics (rows written, bytes written, write time, etc.).

value:good-to-have; category:bug; feedback: The Claude AI reviewer is correct that the metrics are not yet implemented. They will be implemented in a follow-up pull request.

@martin-augment

6. Hardcoded Zstd Compression Level

Location: native/core/src/execution/planner.rs:1458

Ok(SparkCompressionCodec::Zstd) => Ok(CompressionCodec::Zstd(3)),

The ZSTD compression level is hardcoded to 3. Spark allows configuring this via spark.sql.parquet.compression.codec with levels.

value:good-to-have; category:bug; feedback:The Claude AI reviewer is correct that the compression level is currently hardcoded. The protobuf file should be extended to also pass an optional compression level for the codec.
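One possible direction, sketched below with hypothetical message and field names rather than the current Comet schema, is to carry an optional level alongside the codec in the protobuf definition:

```proto
// Hypothetical extension, not the existing Comet protobuf: an optional
// compression level that falls back to the codec's default when unset.
message ParquetWriterOptions {
  CompressionCodec codec = 1;
  optional int32 compression_level = 2; // e.g. 1-22 for zstd
}
```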

@martin-augment

7. Unsafe Cast Without Validation

Location: spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala:88, 151

val cmd = op.cmd.asInstanceOf[InsertIntoHadoopFsRelationCommand]

The code uses asInstanceOf without verifying the type first, which could throw ClassCastException.

Recommendation: Pattern match or check type before casting:

value:good-but-wont-fix; category:bug; feedback:The Claude AI reviewer is correct that this may fail with a RuntimeException, but it is intentional. All exceptions are caught and logged, and execution falls back to Apache Spark.

@martin-augment

8. Incomplete Error Recovery

Location: native/core/src/execution/operators/parquet_writer.rs:242-273

If writing fails mid-stream, partial files may be left on disk without cleanup.

Recommendation: Implement cleanup on failure or use atomic writes (write to temp file, then rename).

value:good-to-have; category:bug; feedback:The Claude AI reviewer is correct that the files are not cleaned up. It should be improved with atomic writes. Prevents file system pollution in case of an error.
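A minimal sketch of the temp-file-then-rename pattern, assuming a local POSIX filesystem (the helper name is hypothetical and the bytes stand in for real Parquet output):

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

// Write to a temporary sibling file first, fsync it, then rename it into
// place. A rename within one directory is atomic on POSIX filesystems, so a
// crash mid-write leaves at most a *.tmp file, never a truncated part file.
fn atomic_write(dest: &Path, bytes: &[u8]) -> std::io::Result<()> {
    let tmp = dest.with_extension("parquet.tmp");
    let mut f = fs::File::create(&tmp)?;
    f.write_all(bytes)?;
    f.sync_all()?; // make the data durable before it becomes visible
    drop(f); // close the handle before renaming (required on some platforms)
    fs::rename(&tmp, dest)
}
```

Leftover `*.tmp` files from failed runs can then be cleaned up on the next attempt without risking valid output.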
