1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
@@ -132,6 +132,7 @@ jobs:
org.apache.comet.exec.CometAggregateSuite
org.apache.comet.exec.CometExec3_4PlusSuite
org.apache.comet.exec.CometExecSuite
org.apache.comet.exec.CometGenerateExecSuite
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.CometNativeSuite
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
@@ -97,6 +97,7 @@ jobs:
org.apache.comet.exec.CometAggregateSuite
org.apache.comet.exec.CometExec3_4PlusSuite
org.apache.comet.exec.CometExecSuite
org.apache.comet.exec.CometGenerateExecSuite
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.CometNativeSuite
2 changes: 2 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -270,6 +270,8 @@ object CometConf extends ShimCometConf {
createExecEnabledConfig("union", defaultValue = true)
val COMET_EXEC_EXPAND_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("expand", defaultValue = true)
val COMET_EXEC_EXPLODE_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("explode", defaultValue = true)
val COMET_EXEC_WINDOW_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("window", defaultValue = true)
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/configs.md
@@ -162,6 +162,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.exec.coalesce.enabled` | Whether to enable coalesce by default. | true |
| `spark.comet.exec.collectLimit.enabled` | Whether to enable collectLimit by default. | true |
| `spark.comet.exec.expand.enabled` | Whether to enable expand by default. | true |
| `spark.comet.exec.explode.enabled` | Whether to enable explode by default. | true |
| `spark.comet.exec.filter.enabled` | Whether to enable filter by default. | true |
| `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true |
| `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true |
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/operators.md
@@ -30,6 +30,7 @@ not supported by Comet will fall back to regular Spark execution.
| ExpandExec | Yes | |
| FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. |
| FilterExec | Yes | |
| GenerateExec | Yes | Supports `explode` generator only. |
| GlobalLimitExec | Yes | |
| HashAggregateExec | Yes | |
| InsertIntoHadoopFsRelationCommand | No | Experimental support for native Parquet writes. Disabled by default. |
113 changes: 113 additions & 0 deletions native/core/src/execution/planner.rs
@@ -96,9 +96,11 @@ use arrow::array::{
use arrow::buffer::{BooleanBuffer, NullBuffer, OffsetBuffer};
use arrow::row::{OwnedRow, RowConverter, SortField};
use datafusion::common::utils::SingleRowListArrayBuilder;
use datafusion::common::UnnestOptions;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::limit::GlobalLimitExec;
use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
use datafusion_comet_proto::spark_expression::ListLiteral;
use datafusion_comet_proto::spark_operator::SparkFilePartition;
use datafusion_comet_proto::{
@@ -1528,6 +1530,117 @@ impl PhysicalPlanner {
Arc::new(SparkPlan::new(spark_plan.plan_id, expand, vec![child])),
))
}
OpStruct::Explode(explode) => {
assert_eq!(children.len(), 1);
let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;

// Create the expression for the array to explode
let child_expr = if let Some(child_expr) = &explode.child {
self.create_expr(child_expr, child.schema())?
} else {
return Err(ExecutionError::GeneralError(
"Explode operator requires a child expression".to_string(),
));
};

// Create projection expressions for other columns
let projections: Vec<Arc<dyn PhysicalExpr>> = explode
.project_list
.iter()
.map(|expr| self.create_expr(expr, child.schema()))
.collect::<Result<Vec<_>, _>>()?;

// For UnnestExec, we need to add a projection to put the columns in the right order:
// 1. First add all projection columns
// 2. Then add the array column to be exploded
// Then UnnestExec will unnest the last column

// Use return_field() to get the proper column names from the expressions
let child_schema = child.schema();
let mut project_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = projections
.iter()
.map(|expr| {
let field = expr
.return_field(&child_schema)
.expect("Failed to get field from expression");
let name = field.name().to_string();
(Arc::clone(expr), name)
})
.collect();

// Add the array column as the last column
let array_field = child_expr
.return_field(&child_schema)
.expect("Failed to get field from array expression");
let array_col_name = array_field.name().to_string();
project_exprs.push((Arc::clone(&child_expr), array_col_name.clone()));

Comment on lines +1533 to +1577

⚠️ Potential issue | 🟠 Major

Avoid expect on return_field to prevent planner panics

The Explode arm’s logic (projection to move non-generator columns first, then Unnest on the last list column) looks correct, but there are two expect calls:

  • Line 1563: expr.return_field(&child_schema).expect("Failed to get field from expression")
  • Line 1573: child_expr.return_field(&child_schema).expect("Failed to get field from array expression")

If return_field fails (e.g., unexpected or partially-supported expression type), this will panic the process instead of returning a structured ExecutionError. Given these expressions originate from serialized Spark plans, we should treat such cases as user/data errors, not hard crashes.

Consider mapping the DataFusionError into ExecutionError::GeneralError instead, e.g.:

-                let field = expr
-                    .return_field(&child_schema)
-                    .expect("Failed to get field from expression");
+                let field = expr
+                    .return_field(&child_schema)
+                    .map_err(|e| GeneralError(e.to_string()))?;

and similarly for the array expression.

This keeps Explode planning consistent with the rest of the planner, which generally avoids panicking on invalid remote input.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
native/core/src/execution/planner.rs around lines 1533 to 1577: replace the two
expect(...) calls on expr.return_field(&child_schema) and
child_expr.return_field(&child_schema) with proper error propagation by mapping
the DataFusion error into an ExecutionError::GeneralError (include a short
contextual message and the original error). Do this for both the projection
expressions loop and the array column lookup so that failures from return_field
return Err(ExecutionError::GeneralError(...)) instead of panicking; keep the
existing variable names (child_schema, array_col_name) and ensure the function
returns the mapped error using the same Result type the planner uses.
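
For illustration only (not part of this PR's diff), a minimal sketch of the suggested error mapping as a small helper. The helper name and the stand-in error enum are hypothetical; return_field and ExecutionError::GeneralError are taken from the surrounding planner code:

use std::sync::Arc;

use arrow::datatypes::{FieldRef, Schema};
use datafusion::error::DataFusionError;
use datafusion::physical_expr::PhysicalExpr;

// Stand-in for Comet's ExecutionError so this sketch is self-contained.
#[derive(Debug)]
enum ExecutionError {
    GeneralError(String),
}

// Resolve an expression's output field, turning planner-time failures into a
// structured error instead of panicking via `expect`.
fn field_or_err(
    expr: &Arc<dyn PhysicalExpr>,
    schema: &Schema,
    context: &str,
) -> Result<FieldRef, ExecutionError> {
    expr.return_field(schema)
        .map_err(|e: DataFusionError| ExecutionError::GeneralError(format!("{context}: {e}")))
}

Both call sites in the Explode arm could then follow this pattern, e.g. field_or_err(expr, &child_schema, "explode projection")? and field_or_err(&child_expr, &child_schema, "explode array input")?.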

// Create a projection to arrange columns as needed
let project_exec = Arc::new(ProjectionExec::try_new(
project_exprs,
Arc::clone(&child.native_plan),
)?);

// Get the input schema from the projection
let project_schema = project_exec.schema();

// Build the output schema for UnnestExec
// The output schema replaces the list column with its element type
let mut output_fields: Vec<Field> = Vec::new();

// Add all projection columns (non-array columns)
for i in 0..projections.len() {
output_fields.push(project_schema.field(i).clone());
}

// Add the unnested array element field
// Extract the element type from the list/array type
let array_field = project_schema.field(projections.len());
let element_type = match array_field.data_type() {
DataType::List(field) => field.data_type().clone(),
dt => {
return Err(ExecutionError::GeneralError(format!(
"Expected List type for explode, got {:?}",
dt
)))
}
};

// The output column has the same name as the input array column
// but with the element type instead of the list type
output_fields.push(Field::new(
The exploded element column is marked nullable unconditionally; to keep the native schema consistent with the input and with Spark/DataFusion expectations, consider deriving nullability from the list's element field (only forcing nullable when outer semantics can introduce nulls). The current mismatch could cause problems for downstream consumers that rely on nullability metadata (a sketch of this derivation appears after this file's diff).


array_field.name(),
element_type,
true, // Element is nullable after unnesting
));

let output_schema = Arc::new(Schema::new(output_fields));

// Use UnnestExec to explode the last column (the array column)
// ListUnnest specifies which column to unnest and the depth (1 for single level)
let list_unnest = ListUnnest {
index_in_input_schema: projections.len(), // Index of the array column to unnest
depth: 1, // Unnest one level (explode single array)
};

let unnest_options = UnnestOptions {
preserve_nulls: explode.outer,
recursions: vec![],
};

let unnest_exec = Arc::new(UnnestExec::new(
project_exec,
vec![list_unnest],
vec![], // No struct columns to unnest
output_schema,
unnest_options,
));

Ok((
scans,
Arc::new(SparkPlan::new(spark_plan.plan_id, unnest_exec, vec![child])),
))
}
OpStruct::SortMergeJoin(join) => {
let (join_params, scans) = self.parse_join_parameters(
inputs,
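As a side note on the nullability comment in the planner.rs review above (the exploded element column is marked nullable unconditionally): below is a hedged sketch of deriving the output field from the list's element field instead. The helper is illustrative and not part of this PR; it handles only the DataType::List case that the PR currently supports, and outer corresponds to the proto's explode.outer flag (the same value fed into the UnnestOptions preserve_nulls field above):

use arrow::datatypes::{DataType, Field};

// Build the output field for the exploded column, keeping the element field's
// own nullability unless outer semantics (explode_outer) can introduce nulls.
fn exploded_output_field(array_field: &Field, outer: bool) -> Result<Field, String> {
    match array_field.data_type() {
        DataType::List(element) => Ok(Field::new(
            array_field.name(),
            element.data_type().clone(),
            // Nullable if the elements themselves may be null, or if outer
            // semantics can emit nulls for null/empty inputs.
            element.is_nullable() || outer,
        )),
        other => Err(format!("Expected List type for explode, got {other:?}")),
    }
}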
10 changes: 10 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -50,6 +50,7 @@ message Operator {
NativeScan native_scan = 111;
IcebergScan iceberg_scan = 112;
ParquetWriter parquet_writer = 113;
Explode explode = 114;
}
}

@@ -253,6 +254,15 @@ message Expand {
int32 num_expr_per_project = 3;
}

message Explode {
The comment says explode_outer produces a null row for empty arrays, but the current implementation (via DataFusion Unnest) doesn’t preserve empty arrays as null rows. Consider clarifying the doc to avoid implying behavior that isn’t implemented (also noted in the Scala support check).


// The array expression to explode into multiple rows
spark.spark_expression.Expr child = 1;
// Whether this is explode_outer (produces null row for empty/null arrays)
bool outer = 2;
// Expressions for other columns to project alongside the exploded values
repeated spark.spark_expression.Expr project_list = 3;
}

message HashJoin {
repeated spark.spark_expression.Expr left_join_keys = 1;
repeated spark.spark_expression.Expr right_join_keys = 2;
@@ -62,6 +62,7 @@ object CometExecRule {
classOf[LocalLimitExec] -> CometLocalLimitExec,
classOf[GlobalLimitExec] -> CometGlobalLimitExec,
classOf[ExpandExec] -> CometExpandExec,
classOf[GenerateExec] -> CometExplodeExec,
classOf[HashAggregateExec] -> CometHashAggregateExec,
classOf[ObjectHashAggregateExec] -> CometObjectHashAggregateExec,
classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoinExec,
125 changes: 122 additions & 3 deletions spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -20,6 +20,7 @@
package org.apache.spark.sql.comet

import java.io.ByteArrayOutputStream
import java.util.Locale

Comment on lines 22 to 24

⚠️ Potential issue | 🟠 Major

Fix support-level classification for explode_outer on MapType

CometExplodeExec.getSupportLevel currently does:

if (!op.generator.deterministic) return Unsupported(...)
if (op.generator.children.length != 1) return Unsupported(...)
if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") return Unsupported(...)

if (op.outer) {
  return Incompatible(Some("Empty arrays are not preserved as null outputs when outer=true"))
}

op.generator.children.head.dataType match {
  case _: ArrayType => Compatible()
  case _: MapType =>
    Unsupported(Some("Comet only supports explode/explode_outer for arrays, not maps"))
  case other => Unsupported(Some(s"Unsupported data type: $other"))
}

Because the op.outer check happens before the type match, explode_outer on a map column is always classified as Incompatible, never Unsupported. If a user turns on the per-operator “allow incompat” config for GenerateExec, this can allow a Map-based explode to proceed into native planning, where the Rust planner will reject the non-List type with a runtime ExecutionError instead of falling back cleanly.

To keep map inputs consistently treated as unsupported (regardless of outer) and preserve the current array/outer incompat note, consider restructuring as:

override def getSupportLevel(op: GenerateExec): SupportLevel = {
  if (!op.generator.deterministic) {
    return Unsupported(Some("Only deterministic generators are supported"))
  }
  if (op.generator.children.length != 1) {
    return Unsupported(Some("generators with multiple inputs are not supported"))
  }
  if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") {
    return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}"))
  }

  op.generator.children.head.dataType match {
    case _: ArrayType =>
      if (op.outer) {
        Incompatible(
          Some("Empty arrays are not preserved as null outputs when outer=true"))
      } else {
        Compatible()
      }
    case _: MapType =>
      Unsupported(Some("Comet only supports explode/explode_outer for arrays, not maps"))
    case other =>
      Unsupported(Some(s"Unsupported data type: $other"))
  }
}

This ensures:

  • All map-based explode/explode_outer calls are always Unsupported, unaffected by the “allow incompat” flag.
  • Array-based explode_outer remains Incompatible with the documented empty-array semantics difference.

Also applies to: 33-34, 47-48, 886-915

🤖 Prompt for AI Agents
In spark/src/main/scala/org/apache/spark/sql/comet/operators.scala around lines
22-24 (and also apply same change at 33-34, 47-48, and 886-915), the current
getSupportLevel logic checks op.outer before matching on the generator child
data type, which causes explode_outer on MapType to be classified as
Incompatible rather than Unsupported; change the control flow to first validate
generator determinism/arity/name, then match on
op.generator.children.head.dataType: for ArrayType return Compatible or
Incompatible(Some(...)) when outer=true, for MapType always return
Unsupported(Some("Comet only supports explode/explode_outer for arrays, not
maps")), and for other types return Unsupported(Some(s"Unsupported data type:
$other")), so maps are consistently Unsupported regardless of the outer flag.

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -29,7 +30,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, Generator, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateMode, Final, Partial}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans._
@@ -43,7 +44,7 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, TimestampNTZType}
import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, TimestampNTZType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -53,7 +54,7 @@ import com.google.common.base.Objects
import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleEnabled, withInfo}
import org.apache.comet.parquet.CometParquetUtils
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, SupportLevel, Unsupported}
import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, Operator}
import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto, supportedSortType}
import org.apache.comet.serde.operator.CometSink
@@ -882,6 +883,124 @@ case class CometExpandExec(
override lazy val metrics: Map[String, SQLMetric] = Map.empty
}

object CometExplodeExec extends CometOperatorSerde[GenerateExec] {

override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
CometConf.COMET_EXEC_EXPLODE_ENABLED)

override def getSupportLevel(op: GenerateExec): SupportLevel = {
if (!op.generator.deterministic) {
return Unsupported(Some("Only deterministic generators are supported"))
}
if (op.generator.children.length != 1) {
return Unsupported(Some("generators with multiple inputs are not supported"))
}
if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") {
return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}"))
}
if (op.outer) {
// DataFusion UnnestExec has different semantics to Spark for this case
// https://github.com/apache/datafusion/issues/19053
return Incompatible(Some("Empty arrays are not preserved as null outputs when outer=true"))
}
op.generator.children.head.dataType match {
case _: ArrayType =>
Compatible()
case _: MapType =>
// TODO add support for map types
// https://github.com/apache/datafusion-comet/issues/2837
Unsupported(Some("Comet only supports explode/explode_outer for arrays, not maps"))
case other =>
Unsupported(Some(s"Unsupported data type: $other"))
}
}

override def convert(
op: GenerateExec,
builder: Operator.Builder,
childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
val childExpr = op.generator.children.head
val childExprProto = exprToProto(childExpr, op.child.output)

if (childExprProto.isEmpty) {
withInfo(op, childExpr)
return None
}

// Convert the projection expressions (columns to carry forward)
// These are the non-generator output columns
val generatorOutput = op.generatorOutput.toSet
val projectExprs = op.output.filterNot(generatorOutput.contains).map { attr =>
exprToProto(attr, op.child.output)
}

if (projectExprs.exists(_.isEmpty) || childOp.isEmpty) {
withInfo(op, op.output: _*)
return None
}

val explodeBuilder = OperatorOuterClass.Explode
.newBuilder()
.setChild(childExprProto.get)
.setOuter(op.outer)
.addAllProjectList(projectExprs.map(_.get).asJava)

Some(builder.setExplode(explodeBuilder).build())
}

override def createExec(nativeOp: Operator, op: GenerateExec): CometNativeExec = {
CometExplodeExec(
nativeOp,
op,
op.output,
op.generator,
op.generatorOutput,
op.child,
SerializedPlan(None))
}
}

case class CometExplodeExec(
override val nativeOp: Operator,
override val originalPlan: SparkPlan,
override val output: Seq[Attribute],
generator: Generator,
generatorOutput: Seq[Attribute],
child: SparkPlan,
override val serializedPlanOpt: SerializedPlan)
extends CometUnaryExec {
override def outputPartitioning: Partitioning = child.outputPartitioning

override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)

override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
this.copy(child = newChild)

override def stringArgs: Iterator[Any] = Iterator(generator, generatorOutput, output, child)

override def equals(obj: Any): Boolean = {
obj match {
case other: CometExplodeExec =>
this.output == other.output &&
this.generator == other.generator &&
this.generatorOutput == other.generatorOutput &&
this.child == other.child &&
this.serializedPlanOpt == other.serializedPlanOpt
case _ =>
false
}
}

override def hashCode(): Int = Objects.hashCode(output, generator, generatorOutput, child)

override lazy val metrics: Map[String, SQLMetric] =
CometMetricNode.baselineMetrics(sparkContext) ++
Map(
"input_batches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
"input_rows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"output_batches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"))
}

object CometUnionExec extends CometSink[UnionExec] {

override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
4 changes: 4 additions & 0 deletions spark/src/test/resources/tpcds-micro-benchmarks/explode.sql
@@ -0,0 +1,4 @@
SELECT i_item_sk, explode(array(i_brand_id, i_class_id, i_category_id, i_manufact_id, i_manager_id))
FROM item
ORDER BY i_item_sk
LIMIT 1000