From d2c7177c986b5c79429352124d85a16b1d9600fd Mon Sep 17 00:00:00 2001 From: zml1206 Date: Tue, 4 Nov 2025 18:10:13 +0800 Subject: [PATCH 1/3] [VL] Remove support for BNLJ full outer join without condition --- .../backendsapi/velox/VeloxBackend.scala | 2 -- .../gluten/execution/MiscOperatorSuite.scala | 20 ------------------ cpp/velox/substrait/SubstraitToVeloxPlan.cc | 3 --- .../SubstraitToVeloxPlanValidator.cc | 7 ------- .../backendsapi/BackendSettingsApi.scala | 2 -- ...oadcastNestedLoopJoinExecTransformer.scala | 8 ------- .../joins/GlutenBroadcastJoinSuite.scala | 21 ++++++------------- .../sql/gluten/GlutenFallbackSuite.scala | 9 +++++++- .../sql/gluten/GlutenFallbackSuite.scala | 5 ++++- .../sql/gluten/GlutenFallbackSuite.scala | 5 ++++- 10 files changed, 22 insertions(+), 60 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 3da306e1e9f5..e97747b842ee 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -568,8 +568,6 @@ object VeloxBackendSettings extends BackendSettingsApi { override def needPreComputeRangeFrameBoundary(): Boolean = true - override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true - override def supportIcebergEqualityDeleteRead(): Boolean = false override def reorderColumnsForPartitionWrite(): Boolean = true diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala index d8b3929ded14..b6578c0ea534 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala @@ -24,7 +24,6 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuffleReadExec, ShuffleQueryStageExec} -import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -2065,25 +2064,6 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa } } - test("FullOuter in BroadcastNestLoopJoin") { - withTable("t1", "t2") { - spark.range(10).write.format("parquet").saveAsTable("t1") - spark.range(10).write.format("parquet").saveAsTable("t2") - - // with join condition should fallback. - withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "1MB") { - runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.id < t2.id") { - checkSparkOperatorMatch[BroadcastNestedLoopJoinExec] - } - - // without join condition should offload to gluten operator. - runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2") { - checkGlutenOperatorMatch[BroadcastNestedLoopJoinExecTransformer] - } - } - } - } - test("test get_struct_field with scalar function as input") { withSQLConf("spark.sql.json.enablePartialResults" -> "true") { withTable("t") { diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 5b76c22d8c24..fc58302c146b 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -387,9 +387,6 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type())); } break; - case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_OUTER: - joinType = core::JoinType::kFull; - break; default: VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type())); } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc index 4b39af51c623..5531a54710f4 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc @@ -1101,13 +1101,6 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::CrossRel& crossR case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT: case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT_SEMI: break; - case ::substrait::CrossRel_JoinType_JOIN_TYPE_OUTER: - if (crossRel.has_expression()) { - LOG_VALIDATION_MSG("Full outer join type with condition is not supported in CrossRel"); - return false; - } else { - break; - } default: LOG_VALIDATION_MSG("Unsupported Join type in CrossRel"); return false; diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index 7c84218681b2..a1647b2f3014 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -149,8 +149,6 @@ trait BackendSettingsApi { def needPreComputeRangeFrameBoundary(): Boolean = false - def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false - def supportIcebergEqualityDeleteRead(): Boolean = true def reorderColumnsForPartitionWrite(): Boolean = false diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala index 0f8e25e2f0a3..9965d32b33a1 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala @@ -148,14 +148,6 @@ abstract class BroadcastNestedLoopJoinExecTransformer( def validateJoinTypeAndBuildSide(): ValidationResult = { val result = joinType match { case _: InnerLike | LeftOuter | RightOuter => ValidationResult.succeeded - case FullOuter - if BackendsApiManager.getSettings.broadcastNestedLoopJoinSupportsFullOuterJoin() => - if (condition.isEmpty) { - ValidationResult.succeeded - } else { - ValidationResult.failed( - s"FullOuter join with join condition is not supported with BroadcastNestedLoopJoin") - } case ExistenceJoin(_) => ValidationResult.succeeded case _ => diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala index ce380aebfaa3..2622e7d599f8 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala @@ -143,12 +143,9 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon // INNER JOIN && t1Size < t2Size => BuildLeft assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2", blt, BuildLeft) // FULL JOIN && t1Size < t2Size => BuildLeft - assertJoinBuildSide( - "SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2 ON t1.key < t2.key", - bl, - BuildLeft) + assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2", bl, BuildLeft) // FULL OUTER && t1Size < t2Size => BuildLeft - assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.key < t2.key", bl, BuildLeft) + assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft) // LEFT JOIN => BuildRight assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN t2", blt, BuildRight) // RIGHT JOIN => BuildLeft @@ -160,13 +157,10 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon // INNER JOIN && broadcast(t2) => BuildRight assertJoinBuildSide("SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2", blt, BuildRight) // FULL OUTER && broadcast(t1) => BuildLeft - assertJoinBuildSide( - "SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key", - bl, - BuildLeft) + assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft) // FULL OUTER && broadcast(t2) => BuildRight assertJoinBuildSide( - "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key", + "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2", bl, BuildRight) // LEFT JOIN && broadcast(t1) => BuildLeft @@ -200,11 +194,8 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon /* ######## test cases for non-equal join ######### */ withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { // For full outer join, prefer to broadcast the smaller side. - assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key", bl, BuildLeft) - assertJoinBuildSide( - "SELECT * FROM t2 FULL OUTER JOIN t1 on t1.key < t2.key", - bl, - BuildRight) + assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft) + assertJoinBuildSide("SELECT * FROM t2 FULL OUTER JOIN t1", bl, BuildRight) // For inner join, prefer to broadcast the smaller side, if broadcast-able. withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (t2Size + 1).toString()) { diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala index c8a8ac85cdbe..4736e6071d9c 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala @@ -118,7 +118,14 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2") val execution = glutenStore.execution(id) - execution.get.numFallbackNodes == 0 + if (BackendTestUtils.isVeloxBackendLoaded()) { + assert(execution.get.numFallbackNodes == 1) + assert( + execution.get.fallbackNodeToReason.head._2 + .contains("FullOuter join is not supported with BroadcastNestedLoopJoin")) + } else { + assert(execution.get.numFallbackNodes == 0) + } } // [GLUTEN-4119] Skip add ReusedExchange to fallback node diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala index 058a63a67de6..8edcef1c08c8 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala @@ -121,7 +121,10 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2") val execution = glutenStore.execution(id) if (BackendTestUtils.isVeloxBackendLoaded()) { - assert(execution.get.numFallbackNodes == 0) + assert(execution.get.numFallbackNodes == 1) + assert( + execution.get.fallbackNodeToReason.head._2 + .contains("FullOuter join is not supported with BroadcastNestedLoopJoin")) } else { assert(execution.get.numFallbackNodes == 2) } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala index 058a63a67de6..8edcef1c08c8 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala @@ -121,7 +121,10 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2") val execution = glutenStore.execution(id) if (BackendTestUtils.isVeloxBackendLoaded()) { - assert(execution.get.numFallbackNodes == 0) + assert(execution.get.numFallbackNodes == 1) + assert( + execution.get.fallbackNodeToReason.head._2 + .contains("FullOuter join is not supported with BroadcastNestedLoopJoin")) } else { assert(execution.get.numFallbackNodes == 2) } From bdae0a6dd5dd9d52073d4491e4eca57fb3c5668b Mon Sep 17 00:00:00 2001 From: zml1206 Date: Tue, 4 Nov 2025 18:25:49 +0800 Subject: [PATCH 2/3] fix --- .../execution/BroadcastNestedLoopJoinExecTransformer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala index 9965d32b33a1..282cf8581e18 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala @@ -24,7 +24,7 @@ import org.apache.gluten.utils.SubstraitUtil import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} -import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftOuter, RightOuter} +import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, InnerLike, JoinType, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan} import org.apache.spark.sql.execution.joins.BaseJoinExec From ddc7e4d4886f4b36292d6e82df6fbbebe3305ae2 Mon Sep 17 00:00:00 2001 From: zml1206 Date: Tue, 4 Nov 2025 19:33:58 +0800 Subject: [PATCH 3/3] fix --- .../scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala index 4736e6071d9c..7b010688ed85 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala @@ -20,6 +20,7 @@ import org.apache.gluten.GlutenBuildInfo import org.apache.gluten.config.GlutenConfig import org.apache.gluten.events.GlutenPlanFallbackEvent import org.apache.gluten.execution.FileSourceScanExecTransformer +import org.apache.gluten.utils.BackendTestUtils import org.apache.spark.SparkConf import org.apache.spark.internal.config.UI.UI_ENABLED