From 13463450b25582e9fdb955db77e3ef7fd71142c4 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Mon, 29 Sep 2025 11:44:37 +0800 Subject: [PATCH 1/3] [VL]Implement KnownSizeEstimation for buildSideRelation --- .../sql/execution/ColumnarBuildSideRelation.scala | 10 +++++++++- .../unsafe/UnsafeColumnarBuildSideRelation.scala | 14 +++++++++++--- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index 59a9cb2b00fc..0fe01c6f030a 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.utils.SparkArrowUtil import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.task.TaskResources +import org.apache.spark.util.KnownSizeEstimation import org.apache.arrow.c.ArrowSchema @@ -61,7 +62,7 @@ case class ColumnarBuildSideRelation( output: Seq[Attribute], batches: Array[Array[Byte]], safeBroadcastMode: SafeBroadcastMode) - extends BuildSideRelation { + extends BuildSideRelation with KnownSizeEstimation { // Rebuild the real BroadcastMode on demand; never serialize it. @transient override lazy val mode: BroadcastMode = @@ -236,4 +237,11 @@ case class ColumnarBuildSideRelation( } iterator.toArray } + override def estimatedSize: Long = { + if (null != batches) { + batches.map(_.length.toLong).sum + } else { + 0L + } + } } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala index 80e92a1537e8..6a2a7e3703d6 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala @@ -36,8 +36,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.utils.SparkArrowUtil import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.task.TaskResources -import org.apache.spark.util.Utils - +import org.apache.spark.util.{KnownSizeEstimation, Utils} import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.arrow.c.ArrowSchema @@ -102,7 +101,8 @@ case class UnsafeColumnarBuildSideRelation( extends BuildSideRelation with Externalizable with Logging - with KryoSerializable { + with KryoSerializable + with KnownSizeEstimation { // Rebuild the real BroadcastMode on demand; never serialize it. @transient override lazy val mode: BroadcastMode = @@ -366,4 +366,12 @@ case class UnsafeColumnarBuildSideRelation( } iterator.toArray } + + override def estimatedSize: Long = { + if (null != batches) { + batches.totalBytes + } else { + 0L + } + } } From 0f1734f084c02f444ddf570de38ea489395308db Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Mon, 29 Sep 2025 15:00:28 +0800 Subject: [PATCH 2/3] fix style --- .../apache/spark/sql/execution/ColumnarBuildSideRelation.scala | 3 ++- .../sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index 0fe01c6f030a..1cde77ea75cf 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -62,7 +62,8 @@ case class ColumnarBuildSideRelation( output: Seq[Attribute], batches: Array[Array[Byte]], safeBroadcastMode: SafeBroadcastMode) - extends BuildSideRelation with KnownSizeEstimation { + extends BuildSideRelation + with KnownSizeEstimation { // Rebuild the real BroadcastMode on demand; never serialize it. @transient override lazy val mode: BroadcastMode = diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala index 6a2a7e3703d6..0f927bc89c3c 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.utils.SparkArrowUtil import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.task.TaskResources import org.apache.spark.util.{KnownSizeEstimation, Utils} + import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.arrow.c.ArrowSchema From 9857f492ccaf8bae60c7d6cdd33473f80a2c38f2 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Sat, 11 Oct 2025 14:17:53 +0800 Subject: [PATCH 3/3] address comments --- .../apache/spark/sql/execution/ColumnarBuildSideRelation.scala | 2 +- .../sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index 1cde77ea75cf..d542fd92b92c 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -239,7 +239,7 @@ case class ColumnarBuildSideRelation( iterator.toArray } override def estimatedSize: Long = { - if (null != batches) { + if (batches != null) { batches.map(_.length.toLong).sum } else { 0L diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala index 0f927bc89c3c..308834657a00 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala @@ -369,7 +369,7 @@ case class UnsafeColumnarBuildSideRelation( } override def estimatedSize: Long = { - if (null != batches) { + if (batches != null) { batches.totalBytes } else { 0L