From 953919c29d91604c11fe05b9212f8de019fd4e5f Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 31 Mar 2026 08:21:01 -0700 Subject: [PATCH 1/5] chore: `native_datafusion` fails on repartition + count --- .../comet/exec/CometNativeShuffleSuite.scala | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index efb5fbca8a..d955af89f1 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -474,4 +474,24 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper } } } + + test("native datafusion scan - repartition count") { + withTempPath { dir => + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + spark + .range(1000) + .selectExpr("id", "concat('name_', id) as name") + .repartition(100) + .write + .parquet(dir.toString) + } + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key -> "true") { + val count = spark.read.parquet(dir.toString).repartition(10).count() + checkSparkAnswerAndOperator(spark.read.parquet(dir.toString).repartition(10)) + assert(count == 1000) + } + } + } } From 3a3b715243f068279dcb2bd98ab4b71492e1e999 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 31 Mar 2026 11:21:46 -0700 Subject: [PATCH 2/5] chore: `native_datafusion` fails on repartition + count --- .../partitioned_batch_iterator.rs | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs index 8309a8ed4a..40d61b3c21 100644 --- a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs +++ b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs @@ -97,15 +97,25 @@ impl Iterator for PartitionedBatchIterator<'_> { let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); let indices = &self.indices[self.pos..indices_end]; - match interleave_record_batch(&self.record_batches, indices) { - Ok(batch) => { - self.pos = indices_end; - Some(Ok(batch)) - } - Err(e) => Some(Err(DataFusionError::ArrowError( - Box::from(e), - Some(DataFusionError::get_back_trace()), - ))), - } + + // record_batches is guaranteed non-empty when indices is non-empty + // (indices reference rows within the buffered batches) + let schema = self.record_batches[0].schema(); + + let result = if schema.fields().is_empty() { + // For zero-column batches (e.g. COUNT queries), we can't use + // interleave_record_batch because Arrow requires either at least one + // column or an explicit row count. Create the batch directly. + let options = + arrow::array::RecordBatchOptions::new().with_row_count(Some(indices.len())); + RecordBatch::try_new_with_options(schema, vec![], &options) + } else { + interleave_record_batch(&self.record_batches, indices) + }; + + self.pos = indices_end; + Some(result.map_err(|e| { + DataFusionError::ArrowError(Box::from(e), Some(DataFusionError::get_back_trace())) + })) } } From c2a8f27b57261b8e2f4051c17881258c2982a970 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 31 Mar 2026 12:31:47 -0700 Subject: [PATCH 3/5] fix branch predictability --- .../shuffle/src/partitioners/partitioned_batch_iterator.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs index 40d61b3c21..ef0e6f425d 100644 --- a/native/shuffle/src/partitioners/partitioned_batch_iterator.rs +++ b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs @@ -102,15 +102,15 @@ impl Iterator for PartitionedBatchIterator<'_> { // (indices reference rows within the buffered batches) let schema = self.record_batches[0].schema(); - let result = if schema.fields().is_empty() { + let result = if !schema.fields.is_empty() { + interleave_record_batch(&self.record_batches, indices) + } else { // For zero-column batches (e.g. COUNT queries), we can't use // interleave_record_batch because Arrow requires either at least one // column or an explicit row count. Create the batch directly. let options = arrow::array::RecordBatchOptions::new().with_row_count(Some(indices.len())); RecordBatch::try_new_with_options(schema, vec![], &options) - } else { - interleave_record_batch(&self.record_batches, indices) }; self.pos = indices_end; From dadbec01b5d34a950a61b259e4441e497732ef12 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 31 Mar 2026 16:48:17 -0700 Subject: [PATCH 4/5] chore: `native_datafusion` fails on repartition + count --- .../org/apache/comet/exec/CometNativeShuffleSuite.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index d955af89f1..d39fa8ac00 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -488,9 +488,12 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper withSQLConf( CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key -> "true") { - val count = spark.read.parquet(dir.toString).repartition(10).count() - checkSparkAnswerAndOperator(spark.read.parquet(dir.toString).repartition(10)) + val testDF = spark.read.parquet(dir.toString).repartition(10) + // Actual validation, no crash + val count = testDF.count() assert(count == 1000) + // Ensure test df evaluated by Comet + checkSparkAnswerAndOperator(testDF) } } } From c51015f2e1b13507edbf84e638fbb80241d68b02 Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Wed, 1 Apr 2026 13:59:37 +0300 Subject: [PATCH 5/5] Add CLAUDE.md & AGENTS.md --- .cursor/rules.md | 5 +++++ AGENTS.md | 5 +++++ CLAUDE.md | 5 +++++ 3 files changed, 15 insertions(+) create mode 100644 .cursor/rules.md create mode 100644 AGENTS.md create mode 100644 CLAUDE.md diff --git a/.cursor/rules.md b/.cursor/rules.md new file mode 100644 index 0000000000..00385e6f5f --- /dev/null +++ b/.cursor/rules.md @@ -0,0 +1,5 @@ +The AI agents should never link to any issue or a pull request +in any GitHub repository in the code reviews! + +The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md! + diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000000..00385e6f5f --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,5 @@ +The AI agents should never link to any issue or a pull request +in any GitHub repository in the code reviews! + +The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md! + diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000000..00385e6f5f --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,5 @@ +The AI agents should never link to any issue or a pull request +in any GitHub repository in the code reviews! + +The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md! +