diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index 59a9cb2b00fc..d542fd92b92c 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.utils.SparkArrowUtil import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.task.TaskResources +import org.apache.spark.util.KnownSizeEstimation import org.apache.arrow.c.ArrowSchema @@ -61,7 +62,8 @@ case class ColumnarBuildSideRelation( output: Seq[Attribute], batches: Array[Array[Byte]], safeBroadcastMode: SafeBroadcastMode) - extends BuildSideRelation { + extends BuildSideRelation + with KnownSizeEstimation { // Rebuild the real BroadcastMode on demand; never serialize it. @transient override lazy val mode: BroadcastMode = @@ -236,4 +238,11 @@ case class ColumnarBuildSideRelation( } iterator.toArray } + override def estimatedSize: Long = { + if (batches != null) { + batches.map(_.length.toLong).sum + } else { + 0L + } + } } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala index 80e92a1537e8..308834657a00 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.utils.SparkArrowUtil import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.task.TaskResources -import org.apache.spark.util.Utils +import org.apache.spark.util.{KnownSizeEstimation, Utils} import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.esotericsoftware.kryo.io.{Input, Output} @@ -102,7 +102,8 @@ case class UnsafeColumnarBuildSideRelation( extends BuildSideRelation with Externalizable with Logging - with KryoSerializable { + with KryoSerializable + with KnownSizeEstimation { // Rebuild the real BroadcastMode on demand; never serialize it. @transient override lazy val mode: BroadcastMode = @@ -366,4 +367,12 @@ case class UnsafeColumnarBuildSideRelation( } iterator.toArray } + + override def estimatedSize: Long = { + if (batches != null) { + batches.totalBytes + } else { + 0L + } + } }