Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ public long rtHandle() {
return runtime.getHandle();
}

public native long create(int minOutputBatchSize, ColumnarBatchInIterator itr);
public native long create(int minOutputBatchSize, long memLimit, ColumnarBatchInIterator itr);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,18 @@ public static ColumnarBatchOutIterator create(
new ColumnarBatchInIterator(BackendsApiManager.getBackendName(), in));
return new ColumnarBatchOutIterator(runtime, outHandle);
}

/**
 * Creates a cudf-backed native batch resizer over {@code in} and exposes its output
 * as a {@link ColumnarBatchOutIterator}.
 *
 * @param minOutputBatchSize minimum output batch size threshold forwarded to the
 *     native resizer (presumably a row count — confirm against the native side)
 * @param memLimit memory budget forwarded to the native resizer
 *     (NOTE(review): unit is presumably bytes — confirm against the JNI implementation)
 * @param in upstream columnar batches to resize
 * @return iterator over the batches produced by the native resizer
 */
public static ColumnarBatchOutIterator createCudf(
    int minOutputBatchSize, long memLimit, Iterator<ColumnarBatch> in) {
  // Obtain the per-context runtime under a name identifying this operator.
  final Runtime runtime =
      Runtimes.contextInstance(
          BackendsApiManager.getBackendName(), "GpuBufferColumnarBatchResizer");
  // Instantiate the JNI wrapper first, then hand it the wrapped input iterator,
  // preserving the original call order.
  final GpuBufferBatchResizerJniWrapper jniWrapper =
      GpuBufferBatchResizerJniWrapper.create(runtime);
  final long nativeHandle =
      jniWrapper.create(
          minOutputBatchSize,
          memLimit,
          new ColumnarBatchInIterator(BackendsApiManager.getBackendName(), in));
  return new ColumnarBatchOutIterator(runtime, nativeHandle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ object VeloxRuleApi {
offloads))

// Legacy: Post-transform rules.
injector.injectPostTransform(_ => AppendBatchResizeForShuffleInputAndOutput())
injector.injectPostTransform(_ => GpuBufferBatchResizeForShuffleInputOutput())
injector.injectPostTransform(c => AppendBatchResizeForShuffleInputAndOutput(c.caller.isAqe()))
injector.injectPostTransform(_ => UnionTransformerRule())
injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
injector.injectPostTransform(_ => PartialGenerateRule())
Expand Down Expand Up @@ -148,6 +147,8 @@ object VeloxRuleApi {
c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(), Set(VeloxBatchType)))
injector.injectFinal(
c => GlutenAutoAdjustStageResourceProfile(new GlutenConfig(c.sqlConf), c.session))
injector.injectFinal(
c => AdjustStageExecutionMode(new GlutenConfig(c.sqlConf), c.session, c.caller.isAqe()))
injector.injectFinal(c => GlutenFallbackReporter(new GlutenConfig(c.sqlConf), c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
}
Expand Down Expand Up @@ -215,8 +216,7 @@ object VeloxRuleApi {

// Gluten RAS: Post rules.
injector.injectPostTransform(_ => DistinguishIdenticalScans)
injector.injectPostTransform(_ => AppendBatchResizeForShuffleInputAndOutput())
injector.injectPostTransform(_ => GpuBufferBatchResizeForShuffleInputOutput())
injector.injectPostTransform(c => AppendBatchResizeForShuffleInputAndOutput(c.caller.isAqe()))
injector.injectPostTransform(_ => RemoveTransitions)
injector.injectPostTransform(_ => UnionTransformerRule())
injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
Expand Down Expand Up @@ -246,6 +246,8 @@ object VeloxRuleApi {
c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(), Set(VeloxBatchType)))
injector.injectPostTransform(
c => GlutenAutoAdjustStageResourceProfile(new GlutenConfig(c.sqlConf), c.session))
injector.injectPostTransform(
c => AdjustStageExecutionMode(new GlutenConfig(c.sqlConf), c.session, c.caller.isAqe()))
injector.injectPostTransform(
c => GlutenFallbackReporter(new GlutenConfig(c.sqlConf), c.session))
injector.injectPostTransform(_ => RemoveFallbackTagRule())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {

def cudfBatchSize: Int = getConf(CUDF_BATCH_SIZE)

// Fraction of CPU memory the cudf shuffle reader may use to buffer batches
// (backed by CUDF_SHUFFLE_READER_MEMORY_PCT; default 0.5).
def cudfShuffleReaderMemoryPct: Double = getConf(CUDF_SHUFFLE_READER_MEMORY_PCT)

def orcUseColumnNames: Boolean = getConf(ORC_USE_COLUMN_NAMES)

def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES)
Expand Down Expand Up @@ -636,6 +638,12 @@ object VeloxConfig extends ConfigRegistry {
.intConf
.createWithDefault(50)

// Fraction of CPU memory granted to the shuffle reader for buffering cudf batches.
// NOTE(review): the value is not range-validated here (e.g. via a checkValue on the
// builder); values outside (0, 1] would be accepted — confirm whether that is intended.
val CUDF_SHUFFLE_READER_MEMORY_PCT =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.cudf.shuffle.readerMemoryPct")
.doc("The percentage of CPU memory to use for shuffle reader to buffer cudf batches.")
.doubleConf
.createWithDefault(0.5)

val CUDF_ENABLE_TABLE_SCAN =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan")
.doc("Enable cudf table scan")
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.velox.VeloxBatchType
import org.apache.gluten.config.VeloxConfig
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.iterator.ClosableIterator
import org.apache.gluten.utils.VeloxBatchResizer

import org.apache.spark.SparkEnv
import org.apache.spark.memory.SparkMemoryUtil
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.{GPUStageMode, SparkPlan, StageExecutionMode}
import org.apache.spark.sql.vectorized.ColumnarBatch

import scala.collection.JavaConverters._
Expand All @@ -34,15 +37,29 @@ import scala.collection.JavaConverters._
*/
case class VeloxResizeBatchesExec(
override val child: SparkPlan,
minOutputBatchSize: Int,
maxOutputBatchSize: Int,
preferredBatchBytes: Long)
executionMode: Option[StageExecutionMode] = None)
extends ColumnarToColumnarExec(child) {

// Wraps the upstream batch iterator in a resizer chosen from the stage execution mode.
override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
// NOTE(review): the next three lines reference minOutputBatchSize/maxOutputBatchSize/
// preferredBatchBytes, which do not appear in the new constructor parameters shown in
// this change — this looks like a removed-side diff line interleaved with the added
// body below; reconcile against the real source before relying on this span.
VeloxBatchResizer
.create(minOutputBatchSize, maxOutputBatchSize, preferredBatchBytes, in.asJava)
.asScala
// New implementation: pick the cudf or CPU resizer based on executionMode.
val veloxConfig = VeloxConfig.get
executionMode match {
// GPU stage: route through the cudf resizer, capping buffered memory at
// cudfShuffleReaderMemoryPct of the per-task off-heap allotment.
case Some(GPUStageMode) =>
VeloxBatchResizer
.createCudf(
veloxConfig.cudfBatchSize,
Math
.floor(SparkMemoryUtil.availableOffHeapPerTask(
SparkEnv.get.conf) * veloxConfig.cudfShuffleReaderMemoryPct)
.toLong,
in.asJava
)
.asScala
// Default (CPU) path: size-range based resizer driven by Velox config.
case _ =>
val range = veloxConfig.veloxResizeBatchesShuffleInputOutputRange
VeloxBatchResizer
.create(range.min, range.max, veloxConfig.veloxPreferredBatchBytes, in.asJava)
.asScala
}
}

override protected def closeIterator(out: Iterator[ColumnarBatch]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,78 +28,65 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
* Try to append [[VeloxResizeBatchesExec]] for shuffle input and output to make the batch sizes in
* good shape.
*/
case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] {
case class AppendBatchResizeForShuffleInputAndOutput(isAdaptiveContext: Boolean)
extends Rule[SparkPlan] {
// Entry point of the rule: bail out when cudf columnar execution is enabled or when
// neither shuffle-input nor shuffle-output resizing is configured; otherwise apply
// the input pass and then the output pass.
override def apply(plan: SparkPlan): SparkPlan = {
// Skip entirely under cudf columnar mode — GPU resizing is handled elsewhere.
if (VeloxConfig.get.enableColumnarCudf) {
return plan
}
val resizeBatchesShuffleInputEnabled = VeloxConfig.get.veloxResizeBatchesShuffleInput
val resizeBatchesShuffleOutputEnabled = VeloxConfig.get.veloxResizeBatchesShuffleOutput
if (!resizeBatchesShuffleInputEnabled && !resizeBatchesShuffleOutputEnabled) {
return plan
}

// NOTE(review): the next three lines (`range`, `preferredBatchBytes`,
// `plan.transformUp {`) belong to the pre-change implementation — the opened brace is
// never closed in this span and the added body below does not use these values. This
// span appears to interleave removed and added diff lines; verify against the source.
val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange
val preferredBatchBytes = VeloxConfig.get.veloxPreferredBatchBytes
plan.transformUp {
// Added implementation: run the shuffle-input pass first, then the output pass.
val newPlan = if (resizeBatchesShuffleInputEnabled) {
addResizeBatchesForShuffleInput(plan)
} else {
plan
}

val resultPlan = if (resizeBatchesShuffleOutputEnabled) {
addResizeBatchesForShuffleOutput(newPlan)
} else {
newPlan
}

resultPlan
}

// Recursively inserts a VeloxResizeBatchesExec between a columnar shuffle and its
// child when the shuffle's writer type requires input resizing.
private def addResizeBatchesForShuffleInput(plan: SparkPlan): SparkPlan = {
plan match {
case shuffle: ColumnarShuffleExchangeExec
// NOTE(review): two guards appear back to back below — the first (with
// resizeBatchesShuffleInputEnabled) is the removed-side diff line, the second is
// the added one; only one should survive in the real source.
if resizeBatchesShuffleInputEnabled &&
shuffle.shuffleWriterType.requiresResizingShuffleInput =>
if shuffle.shuffleWriterType.requiresResizingShuffleInput =>
val appendBatches =
// NOTE(review): duplicated constructor calls — the 4-arg form is the removed-side
// line, the 1-arg form is the added line.
VeloxResizeBatchesExec(shuffle.child, range.min, range.max, preferredBatchBytes)
VeloxResizeBatchesExec(shuffle.child)
shuffle.withNewChildren(Seq(appendBatches))
// NOTE(review): every case from here down to (but excluding) `case other` references
// resizeBatchesShuffleOutputEnabled / range / preferredBatchBytes, which are not in
// scope in the added implementation — these are removed-side lines from the old
// combined input+output rule (output handling moved to addResizeBatchesForShuffleOutput).
case a @ AQEShuffleReadExec(
ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _),
_)
if resizeBatchesShuffleOutputEnabled &&
shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
VeloxResizeBatchesExec(a, range.min, range.max, preferredBatchBytes)
case a @ AQEShuffleReadExec(
ShuffleQueryStageExec(
_,
ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec),
_),
_)
if resizeBatchesShuffleOutputEnabled &&
shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
VeloxResizeBatchesExec(a, range.min, range.max, preferredBatchBytes)
// Since it's transformed in a bottom to up order, so we may first encounter
// ShuffeQueryStageExec, which is transformed to VeloxResizeBatchesExec(ShuffeQueryStageExec),
// then we see AQEShuffleReadExec
case a @ AQEShuffleReadExec(
VeloxResizeBatchesExec(
s @ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _),
_,
_,
_),
_)
if resizeBatchesShuffleOutputEnabled &&
shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max, preferredBatchBytes)
case a @ AQEShuffleReadExec(
VeloxResizeBatchesExec(
s @ ShuffleQueryStageExec(
_,
ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec),
_),
_,
_,
_),
_)
if resizeBatchesShuffleOutputEnabled &&
shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max, preferredBatchBytes)
case s @ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _)
if resizeBatchesShuffleOutputEnabled &&
shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
VeloxResizeBatchesExec(s, range.min, range.max, preferredBatchBytes)
case s @ ShuffleQueryStageExec(
_,
ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec),
_)
if resizeBatchesShuffleOutputEnabled &&
shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
VeloxResizeBatchesExec(s, range.min, range.max, preferredBatchBytes)
// Added recursion: descend into children for all other nodes.
case other =>
other.withNewChildren(other.children.map(addResizeBatchesForShuffleInput))
}
}

