diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 43e1f776bf..0fd28cb58e 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -132,6 +132,7 @@ jobs: org.apache.comet.exec.CometAggregateSuite org.apache.comet.exec.CometExec3_4PlusSuite org.apache.comet.exec.CometExecSuite + org.apache.comet.exec.CometGenerateExecSuite org.apache.comet.exec.CometWindowExecSuite org.apache.comet.exec.CometJoinSuite org.apache.comet.CometNativeSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 88dfd9f92a..e915fa74a6 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -97,6 +97,7 @@ jobs: org.apache.comet.exec.CometAggregateSuite org.apache.comet.exec.CometExec3_4PlusSuite org.apache.comet.exec.CometExecSuite + org.apache.comet.exec.CometGenerateExecSuite org.apache.comet.exec.CometWindowExecSuite org.apache.comet.exec.CometJoinSuite org.apache.comet.CometNativeSuite diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 1e5d19ee23..3560fc07cb 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -270,6 +270,8 @@ object CometConf extends ShimCometConf { createExecEnabledConfig("union", defaultValue = true) val COMET_EXEC_EXPAND_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig("expand", defaultValue = true) + val COMET_EXEC_EXPLODE_ENABLED: ConfigEntry[Boolean] = + createExecEnabledConfig("explode", defaultValue = true) val COMET_EXEC_WINDOW_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig("window", defaultValue = true) val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] = diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index a1c3212c20..5f9cbc507d 100644 --- 
a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -162,6 +162,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.exec.coalesce.enabled` | Whether to enable coalesce by default. | true | | `spark.comet.exec.collectLimit.enabled` | Whether to enable collectLimit by default. | true | | `spark.comet.exec.expand.enabled` | Whether to enable expand by default. | true | +| `spark.comet.exec.explode.enabled` | Whether to enable explode by default. | true | | `spark.comet.exec.filter.enabled` | Whether to enable filter by default. | true | | `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true | | `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true | diff --git a/docs/source/user-guide/latest/operators.md b/docs/source/user-guide/latest/operators.md index f5f2d9724d..77ba84e4f7 100644 --- a/docs/source/user-guide/latest/operators.md +++ b/docs/source/user-guide/latest/operators.md @@ -30,6 +30,7 @@ not supported by Comet will fall back to regular Spark execution. | ExpandExec | Yes | | | FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. | | FilterExec | Yes | | +| GenerateExec | Yes | Supports `explode` generator only. | | GlobalLimitExec | Yes | | | HashAggregateExec | Yes | | | InsertIntoHadoopFsRelationCommand | No | Experimental support for native Parquet writes. Disabled by default. 
| diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index b0746a6f84..c246d1fef6 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -96,9 +96,11 @@ use arrow::array::{ use arrow::buffer::{BooleanBuffer, NullBuffer, OffsetBuffer}; use arrow::row::{OwnedRow, RowConverter, SortField}; use datafusion::common::utils::SingleRowListArrayBuilder; +use datafusion::common::UnnestOptions; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::limit::GlobalLimitExec; +use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec}; use datafusion_comet_proto::spark_expression::ListLiteral; use datafusion_comet_proto::spark_operator::SparkFilePartition; use datafusion_comet_proto::{ @@ -1528,6 +1530,117 @@ impl PhysicalPlanner { Arc::new(SparkPlan::new(spark_plan.plan_id, expand, vec![child])), )) } + OpStruct::Explode(explode) => { + assert_eq!(children.len(), 1); + let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?; + + // Create the expression for the array to explode + let child_expr = if let Some(child_expr) = &explode.child { + self.create_expr(child_expr, child.schema())? + } else { + return Err(ExecutionError::GeneralError( + "Explode operator requires a child expression".to_string(), + )); + }; + + // Create projection expressions for other columns + let projections: Vec> = explode + .project_list + .iter() + .map(|expr| self.create_expr(expr, child.schema())) + .collect::, _>>()?; + + // For UnnestExec, we need to add a projection to put the columns in the right order: + // 1. First add all projection columns + // 2. 
Then add the array column to be exploded + // Then UnnestExec will unnest the last column + + // Use return_field() to get the proper column names from the expressions + let child_schema = child.schema(); + let mut project_exprs: Vec<(Arc, String)> = projections + .iter() + .map(|expr| { + let field = expr + .return_field(&child_schema) + .expect("Failed to get field from expression"); + let name = field.name().to_string(); + (Arc::clone(expr), name) + }) + .collect(); + + // Add the array column as the last column + let array_field = child_expr + .return_field(&child_schema) + .expect("Failed to get field from array expression"); + let array_col_name = array_field.name().to_string(); + project_exprs.push((Arc::clone(&child_expr), array_col_name.clone())); + + // Create a projection to arrange columns as needed + let project_exec = Arc::new(ProjectionExec::try_new( + project_exprs, + Arc::clone(&child.native_plan), + )?); + + // Get the input schema from the projection + let project_schema = project_exec.schema(); + + // Build the output schema for UnnestExec + // The output schema replaces the list column with its element type + let mut output_fields: Vec = Vec::new(); + + // Add all projection columns (non-array columns) + for i in 0..projections.len() { + output_fields.push(project_schema.field(i).clone()); + } + + // Add the unnested array element field + // Extract the element type from the list/array type + let array_field = project_schema.field(projections.len()); + let element_type = match array_field.data_type() { + DataType::List(field) => field.data_type().clone(), + dt => { + return Err(ExecutionError::GeneralError(format!( + "Expected List type for explode, got {:?}", + dt + ))) + } + }; + + // The output column has the same name as the input array column + // but with the element type instead of the list type + output_fields.push(Field::new( + array_field.name(), + element_type, + true, // Element is nullable after unnesting + )); + + let 
output_schema = Arc::new(Schema::new(output_fields)); + + // Use UnnestExec to explode the last column (the array column) + // ListUnnest specifies which column to unnest and the depth (1 for single level) + let list_unnest = ListUnnest { + index_in_input_schema: projections.len(), // Index of the array column to unnest + depth: 1, // Unnest one level (explode single array) + }; + + let unnest_options = UnnestOptions { + preserve_nulls: explode.outer, + recursions: vec![], + }; + + let unnest_exec = Arc::new(UnnestExec::new( + project_exec, + vec![list_unnest], + vec![], // No struct columns to unnest + output_schema, + unnest_options, + )); + + Ok(( + scans, + Arc::new(SparkPlan::new(spark_plan.plan_id, unnest_exec, vec![child])), + )) + } OpStruct::SortMergeJoin(join) => { let (join_params, scans) = self.parse_join_parameters( inputs, diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index a958327099..6eb6edc6b3 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -50,6 +50,7 @@ message Operator { NativeScan native_scan = 111; IcebergScan iceberg_scan = 112; ParquetWriter parquet_writer = 113; + Explode explode = 114; } } @@ -253,6 +254,15 @@ message Expand { int32 num_expr_per_project = 3; } +message Explode { + // The array expression to explode into multiple rows + spark.spark_expression.Expr child = 1; + // Whether this is explode_outer (produces null row for empty/null arrays) + bool outer = 2; + // Expressions for other columns to project alongside the exploded values + repeated spark.spark_expression.Expr project_list = 3; +} + message HashJoin { repeated spark.spark_expression.Expr left_join_keys = 1; repeated spark.spark_expression.Expr right_join_keys = 2; diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 124188b64d..2a051b1a77 100644 --- 
a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -62,6 +62,7 @@ object CometExecRule {
       classOf[LocalLimitExec] -> CometLocalLimitExec,
       classOf[GlobalLimitExec] -> CometGlobalLimitExec,
       classOf[ExpandExec] -> CometExpandExec,
+      classOf[GenerateExec] -> CometExplodeExec,
       classOf[HashAggregateExec] -> CometHashAggregateExec,
       classOf[ObjectHashAggregateExec] -> CometObjectHashAggregateExec,
       classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoinExec,
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index c955f79d91..33e44ea813 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -20,6 +20,7 @@
 package org.apache.spark.sql.comet
 
 import java.io.ByteArrayOutputStream
+import java.util.Locale
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -29,7 +30,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, Generator, NamedExpression, SortOrder}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateMode, Final, Partial}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans._
@@ -43,7 +44,7 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, TimestampNTZType}
+import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, TimestampNTZType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration
 import org.apache.spark.util.io.ChunkedByteBuffer
@@ -53,7 +54,7 @@ import com.google.common.base.Objects
 import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleEnabled, withInfo}
 import org.apache.comet.parquet.CometParquetUtils
-import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
+import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, SupportLevel, Unsupported}
 import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, Operator}
 import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto, supportedSortType}
 import org.apache.comet.serde.operator.CometSink
@@ -882,6 +883,133 @@ case class CometExpandExec(
   override lazy val metrics: Map[String, SQLMetric] = Map.empty
 }
 
+/**
+ * Serde for Spark's GenerateExec. Only a deterministic, single-input `explode` generator over
+ * an array is converted to the native Explode operator.
+ */
+object CometExplodeExec extends CometOperatorSerde[GenerateExec] {
+
+  override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
+    CometConf.COMET_EXEC_EXPLODE_ENABLED)
+
+  override def getSupportLevel(op: GenerateExec): SupportLevel = {
+    if (!op.generator.deterministic) {
+      return Unsupported(Some("Only deterministic generators are supported"))
+    }
+    if (op.generator.children.length != 1) {
+      return Unsupported(Some("generators with multiple inputs are not supported"))
+    }
+    if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") {
+      return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}"))
+    }
+    // Check the input type before `outer`: the native planner only accepts list inputs, so an
+    // unsupported type must be reported as Unsupported even for explode_outer, rather than as
+    // Incompatible (which can be enabled through the allow-incompatible operator config).
+    op.generator.children.head.dataType match {
+      case _: ArrayType if op.outer =>
+        // DataFusion UnnestExec has different semantics to Spark for this case
+        // https://github.com/apache/datafusion/issues/19053
+        Incompatible(Some("Empty arrays are not preserved as null outputs when outer=true"))
+      case _: ArrayType =>
+        Compatible()
+      case _: MapType =>
+        // TODO add support for map types
+        // https://github.com/apache/datafusion-comet/issues/2837
+        Unsupported(Some("Comet only supports explode/explode_outer for arrays, not maps"))
+      case other =>
+        Unsupported(Some(s"Unsupported data type: $other"))
+    }
+  }
+
+  override def convert(
+      op: GenerateExec,
+      builder: Operator.Builder,
+      childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
+    val childExpr = op.generator.children.head
+    val childExprProto = exprToProto(childExpr, op.child.output)
+
+    if (childExprProto.isEmpty) {
+      withInfo(op, childExpr)
+      return None
+    }
+
+    // Convert the projection expressions (columns to carry forward)
+    // These are the non-generator output columns
+    val generatorOutput = op.generatorOutput.toSet
+    val projectExprs = op.output.filterNot(generatorOutput.contains).map { attr =>
+      exprToProto(attr, op.child.output)
+    }
+
+    if (projectExprs.exists(_.isEmpty) || childOp.isEmpty) {
+      withInfo(op, op.output: _*)
+      return None
+    }
+
+    val explodeBuilder = OperatorOuterClass.Explode
+      .newBuilder()
+      .setChild(childExprProto.get)
+      .setOuter(op.outer)
+      .addAllProjectList(projectExprs.map(_.get).asJava)
+
+    Some(builder.setExplode(explodeBuilder).build())
+  }
+
+  override def createExec(nativeOp: Operator, op: GenerateExec): CometNativeExec = {
+    CometExplodeExec(
+      nativeOp,
+      op,
+      op.output,
+      op.generator,
+      op.generatorOutput,
+      op.child,
+      SerializedPlan(None))
+  }
+}
+
+/**
+ * Comet counterpart of a GenerateExec, as built by `CometExplodeExec.createExec`.
+ */
+case class CometExplodeExec(
+    override val nativeOp: Operator,
+    override val originalPlan: SparkPlan,
+    override val output: Seq[Attribute],
+    generator: Generator,
+    generatorOutput: Seq[Attribute],
+    child: SparkPlan,
+    override val serializedPlanOpt: SerializedPlan)
+    extends CometUnaryExec {
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    this.copy(child = newChild)
+
+  override def stringArgs: Iterator[Any] = Iterator(generator, generatorOutput, output, child)
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: CometExplodeExec =>
+        this.output == other.output &&
+        this.generator == other.generator &&
+        this.generatorOutput == other.generatorOutput &&
+        this.child == other.child &&
+        this.serializedPlanOpt == other.serializedPlanOpt
+      case _ =>
+        false
+    }
+  }
+
+  override def hashCode(): Int = Objects.hashCode(output, generator, generatorOutput, child)
+
+  override lazy val metrics: Map[String, SQLMetric] =
+    CometMetricNode.baselineMetrics(sparkContext) ++
+      Map(
+        "input_batches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
+        "input_rows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+        "output_batches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"))
+}
+
 object CometUnionExec extends CometSink[UnionExec] {
 
   override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
diff --git a/spark/src/test/resources/tpcds-micro-benchmarks/explode.sql b/spark/src/test/resources/tpcds-micro-benchmarks/explode.sql
new file mode 100644
index 0000000000..79eacfda09
--- /dev/null
+++ b/spark/src/test/resources/tpcds-micro-benchmarks/explode.sql
@@ -0,0 +1,4 @@
+SELECT i_item_sk, explode(array(i_brand_id, i_class_id, i_category_id, i_manufact_id, i_manager_id))
+FROM item
+ORDER BY i_item_sk
+LIMIT 1000
\ No newline at end of file
diff
--git a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala
new file mode 100644
index 0000000000..a9ac3deb34
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.exec
+
+import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.execution.GenerateExec
+import org.apache.spark.sql.functions.col
+
+import org.apache.comet.CometConf
+
+/**
+ * Exercises GenerateExec (`explode` / `explode_outer`) through Comet, plus the cases that are
+ * expected to fall back to Spark.
+ */
+class CometGenerateExecSuite extends CometTestBase {
+
+  import testImplicits._
+
+  /** Config pairs that turn on Comet's local table scan and set the explode operator flag. */
+  private def scanAndExplode(explode: Boolean): Seq[(String, String)] = Seq(
+    CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+    CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> explode.toString)
+
+  /** Config pair that sets the allow-incompatible key for GenerateExec to true. */
+  private def allowIncompatGenerate: (String, String) =
+    CometConf.getOperatorAllowIncompatConfigKey(classOf[GenerateExec]) -> "true"
+
+  test("explode with simple array") {
+    withSQLConf(scanAndExplode(true): _*) {
+      val input = Seq((1, Array(1, 2, 3)), (2, Array(4, 5)), (3, Array(6))).toDF("id", "arr")
+      checkSparkAnswerAndOperator(input.selectExpr("id", "explode(arr) as value"))
+    }
+  }
+
+  test("explode with empty array") {
+    withSQLConf(scanAndExplode(true): _*) {
+      val input = Seq((1, Array(1, 2)), (2, Array.empty[Int]), (3, Array(3))).toDF("id", "arr")
+      checkSparkAnswerAndOperator(input.selectExpr("id", "explode(arr) as value"))
+    }
+  }
+
+  test("explode with null array") {
+    withSQLConf(scanAndExplode(true): _*) {
+      val input =
+        Seq((1, Some(Array(1, 2))), (2, None), (3, Some(Array(3)))).toDF("id", "arr")
+      checkSparkAnswerAndOperator(input.selectExpr("id", "explode(arr) as value"))
+    }
+  }
+
+  test("explode_outer with simple array") {
+    withSQLConf(scanAndExplode(true) :+ allowIncompatGenerate: _*) {
+      val input = Seq((1, Array(1, 2, 3)), (2, Array(4, 5)), (3, Array(6))).toDF("id", "arr")
+      checkSparkAnswerAndOperator(input.selectExpr("id", "explode_outer(arr) as value"))
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/2838
+  ignore("explode_outer with empty array") {
+    withSQLConf(scanAndExplode(true): _*) {
+      val input = Seq((1, Array(1, 2)), (2, Array.empty[Int]), (3, Array(3))).toDF("id", "arr")
+      checkSparkAnswerAndOperator(input.selectExpr("id", "explode_outer(arr) as value"))
+    }
+  }
+
+  test("explode_outer with null array") {
+    withSQLConf(scanAndExplode(true) :+ allowIncompatGenerate: _*) {
+      val input =
+        Seq((1, Some(Array(1, 2))), (2, None), (3, Some(Array(3)))).toDF("id", "arr")
+      checkSparkAnswerAndOperator(input.selectExpr("id", "explode_outer(arr) as value"))
+    }
+  }
+
+  test("explode with multiple columns") {
+    withSQLConf(scanAndExplode(true): _*) {
+      val input = Seq((1, "A", Array(1, 2, 3)), (2, "B", Array(4, 5)), (3, "C", Array(6)))
+        .toDF("id", "name", "arr")
+      checkSparkAnswerAndOperator(input.selectExpr("id", "name", "explode(arr) as value"))
+    }
+  }
+
+  test("explode with array of strings") {
+    withSQLConf(scanAndExplode(true): _*) {
+      val input = Seq((1, Array("a", "b", "c")), (2, Array("d", "e")), (3, Array("f")))
+        .toDF("id", "arr")
+      checkSparkAnswerAndOperator(input.selectExpr("id", "explode(arr) as value"))
+    }
+  }
+
+  test("explode with filter") {
+    withSQLConf(scanAndExplode(true): _*) {
+      val input = Seq((1, Array(1, 2, 3)), (2, Array(4, 5, 6)), (3, Array(7, 8, 9)))
+        .toDF("id", "arr")
+      val exploded = input.selectExpr("id", "explode(arr) as value")
+      checkSparkAnswerAndOperator(exploded.filter(col("value") > 5))
+    }
+  }
+
+  test("explode fallback when disabled") {
+    withSQLConf(scanAndExplode(false): _*) {
+      val input = Seq((1, Array(1, 2, 3)), (2, Array(4, 5))).toDF("id", "arr")
+      checkSparkAnswerAndFallbackReason(
+        input.selectExpr("id", "explode(arr) as value"),
+        "Native support for operator GenerateExec is disabled")
+    }
+  }
+
+  test("explode with map input falls back") {
+    withSQLConf(scanAndExplode(true): _*) {
+      val input = Seq((1, Map("a" -> 1, "b" -> 2)), (2, Map("c" -> 3))).toDF("id", "map")
+      checkSparkAnswerAndFallbackReason(
+        input.selectExpr("id", "explode(map) as (key, value)"),
+        "Comet only supports explode/explode_outer for arrays, not maps")
+    }
+  }
+
+  test("explode with nullable projected column") {
+    withSQLConf(scanAndExplode(true): _*) {
+      val input =
+        Seq((1, Some("A"), Array(1, 2)), (2, None, Array(3, 4)), (3, Some("C"), Array(5)))
+          .toDF("id", "name", "arr")
+      checkSparkAnswerAndOperator(input.selectExpr("id", "name", "explode(arr) as value"))
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/2838
+  ignore("explode_outer with nullable projected column") {
+    withSQLConf(scanAndExplode(true): _*) {
+      val input =
+        Seq((1, Some("A"), Array(1, 2)), (2, None, Array.empty[Int]), (3, Some("C"), Array(5)))
+          .toDF("id", "name", "arr")
+      checkSparkAnswerAndOperator(input.selectExpr("id", "name", "explode_outer(arr) as value"))
+    }
+  }
+
+  test("explode with mixed null, empty, and non-empty arrays") {
+    withSQLConf(scanAndExplode(true): _*) {
+      val input = Seq(
+        (1, Some(Array(1, 2))),
+        (2, None),
+        (3, Some(Array.empty[Int])),
+        (4, Some(Array(3))),
+        (5, None),
+        (6, Some(Array(4, 5, 6)))).toDF("id", "arr")
+      checkSparkAnswerAndOperator(input.selectExpr("id", "explode(arr) as value"))
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/2838
+  ignore("explode_outer with mixed null, empty, and non-empty arrays") {
+    withSQLConf(scanAndExplode(true): _*) {
+      val input = Seq(
+        (1, Some(Array(1, 2))),
+        (2, None),
+        (3, Some(Array.empty[Int])),
+        (4, Some(Array(3))),
+        (5, None),
+        (6, Some(Array(4, 5, 6)))).toDF("id", "arr")
+      checkSparkAnswerAndOperator(input.selectExpr("id", "explode_outer(arr) as value"))
+    }
+  }
+
+  test("explode with multiple nullable columns") {
+    withSQLConf(scanAndExplode(true): _*) {
+      val input = Seq(
+        (Some(1), Some("A"), Some(100), Array(1, 2)),
+        (None, Some("B"), None, Array(3)),
+        (Some(3), None, Some(300), Array(4, 5)),
+        (None, None, None, Array(6))).toDF("id", "name", "value", "arr")
+      val exploded = input.selectExpr("id", "name", "value", "explode(arr) as element")
+      checkSparkAnswerAndOperator(exploded)
+    }
+  }
+
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
index a672d0937d..d9c49bc596 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
@@ -76,6 +76,7 @@ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase {
     "join_left_outer",
     "join_semi",
     "rlike",
+    "explode",
     "to_json")
 
   override def runQueries(