2836: feat: Add support for explode and explode_outer for array inputs #34

martin-augment wants to merge 19 commits into main

Conversation
Walkthrough

The changes add support for the Explode operator (GenerateExec) in Apache Comet's native execution engine. This includes a configuration flag (COMET_EXEC_EXPLODE_ENABLED), proto definitions for the Explode operator, native planner logic for converting Explode operations to DataFusion's UnnestExec, Spark serialization/deserialization (CometExplodeExec), comprehensive test coverage, documentation updates, and microbenchmark integration. The implementation validates that explode generators are deterministic, single-input, named "explode", and operate on Arrays rather than Maps.
```rust
// The output column has the same name as the input array column
// but with the element type instead of the list type
output_fields.push(Field::new(
```
The exploded element column is marked nullable unconditionally; to keep the native schema consistent with the input and Spark/DataFusion expectations, consider basing nullability on the list’s element field (only requiring nullable when outer semantics can introduce nulls). This mismatch could lead to downstream inconsistencies that rely on nullability metadata.
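The suggested rule can be sketched as follows (plain Rust, hypothetical helper name — not Comet code): the exploded output column only needs to be nullable when the list's element field is itself nullable, or when outer semantics can introduce null rows.

```rust
/// Hypothetical helper: decide nullability of the exploded output column.
/// `element_nullable` comes from the list's element field in the input schema;
/// `outer` is true for explode_outer, which can emit null rows.
fn exploded_column_nullable(element_nullable: bool, outer: bool) -> bool {
    element_nullable || outer
}

fn main() {
    // Non-nullable elements, plain explode: output can stay non-nullable.
    assert!(!exploded_column_nullable(false, false));
    // outer=true may introduce nulls even for non-nullable elements.
    assert!(exploded_column_nullable(false, true));
    // Nullable elements always require a nullable output column.
    assert!(exploded_column_nullable(true, false));
    println!("ok");
}
```

This keeps the native schema's nullability metadata consistent with the input instead of marking the column nullable unconditionally.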
```proto
  int32 num_expr_per_project = 3;
}

message Explode {
```
The comment says explode_outer produces a null row for empty arrays, but the current implementation (via DataFusion Unnest) doesn’t preserve empty arrays as null rows. Consider clarifying the doc to avoid implying behavior that isn’t implemented (also noted in the Scala support check).
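To make the semantic gap concrete, here is an illustrative sketch (plain Rust, not Comet or DataFusion code) contrasting Spark's explode_outer with a plain unnest: for an empty or null array, explode_outer emits one row with a null element, while a plain unnest emits nothing.

```rust
/// Spark-style explode_outer: empty or null arrays produce one null row.
fn explode_outer(input: &[Option<Vec<i32>>]) -> Vec<Option<i32>> {
    let mut out = Vec::new();
    for arr in input {
        match arr {
            Some(v) if !v.is_empty() => out.extend(v.iter().copied().map(Some)),
            _ => out.push(None), // empty or null array -> one null row
        }
    }
    out
}

/// Plain unnest: empty and null arrays are dropped entirely.
fn unnest(input: &[Option<Vec<i32>>]) -> Vec<i32> {
    input.iter().flatten().flatten().copied().collect()
}

fn main() {
    let data = vec![Some(vec![1, 2]), Some(vec![]), None, Some(vec![3])];
    assert_eq!(explode_outer(&data), vec![Some(1), Some(2), None, None, Some(3)]);
    assert_eq!(unnest(&data), vec![1, 2, 3]);
    println!("ok");
}
```

The doc should only promise the second behavior until the upstream Unnest gap is closed.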
## Pull Request Review: Add support for explode and explode_outer for array inputs

### Overall Assessment

This is a well-structured PR that adds support for the `explode` and `explode_outer` operators for array inputs in Apache DataFusion Comet. The implementation follows the project's conventions and demonstrates a good understanding of the codebase architecture. The code quality is generally high, with comprehensive testing.

### Code Quality & Best Practices

#### Strengths

### Areas for Improvement

#### 1. Error Handling in Rust (planner.rs:1570-1582)

**Issue:** Multiple uses of `.expect()`, which will panic on failure. While these cases should theoretically never fail, panics provide a poor debugging experience.

**Recommendation:** Consider proper error propagation with `map_err`. However, `.expect()` is used elsewhere in the codebase (23 occurrences in planner.rs), so this aligns with existing patterns. If the project prefers `.expect()` for cases that should never fail, this is acceptable.

#### 2. Schema Construction Logic (planner.rs:1594-1622)

**Observation:** The schema-building logic is quite verbose, with manual field iteration. Consider whether it could be simplified, though the explicit approach does keep the logic clear. This is a minor point, and the current code is clear and correct.

#### 3. SQL Test File Missing Newline

**Issue:** The SQL file `spark/src/test/resources/tpcds-micro-benchmarks/explode.sql` ends without a newline character, which may trigger linter warnings.

**Recommendation:** Add a newline at the end of the file.

### Potential Bugs or Issues

#### Critical: Known Incompatibility with explode_outer

**Location:** operators.scala:901-905

The code correctly identifies that `explode_outer` with empty arrays has incompatible semantics with DataFusion, referencing upstream issue apache/datafusion#19053.

Issues:

Recommendations:

#### Minor: Test Configuration Inconsistency

Some `explode_outer` tests require `CometConf.getOperatorAllowIncompatConfigKey(classOf[GenerateExec])` while others don't. Ensure consistency in test setup for incompatible features.

### Performance Considerations

#### Positive Aspects

#### Suggestions

### Security Concerns

#### No Major Security Issues

#### Minor Observations

The `.expect()` calls could theoretically panic if schemas are malformed, but this is caught by validation earlier in the pipeline.

### Test Coverage

#### Comprehensive Coverage

The test suite covers simple arrays, empty arrays, null arrays, multiple-column projection, nullable columns, mixed scenarios, filter operations after explode, and fallback scenarios. Total tests: 12 (3 ignored due to known issues).

#### Missing Test Cases

### Summary

This is a high-quality PR that demonstrates a good understanding of the Comet architecture and follows established patterns well. The implementation is solid, with comprehensive testing and proper documentation.

#### Key Recommendations Priority

### Approval Status

The code is well-implemented and follows best practices. The known issue with `explode_outer` is properly documented and tracked. With the recommendations above addressed, this PR will be a valuable addition to the project. Great work on this feature implementation!
Actionable comments posted: 2
🧹 Nitpick comments (2)
docs/source/user-guide/latest/operators.md (1)
25-35: Optional: clarify explode vs explode_outer behavior for GenerateExec

The note "Supports `explode` generator only." is accurate but a bit terse. Consider clarifying that only the `explode` generator on array inputs is supported by default, and that `explode_outer` has different semantics and is not enabled unless explicitly allowed. This would align the doc more closely with the support checks in `CometExplodeExec`.

spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala (1)
32-233: Good coverage of explode / explode_outer behavior and fallbacks

This suite exercises the main happy paths, null/empty/mixed-array cases, config-gated incompat paths, and map-input fallback, and it documents the known `explode_outer`/empty-array gap via ignored tests tied to issue 2838. Once the Scala support-level logic is adjusted for `explode_outer` on maps, you might optionally add a small test mirroring "explode with map input falls back" but using `explode_outer(map)` to lock in the intended fallback reason.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
- .github/workflows/pr_build_linux.yml (1 hunks)
- .github/workflows/pr_build_macos.yml (1 hunks)
- common/src/main/scala/org/apache/comet/CometConf.scala (1 hunks)
- docs/source/user-guide/latest/configs.md (1 hunks)
- docs/source/user-guide/latest/operators.md (1 hunks)
- native/core/src/execution/planner.rs (2 hunks)
- native/proto/src/proto/operator.proto (2 hunks)
- spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala (1 hunks)
- spark/src/main/scala/org/apache/spark/sql/comet/operators.scala (5 hunks)
- spark/src/test/resources/tpcds-micro-benchmarks/explode.sql (1 hunks)
- spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala (1 hunks)
- spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: martin-augment
Repo: martin-augment/datafusion-comet PR: 17
File: docs/source/contributor-guide/adding_a_new_operator.md:349-354
Timestamp: 2025-11-11T15:01:48.203Z
Learning: For Apache DataFusion Comet debugging documentation, the correct configuration keys are `spark.comet.explain.format=verbose` for verbose explain plans and `spark.comet.logFallbackReasons.enabled=true` for logging fallback reasons (not `spark.comet.explain.verbose` or `spark.comet.logFallbackReasons` without `.enabled`).
📚 Learning: 2025-11-11T15:01:48.203Z

(Same learning as the Common learnings entry above, from martin-augment, PR 17.)
Applied to files:
- docs/source/user-guide/latest/configs.md
- common/src/main/scala/org/apache/comet/CometConf.scala
- spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
🧬 Code graph analysis (2)
spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala (1)
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala (2)
- CometExplodeExec (886-961)
- CometExplodeExec (963-1002)
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala (1)
spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala (3)
- Compatible (31-31)
- Incompatible (39-39)
- Unsupported (42-42)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Cursor Bugbot
- GitHub Check: claude-review
🔇 Additional comments (10)
.github/workflows/pr_build_macos.yml (1)
95-103: CometGenerateExecSuite wired into macOS exec matrix correctly

The new `CometGenerateExecSuite` entry under the `exec` suite looks consistent with the existing suite pattern and ensures explode coverage on macOS.

.github/workflows/pr_build_linux.yml (1)
131-139: Linux exec matrix update for CometGenerateExecSuite looks good

Including `org.apache.comet.exec.CometGenerateExecSuite` in the Linux exec suite keeps platform coverage aligned and follows the existing workflow pattern.

spark/src/test/resources/tpcds-micro-benchmarks/explode.sql (1)
1-4: Explode micro-benchmark query is well-formed

The `explode` query matches the existing TPCDS micro-benchmark style (single table, ordered, limited) and is suitable for exercising array explode performance.

docs/source/user-guide/latest/configs.md (1)
159-167: Explode exec config entry matches CometConf

The `spark.comet.exec.explode.enabled` row (description and default `true`) is consistent with `COMET_EXEC_EXPLODE_ENABLED` in `CometConf` and with other per-operator toggles.

spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala (1)
55-81: New "explode" micro-benchmark is correctly wired

Adding `"explode"` to the `queries` sequence matches the new `explode.sql` resource path and integrates cleanly with the existing `runQueries` infrastructure.

common/src/main/scala/org/apache/comet/CometConf.scala (1)
269-276: Explode exec config in CometConf is consistent and well-scoped
`COMET_EXEC_EXPLODE_ENABLED` leverages `createExecEnabledConfig("explode", defaultValue = true)` in line with other per-operator flags and matches the documented `spark.comet.exec.explode.enabled` key. No issues found.

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala (1)
58-73: GenerateExec → CometExplodeExec mapping is correctly integrated

Adding `classOf[GenerateExec] -> CometExplodeExec` to `nativeExecs` cleanly wires GenerateExec into the Comet native pipeline, while `CometExplodeExec`'s support checks and `COMET_EXEC_EXPLODE_ENABLED` gating ensure only compatible `explode` cases are offloaded.

native/proto/src/proto/operator.proto (1)
53-54: Explode proto wiring looks consistent and minimal

The new `Explode` entry in `op_struct` and its message (child, outer, project_list) align with how the Scala serde and Rust planner consume explode metadata; numbering and field types look correct.

Also applies to: 257-264
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala (2)
918-949: Explode serde and projection logic align with planner expectations

The `convert` implementation for `CometExplodeExec`:

- Validates and serializes the explode child expression against `op.child.output`.
- Derives `project_list` by filtering out `generatorOutput` from `op.output`, then serializing the remaining attributes against `op.child.output`.
- Builds an `OperatorOuterClass.Explode` with `child`, `outer`, and `project_list`.

This matches the Rust planner's assumption that `project_list` are "carry-forward" non-generator columns and that the array to explode is provided as a separate `child` expression, so the native side can project non-generator columns first and append the explode array as the last input column to `UnnestExec`. The overall serde wiring looks sound.
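The carry-forward contract can be sketched at the row level (plain Rust, hypothetical names — not the actual serde or planner code): non-generator columns are repeated once per array element, and rows with empty arrays disappear under plain explode.

```rust
/// Row-level effect of explode with one carry-forward column.
/// Each input row is (id, tags); each output row repeats id per element.
fn explode_rows(rows: &[(i32, Vec<&str>)]) -> Vec<(i32, String)> {
    rows.iter()
        .flat_map(|(id, tags)| tags.iter().map(move |t| (*id, t.to_string())))
        .collect()
}

fn main() {
    let rows = vec![(1, vec!["a", "b"]), (2, vec![]), (3, vec!["c"])];
    // Row 2 has an empty array, so it produces no output rows (non-outer explode).
    let out = explode_rows(&rows);
    assert_eq!(
        out,
        vec![(1, "a".to_string()), (1, "b".to_string()), (3, "c".to_string())]
    );
    println!("ok");
}
```

In the native plan this same effect is achieved structurally: a projection places the carry-forward columns first and the array column last, and `UnnestExec` then unnests that trailing column.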
963-1002: CometExplodeExec execution wrapper is consistent with other Comet unary execs

The `CometExplodeExec` case class correctly:

- Preserves the child's partitioning (`outputPartitioning = child.outputPartitioning`).
- Marks `producedAttributes` as the generator outputs (`AttributeSet(generatorOutput)`), matching Spark's GenerateExec behavior.
- Implements `withNewChildInternal`, `stringArgs`, `equals`, and `hashCode` consistently with other Comet*Exec wrappers.
- Adds basic metrics for input/output batches and input rows on top of the baseline metrics.
No issues here from a planning/execution perspective.
```rust
OpStruct::Explode(explode) => {
    assert_eq!(children.len(), 1);
    let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;

    // Create the expression for the array to explode
    let child_expr = if let Some(child_expr) = &explode.child {
        self.create_expr(child_expr, child.schema())?
    } else {
        return Err(ExecutionError::GeneralError(
            "Explode operator requires a child expression".to_string(),
        ));
    };

    // Create projection expressions for other columns
    let projections: Vec<Arc<dyn PhysicalExpr>> = explode
        .project_list
        .iter()
        .map(|expr| self.create_expr(expr, child.schema()))
        .collect::<Result<Vec<_>, _>>()?;

    // For UnnestExec, we need to add a projection to put the columns in the right order:
    // 1. First add all projection columns
    // 2. Then add the array column to be exploded
    // Then UnnestExec will unnest the last column

    // Use return_field() to get the proper column names from the expressions
    let child_schema = child.schema();
    let mut project_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = projections
        .iter()
        .map(|expr| {
            let field = expr
                .return_field(&child_schema)
                .expect("Failed to get field from expression");
            let name = field.name().to_string();
            (Arc::clone(expr), name)
        })
        .collect();

    // Add the array column as the last column
    let array_field = child_expr
        .return_field(&child_schema)
        .expect("Failed to get field from array expression");
    let array_col_name = array_field.name().to_string();
    project_exprs.push((Arc::clone(&child_expr), array_col_name.clone()));
```
**Avoid `expect` on `return_field` to prevent planner panics**

The Explode arm's logic (projection to move non-generator columns first, then Unnest on the last list column) looks correct, but there are two `expect` calls:

- Line 1563: `expr.return_field(&child_schema).expect("Failed to get field from expression")`
- Line 1573: `child_expr.return_field(&child_schema).expect("Failed to get field from array expression")`
If return_field fails (e.g., unexpected or partially-supported expression type), this will panic the process instead of returning a structured ExecutionError. Given these expressions originate from serialized Spark plans, we should treat such cases as user/data errors, not hard crashes.
Consider mapping the `DataFusionError` into `ExecutionError::GeneralError` instead, e.g.:

```diff
-    let field = expr
-        .return_field(&child_schema)
-        .expect("Failed to get field from expression");
+    let field = expr
+        .return_field(&child_schema)
+        .map_err(|e| GeneralError(e.to_string()))?;
```

and similarly for the array expression.
This keeps Explode planning consistent with the rest of the planner, which generally avoids panicking on invalid remote input.
Committable suggestion skipped: line range outside the PR's diff.
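A minimal, self-contained sketch of the suggested `map_err` pattern — `ExecutionError` and `return_field` here are stand-ins, not the real Comet or DataFusion types:

```rust
/// Stand-in for Comet's structured planner error.
#[derive(Debug, PartialEq)]
enum ExecutionError {
    GeneralError(String),
}

/// Stand-in for a fallible library call like PhysicalExpr::return_field.
fn return_field(name: &str) -> Result<String, String> {
    if name.is_empty() {
        Err("unknown expression".to_string())
    } else {
        Ok(name.to_string())
    }
}

fn field_name(name: &str) -> Result<String, ExecutionError> {
    // map_err turns the underlying error into ExecutionError::GeneralError,
    // so the caller gets an Err instead of a process-aborting panic.
    return_field(name).map_err(|e| {
        ExecutionError::GeneralError(format!("Failed to get field from expression: {e}"))
    })
}

fn main() {
    assert_eq!(field_name("col"), Ok("col".to_string()));
    assert!(matches!(field_name(""), Err(ExecutionError::GeneralError(_))));
    println!("ok");
}
```

The failure case surfaces as a structured error that the caller can propagate with `?`, keeping Explode planning consistent with the rest of the planner.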
🤖 Prompt for AI Agents
native/core/src/execution/planner.rs around lines 1533 to 1577: replace the two
expect(...) calls on expr.return_field(&child_schema) and
child_expr.return_field(&child_schema) with proper error propagation by mapping
the DataFusion error into an ExecutionError::GeneralError (include a short
contextual message and the original error). Do this for both the projection
expressions loop and the array column lookup so that failures from return_field
return Err(ExecutionError::GeneralError(...)) instead of panicking; keep the
existing variable names (child_schema, array_col_name) and ensure the function
returns the mapped error using the same Result type the planner uses.
```scala
import java.io.ByteArrayOutputStream
import java.util.Locale
```
Fix support-level classification for explode_outer on MapType
`CometExplodeExec.getSupportLevel` currently does:

```scala
if (!op.generator.deterministic) return Unsupported(...)
if (op.generator.children.length != 1) return Unsupported(...)
if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") return Unsupported(...)
if (op.outer) {
  return Incompatible(Some("Empty arrays are not preserved as null outputs when outer=true"))
}
op.generator.children.head.dataType match {
  case _: ArrayType => Compatible()
  case _: MapType =>
    Unsupported(Some("Comet only supports explode/explode_outer for arrays, not maps"))
  case other => Unsupported(Some(s"Unsupported data type: $other"))
}
```

Because the `op.outer` check happens before the type match, `explode_outer` on a map column is always classified as Incompatible, never Unsupported. If a user turns on the per-operator "allow incompat" config for GenerateExec, this can allow a Map-based explode to proceed into native planning, where the Rust planner will reject the non-List type with a runtime ExecutionError instead of falling back cleanly.
To keep map inputs consistently treated as unsupported (regardless of outer) and preserve the current array/outer incompat note, consider restructuring as:
```scala
override def getSupportLevel(op: GenerateExec): SupportLevel = {
  if (!op.generator.deterministic) {
    return Unsupported(Some("Only deterministic generators are supported"))
  }
  if (op.generator.children.length != 1) {
    return Unsupported(Some("generators with multiple inputs are not supported"))
  }
  if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") {
    return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}"))
  }
  op.generator.children.head.dataType match {
    case _: ArrayType =>
      if (op.outer) {
        Incompatible(
          Some("Empty arrays are not preserved as null outputs when outer=true"))
      } else {
        Compatible()
      }
    case _: MapType =>
      Unsupported(Some("Comet only supports explode/explode_outer for arrays, not maps"))
    case other =>
      Unsupported(Some(s"Unsupported data type: $other"))
  }
}
```

This ensures:
- All map-based explode/explode_outer calls are always `Unsupported`, unaffected by the "allow incompat" flag.
- Array-based `explode_outer` remains `Incompatible`, with the documented empty-array semantics difference.
Also applies to: 33-34, 47-48, 886-915
🤖 Prompt for AI Agents
In spark/src/main/scala/org/apache/spark/sql/comet/operators.scala around lines
22-24 (and also apply same change at 33-34, 47-48, and 886-915), the current
getSupportLevel logic checks op.outer before matching on the generator child
data type, which causes explode_outer on MapType to be classified as
Incompatible rather than Unsupported; change the control flow to first validate
generator determinism/arity/name, then match on
op.generator.children.head.dataType: for ArrayType return Compatible or
Incompatible(Some(...)) when outer=true, for MapType always return
Unsupported(Some("Comet only supports explode/explode_outer for arrays, not
maps")), and for other types return Unsupported(Some(s"Unsupported data type:
$other")), so maps are consistently Unsupported regardless of the outer flag.
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment, or this will be closed in 7 days.
2836: To review by AI