/**
 * Recursively wraps shuffle outputs in a [[VeloxResizeBatchesExec]] when the shuffle's
 * writer type requires output resizing. Bare exchange nodes (and reused exchanges) are
 * only wrapped outside AQE (!isAdaptiveContext); query stages and AQE shuffle reads are
 * wrapped based solely on the writer type of the underlying shuffle.
 */
private def addResizeBatchesForShuffleOutput(plan: SparkPlan): SparkPlan = plan match {
  // Non-AQE: wrap the columnar shuffle exchange directly.
  case exchange: ColumnarShuffleExchangeExec
      if !isAdaptiveContext && exchange.shuffleWriterType.requiresResizingShuffleOutput =>
    VeloxResizeBatchesExec(exchange)
  // Non-AQE: a reused exchange wrapping a columnar shuffle is treated the same way.
  case reusedExchange @ ReusedExchangeExec(_, inner: ColumnarShuffleExchangeExec)
      if !isAdaptiveContext && inner.shuffleWriterType.requiresResizingShuffleOutput =>
    VeloxResizeBatchesExec(reusedExchange)
  // AQE: wrap the shuffle query stage itself.
  case stage: ShuffleQueryStageExec if requiresResizingShuffleOutput(stage) =>
    VeloxResizeBatchesExec(stage)
  // AQE: wrap the shuffle read sitting on top of a qualifying stage.
  case read @ AQEShuffleReadExec(stage @ ShuffleQueryStageExec(_, _, _), _)
      if requiresResizingShuffleOutput(stage) =>
    VeloxResizeBatchesExec(read)
  // Anything else: recurse into the children.
  case other =>
    other.withNewChildren(other.children.map(addResizeBatchesForShuffleOutput))
}

/**
 * Returns true when the stage's plan is a columnar shuffle exchange whose writer type
 * requires output resizing; false for any other wrapped plan.
 */
private def requiresResizingShuffleOutput(s: ShuffleQueryStageExec): Boolean =
  s.shuffle match {
    case exchange: ColumnarShuffleExchangeExec =>
      exchange.shuffleWriterType.requiresResizingShuffleOutput
    case _ => false
  }
}
Loading
Loading