diff --git a/backends-velox/src/main/java/org/apache/gluten/utils/GpuBufferBatchResizerJniWrapper.java b/backends-velox/src/main/java/org/apache/gluten/utils/GpuBufferBatchResizerJniWrapper.java index 01fb7bf390a9..23c23ee03e9e 100644 --- a/backends-velox/src/main/java/org/apache/gluten/utils/GpuBufferBatchResizerJniWrapper.java +++ b/backends-velox/src/main/java/org/apache/gluten/utils/GpuBufferBatchResizerJniWrapper.java @@ -36,5 +36,5 @@ public long rtHandle() { return runtime.getHandle(); } - public native long create(int minOutputBatchSize, ColumnarBatchInIterator itr); + public native long create(int minOutputBatchSize, long memLimit, ColumnarBatchInIterator itr); } diff --git a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java index fec8e059789a..1e26db67d40c 100644 --- a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java +++ b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java @@ -43,4 +43,18 @@ public static ColumnarBatchOutIterator create( new ColumnarBatchInIterator(BackendsApiManager.getBackendName(), in)); return new ColumnarBatchOutIterator(runtime, outHandle); } + + public static ColumnarBatchOutIterator createCudf( + int minOutputBatchSize, long memLimit, Iterator in) { + final Runtime runtime = + Runtimes.contextInstance( + BackendsApiManager.getBackendName(), "GpuBufferColumnarBatchResizer"); + long outHandle = + GpuBufferBatchResizerJniWrapper.create(runtime) + .create( + minOutputBatchSize, + memLimit, + new ColumnarBatchInIterator(BackendsApiManager.getBackendName(), in)); + return new ColumnarBatchOutIterator(runtime, outHandle); + } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index fcc00389d538..d41547214f86 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -110,8 +110,7 @@ object VeloxRuleApi { offloads)) // Legacy: Post-transform rules. - injector.injectPostTransform(_ => AppendBatchResizeForShuffleInputAndOutput()) - injector.injectPostTransform(_ => GpuBufferBatchResizeForShuffleInputOutput()) + injector.injectPostTransform(c => AppendBatchResizeForShuffleInputAndOutput(c.caller.isAqe())) injector.injectPostTransform(_ => UnionTransformerRule()) injector.injectPostTransform(c => PartialProjectRule.apply(c.session)) injector.injectPostTransform(_ => PartialGenerateRule()) @@ -148,6 +147,8 @@ object VeloxRuleApi { c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(), Set(VeloxBatchType))) injector.injectFinal( c => GlutenAutoAdjustStageResourceProfile(new GlutenConfig(c.sqlConf), c.session)) + injector.injectFinal( + c => AdjustStageExecutionMode(new GlutenConfig(c.sqlConf), c.session, c.caller.isAqe())) injector.injectFinal(c => GlutenFallbackReporter(new GlutenConfig(c.sqlConf), c.session)) injector.injectFinal(_ => RemoveFallbackTagRule()) } @@ -215,8 +216,7 @@ object VeloxRuleApi { // Gluten RAS: Post rules. injector.injectPostTransform(_ => DistinguishIdenticalScans) - injector.injectPostTransform(_ => AppendBatchResizeForShuffleInputAndOutput()) - injector.injectPostTransform(_ => GpuBufferBatchResizeForShuffleInputOutput()) + injector.injectPostTransform(c => AppendBatchResizeForShuffleInputAndOutput(c.caller.isAqe())) injector.injectPostTransform(_ => RemoveTransitions) injector.injectPostTransform(_ => UnionTransformerRule()) injector.injectPostTransform(c => PartialProjectRule.apply(c.session)) @@ -246,6 +246,8 @@ object VeloxRuleApi { c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(), Set(VeloxBatchType))) injector.injectPostTransform( c => GlutenAutoAdjustStageResourceProfile(new GlutenConfig(c.sqlConf), c.session)) + injector.injectPostTransform( + c => AdjustStageExecutionMode(new GlutenConfig(c.sqlConf), c.session, c.caller.isAqe())) injector.injectPostTransform( c => GlutenFallbackReporter(new GlutenConfig(c.sqlConf), c.session)) injector.injectPostTransform(_ => RemoveFallbackTagRule()) diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 03781d2fb5e9..b8501290fddd 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -82,6 +82,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { def cudfBatchSize: Int = getConf(CUDF_BATCH_SIZE) + def cudfShuffleReaderMemoryPct: Double = getConf(CUDF_SHUFFLE_READER_MEMORY_PCT) + def orcUseColumnNames: Boolean = getConf(ORC_USE_COLUMN_NAMES) def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES) @@ -636,6 +638,12 @@ object VeloxConfig extends ConfigRegistry { .intConf .createWithDefault(50) + val CUDF_SHUFFLE_READER_MEMORY_PCT = + buildStaticConf("spark.gluten.sql.columnar.backend.velox.cudf.shuffle.readerMemoryPct") + .doc("The percentage of CPU memory to use for shuffle reader to buffer cudf batches.") + .doubleConf + .createWithDefault(0.5) + val CUDF_ENABLE_TABLE_SCAN = buildStaticConf("spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan") .doc("Enable cudf table scan") diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/GpuResizeBufferColumnarBatchExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/GpuResizeBufferColumnarBatchExec.scala deleted file mode 100644 index 84768d71bc19..000000000000 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/GpuResizeBufferColumnarBatchExec.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.execution - -import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.backendsapi.velox.VeloxBatchType -import org.apache.gluten.extension.columnar.transition.Convention -import org.apache.gluten.iterator.ClosableIterator -import org.apache.gluten.runtime.Runtimes -import org.apache.gluten.utils.GpuBufferBatchResizerJniWrapper -import org.apache.gluten.vectorized.{ColumnarBatchInIterator, ColumnarBatchOutIterator} - -import org.apache.spark.sql.catalyst.expressions.SortOrder -import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.vectorized.ColumnarBatch - -import scala.collection.JavaConverters._ - -/** - * An operator to resize input BufferBatches generated by shuffle reader, and convert to cudf table. - */ -case class GpuResizeBufferColumnarBatchExec(override val child: SparkPlan, minOutputBatchSize: Int) - extends ColumnarToColumnarExec(child) { - - override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = { - val runtime = - Runtimes.contextInstance(BackendsApiManager.getBackendName, "GpuBufferColumnarBatchResizer") - val outHandle = GpuBufferBatchResizerJniWrapper - .create(runtime) - .create( - minOutputBatchSize, - new ColumnarBatchInIterator(BackendsApiManager.getBackendName, in.asJava)) - new ColumnarBatchOutIterator(runtime, outHandle).asScala - } - - override protected def closeIterator(out: Iterator[ColumnarBatch]): Unit = { - out.asJava match { - case c: ClosableIterator[ColumnarBatch] => c.close() - case _ => - } - } - - override protected def needRecyclePayload: Boolean = true - - override def outputPartitioning: Partitioning = child.outputPartitioning - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = - copy(child = newChild) - - override def batchType(): Convention.BatchType = VeloxBatchType - - override def rowType0(): Convention.RowType = Convention.RowType.None -} diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala index 9febad0e94ef..3dd079cc4291 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala @@ -17,13 +17,16 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.velox.VeloxBatchType +import org.apache.gluten.config.VeloxConfig import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.iterator.ClosableIterator import org.apache.gluten.utils.VeloxBatchResizer +import org.apache.spark.SparkEnv +import org.apache.spark.memory.SparkMemoryUtil import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{GPUStageMode, SparkPlan, StageExecutionMode} import org.apache.spark.sql.vectorized.ColumnarBatch import scala.collection.JavaConverters._ @@ -34,15 +37,29 @@ import scala.collection.JavaConverters._ */ case class VeloxResizeBatchesExec( override val child: SparkPlan, - minOutputBatchSize: Int, - maxOutputBatchSize: Int, - preferredBatchBytes: Long) + executionMode: Option[StageExecutionMode] = None) extends ColumnarToColumnarExec(child) { override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = { - VeloxBatchResizer - .create(minOutputBatchSize, maxOutputBatchSize, preferredBatchBytes, in.asJava) - .asScala + val veloxConfig = VeloxConfig.get + executionMode match { + case Some(GPUStageMode) => + VeloxBatchResizer + .createCudf( + veloxConfig.cudfBatchSize, + Math + .floor(SparkMemoryUtil.availableOffHeapPerTask( + SparkEnv.get.conf) * veloxConfig.cudfShuffleReaderMemoryPct) + .toLong, + in.asJava + ) + .asScala + case _ => + val range = veloxConfig.veloxResizeBatchesShuffleInputOutputRange + VeloxBatchResizer + .create(range.min, range.max, veloxConfig.veloxPreferredBatchBytes, in.asJava) + .asScala + } } override protected def closeIterator(out: Iterator[ColumnarBatch]): Unit = { diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala index fcce64d65222..83c740a38988 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala @@ -28,78 +28,65 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec * Try to append [[VeloxResizeBatchesExec]] for shuffle input and output to make the batch sizes in * good shape. */ -case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] { +case class AppendBatchResizeForShuffleInputAndOutput(isAdaptiveContext: Boolean) + extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = { - if (VeloxConfig.get.enableColumnarCudf) { - return plan - } val resizeBatchesShuffleInputEnabled = VeloxConfig.get.veloxResizeBatchesShuffleInput val resizeBatchesShuffleOutputEnabled = VeloxConfig.get.veloxResizeBatchesShuffleOutput if (!resizeBatchesShuffleInputEnabled && !resizeBatchesShuffleOutputEnabled) { return plan } - val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange - val preferredBatchBytes = VeloxConfig.get.veloxPreferredBatchBytes - plan.transformUp { + val newPlan = if (resizeBatchesShuffleInputEnabled) { + addResizeBatchesForShuffleInput(plan) + } else { + plan + } + + val resultPlan = if (resizeBatchesShuffleOutputEnabled) { + addResizeBatchesForShuffleOutput(newPlan) + } else { + newPlan + } + + resultPlan + } + + private def addResizeBatchesForShuffleInput(plan: SparkPlan): SparkPlan = { + plan match { case shuffle: ColumnarShuffleExchangeExec - if resizeBatchesShuffleInputEnabled && - shuffle.shuffleWriterType.requiresResizingShuffleInput => + if shuffle.shuffleWriterType.requiresResizingShuffleInput => val appendBatches = - VeloxResizeBatchesExec(shuffle.child, range.min, range.max, preferredBatchBytes) + VeloxResizeBatchesExec(shuffle.child) shuffle.withNewChildren(Seq(appendBatches)) - case a @ AQEShuffleReadExec( - ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _), - _) - if resizeBatchesShuffleOutputEnabled && - shuffle.shuffleWriterType.requiresResizingShuffleOutput => - VeloxResizeBatchesExec(a, range.min, range.max, preferredBatchBytes) - case a @ AQEShuffleReadExec( - ShuffleQueryStageExec( - _, - ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec), - _), - _) - if resizeBatchesShuffleOutputEnabled && - shuffle.shuffleWriterType.requiresResizingShuffleOutput => - VeloxResizeBatchesExec(a, range.min, range.max, preferredBatchBytes) - // Since it's transformed in a bottom to up order, so we may first encounter - // ShuffeQueryStageExec, which is transformed to VeloxResizeBatchesExec(ShuffeQueryStageExec), - // then we see AQEShuffleReadExec - case a @ AQEShuffleReadExec( - VeloxResizeBatchesExec( - s @ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _), - _, - _, - _), - _) - if resizeBatchesShuffleOutputEnabled && - shuffle.shuffleWriterType.requiresResizingShuffleOutput => - VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max, preferredBatchBytes) - case a @ AQEShuffleReadExec( - VeloxResizeBatchesExec( - s @ ShuffleQueryStageExec( - _, - ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec), - _), - _, - _, - _), - _) - if resizeBatchesShuffleOutputEnabled && - shuffle.shuffleWriterType.requiresResizingShuffleOutput => - VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max, preferredBatchBytes) - case s @ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _) - if resizeBatchesShuffleOutputEnabled && - shuffle.shuffleWriterType.requiresResizingShuffleOutput => - VeloxResizeBatchesExec(s, range.min, range.max, preferredBatchBytes) - case s @ ShuffleQueryStageExec( - _, - ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec), - _) - if resizeBatchesShuffleOutputEnabled && - shuffle.shuffleWriterType.requiresResizingShuffleOutput => - VeloxResizeBatchesExec(s, range.min, range.max, preferredBatchBytes) + case other => + other.withNewChildren(other.children.map(addResizeBatchesForShuffleInput)) + } + } + + private def addResizeBatchesForShuffleOutput(plan: SparkPlan): SparkPlan = { + plan match { + case shuffle: ColumnarShuffleExchangeExec + if !isAdaptiveContext && shuffle.shuffleWriterType.requiresResizingShuffleOutput => + VeloxResizeBatchesExec(shuffle) + case reused @ ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec) + if !isAdaptiveContext && shuffle.shuffleWriterType.requiresResizingShuffleOutput => + VeloxResizeBatchesExec(reused) + case s: ShuffleQueryStageExec if requiresResizingShuffleOutput(s) => + VeloxResizeBatchesExec(s) + case a @ AQEShuffleReadExec(s @ ShuffleQueryStageExec(_, _, _), _) + if requiresResizingShuffleOutput(s) => + VeloxResizeBatchesExec(a) + case other => + other.withNewChildren(other.children.map(addResizeBatchesForShuffleOutput)) + } + } + + private def requiresResizingShuffleOutput(s: ShuffleQueryStageExec): Boolean = { + s.shuffle match { + case c: ColumnarShuffleExchangeExec if c.shuffleWriterType.requiresResizingShuffleOutput => + true + case _ => false } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala index 14029cf28ff9..31c234529b32 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/CudfNodeValidationRule.scala @@ -18,12 +18,13 @@ package org.apache.gluten.extension import org.apache.gluten.config.{GlutenConfig, VeloxConfig} import org.apache.gluten.cudf.VeloxCudfPlanValidatorJniWrapper -import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution._ -import org.apache.gluten.extension.CudfNodeValidationRule.{createGPUColumnarExchange, setTagForWholeStageTransformer} +import org.apache.gluten.extension.CudfNodeValidationRule.{setStageExecutionModeForShuffle, setTagForWholeStageTransformer} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, GPUColumnarShuffleExchangeExec, SparkPlan} +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.SparkTestUtil // Add the node name prefix 'Cudf' to GlutenPlan when can offload to cudf case class CudfNodeValidationRule(glutenConf: GlutenConfig) extends Rule[SparkPlan] { @@ -32,23 +33,18 @@ case class CudfNodeValidationRule(glutenConf: GlutenConfig) extends Rule[SparkPl if (!glutenConf.enableColumnarCudf) { return plan } - val transformedPlan = plan.transformUp { - case shuffle @ ColumnarShuffleExchangeExec( - _, - VeloxResizeBatchesExec(w: WholeStageTransformer, _, _, _), - _, - _, - _) => - setTagForWholeStageTransformer(w) - createGPUColumnarExchange(shuffle) - case shuffle @ ColumnarShuffleExchangeExec(_, w: WholeStageTransformer, _, _, _) => - setTagForWholeStageTransformer(w) - createGPUColumnarExchange(shuffle) + val taggedPlan = plan.transformUp { case transformer: WholeStageTransformer => setTagForWholeStageTransformer(transformer) transformer } - transformedPlan + + if (!SQLConf.get.adaptiveExecutionEnabled) { + // Set mapper and reducer stage execution mode for Shuffle. + setStageExecutionModeForShuffle(taggedPlan, supportsCudf = false)._1 + } else { + taggedPlan + } } } @@ -81,17 +77,67 @@ object CudfNodeValidationRule { } } - def createGPUColumnarExchange(shuffle: ColumnarShuffleExchangeExec): SparkPlan = { - val exec = GPUColumnarShuffleExchangeExec( - shuffle.outputPartitioning, - shuffle.child, - shuffle.shuffleOrigin, - shuffle.projectOutputAttributes, - shuffle.advisoryPartitionSize) - val res = exec.doValidate() - if (!res.ok()) { - throw new GlutenNotSupportException(res.reason()) + // supportsCudf is the first parent WholeStageTransformer's `isCudf` for plan. + // For WholeStageTransformer, it calls the child's setStageExecutionModeForShuffle with its + // `isCudf` for the child shuffle reader, + // and returns its `isCudf` for the parent shuffle writer. + def setStageExecutionModeForShuffle( + plan: SparkPlan, + supportsCudf: Boolean): (SparkPlan, Boolean) = { + def getStageExecutionMode(supportsCudf: Boolean): StageExecutionMode = { + if (supportsCudf) { + if (SparkTestUtil.isTesting) { + MockGPUStageMode + } else { + GPUStageMode + } + } else { + CPUStageMode + } + } + + plan match { + case shuffle @ ColumnarShuffleExchangeExec( + _, + VeloxResizeBatchesExec(child, _), + _, + _, + _, + _, + _) => + // `supportsCudf` is not decided yet for shuffle writer. + val (newChild, mapperStageSupportsCudf) = + setStageExecutionModeForShuffle(child, false) + val mapperStageMode = getStageExecutionMode(mapperStageSupportsCudf) + val reducerStageMode = getStageExecutionMode(supportsCudf) + ( + shuffle.copy( + child = VeloxResizeBatchesExec(newChild, Some(mapperStageMode)), + mapperStageMode = Some(mapperStageMode), + reducerStageMode = Some(reducerStageMode)), + supportsCudf) + case shuffle: ColumnarShuffleExchangeExec => + val (newChild, mapperStageSupportsCudf) = + setStageExecutionModeForShuffle(shuffle.child, false) + val mapperStageMode = getStageExecutionMode(mapperStageSupportsCudf) + val reducerStageMode = getStageExecutionMode(supportsCudf) + ( + shuffle.copy( + child = newChild, + mapperStageMode = Some(mapperStageMode), + reducerStageMode = Some(reducerStageMode)), + supportsCudf) + case resizeBatches: VeloxResizeBatchesExec => + val (newChild, _) = + setStageExecutionModeForShuffle(resizeBatches.child, supportsCudf) + (VeloxResizeBatchesExec(newChild, Some(getStageExecutionMode(supportsCudf))), supportsCudf) + case wst: WholeStageTransformer => + val (newChild, _) = setStageExecutionModeForShuffle(wst.child, wst.isCudf) + (wst.withNewChildren(Seq(newChild)), wst.isCudf) + case other => + val newChildren = + other.children.map(child => setStageExecutionModeForShuffle(child, supportsCudf)._1) + (other.withNewChildren(newChildren), supportsCudf) } - exec } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/GpuBufferBatchResizeForShuffleInputOutput.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/GpuBufferBatchResizeForShuffleInputOutput.scala deleted file mode 100644 index bb30dc6c2726..000000000000 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/GpuBufferBatchResizeForShuffleInputOutput.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.extension - -import org.apache.gluten.config.{HashShuffleWriterType, VeloxConfig} -import org.apache.gluten.execution.{GpuResizeBufferColumnarBatchExec, VeloxResizeBatchesExec} - -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, ColumnarShuffleExchangeExecBase, SparkPlan} -import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec} -import org.apache.spark.sql.execution.exchange.ReusedExchangeExec - -/** - * Try to append [[GpuBufferBatchResizeForShuffleInputOutput]] for shuffle input and output to make - * the batch sizes in good shape. - */ -case class GpuBufferBatchResizeForShuffleInputOutput() extends Rule[SparkPlan] { - override def apply(plan: SparkPlan): SparkPlan = { - if (!VeloxConfig.get.enableColumnarCudf) { - return plan - } - val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange - val preferredBatchBytes = VeloxConfig.get.veloxPreferredBatchBytes - val batchSize = VeloxConfig.get.cudfBatchSize - plan.transformUp { - case shuffle: ColumnarShuffleExchangeExec - if shuffle.shuffleWriterType == HashShuffleWriterType && - VeloxConfig.get.veloxResizeBatchesShuffleInput => - val appendBatches = - VeloxResizeBatchesExec(shuffle.child, range.min, range.max, preferredBatchBytes) - shuffle.withNewChildren(Seq(appendBatches)) - case a @ AQEShuffleReadExec( - ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExecBase, _), - _) => - GpuResizeBufferColumnarBatchExec(a, batchSize) - case a @ AQEShuffleReadExec( - ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: ColumnarShuffleExchangeExecBase), _), - _) => - GpuResizeBufferColumnarBatchExec(a, batchSize) - // Since it's transformed in a bottom to up order, so we may first encounter - // ShuffeQueryStageExec, which is transformed to VeloxResizeBatchesExec(ShuffeQueryStageExec), - // then we see AQEShuffleReadExec - case a @ AQEShuffleReadExec( - GpuResizeBufferColumnarBatchExec( - s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExecBase, _), - _), - _) => - GpuResizeBufferColumnarBatchExec(a.copy(child = s), batchSize) - case a @ AQEShuffleReadExec( - GpuResizeBufferColumnarBatchExec( - s @ ShuffleQueryStageExec( - _, - ReusedExchangeExec(_, _: ColumnarShuffleExchangeExecBase), - _), - _), - _) => - GpuResizeBufferColumnarBatchExec(a.copy(child = s), batchSize) - case s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExecBase, _) => - GpuResizeBufferColumnarBatchExec(s, batchSize) - case s @ ShuffleQueryStageExec( - _, - ReusedExchangeExec(_, _: ColumnarShuffleExchangeExecBase), - _) => - GpuResizeBufferColumnarBatchExec(s, batchSize) - } - } -} diff --git a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala index 3b5fce63f8c2..e6c5ed97a519 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala @@ -27,6 +27,7 @@ import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance} import org.apache.spark.shuffle.GlutenShuffleUtils +import org.apache.spark.sql.execution.{CPUStageMode, StageExecutionMode} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -131,16 +132,20 @@ private class ColumnarBatchSerializerInstanceImpl( shuffleReaderHandle } + // TODO: remove this method for columnar shuffle. override def deserializeStream(in: InputStream): DeserializationStream = { new TaskDeserializationStream(Iterator((null, in))) } override def deserializeStreams( - streams: Iterator[(BlockId, InputStream)]): DeserializationStream = { - new TaskDeserializationStream(streams) + streams: Iterator[(BlockId, InputStream)], + executionMode: StageExecutionMode): DeserializationStream = { + new TaskDeserializationStream(streams, executionMode) } - private class TaskDeserializationStream(streams: Iterator[(BlockId, InputStream)]) + private class TaskDeserializationStream( + streams: Iterator[(BlockId, InputStream)], + executionMode: StageExecutionMode = CPUStageMode) extends DeserializationStream with TaskResource { private val streamReader = ShuffleStreamReader(streams) @@ -148,7 +153,7 @@ private class ColumnarBatchSerializerInstanceImpl( private val wrappedOut: ClosableIterator[ColumnarBatch] = new ColumnarBatchOutIterator( runtime, jniWrapper - .read(shuffleReaderHandle, streamReader)) + .read(shuffleReaderHandle, streamReader, executionMode.id)) private var cb: ColumnarBatch = _ diff --git a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializerInstance.scala b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializerInstance.scala index 205d38b52882..89e4986cdae0 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializerInstance.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializerInstance.scala @@ -17,6 +17,7 @@ package org.apache.gluten.vectorized import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} +import org.apache.spark.sql.execution.{CPUStageMode, StageExecutionMode} import org.apache.spark.storage.BlockId import java.io.{InputStream, OutputStream} @@ -27,7 +28,9 @@ import scala.reflect.ClassTag abstract class ColumnarBatchSerializerInstance extends SerializerInstance { /** Deserialize the streams of ColumnarBatches. */ - def deserializeStreams(streams: Iterator[(BlockId, InputStream)]): DeserializationStream + def deserializeStreams( + streams: Iterator[(BlockId, InputStream)], + executionMode: StageExecutionMode = CPUStageMode): DeserializationStream override def serialize[T: ClassTag](t: T): ByteBuffer = { throw new UnsupportedOperationException @@ -44,4 +47,8 @@ abstract class ColumnarBatchSerializerInstance extends SerializerInstance { override def serializeStream(s: OutputStream): SerializationStream = { throw new UnsupportedOperationException } + + override def deserializeStream(s: InputStream): DeserializationStream = { + throw new UnsupportedOperationException + } } diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleReader.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleReader.scala index 1e514cf9f1db..6126a57e07bd 100644 --- a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleReader.scala +++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleReader.scala @@ -22,6 +22,7 @@ import org.apache.spark._ import org.apache.spark.internal.{config, Logging} import org.apache.spark.io.CompressionCodec import org.apache.spark.serializer.SerializerManager +import org.apache.spark.sql.execution.StageExecutionMode import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, ShuffleBlockFetcherIterator} import org.apache.spark.util.CompletionIterator @@ -33,11 +34,12 @@ class ColumnarShuffleReader[K, C]( blocksByAddress: Iterator[(BlockManagerId, collection.Seq[(BlockId, Long, Int)])], context: TaskContext, readMetrics: ShuffleReadMetricsReporter, + executionMode: StageExecutionMode, serializerManager: SerializerManager = SparkEnv.get.serializerManager, blockManager: BlockManager = SparkEnv.get.blockManager, mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker, - shouldBatchFetch: Boolean = false) - extends ShuffleReader[K, C] + shouldBatchFetch: Boolean = false +) extends ShuffleReader[K, C] with Logging { private val dep = handle.dependency @@ -97,7 +99,7 @@ class ColumnarShuffleReader[K, C]( columnarDep.serializer .newInstance() .asInstanceOf[ColumnarBatchSerializerInstance] - .deserializeStreams(wrappedStreams) + .deserializeStreams(wrappedStreams, executionMode) .asKeyValueIterator case _ => val serializerInstance = dep.serializer.newInstance() diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index ef9877011b5d..33bef45c8b95 100644 --- a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.config.{SHUFFLE_COMPRESS, SHUFFLE_DISK_WRITE_BU import org.apache.spark.memory.SparkMemoryUtil import org.apache.spark.scheduler.MapStatus import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.{SparkDirectoryUtil, SparkResourceUtil, Utils} +import org.apache.spark.util.{SparkDirectoryUtil, Utils} import java.io.IOException @@ -122,12 +122,6 @@ class ColumnarShuffleWriter[K, V]( private val taskContext: TaskContext = TaskContext.get() - private def availableOffHeapPerTask(): Long = { - val perTask = - SparkMemoryUtil.getCurrentAvailableOffHeapMemory / SparkResourceUtil.getTaskSlots(conf) - perTask - } - @throws[IOException] def internalWrite(records: Iterator[Product2[K, V]]): Unit = { if (!records.hasNext) { @@ -218,7 +212,7 @@ class ColumnarShuffleWriter[K, V]( nativeShuffleWriter, rows, columnarBatchHandle, - availableOffHeapPerTask()) + SparkMemoryUtil.availableOffHeapPerTask(conf)) dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime) dep.metrics("numInputRows").add(rows) dep.metrics("inputBatches").add(1) diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/utils/ShuffleUtil.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/utils/ShuffleUtil.scala index 6293d4e76476..420f6c7e7ff9 100644 --- a/backends-velox/src/main/scala/org/apache/spark/shuffle/utils/ShuffleUtil.scala +++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/utils/ShuffleUtil.scala @@ -39,7 +39,8 @@ object ShuffleUtil { parameters.blocksByAddress, parameters.context, parameters.readMetrics, - ColumnarShuffleManager.bypassDecompressionSerializerManger, + parameters.executionMode, + serializerManager = ColumnarShuffleManager.bypassDecompressionSerializerManger, shouldBatchFetch = parameters.shouldBatchFetch ) } else { diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/AdjustStageExecutionMode.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/AdjustStageExecutionMode.scala new file mode 100644 index 000000000000..7cf59b2c4519 --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/AdjustStageExecutionMode.scala @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.exception.GlutenException +import org.apache.gluten.execution.{CudfTag, TransformSupport, VeloxResizeBatchesExec, WholeStageTransformer} +import org.apache.gluten.logging.LogLevelUtil + +import org.apache.spark.SparkConf +import org.apache.spark.annotation.Experimental +import org.apache.spark.internal.Logging +import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, TaskResourceRequest} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreeNodeTag +import org.apache.spark.sql.execution.AdjustStageExecutionMode.{adjustExecutionMode, unsetTag} +import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ColumnarAQEShuffleReadExec, ShuffleQueryStageExec} +import org.apache.spark.sql.execution.joins.BaseJoinExec +import org.apache.spark.sql.utils.GlutenResourceProfileUtil +import org.apache.spark.util.SparkTestUtil + +import scala.collection.mutable + +// For ShuffleStage, the resource profile is set to ColumnarShuffleExchangeExec.inputColumnarRDD. +@Experimental +case class AdjustStageExecutionMode( + glutenConf: GlutenConfig, + spark: SparkSession, + isAdaptiveContext: Boolean) + extends Rule[SparkPlan] + with LogLevelUtil { + + override def apply(plan: SparkPlan): SparkPlan = { + if (!isAdaptiveContext) { + return plan + } + + val sparkConf = spark.sparkContext.getConf + + GlutenResourceProfileUtil.restoreDefaultResourceSetting(sparkConf) + + if (glutenConf.enableColumnarCudf) { + return adjustExecutionModeForGPU(plan, sparkConf) + } + + plan + } + + private def adjustExecutionModeForGPU(plan: SparkPlan, sparkConf: SparkConf): SparkPlan = { + val planNodes = GlutenResourceProfileUtil.collectStagePlan(plan) + if (planNodes.isEmpty) { + return plan + } + log.info(s"detailPlanNodes ${planNodes.map(_.nodeName).mkString("Array(", ", ", ")")}") + + val transformers = plan.collect { case t: WholeStageTransformer => t } + if (transformers.isEmpty) { + return plan + } + if (transformers.size > 1) { + logWarning(s"Not offloading GPU because multiple WholeStageTransformer exist. Remove tags.") + unsetTag(plan, CudfTag.CudfTag) + return plan + } + + val transformer = transformers.head + if (transformer.isCudf) { + if (glutenConf.autoAdjustStageExecutionMode) { + // TODO: change to flexible config. + val offloadGpu = planNodes.exists(_.isInstanceOf[BaseJoinExec]) + if (!offloadGpu) { + logWarning(s"Not offloading GPU because missing offload condition. Remove tag.") + unsetTag(plan, CudfTag.CudfTag) + return plan + } + } + + val gpuStageMode = if (SparkTestUtil.isTesting) { + // Only unset for transformer. + transformer.unsetTagValue(CudfTag.CudfTag) + MockGPUStageMode + } else { + GPUStageMode + } + + logWarning(s"Adjust resource profile to use GPU.") + val rpManager = spark.sparkContext.resourceProfileManager + val defaultRP = spark.sparkContext.resourceProfileManager.defaultResourceProfile + + // initial resource profile config as default resource profile + val taskResource = mutable.Map.empty[String, TaskResourceRequest] ++= defaultRP.taskResources + val executorResource = + mutable.Map.empty[String, ExecutorResourceRequest] ++= defaultRP.executorResources + + // The gpu task resource limits how many tasks can be launched in one executor. + // TODO: Make gpu task resource configurable. + taskResource.put("gpu", new TaskResourceRequest("gpu", 0.1)) + executorResource.put("gpu", new ExecutorResourceRequest("gpu", 1)) + executorResource.remove("cpu") + val newRP = new ResourceProfile(executorResource.toMap, taskResource.toMap) + val newPlan = GlutenResourceProfileUtil.applyNewResourceProfileIfPossible( + adjustExecutionMode(plan, gpuStageMode), + newRP, + rpManager, + sparkConf) + if ( + !newPlan.isInstanceOf[ApplyResourceProfileExec] && !newPlan.children.exists( + _.isInstanceOf[ApplyResourceProfileExec]) + ) { + throw new GlutenException(s"Failed to apply new resource profile $newRP") + } + newPlan + } else { + plan + } + } +} + +object AdjustStageExecutionMode extends Logging { + def adjustExecutionMode(plan: SparkPlan, stageExecutionMode: StageExecutionMode): SparkPlan = { + plan match { + case aqeReader: AQEShuffleReadExec => + logWarning(s"Adjust AQE shuffle read to ${stageExecutionMode.name}.") + ColumnarAQEShuffleReadExec( + aqeReader.child, + aqeReader.partitionSpecs, + stageExecutionMode, + isWrapper = false) + case queryStageExec: ShuffleQueryStageExec => + // If no AQEShuffleReadExec is created for the shuffle query stage, + // create ColumnarAQEShuffleReadExec here with default partition specs and set the stage + // execution mode. + val partitionSpecs = + Array.tabulate(queryStageExec.shuffle.numPartitions)( + i => CoalescedPartitionSpec(i, i + 1)) + ColumnarAQEShuffleReadExec( + queryStageExec, + partitionSpecs, + stageExecutionMode, + isWrapper = true) + case shuffle: ColumnarShuffleExchangeExec => + logInfo(s"Adjust shuffle exchange to ${stageExecutionMode.name}.") + shuffle + .copy(mapperStageMode = Some(stageExecutionMode)) + .withNewChildren(Seq(adjustExecutionMode(shuffle.child, stageExecutionMode))) + case resizeBatches: VeloxResizeBatchesExec => + logInfo(s"Adjust VeloxResizeBatchesExec to ${stageExecutionMode.name}.") + VeloxResizeBatchesExec( + adjustExecutionMode(resizeBatches.child, stageExecutionMode), + Some(stageExecutionMode)) + case _ => + plan.withNewChildren(plan.children.map(adjustExecutionMode(_, stageExecutionMode))) + } + } + + def unsetTag[T](plan: SparkPlan, tag: TreeNodeTag[T]): Unit = { + plan.foreach { + case t: TransformSupport => + t.unsetTagValue(tag) + case _ => + } + } +} diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala index e1a0fd98eeb5..a9e30e415a05 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala @@ -722,7 +722,6 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa val ops = collect(df.queryExecution.executedPlan) { case p: VeloxResizeBatchesExec => p } assert(ops.size == 1) val op = ops.head - assert(op.minOutputBatchSize == minBatchSize) val metrics = op.metrics assert(metrics("numInputRows").value == 27) assert(metrics("numInputBatches").value == 14) @@ -741,7 +740,6 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa val ops = collect(df.queryExecution.executedPlan) { case p: VeloxResizeBatchesExec => p } assert(ops.size == 1) val op = ops.head - assert(op.minOutputBatchSize == 1) val metrics = op.metrics assert(metrics("numInputRows").value == 27) assert(metrics("numInputBatches").value == 14) diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 187503f053dd..73f16780e1fe 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -997,7 +997,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe jdouble splitBufferReallocThreshold, jlong partitionWriterHandle) { JNI_METHOD_START - + const auto ctx = getRuntime(env, wrapper); auto partitionWriter = ObjectStore::retrieve(partitionWriterHandle); @@ -1213,14 +1213,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe JNIEnv* env, jobject wrapper, jlong shuffleReaderHandle, - jobject jStreamReader) { + jobject jStreamReader, + jint executionMode) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); auto reader = ObjectStore::retrieve(shuffleReaderHandle); auto streamReader = std::make_shared(env, jStreamReader); - auto outItr = reader->read(streamReader); + ShuffleOutputType requiredOutputType = ShuffleReader::getOutputType(executionMode); + auto outItr = reader->read(streamReader, requiredOutputType); return ctx->saveObject(outItr); JNI_METHOD_END(kInvalidObjectHandle) } diff --git a/cpp/core/shuffle/ShuffleReader.h b/cpp/core/shuffle/ShuffleReader.h index 101865d2532d..f2452eded662 100644 --- a/cpp/core/shuffle/ShuffleReader.h +++ b/cpp/core/shuffle/ShuffleReader.h @@ -21,6 +21,8 @@ namespace gluten { +enum class ShuffleOutputType { kRowVector, kCudfTable }; + class StreamReader { public: virtual ~StreamReader() = default; @@ -33,11 +35,26 @@ class ShuffleReader { virtual ~ShuffleReader() = default; // FIXME iterator should be unique_ptr or un-copyable singleton - virtual std::shared_ptr read(const std::shared_ptr& streamReader) = 0; + virtual std::shared_ptr read( + const std::shared_ptr& streamReader, + ShuffleOutputType requiredOutputType) = 0; virtual int64_t getDecompressTime() const = 0; virtual int64_t getDeserializeTime() const = 0; + + static ShuffleOutputType getOutputType(int32_t executionModeId) { + switch (executionModeId) { + case 0: + // Cpu execution mode. + return ShuffleOutputType::kRowVector; + case 1: + // Gpu execution mode. + return ShuffleOutputType::kCudfTable; + default: + throw GlutenException("Unsupported execution mode id: " + std::to_string(executionModeId)); + } + } }; } // namespace gluten diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index 616ba9bcfbde..66722ceabd5c 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -307,7 +307,7 @@ void runShuffle( GLUTEN_ASSIGN_OR_THROW(auto in, arrow::io::ReadableFile::Open(dataFile)); auto streamReader = std::make_shared(std::move(in)); // Read all partitions. - auto iter = reader->read(streamReader); + auto iter = reader->read(streamReader, ShuffleOutputType::kRowVector); while (iter->hasNext()) { // Read and discard. auto cb = iter->next(); @@ -433,8 +433,9 @@ auto BM_Generic = [](::benchmark::State& state, std::vector inputItersRaw; if (!dataFiles.empty()) { for (const auto& input : dataFiles) { - inputIters.push_back(FileReaderIterator::getInputIteratorFromFileReader( - readerType, input, FLAGS_batch_size, runtime->memoryManager()->getLeafMemoryPool())); + inputIters.push_back( + FileReaderIterator::getInputIteratorFromFileReader( + readerType, input, FLAGS_batch_size, runtime->memoryManager()->getLeafMemoryPool())); } std::transform( inputIters.begin(), diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index ad6f8947eb28..43cae91efc5c 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -450,6 +450,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_utils_GpuBufferBatchResizerJniWra JNIEnv* env, jobject wrapper, jint minOutputBatchSize, + jlong memLimit, jobject jIter) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); @@ -457,7 +458,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_utils_GpuBufferBatchResizerJniWra auto pool = dynamic_cast(ctx->memoryManager())->getLeafMemoryPool(); auto iter = makeJniColumnarBatchIterator(env, jIter, ctx); auto appender = std::make_shared( - std::make_unique(arrowPool, pool.get(), minOutputBatchSize, std::move(iter))); + std::make_unique(arrowPool, pool.get(), minOutputBatchSize, memLimit, std::move(iter))); return ctx->saveObject(appender); JNI_METHOD_END(kInvalidObjectHandle) } diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index bfe11ef3e9f2..dc85fb2d3d82 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -501,9 +501,9 @@ void VeloxHashShuffleReaderDeserializer::loadNextStream() { if (readerBufferSize_ > 0) { GLUTEN_ASSIGN_OR_THROW( - in_, - arrow::io::BufferedInputStream::Create( - readerBufferSize_, memoryManager_->defaultArrowMemoryPool(), std::move(in))); + in_, + arrow::io::BufferedInputStream::Create( + readerBufferSize_, memoryManager_->defaultArrowMemoryPool(), std::move(in))); } else { in_ = std::move(in); } @@ -811,22 +811,27 @@ VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory( } std::unique_ptr VeloxShuffleReaderDeserializerFactory::createDeserializer( - const std::shared_ptr& streamReader) { + const std::shared_ptr& streamReader, + ShuffleOutputType requiredOutputType) { switch (shuffleWriterType_) { + case ShuffleWriterType::kHashShuffle: case ShuffleWriterType::kGpuHashShuffle: + if (requiredOutputType == ShuffleOutputType::kCudfTable) { #ifdef GLUTEN_ENABLE_GPU - VELOX_CHECK(!hasComplexType_); - return std::make_unique( - streamReader, - schema_, - codec_, - rowType_, - readerBufferSize_, - memoryManager_, - deserializeTime_, - decompressTime_); + VELOX_CHECK(!hasComplexType_); + return std::make_unique( + streamReader, + schema_, + codec_, + rowType_, + readerBufferSize_, + memoryManager_, + deserializeTime_, + decompressTime_); +#else + throw GlutenException("GLUTEN_ENABLE_GPU is not set. GPU shuffle reader deserializer is not supported."); #endif - case ShuffleWriterType::kHashShuffle: + } return std::make_unique( streamReader, schema_, @@ -837,6 +842,8 @@ std::unique_ptr VeloxShuffleReaderDeserializerFactory::cr deserializeTime_, decompressTime_); case ShuffleWriterType::kSortShuffle: + GLUTEN_CHECK( + requiredOutputType == ShuffleOutputType::kRowVector, "Only RowVector output is supported for sort shuffle."); return std::make_unique( streamReader, schema_, @@ -849,6 +856,9 @@ std::unique_ptr VeloxShuffleReaderDeserializerFactory::cr deserializeTime_, decompressTime_); case ShuffleWriterType::kRssSortShuffle: + GLUTEN_CHECK( + requiredOutputType == ShuffleOutputType::kRowVector, + "Only RowVector output is supported for rss_sort shuffle."); return std::make_unique( streamReader, memoryManager_, rowType_, batchSize_, veloxCompressionType_, deserializeTime_); } @@ -896,8 +906,10 @@ void VeloxShuffleReaderDeserializerFactory::initFromSchema() { VeloxShuffleReader::VeloxShuffleReader(std::unique_ptr factory) : factory_(std::move(factory)) {} -std::shared_ptr VeloxShuffleReader::read(const std::shared_ptr& streamReader) { - return std::make_shared(factory_->createDeserializer(streamReader)); +std::shared_ptr VeloxShuffleReader::read( + const std::shared_ptr& streamReader, + ShuffleOutputType requiredOutputType) { + return std::make_shared(factory_->createDeserializer(streamReader, requiredOutputType)); } int64_t VeloxShuffleReader::getDecompressTime() const { diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h index f30595dde456..473b3e2c907f 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.h +++ b/cpp/velox/shuffle/VeloxShuffleReader.h @@ -163,7 +163,9 @@ class VeloxShuffleReaderDeserializerFactory { VeloxMemoryManager* memoryManager, ShuffleWriterType shuffleWriterType); - std::unique_ptr createDeserializer(const std::shared_ptr& streamReader); + std::unique_ptr createDeserializer( + const std::shared_ptr& streamReader, + ShuffleOutputType requiredOutputType); int64_t getDecompressTime(); @@ -194,7 +196,9 @@ class VeloxShuffleReader final : public ShuffleReader { public: VeloxShuffleReader(std::unique_ptr factory); - std::shared_ptr read(const std::shared_ptr& streamReader) override; + std::shared_ptr read( + const std::shared_ptr& streamReader, + ShuffleOutputType requiredOutputType) override; int64_t getDecompressTime() const override; diff --git a/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc b/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc index 364d31e180cc..9ca4172c8478 100644 --- a/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc @@ -19,15 +19,15 @@ #include #include "config/GlutenConfig.h" +#include "memory/GpuBufferColumnarBatch.h" #include "shuffle/VeloxGpuShuffleWriter.h" #include "shuffle/VeloxHashShuffleWriter.h" #include "tests/VeloxShuffleWriterTestBase.h" #include "tests/utils/TestAllocationListener.h" #include "tests/utils/TestStreamReader.h" #include "tests/utils/TestUtils.h" -#include "utils/VeloxArrowUtils.h" -#include "memory/GpuBufferColumnarBatch.h" #include "utils/GpuBufferBatchResizer.h" +#include "utils/VeloxArrowUtils.h" #include "velox/experimental/cudf/exec/VeloxCudfInterop.h" #include "velox/experimental/cudf/vector/CudfVector.h" @@ -115,6 +115,7 @@ RowVectorPtr mergeBufferColumnarBatches(std::vectordefaultArrowMemoryPool(), getDefaultMemoryManager()->getLeafMemoryPool().get(), 1200, // output one batch + std::numeric_limits::max(), std::make_unique(bufferBatches)); auto cb = resizer.next(); auto batch = std::dynamic_pointer_cast(cb); @@ -313,14 +314,13 @@ class GpuVeloxShuffleWriterTest : public ::testing::TestWithParam(std::move(deserializerFactory)); - const auto iter = reader->read(std::make_shared(std::move(in))); + const auto iter = reader->read(std::make_shared(std::move(in)), ShuffleOutputType::kCudfTable); while (iter->hasNext()) { auto cb = std::dynamic_pointer_cast(iter->next()); VELOX_CHECK_NOT_NULL(cb); bufferBatches.emplace_back(cb); } - } void shuffleWriteReadMultiBlocks( @@ -362,7 +362,6 @@ class GpuVeloxShuffleWriterTest : public ::testing::TestWithParam getTestParams() { for (const auto diskWriteBufferSize : {4, 56, 32 * 1024}) { for (const bool useRadixSort : {true, false}) { for (const auto deserializerBufferSize : {static_cast(1L), kDefaultDeserializerBufferSize}) { - params.push_back(ShuffleTestParams{ - .shuffleWriterType = ShuffleWriterType::kSortShuffle, - .partitionWriterType = partitionWriterType, - .compressionType = compression, - .diskWriteBufferSize = diskWriteBufferSize, - .useRadixSort = useRadixSort, - .deserializerBufferSize = deserializerBufferSize}); + params.push_back( + ShuffleTestParams{ + .shuffleWriterType = ShuffleWriterType::kSortShuffle, + .partitionWriterType = partitionWriterType, + .compressionType = compression, + .diskWriteBufferSize = diskWriteBufferSize, + .useRadixSort = useRadixSort, + .deserializerBufferSize = deserializerBufferSize}); } } } } // Rss sort-based shuffle. - params.push_back(ShuffleTestParams{ - .shuffleWriterType = ShuffleWriterType::kRssSortShuffle, - .partitionWriterType = PartitionWriterType::kRss, - .compressionType = compression}); + params.push_back( + ShuffleTestParams{ + .shuffleWriterType = ShuffleWriterType::kRssSortShuffle, + .partitionWriterType = PartitionWriterType::kRss, + .compressionType = compression}); // Hash-based shuffle. for (const auto compressionThreshold : compressionThresholds) { // Local. for (const auto mergeBufferSize : mergeBufferSizes) { for (const bool enableDictionary : {true, false}) { - params.push_back(ShuffleTestParams{ - .shuffleWriterType = ShuffleWriterType::kHashShuffle, - .partitionWriterType = PartitionWriterType::kLocal, - .compressionType = compression, - .compressionThreshold = compressionThreshold, - .mergeBufferSize = mergeBufferSize, - .enableDictionary = enableDictionary}); + params.push_back( + ShuffleTestParams{ + .shuffleWriterType = ShuffleWriterType::kHashShuffle, + .partitionWriterType = PartitionWriterType::kLocal, + .compressionType = compression, + .compressionThreshold = compressionThreshold, + .mergeBufferSize = mergeBufferSize, + .enableDictionary = enableDictionary}); } } // Rss. - params.push_back(ShuffleTestParams{ - .shuffleWriterType = ShuffleWriterType::kHashShuffle, - .partitionWriterType = PartitionWriterType::kRss, - .compressionType = compression, - .compressionThreshold = compressionThreshold}); + params.push_back( + ShuffleTestParams{ + .shuffleWriterType = ShuffleWriterType::kHashShuffle, + .partitionWriterType = PartitionWriterType::kRss, + .compressionType = compression, + .compressionThreshold = compressionThreshold}); } } @@ -308,7 +312,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam(std::move(deserializerFactory)); - const auto iter = reader->read(std::make_shared(std::move(in))); + const auto iter = reader->read(std::make_shared(std::move(in)), ShuffleOutputType::kRowVector); while (iter->hasNext()) { auto vector = std::dynamic_pointer_cast(iter->next())->getRowVector(); vectors.emplace_back(vector); @@ -500,12 +504,7 @@ TEST_P(HashPartitioningShuffleWriterTest, hashPart1Vector) { makeFlatVector({232, 34567235, 1212, 4567}, DECIMAL(20, 4)), makeFlatVector( 4, [](vector_size_t row) { return row % 2; }, nullEvery(5), DATE()), - makeFlatVector( - 4, - [](vector_size_t row) { - return Timestamp{row % 2, 0}; - }, - nullEvery(5))}; + makeFlatVector(4, [](vector_size_t row) { return Timestamp{row % 2, 0}; }, nullEvery(5))}; const auto vector = makeRowVector(data); diff --git a/cpp/velox/utils/CachedBufferQueue.cc b/cpp/velox/utils/CachedBufferQueue.cc new file mode 100644 index 000000000000..c479d9cccfc8 --- /dev/null +++ b/cpp/velox/utils/CachedBufferQueue.cc @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "utils/CachedBufferQueue.h" +#include "memory/GpuBufferColumnarBatch.h" + +namespace gluten { + +void CachedBufferQueue::put(std::shared_ptr batch) { + std::unique_lock lock(m_); + const auto batchSize = batch->numBytes(); + + VELOX_CHECK_LE(batchSize, capacity_, "Batch size exceeds queue capacity"); + + notFull_.wait(lock, [&]() { return totalSize_ + batchSize <= capacity_; }); + + queue_.push(std::move(batch)); + totalSize_ += batchSize; + + notEmpty_.notify_one(); +} + +std::shared_ptr CachedBufferQueue::get() { + std::unique_lock lock(m_); + notEmpty_.wait(lock, [&]() { return noMoreBatches_ || !queue_.empty(); }); + + if (queue_.empty()) { + return nullptr; + } + auto batch = std::move(queue_.front()); + LOG(WARNING) << "Trying to get from cached buffer queue. Queue length: " << queue_.size() + << ", total size in queue: " << totalSize_ << ", current batch size: " << batch->numBytes() << std::endl; + + queue_.pop(); + totalSize_ -= batch->numBytes(); + + notFull_.notify_one(); + return batch; +} + +void CachedBufferQueue::noMoreBatches() { + std::unique_lock lock(m_); + noMoreBatches_ = true; + notFull_.notify_all(); + notEmpty_.notify_all(); +} + +int64_t CachedBufferQueue::size() const { + return totalSize_; +} + +bool CachedBufferQueue::empty() const { + return queue_.empty(); +} + +} // namespace gluten diff --git a/cpp/velox/utils/CachedBufferQueue.h b/cpp/velox/utils/CachedBufferQueue.h new file mode 100644 index 000000000000..2ca7ecfb8ee6 --- /dev/null +++ b/cpp/velox/utils/CachedBufferQueue.h @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include + +namespace gluten { + +class GpuBufferColumnarBatch; + +class CachedBufferQueue { + public: + CachedBufferQueue(int64_t capacity) : capacity_(capacity) {} + + void put(std::shared_ptr batch); + + std::shared_ptr get(); + + void noMoreBatches(); + + int64_t size() const; + + bool empty() const; + + private: + int64_t capacity_; + int64_t totalSize_{0}; + bool noMoreBatches_{false}; + + std::queue> queue_; + + std::mutex m_; + std::condition_variable notEmpty_; + std::condition_variable notFull_; +}; + +} // namespace gluten diff --git a/cpp/velox/utils/GpuBufferBatchResizer.cc b/cpp/velox/utils/GpuBufferBatchResizer.cc index 518f280fc3eb..4e4c8fd9fac0 100644 --- a/cpp/velox/utils/GpuBufferBatchResizer.cc +++ b/cpp/velox/utils/GpuBufferBatchResizer.cc @@ -168,15 +168,39 @@ GpuBufferBatchResizer::GpuBufferBatchResizer( arrow::MemoryPool* arrowPool, facebook::velox::memory::MemoryPool* pool, int32_t minOutputBatchSize, + int64_t memLimit, std::unique_ptr in) - : arrowPool_(arrowPool), - pool_(pool), - minOutputBatchSize_(minOutputBatchSize), - in_(std::move(in)) { + : arrowPool_(arrowPool), pool_(pool), minOutputBatchSize_(minOutputBatchSize), in_(std::move(in)) { VELOX_CHECK_GT(minOutputBatchSize_, 0, "minOutputBatchSize should be larger than 0"); + queue_ = std::make_unique(memLimit); + batchProducer_ = std::thread([this]() { + while (auto batch = nextBatch()) { + queue_->put(batch); + } + queue_->noMoreBatches(); + }); +} + +GpuBufferBatchResizer::~GpuBufferBatchResizer() { + if (batchProducer_.joinable()) { + batchProducer_.join(); + } + VELOX_CHECK_EQ(queue_->size(), 0); } std::shared_ptr GpuBufferBatchResizer::next() { + if (auto batch = queue_->get()) { + lockGpu(); + return makeCudfTable(batch->getRowType(), batch->numRows(), batch->buffers(), pool_); + } + return nullptr; +} + +int64_t GpuBufferBatchResizer::spillFixedSize(int64_t size) { + return in_->spillFixedSize(size); +} + +std::shared_ptr GpuBufferBatchResizer::nextBatch() { std::vector> cachedBatches; int32_t cachedRows = 0; while (cachedRows < minOutputBatchSize_) { @@ -189,7 +213,7 @@ std::shared_ptr GpuBufferBatchResizer::next() { auto nextBatch = std::dynamic_pointer_cast(nextCb); VELOX_CHECK_NOT_NULL(nextBatch); if (nextBatch->numRows() == 0) { - continue; + continue; } cachedRows += nextBatch->numRows(); @@ -200,15 +224,7 @@ std::shared_ptr GpuBufferBatchResizer::next() { } // Compose all cached batches into one - auto batch = GpuBufferColumnarBatch::compose(arrowPool_, cachedBatches, cachedRows); - - lockGpu(); - - return makeCudfTable(batch->getRowType(), batch->numRows(), batch->buffers(), pool_); -} - -int64_t GpuBufferBatchResizer::spillFixedSize(int64_t size) { - return in_->spillFixedSize(size); + return GpuBufferColumnarBatch::compose(arrowPool_, cachedBatches, cachedRows); } } // namespace gluten diff --git a/cpp/velox/utils/GpuBufferBatchResizer.h b/cpp/velox/utils/GpuBufferBatchResizer.h index b3ed08662936..15939d0d22c7 100644 --- a/cpp/velox/utils/GpuBufferBatchResizer.h +++ b/cpp/velox/utils/GpuBufferBatchResizer.h @@ -15,7 +15,9 @@ * limitations under the License. */ +#include "CachedBufferQueue.h" #include "memory/ColumnarBatchIterator.h" +#include "memory/GpuBufferColumnarBatch.h" #include "memory/VeloxColumnarBatch.h" #include "utils/Exception.h" #include "velox/common/memory/MemoryPool.h" @@ -28,17 +30,24 @@ class GpuBufferBatchResizer : public ColumnarBatchIterator { arrow::MemoryPool* arrowPool, facebook::velox::memory::MemoryPool* pool, int32_t minOutputBatchSize, + int64_t memLimit, std::unique_ptr in); + ~GpuBufferBatchResizer(); + std::shared_ptr next() override; int64_t spillFixedSize(int64_t size) override; private: + std::shared_ptr nextBatch(); arrow::MemoryPool* arrowPool_; facebook::velox::memory::MemoryPool* pool_; const int32_t minOutputBatchSize_; std::unique_ptr in_; + + std::thread batchProducer_; + std::unique_ptr queue_; }; } // namespace gluten diff --git a/docs/Configuration.md b/docs/Configuration.md index 1372d982430c..0cb145f981ff 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -147,6 +147,7 @@ nav_order: 15 | Key | Default | Description | |-------------------------------------------------------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| spark.gluten.auto.adjustStageExecutionMode | false | Experimental: Auto adjust execution mode according to the stage execution plan. | | spark.gluten.auto.adjustStageResource.enabled | false | Experimental: If enabled, gluten will try to set the stage resource according to stage execution plan. Only worked when aqe is enabled at the same time!! | | spark.gluten.auto.adjustStageResources.fallenNode.ratio.threshold | 0.5 | Experimental: Increase executor heap memory when stage contains fallen node count exceeds the total node count ratio. | | spark.gluten.auto.adjustStageResources.heap.ratio | 2.0 | Experimental: Increase executor heap memory when match adjust stage resource rule. | diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java index 6a0f2130d792..cc37633358b3 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java @@ -44,7 +44,8 @@ public native long make( long deserializerBufferSize, String shuffleWriterType); - public native long read(long shuffleReaderHandle, ShuffleStreamReader streamReader); + public native long read( + long shuffleReaderHandle, ShuffleStreamReader streamReader, int executionMode); public native void populateMetrics(long shuffleReaderHandle, ShuffleReaderMetrics metrics); diff --git a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala index a902c448fe47..b8e741f6359a 100644 --- a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala +++ b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala @@ -20,8 +20,8 @@ import org.apache.gluten.memory.memtarget._ import org.apache.gluten.memory.memtarget.spark.{RegularMemoryConsumer, TreeMemoryConsumer} import org.apache.gluten.proto.MemoryUsageStats -import org.apache.spark.SparkEnv -import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkEnv} +import org.apache.spark.util.{SparkResourceUtil, Utils} import com.google.common.base.Preconditions import org.apache.commons.lang3.StringUtils @@ -56,6 +56,12 @@ object SparkMemoryUtil { smp.memoryFree + emp.memoryFree } + def availableOffHeapPerTask(conf: SparkConf): Long = { + val perTask = + getCurrentAvailableOffHeapMemory / SparkResourceUtil.getTaskSlots(conf) + perTask + } + def dumpMemoryManagerStats(tmm: TaskMemoryManager): String = { val stats = tmm.synchronized { val consumers = consumersField.get(tmm).asInstanceOf[util.HashSet[MemoryConsumer]] diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index ed2d54936655..975730d471df 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -55,7 +55,7 @@ case object RssSortShuffleWriterType extends ShuffleWriterType { case object GpuHashShuffleWriterType extends ShuffleWriterType { override val name: String = ReservedKeys.GLUTEN_GPU_HASH_SHUFFLE_WRITER - override val requiresResizingShuffleInput: Boolean = true + override val requiresResizingShuffleInput: Boolean = false override val requiresResizingShuffleOutput: Boolean = true } @@ -366,6 +366,9 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) { def autoAdjustStageFallenNodeThreshold: Double = getConf(AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD) + def autoAdjustStageExecutionMode: Boolean = + getConf(AUTO_ADJUST_STAGE_EXECUTION_MODE) + def parquetMetadataValidationEnabled: Boolean = { getConf(PARQUET_UNEXPECTED_METADATA_FALLBACK_ENABLED) } @@ -1537,6 +1540,13 @@ object GlutenConfig extends ConfigRegistry { .doubleConf .createWithDefault(0.5d) + val AUTO_ADJUST_STAGE_EXECUTION_MODE = + buildConf("spark.gluten.auto.adjustStageExecutionMode") + .experimental() + .doc("Experimental: Auto adjust execution mode according to the stage execution plan.") + .booleanConf + .createWithDefault(false) + val PARQUET_UNEXPECTED_METADATA_FALLBACK_ENABLED = buildConf("spark.gluten.sql.fallbackUnexpectedMetadataParquet") .doc("If enabled, Gluten will not offload scan when unexpected metadata is detected.") diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala index da1999a24cfb..01d403c2a58a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala @@ -24,7 +24,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition} -import org.apache.spark.sql.execution.{LimitExec, ShuffledColumnarBatchRDD, SparkPlan} +import org.apache.spark.sql.execution.{CPUStageMode, LimitExec, ShuffledColumnarBatchRDD, SparkPlan} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.metric.SQLColumnarShuffleReadMetricsReporter import org.apache.spark.sql.vectorized.ColumnarBatch @@ -102,7 +102,9 @@ abstract class ColumnarCollectLimitBaseExec( metrics, shuffleWriterType ), - readMetrics + readMetrics, + // FIXME: pass proper StageExecutionMode + CPUStageMode ) } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailBaseExec.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailBaseExec.scala index 56f9ce69ee6d..47b647547664 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailBaseExec.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailBaseExec.scala @@ -25,7 +25,7 @@ import org.apache.spark.serializer.Serializer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition} -import org.apache.spark.sql.execution.{LimitExec, ShuffledColumnarBatchRDD, SparkPlan} +import org.apache.spark.sql.execution.{CPUStageMode, LimitExec, ShuffledColumnarBatchRDD, SparkPlan} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.metric.SQLColumnarShuffleReadMetricsReporter import org.apache.spark.sql.vectorized.ColumnarBatch @@ -101,7 +101,9 @@ abstract class ColumnarCollectTailBaseExec( metrics, shuffleWriterType ), - readMetrics + readMetrics, + // FIXME: pass proper StageExecutionMode + CPUStageMode ) } diff --git a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleReaderWrapper.scala b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleReaderWrapper.scala index 95dd8845fdd4..0c18105a1c85 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleReaderWrapper.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleReaderWrapper.scala @@ -17,6 +17,7 @@ package org.apache.spark.shuffle import org.apache.spark.TaskContext +import org.apache.spark.sql.execution.StageExecutionMode import org.apache.spark.storage.{BlockId, BlockManagerId} case class GlutenShuffleReaderWrapper[K, C](shuffleReader: ShuffleReader[K, C]) @@ -26,4 +27,5 @@ case class GenShuffleReaderParameters[K, C]( blocksByAddress: Iterator[(BlockManagerId, collection.Seq[(BlockId, Long, Int)])], context: TaskContext, readMetrics: ShuffleReadMetricsReporter, - shouldBatchFetch: Boolean = false) + shouldBatchFetch: Boolean, + executionMode: StageExecutionMode) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala index 80b0e94830c9..4708dbcba3f3 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala @@ -26,6 +26,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.shuffle.api.ShuffleExecutorComponents import org.apache.spark.shuffle.sort.ColumnarShuffleHandle import org.apache.spark.shuffle.sort.SortShuffleManager.canUseBatchFetch +import org.apache.spark.sql.execution.StageExecutionMode import org.apache.spark.storage.{BlockId, BlockManagerId} import org.apache.spark.util.random.XORShiftRandom @@ -159,7 +160,8 @@ object GlutenShuffleUtils { startPartition: Int, endPartition: Int, context: TaskContext, - metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { + metrics: ShuffleReadMetricsReporter, + executionMode: StageExecutionMode): ShuffleReader[K, C] = { val (blocksByAddress, canEnableBatchFetch) = { getReaderParam(handle, startMapIndex, endMapIndex, startPartition, endPartition) } @@ -173,7 +175,8 @@ object GlutenShuffleUtils { blocksByAddress, context, metrics, - shouldBatchFetch)) + shouldBatchFetch, + executionMode)) .shuffleReader } } diff --git a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/sort/ColumnarShuffleManager.scala b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/sort/ColumnarShuffleManager.scala index 904c2dff6ce7..015d60d3a845 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/sort/ColumnarShuffleManager.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/sort/ColumnarShuffleManager.scala @@ -23,6 +23,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle._ import org.apache.spark.shuffle.api.ShuffleExecutorComponents +import org.apache.spark.sql.execution.{CPUStageMode, StageExecutionMode} import org.apache.spark.storage.BlockId import org.apache.spark.util.collection.OpenHashSet @@ -132,6 +133,26 @@ class ColumnarShuffleManager(conf: SparkConf) endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { + getReader[K, C]( + handle, + startMapIndex, + endMapIndex, + startPartition, + endPartition, + context, + metrics, + CPUStageMode) + } + + def getReader[K, C]( + handle: ShuffleHandle, + startMapIndex: Int, + endMapIndex: Int, + startPartition: Int, + endPartition: Int, + context: TaskContext, + metrics: ShuffleReadMetricsReporter, + executionMode: StageExecutionMode): ShuffleReader[K, C] = { GlutenShuffleUtils.genColumnarShuffleReader( handle, startMapIndex, @@ -139,7 +160,8 @@ class ColumnarShuffleManager(conf: SparkConf) startPartition, endPartition, context, - metrics) + metrics, + executionMode) } /** Remove a shuffle's metadata from the ShuffleManager. */ diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index bccd4eedf8f5..40866e7c0d29 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -16,29 +16,217 @@ */ package org.apache.spark.sql.execution +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.config.{GpuHashShuffleWriterType, ShuffleWriterType} +import org.apache.gluten.execution.{ValidatablePlan, ValidationResult} +import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.spark._ import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.serializer.Serializer +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.plans.logical.Statistics +import org.apache.spark.sql.catalyst.plans.physical.{SinglePartition, _} +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.exchange._ +import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter +import org.apache.spark.sql.metric.SQLColumnarShuffleReadMetricsReporter +import org.apache.spark.sql.vectorized.ColumnarBatch + +import scala.concurrent.Future case class ColumnarShuffleExchangeExec( override val outputPartitioning: Partitioning, child: SparkPlan, shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS, projectOutputAttributes: Seq[Attribute], - advisoryPartitionSize: Option[Long] = None) - extends ColumnarShuffleExchangeExecBase(outputPartitioning, child, projectOutputAttributes) { + advisoryPartitionSize: Option[Long] = None, + mapperStageMode: Option[StageExecutionMode] = None, + reducerStageMode: Option[StageExecutionMode] = None) + extends ShuffleExchangeLike + with ValidatablePlan { + + override def nodeName: String = "ColumnarShuffleExchange" + { + if (mapperStageMode.isDefined) { + if (conf.adaptiveExecutionEnabled) { + // In AQE, the reducer stage mode is set in the downstream query stage. + // It is shown in the ColumnarAQEShuffleReaderExec node. + s"(${mapperStageMode.get.name})" + } else { + // Mapper and reducer stage modes should be set together when AQE is disabled. + if (reducerStageMode.isEmpty) { + throw new IllegalStateException( + "Reducer stage mode is not defined in ColumnarShuffleExchangeExec when AQE is disabled") + } + s"(${mapperStageMode.get.name}, ${reducerStageMode.get.name})" + } + } else { + "" + } + } + + private[sql] lazy val writeMetrics = + SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) + + private[sql] lazy val readMetrics = + SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) + + lazy val shuffleWriterType: ShuffleWriterType = getShuffleWriterType + + // super.stringArgs ++ Iterator(output.map(o => s"${o}#${o.dataType.simpleString}")) + lazy val serializer: Serializer = BackendsApiManager.getSparkPlanExecApiInstance + .createColumnarBatchSerializer(schema, metrics, shuffleWriterType) + + // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks. + @transient override lazy val metrics = + BackendsApiManager.getMetricsApiInstance + .genColumnarShuffleExchangeMetrics( + sparkContext, + shuffleWriterType) ++ readMetrics ++ writeMetrics + + @transient lazy val inputColumnarRDD: RDD[ColumnarBatch] = child.executeColumnar() + + // 'mapOutputStatisticsFuture' is only needed when enable AQE. + @transient override lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = { + if (inputColumnarRDD.getNumPartitions == 0) { + Future.successful(null) + } else { + sparkContext.submitMapStage(columnarShuffleDependency) + } + } - override def nodeName: String = "ColumnarExchange" + /** + * A [[ShuffleDependency]] that will partition rows of its child based on the partitioning scheme + * defined in `newPartitioning`. Those partitions of the returned ShuffleDependency will be the + * input of shuffle. + */ + @transient + lazy val columnarShuffleDependency: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = { + BackendsApiManager.getSparkPlanExecApiInstance.genShuffleDependency( + inputColumnarRDD, + child.output, + projectOutputAttributes, + outputPartitioning, + serializer, + writeMetrics, + metrics, + shuffleWriterType) + } + + var cachedShuffleRDD: ShuffledColumnarBatchRDD = _ + + override protected def doValidateInternal(): ValidationResult = { + val validation = BackendsApiManager.getValidatorApiInstance + .doColumnarShuffleExchangeExecValidate(output, outputPartitioning, child) + if (validation.nonEmpty) { + return ValidationResult.failed( + s"Found schema check failure for schema ${child.schema} due to: ${validation.get}") + } + outputPartitioning match { + case _: HashPartitioning => ValidationResult.succeeded + case _: RangePartitioning => ValidationResult.succeeded + case SinglePartition => ValidationResult.succeeded + case _: RoundRobinPartitioning => ValidationResult.succeeded + case _ => + ValidationResult.failed( + s"Unsupported partitioning ${outputPartitioning.getClass.getSimpleName}") + } + } + + override def numMappers: Int = inputColumnarRDD.getNumPartitions + + override def numPartitions: Int = columnarShuffleDependency.partitioner.numPartitions + + override def runtimeStatistics: Statistics = { + val dataSize = metrics("dataSize").value + val rowCount = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN).value + Statistics(dataSize, Some(rowCount)) + } + + def getShuffleWriterType: ShuffleWriterType = { + mapperStageMode match { + case Some(GPUStageMode) => + GpuHashShuffleWriterType + case _ => + BackendsApiManager.getSparkPlanExecApiInstance.getShuffleWriterType( + outputPartitioning, + output) + } + } - protected def withNewChildInternal(newChild: SparkPlan): ColumnarShuffleExchangeExec = + // Required for Spark 4.0 to implement a trait method. + // The "override" keyword is omitted to maintain compatibility with earlier Spark versions. + def shuffleId: Int = columnarShuffleDependency.shuffleId + + // Called by AQEShuffleReaderExec to create a ShuffleRDD with custom partition specs. + override def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[ColumnarBatch] = { + new ShuffledColumnarBatchRDD( + columnarShuffleDependency, + readMetrics, + partitionSpecs, + CPUStageMode) + } + + // Called by ColumnarAQEShuffleReaderExec to create a ShuffleRDD with custom partition specs, + // and reducer stage execution mode. + def getShuffleRDD( + partitionSpecs: Array[ShufflePartitionSpec], + reducerStageMode: StageExecutionMode): RDD[ColumnarBatch] = { + new ShuffledColumnarBatchRDD( + columnarShuffleDependency, + readMetrics, + partitionSpecs, + reducerStageMode) + } + + override def stringArgs: Iterator[Any] = { + super.stringArgs ++ Iterator(s"[shuffle_writer_type=${shuffleWriterType.name}]") + } + + override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType + + override def rowType0(): Convention.RowType = Convention.RowType.None + + override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException() + } + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + if (cachedShuffleRDD == null) { + cachedShuffleRDD = new ShuffledColumnarBatchRDD( + columnarShuffleDependency, + readMetrics, + reducerStageMode.getOrElse(CPUStageMode)) + } + cachedShuffleRDD + } + + override def verboseString(maxFields: Int): String = + toString(super.verboseString(maxFields), maxFields) + + private def toString(original: String, maxFields: Int): String = { + original + ", [output=" + truncatedString( + output.map(_.verboseString(maxFields)), + "[", + ", ", + "]", + maxFields) + "]" + } + + override def output: Seq[Attribute] = if (projectOutputAttributes != null) { + projectOutputAttributes + } else { + child.output + } + + override protected def withNewChildInternal(newChild: SparkPlan): ColumnarShuffleExchangeExec = copy(child = newChild) } object ColumnarShuffleExchangeExec extends Logging { - def apply( plan: ShuffleExchangeExec, child: SparkPlan, @@ -51,5 +239,4 @@ object ColumnarShuffleExchangeExec extends Logging { advisoryPartitionSize = SparkShimLoader.getSparkShims.getShuffleAdvisoryPartitionSize(plan) ) } - } diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExecBase.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExecBase.scala deleted file mode 100644 index 17d1ec403814..000000000000 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExecBase.scala +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution - -import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.config.ShuffleWriterType -import org.apache.gluten.execution.{ValidatablePlan, ValidationResult} -import org.apache.gluten.extension.columnar.transition.Convention - -import org.apache.spark._ -import org.apache.spark.rdd.RDD -import org.apache.spark.serializer.Serializer -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.Statistics -import org.apache.spark.sql.catalyst.plans.physical.{SinglePartition, _} -import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.execution.exchange._ -import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter -import org.apache.spark.sql.metric.SQLColumnarShuffleReadMetricsReporter -import org.apache.spark.sql.vectorized.ColumnarBatch - -import scala.concurrent.Future - -abstract class ColumnarShuffleExchangeExecBase( - override val outputPartitioning: Partitioning, - child: SparkPlan, - projectOutputAttributes: Seq[Attribute]) - extends ShuffleExchangeLike - with ValidatablePlan { - private[sql] lazy val writeMetrics = - SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) - - private[sql] lazy val readMetrics = - SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) - - lazy val shuffleWriterType: ShuffleWriterType = getShuffleWriterType - - // super.stringArgs ++ Iterator(output.map(o => s"${o}#${o.dataType.simpleString}")) - lazy val serializer: Serializer = BackendsApiManager.getSparkPlanExecApiInstance - .createColumnarBatchSerializer(schema, metrics, shuffleWriterType) - - // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks. - @transient override lazy val metrics = - BackendsApiManager.getMetricsApiInstance - .genColumnarShuffleExchangeMetrics( - sparkContext, - shuffleWriterType) ++ readMetrics ++ writeMetrics - - @transient lazy val inputColumnarRDD: RDD[ColumnarBatch] = child.executeColumnar() - - // 'mapOutputStatisticsFuture' is only needed when enable AQE. - @transient override lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = { - if (inputColumnarRDD.getNumPartitions == 0) { - Future.successful(null) - } else { - sparkContext.submitMapStage(columnarShuffleDependency) - } - } - - /** - * A [[ShuffleDependency]] that will partition rows of its child based on the partitioning scheme - * defined in `newPartitioning`. Those partitions of the returned ShuffleDependency will be the - * input of shuffle. - */ - @transient - lazy val columnarShuffleDependency: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = { - BackendsApiManager.getSparkPlanExecApiInstance.genShuffleDependency( - inputColumnarRDD, - child.output, - projectOutputAttributes, - outputPartitioning, - serializer, - writeMetrics, - metrics, - shuffleWriterType) - } - - var cachedShuffleRDD: ShuffledColumnarBatchRDD = _ - - override protected def doValidateInternal(): ValidationResult = { - val validation = BackendsApiManager.getValidatorApiInstance - .doColumnarShuffleExchangeExecValidate(output, outputPartitioning, child) - if (validation.nonEmpty) { - return ValidationResult.failed( - s"Found schema check failure for schema ${child.schema} due to: ${validation.get}") - } - outputPartitioning match { - case _: HashPartitioning => ValidationResult.succeeded - case _: RangePartitioning => ValidationResult.succeeded - case SinglePartition => ValidationResult.succeeded - case _: RoundRobinPartitioning => ValidationResult.succeeded - case _ => - ValidationResult.failed( - s"Unsupported partitioning ${outputPartitioning.getClass.getSimpleName}") - } - } - - override def numMappers: Int = inputColumnarRDD.getNumPartitions - - override def numPartitions: Int = columnarShuffleDependency.partitioner.numPartitions - - override def runtimeStatistics: Statistics = { - val dataSize = metrics("dataSize").value - val rowCount = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN).value - Statistics(dataSize, Some(rowCount)) - } - - def getShuffleWriterType: ShuffleWriterType = - BackendsApiManager.getSparkPlanExecApiInstance.getShuffleWriterType(outputPartitioning, output) - - // Required for Spark 4.0 to implement a trait method. - // The "override" keyword is omitted to maintain compatibility with earlier Spark versions. - def shuffleId: Int = columnarShuffleDependency.shuffleId - - override def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[ColumnarBatch] = { - new ShuffledColumnarBatchRDD(columnarShuffleDependency, readMetrics, partitionSpecs) - } - - override def stringArgs: Iterator[Any] = { - super.stringArgs ++ Iterator(s"[shuffle_writer_type=${shuffleWriterType.name}]") - } - - override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType - - override def rowType0(): Convention.RowType = Convention.RowType.None - - override def doExecute(): RDD[InternalRow] = { - throw new UnsupportedOperationException() - } - - override def doExecuteColumnar(): RDD[ColumnarBatch] = { - if (cachedShuffleRDD == null) { - cachedShuffleRDD = new ShuffledColumnarBatchRDD(columnarShuffleDependency, readMetrics) - } - cachedShuffleRDD - } - - override def verboseString(maxFields: Int): String = - toString(super.verboseString(maxFields), maxFields) - - private def toString(original: String, maxFields: Int): String = { - original + ", [output=" + truncatedString( - output.map(_.verboseString(maxFields)), - "[", - ", ", - "]", - maxFields) + "]" - } - - override def output: Seq[Attribute] = if (projectOutputAttributes != null) { - projectOutputAttributes - } else { - child.output - } -} diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GPUColumnarShuffleExchangeExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GPUColumnarShuffleExchangeExec.scala deleted file mode 100644 index 3564d6fa56bb..000000000000 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GPUColumnarShuffleExchangeExec.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution - -import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.config.{GpuHashShuffleWriterType, HashShuffleWriterType, ShuffleWriterType} -import org.apache.gluten.execution.ValidationResult -import org.apache.gluten.sql.shims.SparkShimLoader - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.exchange._ - -// The write is Velox RowVector, but the reader transforms it to cudf table -case class GPUColumnarShuffleExchangeExec( - override val outputPartitioning: Partitioning, - child: SparkPlan, - shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS, - projectOutputAttributes: Seq[Attribute], - advisoryPartitionSize: Option[Long] = None) - extends ColumnarShuffleExchangeExecBase(outputPartitioning, child, projectOutputAttributes) { - - override protected def doValidateInternal(): ValidationResult = { - val validation = super.doValidateInternal() - if (!validation.ok()) { - return validation - } - val shuffleWriterType = BackendsApiManager.getSparkPlanExecApiInstance.getShuffleWriterType( - outputPartitioning, - output) - if (shuffleWriterType != HashShuffleWriterType) { - return ValidationResult.failed("Only support hash partitioning") - } - ValidationResult.succeeded - } - - override def nodeName: String = "CudfColumnarExchange" - - override def getShuffleWriterType: ShuffleWriterType = GpuHashShuffleWriterType - - protected def withNewChildInternal(newChild: SparkPlan): GPUColumnarShuffleExchangeExec = - copy(child = newChild) -} - -object GPUColumnarShuffleExchangeExec extends Logging { - - def apply( - plan: ShuffleExchangeExec, - child: SparkPlan, - shuffleOutputAttributes: Seq[Attribute]): GPUColumnarShuffleExchangeExec = { - GPUColumnarShuffleExchangeExec( - plan.outputPartitioning, - child, - plan.shuffleOrigin, - shuffleOutputAttributes, - advisoryPartitionSize = SparkShimLoader.getSparkShims.getShuffleAdvisoryPartitionSize(plan) - ) - } -} diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala index fe76910cc9e5..7cdf24b1886c 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenAutoAdjustStageResourceProfile.scala @@ -16,26 +16,19 @@ */ package org.apache.spark.sql.execution -import org.apache.gluten.config.{GlutenConfig, GlutenCoreConfig} +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{ColumnarToRowExecBase, GlutenPlan} import org.apache.gluten.logging.LogLevelUtil -import org.apache.spark.SparkConf import org.apache.spark.annotation.Experimental -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{CPUS_PER_TASK, EXECUTOR_CORES, MEMORY_OFFHEAP_SIZE} -import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, ResourceProfileManager, TaskResourceRequest} +import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, TaskResourceRequest} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{GlutenAutoAdjustStageResourceProfile => GlutenResourceProfile} -import org.apache.spark.sql.execution.adaptive.QueryStageExec -import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec} import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.SparkTestUtil +import org.apache.spark.sql.utils.GlutenResourceProfileUtil import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer /** * This rule is used to dynamic adjust stage resource profile for following purposes: @@ -52,23 +45,22 @@ case class GlutenAutoAdjustStageResourceProfile(glutenConf: GlutenConfig, spark: lazy val sparkConf = spark.sparkContext.getConf override def apply(plan: SparkPlan): SparkPlan = { - if (!glutenConf.enableAutoAdjustStageResourceProfile) { + if ( + !glutenConf.enableAutoAdjustStageResourceProfile || + // Auto adjust stage resource for GPU is not supported. + glutenConf.enableColumnarCudf || + !SQLConf.get.adaptiveExecutionEnabled + ) { return plan } - if (!SQLConf.get.adaptiveExecutionEnabled) { - return plan - } - // Starting here, the resource profile may differ between stages. Configure resource settings - // using the default profile to prevent any impact from the previous stage. If a new resource - // profile is applied, the settings will be updated accordingly. - GlutenResourceProfile.updateResourceSetting( - ResourceProfile.getOrCreateDefaultProfile(sparkConf), - sparkConf) + + GlutenResourceProfileUtil.restoreDefaultResourceSetting(sparkConf) + if (!plan.isInstanceOf[Exchange]) { // todo: support set resource profile for final stage return plan } - val planNodes = GlutenResourceProfile.collectStagePlan(plan) + val planNodes = GlutenResourceProfileUtil.collectStagePlan(plan) if (planNodes.isEmpty) { return plan } @@ -104,11 +96,8 @@ case class GlutenAutoAdjustStageResourceProfile(glutenConf: GlutenConfig, spark: executorResource.put(ResourceProfile.OFFHEAP_MEM, newExecutorOffheap) val newRP = new ResourceProfile(executorResource.toMap, taskResource.toMap) - return GlutenResourceProfile.applyNewResourceProfileIfPossible( - plan, - newRP, - rpManager, - sparkConf) + return GlutenResourceProfileUtil + .applyNewResourceProfileIfPossible(plan, newRP, rpManager, sparkConf) } // case 2: check whether fallback exists and decide whether increase heap memory @@ -129,7 +118,7 @@ case class GlutenAutoAdjustStageResourceProfile(glutenConf: GlutenConfig, spark: executorResource.put(ResourceProfile.OFFHEAP_MEM, newExecutorOffheap) val newRP = new ResourceProfile(executorResource.toMap, taskResource.toMap) - return GlutenResourceProfile.applyNewResourceProfileIfPossible( + return GlutenResourceProfileUtil.applyNewResourceProfileIfPossible( plan, newRP, rpManager, @@ -138,78 +127,3 @@ case class GlutenAutoAdjustStageResourceProfile(glutenConf: GlutenConfig, spark: plan } } - -object GlutenAutoAdjustStageResourceProfile extends Logging { - // collect all plan nodes belong to this stage including child query stage - // but exclude query stage child - def collectStagePlan(plan: SparkPlan): ArrayBuffer[SparkPlan] = { - - def collectStagePlan(plan: SparkPlan, planNodes: ArrayBuffer[SparkPlan]): Unit = { - if (plan.isInstanceOf[DataWritingCommandExec] || plan.isInstanceOf[ExecutedCommandExec]) { - // todo: support set final stage's resource profile - return - } - planNodes += plan - if (plan.isInstanceOf[QueryStageExec]) { - return - } - plan.children.foreach(collectStagePlan(_, planNodes)) - } - - val planNodes = new ArrayBuffer[SparkPlan]() - collectStagePlan(plan, planNodes) - planNodes - } - - private def getFinalResourceProfile( - rpManager: ResourceProfileManager, - newRP: ResourceProfile): ResourceProfile = { - // Just for test - // ResourceProfiles are only supported on YARN and Kubernetes with dynamic allocation enabled - if (SparkTestUtil.isTesting) { - return rpManager.defaultResourceProfile - } - val maybeEqProfile = rpManager.getEquivalentProfile(newRP) - if (maybeEqProfile.isDefined) { - maybeEqProfile.get - } else { - // register new resource profile here - rpManager.addResourceProfile(newRP) - newRP - } - } - - /** - * Reflects resource changes in some configurations that will be passed to the native side. It - * only affects the current thread. - */ - def updateResourceSetting(rp: ResourceProfile, sparkConf: SparkConf): Unit = { - val coresPerExecutor = rp.getExecutorCores.getOrElse(sparkConf.get(EXECUTOR_CORES)) - val coresPerTask = rp.getTaskCpus.getOrElse(sparkConf.get(CPUS_PER_TASK)) - val taskSlots = coresPerExecutor / coresPerTask - val conf = SQLConf.get - conf.setConfString(GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.key, taskSlots.toString) - val offHeapSize = rp.executorResources - .get(ResourceProfile.OFFHEAP_MEM) - .map(_.amount) - .getOrElse(sparkConf.get(MEMORY_OFFHEAP_SIZE)) - conf.setConfString(GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES.key, offHeapSize.toString) - conf.setConfString( - GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key, - (offHeapSize / taskSlots).toString) - } - - def applyNewResourceProfileIfPossible( - plan: SparkPlan, - rp: ResourceProfile, - rpManager: ResourceProfileManager, - sparkConf: SparkConf): SparkPlan = { - updateResourceSetting(rp, sparkConf) - - val finalRP = getFinalResourceProfile(rpManager, rp) - // Wrap the plan with ApplyResourceProfileExec so that we can apply new ResourceProfile - val wrapperPlan = ApplyResourceProfileExec(plan.children.head, finalRP) - logInfo(s"Apply resource profile $finalRP for plan ${wrapperPlan.nodeName}") - plan.withNewChildren(IndexedSeq(wrapperPlan)) - } -} diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala index 0642c3a24760..e5d2e6ee1a09 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarBatchRDD.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.execution import org.apache.spark._ import org.apache.spark.rdd.RDD -import org.apache.spark.shuffle.sort.SortShuffleManager +import org.apache.spark.shuffle.{ShuffleHandle, ShuffleManager, ShuffleReader, ShuffleReadMetricsReporter} +import org.apache.spark.shuffle.sort.{ColumnarShuffleManager, SortShuffleManager} +import org.apache.spark.sql.execution.ShuffledColumnarBatchRDD.getReader import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.metric.SQLColumnarShuffleReadMetricsReporter @@ -32,7 +34,8 @@ final private case class ShuffledColumnarBatchRDDPartition(index: Int, spec: Shu class ShuffledColumnarBatchRDD( var dependency: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch], metrics: Map[String, SQLMetric], - partitionSpecs: Array[ShufflePartitionSpec]) + partitionSpecs: Array[ShufflePartitionSpec], + executionMode: StageExecutionMode) extends RDD[ColumnarBatch](dependency.rdd.context, Nil) { override val partitioner: Option[Partitioner] = @@ -55,11 +58,13 @@ class ShuffledColumnarBatchRDD( def this( dependency: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch], - metrics: Map[String, SQLMetric]) = { + metrics: Map[String, SQLMetric], + executionMode: StageExecutionMode) = { this( dependency, metrics, - Array.tabulate(dependency.partitioner.numPartitions)(i => CoalescedPartitionSpec(i, i + 1))) + Array.tabulate(dependency.partitioner.numPartitions)(i => CoalescedPartitionSpec(i, i + 1)), + executionMode) } override def getDependencies: Seq[Dependency[_]] = List(dependency) @@ -93,44 +98,53 @@ class ShuffledColumnarBatchRDD( // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator, // as well as the `tempMetrics` for basic shuffle metrics. val sqlMetricsReporter = new SQLColumnarShuffleReadMetricsReporter(tempMetrics, metrics) + val reader = split.asInstanceOf[ShuffledColumnarBatchRDDPartition].spec match { case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) => - SparkEnv.get.shuffleManager.getReader( + getReader( + SparkEnv.get.shuffleManager, dependency.shuffleHandle, startReducerIndex, endReducerIndex, context, - sqlMetricsReporter) + sqlMetricsReporter, + executionMode) case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) => - SparkEnv.get.shuffleManager.getReader( + getReader( + SparkEnv.get.shuffleManager, dependency.shuffleHandle, startMapIndex, endMapIndex, reducerIndex, reducerIndex + 1, context, - sqlMetricsReporter) + sqlMetricsReporter, + executionMode) case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) => - SparkEnv.get.shuffleManager.getReader( + getReader( + SparkEnv.get.shuffleManager, dependency.shuffleHandle, mapIndex, mapIndex + 1, startReducerIndex, endReducerIndex, context, - sqlMetricsReporter) + sqlMetricsReporter, + executionMode) case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) => - SparkEnv.get.shuffleManager.getReader( + getReader( + SparkEnv.get.shuffleManager, dependency.shuffleHandle, startMapIndex, endMapIndex, 0, numReducers, context, - sqlMetricsReporter) + sqlMetricsReporter, + executionMode) } reader.read().asInstanceOf[Iterator[Product2[Int, ColumnarBatch]]].map { case (_, batch: ColumnarBatch) => @@ -144,3 +158,58 @@ class ShuffledColumnarBatchRDD( dependency = null } } + +object ShuffledColumnarBatchRDD { + private def getReader[K, C]( + shuffleManager: ShuffleManager, + handle: ShuffleHandle, + startMapIndex: Int, + endMapIndex: Int, + startPartition: Int, + endPartition: Int, + context: TaskContext, + metrics: ShuffleReadMetricsReporter, + executionMode: StageExecutionMode): ShuffleReader[K, C] = { + shuffleManager match { + case columnarShuffleManager: ColumnarShuffleManager => + columnarShuffleManager.getReader( + handle, + startMapIndex, + endMapIndex, + startPartition, + endPartition, + context, + metrics, + executionMode) + case _ => + shuffleManager.getReader( + handle, + startMapIndex, + endMapIndex, + startPartition, + endPartition, + context, + metrics) + } + } + + private def getReader[K, C]( + shuffleManager: ShuffleManager, + handle: ShuffleHandle, + startPartition: Int, + endPartition: Int, + context: TaskContext, + metrics: ShuffleReadMetricsReporter, + executionMode: StageExecutionMode): ShuffleReader[K, C] = { + getReader[K, C]( + shuffleManager, + handle, + 0, + Int.MaxValue, + startPartition, + endPartition, + context, + metrics, + executionMode) + } +} diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/StageExecutionModes.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/StageExecutionModes.scala new file mode 100644 index 000000000000..ce8781bc294b --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/StageExecutionModes.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +trait StageExecutionMode { + def name: String = this.getClass.getSimpleName.replaceAll("\\$", "") + def id: Int +} + +case object CPUStageMode extends StageExecutionMode { + override def id: Int = 0 +} +case object GPUStageMode extends StageExecutionMode { + override def id: Int = 1 +} +case object MockGPUStageMode extends StageExecutionMode { + override def id: Int = 0 +} + +object StageExecutionMode { + def fromId(id: Int): StageExecutionMode = id match { + case 0 => CPUStageMode + case 1 => GPUStageMode + case _ => throw new IllegalArgumentException(s"Unknown StageExecutionMode id: $id") + } +} diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarAQEShuffleReadExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarAQEShuffleReadExec.scala new file mode 100644 index 000000000000..90387abf8433 --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarAQEShuffleReadExec.scala @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.SparkException +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.plans.physical.{CoalescedBoundary, CoalescedHashPartitioning, HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition, UnknownPartitioning} +import org.apache.spark.sql.catalyst.trees.CurrentOrigin +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeLike} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.vectorized.ColumnarBatch + +import scala.collection.mutable.ArrayBuffer + +/** + * A wrapper of shuffle query stage, which follows the given partition arrangement. + * + * @param child + * It is usually `ShuffleQueryStageExec`, but can be the shuffle exchange node during + * canonicalization. + * @param partitionSpecs + * The partition specs that defines the arrangement, requires at least one partition. + */ +case class ColumnarAQEShuffleReadExec private ( + child: SparkPlan, + partitionSpecs: Seq[ShufflePartitionSpec], + executionMode: StageExecutionMode, + isWrapper: Boolean) + extends UnaryExecNode { + assert(partitionSpecs.nonEmpty, s"${getClass.getSimpleName} requires at least one partition") + + // If this is to read shuffle files locally, then all partition specs should be + // `PartialMapperPartitionSpec`. + if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) { + assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec])) + } + + override def nodeName: String = super.nodeName + s"(${executionMode.name})" + + override def supportsColumnar: Boolean = child.supportsColumnar + + override def output: Seq[Attribute] = child.output + + override lazy val outputPartitioning: Partitioning = { + // If it is a local shuffle read with one mapper per task, then the output partitioning is + // the same as the plan before shuffle. + // TODO this check is based on assumptions of callers' behavior but is sufficient for now. + if ( + partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) && + partitionSpecs.map(_.asInstanceOf[PartialMapperPartitionSpec].mapIndex).toSet.size == + partitionSpecs.length + ) { + child match { + case ShuffleQueryStageExec(_, s: ShuffleExchangeLike, _) => + s.child.outputPartitioning + case ShuffleQueryStageExec(_, r @ ReusedExchangeExec(_, s: ShuffleExchangeLike), _) => + s.child.outputPartitioning match { + case e: Expression => r.updateAttr(e).asInstanceOf[Partitioning] + case other => other + } + case _ => + throw new IllegalStateException("operating on canonicalization plan") + } + } else if (isCoalescedRead) { + // For coalesced shuffle read, the data distribution is not changed, only the number of + // partitions is changed. + child.outputPartitioning match { + case h: HashPartitioning => + val partitions = partitionSpecs.map { + case CoalescedPartitionSpec(start, end, _) => CoalescedBoundary(start, end) + // Can not happend due to isCoalescedRead + case unexpected => + throw SparkException.internalError(s"Unexpected ShufflePartitionSpec: $unexpected") + } + CurrentOrigin.withOrigin(h.origin)(CoalescedHashPartitioning(h, partitions)) + case r: RangePartitioning => + CurrentOrigin.withOrigin(r.origin)(r.copy(numPartitions = partitionSpecs.length)) + // This can only happen for `REBALANCE_PARTITIONS_BY_NONE`, which uses + // `RoundRobinPartitioning` but we don't need to retain the number of partitions. + case r: RoundRobinPartitioning => + r.copy(numPartitions = partitionSpecs.length) + case other @ SinglePartition => + throw new IllegalStateException( + "Unexpected partitioning for coalesced shuffle read: " + other) + case _ => + // Spark plugins may have custom partitioning and may replace this operator + // during the postStageOptimization phase, so return UnknownPartitioning here + // rather than throw an exception + UnknownPartitioning(partitionSpecs.length) + } + } else { + UnknownPartitioning(partitionSpecs.length) + } + } + + override def stringArgs: Iterator[Any] = { + val desc = if (isLocalRead) { + "local" + } else if (hasCoalescedPartition && hasSkewedPartition) { + "coalesced and skewed" + } else if (hasCoalescedPartition) { + "coalesced" + } else if (hasSkewedPartition) { + "skewed" + } else { + "" + } + Iterator(desc) + } + + /** Returns true iff some partitions were actually combined */ + private def isCoalescedSpec(spec: ShufflePartitionSpec) = spec match { + case CoalescedPartitionSpec(0, 0, _) => true + case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1 + case _ => false + } + + /** Returns true iff some non-empty partitions were combined */ + def hasCoalescedPartition: Boolean = { + partitionSpecs.exists(isCoalescedSpec) + } + + def hasSkewedPartition: Boolean = + partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec]) + + def isLocalRead: Boolean = + partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec]) || + partitionSpecs.exists(_.isInstanceOf[CoalescedMapperPartitionSpec]) + + def isCoalescedRead: Boolean = { + partitionSpecs.sliding(2).forall { + // A single partition spec which is `CoalescedPartitionSpec` also means coalesced read. + case Seq(_: CoalescedPartitionSpec) => true + case Seq(l: CoalescedPartitionSpec, r: CoalescedPartitionSpec) => + l.endReducerIndex <= r.startReducerIndex + case _ => false + } + } + + private def shuffleStage = child match { + case stage: ShuffleQueryStageExec => Some(stage) + case _ => None + } + + @transient private lazy val partitionDataSizes: Option[Seq[Long]] = { + val mapStats = shuffleStage.get.mapStats + if (!isLocalRead && mapStats.isDefined) { + Some(partitionSpecs.zipWithIndex.map { + case (p: CoalescedPartitionSpec, partition) => + if (isWrapper) { + mapStats.get.bytesByPartitionId(partition) + } else { + assert(p.dataSize.isDefined) + p.dataSize.get + } + case (p: PartialReducerPartitionSpec, _) => p.dataSize + case (p, _) => throw new IllegalStateException(s"unexpected $p") + }) + } else { + None + } + } + + private def sendDriverMetrics(): Unit = { + val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + val driverAccumUpdates = ArrayBuffer.empty[(Long, Long)] + + val numPartitionsMetric = metrics("numPartitions") + numPartitionsMetric.set(partitionSpecs.length) + driverAccumUpdates += (numPartitionsMetric.id -> partitionSpecs.length.toLong) + + if (hasSkewedPartition) { + val skewedSpecs = partitionSpecs.collect { case p: PartialReducerPartitionSpec => p } + + val skewedPartitions = metrics("numSkewedPartitions") + val skewedSplits = metrics("numSkewedSplits") + + val numSkewedPartitions = skewedSpecs.map(_.reducerIndex).distinct.length + val numSplits = skewedSpecs.length + + skewedPartitions.set(numSkewedPartitions) + driverAccumUpdates += (skewedPartitions.id -> numSkewedPartitions) + + skewedSplits.set(numSplits) + driverAccumUpdates += (skewedSplits.id -> numSplits) + } + + if (hasCoalescedPartition) { + val numCoalescedPartitionsMetric = metrics("numCoalescedPartitions") + val x = partitionSpecs.count(isCoalescedSpec) + numCoalescedPartitionsMetric.set(x) + driverAccumUpdates += numCoalescedPartitionsMetric.id -> x + } + + partitionDataSizes.foreach { + dataSizes => + val partitionDataSizeMetrics = metrics("partitionDataSize") + driverAccumUpdates ++= dataSizes.map(partitionDataSizeMetrics.id -> _) + // Set sum value to "partitionDataSize" metric. + partitionDataSizeMetrics.set(dataSizes.sum) + } + + SQLMetrics.postDriverMetricsUpdatedByValue(sparkContext, executionId, driverAccumUpdates.toSeq) + } + + @transient override lazy val metrics: Map[String, SQLMetric] = { + if (shuffleStage.isDefined) { + Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions")) ++ { + if (isLocalRead) { + // We split the mapper partition evenly when creating local shuffle read, so no + // data size info is available. + Map.empty + } else { + Map( + "partitionDataSize" -> + SQLMetrics.createSizeMetric(sparkContext, "partition data size")) + } + } ++ { + if (hasSkewedPartition) { + Map( + "numSkewedPartitions" -> + SQLMetrics.createMetric(sparkContext, "number of skewed partitions"), + "numSkewedSplits" -> + SQLMetrics.createMetric(sparkContext, "number of skewed partition splits") + ) + } else { + Map.empty + } + } ++ { + if (hasCoalescedPartition) { + Map( + "numCoalescedPartitions" -> + SQLMetrics.createMetric(sparkContext, "number of coalesced partitions")) + } else { + Map.empty + } + } + } else { + // It's a canonicalized plan, no need to report metrics. + Map.empty + } + } + + private lazy val shuffleRDD: RDD[_] = { + shuffleStage match { + case Some(stage) => + sendDriverMetrics() + stage.shuffle match { + case columnarShuffle: ColumnarShuffleExchangeExec => + columnarShuffle.getShuffleRDD(partitionSpecs.toArray, executionMode) + case _ => + stage.shuffle.getShuffleRDD(partitionSpecs.toArray) + } + case _ => + throw new IllegalStateException("operating on canonicalized plan") + } + } + + override protected def doExecute(): RDD[InternalRow] = { + shuffleRDD.asInstanceOf[RDD[InternalRow]] + } + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + shuffleRDD.asInstanceOf[RDD[ColumnarBatch]] + } + + override protected def withNewChildInternal(newChild: SparkPlan): ColumnarAQEShuffleReadExec = + copy(child = newChild) +} diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/utils/GlutenResourceProfileUtil.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/utils/GlutenResourceProfileUtil.scala new file mode 100644 index 000000000000..128c68aebb5f --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/utils/GlutenResourceProfileUtil.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.utils + +import org.apache.gluten.config.GlutenCoreConfig + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{CPUS_PER_TASK, EXECUTOR_CORES, MEMORY_OFFHEAP_SIZE} +import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager} +import org.apache.spark.sql.execution.{ApplyResourceProfileExec, SparkPlan} +import org.apache.spark.sql.execution.adaptive.QueryStageExec +import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec} +import org.apache.spark.sql.execution.exchange.Exchange +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.SparkTestUtil + +import scala.collection.mutable.ArrayBuffer + +object GlutenResourceProfileUtil extends Logging { + // collect all plan nodes belong to this stage including child query stage + // but exclude query stage child + def collectStagePlan(plan: SparkPlan): ArrayBuffer[SparkPlan] = { + + def collectStagePlan(plan: SparkPlan, planNodes: ArrayBuffer[SparkPlan]): Unit = { + if (plan.isInstanceOf[DataWritingCommandExec] || plan.isInstanceOf[ExecutedCommandExec]) { + // todo: support set final stage's resource profile + return + } + planNodes += plan + if (plan.isInstanceOf[QueryStageExec]) { + return + } + plan.children.foreach(collectStagePlan(_, planNodes)) + } + + val planNodes = new ArrayBuffer[SparkPlan]() + collectStagePlan(plan, planNodes) + planNodes + } + + private def getFinalResourceProfile( + rpManager: ResourceProfileManager, + newRP: ResourceProfile): (ResourceProfile, Boolean) = { + val maybeEqProfile = rpManager.getEquivalentProfile(newRP) + if (maybeEqProfile.isDefined) { + (maybeEqProfile.get, true) + } else { + try { + rpManager.isSupported(newRP) + } catch { + case e: SparkException => + // ResourceProfiles are only supported on YARN and Kubernetes with + // dynamic allocation enabled + logWarning( + s"Resource profile $newRP is not supported, fallback to default profile. Reason: " + + s"${e.getMessage}") + return (rpManager.defaultResourceProfile, false) + } + // register new resource profile here + rpManager.addResourceProfile(newRP) + (newRP, true) + } + } + + /** + * Starting here, the resource profile may differ between stages. Configure resource settings + * using the default profile to prevent any impact from the previous stage. If a new resource + * profile is applied, the settings will be updated accordingly. + */ + def restoreDefaultResourceSetting(sparkConf: SparkConf): Unit = { + GlutenResourceProfileUtil.updateResourceSetting( + ResourceProfile.getOrCreateDefaultProfile(sparkConf), + sparkConf) + } + + /** + * Reflects resource changes in some configurations that will be passed to the native side. It + * only affects the current thread. + */ + def updateResourceSetting(rp: ResourceProfile, sparkConf: SparkConf): Unit = { + val coresPerExecutor = rp.getExecutorCores.getOrElse(sparkConf.get(EXECUTOR_CORES)) + val coresPerTask = rp.getTaskCpus.getOrElse(sparkConf.get(CPUS_PER_TASK)) + val taskSlots = coresPerExecutor / coresPerTask + val conf = SQLConf.get + conf.setConfString(GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.key, taskSlots.toString) + val offHeapSize = rp.executorResources + .get(ResourceProfile.OFFHEAP_MEM) + .map(_.amount) + .getOrElse(sparkConf.get(MEMORY_OFFHEAP_SIZE)) + conf.setConfString(GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES.key, offHeapSize.toString) + conf.setConfString( + GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key, + (offHeapSize / taskSlots).toString) + } + + // Returns the plan and whether the new resource profile is applied. + def applyNewResourceProfileIfPossible( + plan: SparkPlan, + rp: ResourceProfile, + rpManager: ResourceProfileManager, + sparkConf: SparkConf): SparkPlan = { + updateResourceSetting(rp, sparkConf) + + val (finalRP, profileApplied) = getFinalResourceProfile(rpManager, rp) + if (profileApplied || SparkTestUtil.isTesting) { + // Wrap the plan with ApplyResourceProfileExec so that we can apply new ResourceProfile. + logInfo(s"Apply resource profile $finalRP for plan ${plan.nodeName}") + plan match { + case exchange: Exchange => + exchange.withNewChildren(Seq(ApplyResourceProfileExec(exchange.child, finalRP))) + case other => + ApplyResourceProfileExec(other, finalRP) + } + } else { + plan + } + } +} diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala index 978685db87a0..70ab757d9102 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala @@ -174,7 +174,7 @@ class GlutenDataFrameWindowFunctionsSuite def isShuffleExecByRequirement( plan: ColumnarShuffleExchangeExec, desiredClusterColumns: Seq[String]): Boolean = plan match { - case ColumnarShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _, _) => + case ColumnarShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _, _, _, _) => partitionExpressionsColumns(op.expressions) === desiredClusterColumns case _ => false } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala index 978685db87a0..70ab757d9102 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala @@ -174,7 +174,7 @@ class GlutenDataFrameWindowFunctionsSuite def isShuffleExecByRequirement( plan: ColumnarShuffleExchangeExec, desiredClusterColumns: Seq[String]): Boolean = plan match { - case ColumnarShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _, _) => + case ColumnarShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _, _, _, _) => partitionExpressionsColumns(op.expressions) === desiredClusterColumns case _ => false } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala index 978685db87a0..70ab757d9102 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala @@ -174,7 +174,7 @@ class GlutenDataFrameWindowFunctionsSuite def isShuffleExecByRequirement( plan: ColumnarShuffleExchangeExec, desiredClusterColumns: Seq[String]): Boolean = plan match { - case ColumnarShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _, _) => + case ColumnarShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _, _, _, _) => partitionExpressionsColumns(op.expressions) === desiredClusterColumns case _ => false } diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala index 978685db87a0..70ab757d9102 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala @@ -174,7 +174,7 @@ class GlutenDataFrameWindowFunctionsSuite def isShuffleExecByRequirement( plan: ColumnarShuffleExchangeExec, desiredClusterColumns: Seq[String]): Boolean = plan match { - case ColumnarShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _, _) => + case ColumnarShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _, _, _, _) => partitionExpressionsColumns(op.expressions) === desiredClusterColumns case _ => false } diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala index 978685db87a0..70ab757d9102 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameWindowFunctionsSuite.scala @@ -174,7 +174,7 @@ class GlutenDataFrameWindowFunctionsSuite def isShuffleExecByRequirement( plan: ColumnarShuffleExchangeExec, desiredClusterColumns: Seq[String]): Boolean = plan match { - case ColumnarShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _, _) => + case ColumnarShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _, _, _, _) => partitionExpressionsColumns(op.expressions) === desiredClusterColumns case _ => false }