From 78e985f2050b9521984ccb34b60dda8ca372fa3d Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 4 Feb 2026 15:01:12 +0100 Subject: [PATCH 1/9] Add API ColumnarBatchOutIterator#requestBarrier --- cpp/core/compute/Runtime.h | 4 ++++ cpp/core/jni/JniWrapper.cc | 14 +++++++++++ .../memory/SplitAwareColumnarBatchIterator.h | 6 +++++ cpp/velox/compute/VeloxRuntime.cc | 8 +++++++ cpp/velox/compute/VeloxRuntime.h | 2 ++ cpp/velox/compute/WholeStageResultIterator.cc | 7 ++++++ cpp/velox/compute/WholeStageResultIterator.h | 6 +++++ .../vectorized/ColumnarBatchOutIterator.java | 23 ++++++++++++++++++- 8 files changed, 69 insertions(+), 1 deletion(-) diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index a15cdc83d310..9d8315731fc5 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -110,6 +110,10 @@ class Runtime : public std::enable_shared_from_this { throw GlutenException("Not implemented"); } + virtual void requestBarrier(ResultIterator* iter) { + throw GlutenException("Not implemented"); + } + virtual std::shared_ptr createOrGetEmptySchemaBatch(int32_t numRows) { throw GlutenException("Not implemented"); } diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index e9d797e1dbfb..a91f10641748 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -695,6 +695,20 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterato JNI_METHOD_END() } +JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeRequestBarrier( // NOLINT + JNIEnv* env, + jobject wrapper, + jlong iterHandle) { + JNI_METHOD_START + auto ctx = getRuntime(env, wrapper); + auto iter = ObjectStore::retrieve(iterHandle); + if (iter == nullptr) { + throw GlutenException("Invalid iterator handle for requestBarrier"); + } + ctx->requestBarrier(iter.get()); + JNI_METHOD_END() +} + JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarToRowInit( // NOLINT JNIEnv* env, diff --git a/cpp/core/memory/SplitAwareColumnarBatchIterator.h b/cpp/core/memory/SplitAwareColumnarBatchIterator.h index e12f38afadf5..987eb04351c9 100644 --- a/cpp/core/memory/SplitAwareColumnarBatchIterator.h +++ b/cpp/core/memory/SplitAwareColumnarBatchIterator.h @@ -44,6 +44,12 @@ class SplitAwareColumnarBatchIterator : public ColumnarBatchIterator { /// Signal that no more splits will be added to this iterator. /// This must be called after all splits have been added to ensure proper task completion. virtual void noMoreSplits() = 0; + + /// Request a barrier in task execution. This signals the task to finish processing + /// all currently queued splits and drain all stateful operators before continuing. + /// Enables task reuse and deterministic execution for streaming workloads. + /// @see https://facebookincubator.github.io/velox/develop/task-barrier.html + virtual void requestBarrier() = 0; }; } // namespace gluten diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 69d21b4a57c9..3a9bd8ae4208 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -210,6 +210,14 @@ void VeloxRuntime::noMoreSplits(ResultIterator* iter){ splitAwareIter->noMoreSplits(); } +void VeloxRuntime::requestBarrier(ResultIterator* iter){ + auto* splitAwareIter = dynamic_cast(iter->getInputIter()); + if (splitAwareIter == nullptr) { + throw GlutenException("Iterator does not support split management"); + } + splitAwareIter->requestBarrier(); +} + std::shared_ptr VeloxRuntime::createColumnar2RowConverter(int64_t column2RowMemThreshold) { auto veloxPool = memoryManager()->getLeafMemoryPool(); return std::make_shared(veloxPool, column2RowMemThreshold); diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index a3c3da0c5ac9..728cc46c92a4 100644 --- a/cpp/velox/compute/VeloxRuntime.h 
+++ b/cpp/velox/compute/VeloxRuntime.h @@ -61,6 +61,8 @@ class VeloxRuntime final : public Runtime { void noMoreSplits(ResultIterator* iter) override; + void requestBarrier(ResultIterator* iter) override; + std::shared_ptr createColumnar2RowConverter(int64_t column2RowMemThreshold) override; std::shared_ptr createOrGetEmptySchemaBatch(int32_t numRows) override; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index ac5773e439ee..97d2dfc6b66f 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -393,6 +393,13 @@ void WholeStageResultIterator::noMoreSplits() { allSplitsAdded = true; } +void WholeStageResultIterator::requestBarrier() { + if (task_ == nullptr) { + throw GlutenException("Cannot request barrier: task is null"); + } + task_->requestBarrier(); +} + void WholeStageResultIterator::collectMetrics() { if (metrics_) { // The metrics has already been created. diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index 401cff06dafd..a6b30440a9a5 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -85,6 +85,12 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { /// This is required for proper task completion and enables future barrier support. void noMoreSplits() override; + /// Request a barrier in the Velox task execution. + /// This signals the task to finish processing all currently queued splits + /// and drain all stateful operators before continuing. + /// @see https://facebookincubator.github.io/velox/develop/task-barrier.html + void requestBarrier() override; + private: /// Get the Spark confs to Velox query context. 
std::unordered_map getQueryContextConf(); diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java index ad200dd46c81..684d78d6b905 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java @@ -58,6 +58,8 @@ private native boolean nativeAddIteratorSplits( private native void nativeNoMoreSplits(long iterHandle); + private native void nativeRequestBarrier(long iterHandle); + @Override public boolean hasNext0() throws IOException { return nativeHasNext(iterHandle); @@ -97,7 +99,7 @@ public boolean addIteratorSplits(ColumnarBatchInIterator[] batchItr) { /** * Signal that no more splits will be added to the iterator. This is required for proper task - * completion and is a prerequisite for barrier support. + * completion. * * @throws IllegalStateException if the iterator is closed */ @@ -108,6 +110,25 @@ public void noMoreSplits() { nativeNoMoreSplits(iterHandle); } + /** + * Request a barrier in the task execution. This signals the task to finish processing + * all currently queued splits and drain all stateful operators before continuing. + * After calling this method, continue calling next() to fetch results. When next() returns + * null and hasNext() returns false, the barrier has been reached. + * + * This enables task reuse and deterministic execution for workloads like AI training data + * loading and real-time streaming processing. 
+ * + * @throws IllegalStateException if the iterator is closed + * @see <a href="https://facebookincubator.github.io/velox/develop/task-barrier.html">Velox Task Barrier Documentation</a> + */ + public void requestBarrier() { + if (closed.get()) { + throw new IllegalStateException("Cannot call requestBarrier on a closed iterator"); + } + nativeRequestBarrier(iterHandle); + } + @Override public void close0() { // To make sure the outputted batches are still accessible after the iterator is closed. From 5a75c83f38f9ca2e7aeb658ee164579f93b06fd9 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 12 Jan 2026 12:52:01 +0100 Subject: [PATCH 2/9] [GLUTEN-10215][VL] Delta Write: Native statistics tracker to avoid C2R overhead The patch adds a native job statistics tracker for Delta write to eliminate C2R overhead. More PR description WIP. --- .../delta/GlutenOptimisticTransaction.scala | 13 +- .../files/GlutenDeltaFileFormatWriter.scala | 19 +- ... GlutenDeltaJobStatsFallbackTracker.scala} | 51 +-- ...lutenDeltaJobStatsRowCountingTracker.scala | 66 ++++ .../stats/GlutenDeltaJobStatsTracker.scala | 316 ++++++++++++++++++ .../datasources/velox/VeloxBlockStripes.java | 46 +-- .../columnar/validator/Validator.scala | 9 +- 7 files changed, 449 insertions(+), 71 deletions(-) rename backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/{GlutenDeltaWriteJobStatsTracker.scala => GlutenDeltaJobStatsFallbackTracker.scala} (57%) create mode 100644 backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsRowCountingTracker.scala create mode 100644 backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala index af19d1df9f91..351c2effb784 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala +++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.delta.hooks.AutoCompact import org.apache.spark.sql.delta.perf.{DeltaOptimizedWriterExec, GlutenDeltaOptimizedWriterExec} import org.apache.spark.sql.delta.schema.InnerInvariantViolationException import org.apache.spark.sql.delta.sources.DeltaSQLConf -import org.apache.spark.sql.delta.stats.{GlutenDeltaIdentityColumnStatsTracker, GlutenDeltaJobStatisticsTracker} import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker} @@ -72,8 +71,7 @@ class GlutenOptimisticTransaction(delegate: OptimisticTransaction) // the FileFormatWriter.write call below and will collect per-file stats using // StatisticsCollection val optionalStatsTracker = - getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)._1.map( - new GlutenDeltaJobStatisticsTracker(_)) + getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)._1 val constraints = Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints @@ -87,7 +85,6 @@ class GlutenOptimisticTransaction(delegate: OptimisticTransaction) statsDataSchema, trackIdentityHighWaterMarks ) - .map(new GlutenDeltaIdentityColumnStatsTracker(_)) SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) { val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, output) @@ -96,8 +93,8 @@ class GlutenOptimisticTransaction(delegate: OptimisticTransaction) convertEmptyToNullIfNeeded(queryExecution.executedPlan, partitioningColumns, constraints) val maybeCheckInvariants = if (constraints.isEmpty) { // Compared to vanilla Delta, we simply avoid adding the invariant checker - // when the constraint 
list is empty, to omit the unnecessary transitions - // added around the invariant checker. + // when the constraint list is empty, to prevent the unnecessary transitions + // from being added around the invariant checker. empty2NullPlan } else { DeltaInvariantCheckerExec(empty2NullPlan, constraints) @@ -206,7 +203,7 @@ class GlutenOptimisticTransaction(delegate: OptimisticTransaction) committer.addedStatuses.map { a => a.copy(stats = optionalStatsTracker - .map(_.delegate.recordedStats(a.toPath.getName)) + .map(_.recordedStats(a.toPath.getName)) .getOrElse(a.stats)) } } else { @@ -235,7 +232,7 @@ class GlutenOptimisticTransaction(delegate: OptimisticTransaction) if (resultFiles.nonEmpty && !isOptimize) registerPostCommitHook(AutoCompact) // Record the updated high water marks to be used during transaction commit. identityTrackerOpt.ifDefined { - tracker => updatedIdentityHighWaterMarks.appendAll(tracker.delegate.highWaterMarks.toSeq) + tracker => updatedIdentityHighWaterMarks.appendAll(tracker.highWaterMarks.toSeq) } resultFiles.toSeq ++ committer.changeFiles diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala index 74a1e3f03673..653b705becf0 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.connector.write.WriterCommitMessage import org.apache.spark.sql.delta.{DeltaOptions, GlutenParquetFileFormat} import org.apache.spark.sql.delta.logging.DeltaLogKeys +import org.apache.spark.sql.delta.stats.GlutenDeltaJobStatsTracker import org.apache.spark.sql.errors.QueryExecutionErrors import 
org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec @@ -125,13 +126,14 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { val dataSchema = dataColumns.toStructType DataSourceUtils.verifySchema(fileFormat, dataSchema) DataSourceUtils.checkFieldNames(fileFormat, dataSchema) - // Note: prepareWrite has side effect. It sets "job". + val isNativeWritable = GlutenParquetFileFormat.isNativeWritable(dataSchema) val outputDataColumns = if (caseInsensitiveOptions.get(DeltaOptions.WRITE_PARTITION_COLUMNS).contains("true")) { dataColumns ++ partitionColumns } else dataColumns + // Note: prepareWrite has side effect. It sets "job". val outputWriterFactory = fileFormat.prepareWrite( sparkSession, @@ -140,6 +142,12 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { outputDataColumns.toStructType ) + val maybeWrappedStatsTrackers: Seq[WriteJobStatsTracker] = if (isNativeWritable) { + statsTrackers.map(GlutenDeltaJobStatsTracker(_)) + } else { + statsTrackers + } + val description = new WriteJobDescription( uuid = UUID.randomUUID.toString, serializableHadoopConf = new SerializableConfiguration(job.getConfiguration), @@ -157,7 +165,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { timeZoneId = caseInsensitiveOptions .get(DateTimeUtils.TIMEZONE_OPTION) .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone), - statsTrackers = statsTrackers + statsTrackers = maybeWrappedStatsTrackers ) // We should first sort by dynamic partition columns, then bucket id, and finally sorting @@ -222,7 +230,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { partitionColumns, sortColumns, orderingMatched, - GlutenParquetFileFormat.isNativeWritable(dataSchema) + isNativeWritable ) } } @@ -459,6 +467,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { } else { concurrentOutputWriterSpec match { case Some(spec) => + // TODO: Concurrent writer is not yet supported. 
new DynamicPartitionDataConcurrentWriter( description, taskAttemptContext, @@ -468,7 +477,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { case _ => // Columnar-based partition writer to divide the input batch by partition values // and bucket IDs in advance. - new ColumnarDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) + new GlutenDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) } } @@ -525,7 +534,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { } } - private class ColumnarDynamicPartitionDataSingleWriter( + private class GlutenDynamicPartitionDataSingleWriter( description: WriteJobDescription, taskAttemptContext: TaskAttemptContext, committer: FileCommitProtocol, diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsFallbackTracker.scala similarity index 57% rename from backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala rename to backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsFallbackTracker.scala index 30e61730c17b..4eab5408fa9c 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsFallbackTracker.scala @@ -19,17 +19,19 @@ package org.apache.spark.sql.delta.stats import org.apache.gluten.execution.{PlaceholderRow, TerminalRow, VeloxColumnarToRowExec} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.delta.DeltaIdentityColumnStatsTracker import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker} import org.apache.spark.sql.execution.metric.SQLMetric -class 
GlutenDeltaJobStatisticsTracker(val delegate: DeltaJobStatisticsTracker) +/** + * A fallback stats tracker where a C2R converter converts all the incoming batches to rows then + * sends them to the delegate tracker. + */ +private[stats] class GlutenDeltaJobStatsFallbackTracker(val delegate: WriteJobStatsTracker) extends WriteJobStatsTracker { - import GlutenDeltaJobStatisticsTracker._ + import GlutenDeltaJobStatsFallbackTracker._ override def newTaskInstance(): WriteTaskStatsTracker = { - new GlutenDeltaTaskStatisticsTracker( - delegate.newTaskInstance().asInstanceOf[DeltaTaskStatisticsTracker]) + new GlutenDeltaTaskStatsFallbackTracker(delegate.newTaskInstance()) } override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = { @@ -37,43 +39,22 @@ class GlutenDeltaJobStatisticsTracker(val delegate: DeltaJobStatisticsTracker) } } -class GlutenDeltaIdentityColumnStatsTracker(override val delegate: DeltaIdentityColumnStatsTracker) - extends GlutenDeltaJobStatisticsTracker(delegate) - -private object GlutenDeltaJobStatisticsTracker { - - /** - * This is a temporary implementation of statistics tracker for Delta Lake. It's sub-optimal in - * performance because it internally performs C2R then send rows to the delegate row-based - * tracker. - * - * TODO: Columnar-based statistics collection. 
- */ - private class GlutenDeltaTaskStatisticsTracker(delegate: DeltaTaskStatisticsTracker) +private object GlutenDeltaJobStatsFallbackTracker { + private class GlutenDeltaTaskStatsFallbackTracker(delegate: WriteTaskStatsTracker) extends WriteTaskStatsTracker { - private val c2r = new VeloxColumnarToRowExec.Converter(new SQLMetric("convertTime")) - override def newPartition(partitionValues: InternalRow): Unit = { + override def newPartition(partitionValues: InternalRow): Unit = delegate.newPartition(partitionValues) - } - override def newFile(filePath: String): Unit = { - delegate.newFile(filePath) - } + override def newFile(filePath: String): Unit = delegate.newFile(filePath) - override def closeFile(filePath: String): Unit = { - delegate.closeFile(filePath) - } + override def closeFile(filePath: String): Unit = delegate.closeFile(filePath) - override def newRow(filePath: String, row: InternalRow): Unit = { - row match { - case _: PlaceholderRow => - case t: TerminalRow => - c2r.toRowIterator(t.batch()).foreach(eachRow => delegate.newRow(filePath, eachRow)) - case otherRow => - delegate.newRow(filePath, otherRow) - } + override def newRow(filePath: String, row: InternalRow): Unit = row match { + case _: PlaceholderRow => + case t: TerminalRow => + c2r.toRowIterator(t.batch()).foreach(eachRow => delegate.newRow(filePath, eachRow)) } override def getFinalStats(taskCommitTime: Long): WriteTaskStats = { diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsRowCountingTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsRowCountingTracker.scala new file mode 100644 index 000000000000..d4086cf8e9b7 --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsRowCountingTracker.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.stats + +import org.apache.gluten.execution.{PlaceholderRow, TerminalRow} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker} + +/** + * A fallback stats tracker to simply call `newRow` many times when a columnar batch comes. There is + * no C2R process involved to save performance. Therefore, the delegate stats tracker must not read + * the row in its `newRow` implementation. 
+ */ +private[stats] class GlutenDeltaJobStatsRowCountingTracker(val delegate: WriteJobStatsTracker) + extends WriteJobStatsTracker { + import GlutenDeltaJobStatsRowCountingTracker._ + + override def newTaskInstance(): WriteTaskStatsTracker = { + new GlutenDeltaTaskStatsRowCountingTracker(delegate.newTaskInstance()) + } + + override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = { + delegate.processStats(stats, jobCommitTime) + } +} + +private object GlutenDeltaJobStatsRowCountingTracker { + private class GlutenDeltaTaskStatsRowCountingTracker(delegate: WriteTaskStatsTracker) + extends WriteTaskStatsTracker { + override def newPartition(partitionValues: InternalRow): Unit = + delegate.newPartition(partitionValues) + + override def newFile(filePath: String): Unit = delegate.newFile(filePath) + + override def closeFile(filePath: String): Unit = delegate.closeFile(filePath) + + override def newRow(filePath: String, row: InternalRow): Unit = row match { + case _: PlaceholderRow => + case t: TerminalRow => + for (_ <- 0 until t.batch().numRows()) { + // Here we pass null row to the delegate stats tracker as we assume + // the delegate stats tracker only counts on row numbers. + delegate.newRow(filePath, null) + } + } + + override def getFinalStats(taskCommitTime: Long): WriteTaskStats = { + delegate.getFinalStats(taskCommitTime) + } + } +} diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala new file mode 100644 index 000000000000..d8e266db81ea --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.stats + +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.backendsapi.velox.VeloxBatchType +import org.apache.gluten.columnarbatch.ColumnarBatches +import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.execution._ +import org.apache.gluten.expression.{ConverterUtils, TransformerState} +import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform +import org.apache.gluten.extension.columnar.offload.OffloadOthers +import org.apache.gluten.extension.columnar.rewrite.PullOutPreProject +import org.apache.gluten.extension.columnar.transition.Convention +import org.apache.gluten.extension.columnar.validator.{Validator, Validators} +import org.apache.gluten.iterator.Iterators +import org.apache.gluten.substrait.SubstraitContext +import org.apache.gluten.substrait.plan.PlanBuilder +import org.apache.gluten.vectorized.{ColumnarBatchInIterator, NativePlanEvaluator} + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Projection, UnsafeProjection} +import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, DeclarativeAggregate} +import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, LeafExecNode} +import org.apache.spark.sql.execution.aggregate.SortAggregateExec +import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker} +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.{SerializableConfiguration, SparkDirectoryUtil} + +import com.google.common.collect.Lists +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import java.util.UUID +import java.util.concurrent.{Callable, Executors, ExecutorService, SynchronousQueue, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** Gluten's stats tracker with vectorized aggregation inside to produce statistics efficiently. 
*/ +private[stats] class GlutenDeltaJobStatsTracker(val delegate: DeltaJobStatisticsTracker) + extends WriteJobStatsTracker { + import GlutenDeltaJobStatsTracker._ + + @transient private val hadoopConf: Configuration = { + val clazz = classOf[DeltaJobStatisticsTracker] + val method = clazz.getDeclaredField("hadoopConf") + method.setAccessible(true) + method.get(delegate).asInstanceOf[Configuration] + } + @transient private val path = delegate.path + private val dataCols = delegate.dataCols + private val statsColExpr = delegate.statsColExpr + + private val srlHadoopConf = new SerializableConfiguration(hadoopConf) + private val rootUri = path.getFileSystem(hadoopConf).makeQualified(path).toUri + + override def newTaskInstance(): WriteTaskStatsTracker = { + val rootPath = new Path(rootUri) + val hadoopConf = srlHadoopConf.value + new GlutenDeltaTaskStatsTracker(dataCols, statsColExpr, rootPath, hadoopConf) + } + + override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = { + delegate.processStats(stats, jobCommitTime) + } +} + +object GlutenDeltaJobStatsTracker extends Logging { + def apply(tracker: WriteJobStatsTracker): WriteJobStatsTracker = tracker match { + case tracker: BasicWriteJobStatsTracker => + new GlutenDeltaJobStatsRowCountingTracker(tracker) + case tracker: DeltaJobStatisticsTracker => + new GlutenDeltaJobStatsTracker(tracker) + case tracker => + logWarning( + "Gluten Delta: Creating fallback job stats tracker," + + " this involves frequent columnar-to-row conversions which may cause performance" + + " issues.") + new GlutenDeltaJobStatsFallbackTracker(tracker) + } + + /** A columnar-based statistics collection for Gluten + Delta Lake. 
*/ + private class GlutenDeltaTaskStatsTracker( + dataCols: Seq[Attribute], + statsColExpr: Expression, + rootPath: Path, + hadoopConf: Configuration) + extends WriteTaskStatsTracker { + private val resultThreadRunner = Executors.newSingleThreadExecutor() + private val accumulators = mutable.Map[String, VeloxTaskStatsAccumulator]() + private val evaluator = NativePlanEvaluator.create( + BackendsApiManager.getBackendName, + Map.empty[String, String].asJava) + + override def newPartition(partitionValues: InternalRow): Unit = {} + + override def newFile(filePath: String): Unit = { + accumulators.getOrElseUpdate( + filePath, + new VeloxTaskStatsAccumulator(evaluator, resultThreadRunner, dataCols, statsColExpr) + ) + } + + override def closeFile(filePath: String): Unit = { + accumulators(filePath).setFinished() + } + + override def newRow(filePath: String, row: InternalRow): Unit = { + row match { + case _: PlaceholderRow => + case t: TerminalRow => + accumulators(filePath).appendColumnarBatch(t.batch()) + } + } + + override def getFinalStats(taskCommitTime: Long): WriteTaskStats = { + val stats = accumulators.map { + case (path, acc) => + new Path(path).getName -> acc.toJson + }.toMap + DeltaFileStatistics(stats) + } + } + + private class VeloxTaskStatsAccumulator( + evaluator: NativePlanEvaluator, + resultThreadRunner: ExecutorService, + dataCols: Seq[Attribute], + statsColExpr: Expression) { + private val c2r = new VeloxColumnarToRowExec.Converter(new SQLMetric("convertTime")) + private var resultRequested: Boolean = false + private val inputBatchQueue = new SynchronousQueue[ColumnarBatch]() + private val aggregates: Seq[AggregateExpression] = statsColExpr.collect { + case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] => + assert(ae.mode == Complete) + ae + } + private val resultExpr: Expression = statsColExpr.transform { + case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] => + 
ae.aggregateFunction.asInstanceOf[DeclarativeAggregate].evaluateExpression + } + protected val aggBufferAttrs: Seq[Attribute] = statsColExpr.flatMap { + case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] => + ae.aggregateFunction.asInstanceOf[DeclarativeAggregate].aggBufferAttributes + case _ => None + } + private val getStats: Projection = UnsafeProjection.create( + exprs = Seq(resultExpr), + inputSchema = aggBufferAttrs + ) + private val taskContext = TaskContext.get() + private val outIterator: Iterator[ColumnarBatch] = { + // Constructs an input iterator receiving the input rows first. + val inputIterator = new ColumnarBatchInIterator( + BackendsApiManager.getBackendName, + Iterators + .wrap(new Iterator[ColumnarBatch] { + private var batch: ColumnarBatch = _ + + override def hasNext: Boolean = { + assert(batch == null) + while (!Thread.currentThread().isInterrupted) { + val tmp = + try { + inputBatchQueue.poll(100, TimeUnit.MILLISECONDS) + } catch { + case _: InterruptedException => + Thread.currentThread().interrupt() + return false; + } + if (tmp != null) { + batch = tmp + return true + } + if (resultRequested) { + return false + } + } + throw new IllegalStateException() + } + + override def next(): ColumnarBatch = { + assert(batch != null) + try { + batch + } finally { + batch = null + } + } + }) + .recyclePayload(b => b.close()) + .create() + .asJava + ) + + val inputNode = StatisticsInputNode(dataCols) + + val aggOp = SortAggregateExec( + None, + false, + None, + Seq.empty, + aggregates, + Seq(AttributeReference("stats", StringType)()), + 0, + Seq.empty, + inputNode + ) + // Invoke the legacy transform rule to get a local Velox aggregation query plan. 
+ val offloads = Seq(OffloadOthers()).map(_.toStrcitRule()) + val validatorBuilder: GlutenConfig => Validator = conf => + Validators.newValidator(conf, offloads) + val rewrites = Seq(PullOutPreProject) + val config = GlutenConfig.get + val transformRule = + HeuristicTransform.WithRewrites(validatorBuilder(config), rewrites, offloads) + val aggTransformer = ColumnarCollapseTransformStages(config)(transformRule(aggOp)) + .asInstanceOf[WholeStageTransformer] + .child + .asInstanceOf[RegularHashAggregateExecTransformer] + val substraitContext = new SubstraitContext + TransformerState.enterValidation + val transformedNode = + try { + aggTransformer.transform(substraitContext) + } finally { + TransformerState.finishValidation + } + val outNames = aggTransformer.output.map(ConverterUtils.genColumnNameWithExprId).asJava + val planNode = + PlanBuilder.makePlan(substraitContext, Lists.newArrayList(transformedNode.root), outNames) + + val spillDirPath = SparkDirectoryUtil + .get() + .namespace("gluten-spill") + .mkChildDirRoundRobin(UUID.randomUUID.toString) + .getAbsolutePath + val nativeOutItr = evaluator + .createKernelWithBatchIterator( + planNode.toProtobuf.toByteArray, + new Array[Array[Byte]](0), + Seq(inputIterator).asJava, + 0, + BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) + ) + Iterators + .wrap(nativeOutItr.asScala) + .recyclePayload(b => b.close()) + .recycleIterator(nativeOutItr.close()) + .create() + } + + private val resultThreadName = + s"Gluten Delta Statistics Writer - ${System.identityHashCode(this)}" + private val resJsonFuture = resultThreadRunner.submit(new Callable[String] { + override def call(): String = { + Thread.currentThread().setName(resultThreadName) + TaskContext.setTaskContext(taskContext) + val rows = outIterator.flatMap(batch => c2r.toRowIterator(batch)).toSeq + assert( + rows.size == 1, + "Only one single output row is expected from the global aggregation.") + val row = rows.head + val jsonStats = 
getStats(row).getString(0) + jsonStats + } + }) + + def appendColumnarBatch(inputBatch: ColumnarBatch): Unit = { + // Retains the input batch so it will be shared by the data writer thread + // and the statistic writer thread. + ColumnarBatches.retain(inputBatch) + inputBatchQueue.put(inputBatch) + } + + def setFinished(): Unit = { + resultRequested = true + resJsonFuture.get() // Blocking wait until the task is released. + } + + def toJson: String = { + val jsonStats = resJsonFuture.get() + jsonStats + } + } + + private case class StatisticsInputNode(override val output: Seq[Attribute]) + extends GlutenPlan + with LeafExecNode { + override def batchType(): Convention.BatchType = VeloxBatchType + override def rowType0(): Convention.RowType = Convention.RowType.None + override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = + throw new UnsupportedOperationException() + } +} diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java index 646164de3a21..8e47268574e6 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java @@ -26,10 +26,34 @@ import java.util.Iterator; public class VeloxBlockStripes extends BlockStripes { + private final BlockStripe[] blockStripes; + public VeloxBlockStripes(BlockStripes bs) { super(bs.originBlockAddress, bs.blockAddresses, bs.headingRowIndice, bs.originBlockNumColumns, bs.headingRowBytes); + blockStripes = new BlockStripe[blockAddresses.length]; + for (int i = 0; i < blockStripes.length; i++) { + final long blockAddress = blockAddresses[i]; + final byte[] headingRowByteArray = headingRowBytes[i]; + 
blockStripes[i] = new BlockStripe() { + private final ColumnarBatch batch = ColumnarBatches.create(blockAddress); + private final UnsafeRow headingRow = new UnsafeRow(headingRowByteArray.length); + { + headingRow.pointTo(headingRowByteArray, headingRowByteArray.length); + } + + @Override + public ColumnarBatch getColumnarBatch() { + return batch; + } + + @Override + public InternalRow getHeadingRow() { + return headingRow; + } + }; + } } @Override @@ -44,23 +68,8 @@ public boolean hasNext() { @Override public BlockStripe next() { - final BlockStripe nextStripe = new BlockStripe() { - private final long blockAddress = blockAddresses[index]; - private final byte[] headingRowByteArray = headingRowBytes[index]; - - @Override - public ColumnarBatch getColumnarBatch() { - return ColumnarBatches.create(blockAddress); - } - - @Override - public InternalRow getHeadingRow() { - UnsafeRow row = new UnsafeRow(originBlockNumColumns); - row.pointTo(headingRowByteArray, headingRowByteArray.length); - return row; - } - }; - index += 1; + final BlockStripe nextStripe = blockStripes[index]; + index++; return nextStripe; } }; @@ -69,7 +78,6 @@ public InternalRow getHeadingRow() { @Override public void release() { - + // Do nothing. We rely on the caller to call #close API on columnar batches returned to them. 
} } - diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala index b6f1313ff502..31ce4b05afae 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala @@ -43,6 +43,11 @@ object Validator { case object Passed extends OutCome case class Failed private (reason: String) extends OutCome + private object NoopValidator extends Validator { + override def validate(plan: SparkPlan): Validator.OutCome = pass() + } + + def noop(): Validator = NoopValidator def builder(): Builder = Builder() class Builder private { @@ -75,10 +80,6 @@ object Validator { private object Builder { def apply(): Builder = new Builder() - private object NoopValidator extends Validator { - override def validate(plan: SparkPlan): Validator.OutCome = pass() - } - private class ValidatorPipeline(val validators: Seq[Validator]) extends Validator { assert(!validators.exists(_.isInstanceOf[ValidatorPipeline])) From 993274b44659c98ded69d53fac7a55ec29a662c3 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 4 Feb 2026 16:17:08 +0100 Subject: [PATCH 3/9] fixup --- .../spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala index d8e266db81ea..20484d8296bd 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala @@ -258,11 +258,13 @@ object GlutenDeltaJobStatsTracker extends Logging { val 
nativeOutItr = evaluator .createKernelWithBatchIterator( planNode.toProtobuf.toByteArray, - new Array[Array[Byte]](0), - Seq(inputIterator).asJava, + null, + null, 0, BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) ) + nativeOutItr.addIteratorSplits(Array(inputIterator)) + nativeOutItr.noMoreSplits() Iterators .wrap(nativeOutItr.asScala) .recyclePayload(b => b.close()) From 9aae596eb3bbc275b562af33222fc35854a42fec Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 4 Feb 2026 17:15:49 +0100 Subject: [PATCH 4/9] fixup --- .../stats/GlutenDeltaJobStatsTracker.scala | 218 ++++++++---------- 1 file changed, 99 insertions(+), 119 deletions(-) diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala index 20484d8296bd..915980f59f75 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.delta.stats import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.backendsapi.velox.VeloxBatchType -import org.apache.gluten.columnarbatch.ColumnarBatches import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution._ import org.apache.gluten.expression.{ConverterUtils, TransformerState} @@ -30,7 +29,7 @@ import org.apache.gluten.extension.columnar.validator.{Validator, Validators} import org.apache.gluten.iterator.Iterators import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.plan.PlanBuilder -import org.apache.gluten.vectorized.{ColumnarBatchInIterator, NativePlanEvaluator} +import org.apache.gluten.vectorized.{ColumnarBatchInIterator, ColumnarBatchOutIterator, NativePlanEvaluator} import 
org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -51,7 +50,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import java.util.UUID -import java.util.concurrent.{Callable, Executors, ExecutorService, SynchronousQueue, TimeUnit} +import java.util.concurrent.{Callable, Executors, Future, SynchronousQueue, TimeUnit} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -106,49 +105,13 @@ object GlutenDeltaJobStatsTracker extends Logging { rootPath: Path, hadoopConf: Configuration) extends WriteTaskStatsTracker { + // We use one single thread to ensure the statistic files are written in serial. + // Do not increase the thread number, otherwise sanity will not be guaranteed. private val resultThreadRunner = Executors.newSingleThreadExecutor() - private val accumulators = mutable.Map[String, VeloxTaskStatsAccumulator]() private val evaluator = NativePlanEvaluator.create( BackendsApiManager.getBackendName, Map.empty[String, String].asJava) - - override def newPartition(partitionValues: InternalRow): Unit = {} - - override def newFile(filePath: String): Unit = { - accumulators.getOrElseUpdate( - filePath, - new VeloxTaskStatsAccumulator(evaluator, resultThreadRunner, dataCols, statsColExpr) - ) - } - - override def closeFile(filePath: String): Unit = { - accumulators(filePath).setFinished() - } - - override def newRow(filePath: String, row: InternalRow): Unit = { - row match { - case _: PlaceholderRow => - case t: TerminalRow => - accumulators(filePath).appendColumnarBatch(t.batch()) - } - } - - override def getFinalStats(taskCommitTime: Long): WriteTaskStats = { - val stats = accumulators.map { - case (path, acc) => - new Path(path).getName -> acc.toJson - }.toMap - DeltaFileStatistics(stats) - } - } - - private class VeloxTaskStatsAccumulator( - evaluator: NativePlanEvaluator, - resultThreadRunner: ExecutorService, - dataCols: Seq[Attribute], - statsColExpr: Expression) { private val c2r = new 
VeloxColumnarToRowExec.Converter(new SQLMetric("convertTime")) - private var resultRequested: Boolean = false private val inputBatchQueue = new SynchronousQueue[ColumnarBatch]() private val aggregates: Seq[AggregateExpression] = statsColExpr.collect { case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] => @@ -169,52 +132,8 @@ object GlutenDeltaJobStatsTracker extends Logging { inputSchema = aggBufferAttrs ) private val taskContext = TaskContext.get() - private val outIterator: Iterator[ColumnarBatch] = { - // Constructs an input iterator receiving the input rows first. - val inputIterator = new ColumnarBatchInIterator( - BackendsApiManager.getBackendName, - Iterators - .wrap(new Iterator[ColumnarBatch] { - private var batch: ColumnarBatch = _ - - override def hasNext: Boolean = { - assert(batch == null) - while (!Thread.currentThread().isInterrupted) { - val tmp = - try { - inputBatchQueue.poll(100, TimeUnit.MILLISECONDS) - } catch { - case _: InterruptedException => - Thread.currentThread().interrupt() - return false; - } - if (tmp != null) { - batch = tmp - return true - } - if (resultRequested) { - return false - } - } - throw new IllegalStateException() - } - - override def next(): ColumnarBatch = { - assert(batch != null) - try { - batch - } finally { - batch = null - } - } - }) - .recyclePayload(b => b.close()) - .create() - .asJava - ) - + private val veloxAggTask: ColumnarBatchOutIterator = { val inputNode = StatisticsInputNode(dataCols) - val aggOp = SortAggregateExec( None, false, @@ -263,46 +182,107 @@ object GlutenDeltaJobStatsTracker extends Logging { 0, BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) ) - nativeOutItr.addIteratorSplits(Array(inputIterator)) - nativeOutItr.noMoreSplits() - Iterators - .wrap(nativeOutItr.asScala) - .recyclePayload(b => b.close()) - .recycleIterator(nativeOutItr.close()) - .create() + nativeOutItr } - private val resultThreadName = - s"Gluten Delta 
Statistics Writer - ${System.identityHashCode(this)}" - private val resJsonFuture = resultThreadRunner.submit(new Callable[String] { - override def call(): String = { - Thread.currentThread().setName(resultThreadName) - TaskContext.setTaskContext(taskContext) - val rows = outIterator.flatMap(batch => c2r.toRowIterator(batch)).toSeq - assert( - rows.size == 1, - "Only one single output row is expected from the global aggregation.") - val row = rows.head - val jsonStats = getStats(row).getString(0) - jsonStats - } - }) + private val resultJsonMap: mutable.Map[String, String] = mutable.Map() + + private var currentPath: String = _ + private var currentJsonFuture: Future[String] = _ + + override def newPartition(partitionValues: InternalRow): Unit = {} - def appendColumnarBatch(inputBatch: ColumnarBatch): Unit = { - // Retains the input batch so it will be shared by the data writer thread - // and the statistic writer thread. - ColumnarBatches.retain(inputBatch) - inputBatchQueue.put(inputBatch) + override def newFile(filePath: String): Unit = { + assert(currentPath == null) + veloxAggTask.addIteratorSplits(Array(newIteratorFromInputQueue())) + veloxAggTask.requestBarrier() + currentJsonFuture = resultThreadRunner.submit(new Callable[String] { + private val resultThreadName = + s"Gluten Delta Statistics Writer - ${System.identityHashCode(this)}" + override def call(): String = { + Thread.currentThread().setName(resultThreadName) + TaskContext.setTaskContext(taskContext) + val rows = veloxAggTask.asScala.flatMap { + batch => + val row = c2r.toRowIterator(batch) + batch.close() + row + }.toSeq + assert( + rows.size == 1, + "Only one single output row is expected from the global aggregation.") + val row = rows.head + val jsonStats = getStats(row).getString(0) + jsonStats + } + currentPath = filePath + }) } - def setFinished(): Unit = { - resultRequested = true - resJsonFuture.get() // Blocking wait until the task is released. 
+ override def closeFile(filePath: String): Unit = { + assert(filePath == currentPath) + inputBatchQueue.put(null) + val json = currentJsonFuture.get() + resultJsonMap(filePath) = json + currentPath = null } - def toJson: String = { - val jsonStats = resJsonFuture.get() - jsonStats + override def newRow(filePath: String, row: InternalRow): Unit = { + assert(filePath == currentPath) + row match { + case _: PlaceholderRow => + case t: TerminalRow => + inputBatchQueue.put(t.batch()) + } + } + + override def getFinalStats(taskCommitTime: Long): WriteTaskStats = { + veloxAggTask.noMoreSplits() + veloxAggTask.close() + DeltaFileStatistics(resultJsonMap.toMap) + } + + private def newIteratorFromInputQueue(): ColumnarBatchInIterator = { + val itr = new ColumnarBatchInIterator( + BackendsApiManager.getBackendName, + Iterators + .wrap(new Iterator[ColumnarBatch] { + private var batch: ColumnarBatch = _ + + override def hasNext: Boolean = { + assert(batch == null) + while (!Thread.currentThread().isInterrupted) { + val tmp = + try { + inputBatchQueue.poll(100, TimeUnit.MILLISECONDS) + } catch { + case _: InterruptedException => + Thread.currentThread().interrupt() + return false; + } + if (tmp != null) { + batch = tmp + return true + } + return false + } + throw new IllegalStateException() + } + + override def next(): ColumnarBatch = { + assert(batch != null) + try { + batch + } finally { + batch = null + } + } + }) + .recyclePayload(b => b.close()) + .create() + .asJava + ) + itr } } From f7871b868ce1856172eb48261c20dcfc7a7d8f87 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 5 Feb 2026 13:29:19 +0100 Subject: [PATCH 5/9] fixup --- .../stats/GlutenDeltaJobStatsTracker.scala | 91 ++++++++++++------- 1 file changed, 59 insertions(+), 32 deletions(-) diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala 
index 915980f59f75..e2a4542d586a 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala @@ -29,29 +29,28 @@ import org.apache.gluten.extension.columnar.validator.{Validator, Validators} import org.apache.gluten.iterator.Iterators import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.plan.PlanBuilder -import org.apache.gluten.vectorized.{ColumnarBatchInIterator, ColumnarBatchOutIterator, NativePlanEvaluator} - +import org.apache.gluten.vectorized.{ArrowWritableColumnVector, ColumnarBatchInIterator, ColumnarBatchOutIterator, NativePlanEvaluator} import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Projection, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, Projection, SortOrder, UnsafeProjection} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, DeclarativeAggregate} -import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, LeafExecNode} +import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, LeafExecNode, ProjectExec} import org.apache.spark.sql.execution.aggregate.SortAggregateExec import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker} import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.types.StringType -import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} import 
org.apache.spark.util.{SerializableConfiguration, SparkDirectoryUtil} - import com.google.common.collect.Lists +import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches} +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import java.util.UUID -import java.util.concurrent.{Callable, Executors, Future, SynchronousQueue, TimeUnit} - +import java.util.concurrent.{Callable, Executors, Future, SynchronousQueue} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -112,7 +111,7 @@ object GlutenDeltaJobStatsTracker extends Logging { BackendsApiManager.getBackendName, Map.empty[String, String].asJava) private val c2r = new VeloxColumnarToRowExec.Converter(new SQLMetric("convertTime")) - private val inputBatchQueue = new SynchronousQueue[ColumnarBatch]() + private val inputBatchQueue = new SynchronousQueue[Option[ColumnarBatch]]() private val aggregates: Seq[AggregateExpression] = statsColExpr.collect { case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] => assert(ae.mode == Complete) @@ -132,19 +131,28 @@ object GlutenDeltaJobStatsTracker extends Logging { inputSchema = aggBufferAttrs ) private val taskContext = TaskContext.get() + private val dummyKeyAttr = { + // FIXME: We have to force the use of Velox's streaming aggregation since hash aggregation + // doesn't support task barriers. But as streaming aggregation should always be keyed, we + // have to do a small hack here by adding a dummy key for the global aggregation. 
+ AttributeReference("__GLUTEN_DELTA_DUMMY_KEY__", IntegerType)() + } + private val statsAttrs = aggregates.flatMap(_.aggregateFunction.aggBufferAttributes) + private val statsResultAttrs = aggregates.flatMap(_.aggregateFunction.inputAggBufferAttributes) private val veloxAggTask: ColumnarBatchOutIterator = { - val inputNode = StatisticsInputNode(dataCols) + val inputNode = StatisticsInputNode(Seq(dummyKeyAttr), dataCols) val aggOp = SortAggregateExec( None, - false, + isStreaming = false, None, - Seq.empty, + Seq(dummyKeyAttr), aggregates, - Seq(AttributeReference("stats", StringType)()), + statsAttrs, 0, - Seq.empty, + dummyKeyAttr +: statsResultAttrs, inputNode ) + val projOp = ProjectExec(statsResultAttrs, aggOp) // Invoke the legacy transform rule to get a local Velox aggregation query plan. val offloads = Seq(OffloadOthers()).map(_.toStrcitRule()) val validatorBuilder: GlutenConfig => Validator = conf => @@ -153,19 +161,20 @@ object GlutenDeltaJobStatsTracker extends Logging { val config = GlutenConfig.get val transformRule = HeuristicTransform.WithRewrites(validatorBuilder(config), rewrites, offloads) - val aggTransformer = ColumnarCollapseTransformStages(config)(transformRule(aggOp)) + val veloxTransformer = transformRule(projOp) + val wholeStageTransformer = ColumnarCollapseTransformStages(config)(veloxTransformer) .asInstanceOf[WholeStageTransformer] .child - .asInstanceOf[RegularHashAggregateExecTransformer] + .asInstanceOf[TransformSupport] val substraitContext = new SubstraitContext TransformerState.enterValidation val transformedNode = try { - aggTransformer.transform(substraitContext) + wholeStageTransformer.transform(substraitContext) } finally { TransformerState.finishValidation } - val outNames = aggTransformer.output.map(ConverterUtils.genColumnNameWithExprId).asJava + val outNames = wholeStageTransformer.output.map(ConverterUtils.genColumnNameWithExprId).asJava val planNode = PlanBuilder.makePlan(substraitContext, 
Lists.newArrayList(transformedNode.root), outNames) @@ -202,15 +211,14 @@ object GlutenDeltaJobStatsTracker extends Logging { override def call(): String = { Thread.currentThread().setName(resultThreadName) TaskContext.setTaskContext(taskContext) - val rows = veloxAggTask.asScala.flatMap { - batch => - val row = c2r.toRowIterator(batch) - batch.close() - row - }.toSeq + val outBatches = veloxAggTask.asScala.toSeq + assert(outBatches.size == 1) + val batch = outBatches.head + val rows = c2r.toRowIterator(batch).toSeq assert( rows.size == 1, "Only one single output row is expected from the global aggregation.") + batch.close() val row = rows.head val jsonStats = getStats(row).getString(0) jsonStats @@ -221,9 +229,10 @@ object GlutenDeltaJobStatsTracker extends Logging { override def closeFile(filePath: String): Unit = { assert(filePath == currentPath) - inputBatchQueue.put(null) + val fileName = new Path(filePath).getName + inputBatchQueue.put(None) val json = currentJsonFuture.get() - resultJsonMap(filePath) = json + resultJsonMap(fileName) = json currentPath = null } @@ -232,7 +241,21 @@ object GlutenDeltaJobStatsTracker extends Logging { row match { case _: PlaceholderRow => case t: TerminalRow => - inputBatchQueue.put(t.batch()) + val valueBatch = t.batch() + val numRows = valueBatch.numRows() + val dummyKeyVec = ArrowWritableColumnVector + .allocateColumns(numRows, new StructType().add(dummyKeyAttr.name, IntegerType)) + .head + (0 until numRows).foreach { + i => + dummyKeyVec.putInt(i, 1) + } + val dummyKeyBatch = VeloxColumnarBatches.toVeloxBatch( + ColumnarBatches.offload( + ArrowBufferAllocators.contextInstance(), + new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows))) + val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch, valueBatch) + inputBatchQueue.put(Some(compositeBatch)) } } @@ -254,14 +277,14 @@ object GlutenDeltaJobStatsTracker extends Logging { while (!Thread.currentThread().isInterrupted) { val tmp = try { - 
inputBatchQueue.poll(100, TimeUnit.MILLISECONDS) + inputBatchQueue.take() } catch { case _: InterruptedException => Thread.currentThread().interrupt() return false; } - if (tmp != null) { - batch = tmp + if (tmp.isDefined) { + batch = tmp.get return true } return false @@ -286,13 +309,17 @@ object GlutenDeltaJobStatsTracker extends Logging { } } - private case class StatisticsInputNode(override val output: Seq[Attribute]) + private case class StatisticsInputNode(keySchema: Seq[Attribute], dataSchema: Seq[Attribute]) extends GlutenPlan with LeafExecNode { + override def output: Seq[Attribute] = keySchema ++ dataSchema override def batchType(): Convention.BatchType = VeloxBatchType override def rowType0(): Convention.RowType = Convention.RowType.None override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() override protected def doExecuteColumnar(): RDD[ColumnarBatch] = throw new UnsupportedOperationException() + override def outputOrdering: Seq[SortOrder] = { + keySchema.map(key => SortOrder(key, Ascending)) + } } } From 8f714a28628de45cd578b7d13f491d234f159b37 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 5 Feb 2026 13:33:43 +0100 Subject: [PATCH 6/9] style --- .../delta/stats/GlutenDeltaJobStatsTracker.scala | 14 +++++++------- .../vectorized/ColumnarBatchOutIterator.java | 15 ++++++++------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala index e2a4542d586a..e6a9ae56683e 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.stats import 
org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.backendsapi.velox.VeloxBatchType +import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches} import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution._ import org.apache.gluten.expression.{ConverterUtils, TransformerState} @@ -27,9 +28,11 @@ import org.apache.gluten.extension.columnar.rewrite.PullOutPreProject import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.extension.columnar.validator.{Validator, Validators} import org.apache.gluten.iterator.Iterators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.plan.PlanBuilder import org.apache.gluten.vectorized.{ArrowWritableColumnVector, ColumnarBatchInIterator, ColumnarBatchOutIterator, NativePlanEvaluator} + import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD @@ -41,16 +44,16 @@ import org.apache.spark.sql.execution.aggregate.SortAggregateExec import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.{IntegerType, StructType} -import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.util.{SerializableConfiguration, SparkDirectoryUtil} + import com.google.common.collect.Lists -import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches} -import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import java.util.UUID import java.util.concurrent.{Callable, Executors, Future, SynchronousQueue} + import scala.collection.JavaConverters._ 
import scala.collection.mutable @@ -246,10 +249,7 @@ object GlutenDeltaJobStatsTracker extends Logging { val dummyKeyVec = ArrowWritableColumnVector .allocateColumns(numRows, new StructType().add(dummyKeyAttr.name, IntegerType)) .head - (0 until numRows).foreach { - i => - dummyKeyVec.putInt(i, 1) - } + (0 until numRows).foreach(i => dummyKeyVec.putInt(i, 1)) val dummyKeyBatch = VeloxColumnarBatches.toVeloxBatch( ColumnarBatches.offload( ArrowBufferAllocators.contextInstance(), diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java index 684d78d6b905..27162a800f07 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java @@ -111,16 +111,17 @@ public void noMoreSplits() { } /** - * Request a barrier in the task execution. This signals the task to finish processing - * all currently queued splits and drain all stateful operators before continuing. - * After calling this method, continue calling next() to fetch results. When next() returns - * null and hasNext() returns false, the barrier has been reached. - * - * This enables task reuse and deterministic execution for workloads like AI training data + * Request a barrier in the task execution. This signals the task to finish processing all + * currently queued splits and drain all stateful operators before continuing. After calling this + * method, continue calling next() to fetch results. When next() returns null and hasNext() + * returns false, the barrier has been reached. + * + *

This enables task reuse and deterministic execution for workloads like AI training data * loading and real-time streaming processing. * * @throws IllegalStateException if the iterator is closed - * @see Velox Task Barrier Documentation + * @see Velox Task + * Barrier Documentation */ public void requestBarrier() { if (closed.get()) { From d463e92f04b88961e1cec4e903a647ed34c3c77a Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 5 Feb 2026 13:46:02 +0100 Subject: [PATCH 7/9] fixup --- .../spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala index e6a9ae56683e..5c990b1acfce 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala @@ -255,6 +255,8 @@ object GlutenDeltaJobStatsTracker extends Logging { ArrowBufferAllocators.contextInstance(), new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows))) val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch, valueBatch) + dummyKeyBatch.close() + valueBatch.close() inputBatchQueue.put(Some(compositeBatch)) } } From 57c420381d73108e191bb92f8e4a815a057a6fa0 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 6 Feb 2026 11:37:18 +0100 Subject: [PATCH 8/9] fixup --- .../stats/GlutenDeltaJobStatsTracker.scala | 41 +++++++++++++------ 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala index 5c990b1acfce..ae1126394379 100644 --- 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala @@ -37,8 +37,9 @@ import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, Projection, SortOrder, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, EmptyRow, Expression, Projection, SortOrder, SpecificInternalRow, UnsafeProjection} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, DeclarativeAggregate} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, LeafExecNode, ProjectExec} import org.apache.spark.sql.execution.aggregate.SortAggregateExec import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker} @@ -120,14 +121,23 @@ object GlutenDeltaJobStatsTracker extends Logging { assert(ae.mode == Complete) ae } + private val declarativeAggregates: Seq[DeclarativeAggregate] = aggregates.map { + ae => ae.aggregateFunction.asInstanceOf[DeclarativeAggregate] + } private val resultExpr: Expression = statsColExpr.transform { case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] => ae.aggregateFunction.asInstanceOf[DeclarativeAggregate].evaluateExpression } - protected val aggBufferAttrs: Seq[Attribute] = statsColExpr.flatMap { - case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] => - ae.aggregateFunction.asInstanceOf[DeclarativeAggregate].aggBufferAttributes - case _ => None + private val aggBufferAttrs: 
Seq[Attribute] = + declarativeAggregates.flatMap(_.aggBufferAttributes) + private val emptyRow: InternalRow = { + val initializeStats = GenerateMutableProjection.generate( + expressions = declarativeAggregates.flatMap(_.initialValues), + inputSchema = Seq.empty, + useSubexprElimination = false + ) + val buffer = new SpecificInternalRow(aggBufferAttrs.map(_.dataType)) + initializeStats.target(buffer).apply(EmptyRow) } private val getStats: Projection = UnsafeProjection.create( exprs = Seq(resultExpr), @@ -215,14 +225,19 @@ object GlutenDeltaJobStatsTracker extends Logging { Thread.currentThread().setName(resultThreadName) TaskContext.setTaskContext(taskContext) val outBatches = veloxAggTask.asScala.toSeq - assert(outBatches.size == 1) - val batch = outBatches.head - val rows = c2r.toRowIterator(batch).toSeq - assert( - rows.size == 1, - "Only one single output row is expected from the global aggregation.") - batch.close() - val row = rows.head + val row: InternalRow = if (outBatches.isEmpty) { + // No input was received. Returns the default aggregation values. 
+ emptyRow + } else { + assert(outBatches.size == 1) + val batch = outBatches.head + val rows = c2r.toRowIterator(batch).toSeq + assert( + rows.size == 1, + "Only one single output row is expected from the global aggregation.") + batch.close() + rows.head + } val jsonStats = getStats(row).getString(0) jsonStats } From 3a2fd007da434429af37bb65224f90536ace9af8 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 12 Feb 2026 14:30:59 +0100 Subject: [PATCH 9/9] fixup --- .../sql/execution/datasources/velox/VeloxBlockStripes.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java index 8e47268574e6..3b06570398ee 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java @@ -38,7 +38,7 @@ public VeloxBlockStripes(BlockStripes bs) { final byte[] headingRowByteArray = headingRowBytes[i]; blockStripes[i] = new BlockStripe() { private final ColumnarBatch batch = ColumnarBatches.create(blockAddress); - private final UnsafeRow headingRow = new UnsafeRow(headingRowByteArray.length); + private final UnsafeRow headingRow = new UnsafeRow(originBlockNumColumns); { headingRow.pointTo(headingRowByteArray, headingRowByteArray.length); }