diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala index af19d1df9f91..351c2effb784 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.delta.hooks.AutoCompact import org.apache.spark.sql.delta.perf.{DeltaOptimizedWriterExec, GlutenDeltaOptimizedWriterExec} import org.apache.spark.sql.delta.schema.InnerInvariantViolationException import org.apache.spark.sql.delta.sources.DeltaSQLConf -import org.apache.spark.sql.delta.stats.{GlutenDeltaIdentityColumnStatsTracker, GlutenDeltaJobStatisticsTracker} import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker} @@ -72,8 +71,7 @@ class GlutenOptimisticTransaction(delegate: OptimisticTransaction) // the FileFormatWriter.write call below and will collect per-file stats using // StatisticsCollection val optionalStatsTracker = - getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)._1.map( - new GlutenDeltaJobStatisticsTracker(_)) + getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)._1 val constraints = Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints @@ -87,7 +85,6 @@ class GlutenOptimisticTransaction(delegate: OptimisticTransaction) statsDataSchema, trackIdentityHighWaterMarks ) - .map(new GlutenDeltaIdentityColumnStatsTracker(_)) SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) { val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, output) @@ -96,8 +93,8 @@ class GlutenOptimisticTransaction(delegate: OptimisticTransaction) convertEmptyToNullIfNeeded(queryExecution.executedPlan, partitioningColumns, constraints) val maybeCheckInvariants = if (constraints.isEmpty) { // Compared to vanilla Delta, we simply avoid adding the invariant checker - // when the constraint list is empty, to omit the unnecessary transitions - // added around the invariant checker. + // when the constraint list is empty, to prevent the unnecessary transitions + // from being added around the invariant checker. empty2NullPlan } else { DeltaInvariantCheckerExec(empty2NullPlan, constraints) @@ -206,7 +203,7 @@ class GlutenOptimisticTransaction(delegate: OptimisticTransaction) committer.addedStatuses.map { a => a.copy(stats = optionalStatsTracker - .map(_.delegate.recordedStats(a.toPath.getName)) + .map(_.recordedStats(a.toPath.getName)) .getOrElse(a.stats)) } } else { @@ -235,7 +232,7 @@ class GlutenOptimisticTransaction(delegate: OptimisticTransaction) if (resultFiles.nonEmpty && !isOptimize) registerPostCommitHook(AutoCompact) // Record the updated high water marks to be used during transaction commit. identityTrackerOpt.ifDefined { - tracker => updatedIdentityHighWaterMarks.appendAll(tracker.delegate.highWaterMarks.toSeq) + tracker => updatedIdentityHighWaterMarks.appendAll(tracker.highWaterMarks.toSeq) } resultFiles.toSeq ++ committer.changeFiles diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala index 74a1e3f03673..653b705becf0 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.connector.write.WriterCommitMessage import org.apache.spark.sql.delta.{DeltaOptions, GlutenParquetFileFormat} import org.apache.spark.sql.delta.logging.DeltaLogKeys +import org.apache.spark.sql.delta.stats.GlutenDeltaJobStatsTracker import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec @@ -125,13 +126,14 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { val dataSchema = dataColumns.toStructType DataSourceUtils.verifySchema(fileFormat, dataSchema) DataSourceUtils.checkFieldNames(fileFormat, dataSchema) - // Note: prepareWrite has side effect. It sets "job". + val isNativeWritable = GlutenParquetFileFormat.isNativeWritable(dataSchema) val outputDataColumns = if (caseInsensitiveOptions.get(DeltaOptions.WRITE_PARTITION_COLUMNS).contains("true")) { dataColumns ++ partitionColumns } else dataColumns + // Note: prepareWrite has side effect. It sets "job". val outputWriterFactory = fileFormat.prepareWrite( sparkSession, @@ -140,6 +142,12 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { outputDataColumns.toStructType ) + val maybeWrappedStatsTrackers: Seq[WriteJobStatsTracker] = if (isNativeWritable) { + statsTrackers.map(GlutenDeltaJobStatsTracker(_)) + } else { + statsTrackers + } + val description = new WriteJobDescription( uuid = UUID.randomUUID.toString, serializableHadoopConf = new SerializableConfiguration(job.getConfiguration), @@ -157,7 +165,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { timeZoneId = caseInsensitiveOptions .get(DateTimeUtils.TIMEZONE_OPTION) .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone), - statsTrackers = statsTrackers + statsTrackers = maybeWrappedStatsTrackers ) // We should first sort by dynamic partition columns, then bucket id, and finally sorting @@ -222,7 +230,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { partitionColumns, sortColumns, orderingMatched, - GlutenParquetFileFormat.isNativeWritable(dataSchema) + isNativeWritable ) } } @@ -459,6 +467,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { } else { concurrentOutputWriterSpec match { case Some(spec) => + // TODO: Concurrent writer is not yet supported. new DynamicPartitionDataConcurrentWriter( description, taskAttemptContext, @@ -468,7 +477,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { case _ => // Columnar-based partition writer to divide the input batch by partition values // and bucket IDs in advance. - new ColumnarDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) + new GlutenDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) } } @@ -525,7 +534,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { } } - private class ColumnarDynamicPartitionDataSingleWriter( + private class GlutenDynamicPartitionDataSingleWriter( description: WriteJobDescription, taskAttemptContext: TaskAttemptContext, committer: FileCommitProtocol, diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsFallbackTracker.scala similarity index 57% rename from backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala rename to backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsFallbackTracker.scala index 30e61730c17b..4eab5408fa9c 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsFallbackTracker.scala @@ -19,17 +19,19 @@ package org.apache.spark.sql.delta.stats import org.apache.gluten.execution.{PlaceholderRow, TerminalRow, VeloxColumnarToRowExec} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.delta.DeltaIdentityColumnStatsTracker import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker} import org.apache.spark.sql.execution.metric.SQLMetric -class GlutenDeltaJobStatisticsTracker(val delegate: DeltaJobStatisticsTracker) +/** + * A fallback stats tracker where a C2R converter converts all the incoming batches to rows then + * send to the delegate tracker. + */ +private[stats] class GlutenDeltaJobStatsFallbackTracker(val delegate: WriteJobStatsTracker) extends WriteJobStatsTracker { - import GlutenDeltaJobStatisticsTracker._ + import GlutenDeltaJobStatsFallbackTracker._ override def newTaskInstance(): WriteTaskStatsTracker = { - new GlutenDeltaTaskStatisticsTracker( - delegate.newTaskInstance().asInstanceOf[DeltaTaskStatisticsTracker]) + new GlutenDeltaTaskStatsFallbackTracker(delegate.newTaskInstance()) } override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = { @@ -37,43 +39,22 @@ class GlutenDeltaJobStatisticsTracker(val delegate: DeltaJobStatisticsTracker) } } -class GlutenDeltaIdentityColumnStatsTracker(override val delegate: DeltaIdentityColumnStatsTracker) - extends GlutenDeltaJobStatisticsTracker(delegate) - -private object GlutenDeltaJobStatisticsTracker { - - /** - * This is a temporary implementation of statistics tracker for Delta Lake. It's sub-optimal in - * performance because it internally performs C2R then send rows to the delegate row-based - * tracker. - * - * TODO: Columnar-based statistics collection. - */ - private class GlutenDeltaTaskStatisticsTracker(delegate: DeltaTaskStatisticsTracker) +private object GlutenDeltaJobStatsFallbackTracker { + private class GlutenDeltaTaskStatsFallbackTracker(delegate: WriteTaskStatsTracker) extends WriteTaskStatsTracker { - private val c2r = new VeloxColumnarToRowExec.Converter(new SQLMetric("convertTime")) - override def newPartition(partitionValues: InternalRow): Unit = { + override def newPartition(partitionValues: InternalRow): Unit = delegate.newPartition(partitionValues) - } - override def newFile(filePath: String): Unit = { - delegate.newFile(filePath) - } + override def newFile(filePath: String): Unit = delegate.newFile(filePath) - override def closeFile(filePath: String): Unit = { - delegate.closeFile(filePath) - } + override def closeFile(filePath: String): Unit = delegate.closeFile(filePath) - override def newRow(filePath: String, row: InternalRow): Unit = { - row match { - case _: PlaceholderRow => - case t: TerminalRow => - c2r.toRowIterator(t.batch()).foreach(eachRow => delegate.newRow(filePath, eachRow)) - case otherRow => - delegate.newRow(filePath, otherRow) - } + override def newRow(filePath: String, row: InternalRow): Unit = row match { + case _: PlaceholderRow => + case t: TerminalRow => + c2r.toRowIterator(t.batch()).foreach(eachRow => delegate.newRow(filePath, eachRow)) } override def getFinalStats(taskCommitTime: Long): WriteTaskStats = { diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsRowCountingTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsRowCountingTracker.scala new file mode 100644 index 000000000000..d4086cf8e9b7 --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsRowCountingTracker.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.stats + +import org.apache.gluten.execution.{PlaceholderRow, TerminalRow} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker} + +/** + * A fallback stats tracker to simply call `newRow` many times when a columnar batch comes. There is + * no C2R process involved to save performance. Therefore, the delegate stats tracker must not read + * the row in its `newRow` implementation. + */ +private[stats] class GlutenDeltaJobStatsRowCountingTracker(val delegate: WriteJobStatsTracker) + extends WriteJobStatsTracker { + import GlutenDeltaJobStatsRowCountingTracker._ + + override def newTaskInstance(): WriteTaskStatsTracker = { + new GlutenDeltaTaskStatsRowCountingTracker(delegate.newTaskInstance()) + } + + override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = { + delegate.processStats(stats, jobCommitTime) + } +} + +private object GlutenDeltaJobStatsRowCountingTracker { + private class GlutenDeltaTaskStatsRowCountingTracker(delegate: WriteTaskStatsTracker) + extends WriteTaskStatsTracker { + override def newPartition(partitionValues: InternalRow): Unit = + delegate.newPartition(partitionValues) + + override def newFile(filePath: String): Unit = delegate.newFile(filePath) + + override def closeFile(filePath: String): Unit = delegate.closeFile(filePath) + + override def newRow(filePath: String, row: InternalRow): Unit = row match { + case _: PlaceholderRow => + case t: TerminalRow => + for (_ <- 0 until t.batch().numRows()) { + // Here we pass null row to the delegate stats tracker as we assume + // the delegate stats tracker only counts on row numbers. + delegate.newRow(filePath, null) + } + } + + override def getFinalStats(taskCommitTime: Long): WriteTaskStats = { + delegate.getFinalStats(taskCommitTime) + } + } +} diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala new file mode 100644 index 000000000000..ae1126394379 --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.stats + +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.backendsapi.velox.VeloxBatchType +import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches} +import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.execution._ +import org.apache.gluten.expression.{ConverterUtils, TransformerState} +import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform +import org.apache.gluten.extension.columnar.offload.OffloadOthers +import org.apache.gluten.extension.columnar.rewrite.PullOutPreProject +import org.apache.gluten.extension.columnar.transition.Convention +import org.apache.gluten.extension.columnar.validator.{Validator, Validators} +import org.apache.gluten.iterator.Iterators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators +import org.apache.gluten.substrait.SubstraitContext +import org.apache.gluten.substrait.plan.PlanBuilder +import org.apache.gluten.vectorized.{ArrowWritableColumnVector, ColumnarBatchInIterator, ColumnarBatchOutIterator, NativePlanEvaluator} + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, EmptyRow, Expression, Projection, SortOrder, SpecificInternalRow, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, DeclarativeAggregate} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection +import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, LeafExecNode, ProjectExec} +import org.apache.spark.sql.execution.aggregate.SortAggregateExec +import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker} +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.util.{SerializableConfiguration, SparkDirectoryUtil} + +import com.google.common.collect.Lists +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import java.util.UUID +import java.util.concurrent.{Callable, Executors, Future, SynchronousQueue} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** Gluten's stats tracker with vectorized aggregation inside to produce statistics efficiently. */ +private[stats] class GlutenDeltaJobStatsTracker(val delegate: DeltaJobStatisticsTracker) + extends WriteJobStatsTracker { + import GlutenDeltaJobStatsTracker._ + + @transient private val hadoopConf: Configuration = { + val clazz = classOf[DeltaJobStatisticsTracker] + val method = clazz.getDeclaredField("hadoopConf") + method.setAccessible(true) + method.get(delegate).asInstanceOf[Configuration] + } + @transient private val path = delegate.path + private val dataCols = delegate.dataCols + private val statsColExpr = delegate.statsColExpr + + private val srlHadoopConf = new SerializableConfiguration(hadoopConf) + private val rootUri = path.getFileSystem(hadoopConf).makeQualified(path).toUri + + override def newTaskInstance(): WriteTaskStatsTracker = { + val rootPath = new Path(rootUri) + val hadoopConf = srlHadoopConf.value + new GlutenDeltaTaskStatsTracker(dataCols, statsColExpr, rootPath, hadoopConf) + } + + override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = { + delegate.processStats(stats, jobCommitTime) + } +} + +object GlutenDeltaJobStatsTracker extends Logging { + def apply(tracker: WriteJobStatsTracker): WriteJobStatsTracker = tracker match { + case tracker: BasicWriteJobStatsTracker => + new GlutenDeltaJobStatsRowCountingTracker(tracker) + case tracker: DeltaJobStatisticsTracker => + new GlutenDeltaJobStatsTracker(tracker) + case tracker => + logWarning( + "Gluten Delta: Creating fallback job stats tracker," + + " this involves frequent columnar-to-row conversions which may cause performance" + + " issues.") + new GlutenDeltaJobStatsFallbackTracker(tracker) + } + + /** A columnar-based statistics collection for Gluten + Delta Lake. */ + private class GlutenDeltaTaskStatsTracker( + dataCols: Seq[Attribute], + statsColExpr: Expression, + rootPath: Path, + hadoopConf: Configuration) + extends WriteTaskStatsTracker { + // We use one single thread to ensure the statistic files are written in serial. + // Do not increase the thread number, otherwise sanity will not be guaranteed. + private val resultThreadRunner = Executors.newSingleThreadExecutor() + private val evaluator = NativePlanEvaluator.create( + BackendsApiManager.getBackendName, + Map.empty[String, String].asJava) + private val c2r = new VeloxColumnarToRowExec.Converter(new SQLMetric("convertTime")) + private val inputBatchQueue = new SynchronousQueue[Option[ColumnarBatch]]() + private val aggregates: Seq[AggregateExpression] = statsColExpr.collect { + case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] => + assert(ae.mode == Complete) + ae + } + private val declarativeAggregates: Seq[DeclarativeAggregate] = aggregates.map { + ae => ae.aggregateFunction.asInstanceOf[DeclarativeAggregate] + } + private val resultExpr: Expression = statsColExpr.transform { + case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] => + ae.aggregateFunction.asInstanceOf[DeclarativeAggregate].evaluateExpression + } + private val aggBufferAttrs: Seq[Attribute] = + declarativeAggregates.flatMap(_.aggBufferAttributes) + private val emptyRow: InternalRow = { + val initializeStats = GenerateMutableProjection.generate( + expressions = declarativeAggregates.flatMap(_.initialValues), + inputSchema = Seq.empty, + useSubexprElimination = false + ) + val buffer = new SpecificInternalRow(aggBufferAttrs.map(_.dataType)) + initializeStats.target(buffer).apply(EmptyRow) + } + private val getStats: Projection = UnsafeProjection.create( + exprs = Seq(resultExpr), + inputSchema = aggBufferAttrs + ) + private val taskContext = TaskContext.get() + private val dummyKeyAttr = { + // FIXME: We have to force the use of Velox's streaming aggregation since hash aggregation + // doesn't support task barriers. But as streaming aggregation should always be keyed, we + // have to do a small hack here by adding a dummy key for the global aggregation. + AttributeReference("__GLUTEN_DELTA_DUMMY_KEY__", IntegerType)() + } + private val statsAttrs = aggregates.flatMap(_.aggregateFunction.aggBufferAttributes) + private val statsResultAttrs = aggregates.flatMap(_.aggregateFunction.inputAggBufferAttributes) + private val veloxAggTask: ColumnarBatchOutIterator = { + val inputNode = StatisticsInputNode(Seq(dummyKeyAttr), dataCols) + val aggOp = SortAggregateExec( + None, + isStreaming = false, + None, + Seq(dummyKeyAttr), + aggregates, + statsAttrs, + 0, + dummyKeyAttr +: statsResultAttrs, + inputNode + ) + val projOp = ProjectExec(statsResultAttrs, aggOp) + // Invoke the legacy transform rule to get a local Velox aggregation query plan. + val offloads = Seq(OffloadOthers()).map(_.toStrcitRule()) + val validatorBuilder: GlutenConfig => Validator = conf => + Validators.newValidator(conf, offloads) + val rewrites = Seq(PullOutPreProject) + val config = GlutenConfig.get + val transformRule = + HeuristicTransform.WithRewrites(validatorBuilder(config), rewrites, offloads) + val veloxTransformer = transformRule(projOp) + val wholeStageTransformer = ColumnarCollapseTransformStages(config)(veloxTransformer) + .asInstanceOf[WholeStageTransformer] + .child + .asInstanceOf[TransformSupport] + val substraitContext = new SubstraitContext + TransformerState.enterValidation + val transformedNode = + try { + wholeStageTransformer.transform(substraitContext) + } finally { + TransformerState.finishValidation + } + val outNames = wholeStageTransformer.output.map(ConverterUtils.genColumnNameWithExprId).asJava + val planNode = + PlanBuilder.makePlan(substraitContext, Lists.newArrayList(transformedNode.root), outNames) + + val spillDirPath = SparkDirectoryUtil + .get() + .namespace("gluten-spill") + .mkChildDirRoundRobin(UUID.randomUUID.toString) + .getAbsolutePath + val nativeOutItr = evaluator + .createKernelWithBatchIterator( + planNode.toProtobuf.toByteArray, + null, + null, + 0, + BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) + ) + nativeOutItr + } + + private val resultJsonMap: mutable.Map[String, String] = mutable.Map() + + private var currentPath: String = _ + private var currentJsonFuture: Future[String] = _ + + override def newPartition(partitionValues: InternalRow): Unit = {} + + override def newFile(filePath: String): Unit = { + assert(currentPath == null) + veloxAggTask.addIteratorSplits(Array(newIteratorFromInputQueue())) + veloxAggTask.requestBarrier() + currentJsonFuture = resultThreadRunner.submit(new Callable[String] { + private val resultThreadName = + s"Gluten Delta Statistics Writer - ${System.identityHashCode(this)}" + override def call(): String = { + Thread.currentThread().setName(resultThreadName) + TaskContext.setTaskContext(taskContext) + val outBatches = veloxAggTask.asScala.toSeq + val row: InternalRow = if (outBatches.isEmpty) { + // No input was received. Returns the default aggregation values. + emptyRow + } else { + assert(outBatches.size == 1) + val batch = outBatches.head + val rows = c2r.toRowIterator(batch).toSeq + assert( + rows.size == 1, + "Only one single output row is expected from the global aggregation.") + batch.close() + rows.head + } + val jsonStats = getStats(row).getString(0) + jsonStats + } + currentPath = filePath + }) + } + + override def closeFile(filePath: String): Unit = { + assert(filePath == currentPath) + val fileName = new Path(filePath).getName + inputBatchQueue.put(None) + val json = currentJsonFuture.get() + resultJsonMap(fileName) = json + currentPath = null + } + + override def newRow(filePath: String, row: InternalRow): Unit = { + assert(filePath == currentPath) + row match { + case _: PlaceholderRow => + case t: TerminalRow => + val valueBatch = t.batch() + val numRows = valueBatch.numRows() + val dummyKeyVec = ArrowWritableColumnVector + .allocateColumns(numRows, new StructType().add(dummyKeyAttr.name, IntegerType)) + .head + (0 until numRows).foreach(i => dummyKeyVec.putInt(i, 1)) + val dummyKeyBatch = VeloxColumnarBatches.toVeloxBatch( + ColumnarBatches.offload( + ArrowBufferAllocators.contextInstance(), + new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows))) + val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch, valueBatch) + dummyKeyBatch.close() + valueBatch.close() + inputBatchQueue.put(Some(compositeBatch)) + } + } + + override def getFinalStats(taskCommitTime: Long): WriteTaskStats = { + veloxAggTask.noMoreSplits() + veloxAggTask.close() + DeltaFileStatistics(resultJsonMap.toMap) + } + + private def newIteratorFromInputQueue(): ColumnarBatchInIterator = { + val itr = new ColumnarBatchInIterator( + BackendsApiManager.getBackendName, + Iterators + .wrap(new Iterator[ColumnarBatch] { + private var batch: ColumnarBatch = _ + + override def hasNext: Boolean = { + assert(batch == null) + while (!Thread.currentThread().isInterrupted) { + val tmp = + try { + inputBatchQueue.take() + } catch { + case _: InterruptedException => + Thread.currentThread().interrupt() + return false; + } + if (tmp.isDefined) { + batch = tmp.get + return true + } + return false + } + throw new IllegalStateException() + } + + override def next(): ColumnarBatch = { + assert(batch != null) + try { + batch + } finally { + batch = null + } + } + }) + .recyclePayload(b => b.close()) + .create() + .asJava + ) + itr + } + } + + private case class StatisticsInputNode(keySchema: Seq[Attribute], dataSchema: Seq[Attribute]) + extends GlutenPlan + with LeafExecNode { + override def output: Seq[Attribute] = keySchema ++ dataSchema + override def batchType(): Convention.BatchType = VeloxBatchType + override def rowType0(): Convention.RowType = Convention.RowType.None + override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = + throw new UnsupportedOperationException() + override def outputOrdering: Seq[SortOrder] = { + keySchema.map(key => SortOrder(key, Ascending)) + } + } +} diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java index 646164de3a21..3b06570398ee 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java @@ -26,10 +26,34 @@ import java.util.Iterator; public class VeloxBlockStripes extends BlockStripes { + private final BlockStripe[] blockStripes; + public VeloxBlockStripes(BlockStripes bs) { super(bs.originBlockAddress, bs.blockAddresses, bs.headingRowIndice, bs.originBlockNumColumns, bs.headingRowBytes); + blockStripes = new BlockStripe[blockAddresses.length]; + for (int i = 0; i < blockStripes.length; i++) { + final long blockAddress = blockAddresses[i]; + final byte[] headingRowByteArray = headingRowBytes[i]; + blockStripes[i] = new BlockStripe() { + private final ColumnarBatch batch = ColumnarBatches.create(blockAddress); + private final UnsafeRow headingRow = new UnsafeRow(originBlockNumColumns); + { + headingRow.pointTo(headingRowByteArray, headingRowByteArray.length); + } + + @Override + public ColumnarBatch getColumnarBatch() { + return batch; + } + + @Override + public InternalRow getHeadingRow() { + return headingRow; + } + }; + } } @Override @@ -44,23 +68,8 @@ public boolean hasNext() { @Override public BlockStripe next() { - final BlockStripe nextStripe = new BlockStripe() { - private final long blockAddress = blockAddresses[index]; - private final byte[] headingRowByteArray = headingRowBytes[index]; - - @Override - public ColumnarBatch getColumnarBatch() { - return ColumnarBatches.create(blockAddress); - } - - @Override - public InternalRow getHeadingRow() { - UnsafeRow row = new UnsafeRow(originBlockNumColumns); - row.pointTo(headingRowByteArray, headingRowByteArray.length); - return row; - } - }; - index += 1; + final BlockStripe nextStripe = blockStripes[index]; + index++; return nextStripe; } }; @@ -69,7 +78,6 @@ public InternalRow getHeadingRow() { @Override public void release() { - + // Do nothing. We rely on the caller to call #close API on columnar batches returned to them. } } - diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index a15cdc83d310..9d8315731fc5 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -110,6 +110,10 @@ class Runtime : public std::enable_shared_from_this { throw GlutenException("Not implemented"); } + virtual void requestBarrier(ResultIterator* iter) { + throw GlutenException("Not implemented"); + } + virtual std::shared_ptr createOrGetEmptySchemaBatch(int32_t numRows) { throw GlutenException("Not implemented"); } diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index e9d797e1dbfb..a91f10641748 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -695,6 +695,20 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterato JNI_METHOD_END() } +JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeRequestBarrier( // NOLINT + JNIEnv* env, + jobject wrapper, + jlong iterHandle) { + JNI_METHOD_START + auto ctx = getRuntime(env, wrapper); + auto iter = ObjectStore::retrieve(iterHandle); + if (iter == nullptr) { + throw GlutenException("Invalid iterator handle for requestBarrier"); + } + ctx->requestBarrier(iter.get()); + JNI_METHOD_END() +} + JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarToRowInit( // NOLINT JNIEnv* env, diff --git a/cpp/core/memory/SplitAwareColumnarBatchIterator.h b/cpp/core/memory/SplitAwareColumnarBatchIterator.h index e12f38afadf5..987eb04351c9 100644 --- a/cpp/core/memory/SplitAwareColumnarBatchIterator.h +++ b/cpp/core/memory/SplitAwareColumnarBatchIterator.h @@ -44,6 +44,12 @@ class SplitAwareColumnarBatchIterator : public ColumnarBatchIterator { /// Signal that no more splits will be added to this iterator. /// This must be called after all splits have been added to ensure proper task completion. virtual void noMoreSplits() = 0; + + /// Request a barrier in task execution. This signals the task to finish processing + /// all currently queued splits and drain all stateful operators before continuing. + /// Enables task reuse and deterministic execution for streaming workloads. + /// @see https://facebookincubator.github.io/velox/develop/task-barrier.html + virtual void requestBarrier() = 0; }; } // namespace gluten diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 69d21b4a57c9..3a9bd8ae4208 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -210,6 +210,14 @@ void VeloxRuntime::noMoreSplits(ResultIterator* iter){ splitAwareIter->noMoreSplits(); } +void VeloxRuntime::requestBarrier(ResultIterator* iter){ + auto* splitAwareIter = dynamic_cast(iter->getInputIter()); + if (splitAwareIter == nullptr) { + throw GlutenException("Iterator does not support split management"); + } + splitAwareIter->requestBarrier(); +} + std::shared_ptr VeloxRuntime::createColumnar2RowConverter(int64_t column2RowMemThreshold) { auto veloxPool = memoryManager()->getLeafMemoryPool(); return std::make_shared(veloxPool, column2RowMemThreshold); diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index a3c3da0c5ac9..728cc46c92a4 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -61,6 +61,8 @@ class VeloxRuntime final : public Runtime { void noMoreSplits(ResultIterator* iter) override; + void requestBarrier(ResultIterator* iter) override; + std::shared_ptr createColumnar2RowConverter(int64_t column2RowMemThreshold) override; std::shared_ptr createOrGetEmptySchemaBatch(int32_t numRows) override; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index ac5773e439ee..97d2dfc6b66f 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -393,6 +393,13 @@ void WholeStageResultIterator::noMoreSplits() { allSplitsAdded = true; } +void WholeStageResultIterator::requestBarrier() { + if (task_ == nullptr) { + throw GlutenException("Cannot request barrier: task is null"); + } + task_->requestBarrier(); +} + void WholeStageResultIterator::collectMetrics() { if (metrics_) { // The metrics has already been created. diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index 401cff06dafd..a6b30440a9a5 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -85,6 +85,12 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { /// This is required for proper task completion and enables future barrier support. void noMoreSplits() override; + /// Request a barrier in the Velox task execution. + /// This signals the task to finish processing all currently queued splits + /// and drain all stateful operators before continuing. + /// @see https://facebookincubator.github.io/velox/develop/task-barrier.html + void requestBarrier() override; + private: /// Get the Spark confs to Velox query context. std::unordered_map getQueryContextConf(); diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java index ad200dd46c81..27162a800f07 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java @@ -58,6 +58,8 @@ private native boolean nativeAddIteratorSplits( private native void nativeNoMoreSplits(long iterHandle); + private native void nativeRequestBarrier(long iterHandle); + @Override public boolean hasNext0() throws IOException { return nativeHasNext(iterHandle); @@ -97,7 +99,7 @@ public boolean addIteratorSplits(ColumnarBatchInIterator[] batchItr) { /** * Signal that no more splits will be added to the iterator. This is required for proper task - * completion and is a prerequisite for barrier support. + * completion. * * @throws IllegalStateException if the iterator is closed */ @@ -108,6 +110,26 @@ public void noMoreSplits() { nativeNoMoreSplits(iterHandle); } + /** + * Request a barrier in the task execution. This signals the task to finish processing all + * currently queued splits and drain all stateful operators before continuing. After calling this + * method, continue calling next() to fetch results. When next() returns null and hasNext() + * returns false, the barrier has been reached. + * + *

This enables task reuse and deterministic execution for workloads like AI training data + * loading and real-time streaming processing. + * + * @throws IllegalStateException if the iterator is closed + * @see Velox Task + * Barrier Documentation + */ + public void requestBarrier() { + if (closed.get()) { + throw new IllegalStateException("Cannot call requestBarrier on a closed iterator"); + } + nativeRequestBarrier(iterHandle); + } + @Override public void close0() { // To make sure the outputted batches are still accessible after the iterator is closed. diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala index b6f1313ff502..31ce4b05afae 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala @@ -43,6 +43,11 @@ object Validator { case object Passed extends OutCome case class Failed private (reason: String) extends OutCome + private object NoopValidator extends Validator { + override def validate(plan: SparkPlan): Validator.OutCome = pass() + } + + def noop(): Validator = NoopValidator def builder(): Builder = Builder() class Builder private { @@ -75,10 +80,6 @@ object Validator { private object Builder { def apply(): Builder = new Builder() - private object NoopValidator extends Validator { - override def validate(plan: SparkPlan): Validator.OutCome = pass() - } - private class ValidatorPipeline(val validators: Seq[Validator]) extends Validator { assert(!validators.exists(_.isInstanceOf[ValidatorPipeline]))