Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import java.util.concurrent.atomic.AtomicBoolean;

public class BatchIterator extends ClosableIterator {
public class BatchIterator extends ClosableIterator<ColumnarBatch> {
private final long handle;
private final AtomicBoolean cancelled = new AtomicBoolean(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ArrowColumnarToVeloxColumnarExec(override val child: SparkPlan)
extends ColumnarToColumnarExec(ArrowNativeBatchType, VeloxBatchType) {
override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
in.map {
b =>
val out = VeloxColumnarBatches.toVeloxBatch(b)
out
}
in.map(b => VeloxColumnarBatches.toVeloxBatch(b))
}

override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
ArrowColumnarToVeloxColumnarExec(child = newChild)
copy(child = newChild)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,99 +16,40 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.backendsapi.velox.VeloxBatchType
import org.apache.gluten.iterator.ClosableIterator
import org.apache.gluten.utils.VeloxBatchResizer

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.ColumnarBatch

import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._

/**
* An operator to resize input batches by appending the later batches to the one that comes earlier,
* or splitting one batch to smaller ones.
*
* FIXME: Code duplication with ColumnarToColumnarExec.
*/
case class VeloxResizeBatchesExec(
override val child: SparkPlan,
minOutputBatchSize: Int,
maxOutputBatchSize: Int)
extends GlutenPlan
with UnaryExecNode {

override lazy val metrics: Map[String, SQLMetric] = Map(
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
"selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append / split batches")
)

override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType

override def rowType0(): Convention.RowType = Convention.RowType.None
extends ColumnarToColumnarExec(VeloxBatchType, VeloxBatchType) {

override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numInputRows = longMetric("numInputRows")
val numInputBatches = longMetric("numInputBatches")
val numOutputRows = longMetric("numOutputRows")
val numOutputBatches = longMetric("numOutputBatches")
val selfTime = longMetric("selfTime")

child.executeColumnar().mapPartitions {
in =>
// Append millis = Out millis - In millis.
val appendMillis = new AtomicLong(0L)
val appender = VeloxBatchResizer.create(
minOutputBatchSize,
maxOutputBatchSize,
Iterators
.wrap(in)
.collectReadMillis(inMillis => appendMillis.getAndAdd(-inMillis))
.create()
.map {
inBatch =>
numInputRows += inBatch.numRows()
numInputBatches += 1
inBatch
}
.asJava
)

val out = Iterators
.wrap(appender.asScala)
.protectInvocationFlow()
.collectReadMillis(outMillis => appendMillis.getAndAdd(outMillis))
.recyclePayload(_.close())
.recycleIterator {
appender.close()
selfTime += appendMillis.get()
}
.create()
.map {
outBatch =>
numOutputRows += outBatch.numRows()
numOutputBatches += 1
outBatch
}
override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
VeloxBatchResizer.create(minOutputBatchSize, maxOutputBatchSize, in.asJava).asScala
}

out
override protected def closeIterator(out: Iterator[ColumnarBatch]): Unit = {
out.asJava match {
case c: ClosableIterator[ColumnarBatch] => c.close()
case _ =>
}
}

override def output: Seq[Attribute] = child.output
override protected def needRecyclePayload: Boolean = true

override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec}

/**
* Try to append [[VeloxResizeBatchesExec]] for shuffle input and ouput to make the batch sizes in
* Try to append [[VeloxResizeBatchesExec]] for shuffle input and output to make the batch sizes in
* good shape.
*/
case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private class ColumnarBatchSerializerInstanceImpl(
with TaskResource {
private val streamReader = ShuffleStreamReader(streams)

private val wrappedOut: ClosableIterator = new ColumnarBatchOutIterator(
private val wrappedOut: ClosableIterator[ColumnarBatch] = new ColumnarBatchOutIterator(
runtime,
jniWrapper
.read(shuffleReaderHandle, streamReader))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

import java.io.IOException;

public class ColumnarBatchOutIterator extends ClosableIterator implements RuntimeAware {
public class ColumnarBatchOutIterator extends ClosableIterator<ColumnarBatch>
implements RuntimeAware {
private final Runtime runtime;
private final long iterHandle;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@

import org.apache.gluten.exception.GlutenException;

import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.io.Serializable;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class ClosableIterator
implements AutoCloseable, Serializable, Iterator<ColumnarBatch> {
public abstract class ClosableIterator<T> implements AutoCloseable, Serializable, Iterator<T> {
protected final AtomicBoolean closed = new AtomicBoolean(false);

public ClosableIterator() {}
Expand All @@ -43,7 +40,7 @@ public final boolean hasNext() {
}

@Override
public final ColumnarBatch next() {
public final T next() {
if (closed.get()) {
throw new GlutenException("Iterator has been closed.");
}
Expand All @@ -65,5 +62,5 @@ public final void close() {

protected abstract boolean hasNext0() throws Exception;

protected abstract ColumnarBatch next0() throws Exception;
protected abstract T next0() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,16 @@ abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: Convention
extends ColumnarToColumnarTransition
with GlutenPlan {

override def isSameConvention: Boolean = from == to

def child: SparkPlan

protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch]

protected def closeIterator(out: Iterator[ColumnarBatch]): Unit = {}

protected def needRecyclePayload: Boolean = false

override lazy val metrics: Map[String, SQLMetric] =
Map(
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
Expand Down Expand Up @@ -77,20 +84,25 @@ abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: Convention
inBatch
}
val out = mapIterator(wrappedIn)
val wrappedOut = Iterators
val builder = Iterators
.wrap(out)
.protectInvocationFlow()
.collectReadMillis(outMillis => selfMillis.getAndAdd(outMillis))
.recycleIterator {
closeIterator(out)
selfTime += selfMillis.get()
}
if (needRecyclePayload) {
builder.recyclePayload(_.close())
}
builder
.create()
.map {
outBatch =>
numOutputRows += outBatch.numRows()
numOutputBatches += 1
outBatch
}
wrappedOut
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ package org.apache.gluten.execution

import org.apache.spark.sql.execution.UnaryExecNode

trait ColumnarToColumnarTransition extends UnaryExecNode
/**
 * Base trait for Gluten's columnar-to-columnar (C2C) transition operators.
 *
 * `isSameConvention` reports whether the operator's input and output batch conventions are
 * identical (e.g. a Velox-to-Velox batch resize). The `ColumnarToColumnarLike` extractor uses
 * it to skip same-convention nodes so they are not treated as removable transitions.
 */
trait ColumnarToColumnarTransition extends UnaryExecNode {
  // True when the input batch type equals the output batch type.
  def isSameConvention: Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ package object transition {
}
}

// Extractor for Gluten's C2C
// Extractor for Gluten's C2C with different convention
object ColumnarToColumnarLike {
def unapply(plan: SparkPlan): Option[SparkPlan] = {
plan match {
case c2c: ColumnarToColumnarTransition =>
case c2c: ColumnarToColumnarTransition if !c2c.isSameConvention =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Zouxxyy

Sorry I missed out this change. Would you explain the purpose of it? Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ColumnarToColumnarLike is used by the Coster to calculate C2C costs. Previously, VeloxResizeBatchesExec didn't inherit from ColumnarToColumnarTransition, so it wasn't included in the cost calculation. This change is to maintain the previous behavior.

Besides, in my testing, without this change, SELECT max(l_orderkey) FROM lineitem will not generate a VeloxResizeBatchesExec node, because the node is eliminated after the cost calculation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently VeloxResizeBatchesExec is added by rule https://github.com/apache/incubator-gluten/blob/cd2c0cca9b9478a050bfbb90f15e75f99e7adcd2/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala#L31-L59 without costers involved. Am I missing something? How is it eliminated after being added by the rule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhztheplayer, I'm not sure my understanding is correct: in InsertTransitions we first remove nodes matching ColumnarToColumnarLike, and then call fillWithTransitions

case class InsertTransitions(convReq: ConventionReq) extends Rule[SparkPlan] {
  private val convFunc = ConventionFunc.create()

  override def apply(plan: SparkPlan): SparkPlan = {
    // Remove all transitions at first.
    val removed = RemoveTransitions.apply(plan)
    val filled = fillWithTransitions(removed)
    val out = Transitions.enforceReq(filled, convReq)
    out
  }

but in VeloxBatchType no such transition is registered, so after we remove the node, we lose VeloxResizeBatchesExec

// Batch-type registration for the Velox backend. NOTE(review): quoted from the repository in
// this discussion thread — no Velox-to-Velox transition is registered here, which is why a
// same-convention node like VeloxResizeBatchesExec cannot be re-inserted once removed.
object VeloxBatchType extends Convention.BatchType {
  override protected def registerTransitions(): Unit = {
    // Row <-> Velox batch conversions.
    fromRow(Convention.RowType.VanillaRowType, RowToVeloxColumnarExec.apply)
    toRow(Convention.RowType.VanillaRowType, VeloxColumnarToRowExec.apply)
    // Arrow native batch -> Velox batch requires an explicit conversion operator;
    // the reverse direction is declared a no-op transition.
    fromBatch(ArrowBatchTypes.ArrowNativeBatchType, ArrowColumnarToVeloxColumnarExec.apply)
    toBatch(ArrowBatchTypes.ArrowNativeBatchType, Transition.empty)
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the current C2C transitions are added automatically, while the resize node is added manually, so we should not remove it. A separate question is whether the Velox resize node should also be included in the cost calculation.

Some(c2c.child)
case _ => None
}
Expand Down