Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,6 @@ object VeloxBackendSettings extends BackendSettingsApi {

override def needPreComputeRangeFrameBoundary(): Boolean = true

override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true

override def supportIcebergEqualityDeleteRead(): Boolean = false

override def reorderColumnsForPartitionWrite(): Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuffleReadExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -2058,25 +2057,6 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}

// Checks which physical operator plans a FULL OUTER JOIN between two small tables,
// with and without a join condition.
//
// Setup: t1 and t2 are parquet tables, each written from spark.range(10). Both queries
// run with spark.sql.autoBroadcastJoinThreshold set to 1MB.
//
// NOTE(review): runQueryAndCompare, checkSparkOperatorMatch and checkGlutenOperatorMatch
// are defined outside this view. Presumably the first runs the query and compares its
// result against vanilla Spark, and the other two assert on the operator type found in
// the executed plan. Confirm against the suite's base class.
test("FullOuter in BroadcastNestLoopJoin") {
withTable("t1", "t2") {
spark.range(10).write.format("parquet").saveAsTable("t1")
spark.range(10).write.format("parquet").saveAsTable("t2")

// Case 1: full outer join WITH a (non-equi) join condition. The plan is expected to
// keep Spark's own BroadcastNestedLoopJoinExec, i.e. the join falls back.
withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "1MB") {
runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.id < t2.id") {
checkSparkOperatorMatch[BroadcastNestedLoopJoinExec]
}

// Case 2: full outer join WITHOUT a join condition. The plan is expected to contain
// BroadcastNestedLoopJoinExecTransformer, i.e. the join is offloaded to Gluten.
// This query also runs inside the withSQLConf scope above.
runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2") {
checkGlutenOperatorMatch[BroadcastNestedLoopJoinExecTransformer]
}
}
}
}

test("test get_struct_field with scalar function as input") {
withSQLConf("spark.sql.json.enablePartialResults" -> "true") {
withTable("t") {
Expand Down
3 changes: 0 additions & 3 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,6 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type()));
}
break;
case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_OUTER:
joinType = core::JoinType::kFull;
break;
default:
VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type()));
}
Expand Down
7 changes: 0 additions & 7 deletions cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1101,13 +1101,6 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::CrossRel& crossR
case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT:
case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT_SEMI:
break;
case ::substrait::CrossRel_JoinType_JOIN_TYPE_OUTER:
if (crossRel.has_expression()) {
LOG_VALIDATION_MSG("Full outer join type with condition is not supported in CrossRel");
return false;
} else {
break;
}
default:
LOG_VALIDATION_MSG("Unsupported Join type in CrossRel");
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,6 @@ trait BackendSettingsApi {

def needPreComputeRangeFrameBoundary(): Boolean = false

def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false

def supportIcebergEqualityDeleteRead(): Boolean = true

def reorderColumnsForPartitionWrite(): Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.gluten.utils.SubstraitUtil

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, InnerLike, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan}
import org.apache.spark.sql.execution.joins.BaseJoinExec
Expand Down Expand Up @@ -148,14 +148,6 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
def validateJoinTypeAndBuildSide(): ValidationResult = {
val result = joinType match {
case _: InnerLike | LeftOuter | RightOuter => ValidationResult.succeeded
case FullOuter
if BackendsApiManager.getSettings.broadcastNestedLoopJoinSupportsFullOuterJoin() =>
if (condition.isEmpty) {
ValidationResult.succeeded
} else {
ValidationResult.failed(
s"FullOuter join with join condition is not supported with BroadcastNestedLoopJoin")
}
case ExistenceJoin(_) =>
ValidationResult.succeeded
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,9 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon
// INNER JOIN && t1Size < t2Size => BuildLeft
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2", blt, BuildLeft)
// FULL JOIN && t1Size < t2Size => BuildLeft
assertJoinBuildSide(
"SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2 ON t1.key < t2.key",
bl,
BuildLeft)
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2", bl, BuildLeft)
// FULL OUTER && t1Size < t2Size => BuildLeft
assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.key < t2.key", bl, BuildLeft)
assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft)
// LEFT JOIN => BuildRight
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN t2", blt, BuildRight)
// RIGHT JOIN => BuildLeft
Expand All @@ -160,13 +157,10 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon
// INNER JOIN && broadcast(t2) => BuildRight
assertJoinBuildSide("SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2", blt, BuildRight)
// FULL OUTER && broadcast(t1) => BuildLeft
assertJoinBuildSide(
"SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key",
bl,
BuildLeft)
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft)
// FULL OUTER && broadcast(t2) => BuildRight
assertJoinBuildSide(
"SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key",
"SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2",
bl,
BuildRight)
// LEFT JOIN && broadcast(t1) => BuildLeft
Expand Down Expand Up @@ -200,11 +194,8 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon
/* ######## test cases for non-equal join ######### */
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
// For full outer join, prefer to broadcast the smaller side.
assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key", bl, BuildLeft)
assertJoinBuildSide(
"SELECT * FROM t2 FULL OUTER JOIN t1 on t1.key < t2.key",
bl,
BuildRight)
assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft)
assertJoinBuildSide("SELECT * FROM t2 FULL OUTER JOIN t1", bl, BuildRight)

// For inner join, prefer to broadcast the smaller side, if broadcast-able.
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (t2Size + 1).toString()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.apache.gluten.GlutenBuildInfo
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.events.GlutenPlanFallbackEvent
import org.apache.gluten.execution.FileSourceScanExecTransformer
import org.apache.gluten.utils.BackendTestUtils

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.UI.UI_ENABLED
Expand Down Expand Up @@ -118,7 +119,14 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp

val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
val execution = glutenStore.execution(id)
execution.get.numFallbackNodes == 0
if (BackendTestUtils.isVeloxBackendLoaded()) {
assert(execution.get.numFallbackNodes == 1)
assert(
execution.get.fallbackNodeToReason.head._2
.contains("FullOuter join is not supported with BroadcastNestedLoopJoin"))
} else {
assert(execution.get.numFallbackNodes == 0)
}
}

// [GLUTEN-4119] Skip add ReusedExchange to fallback node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
val execution = glutenStore.execution(id)
if (BackendTestUtils.isVeloxBackendLoaded()) {
assert(execution.get.numFallbackNodes == 0)
assert(execution.get.numFallbackNodes == 1)
assert(
execution.get.fallbackNodeToReason.head._2
.contains("FullOuter join is not supported with BroadcastNestedLoopJoin"))
} else {
assert(execution.get.numFallbackNodes == 2)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
val execution = glutenStore.execution(id)
if (BackendTestUtils.isVeloxBackendLoaded()) {
assert(execution.get.numFallbackNodes == 0)
assert(execution.get.numFallbackNodes == 1)
assert(
execution.get.fallbackNodeToReason.head._2
.contains("FullOuter join is not supported with BroadcastNestedLoopJoin"))
} else {
assert(execution.get.numFallbackNodes == 2)
}
Expand Down