From b50829bc783314fe49c4e18bf6f2d734a86fee64 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 08:42:58 -0700 Subject: [PATCH 01/19] initial implementation --- .../scala/org/apache/comet/CometConf.scala | 2 + .../core/src/execution/operators/explode.rs | 337 ++++++++++++++++++ native/core/src/execution/operators/mod.rs | 2 + native/core/src/execution/planner.rs | 70 +++- native/proto/src/proto/operator.proto | 10 + .../apache/comet/rules/CometExecRule.scala | 1 + .../apache/spark/sql/comet/operators.scala | 105 +++++- 7 files changed, 525 insertions(+), 2 deletions(-) create mode 100644 native/core/src/execution/operators/explode.rs diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 1e5d19ee23..3560fc07cb 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -270,6 +270,8 @@ object CometConf extends ShimCometConf { createExecEnabledConfig("union", defaultValue = true) val COMET_EXEC_EXPAND_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig("expand", defaultValue = true) + val COMET_EXEC_EXPLODE_ENABLED: ConfigEntry[Boolean] = + createExecEnabledConfig("explode", defaultValue = true) val COMET_EXEC_WINDOW_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig("window", defaultValue = true) val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] = diff --git a/native/core/src/execution/operators/explode.rs b/native/core/src/execution/operators/explode.rs new file mode 100644 index 0000000000..74913411fe --- /dev/null +++ b/native/core/src/execution/operators/explode.rs @@ -0,0 +1,337 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ + make_array, Array, ArrayRef, GenericListArray, MutableArrayData, RecordBatch, + RecordBatchOptions, +}; +use arrow::datatypes::{DataType, SchemaRef}; +use datafusion::common::DataFusionError; +use datafusion::physical_expr::{EquivalenceProperties, PhysicalExpr}; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::{ + execution::TaskContext, + physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, + RecordBatchStream, SendableRecordBatchStream, + }, +}; +use futures::{Stream, StreamExt}; +use std::{ + any::Any, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +/// A Comet native operator that explodes an array column into multiple rows. +/// This behaves the same as Spark's explode/explode_outer functions. +#[derive(Debug)] +pub struct ExplodeExec { + /// The expression that produces the array to explode + child_expr: Arc, + /// Whether this is explode_outer (produces null row for empty/null arrays) + outer: bool, + /// Expressions for other columns to project alongside the exploded values + projections: Vec>, + child: Arc, + schema: SchemaRef, + cache: PlanProperties, +} + +impl ExplodeExec { + /// Create a new ExplodeExec + pub fn new( + child_expr: Arc, + outer: bool, + projections: Vec>, + child: Arc, + schema: SchemaRef, + ) -> Self { + let cache = PlanProperties::new( + EquivalenceProperties::new(Arc::clone(&schema)), + Partitioning::UnknownPartitioning(1), + EmissionType::Final, + Boundedness::Bounded, + ); + + Self { + child_expr, + outer, + projections, + child, + schema, + cache, + } + } +} + +impl DisplayAs for ExplodeExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "CometExplodeExec: child_expr={}, outer={}", + self.child_expr, self.outer + ) + } + DisplayFormatType::TreeRender => unimplemented!(), + } + } +} + +impl ExecutionPlan for ExplodeExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.child] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> datafusion::common::Result> { + let new_explode = ExplodeExec::new( + Arc::clone(&self.child_expr), + self.outer, + self.projections.clone(), + Arc::clone(&children[0]), + Arc::clone(&self.schema), + ); + Ok(Arc::new(new_explode)) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> datafusion::common::Result { + let child_stream = self.child.execute(partition, context)?; + let explode_stream = ExplodeStream::new( + Arc::clone(&self.child_expr), + self.outer, + self.projections.clone(), + child_stream, + Arc::clone(&self.schema), + ); + Ok(Box::pin(explode_stream)) + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn name(&self) -> &str { + "CometExplodeExec" + } +} + +pub struct ExplodeStream { + child_expr: Arc, + outer: bool, + projections: Vec>, + child_stream: SendableRecordBatchStream, + schema: SchemaRef, +} + +impl ExplodeStream { + /// Create a new ExplodeStream + pub fn new( + child_expr: Arc, + outer: bool, + projections: Vec>, + child_stream: SendableRecordBatchStream, + schema: SchemaRef, + ) -> Self { + Self { + child_expr, + outer, + projections, + child_stream, + schema, + } + } + + fn explode(&self, batch: &RecordBatch) -> Result { + // Evaluate the array expression + let array_column = self.child_expr.evaluate(batch)?; + let array_column = array_column.into_array(batch.num_rows())?; + + // Cast to GenericListArray to access array elements + let list_array = array_column + .as_any() + .downcast_ref::>() + .ok_or_else(|| { + DataFusionError::Execution(format!( + "Expected list array for explode, got {:?}", + array_column.data_type() + )) + })?; + + // Calculate output row count and build row index mapping + let mut row_indices = Vec::new(); + let mut array_element_indices = Vec::new(); + + for row_idx in 0..batch.num_rows() { + if list_array.is_null(row_idx) { + // Null array + if self.outer { + // explode_outer produces one null row for null arrays + row_indices.push(row_idx); + array_element_indices.push(None); + } + // else: explode skips null arrays + } else { + let array = list_array.value(row_idx); + let array_len = array.len(); + + if array_len == 0 { + // Empty array + if self.outer { + // explode_outer produces one null row for empty arrays + row_indices.push(row_idx); + array_element_indices.push(None); + } + // else: explode skips empty arrays + } else { + // Non-empty array: produce one row per element + for elem_idx in 0..array_len { + row_indices.push(row_idx); + array_element_indices.push(Some((row_idx, elem_idx))); + } + } + } + } + + let output_row_count = row_indices.len(); + + if output_row_count == 0 { + // No output rows, return empty batch + return Ok(RecordBatch::new_empty(Arc::clone(&self.schema))); + } + + // Build output columns + let mut output_columns: Vec = Vec::new(); + + // First, replicate the projected columns + for proj_expr in &self.projections { + let column = proj_expr.evaluate(batch)?; + let column = column.into_array(batch.num_rows())?; + + // Use MutableArrayData to efficiently replicate rows + let column_data = column.to_data(); + let mut mutable = MutableArrayData::new(vec![&column_data], false, output_row_count); + + for &row_idx in &row_indices { + mutable.extend(0, row_idx, row_idx + 1); + } + + output_columns.push(make_array(mutable.freeze())); + } + + // Now add the exploded array element column + // Get the element type from the list array + let element_type = match list_array.data_type() { + DataType::List(field) => field.data_type().clone(), + DataType::LargeList(field) => field.data_type().clone(), + _ => { + return Err(DataFusionError::Execution(format!( + "Unsupported array type for explode: {:?}", + list_array.data_type() + ))) + } + }; + + // Extract all array values into a flat structure + let mut all_arrays = Vec::new(); + for row_idx in 0..batch.num_rows() { + if !list_array.is_null(row_idx) { + all_arrays.push(list_array.value(row_idx)); + } + } + + // Build the exploded element column + if all_arrays.is_empty() { + // All arrays were null/empty, create a null array + let null_array = arrow::array::new_null_array(&element_type, output_row_count); + output_columns.push(null_array); + } else { + let array_data_refs: Vec<_> = all_arrays.iter().map(|a| a.to_data()).collect(); + let array_data_refs_borrowed: Vec<_> = array_data_refs.iter().collect(); + let mut element_mutable = + MutableArrayData::new(array_data_refs_borrowed, false, output_row_count); + + // Build a mapping from row_idx to which array it came from + let mut row_to_array_idx = vec![None; batch.num_rows()]; + let mut array_idx = 0; + for row_idx in 0..batch.num_rows() { + if !list_array.is_null(row_idx) { + row_to_array_idx[row_idx] = Some(array_idx); + array_idx += 1; + } + } + + for elem_info in &array_element_indices { + if let Some((row_idx, elem_idx)) = elem_info { + if let Some(arr_idx) = row_to_array_idx[*row_idx] { + element_mutable.extend(arr_idx, *elem_idx, *elem_idx + 1); + } else { + // This shouldn't happen, but handle gracefully + element_mutable.extend_nulls(1); + } + } else { + // This is a null element (from empty or null array with outer=true) + element_mutable.extend_nulls(1); + } + } + + output_columns.push(make_array(element_mutable.freeze())); + } + + let options = RecordBatchOptions::new().with_row_count(Some(output_row_count)); + RecordBatch::try_new_with_options(Arc::clone(&self.schema), output_columns, &options) + .map_err(|e| e.into()) + } +} + +impl Stream for ExplodeStream { + type Item = datafusion::common::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = self.child_stream.poll_next_unpin(cx); + match next { + Poll::Ready(Some(Ok(batch))) => { + let result = self.explode(&batch); + Poll::Ready(Some(result)) + } + other => other, + } + } +} + +impl RecordBatchStream for ExplodeStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs index b01f7857be..b11c36ac26 100644 --- a/native/core/src/execution/operators/mod.rs +++ b/native/core/src/execution/operators/mod.rs @@ -28,6 +28,8 @@ pub use scan::*; mod copy; mod expand; pub use expand::ExpandExec; +mod explode; +pub use explode::ExplodeExec; mod iceberg_scan; mod parquet_writer; pub use parquet_writer::ParquetWriterExec; diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index b0746a6f84..e872c94c56 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -22,7 +22,7 @@ use crate::{ errors::ExpressionError, execution::{ expressions::subquery::Subquery, - operators::{ExecutionError, ExpandExec, ParquetWriterExec, ScanExec}, + operators::{ExecutionError, ExpandExec, ExplodeExec, ParquetWriterExec, ScanExec}, serde::to_arrow_datatype, shuffle::ShuffleWriterExec, }, @@ -1528,6 +1528,74 @@ impl PhysicalPlanner { Arc::new(SparkPlan::new(spark_plan.plan_id, expand, vec![child])), )) } + OpStruct::Explode(explode) => { + assert_eq!(children.len(), 1); + let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?; + + // Create the expression for the array to explode + let child_expr = if let Some(child_expr) = &explode.child { + self.create_expr(child_expr, child.schema())? + } else { + return Err(ExecutionError::GeneralError( + "Explode operator requires a child expression".to_string(), + )); + }; + + // Create projection expressions for other columns + let projections: Vec> = explode + .project_list + .iter() + .map(|expr| self.create_expr(expr, child.schema())) + .collect::, _>>()?; + + // Build the output schema + // The schema consists of the projected columns followed by the exploded element + let mut fields = Vec::new(); + + // Add fields for projected columns + for (idx, proj_expr) in projections.iter().enumerate() { + let dt = proj_expr.data_type(&child.schema())?; + fields.push(Field::new(format!("col_{idx}"), dt, true)); + } + + // Add field for the exploded element + // Extract element type from the array type + let array_type = child_expr.data_type(&child.schema())?; + let element_type = match array_type { + DataType::List(field) => field.data_type().clone(), + DataType::LargeList(field) => field.data_type().clone(), + _ => { + return Err(ExecutionError::GeneralError(format!( + "Explode requires array type, got {:?}", + array_type + ))) + } + }; + fields.push(Field::new( + format!("col_{}", projections.len()), + element_type, + true, + )); + + let schema = Arc::new(Schema::new(fields)); + + let input = Arc::clone(&child.native_plan); + let explode_exec = Arc::new(ExplodeExec::new( + child_expr, + explode.outer, + projections, + input, + schema, + )); + Ok(( + scans, + Arc::new(SparkPlan::new( + spark_plan.plan_id, + explode_exec, + vec![child], + )), + )) + } OpStruct::SortMergeJoin(join) => { let (join_params, scans) = self.parse_join_parameters( inputs, diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index a958327099..6eb6edc6b3 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -50,6 +50,7 @@ message Operator { NativeScan native_scan = 111; IcebergScan iceberg_scan = 112; ParquetWriter parquet_writer = 113; + Explode explode = 114; } } @@ -253,6 +254,15 @@ message Expand { int32 num_expr_per_project = 3; } +message Explode { + // The array expression to explode into multiple rows + spark.spark_expression.Expr child = 1; + // Whether this is explode_outer (produces null row for empty/null arrays) + bool outer = 2; + // Expressions for other columns to project alongside the exploded values + repeated spark.spark_expression.Expr project_list = 3; +} + message HashJoin { repeated spark.spark_expression.Expr left_join_keys = 1; repeated spark.spark_expression.Expr right_join_keys = 2; diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 124188b64d..2a051b1a77 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -62,6 +62,7 @@ object CometExecRule { classOf[LocalLimitExec] -> CometLocalLimitExec, classOf[GlobalLimitExec] -> CometGlobalLimitExec, classOf[ExpandExec] -> CometExpandExec, + classOf[GenerateExec] -> CometExplodeExec, classOf[HashAggregateExec] -> CometHashAggregateExec, classOf[ObjectHashAggregateExec] -> CometObjectHashAggregateExec, classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoinExec, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index c955f79d91..498bdb3393 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -29,7 +29,7 @@ import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, NamedExpression, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, Generator, NamedExpression, SortOrder} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateMode, Final, Partial} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.catalyst.plans._ @@ -882,6 +882,109 @@ case class CometExpandExec( override lazy val metrics: Map[String, SQLMetric] = Map.empty } +object CometExplodeExec extends CometOperatorSerde[GenerateExec] { + + override def enabledConfig: Option[ConfigEntry[Boolean]] = Some( + CometConf.COMET_EXEC_EXPLODE_ENABLED) + + override def convert( + op: GenerateExec, + builder: Operator.Builder, + childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = { + // Check if this is an explode or explode_outer operation + val generator = op.generator + val isExplode = generator.nodeName match { + case "explode" => true + case "explode_outer" => true + case _ => return None // Only support explode/explode_outer, not other generators + } + + val isOuter = generator.nodeName == "explode_outer" + + // The generator should have exactly one child (the array expression) + if (generator.children.length != 1) { + withInfo(op, generator) + return None + } + + val childExpr = generator.children.head + val childExprProto = exprToProto(childExpr, op.child.output) + + if (childExprProto.isEmpty) { + withInfo(op, childExpr) + return None + } + + // Convert the projection expressions (columns to carry forward) + // These are the non-generator output columns + val generatorOutput = op.generatorOutput.toSet + val projectExprs = op.output.filterNot(generatorOutput.contains).map { attr => + exprToProto(attr, op.child.output) + } + + if (projectExprs.exists(_.isEmpty) || childOp.isEmpty) { + withInfo(op, op.output: _*) + return None + } + + val explodeBuilder = OperatorOuterClass.Explode + .newBuilder() + .setChild(childExprProto.get) + .setOuter(isOuter) + .addAllProjectList(projectExprs.map(_.get).asJava) + + Some(builder.setExplode(explodeBuilder).build()) + } + + override def createExec(nativeOp: Operator, op: GenerateExec): CometNativeExec = { + CometExplodeExec( + nativeOp, + op, + op.output, + op.generator, + op.generatorOutput, + op.child, + SerializedPlan(None)) + } +} + +case class CometExplodeExec( + override val nativeOp: Operator, + override val originalPlan: SparkPlan, + override val output: Seq[Attribute], + generator: Generator, + generatorOutput: Seq[Attribute], + child: SparkPlan, + override val serializedPlanOpt: SerializedPlan) + extends CometUnaryExec { + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def producedAttributes: AttributeSet = AttributeSet(generatorOutput) + + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = + this.copy(child = newChild) + + override def stringArgs: Iterator[Any] = Iterator(generator, generatorOutput, output, child) + + override def equals(obj: Any): Boolean = { + obj match { + case other: CometExplodeExec => + this.output == other.output && + this.generator == other.generator && + this.generatorOutput == other.generatorOutput && + this.child == other.child && + this.serializedPlanOpt == other.serializedPlanOpt + case _ => + false + } + } + + override def hashCode(): Int = Objects.hashCode(output, generator, generatorOutput, child) + + // TODO: support native Explode metrics + override lazy val metrics: Map[String, SQLMetric] = Map.empty +} + object CometUnionExec extends CometSink[UnionExec] { override def enabledConfig: Option[ConfigEntry[Boolean]] = Some( From 21e9bf2c9b2862d03884fdab07d06081cfb06c8e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 09:07:37 -0700 Subject: [PATCH 02/19] add tests and fix bugs --- .github/workflows/pr_build_linux.yml | 1 + .github/workflows/pr_build_macos.yml | 1 + docs/source/user-guide/latest/configs.md | 1 + .../apache/spark/sql/comet/operators.scala | 22 +-- .../comet/exec/CometGenerateExecSuite.scala | 145 ++++++++++++++++++ 5 files changed, 161 insertions(+), 9 deletions(-) create mode 100644 spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 43e1f776bf..0fd28cb58e 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -132,6 +132,7 @@ jobs: org.apache.comet.exec.CometAggregateSuite org.apache.comet.exec.CometExec3_4PlusSuite org.apache.comet.exec.CometExecSuite + org.apache.comet.exec.CometGenerateExecSuite org.apache.comet.exec.CometWindowExecSuite org.apache.comet.exec.CometJoinSuite org.apache.comet.CometNativeSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 88dfd9f92a..e915fa74a6 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -97,6 +97,7 @@ jobs: org.apache.comet.exec.CometAggregateSuite org.apache.comet.exec.CometExec3_4PlusSuite org.apache.comet.exec.CometExecSuite + org.apache.comet.exec.CometGenerateExecSuite org.apache.comet.exec.CometWindowExecSuite org.apache.comet.exec.CometJoinSuite org.apache.comet.CometNativeSuite diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index a1c3212c20..5f9cbc507d 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -162,6 +162,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.exec.coalesce.enabled` | Whether to enable coalesce by default. | true | | `spark.comet.exec.collectLimit.enabled` | Whether to enable collectLimit by default. | true | | `spark.comet.exec.expand.enabled` | Whether to enable expand by default. | true | +| `spark.comet.exec.explode.enabled` | Whether to enable explode by default. | true | | `spark.comet.exec.filter.enabled` | Whether to enable filter by default. | true | | `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true | | `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true | diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 498bdb3393..830b288aa9 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.comet import java.io.ByteArrayOutputStream +import java.util.Locale import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -53,7 +54,7 @@ import com.google.common.base.Objects import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleEnabled, withInfo} import org.apache.comet.parquet.CometParquetUtils -import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} +import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClass, SupportLevel, Unsupported} import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, Operator} import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto, supportedSortType} import org.apache.comet.serde.operator.CometSink @@ -887,19 +888,22 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] { override def enabledConfig: Option[ConfigEntry[Boolean]] = Some( CometConf.COMET_EXEC_EXPLODE_ENABLED) + override def getSupportLevel(op: GenerateExec): SupportLevel = { + if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") { + Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}")) + } + if (!op.generator.deterministic) { + return Unsupported(Some("Only deterministic generators are supported")) + } + Compatible() + } + override def convert( op: GenerateExec, builder: Operator.Builder, childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = { // Check if this is an explode or explode_outer operation val generator = op.generator - val isExplode = generator.nodeName match { - case "explode" => true - case "explode_outer" => true - case _ => return None // Only support explode/explode_outer, not other generators - } - - val isOuter = generator.nodeName == "explode_outer" // The generator should have exactly one child (the array expression) if (generator.children.length != 1) { @@ -930,7 +934,7 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] { val explodeBuilder = OperatorOuterClass.Explode .newBuilder() .setChild(childExprProto.get) - .setOuter(isOuter) + .setOuter(op.outer) .addAllProjectList(projectExprs.map(_.get).asJava) Some(builder.setExplode(explodeBuilder).build()) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala new file mode 100644 index 0000000000..6edd678f57 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.exec + +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.functions.col + +import org.apache.comet.CometConf + +class CometGenerateExecSuite extends CometTestBase { + + import testImplicits._ + + test("explode with simple array") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Array(1, 2, 3)), (2, Array(4, 5)), (3, Array(6))) + .toDF("id", "arr") + .selectExpr("id", "explode(arr) as value") + checkSparkAnswerAndOperator(df) + } + } + + test("explode with empty array") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Array(1, 2)), (2, Array.empty[Int]), (3, Array(3))) + .toDF("id", "arr") + .selectExpr("id", "explode(arr) as value") + checkSparkAnswerAndOperator(df) + } + } + + test("explode with null array") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Some(Array(1, 2))), (2, None), (3, Some(Array(3)))) + .toDF("id", "arr") + .selectExpr("id", "explode(arr) as value") + checkSparkAnswerAndOperator(df) + } + } + + test("explode_outer with simple array") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Array(1, 2, 3)), (2, Array(4, 5)), (3, Array(6))) + .toDF("id", "arr") + .selectExpr("id", "explode_outer(arr) as value") + checkSparkAnswerAndOperator(df) + } + } + + // TODO fix this: fails with MutableArrayData not nullable + ignore("explode_outer with empty array") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Array(1, 2)), (2, Array.empty[Int]), (3, Array(3))) + .toDF("id", "arr") + .selectExpr("id", "explode_outer(arr) as value") + checkSparkAnswerAndOperator(df) + } + } + + test("explode_outer with null array") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Some(Array(1, 2))), (2, None), (3, Some(Array(3)))) + .toDF("id", "arr") + .selectExpr("id", "explode_outer(arr) as value") + checkSparkAnswerAndOperator(df) + } + } + + test("explode with multiple columns") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, "A", Array(1, 2, 3)), (2, "B", Array(4, 5)), (3, "C", Array(6))) + .toDF("id", "name", "arr") + .selectExpr("id", "name", "explode(arr) as value") + checkSparkAnswerAndOperator(df) + } + } + + test("explode with array of strings") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Array("a", "b", "c")), (2, Array("d", "e")), (3, Array("f"))) + .toDF("id", "arr") + .selectExpr("id", "explode(arr) as value") + checkSparkAnswerAndOperator(df) + } + } + + test("explode with filter") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Array(1, 2, 3)), (2, Array(4, 5, 6)), (3, Array(7, 8, 9))) + .toDF("id", "arr") + .selectExpr("id", "explode(arr) as value") + .filter(col("value") > 5) + checkSparkAnswerAndOperator(df) + } + } + + test("explode fallback when disabled") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "false") { + val df = Seq((1, Array(1, 2, 3)), (2, Array(4, 5))) + .toDF("id", "arr") + .selectExpr("id", "explode(arr) as value") + checkSparkAnswerAndFallbackReason( + df, + "Native support for operator GenerateExec is disabled") + } + } + +} From 0ea6dce0bee0f56ab72c55e1c5e7d1fd4f8b735d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 09:17:32 -0700 Subject: [PATCH 03/19] clippy --- native/core/src/execution/operators/explode.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/operators/explode.rs b/native/core/src/execution/operators/explode.rs index 74913411fe..9d567eb5c6 100644 --- a/native/core/src/execution/operators/explode.rs +++ b/native/core/src/execution/operators/explode.rs @@ -285,9 +285,13 @@ impl ExplodeStream { // Build a mapping from row_idx to which array it came from let mut row_to_array_idx = vec![None; batch.num_rows()]; let mut array_idx = 0; - for row_idx in 0..batch.num_rows() { + for (row_idx, item) in row_to_array_idx + .iter_mut() + .enumerate() + .take(batch.num_rows()) + { if !list_array.is_null(row_idx) { - row_to_array_idx[row_idx] = Some(array_idx); + *item = Some(array_idx); array_idx += 1; } } From 36cb3bc9e45373655e478b7df573730ad4baf1ea Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 09:35:39 -0700 Subject: [PATCH 04/19] fix failing test --- native/core/src/execution/operators/explode.rs | 3 ++- .../scala/org/apache/comet/exec/CometGenerateExecSuite.scala | 3 +-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/native/core/src/execution/operators/explode.rs b/native/core/src/execution/operators/explode.rs index 9d567eb5c6..53ef56b5f6 100644 --- a/native/core/src/execution/operators/explode.rs +++ b/native/core/src/execution/operators/explode.rs @@ -279,8 +279,9 @@ impl ExplodeStream { } else { let array_data_refs: Vec<_> = all_arrays.iter().map(|a| a.to_data()).collect(); let array_data_refs_borrowed: Vec<_> = array_data_refs.iter().collect(); + // Use `true` for nullable parameter to support extend_nulls() let mut element_mutable = - MutableArrayData::new(array_data_refs_borrowed, false, output_row_count); + MutableArrayData::new(array_data_refs_borrowed, true, output_row_count); // Build a mapping from row_idx to which array it came from let mut row_to_array_idx = vec![None; batch.num_rows()]; diff --git a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala index 6edd678f57..102f7955c9 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala @@ -72,8 +72,7 @@ class CometGenerateExecSuite extends CometTestBase { } } - // TODO fix this: fails with MutableArrayData not nullable - ignore("explode_outer with empty array") { + test("explode_outer with empty array") { withSQLConf( CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { From de1f01223dac8b823a11b333424ff0d6d9e5e3d6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 09:44:25 -0700 Subject: [PATCH 05/19] improve fallback rules --- .../apache/spark/sql/comet/operators.scala | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 830b288aa9..93d1c78d25 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, TimestampNTZType} +import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, TimestampNTZType} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration import org.apache.spark.util.io.ChunkedByteBuffer @@ -889,13 +889,21 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] { CometConf.COMET_EXEC_EXPLODE_ENABLED) override def getSupportLevel(op: GenerateExec): SupportLevel = { - if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") { - Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}")) - } if (!op.generator.deterministic) { return Unsupported(Some("Only deterministic generators are supported")) } - Compatible() + if (op.generator.children.length != 1) { + return Unsupported(Some("generators with multiple inputs are not supported")) + } + if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") { + return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}")) + } + op.generator.children.head.dataType match { + case _: ArrayType => + Compatible() + case other => + Unsupported(Some(s"Unsupported data type: $other")) + } } override def convert( @@ -905,12 +913,6 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] { // Check if this is an explode or explode_outer operation val generator = op.generator - // The generator should have exactly one child (the array expression) - if (generator.children.length != 1) { - withInfo(op, generator) - return None - } - val childExpr = generator.children.head val childExprProto = exprToProto(childExpr, op.child.output) From 0aa80482dd3dfe3f155c016673431cd6ad1a6f37 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 09:47:12 -0700 Subject: [PATCH 06/19] add fallback test for map input --- .../org/apache/spark/sql/comet/operators.scala | 5 ++++- .../apache/comet/exec/CometGenerateExecSuite.scala | 13 +++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 93d1c78d25..3e29681910 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -895,12 +895,15 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] { if (op.generator.children.length != 1) { return Unsupported(Some("generators with multiple inputs are not supported")) } - if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") { + if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode" && + op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode_outer") { return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}")) } op.generator.children.head.dataType match { case _: ArrayType => Compatible() + case _: MapType => + Unsupported(Some("Comet only supports explode/explode_outer for arrays, not maps")) case other => Unsupported(Some(s"Unsupported data type: $other")) } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala index 102f7955c9..37e9c8031f 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala @@ -141,4 +141,17 @@ class CometGenerateExecSuite extends CometTestBase { } } + test("explode with map input falls back") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Map("a" -> 1, "b" -> 2)), (2, Map("c" -> 3))) + .toDF("id", "map") + .selectExpr("id", "explode(map) as (key, value)") + checkSparkAnswerAndFallbackReason( + df, + "Comet only supports explode/explode_outer for arrays, not maps") + } + } + } From f5fc1dc35cf672f725d97fbfcb692679f008e2e0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 09:52:19 -0700 Subject: [PATCH 07/19] Revert a change --- .../src/main/scala/org/apache/spark/sql/comet/operators.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 3e29681910..b04b226255 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -895,8 +895,7 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] { if (op.generator.children.length != 1) { return Unsupported(Some("generators with multiple inputs are not supported")) } - if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode" && - op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode_outer") { + if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") { return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}")) } op.generator.children.head.dataType match { From 6105dc13ecc227abd04cbb7ee84b459db3b8efc7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 10:09:58 -0700 Subject: [PATCH 08/19] more tests --- .../comet/exec/CometGenerateExecSuite.scala | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala index 37e9c8031f..594f3d6194 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala @@ -154,4 +154,76 @@ class CometGenerateExecSuite extends CometTestBase { } } + test("explode with nullable projected column") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq((1, Some("A"), Array(1, 2)), (2, None, Array(3, 4)), (3, Some("C"), Array(5))) + .toDF("id", "name", "arr") + .selectExpr("id", "name", "explode(arr) as value") + checkSparkAnswerAndOperator(df) + } + } + + test("explode_outer with nullable projected column") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = + Seq((1, Some("A"), Array(1, 2)), (2, None, Array.empty[Int]), (3, Some("C"), Array(5))) + .toDF("id", "name", "arr") + .selectExpr("id", "name", "explode_outer(arr) as value") + checkSparkAnswerAndOperator(df) + } + } + + test("explode with mixed null, empty, and non-empty arrays") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq( + (1, Some(Array(1, 2))), + (2, None), + (3, Some(Array.empty[Int])), + (4, Some(Array(3))), + (5, None), + (6, Some(Array(4, 5, 6)))) + .toDF("id", "arr") + .selectExpr("id", "explode(arr) as value") + checkSparkAnswerAndOperator(df) + } + } + + test("explode_outer with mixed null, empty, and non-empty arrays") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq( + (1, Some(Array(1, 2))), + (2, None), + (3, Some(Array.empty[Int])), + (4, Some(Array(3))), + (5, None), + (6, Some(Array(4, 5, 6)))) + .toDF("id", "arr") + .selectExpr("id", "explode_outer(arr) as value") + checkSparkAnswerAndOperator(df) + } + } + + test("explode with multiple nullable columns") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { + val df = Seq( + (Some(1), Some("A"), Some(100), Array(1, 2)), + (None, Some("B"), None, Array(3)), + (Some(3), None, Some(300), Array(4, 5)), + (None, None, None, Array(6))) + .toDF("id", "name", "value", "arr") + .selectExpr("id", "name", "value", "explode(arr) as element") + checkSparkAnswerAndOperator(df) + } + } + } From a3cfffe0536510041fc0d0826f10c7d6406befb9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 10:10:06 -0700 Subject: [PATCH 09/19] fix null support --- native/core/src/execution/operators/explode.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/operators/explode.rs b/native/core/src/execution/operators/explode.rs index 53ef56b5f6..b1ce64a72b 100644 --- a/native/core/src/execution/operators/explode.rs +++ b/native/core/src/execution/operators/explode.rs @@ -240,8 +240,9 @@ impl ExplodeStream { let column = column.into_array(batch.num_rows())?; // Use MutableArrayData to efficiently replicate rows + // Use true for nullable to properly handle null values in projected columns let column_data = column.to_data(); - let mut mutable = MutableArrayData::new(vec![&column_data], false, output_row_count); + let mut mutable = MutableArrayData::new(vec![&column_data], true, output_row_count); for &row_idx in &row_indices { mutable.extend(0, row_idx, row_idx + 1); From 28a17522d0738776407c67e80560710b96272bdc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 10:54:56 -0700 Subject: [PATCH 10/19] switch to UnnestExec --- .../core/src/execution/operators/explode.rs | 343 ------------------ native/core/src/execution/operators/mod.rs | 2 - native/core/src/execution/planner.rs | 99 +++-- 3 files changed, 67 insertions(+), 377 deletions(-) delete mode 100644 native/core/src/execution/operators/explode.rs diff --git a/native/core/src/execution/operators/explode.rs b/native/core/src/execution/operators/explode.rs deleted file mode 100644 index b1ce64a72b..0000000000 --- a/native/core/src/execution/operators/explode.rs +++ /dev/null @@ -1,343 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::array::{ - make_array, Array, ArrayRef, GenericListArray, MutableArrayData, RecordBatch, - RecordBatchOptions, -}; -use arrow::datatypes::{DataType, SchemaRef}; -use datafusion::common::DataFusionError; -use datafusion::physical_expr::{EquivalenceProperties, PhysicalExpr}; -use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; -use datafusion::{ - execution::TaskContext, - physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, - RecordBatchStream, SendableRecordBatchStream, - }, -}; -use futures::{Stream, StreamExt}; -use std::{ - any::Any, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - -/// A Comet native operator that explodes an array column into multiple rows. -/// This behaves the same as Spark's explode/explode_outer functions. -#[derive(Debug)] -pub struct ExplodeExec { - /// The expression that produces the array to explode - child_expr: Arc, - /// Whether this is explode_outer (produces null row for empty/null arrays) - outer: bool, - /// Expressions for other columns to project alongside the exploded values - projections: Vec>, - child: Arc, - schema: SchemaRef, - cache: PlanProperties, -} - -impl ExplodeExec { - /// Create a new ExplodeExec - pub fn new( - child_expr: Arc, - outer: bool, - projections: Vec>, - child: Arc, - schema: SchemaRef, - ) -> Self { - let cache = PlanProperties::new( - EquivalenceProperties::new(Arc::clone(&schema)), - Partitioning::UnknownPartitioning(1), - EmissionType::Final, - Boundedness::Bounded, - ); - - Self { - child_expr, - outer, - projections, - child, - schema, - cache, - } - } -} - -impl DisplayAs for ExplodeExec { - fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "CometExplodeExec: child_expr={}, outer={}", - self.child_expr, self.outer - ) - } - DisplayFormatType::TreeRender => unimplemented!(), - } - } -} - -impl ExecutionPlan for ExplodeExec { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.child] - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> datafusion::common::Result> { - let new_explode = ExplodeExec::new( - Arc::clone(&self.child_expr), - self.outer, - self.projections.clone(), - Arc::clone(&children[0]), - Arc::clone(&self.schema), - ); - Ok(Arc::new(new_explode)) - } - - fn execute( - &self, - partition: usize, - context: Arc, - ) -> datafusion::common::Result { - let child_stream = self.child.execute(partition, context)?; - let explode_stream = ExplodeStream::new( - Arc::clone(&self.child_expr), - self.outer, - self.projections.clone(), - child_stream, - Arc::clone(&self.schema), - ); - Ok(Box::pin(explode_stream)) - } - - fn properties(&self) -> &PlanProperties { - &self.cache - } - - fn name(&self) -> &str { - "CometExplodeExec" - } -} - -pub struct ExplodeStream { - child_expr: Arc, - outer: bool, - projections: Vec>, - child_stream: SendableRecordBatchStream, - schema: SchemaRef, -} - -impl ExplodeStream { - /// Create a new ExplodeStream - pub fn new( - child_expr: Arc, - outer: bool, - projections: Vec>, - child_stream: SendableRecordBatchStream, - schema: SchemaRef, - ) -> Self { - Self { - child_expr, - outer, - projections, - child_stream, - schema, - } - } - - fn explode(&self, batch: &RecordBatch) -> Result { - // Evaluate the array expression - let array_column = self.child_expr.evaluate(batch)?; - let array_column = array_column.into_array(batch.num_rows())?; - - // Cast to GenericListArray to access array elements - let list_array = array_column - .as_any() - .downcast_ref::>() - .ok_or_else(|| { - DataFusionError::Execution(format!( - "Expected list array for explode, got {:?}", - array_column.data_type() - )) - })?; - - // Calculate output row count and build row index mapping - let mut row_indices = Vec::new(); - let mut array_element_indices = Vec::new(); - - for row_idx in 0..batch.num_rows() { - if list_array.is_null(row_idx) { - // Null array - if self.outer { - // explode_outer produces one null row for null arrays - row_indices.push(row_idx); - array_element_indices.push(None); - } - // else: explode skips null arrays - } else { - let array = list_array.value(row_idx); - let array_len = array.len(); - - if array_len == 0 { - // Empty array - if self.outer { - // explode_outer produces one null row for empty arrays - row_indices.push(row_idx); - array_element_indices.push(None); - } - // else: explode skips empty arrays - } else { - // Non-empty array: produce one row per element - for elem_idx in 0..array_len { - row_indices.push(row_idx); - array_element_indices.push(Some((row_idx, elem_idx))); - } - } - } - } - - let output_row_count = row_indices.len(); - - if output_row_count == 0 { - // No output rows, return empty batch - return Ok(RecordBatch::new_empty(Arc::clone(&self.schema))); - } - - // Build output columns - let mut output_columns: Vec = Vec::new(); - - // First, replicate the projected columns - for proj_expr in &self.projections { - let column = proj_expr.evaluate(batch)?; - let column = column.into_array(batch.num_rows())?; - - // Use MutableArrayData to efficiently replicate rows - // Use true for nullable to properly handle null values in projected columns - let column_data = column.to_data(); - let mut mutable = MutableArrayData::new(vec![&column_data], true, output_row_count); - - for &row_idx in &row_indices { - mutable.extend(0, row_idx, row_idx + 1); - } - - output_columns.push(make_array(mutable.freeze())); - } - - // Now add the exploded array element column - // Get the element type from the list array - let element_type = match list_array.data_type() { - DataType::List(field) => field.data_type().clone(), - DataType::LargeList(field) => field.data_type().clone(), - _ => { - return Err(DataFusionError::Execution(format!( - "Unsupported array type for explode: {:?}", - list_array.data_type() - ))) - } - }; - - // Extract all array values into a flat structure - let mut all_arrays = Vec::new(); - for row_idx in 0..batch.num_rows() { - if !list_array.is_null(row_idx) { - all_arrays.push(list_array.value(row_idx)); - } - } - - // Build the exploded element column - if all_arrays.is_empty() { - // All arrays were null/empty, create a null array - let null_array = arrow::array::new_null_array(&element_type, output_row_count); - output_columns.push(null_array); - } else { - let array_data_refs: Vec<_> = all_arrays.iter().map(|a| a.to_data()).collect(); - let array_data_refs_borrowed: Vec<_> = array_data_refs.iter().collect(); - // Use `true` for nullable parameter to support extend_nulls() - let mut element_mutable = - MutableArrayData::new(array_data_refs_borrowed, true, output_row_count); - - // Build a mapping from row_idx to which array it came from - let mut row_to_array_idx = vec![None; batch.num_rows()]; - let mut array_idx = 0; - for (row_idx, item) in row_to_array_idx - .iter_mut() - .enumerate() - .take(batch.num_rows()) - { - if !list_array.is_null(row_idx) { - *item = Some(array_idx); - array_idx += 1; - } - } - - for elem_info in &array_element_indices { - if let Some((row_idx, elem_idx)) = elem_info { - if let Some(arr_idx) = row_to_array_idx[*row_idx] { - element_mutable.extend(arr_idx, *elem_idx, *elem_idx + 1); - } else { - // This shouldn't happen, but handle gracefully - element_mutable.extend_nulls(1); - } - } else { - // This is a null element (from empty or null array with outer=true) - element_mutable.extend_nulls(1); - } - } - - output_columns.push(make_array(element_mutable.freeze())); - } - - let options = RecordBatchOptions::new().with_row_count(Some(output_row_count)); - RecordBatch::try_new_with_options(Arc::clone(&self.schema), output_columns, &options) - .map_err(|e| e.into()) - } -} - -impl Stream for ExplodeStream { - type Item = datafusion::common::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let next = self.child_stream.poll_next_unpin(cx); - match next { - Poll::Ready(Some(Ok(batch))) => { - let result = self.explode(&batch); - Poll::Ready(Some(result)) - } - other => other, - } - } -} - -impl RecordBatchStream for ExplodeStream { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } -} diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs index b11c36ac26..b01f7857be 100644 --- a/native/core/src/execution/operators/mod.rs +++ b/native/core/src/execution/operators/mod.rs @@ -28,8 +28,6 @@ pub use scan::*; mod copy; mod expand; pub use expand::ExpandExec; -mod explode; -pub use explode::ExplodeExec; mod iceberg_scan; mod parquet_writer; pub use parquet_writer::ParquetWriterExec; diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index e872c94c56..bfed3ec788 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -22,7 +22,7 @@ use crate::{ errors::ExpressionError, execution::{ expressions::subquery::Subquery, - operators::{ExecutionError, ExpandExec, ExplodeExec, ParquetWriterExec, ScanExec}, + operators::{ExecutionError, ExpandExec, ParquetWriterExec, ScanExec}, serde::to_arrow_datatype, shuffle::ShuffleWriterExec, }, @@ -96,9 +96,11 @@ use arrow::array::{ use arrow::buffer::{BooleanBuffer, NullBuffer, OffsetBuffer}; use arrow::row::{OwnedRow, RowConverter, SortField}; use datafusion::common::utils::SingleRowListArrayBuilder; +use datafusion::common::UnnestOptions; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::limit::GlobalLimitExec; +use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec}; use datafusion_comet_proto::spark_expression::ListLiteral; use datafusion_comet_proto::spark_operator::SparkFilePartition; use datafusion_comet_proto::{ @@ -1548,52 +1550,85 @@ impl PhysicalPlanner { .map(|expr| self.create_expr(expr, child.schema())) .collect::, _>>()?; - // Build the output schema - // The schema consists of the projected columns followed by the exploded element - let mut fields = Vec::new(); + // For UnnestExec, we need to add a projection to put the columns in the right order: + // 1. First add all projection columns + // 2. Then add the array column to be exploded + // Then UnnestExec will unnest the last column - // Add fields for projected columns - for (idx, proj_expr) in projections.iter().enumerate() { - let dt = proj_expr.data_type(&child.schema())?; - fields.push(Field::new(format!("col_{idx}"), dt, true)); + let mut project_exprs: Vec<(Arc, String)> = projections + .iter() + .enumerate() + .map(|(idx, expr)| (Arc::clone(expr), format!("col_{idx}"))) + .collect(); + + // Add the array column as the last column + let array_col_name = format!("col_{}", projections.len()); + project_exprs.push((child_expr.clone(), array_col_name.clone())); + + // Create a projection to arrange columns as needed + let project_exec = Arc::new(ProjectionExec::try_new( + project_exprs, + Arc::clone(&child.native_plan), + )?); + + // Get the input schema from the projection + let project_schema = project_exec.schema(); + + // Build the output schema for UnnestExec + // The output schema replaces the list column with its element type + let mut output_fields: Vec = Vec::new(); + + // Add all projection columns (non-array columns) + for i in 0..projections.len() { + output_fields.push(project_schema.field(i).clone()); } - // Add field for the exploded element - // Extract element type from the array type - let array_type = child_expr.data_type(&child.schema())?; - let element_type = match array_type { + // Add the unnested array element field + // Extract the element type from the list/array type + let array_field = project_schema.field(projections.len()); + let element_type = match array_field.data_type() { DataType::List(field) => field.data_type().clone(), - DataType::LargeList(field) => field.data_type().clone(), - _ => { + dt => { return Err(ExecutionError::GeneralError(format!( - "Explode requires array type, got {:?}", - array_type + "Expected List type for explode, got {:?}", + dt ))) } }; - fields.push(Field::new( - format!("col_{}", projections.len()), + + // The output column has the same name as the input array column + // but with the element type instead of the list type + output_fields.push(Field::new( + array_field.name(), element_type, - true, + true, // Element is nullable after unnesting )); - let schema = Arc::new(Schema::new(fields)); + let output_schema = Arc::new(Schema::new(output_fields)); - let input = Arc::clone(&child.native_plan); - let explode_exec = Arc::new(ExplodeExec::new( - child_expr, - explode.outer, - projections, - input, - schema, + // Use UnnestExec to explode the last column (the array column) + // ListUnnest specifies which column to unnest and the depth (1 for single level) + let list_unnest = ListUnnest { + index_in_input_schema: projections.len(), // Index of the array column to unnest + depth: 1, // Unnest one level (explode single array) + }; + + let unnest_options = UnnestOptions { + preserve_nulls: explode.outer, + recursions: vec![], + }; + + let unnest_exec = Arc::new(UnnestExec::new( + project_exec, + vec![list_unnest], + vec![], // No struct columns to unnest + output_schema, + unnest_options, )); + Ok(( scans, - Arc::new(SparkPlan::new( - spark_plan.plan_id, - explode_exec, - vec![child], - )), + Arc::new(SparkPlan::new(spark_plan.plan_id, unnest_exec, vec![child])), )) } OpStruct::SortMergeJoin(join) => { From c1a564c6611bf9b337fe295c91b290666541a1ea Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 11:56:19 -0700 Subject: [PATCH 11/19] fall back for explode_outer --- .../scala/org/apache/spark/sql/comet/operators.scala | 5 ++++- .../org/apache/comet/exec/CometGenerateExecSuite.scala | 9 ++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index b04b226255..52727bf58d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -54,7 +54,7 @@ import com.google.common.base.Objects import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleEnabled, withInfo} import org.apache.comet.parquet.CometParquetUtils -import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClass, SupportLevel, Unsupported} +import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, SupportLevel, Unsupported} import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, Operator} import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto, supportedSortType} import org.apache.comet.serde.operator.CometSink @@ -898,6 +898,9 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] { if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") { return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}")) } + if (op.outer) { + return Incompatible(Some("Empty arrays are not preserved as null outputs when outer=true")) + } op.generator.children.head.dataType match { case _: ArrayType => Compatible() diff --git a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala index 594f3d6194..29ebadc6ed 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala @@ -20,6 +20,7 @@ package org.apache.comet.exec import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.execution.GenerateExec import org.apache.spark.sql.functions.col import org.apache.comet.CometConf @@ -64,6 +65,7 @@ class CometGenerateExecSuite extends CometTestBase { test("explode_outer with simple array") { withSQLConf( CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.getOperatorAllowIncompatConfigKey(classOf[GenerateExec]) -> "true", CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { val df = Seq((1, Array(1, 2, 3)), (2, Array(4, 5)), (3, Array(6))) .toDF("id", "arr") @@ -72,7 +74,7 @@ class CometGenerateExecSuite extends CometTestBase { } } - test("explode_outer with empty array") { + ignore("explode_outer with empty array") { withSQLConf( CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { @@ -86,6 +88,7 @@ class CometGenerateExecSuite extends CometTestBase { test("explode_outer with null array") { withSQLConf( CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + CometConf.getOperatorAllowIncompatConfigKey(classOf[GenerateExec]) -> "true", CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { val df = Seq((1, Some(Array(1, 2))), (2, None), (3, Some(Array(3)))) .toDF("id", "arr") @@ -165,7 +168,7 @@ class CometGenerateExecSuite extends CometTestBase { } } - test("explode_outer with nullable projected column") { + ignore("explode_outer with nullable projected column") { withSQLConf( CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { @@ -194,7 +197,7 @@ class CometGenerateExecSuite extends CometTestBase { } } - test("explode_outer with mixed null, empty, and non-empty arrays") { + ignore("explode_outer with mixed null, empty, and non-empty arrays") { withSQLConf( CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") { From 234c231cf26623ab1da0370fecf7728338f072f2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 12:51:47 -0700 Subject: [PATCH 12/19] clippy --- native/core/src/execution/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index bfed3ec788..1c1611e326 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1563,7 +1563,7 @@ impl PhysicalPlanner { // Add the array column as the last column let array_col_name = format!("col_{}", projections.len()); - project_exprs.push((child_expr.clone(), array_col_name.clone())); + project_exprs.push((Arc::clone(&child_expr), array_col_name.clone())); // Create a projection to arrange columns as needed let project_exec = Arc::new(ProjectionExec::try_new( From a595a18e771d08547b9ba0d87ae60aa164a58acd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 12:53:20 -0700 Subject: [PATCH 13/19] metrics --- .../main/scala/org/apache/spark/sql/comet/operators.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 52727bf58d..51ddb83bc6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -992,8 +992,12 @@ case class CometExplodeExec( override def hashCode(): Int = Objects.hashCode(output, generator, generatorOutput, child) - // TODO: support native Explode metrics - override lazy val metrics: Map[String, SQLMetric] = Map.empty + override lazy val metrics: Map[String, SQLMetric] = + CometMetricNode.baselineMetrics(sparkContext) ++ + Map( + "input_batches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"), + "input_rows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "output_batches" -> SQLMetrics.createMetric(sparkContext, "number of output batches")) } object CometUnionExec extends CometSink[UnionExec] { From d0b90d7484e6af2e97bc52c055f6c7b678f42c32 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 14:12:19 -0700 Subject: [PATCH 14/19] add benchmark --- docs/source/user-guide/latest/operators.md | 1 + .../src/main/scala/org/apache/spark/sql/comet/operators.scala | 4 ++++ spark/src/test/resources/tpcds-micro-benchmarks/explode.sql | 4 ++++ .../apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala | 1 + 4 files changed, 10 insertions(+) create mode 100644 spark/src/test/resources/tpcds-micro-benchmarks/explode.sql diff --git a/docs/source/user-guide/latest/operators.md b/docs/source/user-guide/latest/operators.md index f5f2d9724d..77ba84e4f7 100644 --- a/docs/source/user-guide/latest/operators.md +++ b/docs/source/user-guide/latest/operators.md @@ -30,6 +30,7 @@ not supported by Comet will fall back to regular Spark execution. | ExpandExec | Yes | | | FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. | | FilterExec | Yes | | +| GenerateExec | Yes | Supports `explode` generator only. | | GlobalLimitExec | Yes | | | HashAggregateExec | Yes | | | InsertIntoHadoopFsRelationCommand | No | Experimental support for native Parquet writes. Disabled by default. | diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 51ddb83bc6..c86997206a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -899,12 +899,16 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] { return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}")) } if (op.outer) { + // DataFusion UnnestExec has different semantics to Spark for this case + // https://github.com/apache/datafusion/issues/19053 return Incompatible(Some("Empty arrays are not preserved as null outputs when outer=true")) } op.generator.children.head.dataType match { case _: ArrayType => Compatible() case _: MapType => + // TODO add support for map types + // https://github.com/apache/datafusion-comet/issues/2837 Unsupported(Some("Comet only supports explode/explode_outer for arrays, not maps")) case other => Unsupported(Some(s"Unsupported data type: $other")) diff --git a/spark/src/test/resources/tpcds-micro-benchmarks/explode.sql b/spark/src/test/resources/tpcds-micro-benchmarks/explode.sql new file mode 100644 index 0000000000..79eacfda09 --- /dev/null +++ b/spark/src/test/resources/tpcds-micro-benchmarks/explode.sql @@ -0,0 +1,4 @@ +SELECT i_item_sk, explode(array(i_brand_id, i_class_id, i_category_id, i_manufact_id, i_manager_id)) +FROM item +ORDER BY i_item_sk +LIMIT 1000 \ No newline at end of file diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala index a672d0937d..d9c49bc596 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala @@ -76,6 +76,7 @@ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase { "join_left_outer", "join_semi", "rlike", + "explode", "to_json") override def runQueries( From 3b00224097b070e5c194537dfa57f87e77edfcd8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 14:17:08 -0700 Subject: [PATCH 15/19] link to issue --- .../scala/org/apache/comet/exec/CometGenerateExecSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala index 29ebadc6ed..a9ac3deb34 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala @@ -74,6 +74,7 @@ class CometGenerateExecSuite extends CometTestBase { } } + // https://github.com/apache/datafusion-comet/issues/2838 ignore("explode_outer with empty array") { withSQLConf( CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", @@ -168,6 +169,7 @@ class CometGenerateExecSuite extends CometTestBase { } } + // https://github.com/apache/datafusion-comet/issues/2838 ignore("explode_outer with nullable projected column") { withSQLConf( CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", @@ -197,6 +199,7 @@ class CometGenerateExecSuite extends CometTestBase { } } + // https://github.com/apache/datafusion-comet/issues/2838 ignore("explode_outer with mixed null, empty, and non-empty arrays") { withSQLConf( CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", From 876e7a15b104548fac6da9e835df74d5759587b3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 14:34:48 -0700 Subject: [PATCH 16/19] remove outdated comment --- .../apache/spark/sql/comet/operators.scala | 5 +--- .../apache/spark/sql/CometTPCQueryBase.scala | 1 + .../benchmark/CometTPCDSMicroBenchmark.scala | 27 +------------------ 3 files changed, 3 insertions(+), 30 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index c86997206a..33e44ea813 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -919,10 +919,7 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] { op: GenerateExec, builder: Operator.Builder, childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = { - // Check if this is an explode or explode_outer operation - val generator = op.generator - - val childExpr = generator.children.head + val childExpr = op.generator.children.head val childExprProto = exprToProto(childExpr, op.child.output) if (childExprProto.isEmpty) { diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala index 99f5f3f427..b7771f715b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala @@ -40,6 +40,7 @@ trait CometTPCQueryBase extends Logging { .setMaster(System.getProperty("spark.sql.test.master", "local[*]")) .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .set("spark.sql.parquet.compression.codec", "snappy") + .set("spark.eventLog.enabled", "true") .set( "spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4")) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala index d9c49bc596..ceae981f65 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala @@ -52,32 +52,7 @@ import org.apache.comet.CometConf */ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase { - val queries: Seq[String] = Seq( - "scan_decimal", - "add_decimals", - "add_many_decimals", - "add_many_integers", - "agg_high_cardinality", - "agg_low_cardinality", - "agg_sum_decimals_no_grouping", - "agg_sum_integers_no_grouping", - "agg_sum_integers_with_grouping", - "agg_stddev", - "case_when_column_or_null", - "case_when_scalar", - "char_type", - "filter_highly_selective", - "filter_less_selective", - "if_column_or_null", - "join_anti", - "join_condition", - "join_exploding_output", - "join_inner", - "join_left_outer", - "join_semi", - "rlike", - "explode", - "to_json") + val queries: Seq[String] = Seq("explode") override def runQueries( queryLocation: String, From f9dc3e5b3070d0eeb1ee22b2967e1ac5cb483571 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 14:36:17 -0700 Subject: [PATCH 17/19] Revert --- .../benchmark/CometTPCDSMicroBenchmark.scala | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala index ceae981f65..d9c49bc596 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala @@ -52,7 +52,32 @@ import org.apache.comet.CometConf */ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase { - val queries: Seq[String] = Seq("explode") + val queries: Seq[String] = Seq( + "scan_decimal", + "add_decimals", + "add_many_decimals", + "add_many_integers", + "agg_high_cardinality", + "agg_low_cardinality", + "agg_sum_decimals_no_grouping", + "agg_sum_integers_no_grouping", + "agg_sum_integers_with_grouping", + "agg_stddev", + "case_when_column_or_null", + "case_when_scalar", + "char_type", + "filter_highly_selective", + "filter_less_selective", + "if_column_or_null", + "join_anti", + "join_condition", + "join_exploding_output", + "join_inner", + "join_left_outer", + "join_semi", + "rlike", + "explode", + "to_json") override def runQueries( queryLocation: String, From 96336ab2c084b7698b88771bf893fe3dfd1a44cb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 2 Dec 2025 14:36:43 -0700 Subject: [PATCH 18/19] Revert --- .../src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala index b7771f715b..99f5f3f427 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala @@ -40,7 +40,6 @@ trait CometTPCQueryBase extends Logging { .setMaster(System.getProperty("spark.sql.test.master", "local[*]")) .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .set("spark.sql.parquet.compression.codec", "snappy") - .set("spark.eventLog.enabled", "true") .set( "spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4")) From 311e044f0bd841c48b43321025ee61a50307aa2a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 3 Dec 2025 16:32:54 -0700 Subject: [PATCH 19/19] stop renaming columns --- native/core/src/execution/planner.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 1c1611e326..c246d1fef6 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1555,14 +1555,24 @@ impl PhysicalPlanner { // 2. Then add the array column to be exploded // Then UnnestExec will unnest the last column + // Use return_field() to get the proper column names from the expressions + let child_schema = child.schema(); let mut project_exprs: Vec<(Arc, String)> = projections .iter() - .enumerate() - .map(|(idx, expr)| (Arc::clone(expr), format!("col_{idx}"))) + .map(|expr| { + let field = expr + .return_field(&child_schema) + .expect("Failed to get field from expression"); + let name = field.name().to_string(); + (Arc::clone(expr), name) + }) .collect(); // Add the array column as the last column - let array_col_name = format!("col_{}", projections.len()); + let array_field = child_expr + .return_field(&child_schema) + .expect("Failed to get field from array expression"); + let array_col_name = array_field.name().to_string(); project_exprs.push((Arc::clone(&child_expr), array_col_name.clone())); // Create a projection to arrange columns as needed