
3858: chore: fix native shuffle for thin batches#53

Open
martin-augment wants to merge 5 commits into main from pr-3858-2026-04-01-10-59-37

Conversation

@martin-augment
Owner

3858: To review by AI

@coderabbitai

coderabbitai bot commented Apr 1, 2026

Walkthrough

This pull request adds guidelines to three AI agent configuration files (.cursor/rules.md, AGENTS.md, and CLAUDE.md) specifying that AI agents must not link to GitHub issues or pull requests during code reviews and must not review AI agent configuration files. Additionally, the PartitionedBatchIterator::next method in the native shuffle module is updated to handle zero-column batches by constructing them directly instead of using the standard interleaving process. A new test case is added to validate native datafusion scan with round-robin shuffle repartitioning on a 1,000-row dataset.


@claude

claude bot commented Apr 1, 2026

Review: Fix native shuffle for thin batches

This PR fixes a crash in the native shuffle path when dealing with zero-column batches, which can arise during count() queries when the query optimizer projects away all columns before shuffling.

What the fix does

The change in partitioned_batch_iterator.rs branches on whether the schema has any fields. For schemas with columns it uses the existing interleave_record_batch path. For zero-column schemas it creates the batch directly via RecordBatch::try_new_with_options with an explicit row count. Arrow's interleave_record_batch cannot handle the zero-column case because it has no way to infer the output row count without at least one column.
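The branch described above can be sketched with stdlib-only stand-ins. `MockBatch` and `mock_interleave` below are illustrative, not the arrow-rs API; the real fix uses `RecordBatch::try_new_with_options` with a `RecordBatchOptions` carrying an explicit row count.

```rust
// Stdlib-only sketch of the zero-column branch; all names are mocks.
#[derive(Debug, PartialEq)]
pub struct MockBatch {
    pub num_columns: usize,
    pub num_rows: usize,
}

// Mimics arrow's interleave kernel: with zero columns there is nothing
// from which to infer the output row count, so it fails.
pub fn mock_interleave(
    num_columns: usize,
    indices: &[(usize, usize)],
) -> Result<MockBatch, String> {
    if num_columns == 0 {
        return Err("interleave cannot infer row count with no columns".into());
    }
    Ok(MockBatch { num_columns, num_rows: indices.len() })
}

// The fixed shape of next(): branch on the field count and build
// zero-column batches directly, with the row count set explicitly.
pub fn build_output(
    num_columns: usize,
    indices: &[(usize, usize)],
) -> Result<MockBatch, String> {
    if num_columns > 0 {
        // Normal path: delegate to interleave.
        mock_interleave(num_columns, indices)
    } else {
        // Zero-column ("thin") path: construct directly; the row count
        // comes from the number of selected indices.
        Ok(MockBatch { num_columns: 0, num_rows: indices.len() })
    }
}

fn main() {
    // Interleave fails on a thin batch, but the direct build succeeds
    // and preserves the row count.
    assert!(mock_interleave(0, &[(0, 0), (0, 1)]).is_err());
    let thin = build_output(0, &[(0, 0), (0, 1), (1, 0)]).unwrap();
    assert_eq!(thin.num_rows, 3);
    println!("thin batch rows = {}", thin.num_rows);
}
```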

There is also a nice secondary fix here: in the original code, self.pos was only updated inside the Ok arm of the match, meaning that if interleave_record_batch returned an error the iterator would be stuck in an infinite loop producing the same error on every call. The new code always advances pos regardless of the result.
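The secondary fix can be illustrated with a minimal iterator (hypothetical types, not the PR's code): the cursor advances whether the chunk build succeeds or fails, so a failing chunk is reported once and iteration then moves on instead of looping forever on the same error.

```rust
// Sketch of the unconditional pos-advance: an Err item is yielded once
// and the iterator proceeds past it, rather than getting stuck.
struct ChunkIter {
    // Stand-in for per-chunk build results (e.g. interleave output).
    chunks: Vec<Result<u32, String>>,
    pos: usize,
}

impl Iterator for ChunkIter {
    type Item = Result<u32, String>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.chunks.len() {
            return None;
        }
        let item = self.chunks[self.pos].clone();
        self.pos += 1; // advance regardless of Ok or Err
        Some(item)
    }
}

fn main() {
    let mut it = ChunkIter {
        chunks: vec![Ok(1), Err("bad chunk".into()), Ok(3)],
        pos: 0,
    };
    let collected: Vec<_> = it.by_ref().collect();
    // All three chunks are yielded exactly once; the Err did not stall us.
    assert_eq!(collected.len(), 3);
    assert!(it.next().is_none());
    println!("yielded {} chunks", collected.len());
}
```

With the old only-advance-on-Ok behavior, the second chunk would be returned as the same `Err` on every call and `Ok(3)` would never be reached.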

Implementation looks solid

The comment explaining the invariant (record_batches is guaranteed non-empty when indices is non-empty) is helpful. The guard at the top of next() (if self.pos >= self.indices.len()) ensures we never reach the record_batches[0] access with an empty slice, provided the caller maintains the invariant that non-empty indices implies non-empty batches. That invariant is enforced in the new() constructor's early return for the empty-indices case.

One small thought: should the schema be taken from self.record_batches[0] for the zero-column case, or would it be cleaner to store the schema as a field on PartitionedBatchIterator so it's accessible even when record_batches might theoretically be empty? Probably not necessary given the invariant holds, but worth a second look.

Test coverage

The test native datafusion scan - repartition count writes 1000 rows across 100 partitions and reads them back with 10 partitions using native DataFusion scan and round-robin repartitioning. The count() call is where the zero-column batch path is exercised (the optimizer drops all columns before the shuffle since they are not needed for counting). The assertion that count == 1000 would catch any incorrect row counts.

It might be worth adding a more direct test that explicitly verifies the zero-column batch case, for example a simple SELECT COUNT(*) FROM t over a table that is shuffled through native Comet. The current test relies on the optimizer removing columns before the shuffle, which is an implicit dependency on optimizer behavior. If the optimizer changes, the test might no longer exercise the buggy path. That said, if the existing test was reproducing the original crash reliably, it serves its purpose as a regression test.

One other thing: after calling testDF.count() the test calls checkSparkAnswerAndOperator(testDF). The second call re-executes the full query with both columns present, which also exercises the normal (non-zero-column) shuffle path. That is a useful sanity check that the change did not break anything for the regular case.

Overall

This is a clean, targeted fix for a real crash. The logic is correct, the comment explaining the Arrow limitation is clear, and the secondary pos bug fix is a nice bonus. Good work.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces support for zero-column batches in the native shuffle partitioner, specifically addressing issues with COUNT queries where interleave_record_batch cannot be used. It also adds a corresponding test case in CometNativeShuffleSuite to verify the fix and includes new AI agent guidelines in .cursor/rules.md, AGENTS.md, and CLAUDE.md. I have no feedback to provide.

@augmentcode

augmentcode bot commented Apr 1, 2026

🤖 Augment PR Summary

Summary: Prevents native shuffle crashes when shuffling “thin” Arrow batches that have 0 columns but a non-zero row count (e.g., column-pruned COUNT(*) queries).

Changes:

  • Update PartitionedBatchIterator to bypass interleave_record_batch for empty schemas and instead construct a RecordBatch with an explicit row count.
  • Add a Spark regression test for DataFusion native scan + round-robin repartition + count() to validate no crash and correct results.

Notes: The fix relies on explicitly setting RecordBatchOptions.row_count for zero-column batches.



@augmentcode augmentcode bot left a comment


Review completed. 2 suggestions posted.


RecordBatch::try_new_with_options(schema, vec![], &options)
};

self.pos = indices_end;


native/shuffle/src/partitioners/partitioned_batch_iterator.rs:116 — self.pos is advanced unconditionally before returning, so an Err from interleave_record_batch/try_new_with_options will still consume those indices and the iterator can continue yielding later batches. Is that behavior change from the previous implementation (which only advanced on Ok) intended?

Severity: low



Owner Author


value:good-but-wont-fix; category:bug; feedback: The Augment AI reviewer is correct! Yes, this is intentional. Without advancing the position, a retry after an error would hit the same error again and the iterator would never make progress; advancing past the problematic data lets it continue.

CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key -> "true") {
val testDF = spark.read.parquet(dir.toString).repartition(10)
// Actual validation, no crash
val count = testDF.count()


spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala:493 — This test asserts the count() result and that testDF (without the count) runs with Comet, but it doesn’t assert that the count() plan used Comet native operators (it could potentially fall back and still pass). That could let the original zero-column native-shuffle crash slip through if fallback behavior changes.

Severity: medium



Owner Author


value:useful; category:bug; feedback: The Augment AI reviewer is correct! The call to checkSparkAnswerAndOperator() that exercises the data frame on both Spark and Comet does not use count(). To keep the test focused, it would be good to use the same count-based dataframe there too.


@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (2)
.cursor/rules.md (1)

1-5: Optional maintainability improvement: avoid policy drift across duplicated rule files.

The same text is repeated in multiple files; consider defining one canonical source and syncing/generated copies to prevent divergence over time.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In @.cursor/rules.md around lines 1 - 5, Multiple rule files contain duplicated
policy text (e.g., the blocks in .cursor/rules.md, CLAUDE.md, AGENTS.md);
consolidate by creating one canonical rules source (choose or create a single
file such as RULES.md) and update the other files to be generated or reference
the canonical file during CI/commit using a simple sync/generation step; replace
the duplicated blocks in the copies with a short pointer or include directive
and add a test or CI check to fail the build if the copies drift from the
canonical source to prevent future divergence.
spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala (1)

492-497: Strengthen coverage by validating the aggregate plan, not only the base DataFrame.

On Line 496, checkSparkAnswerAndOperator(testDF) verifies the non-aggregate plan. The regression here is repartition + count; consider asserting operator usage on an explicit count DataFrame.

Proposed test adjustment
-        val testDF = spark.read.parquet(dir.toString).repartition(10)
-        // Actual validation, no crash
-        val count = testDF.count()
-        assert(count == 1000)
-        // Ensure test df evaluated by Comet
-        checkSparkAnswerAndOperator(testDF)
+        val testDF = spark.read.parquet(dir.toString).repartition(10)
+        val countDF = testDF.selectExpr("count(*) as cnt")
+        val count = countDF.collect().head.getLong(0)
+        assert(count == 1000)
+        // Ensure repartition + count plan is evaluated by Comet
+        checkSparkAnswerAndOperator(countDF)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala`
around lines 492 - 497, The test currently validates only the base DataFrame
plan via checkSparkAnswerAndOperator(testDF); instead create an explicit
aggregate DataFrame (e.g., val countDF = testDF.groupBy().count() or val countDF
= testDF.agg(count("*").as("cnt"))), assert the aggregated result equals 1000,
and call checkSparkAnswerAndOperator(countDF) so the operator usage for the
aggregation (repartition+count) is verified; update references to testDF to use
countDF for the operator check and include the aggregation import
(org.apache.spark.sql.functions.count) as needed.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: beb23cc0-f507-4490-905e-5a18b34fdbc1

📥 Commits

Reviewing files that changed from the base of the PR and between 93eca2d and c51015f.

📒 Files selected for processing (5)
  • .cursor/rules.md
  • AGENTS.md
  • CLAUDE.md
  • native/shuffle/src/partitioners/partitioned_batch_iterator.rs
  • spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala

@martin-augment
Owner Author

val testDF = spark.read.parquet(dir.toString).repartition(10)
+        val countDF = testDF.selectExpr("count(*) as cnt")
+        val count = countDF.collect().head.getLong(0)
+        assert(count == 1000)
+        // Ensure repartition + count plan is evaluated by Comet
+        checkSparkAnswerAndOperator(countDF)

value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! The call to checkSparkAnswerAndOperator() that exercises the data frame on both Spark and Comet does not use count(). To keep the test focused, it would be good to use the same count-based dataframe there too.

