From 165c7e97395284b2a7e785089a3ddc9c520f8d83 Mon Sep 17 00:00:00 2001 From: Mingliang Zhu Date: Wed, 5 Nov 2025 09:00:29 +0800 Subject: [PATCH] [VL] Remove support for BNLJ full outer join without condition (#11021) --- .../backendsapi/velox/VeloxBackend.scala | 2 -- .../gluten/execution/MiscOperatorSuite.scala | 20 ------------------ cpp/velox/substrait/SubstraitToVeloxPlan.cc | 3 --- .../SubstraitToVeloxPlanValidator.cc | 7 ------- .../backendsapi/BackendSettingsApi.scala | 2 -- ...oadcastNestedLoopJoinExecTransformer.scala | 10 +-------- .../joins/GlutenBroadcastJoinSuite.scala | 21 ++++++------------- .../sql/gluten/GlutenFallbackSuite.scala | 10 ++++++++- .../sql/gluten/GlutenFallbackSuite.scala | 5 ++++- .../sql/gluten/GlutenFallbackSuite.scala | 5 ++++- 10 files changed, 24 insertions(+), 61 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 9821efa66e43..82684c75b81e 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -549,8 +549,6 @@ object VeloxBackendSettings extends BackendSettingsApi { override def needPreComputeRangeFrameBoundary(): Boolean = true - override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true - override def supportIcebergEqualityDeleteRead(): Boolean = false override def reorderColumnsForPartitionWrite(): Boolean = true diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala index e14dd1fcb5a8..982eff12523c 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala @@ -24,7 +24,6 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuffleReadExec, ShuffleQueryStageExec} -import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -2058,25 +2057,6 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa } } - test("FullOuter in BroadcastNestLoopJoin") { - withTable("t1", "t2") { - spark.range(10).write.format("parquet").saveAsTable("t1") - spark.range(10).write.format("parquet").saveAsTable("t2") - - // with join condition should fallback. - withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "1MB") { - runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.id < t2.id") { - checkSparkOperatorMatch[BroadcastNestedLoopJoinExec] - } - - // without join condition should offload to gluten operator. - runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2") { - checkGlutenOperatorMatch[BroadcastNestedLoopJoinExecTransformer] - } - } - } - } - test("test get_struct_field with scalar function as input") { withSQLConf("spark.sql.json.enablePartialResults" -> "true") { withTable("t") { diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index f6b5284f92a2..17ca9c1d1cf8 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -350,9 +350,6 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type())); } break; - case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_OUTER: - joinType = core::JoinType::kFull; - break; default: VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type())); } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc index 1f0a5b33613e..1b8df42bc68b 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc @@ -1101,13 +1101,6 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::CrossRel& crossR case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT: case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT_SEMI: break; - case ::substrait::CrossRel_JoinType_JOIN_TYPE_OUTER: - if (crossRel.has_expression()) { - LOG_VALIDATION_MSG("Full outer join type with condition is not supported in CrossRel"); - return false; - } else { - break; - } default: LOG_VALIDATION_MSG("Unsupported Join type in CrossRel"); return false; diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index f4b9c46df1e2..4df7d45f42c2 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -152,8 +152,6 @@ trait BackendSettingsApi { def needPreComputeRangeFrameBoundary(): Boolean = false - def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false - def supportIcebergEqualityDeleteRead(): Boolean = true def reorderColumnsForPartitionWrite(): Boolean = false diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala index 0f8e25e2f0a3..282cf8581e18 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala @@ -24,7 +24,7 @@ import org.apache.gluten.utils.SubstraitUtil import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} -import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftOuter, RightOuter} +import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, InnerLike, JoinType, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan} import org.apache.spark.sql.execution.joins.BaseJoinExec @@ -148,14 +148,6 @@ abstract class BroadcastNestedLoopJoinExecTransformer( def validateJoinTypeAndBuildSide(): ValidationResult = { val result = joinType match { case _: InnerLike | LeftOuter | RightOuter => ValidationResult.succeeded - case FullOuter - if BackendsApiManager.getSettings.broadcastNestedLoopJoinSupportsFullOuterJoin() => - if (condition.isEmpty) { - ValidationResult.succeeded - } else { - ValidationResult.failed( - s"FullOuter join with join condition is not supported with BroadcastNestedLoopJoin") - } case ExistenceJoin(_) => ValidationResult.succeeded case _ => diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala index ce380aebfaa3..2622e7d599f8 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala @@ -143,12 +143,9 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon // INNER JOIN && t1Size < t2Size => BuildLeft assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2", blt, BuildLeft) // FULL JOIN && t1Size < t2Size => BuildLeft - assertJoinBuildSide( - "SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2 ON t1.key < t2.key", - bl, - BuildLeft) + assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2", bl, BuildLeft) // FULL OUTER && t1Size < t2Size => BuildLeft - assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.key < t2.key", bl, BuildLeft) + assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft) // LEFT JOIN => BuildRight assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN t2", blt, BuildRight) // RIGHT JOIN => BuildLeft @@ -160,13 +157,10 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon // INNER JOIN && broadcast(t2) => BuildRight assertJoinBuildSide("SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2", blt, BuildRight) // FULL OUTER && broadcast(t1) => BuildLeft - assertJoinBuildSide( - "SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key", - bl, - BuildLeft) + assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft) // FULL OUTER && broadcast(t2) => BuildRight assertJoinBuildSide( - "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key", + "SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2", bl, BuildRight) // LEFT JOIN && broadcast(t1) => BuildLeft @@ -200,11 +194,8 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon /* ######## test cases for non-equal join ######### */ withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { // For full outer join, prefer to broadcast the smaller side. - assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key", bl, BuildLeft) - assertJoinBuildSide( - "SELECT * FROM t2 FULL OUTER JOIN t1 on t1.key < t2.key", - bl, - BuildRight) + assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft) + assertJoinBuildSide("SELECT * FROM t2 FULL OUTER JOIN t1", bl, BuildRight) // For inner join, prefer to broadcast the smaller side, if broadcast-able. withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (t2Size + 1).toString()) { diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala index c8a8ac85cdbe..7b010688ed85 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala @@ -20,6 +20,7 @@ import org.apache.gluten.GlutenBuildInfo import org.apache.gluten.config.GlutenConfig import org.apache.gluten.events.GlutenPlanFallbackEvent import org.apache.gluten.execution.FileSourceScanExecTransformer +import org.apache.gluten.utils.BackendTestUtils import org.apache.spark.SparkConf import org.apache.spark.internal.config.UI.UI_ENABLED @@ -118,7 +119,14 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2") val execution = glutenStore.execution(id) - execution.get.numFallbackNodes == 0 + if (BackendTestUtils.isVeloxBackendLoaded()) { + assert(execution.get.numFallbackNodes == 1) + assert( + execution.get.fallbackNodeToReason.head._2 + .contains("FullOuter join is not supported with BroadcastNestedLoopJoin")) + } else { + assert(execution.get.numFallbackNodes == 0) + } } // [GLUTEN-4119] Skip add ReusedExchange to fallback node diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala index 058a63a67de6..8edcef1c08c8 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala @@ -121,7 +121,10 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2") val execution = glutenStore.execution(id) if (BackendTestUtils.isVeloxBackendLoaded()) { - assert(execution.get.numFallbackNodes == 0) + assert(execution.get.numFallbackNodes == 1) + assert( + execution.get.fallbackNodeToReason.head._2 + .contains("FullOuter join is not supported with BroadcastNestedLoopJoin")) } else { assert(execution.get.numFallbackNodes == 2) } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala index 058a63a67de6..8edcef1c08c8 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala @@ -121,7 +121,10 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2") val execution = glutenStore.execution(id) if (BackendTestUtils.isVeloxBackendLoaded()) { - assert(execution.get.numFallbackNodes == 0) + assert(execution.get.numFallbackNodes == 1) + assert( + execution.get.fallbackNodeToReason.head._2 + .contains("FullOuter join is not supported with BroadcastNestedLoopJoin")) } else { assert(execution.get.numFallbackNodes == 2) }