Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 28 additions & 35 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -191,22 +191,22 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {

// For AQE broadcast stage on a Comet broadcast exchange
case s @ BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) =>
convertToCometIfAllChildrenAreNative(s, CometExchangeSink).getOrElse(s)
convertToComet(s, CometExchangeSink).getOrElse(s)

case s @ BroadcastQueryStageExec(
_,
ReusedExchangeExec(_, _: CometBroadcastExchangeExec),
_) =>
convertToCometIfAllChildrenAreNative(s, CometExchangeSink).getOrElse(s)
convertToComet(s, CometExchangeSink).getOrElse(s)

// `CometBroadcastExchangeExec`'s broadcast output is not compatible with Spark's broadcast
// exchange. It is only used for Comet native execution. We only transform a Spark broadcast
// exchange into a Comet broadcast exchange if its downstream is a Comet native plan or if the
// broadcast exchange is forced to be enabled by Comet config.
case plan if plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) =>
val newChildren = plan.children.map {
case b: BroadcastExchangeExec =>
convertToCometIfAllChildrenAreNative(b, CometBroadcastExchangeExec).getOrElse(b)
case b: BroadcastExchangeExec if b.children.forall(_.isInstanceOf[CometNativeExec]) =>
convertToComet(b, CometBroadcastExchangeExec).getOrElse(b)
case other => other
}
if (!newChildren.exists(_.isInstanceOf[BroadcastExchangeExec])) {
Expand All @@ -227,42 +227,49 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {

// For AQE shuffle stage on a Comet shuffle exchange
case s @ ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) =>
convertToCometIfAllChildrenAreNative(s, CometExchangeSink).getOrElse(s)
convertToComet(s, CometExchangeSink).getOrElse(s)

// For AQE shuffle stage on a reused Comet shuffle exchange
// Note that we don't need to handle `ReusedExchangeExec` for the non-AQE case, because
// the query plan won't be re-optimized/re-planned in non-AQE mode.
case s @ ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) =>
convertToCometIfAllChildrenAreNative(s, CometExchangeSink).getOrElse(s)
convertToComet(s, CometExchangeSink).getOrElse(s)

case s: ShuffleExchangeExec =>
convertToComet(s, CometShuffleExchangeExec).getOrElse(s)

case op =>
val handler = allExecs
.get(op.getClass)
.map(_.asInstanceOf[CometOperatorSerde[SparkPlan]])
handler match {
case Some(handler) =>
return convertToCometIfAllChildrenAreNative(op, handler).getOrElse(op)
case _ =>
// if all children are native (or if this is a leaf node) then see if there is a
// registered handler for creating a fully native plan
if (op.children.forall(_.isInstanceOf[CometNativeExec])) {
val handler = allExecs
.get(op.getClass)
.map(_.asInstanceOf[CometOperatorSerde[SparkPlan]])
handler match {
case Some(handler) =>
return convertToComet(op, handler).getOrElse(op)
case _ =>
}
}

op match {
case _: CometPlan | _: AQEShuffleReadExec | _: BroadcastExchangeExec |
_: BroadcastQueryStageExec | _: AdaptiveSparkPlanExec =>
_: BroadcastQueryStageExec | _: AdaptiveSparkPlanExec | _: ExecutedCommandExec |
_: V2CommandExec =>
// Some execs should never be replaced. We include
// these cases specially here so we do not add a misleading 'info' message
op
case _: ExecutedCommandExec | _: V2CommandExec =>
// Some execs that comet will not accelerate, such as command execs.
op
case _ =>
if (!hasExplainInfo(op)) {
// An operator that is not supported by Comet
// The operator was not converted to a Comet plan. Possible reasons for this happening:
// 1. Comet does not support this operator.
// 2. The operator could not be supported based on query context and current
// configs. In this case, it should have already been tagged with fallback
// reasons.
// 3. The operator has children that could not be converted, so execution
// has already fallen back to Spark.
if (op.children.forall(_.isInstanceOf[CometNativeExec]) && !hasExplainInfo(op)) {
withInfo(op, s"${op.nodeName} is not supported")
} else {
// Already has fallback reason, do not override it
op
}
}
Expand Down Expand Up @@ -449,26 +456,12 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
}
}

/**
 * Attempt a Comet conversion of `op` through the given serde handler, guarded by the state of
 * its children: the handler is only consulted when every child is a `CometNativeExec` (which
 * is trivially the case for a leaf node with no children).
 *
 * @param op
 *   the Spark plan node to convert
 * @param handler
 *   the serde handler passed through to `convertToComet`
 * @return
 *   whatever `convertToComet` returns when all children are native, otherwise `None`
 */
private def convertToCometIfAllChildrenAreNative(
    op: SparkPlan,
    handler: CometOperatorSerde[_]): Option[SparkPlan] = {
  // A single non-native child is enough to rule out conversion of this node.
  val hasNonNativeChild = op.children.exists(child => !child.isInstanceOf[CometNativeExec])
  if (hasNonNativeChild) None else convertToComet(op, handler)
}

/** Convert a Spark plan to a Comet plan using the specified serde handler */
private def convertToComet(op: SparkPlan, handler: CometOperatorSerde[_]): Option[SparkPlan] = {
val serde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
if (isOperatorEnabled(serde, op)) {
val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id)
if (op.children.forall(_.isInstanceOf[CometNativeExec])) {
if (op.children.nonEmpty && op.children.forall(_.isInstanceOf[CometNativeExec])) {
val childOp = op.children.map(_.asInstanceOf[CometNativeExec].nativeOp)
childOp.foreach(builder.addChildren)
return serde
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- CometColumnarToRow
: : : : : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- CometColumnarToRow
: : : : : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin
: : : : :- BroadcastNestedLoopJoin
: : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : : : :- CometColumnarToRow
: : : : : : : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin
: : : : :- BroadcastNestedLoopJoin
: : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : : : :- CometColumnarToRow
: : : : : : : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- CometColumnarToRow
: : : : : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- CometColumnarToRow
: : : : : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin
: : : : :- BroadcastNestedLoopJoin
: : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : : : :- CometColumnarToRow
: : : : : : : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin
: : : : :- BroadcastNestedLoopJoin
: : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : : : :- CometColumnarToRow
: : : : : : : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- CometColumnarToRow
: : : : : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- CometColumnarToRow
: : : : : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin
: : : : :- BroadcastNestedLoopJoin
: : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : : : :- CometColumnarToRow
: : : : : : : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin
: : : : :- BroadcastNestedLoopJoin
: : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : : : :- CometColumnarToRow
: : : : : : : +- CometHashAggregate
Expand Down