Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,6 @@ class HashJoinMetricsUpdater(val metrics: Map[String, SQLMetric])
var currentIdx = operatorMetrics.metricsList.size() - 1
var totalTime = 0L

// build side pre projection
if (joinParams.buildPreProjectionNeeded) {
metrics("buildPreProjectionTime") +=
(operatorMetrics.metricsList.get(currentIdx).time / 1000L).toLong
metrics("outputVectors") += operatorMetrics.metricsList.get(currentIdx).outputVectors
totalTime += operatorMetrics.metricsList.get(currentIdx).time
currentIdx -= 1
}

// stream side pre projection
if (joinParams.streamPreProjectionNeeded) {
metrics("streamPreProjectionTime") +=
(operatorMetrics.metricsList.get(currentIdx).time / 1000L).toLong
metrics("outputVectors") += operatorMetrics.metricsList.get(currentIdx).outputVectors
totalTime += operatorMetrics.metricsList.get(currentIdx).time
currentIdx -= 1
}

// update fillingRightJoinSideTime
MetricsUtil
.getAllProcessorList(operatorMetrics.metricsList.get(currentIdx))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,6 @@ class SortMergeJoinMetricsUpdater(val metrics: Map[String, SQLMetric])
var currentIdx = operatorMetrics.metricsList.size() - 1
var totalTime = 0L

// build side pre projection
if (joinParams.buildPreProjectionNeeded) {
metrics("buildPreProjectionTime") +=
(operatorMetrics.metricsList.get(currentIdx).time / 1000L).toLong
metrics("outputVectors") += operatorMetrics.metricsList.get(currentIdx).outputVectors
totalTime += operatorMetrics.metricsList.get(currentIdx).time
currentIdx -= 1
}

// stream side pre projection
if (joinParams.streamPreProjectionNeeded) {
metrics("streamPreProjectionTime") +=
(operatorMetrics.metricsList.get(currentIdx).time / 1000L).toLong
metrics("outputVectors") += operatorMetrics.metricsList.get(currentIdx).outputVectors
totalTime += operatorMetrics.metricsList.get(currentIdx).time
currentIdx -= 1
}

// update fillingRightJoinSideTime
MetricsUtil
.getAllProcessorList(operatorMetrics.metricsList.get(currentIdx))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ class GlutenClickHouseTPCHBucketSuite
plans(3)
.asInstanceOf[HashJoinLikeExecTransformer]
.left
.isInstanceOf[InputIteratorTransformer])
.isInstanceOf[ProjectExecTransformer])
assert(
plans(3)
.asInstanceOf[HashJoinLikeExecTransformer]
.right
.isInstanceOf[InputIteratorTransformer])
.isInstanceOf[ProjectExecTransformer])

// Check the bucket join
assert(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class GlutenClickHouseTPCDSMetricsSuite extends GlutenClickHouseTPCDSAbstractSui
case g: GlutenPlan if !g.isInstanceOf[InputIteratorTransformer] => g
}

assert(allGlutenPlans.size == 30)
assert(allGlutenPlans.size == 34)

val windowPlan0 = allGlutenPlans(3)
assert(windowPlan0.metrics("totalTime").value == 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ class GlutenClickHouseTPCHMetricsSuite extends ParquetTPCHSuite {
case g: GlutenPlan if !g.isInstanceOf[InputIteratorTransformer] => g
}

assert(allGlutenPlans.size == 58)
assert(allGlutenPlans.size == 60)

val shjPlan = allGlutenPlans(8)
assert(shjPlan.metrics("totalTime").value == 6)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,6 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])

val bloomFilterBlocksByteSize: SQLMetric = metrics("bloomFilterBlocksByteSize")

val streamPreProjectionCpuCount: SQLMetric = metrics("streamPreProjectionCpuCount")
val streamPreProjectionWallNanos: SQLMetric = metrics("streamPreProjectionWallNanos")

