diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java index 2f7471d837bd..d73330b9a640 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; -public class BatchIterator extends ClosableIterator { +public class BatchIterator extends ClosableIterator<ColumnarBatch> { private final long handle; private final AtomicBoolean cancelled = new AtomicBoolean(false); diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala index 49d56599962f..920988675087 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala @@ -26,12 +26,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch case class ArrowColumnarToVeloxColumnarExec(override val child: SparkPlan) extends ColumnarToColumnarExec(ArrowNativeBatchType, VeloxBatchType) { override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = { - in.map { - b => - val out = VeloxColumnarBatches.toVeloxBatch(b) - out - } + in.map(b => VeloxColumnarBatches.toVeloxBatch(b)) } + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = - ArrowColumnarToVeloxColumnarExec(child = newChild) + copy(child = newChild) } diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala index a8e0863639c5..a1ec54ffbc13 100644 --- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala @@ -16,99 +16,40 @@ */ package org.apache.gluten.execution -import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.extension.columnar.transition.Convention -import org.apache.gluten.iterator.Iterators +import org.apache.gluten.backendsapi.velox.VeloxBatchType +import org.apache.gluten.iterator.ClosableIterator import org.apache.gluten.utils.VeloxBatchResizer -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.vectorized.ColumnarBatch -import java.util.concurrent.atomic.AtomicLong - import scala.collection.JavaConverters._ /** * An operator to resize input batches by appending the later batches to the one that comes earlier, * or splitting one batch to smaller ones. - * - * FIXME: Code duplication with ColumnarToColumnarExec. 
*/ case class VeloxResizeBatchesExec( override val child: SparkPlan, minOutputBatchSize: Int, maxOutputBatchSize: Int) - extends GlutenPlan - with UnaryExecNode { - - override lazy val metrics: Map[String, SQLMetric] = Map( - "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), - "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"), - "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"), - "selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append / split batches") - ) - - override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType - - override def rowType0(): Convention.RowType = Convention.RowType.None + extends ColumnarToColumnarExec(VeloxBatchType, VeloxBatchType) { - override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() - - override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { - val numInputRows = longMetric("numInputRows") - val numInputBatches = longMetric("numInputBatches") - val numOutputRows = longMetric("numOutputRows") - val numOutputBatches = longMetric("numOutputBatches") - val selfTime = longMetric("selfTime") - - child.executeColumnar().mapPartitions { - in => - // Append millis = Out millis - In millis. 
- val appendMillis = new AtomicLong(0L) - val appender = VeloxBatchResizer.create( - minOutputBatchSize, - maxOutputBatchSize, - Iterators - .wrap(in) - .collectReadMillis(inMillis => appendMillis.getAndAdd(-inMillis)) - .create() - .map { - inBatch => - numInputRows += inBatch.numRows() - numInputBatches += 1 - inBatch - } - .asJava - ) - - val out = Iterators - .wrap(appender.asScala) - .protectInvocationFlow() - .collectReadMillis(outMillis => appendMillis.getAndAdd(outMillis)) - .recyclePayload(_.close()) - .recycleIterator { - appender.close() - selfTime += appendMillis.get() - } - .create() - .map { - outBatch => - numOutputRows += outBatch.numRows() - numOutputBatches += 1 - outBatch - } + override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = { + VeloxBatchResizer.create(minOutputBatchSize, maxOutputBatchSize, in.asJava).asScala + } - out + override protected def closeIterator(out: Iterator[ColumnarBatch]): Unit = { + out.asJava match { + case c: ClosableIterator[ColumnarBatch] => c.close() + case _ => } } - override def output: Seq[Attribute] = child.output + override protected def needRecyclePayload: Boolean = true + override def outputPartitioning: Partitioning = child.outputPartitioning override def outputOrdering: Seq[SortOrder] = child.outputOrdering override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala index aea311c01dbe..ebe55f573c62 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, SparkPlan} import 
org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec} /** - * Try to append [[VeloxResizeBatchesExec]] for shuffle input and ouput to make the batch sizes in + * Try to append [[VeloxResizeBatchesExec]] for shuffle input and output to make the batch sizes in * good shape. */ case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] { diff --git a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala index 17eea29238dd..3b5fce63f8c2 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala @@ -145,7 +145,7 @@ private class ColumnarBatchSerializerInstanceImpl( with TaskResource { private val streamReader = ShuffleStreamReader(streams) - private val wrappedOut: ClosableIterator = new ColumnarBatchOutIterator( + private val wrappedOut: ClosableIterator[ColumnarBatch] = new ColumnarBatchOutIterator( runtime, jniWrapper .read(shuffleReaderHandle, streamReader)) diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java index 4293b2abf867..f4d2c8e7d1b1 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java @@ -25,7 +25,8 @@ import java.io.IOException; -public class ColumnarBatchOutIterator extends ClosableIterator implements RuntimeAware { +public class ColumnarBatchOutIterator extends ClosableIterator<ColumnarBatch> + implements RuntimeAware { private final Runtime runtime; private final long iterHandle; diff --git a/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java 
b/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java index 6ed9d6c18083..7947b09af9b7 100644 --- a/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java +++ b/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java @@ -18,14 +18,11 @@ import org.apache.gluten.exception.GlutenException; -import org.apache.spark.sql.vectorized.ColumnarBatch; - import java.io.Serializable; import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; -public abstract class ClosableIterator - implements AutoCloseable, Serializable, Iterator<ColumnarBatch> { +public abstract class ClosableIterator<T> implements AutoCloseable, Serializable, Iterator<T> { protected final AtomicBoolean closed = new AtomicBoolean(false); public ClosableIterator() {} @@ -43,7 +40,7 @@ public final boolean hasNext() { } @Override - public final ColumnarBatch next() { + public final T next() { if (closed.get()) { throw new GlutenException("Iterator has been closed."); } @@ -65,5 +62,5 @@ public final void close() { protected abstract boolean hasNext0() throws Exception; - protected abstract ColumnarBatch next0() throws Exception; + protected abstract T next0() throws Exception; } diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala index ac1f8d683570..f19b89898388 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala @@ -32,9 +32,16 @@ abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: Convention extends ColumnarToColumnarTransition with GlutenPlan { + override def isSameConvention: Boolean = from == to + def child: SparkPlan + protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] + protected def closeIterator(out: Iterator[ColumnarBatch]): Unit = {} 
+ + protected def needRecyclePayload: Boolean = false + override lazy val metrics: Map[String, SQLMetric] = Map( "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), @@ -77,12 +84,18 @@ abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: Convention inBatch } val out = mapIterator(wrappedIn) - val wrappedOut = Iterators + val builder = Iterators .wrap(out) + .protectInvocationFlow() .collectReadMillis(outMillis => selfMillis.getAndAdd(outMillis)) .recycleIterator { + closeIterator(out) selfTime += selfMillis.get() } + if (needRecyclePayload) { + builder.recyclePayload(_.close()) + } + builder .create() .map { outBatch => @@ -90,7 +103,6 @@ abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: Convention numOutputBatches += 1 outBatch } - wrappedOut } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala index 36c79c3f197e..8c4757b45c83 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala @@ -18,4 +18,6 @@ package org.apache.gluten.execution import org.apache.spark.sql.execution.UnaryExecNode -trait ColumnarToColumnarTransition extends UnaryExecNode +trait ColumnarToColumnarTransition extends UnaryExecNode { + def isSameConvention: Boolean +} diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala index a10c41cbca1b..9f309b843f55 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala @@ -65,11 +65,11 @@ package object transition { } } - // Extractor 
for Gluten's C2C + // Extractor for Gluten's C2C transitions whose input and output conventions differ object ColumnarToColumnarLike { def unapply(plan: SparkPlan): Option[SparkPlan] = { plan match { - case c2c: ColumnarToColumnarTransition => + case c2c: ColumnarToColumnarTransition if !c2c.isSameConvention => Some(c2c.child) case _ => None }