diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 89e38e5d7b..f490202537 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -191,13 +191,13 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { // For AQE broadcast stage on a Comet broadcast exchange case s @ BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) => - convertToCometIfAllChildrenAreNative(s, CometExchangeSink).getOrElse(s) + convertToComet(s, CometExchangeSink).getOrElse(s) case s @ BroadcastQueryStageExec( _, ReusedExchangeExec(_, _: CometBroadcastExchangeExec), _) => - convertToCometIfAllChildrenAreNative(s, CometExchangeSink).getOrElse(s) + convertToComet(s, CometExchangeSink).getOrElse(s) // `CometBroadcastExchangeExec`'s broadcast output is not compatible with Spark's broadcast // exchange. It is only used for Comet native execution. We only transform Spark broadcast @@ -205,8 +205,8 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { // broadcast exchange is forced to be enabled by Comet config. case plan if plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) => val newChildren = plan.children.map { - case b: BroadcastExchangeExec => - convertToCometIfAllChildrenAreNative(b, CometBroadcastExchangeExec).getOrElse(b) + case b: BroadcastExchangeExec if b.children.forall(_.isInstanceOf[CometNativeExec]) => + convertToComet(b, CometBroadcastExchangeExec).getOrElse(b) case other => other } if (!newChildren.exists(_.isInstanceOf[BroadcastExchangeExec])) { @@ -227,42 +227,49 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { // For AQE shuffle stage on a Comet shuffle exchange case s @ ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) => - convertToCometIfAllChildrenAreNative(s, CometExchangeSink).getOrElse(s) + convertToComet(s, CometExchangeSink).getOrElse(s) // For AQE shuffle stage on a reused Comet shuffle exchange // Note that we don't need to handle `ReusedExchangeExec` for non-AQE case, because // the query plan won't be re-optimized/planned in non-AQE mode. case s @ ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) => - convertToCometIfAllChildrenAreNative(s, CometExchangeSink).getOrElse(s) + convertToComet(s, CometExchangeSink).getOrElse(s) case s: ShuffleExchangeExec => convertToComet(s, CometShuffleExchangeExec).getOrElse(s) case op => - val handler = allExecs - .get(op.getClass) - .map(_.asInstanceOf[CometOperatorSerde[SparkPlan]]) - handler match { - case Some(handler) => - return convertToCometIfAllChildrenAreNative(op, handler).getOrElse(op) - case _ => + // if all children are native (or if this is a leaf node) then see if there is a + // registered handler for creating a fully native plan + if (op.children.forall(_.isInstanceOf[CometNativeExec])) { + val handler = allExecs + .get(op.getClass) + .map(_.asInstanceOf[CometOperatorSerde[SparkPlan]]) + handler match { + case Some(handler) => + return convertToComet(op, handler).getOrElse(op) + case _ => + } } op match { case _: CometPlan | _: AQEShuffleReadExec | _: BroadcastExchangeExec | - _: BroadcastQueryStageExec | _: AdaptiveSparkPlanExec => + _: BroadcastQueryStageExec | _: AdaptiveSparkPlanExec | _: ExecutedCommandExec | + _: V2CommandExec => // Some execs should never be replaced. We include // these cases specially here so we do not add a misleading 'info' message op - case _: ExecutedCommandExec | _: V2CommandExec => - // Some execs that comet will not accelerate, such as command execs. - op case _ => - if (!hasExplainInfo(op)) { - // An operator that is not supported by Comet + // The operator was not converted to a Comet plan. Possible reasons for this happening: + // 1. Comet does not support this operator. + // 2. The operator could not be supported based on query context and current + // configs. In this case, it should have already been tagged with fallback + // reasons. + // 3. The operator has children that could not be converted, so execution + // has already fallen back to Spark. + if (op.children.forall(_.isInstanceOf[CometNativeExec]) && !hasExplainInfo(op)) { withInfo(op, s"${op.nodeName} is not supported") } else { - // Already has fallback reason, do not override it op } } @@ -449,26 +456,12 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { } } - /** - * Convert a Spark plan to a Comet plan using the specified serde handler, but only if all - * children are native. - */ - private def convertToCometIfAllChildrenAreNative( - op: SparkPlan, - handler: CometOperatorSerde[_]): Option[SparkPlan] = { - if (op.children.forall(_.isInstanceOf[CometNativeExec])) { - convertToComet(op, handler) - } else { - None - } - } - /** Convert a Spark plan to a Comet plan using the specified serde handler */ private def convertToComet(op: SparkPlan, handler: CometOperatorSerde[_]): Option[SparkPlan] = { val serde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]] if (isOperatorEnabled(serde, op)) { val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id) - if (op.children.forall(_.isInstanceOf[CometNativeExec])) { + if (op.children.nonEmpty && op.children.forall(_.isInstanceOf[CometNativeExec])) { val childOp = op.children.map(_.asInstanceOf[CometNativeExec].nativeOp) childOp.foreach(builder.addChildren) return serde diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q28.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q28.native_iceberg_compat/extended.txt index d2c0985d1f..809265b4d5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q28.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q28.native_iceberg_compat/extended.txt @@ -1,7 +1,7 @@ - BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] +BroadcastNestedLoopJoin +:- BroadcastNestedLoopJoin +: :- BroadcastNestedLoopJoin +: : :- BroadcastNestedLoopJoin : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] : : : : :- CometColumnarToRow : : : : : +- CometHashAggregate diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q28/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q28/extended.txt index d2c0985d1f..809265b4d5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q28/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q28/extended.txt @@ -1,7 +1,7 @@ - BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] +BroadcastNestedLoopJoin +:- BroadcastNestedLoopJoin +: :- BroadcastNestedLoopJoin +: : :- BroadcastNestedLoopJoin : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] : : : : :- CometColumnarToRow : : : : : +- CometHashAggregate diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q88.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q88.native_iceberg_compat/extended.txt index caaa611f9e..1e1247665c 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q88.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q88.native_iceberg_compat/extended.txt @@ -1,9 +1,9 @@ - BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] +BroadcastNestedLoopJoin +:- BroadcastNestedLoopJoin +: :- BroadcastNestedLoopJoin +: : :- BroadcastNestedLoopJoin +: : : :- BroadcastNestedLoopJoin +: : : : :- BroadcastNestedLoopJoin : : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] : : : : : : :- CometColumnarToRow : : : : : : : +- CometHashAggregate diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q88/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q88/extended.txt index caaa611f9e..1e1247665c 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q88/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q88/extended.txt @@ -1,9 +1,9 @@ - BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] +BroadcastNestedLoopJoin +:- BroadcastNestedLoopJoin +: :- BroadcastNestedLoopJoin +: : :- BroadcastNestedLoopJoin +: : : :- BroadcastNestedLoopJoin +: : : : :- BroadcastNestedLoopJoin : : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] : : : : : : :- CometColumnarToRow : : : : : : : +- CometHashAggregate diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q28.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q28.native_iceberg_compat/extended.txt index d2c0985d1f..809265b4d5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q28.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q28.native_iceberg_compat/extended.txt @@ -1,7 +1,7 @@ - BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] +BroadcastNestedLoopJoin +:- BroadcastNestedLoopJoin +: :- BroadcastNestedLoopJoin +: : :- BroadcastNestedLoopJoin : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] : : : : :- CometColumnarToRow : : : : : +- CometHashAggregate diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q28/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q28/extended.txt index d2c0985d1f..809265b4d5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q28/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q28/extended.txt @@ -1,7 +1,7 @@ - BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] +BroadcastNestedLoopJoin +:- BroadcastNestedLoopJoin +: :- BroadcastNestedLoopJoin +: : :- BroadcastNestedLoopJoin : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] : : : : :- CometColumnarToRow : : : : : +- CometHashAggregate diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q88.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q88.native_iceberg_compat/extended.txt index caaa611f9e..1e1247665c 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q88.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q88.native_iceberg_compat/extended.txt @@ -1,9 +1,9 @@ - BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] +BroadcastNestedLoopJoin +:- BroadcastNestedLoopJoin +: :- BroadcastNestedLoopJoin +: : :- BroadcastNestedLoopJoin +: : : :- BroadcastNestedLoopJoin +: : : : :- BroadcastNestedLoopJoin : : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] : : : : : : :- CometColumnarToRow : : : : : : : +- CometHashAggregate diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q88/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q88/extended.txt index caaa611f9e..1e1247665c 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q88/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q88/extended.txt @@ -1,9 +1,9 @@ - BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] +BroadcastNestedLoopJoin +:- BroadcastNestedLoopJoin +: :- BroadcastNestedLoopJoin +: : :- BroadcastNestedLoopJoin +: : : :- BroadcastNestedLoopJoin +: : : : :- BroadcastNestedLoopJoin : : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] : : : : : : :- CometColumnarToRow : : : : : : : +- CometHashAggregate diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q28.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q28.native_iceberg_compat/extended.txt index d2c0985d1f..809265b4d5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q28.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q28.native_iceberg_compat/extended.txt @@ -1,7 +1,7 @@ - BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] +BroadcastNestedLoopJoin +:- BroadcastNestedLoopJoin +: :- BroadcastNestedLoopJoin +: : :- BroadcastNestedLoopJoin : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] : : : : :- CometColumnarToRow : : : : : +- CometHashAggregate diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q28/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q28/extended.txt index d2c0985d1f..809265b4d5 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q28/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q28/extended.txt @@ -1,7 +1,7 @@ - BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] +BroadcastNestedLoopJoin +:- BroadcastNestedLoopJoin +: :- BroadcastNestedLoopJoin +: : :- BroadcastNestedLoopJoin : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] : : : : :- CometColumnarToRow : : : : : +- CometHashAggregate diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q88.native_iceberg_compat/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q88.native_iceberg_compat/extended.txt index caaa611f9e..1e1247665c 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q88.native_iceberg_compat/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q88.native_iceberg_compat/extended.txt @@ -1,9 +1,9 @@ - BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] +BroadcastNestedLoopJoin +:- BroadcastNestedLoopJoin +: :- BroadcastNestedLoopJoin +: : :- BroadcastNestedLoopJoin +: : : :- BroadcastNestedLoopJoin +: : : : :- BroadcastNestedLoopJoin : : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] : : : : : : :- CometColumnarToRow : : : : : : : +- CometHashAggregate diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q88/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q88/extended.txt index caaa611f9e..1e1247665c 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q88/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q88/extended.txt @@ -1,9 +1,9 @@ - BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] -: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] +BroadcastNestedLoopJoin +:- BroadcastNestedLoopJoin +: :- BroadcastNestedLoopJoin +: : :- BroadcastNestedLoopJoin +: : : :- BroadcastNestedLoopJoin +: : : : :- BroadcastNestedLoopJoin : : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported] : : : : : : :- CometColumnarToRow : : : : : : : +- CometHashAggregate