val buildPreProjectionCpuCount: SQLMetric = metrics("buildPreProjectionCpuCount")
val buildPreProjectionWallNanos: SQLMetric = metrics("buildPreProjectionWallNanos")

val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")

override protected def updateJoinMetricsInternal(
Expand Down Expand Up @@ -148,17 +142,6 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
hashBuildSpilledFiles += hashBuildMetrics.spilledFiles
idx += 1

if (joinParams.buildPreProjectionNeeded) {
buildPreProjectionCpuCount += joinMetrics.get(idx).cpuCount
buildPreProjectionWallNanos += joinMetrics.get(idx).wallNanos
idx += 1
}

if (joinParams.streamPreProjectionNeeded) {
streamPreProjectionCpuCount += joinMetrics.get(idx).cpuCount
streamPreProjectionWallNanos += joinMetrics.get(idx).wallNanos
idx += 1
}
if (TaskResources.inSparkTask()) {
SparkMetricsUtil.incMemoryBytesSpilled(
TaskResources.getLocalTaskContext().taskMetrics(),
Expand All @@ -185,11 +168,6 @@ class SortMergeJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
val peakMemoryBytes: SQLMetric = metrics("peakMemoryBytes")
val numMemoryAllocations: SQLMetric = metrics("numMemoryAllocations")

val streamPreProjectionCpuCount: SQLMetric = metrics("streamPreProjectionCpuCount")
val streamPreProjectionWallNanos: SQLMetric = metrics("streamPreProjectionWallNanos")
val bufferPreProjectionCpuCount: SQLMetric = metrics("bufferPreProjectionCpuCount")
val bufferPreProjectionWallNanos: SQLMetric = metrics("bufferPreProjectionWallNanos")

override protected def updateJoinMetricsInternal(
joinMetrics: util.ArrayList[OperatorMetrics],
joinParams: JoinParams): Unit = {
Expand All @@ -200,17 +178,5 @@ class SortMergeJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
peakMemoryBytes += smjMetrics.peakMemoryBytes
numMemoryAllocations += smjMetrics.numMemoryAllocations
idx += 1

if (joinParams.buildPreProjectionNeeded) {
bufferPreProjectionCpuCount += joinMetrics.get(idx).cpuCount
bufferPreProjectionWallNanos += joinMetrics.get(idx).wallNanos
idx += 1
}

if (joinParams.streamPreProjectionNeeded) {
streamPreProjectionCpuCount += joinMetrics.get(idx).cpuCount
streamPreProjectionWallNanos += joinMetrics.get(idx).wallNanos
idx += 1
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
val metrics = smj.get.metrics
assert(metrics("numOutputRows").value == 100)
assert(metrics("numOutputVectors").value > 0)
assert(metrics("streamPreProjectionCpuCount").value > 0)
assert(metrics("bufferPreProjectionCpuCount").value > 0)
}
}
}
Expand Down Expand Up @@ -133,8 +131,6 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
val metrics = smj.get.metrics
assert(metrics("numOutputRows").value == 100)
assert(metrics("numOutputVectors").value > 0)
assert(metrics("streamPreProjectionCpuCount").value > 0)
assert(metrics("buildPreProjectionCpuCount").value > 0)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{LeafExecNode, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}

case class RewrittenNodeWall(originalChild: SparkPlan) extends LeafExecNode {
override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
Expand All @@ -50,7 +51,7 @@ class RewriteSparkPlanRulesManager private (
FallbackTags.maybeOffloadable(plan) && rewriteRules.exists(_.isRewritable(plan))
}

private def getFallbackTagBack(rewrittenPlan: SparkPlan): Option[FallbackTag] = {
private def getRewriteNodeBack(rewrittenPlan: SparkPlan): SparkPlan = {
// The rewritten plan may contain more nodes than the original; for now, the extra nodes
// should only be `ProjectExec`.
// TODO: Find a better approach than checking `p.isInstanceOf[ProjectExec]` which is not
Expand All @@ -59,7 +60,7 @@ class RewriteSparkPlanRulesManager private (
case p if !p.isInstanceOf[ProjectExec] && !p.isInstanceOf[RewrittenNodeWall] => p
}
assert(target.size == 1)
FallbackTags.getOption(target.head)
target.head
}

private def applyRewriteRules(origin: SparkPlan): (SparkPlan, Option[String]) = {
Expand Down Expand Up @@ -99,10 +100,22 @@ class RewriteSparkPlanRulesManager private (
origin
} else {
validateRule.apply(rewrittenPlan)
val tag = getFallbackTagBack(rewrittenPlan)
if (tag.isDefined) {
val rewriteNode = getRewriteNodeBack(rewrittenPlan)
val allFallbackTags = rewrittenPlan.collect {
case p if !p.isInstanceOf[RewrittenNodeWall] => FallbackTags.getOption(p)
}
if (FallbackTags.getOption(rewriteNode).isDefined) {
// If the rewritten plan is still not transformable, return the original plan.
FallbackTags.add(origin, tag.get)
FallbackTags.add(origin, FallbackTags.getOption(rewriteNode).get)
origin
} else if (
(rewriteNode.isInstanceOf[BroadcastHashJoinExec] ||
rewriteNode.isInstanceOf[BroadcastNestedLoopJoinExec]) &&
allFallbackTags.exists(_.isDefined)
) {
// If any project inserted for the join is not transformable, return the original plan.
val reason = allFallbackTags.collect { case Some(s) => s.reason() }.mkString(", ")
FallbackTags.add(origin, FallbackTag.Converter.FromString.from(reason).get)
origin
} else {
rewrittenPlan.transformUp {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}

import org.apache.spark.SparkContextUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -189,6 +190,10 @@ abstract class ProjectExecTransformerBase(val list: Seq[NamedExpression], val in
}()
}

/**
 * Delegates broadcast execution to the child plan.
 *
 * @tparam T element type of the broadcast value
 * @return the broadcast returned by `child.executeBroadcast`, passed through unchanged
 */
override def doExecuteBroadcast[T](): Broadcast[T] = child.executeBroadcast[T]()

override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetricsUpdater(metrics)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{ExpandOutputPartitioningShim, ExplainUtils, SparkPlan}
import org.apache.spark.sql.execution.joins.{BaseJoinExec, HashedRelationBroadcastMode, HashJoin}
import org.apache.spark.sql.execution.joins.{BaseJoinExec, HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand Down Expand Up @@ -135,18 +135,11 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport {
.forall(types => sameType(types._1, types._2)),
"Join keys from two sides should have same length and types"
)
// Spark has an improvement which would patch integer joins keys to a Long value.
// But this improvement would cause add extra project before hash join in velox,
// disabling this improvement as below would help reduce the project.
val (lkeys, rkeys) = if (BackendsApiManager.getSettings.enableJoinKeysRewrite()) {
(HashJoin.rewriteKeyExpr(leftKeys), HashJoin.rewriteKeyExpr(rightKeys))
} else {
(leftKeys, rightKeys)
}

if (needSwitchChildren) {
(lkeys, rkeys)
(leftKeys, rightKeys)
} else {
(rkeys, lkeys)
(rightKeys, leftKeys)
}
}

Expand Down Expand Up @@ -234,13 +227,6 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport {
val operatorId = context.nextOperatorId(this.nodeName)

val joinParams = new JoinParams
if (JoinUtils.preProjectionNeeded(streamedKeyExprs)) {
joinParams.streamPreProjectionNeeded = true
}
if (JoinUtils.preProjectionNeeded(buildKeyExprs)) {
joinParams.buildPreProjectionNeeded = true
}

if (condition.isDefined) {
joinParams.isWithCondition = true
}
Expand Down
Loading