Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions backends-velox/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.vectorized;

import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.RuntimeAware;

/**
 * JNI bridge for building and managing native hash join tables.
 *
 * <p>An instance is bound to a {@link Runtime} and exposes that runtime's native handle
 * through {@link RuntimeAware#rtHandle()}. The hash-table operations themselves are
 * static {@code native} methods operating on raw native handles.
 */
public class HashJoinBuilder implements RuntimeAware {
// Runtime whose native handle is exposed via rtHandle().
private final Runtime runtime;

// Private: instances are obtained through the create(Runtime) factory.
private HashJoinBuilder(Runtime runtime) {
this.runtime = runtime;
}

/**
 * Creates a builder bound to the given runtime.
 *
 * @param runtime runtime providing the native handle returned by {@link #rtHandle()}
 * @return a new {@code HashJoinBuilder} wrapping {@code runtime}
 */
public static HashJoinBuilder create(Runtime runtime) {
return new HashJoinBuilder(runtime);
}

/** Returns the native handle of the bound runtime (see {@code Runtime.getHandle()}). */
@Override
public long rtHandle() {
return runtime.getHandle();
}

// Releases the native hash table identified by hashTableData.
// NOTE(review): native-side semantics not visible here — confirm against the JNI implementation.
public static native void clearHashTable(long hashTableData);

// Returns a handle to a copy of the native hash table identified by hashTableData.
// NOTE(review): ownership/lifetime of the returned handle is defined on the native side — verify.
public static native long cloneHashTable(long hashTableData);

/**
 * Builds a native hash table from the given batches and returns its native handle.
 *
 * <p>Parameter meanings below are inferred from names; confirm against the native
 * implementation before relying on them:
 *
 * @param buildHashTableId identifier used to register/look up the built table
 * @param batchHandlers native handles of the input columnar batches
 * @param joinKeys join key specification (presumably a serialized key list — verify format)
 * @param joinType numeric join-type code (mapping defined natively)
 * @param hasMixedFiltCondition whether the join carries a mixed filter condition
 * @param isExistenceJoin whether this is an existence join
 * @param namedStruct serialized schema of the build side (presumably protobuf — verify)
 * @param isNullAwareAntiJoin whether null-aware anti-join semantics apply
 * @param bloomFilterPushdownSize size threshold for bloom-filter pushdown
 * @param broadcastHashTableBuildThreads thread count used for the build
 * @return native handle of the built hash table
 */
public static native long nativeBuild(
String buildHashTableId,
long[] batchHandlers,
String joinKeys,
int joinType,
boolean hasMixedFiltCondition,
boolean isExistenceJoin,
byte[] namedStruct,
boolean isNullAwareAntiJoin,
long bloomFilterPushdownSize,
int broadcastHashTableBuildThreads);
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ object VeloxBackendSettings extends BackendSettingsApi {
val GLUTEN_VELOX_INTERNAL_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX + ".internal.udfLibraryPaths"
val GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION = VeloxBackend.CONF_PREFIX + ".udfAllowTypeConversion"

val GLUTEN_VELOX_BROADCAST_CACHE_EXPIRED_TIME: String =
VeloxBackend.CONF_PREFIX + ("broadcast.cache.expired.time")
// unit: SECONDS, default 1 day
val GLUTEN_VELOX_BROADCAST_CACHE_EXPIRED_TIME_DEFAULT: Int = 86400

override def primaryBatchType: Convention.BatchType = VeloxBatchType

override def validateScanExec(
Expand Down Expand Up @@ -495,13 +500,17 @@ object VeloxBackendSettings extends BackendSettingsApi {
allSupported
}

override def enableJoinKeysRewrite(): Boolean = false

override def supportColumnarShuffleExec(): Boolean = {
val conf = GlutenConfig.get
conf.enableColumnarShuffle &&
(conf.isUseGlutenShuffleManager || conf.shuffleManagerSupportsColumnarShuffle)
}

override def enableJoinKeysRewrite(): Boolean = false
override def enableHashTableBuildOncePerExecutor(): Boolean = {
VeloxConfig.get.enableBroadcastBuildOncePerExecutor
}

override def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
t =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.apache.gluten.backendsapi.ListenerApi
import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType, ArrowNativeBatchType}
import org.apache.gluten.config.{GlutenConfig, GlutenCoreConfig, VeloxConfig}
import org.apache.gluten.config.VeloxConfig._
import org.apache.gluten.execution.VeloxBroadcastBuildSideCache
import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.expression.UDFMappings
import org.apache.gluten.extension.columnar.transition.Convention
Expand All @@ -35,8 +36,10 @@ import org.apache.gluten.utils._
import org.apache.spark.{HdfsConfGenerator, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
import org.apache.spark.listener.VeloxGlutenSQLAppStatusListener
import org.apache.spark.memory.GlobalOffHeapMemory
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.rpc.{GlutenDriverEndpoint, GlutenExecutorEndpoint}
import org.apache.spark.shuffle.{ColumnarShuffleDependency, LookupKey, ShuffleManagerRegistry}
import org.apache.spark.shuffle.sort.ColumnarShuffleManager
import org.apache.spark.sql.execution.ColumnarCachedBatchSerializer
Expand All @@ -56,6 +59,8 @@ class VeloxListenerApi extends ListenerApi with Logging {
import VeloxListenerApi._

override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
GlutenDriverEndpoint.glutenDriverEndpointRef = (new GlutenDriverEndpoint).self
VeloxGlutenSQLAppStatusListener.registerListener(sc)
val conf = pc.conf()

// When the Velox cache is enabled, the Velox file handle cache should also be enabled.
Expand Down Expand Up @@ -138,6 +143,8 @@ class VeloxListenerApi extends ListenerApi with Logging {
override def onDriverShutdown(): Unit = shutdown()

override def onExecutorStart(pc: PluginContext): Unit = {
GlutenExecutorEndpoint.executorEndpoint = new GlutenExecutorEndpoint(pc.executorID, pc.conf)

val conf = pc.conf()

// Static initializers for executor.
Expand Down Expand Up @@ -250,6 +257,11 @@ class VeloxListenerApi extends ListenerApi with Logging {

private def shutdown(): Unit = {
// TODO shutdown implementation in velox to release resources
VeloxBroadcastBuildSideCache.cleanAll()
val executorEndpoint = GlutenExecutorEndpoint.executorEndpoint
if (executorEndpoint != null) {
executorEndpoint.stop()
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could GlutenExecutorEndpoint.executorEndpoint be null here if no executor has been started or initialized? If so, is the null check in shutdown() sufficient, or should initialization be guaranteed earlier?

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ object VeloxRuleApi {
injector.injectOptimizerRule(CollapseGetJsonObjectExpressionRule.apply)
injector.injectOptimizerRule(RewriteCastFromArray.apply)
injector.injectOptimizerRule(RewriteUnboundedWindow.apply)

if (!BackendsApiManager.getSettings.enableJoinKeysRewrite()) {
injector.injectPlannerStrategy(_ => org.apache.gluten.extension.GlutenJoinKeysCapture())
}

if (BackendsApiManager.getSettings.supportAppendDataExec()) {
injector.injectPlannerStrategy(SparkShimLoader.getSparkShims.getRewriteCreateTableAsSelect(_))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import org.apache.gluten.exception.{GlutenExceptionUtil, GlutenNotSupportExcepti
import org.apache.gluten.execution._
import org.apache.gluten.expression._
import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
import org.apache.gluten.extension.JoinKeysTag
import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.shuffle.NeedCustomColumnarBatchSerializer
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.vectorized.{ColumnarBatchSerializer, ColumnarBatchSerializeResult}

import org.apache.spark.{ShuffleDependency, SparkEnv, SparkException}
import org.apache.spark.api.python.{ColumnarArrowEvalPythonExec, PullOutArrowEvalPythonPreProjectHelper}
import org.apache.spark.internal.Logging
import org.apache.spark.memory.SparkMemoryUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
Expand All @@ -43,9 +45,10 @@ import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelationBroadcastMode, SparkHashJoinUtils}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.python.ArrowEvalPythonExec
import org.apache.spark.sql.execution.unsafe.UnsafeColumnarBuildSideRelation
Expand All @@ -64,8 +67,9 @@ import javax.ws.rs.core.UriBuilder
import java.util.Locale

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

class VeloxSparkPlanExecApi extends SparkPlanExecApi {
class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {

/** Transform GetArrayItem to Substrait. */
override def genGetArrayItemTransformer(
Expand Down Expand Up @@ -678,9 +682,136 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
child: SparkPlan,
numOutputRows: SQLMetric,
dataSize: SQLMetric): BuildSideRelation = {

val buildKeys = mode match {
case mode1: HashedRelationBroadcastMode =>
mode1.key
case _ =>
// IdentityBroadcastMode
Seq.empty
}
var offload = true
val (newChild, newOutput, newBuildKeys) =
if (VeloxConfig.get.enableBroadcastBuildOncePerExecutor) {

// Try to lookup from TreeNodeTag using child's logical plan
// Need to recursively find logicalLink in case of AQE or other wrappers
@scala.annotation.tailrec
def findLogicalLink(
plan: SparkPlan): Option[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan] = {
plan.logicalLink match {
case some @ Some(_) => some
case None =>
plan.children match {
case Seq(child) => findLogicalLink(child)
case _ => None
}
}
}

val newBuildKeys = findLogicalLink(child)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the logic here: The current implementation prioritizes fetching the tag from the logical link, with getOriginalKeysFromPacked as a fallback.

I have two concerns:

  1. Under what scenarios would the logical link fail to contain the tag?
  2. The tag seems to store the original Attributes, but the buildKeys in the mode are already bindReferenced. If getOriginalKeysFromPacked returns BoundReferences, don't we need to resolve them back to Attributes using the child's output schema?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Under what scenarios would the logical link fail to contain the tag?

In some unit tests (e.g., GlutenOuterJoinSuiteForceShjOn), BHJ is created manually and does not trigger GlutenJoinKeysCapture. This causes the loss of the original expression.

  2. The tag seems to store the original Attributes, but the buildKeys in the mode are already bindReferenced. If getOriginalKeysFromPacked returns BoundReferences, don't we need to resolve them back to Attributes using the child's output schema?

We added the getOriginalKeysFromPacked method to extract the original expression instead of the bound reference.

.flatMap(_.getTagValue(JoinKeysTag.ORIGINAL_JOIN_KEYS))
.getOrElse {
if (SparkHashJoinUtils.canRewriteAsLongType(buildKeys) && buildKeys.nonEmpty) {
SparkHashJoinUtils.getOriginalKeysFromPacked(buildKeys.head)
} else {
buildKeys
}
}

val noNeedPreOp = newBuildKeys.forall {
case _: AttributeReference | _: BoundReference => true
case _ => false
}

if (noNeedPreOp) {
(child, child.output, Seq.empty[Expression])
} else {
// pre projection in case of expression join keys
val appendedProjections = new ArrayBuffer[NamedExpression]()
val preProjectionBuildKeys = newBuildKeys.zipWithIndex.map {
case (e, idx) =>
e match {
case b: BoundReference => child.output(b.ordinal)
case a: AttributeReference => a
case o: Expression =>
val newExpr = Alias(o, "col_" + idx)()
appendedProjections += newExpr
newExpr
}
}

def wrapChild(child: SparkPlan): SparkPlan = {
val childWithAdapter =
ColumnarCollapseTransformStages.wrapInputIteratorTransformer(child)
val projectExecTransformer =
ProjectExecTransformer(child.output ++ appendedProjections, childWithAdapter)
val validationResult = projectExecTransformer.doValidate()
if (validationResult.ok()) {
WholeStageTransformer(
ProjectExecTransformer(child.output ++ appendedProjections, childWithAdapter))(
ColumnarCollapseTransformStages
.getTransformStageCounter(childWithAdapter)
.incrementAndGet()
)
} else {
offload = false
child
}
}

val newChild = child match {
case wt: WholeStageTransformer =>
val projectTransformer =
ProjectExecTransformer(child.output ++ appendedProjections, wt.child)
if (projectTransformer.doValidate().ok()) {
wt.withNewChildren(
Seq(ProjectExecTransformer(child.output ++ appendedProjections, wt.child)))

} else {
offload = false
child
}
case w: WholeStageCodegenExec =>
w.withNewChildren(Seq(ProjectExec(child.output ++ appendedProjections, w.child)))
case r: AQEShuffleReadExec if r.supportsColumnar =>
// when aqe is open
// TODO: remove this after pushdowning preprojection
wrapChild(r)
case r2c: RowToVeloxColumnarExec =>
wrapChild(r2c)
case union: ColumnarUnionExec =>
wrapChild(union)
case ordered: TakeOrderedAndProjectExecTransformer =>
wrapChild(ordered)
case a2v: ArrowColumnarToVeloxColumnarExec =>
wrapChild(a2v)
case other =>
offload = false
logWarning(
"Not supported operator " + other.nodeName +
" for BroadcastRelation and fallback to shuffle hash join")
child
}

if (offload) {
(
newChild,
(child.output ++ appendedProjections).map(_.toAttribute),
preProjectionBuildKeys)
} else {
(child, child.output, Seq.empty[Expression])
}
}
} else {
offload = false
(child, child.output, buildKeys)
}

val useOffheapBroadcastBuildRelation =
VeloxConfig.get.enableBroadcastBuildRelationInOffheap
val serialized: Seq[ColumnarBatchSerializeResult] = child

val serialized: Seq[ColumnarBatchSerializeResult] = newChild
.executeColumnar()
.mapPartitions(itr => Iterator(BroadcastUtils.serializeStream(itr)))
.filter(_.numRows != 0)
Expand All @@ -694,18 +825,23 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
}
numOutputRows += serialized.map(_.numRows).sum
dataSize += rawSize

if (useOffheapBroadcastBuildRelation) {
TaskResources.runUnsafe {
UnsafeColumnarBuildSideRelation(
child.output,
newOutput,
serialized.flatMap(_.offHeapData().asScala),
mode)
mode,
newBuildKeys,
offload)
}
} else {
ColumnarBuildSideRelation(
child.output,
newOutput,
serialized.flatMap(_.onHeapData().asScala).toArray,
mode)
mode,
newBuildKeys,
offload)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.gluten.vectorized.PlanEvaluatorJniWrapper

import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
Expand Down Expand Up @@ -120,6 +121,10 @@ class VeloxTransformerApi extends TransformerApi with Logging {

override def packPBMessage(message: Message): Any = Any.pack(message, "")

override def invalidateSQLExecutionResource(executionId: String): Unit = {
GlutenDriverEndpoint.invalidateResourceRelation(executionId)
}

override def genWriteParameters(write: WriteFilesExecTransformer): Any = {
write.fileFormat match {
case _ @(_: ParquetFileFormat | _: HiveFileFormat) =>
Expand Down
Loading
Loading