From f5a8f89f1b1d0d179fc9f97322a0e72fdf40a0c6 Mon Sep 17 00:00:00 2001
From: wecharyu
Date: Mon, 2 Mar 2026 19:02:51 +0800
Subject: [PATCH 1/3] [GLUTEN-11678][VL] Native validation should check
 CrossRelNode's expression

---
 .../gluten/execution/FallbackSuite.scala      | 37 ++++++++++++++++++-
 .../SubstraitToVeloxPlanValidator.cc          |  3 ++
 2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index de2fc07b3fca..910c806a2aeb 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarShuffleExchangeExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuffleReadExec}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, SortMergeJoinExec}
 import org.apache.spark.utils.GlutenSuiteUtils
 
 import scala.collection.mutable.ArrayBuffer
@@ -359,4 +359,39 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
       }
     }
   }
+
+  test("fallback when nested loop join has unsupported expression") {
+    val events = new ArrayBuffer[GlutenPlanFallbackEvent]
+    val listener = new SparkListener {
+      override def onOtherEvent(event: SparkListenerEvent): Unit = {
+        event match {
+          case e: GlutenPlanFallbackEvent => events.append(e)
+          case _ =>
+        }
+      }
+    }
+    spark.sparkContext.addSparkListener(listener)
+
+    try {
+      val df = spark.sql("""
+        |select tmp1.c1, tmp1.c2 from tmp1
+        |left join tmp2 on (
+        |  tmp1.c1 = regexp_extract(tmp2.c1, '(?<=@)[^.]+(?=\.)', 0)
+        |  or tmp2.c1 > 10
+        |)
+        |""".stripMargin)
+      df.collect()
+      GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
+
+      val nestedLoopJoin = find(df.queryExecution.executedPlan) {
+        _.isInstanceOf[BroadcastNestedLoopJoinExec]
+      }
+      assert(nestedLoopJoin.isDefined)
+      val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
+      assert(fallbackReasons.nonEmpty)
+      assert(fallbackReasons.forall(_.contains("regexp_extract due to Pattern (?<=")))
+    } finally {
+      spark.sparkContext.removeSparkListener(listener)
+    }
+  }
 }
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 38d81320f9d9..85db1bc48c6f 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -1124,6 +1124,9 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::CrossRel& crossR
   auto rowType = std::make_shared<RowType>(std::move(names), std::move(types));
 
   if (crossRel.has_expression()) {
+    if (!validateExpression(crossRel.expression(), rowType)) {
+      return false;
+    }
     auto expression = exprConverter_->toVeloxExpr(crossRel.expression(), rowType);
     exec::ExprSet exprSet({std::move(expression)}, execCtx_.get());
  }
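
Note on patch 1 (commentary, not part of the series): the join condition exercised by the new test uses lookbehind/lookahead assertions, which RE2, the engine behind Velox's regexp functions, does not support; validation therefore has to reject the CrossRel expression up front instead of letting the conversion throw. Below is a minimal, hypothetical repro outside the suite. It assumes a Gluten-enabled Spark session, and the table names and data are illustrative only, not the suite's fixtures:

    import org.apache.spark.sql.SparkSession

    object CrossRelFallbackRepro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("repro").getOrCreate()
        import spark.implicits._
        Seq(("a@b.com", 1)).toDF("c1", "c2").createOrReplaceTempView("tmp2")
        Seq(("b", 2)).toDF("c1", "c2").createOrReplaceTempView("tmp1")
        // RE2 rejects (?<=...) and (?=...), so this join filter cannot be
        // compiled natively; with the validator fix the nested loop join
        // falls back to vanilla Spark instead of failing at runtime.
        spark.sql("""
          |select tmp1.c1, tmp1.c2 from tmp1
          |left join tmp2 on (
          |  tmp1.c1 = regexp_extract(tmp2.c1, '(?<=@)[^.]+(?=\.)', 0)
          |  or tmp2.c1 > 10
          |)
          |""".stripMargin).explain()
        spark.stop()
      }
    }
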
From 8ecc2408db7d6037353cf5347fd4cf78be0cbdcf Mon Sep 17 00:00:00 2001
From: wecharyu
Date: Mon, 2 Mar 2026 23:45:15 +0800
Subject: [PATCH 2/3] fix test

---
 .../test/scala/org/apache/gluten/execution/FallbackSuite.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index 910c806a2aeb..fed1d540986e 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -389,7 +389,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
       assert(nestedLoopJoin.isDefined)
       val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
       assert(fallbackReasons.nonEmpty)
-      assert(fallbackReasons.forall(_.contains("regexp_extract due to Pattern (?<=")))
+      assert(fallbackReasons.forall(_.contains("regexp_extract due to Pattern")))
     } finally {
       spark.sparkContext.removeSparkListener(listener)
     }
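
Note on patch 2 (commentary, not part of the series): the assertion now matches only the stable prefix "regexp_extract due to Pattern", presumably because the pattern text appended to the validator's message can be escaped differently across environments. A toy illustration of the idea; the reason string below is hypothetical, and only the asserted substring comes from the test:

    // Hypothetical fallback reason; real text comes from the Velox validator.
    val reason = "Native validation failed: regexp_extract due to Pattern <escaped-pattern>"
    assert(reason.contains("regexp_extract due to Pattern"))
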
From 63d06a269ec6115a2d7db75ac72e5b88c642460b Mon Sep 17 00:00:00 2001
From: wecharyu
Date: Tue, 3 Mar 2026 10:40:45 +0800
Subject: [PATCH 3/3] fix test

---
 .../gluten/execution/FallbackSuite.scala      | 56 ++++++++++---------
 1 file changed, 29 insertions(+), 27 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index fed1d540986e..ecd6101f65ca 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -361,37 +361,39 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
   }
 
   test("fallback when nested loop join has unsupported expression") {
-    val events = new ArrayBuffer[GlutenPlanFallbackEvent]
-    val listener = new SparkListener {
-      override def onOtherEvent(event: SparkListenerEvent): Unit = {
-        event match {
-          case e: GlutenPlanFallbackEvent => events.append(e)
-          case _ =>
+    withSQLConf(GlutenConfig.RAS_ENABLED.key -> "false") {
+      val events = new ArrayBuffer[GlutenPlanFallbackEvent]
+      val listener = new SparkListener {
+        override def onOtherEvent(event: SparkListenerEvent): Unit = {
+          event match {
+            case e: GlutenPlanFallbackEvent => events.append(e)
+            case _ =>
+          }
         }
       }
-    }
-    spark.sparkContext.addSparkListener(listener)
+      spark.sparkContext.addSparkListener(listener)
 
-    try {
-      val df = spark.sql("""
-        |select tmp1.c1, tmp1.c2 from tmp1
-        |left join tmp2 on (
-        |  tmp1.c1 = regexp_extract(tmp2.c1, '(?<=@)[^.]+(?=\.)', 0)
-        |  or tmp2.c1 > 10
-        |)
-        |""".stripMargin)
-      df.collect()
-      GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
+      try {
+        val df = spark.sql("""
+          |select tmp1.c1, tmp1.c2 from tmp1
+          |left join tmp2 on (
+          |  tmp1.c1 = regexp_extract(tmp2.c1, '(?<=@)[^.]+(?=\.)', 0)
+          |  or tmp2.c1 > 10
+          |)
+          |""".stripMargin)
+        df.collect()
+        GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
 
-      val nestedLoopJoin = find(df.queryExecution.executedPlan) {
-        _.isInstanceOf[BroadcastNestedLoopJoinExec]
+        val nestedLoopJoin = find(df.queryExecution.executedPlan) {
+          _.isInstanceOf[BroadcastNestedLoopJoinExec]
+        }
+        assert(nestedLoopJoin.isDefined)
+        val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
+        assert(fallbackReasons.nonEmpty)
+        assert(fallbackReasons.forall(_.contains("regexp_extract due to Pattern")))
+      } finally {
+        spark.sparkContext.removeSparkListener(listener)
       }
-      assert(nestedLoopJoin.isDefined)
-      val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
-      assert(fallbackReasons.nonEmpty)
-      assert(fallbackReasons.forall(_.contains("regexp_extract due to Pattern")))
-    } finally {
-      spark.sparkContext.removeSparkListener(listener)
-    }
+    }
   }
 }
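
Note on patch 3 (commentary, not part of the series): the final revision pins GlutenConfig.RAS_ENABLED to "false", presumably so the assertions track the default planner's fallback reporting rather than the RAS planner's. The withSQLConf helper scopes the override to the test body and restores the previous value afterwards; below is a simplified standalone sketch of that pattern. The suite actually inherits the helper from Spark's SQLHelper trait, so this signature is an assumption, not the suite's implementation:

    import org.apache.spark.sql.SparkSession

    // Set the given conf keys for the duration of `f`, then restore the old values.
    def withSQLConf[T](spark: SparkSession)(pairs: (String, String)*)(f: => T): T = {
      val conf = spark.conf
      val saved = pairs.map { case (k, _) => k -> conf.getOption(k) }
      pairs.foreach { case (k, v) => conf.set(k, v) }
      try f
      finally saved.foreach {
        case (k, Some(old)) => conf.set(k, old)
        case (k, None) => conf.unset(k)
      }
    }
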