/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gluten.config

import org.apache.spark.sql.internal.SQLConf

/**
 * Velox-backend configuration for Delta Lake support, layered on top of
 * [[GlutenCoreConfig]]. Obtain a session-bound instance via [[VeloxDeltaConfig.get]].
 *
 * @param conf the [[SQLConf]] the entries are read from
 */
class VeloxDeltaConfig(conf: SQLConf) extends GlutenCoreConfig(conf) {
  import VeloxDeltaConfig._

  /** Whether native (Velox) Delta Lake write is enabled for this session. */
  def enableNativeWrite: Boolean = getConf(ENABLE_NATIVE_WRITE)
}

object VeloxDeltaConfig extends ConfigRegistry {

  /** Returns a config view bound to the currently active session's [[SQLConf]]. */
  def get: VeloxDeltaConfig = {
    new VeloxDeltaConfig(SQLConf.get)
  }

  /**
   * Experimental, as the feature currently has a performance issue because of the fallback
   * processing of statistics. Disabled by default.
   */
  val ENABLE_NATIVE_WRITE: ConfigEntry[Boolean] =
    buildConf("spark.gluten.sql.columnar.backend.velox.delta.enableNativeWrite")
      .experimental()
      .doc("Enable native Delta Lake write for Velox backend.")
      .booleanConf
      .createWithDefault(false)
}
package org.apache.gluten.component

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector

import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.datasources.v2.{LeafV2CommandExec, OffloadDeltaCommand}

/**
 * Component that registers the Delta 3.3 write-offload rules for the Velox backend,
 * for both the legacy (heuristic) planner and the RAS (cost-based) planner.
 */
class VeloxDelta33WriteComponent extends Component {
  override def name(): String = "velox-delta33-write"

  override def buildInfo(): Component.BuildInfo =
    Component.BuildInfo("VeloxDelta33Write", "N/A", "N/A", "N/A")

  // Must be initialized after the base Velox Delta component.
  override def dependencies(): Seq[Class[_ <: Component]] = Seq(classOf[VeloxDeltaComponent])

  override def injectRules(injector: Injector): Unit = {
    val legacyInjector = injector.gluten.legacy
    val rasInjector = injector.gluten.ras

    // Legacy planner: offload Delta write commands through a strict heuristic transform.
    // NOTE(review): `toStrcitRule` is the upstream Gluten API's actual (misspelled) method
    // name — intentionally left as-is; renaming it here would break compilation.
    legacyInjector.injectTransform { ctx =>
      val strictRules = Seq(OffloadDeltaCommand()).map(_.toStrcitRule())
      HeuristicTransform.Simple(
        Validators.newValidator(new GlutenConfig(ctx.sqlConf), strictRules),
        strictRules)
    }

    // RAS planner: the same offload, applied to both physical command node shapes.
    val rasOffloads: Seq[RasOffload] = Seq(
      RasOffload.from[ExecutedCommandExec](OffloadDeltaCommand()),
      RasOffload.from[LeafV2CommandExec](OffloadDeltaCommand()))
    for (offload <- rasOffloads) {
      rasInjector.injectRasRule { ctx =>
        RasOffload.Rule(offload, Validators.newValidator(new GlutenConfig(ctx.sqlConf)), Nil)
      }
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.delta

import org.apache.spark.internal.{LoggingShims, MDC}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat._
import org.apache.spark.sql.delta.actions.{DeletionVectorDescriptor, Metadata, Protocol}
import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
import org.apache.spark.sql.delta.deletionvectors.{DropMarkedRowsFilter, KeepAllRowsFilter, KeepMarkedRowsFilter}
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{ByteType, LongType, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.util.ScalaExtensions._
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchRow, ColumnVector}
import org.apache.spark.util.SerializableConfiguration

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.util.ContextUtil

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// spotless:off
/**
 * A thin wrapper over the Parquet file format to support
 * - columns names without restrictions.
 * - populated a column from the deletion vector of this file (if exists) to indicate
 *   whether the row is deleted or not according to the deletion vector. Consumers
 *   of this scan can use the column values to filter out the deleted rows.
 *
 * NOTE(review): this appears adapted from Delta Lake's `DeltaParquetFileFormat`, with the
 * base class swapped to Gluten's `GlutenParquetFileFormat` — confirm and keep in sync with
 * upstream when bumping the Delta version.
 */
case class GlutenDeltaParquetFileFormat(
    protocol: Protocol,
    metadata: Metadata,
    nullableRowTrackingFields: Boolean = false,
    optimizationsEnabled: Boolean = true,
    tablePath: Option[String] = None,
    isCDCRead: Boolean = false)
  extends GlutenParquetFileFormat
  with LoggingShims {

  // Validate either we have all arguments for DV enabled read or none of them.
  if (hasTablePath) {
    SparkSession.getActiveSession.map { session =>
      val useMetadataRowIndex =
        session.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)
      require(useMetadataRowIndex == optimizationsEnabled,
        "Wrong arguments for Delta table scan with deletion vectors")
    }
  }

  // Fail fast if the table uses type widening in a way this reader cannot handle.
  SparkSession.getActiveSession.ifDefined { session =>
    TypeWidening.assertTableReadable(session.sessionState.conf, protocol, metadata)
  }

  val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
  val referenceSchema: StructType = metadata.schema

  // Id column mapping relies on Parquet field IDs being both written and read back.
  if (columnMappingMode == IdMapping) {
    val requiredReadConf = SQLConf.PARQUET_FIELD_ID_READ_ENABLED
    require(SparkSession.getActiveSession.exists(_.sessionState.conf.getConf(requiredReadConf)),
      s"${requiredReadConf.key} must be enabled to support Delta id column mapping mode")
    val requiredWriteConf = SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED
    require(SparkSession.getActiveSession.exists(_.sessionState.conf.getConf(requiredWriteConf)),
      s"${requiredWriteConf.key} must be enabled to support Delta id column mapping mode")
  }

  override def shortName(): String = "parquet"

  override def toString: String = "Parquet"

  /**
   * prepareSchemaForRead must only be used for parquet read.
   * It removes "PARQUET_FIELD_ID_METADATA_KEY" for name mapping mode which address columns by
   * physical name instead of id.
   */
  def prepareSchemaForRead(inputSchema: StructType): StructType = {
    val schema = DeltaColumnMapping.createPhysicalSchema(
      inputSchema, referenceSchema, columnMappingMode)
    if (columnMappingMode == NameMapping) {
      SchemaMergingUtils.transformColumns(schema) { (_, field, _) =>
        field.copy(metadata = new MetadataBuilder()
          .withMetadata(field.metadata)
          .remove(DeltaColumnMapping.PARQUET_FIELD_ID_METADATA_KEY)
          .remove(DeltaColumnMapping.PARQUET_FIELD_NESTED_IDS_METADATA_KEY)
          .build())
      }
    } else schema
  }

  /**
   * Prepares filters so that they can be pushed down into the Parquet reader.
   *
   * If column mapping is enabled, then logical column names in the filters will be replaced with
   * their corresponding physical column names. This is necessary as the Parquet files will use
   * physical column names, and the requested schema pushed down in the Parquet reader will also use
   * physical column names.
   */
  private def prepareFiltersForRead(filters: Seq[Filter]): Seq[Filter] = {
    if (!optimizationsEnabled) {
      // Pushdown disabled entirely: DV row-index bookkeeping requires reading every row.
      Seq.empty
    } else if (columnMappingMode != NoMapping) {
      import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
      val physicalNameMap = DeltaColumnMapping.getLogicalNameToPhysicalNameMap(referenceSchema)
        .map { case (logicalName, physicalName) => (logicalName.quoted, physicalName.quoted) }
      filters.flatMap(translateFilterForColumnMapping(_, physicalNameMap))
    } else {
      filters
    }
  }

  override def isSplitable(
      sparkSession: SparkSession,
      options: Map[String, String],
      path: Path): Boolean = optimizationsEnabled

  def hasTablePath: Boolean = tablePath.isDefined

  /**
   * We sometimes need to replace FileFormat within LogicalPlans, so we have to override
   * `equals` to ensure file format changes are captured
   */
  override def equals(other: Any): Boolean = {
    other match {
      case ff: GlutenDeltaParquetFileFormat =>
        // Deliberately compares only these three fields (not tablePath/isCDCRead).
        ff.columnMappingMode == columnMappingMode &&
          ff.referenceSchema == referenceSchema &&
          ff.optimizationsEnabled == optimizationsEnabled
      case _ => false
    }
  }

  // Coarse but consistent with equals: all instances of this class share one hash bucket.
  override def hashCode(): Int = getClass.getCanonicalName.hashCode()

  override def buildReaderWithPartitionValues(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {

    val useMetadataRowIndexConf = DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX
    val useMetadataRowIndex = sparkSession.sessionState.conf.getConf(useMetadataRowIndexConf)

    // Underlying Parquet reader, built over the physical (column-mapped) schemas/filters.
    val parquetDataReader: PartitionedFile => Iterator[InternalRow] =
      super.buildReaderWithPartitionValues(
        sparkSession,
        prepareSchemaForRead(dataSchema),
        prepareSchemaForRead(partitionSchema),
        prepareSchemaForRead(requiredSchema),
        prepareFiltersForRead(filters),
        options,
        hadoopConf)

    val schemaWithIndices = requiredSchema.fields.zipWithIndex
    // Locate a named column in the reader's output schema (position + field).
    def findColumn(name: String): Option[ColumnMetadata] = {
      val results = schemaWithIndices.filter(_._1.name == name)
      if (results.length > 1) {
        throw new IllegalArgumentException(
          s"There are more than one column with name=`$name` requested in the reader output")
      }
      results.headOption.map(e => ColumnMetadata(e._2, e._1))
    }

    val isRowDeletedColumn = findColumn(IS_ROW_DELETED_COLUMN_NAME)
    val rowIndexColumnName = if (useMetadataRowIndex) {
      ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
    } else {
      ROW_INDEX_COLUMN_NAME
    }
    val rowIndexColumn = findColumn(rowIndexColumnName)

    // We don't have any additional columns to generate, just return the original reader as is.
    if (isRowDeletedColumn.isEmpty && rowIndexColumn.isEmpty) return parquetDataReader

    // We are using the row_index col generated by the parquet reader and there are no more
    // columns to generate.
    if (useMetadataRowIndex && isRowDeletedColumn.isEmpty) return parquetDataReader

    // Verify that either predicate pushdown with metadata column is enabled or optimizations
    // are disabled.
    require(useMetadataRowIndex || !optimizationsEnabled,
      "Cannot generate row index related metadata with file splitting or predicate pushdown")

    if (hasTablePath && isRowDeletedColumn.isEmpty) {
      throw new IllegalArgumentException(
        s"Expected a column $IS_ROW_DELETED_COLUMN_NAME in the schema")
    }

    val serializableHadoopConf = new SerializableConfiguration(hadoopConf)

    val useOffHeapBuffers = sparkSession.sessionState.conf.offHeapColumnVectorEnabled
    (partitionedFile: PartitionedFile) => {
      val rowIteratorFromParquet = parquetDataReader(partitionedFile)
      try {
        val iterToReturn =
          iteratorWithAdditionalMetadataColumns(
            partitionedFile,
            rowIteratorFromParquet,
            isRowDeletedColumn,
            rowIndexColumn,
            useOffHeapBuffers,
            serializableHadoopConf,
            useMetadataRowIndex)
        iterToReturn.asInstanceOf[Iterator[InternalRow]]
      } catch {
        case NonFatal(e) =>
          // Close the iterator if it is a closeable resource. The `ParquetFileFormat` opens
          // the file and returns `RecordReaderIterator` (which implements `AutoCloseable` and
          // `Iterator`) instance as a `Iterator`.
          rowIteratorFromParquet match {
            case resource: AutoCloseable => closeQuietly(resource)
            case _ => // do nothing
          }
          throw e
      }
    }
  }

  override def supportFieldName(name: String): Boolean = {
    // With column mapping, physical names are sanitized, so any logical name is acceptable.
    if (columnMappingMode != NoMapping) true else super.supportFieldName(name)
  }

  override def metadataSchemaFields: Seq[StructField] = {
    // TODO(SPARK-47731): Parquet reader in Spark has a bug where a file containing 2b+ rows
    // in a single rowgroup causes it to run out of the `Integer` range.
    // For Delta Parquet readers don't expose the row_index field as a metadata field when it is
    // not strictly required. We do expose it when Row Tracking or DVs are enabled.
    // In general, having 2b+ rows in a single rowgroup is not a common use case. When the issue is
    // hit an exception is thrown.
    (protocol, metadata) match {
      // We should not expose row tracking fields for CDC reads.
      case (p, m) if RowId.isEnabled(p, m) && !isCDCRead =>
        val extraFields = RowTracking.createMetadataStructFields(p, m, nullableRowTrackingFields)
        super.metadataSchemaFields ++ extraFields
      case (p, m) if deletionVectorsReadable(p, m) => super.metadataSchemaFields
      case _ => super.metadataSchemaFields.filter(_ != ParquetFileFormat.ROW_INDEX_FIELD)
    }
  }

  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    val factory = super.prepareWrite(sparkSession, job, options, dataSchema)
    val conf = ContextUtil.getConfiguration(job)
    // Always write timestamp as TIMESTAMP_MICROS for Iceberg compat based on Iceberg spec
    if (IcebergCompatV1.isEnabled(metadata) || IcebergCompatV2.isEnabled(metadata)) {
      conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
        SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)
    }
    if (IcebergCompatV2.isEnabled(metadata)) {
      // For Uniform with IcebergCompatV2, we need to write nested field IDs for list and map
      // types to the parquet schema. Spark currently does not support it so we hook in our
      // own write support class.
      ParquetOutputFormat.setWriteSupportClass(job, classOf[DeltaParquetWriteSupport])
    }
    factory
  }

  override def fileConstantMetadataExtractors: Map[String, PartitionedFile => Any] = {
    // Row-tracking metadata is carried per-file; a missing entry indicates a planner bug.
    val extractBaseRowId: PartitionedFile => Any = { file =>
      file.otherConstantMetadataColumnValues.getOrElse(RowId.BASE_ROW_ID, {
        throw new IllegalStateException(
          s"Missing ${RowId.BASE_ROW_ID} value for file '${file.filePath}'")
      })
    }
    val extractDefaultRowCommitVersion: PartitionedFile => Any = { file =>
      file.otherConstantMetadataColumnValues
        .getOrElse(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, {
          throw new IllegalStateException(
            s"Missing ${DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME} value " +
              s"for file '${file.filePath}'")
        })
    }
    super.fileConstantMetadataExtractors
      .updated(RowId.BASE_ROW_ID, extractBaseRowId)
      .updated(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, extractDefaultRowCommitVersion)
  }

  def copyWithDVInfo(
      tablePath: String,
      optimizationsEnabled: Boolean): GlutenDeltaParquetFileFormat = {
    // When predicate pushdown is enabled we allow both splits and predicate pushdown.
    this.copy(
      optimizationsEnabled = optimizationsEnabled,
      tablePath = Some(tablePath))
  }

  /**
   * Modifies the data read from underlying Parquet reader by populating one or both of the
   * following metadata columns.
   * - [[IS_ROW_DELETED_COLUMN_NAME]] - row deleted status from deletion vector corresponding
   *   to this file
   * - [[ROW_INDEX_COLUMN_NAME]] - index of the row within the file. Note, this column is only
   *   populated when we are not using _metadata.row_index column.
   */
  private def iteratorWithAdditionalMetadataColumns(
      partitionedFile: PartitionedFile,
      iterator: Iterator[Object],
      isRowDeletedColumnOpt: Option[ColumnMetadata],
      rowIndexColumnOpt: Option[ColumnMetadata],
      useOffHeapBuffers: Boolean,
      serializableHadoopConf: SerializableConfiguration,
      useMetadataRowIndex: Boolean): Iterator[Object] = {
    require(!useMetadataRowIndex || rowIndexColumnOpt.isDefined,
      "useMetadataRowIndex is enabled but rowIndexColumn is not defined.")

    val rowIndexFilterOpt = isRowDeletedColumnOpt.map { col =>
      // Fetch the DV descriptor from the broadcast map and create a row index filter
      val dvDescriptorOpt = partitionedFile.otherConstantMetadataColumnValues
        .get(FILE_ROW_INDEX_FILTER_ID_ENCODED)
      val filterTypeOpt = partitionedFile.otherConstantMetadataColumnValues
        .get(FILE_ROW_INDEX_FILTER_TYPE)
      if (dvDescriptorOpt.isDefined && filterTypeOpt.isDefined) {
        val rowIndexFilter = filterTypeOpt.get match {
          case RowIndexFilterType.IF_CONTAINED => DropMarkedRowsFilter
          case RowIndexFilterType.IF_NOT_CONTAINED => KeepMarkedRowsFilter
          case unexpectedFilterType => throw new IllegalStateException(
            s"Unexpected row index filter type: ${unexpectedFilterType}")
        }
        rowIndexFilter.createInstance(
          DeletionVectorDescriptor.deserializeFromBase64(dvDescriptorOpt.get.asInstanceOf[String]),
          serializableHadoopConf.value,
          tablePath.map(new Path(_)))
      } else if (dvDescriptorOpt.isDefined || filterTypeOpt.isDefined) {
        throw new IllegalStateException(
          s"Both ${FILE_ROW_INDEX_FILTER_ID_ENCODED} and ${FILE_ROW_INDEX_FILTER_TYPE} " +
            "should either both have values or no values at all.")
      } else {
        // No DV attached to this file: nothing is filtered.
        KeepAllRowsFilter
      }
    }

    // We only generate the row index column when predicate pushdown is not enabled.
    val rowIndexColumnToWriteOpt = if (useMetadataRowIndex) None else rowIndexColumnOpt
    val metadataColumnsToWrite =
      Seq(isRowDeletedColumnOpt, rowIndexColumnToWriteOpt).filter(_.nonEmpty).map(_.get)

    // When metadata.row_index is not used there is no way to verify the Parquet index is
    // starting from 0. We disable the splits, so the assumption is ParquetFileFormat respects
    // that.
    var rowIndex: Long = 0

    // Used only when non-column row batches are received from the Parquet reader
    val tempVector = new OnHeapColumnVector(1, ByteType)

    iterator.map { row =>
      row match {
        case batch: ColumnarBatch => // When vectorized Parquet reader is enabled.
          val size = batch.numRows()
          // Create vectors for all needed metadata columns.
          // We can't use the one from Parquet reader as it set the
          // [[WritableColumnVector.isAllNulls]] to true and it can't be reset with using any
          // public APIs.
          trySafely(useOffHeapBuffers, size, metadataColumnsToWrite) { writableVectors =>
            val indexVectorTuples = new ArrayBuffer[(Int, ColumnVector)]

            // When predicate pushdown is enabled we use _metadata.row_index. Therefore,
            // we only need to construct the isRowDeleted column.
            var index = 0
            isRowDeletedColumnOpt.foreach { columnMetadata =>
              val isRowDeletedVector = writableVectors(index)
              if (useMetadataRowIndex) {
                rowIndexFilterOpt.get.materializeIntoVectorWithRowIndex(
                  size, batch.column(rowIndexColumnOpt.get.index), isRowDeletedVector)
              } else {
                rowIndexFilterOpt.get
                  .materializeIntoVector(rowIndex, rowIndex + size, isRowDeletedVector)
              }
              indexVectorTuples += (columnMetadata.index -> isRowDeletedVector)
              index += 1
            }

            rowIndexColumnToWriteOpt.foreach { columnMetadata =>
              val rowIndexVector = writableVectors(index)
              // populate the row index column value.
              for (i <- 0 until size) {
                rowIndexVector.putLong(i, rowIndex + i)
              }

              indexVectorTuples += (columnMetadata.index -> rowIndexVector)
              index += 1
            }

            val newBatch = replaceVectors(batch, indexVectorTuples.toSeq: _*)
            rowIndex += size
            newBatch
          }

        case columnarRow: ColumnarBatchRow =>
          // When vectorized reader is enabled but returns immutable rows instead of
          // columnar batches [[ColumnarBatchRow]]. So we have to copy the row as a
          // mutable [[InternalRow]] and set the `row_index` and `is_row_deleted`
          // column values. This is not efficient. It should affect only the wide
          // tables. https://github.com/delta-io/delta/issues/2246
          val newRow = columnarRow.copy();
          isRowDeletedColumnOpt.foreach { columnMetadata =>
            val rowIndexForFiltering = if (useMetadataRowIndex) {
              columnarRow.getLong(rowIndexColumnOpt.get.index)
            } else {
              rowIndex
            }
            rowIndexFilterOpt.get.materializeSingleRowWithRowIndex(rowIndexForFiltering, tempVector)
            newRow.setByte(columnMetadata.index, tempVector.getByte(0))
          }

          rowIndexColumnToWriteOpt
            .foreach(columnMetadata => newRow.setLong(columnMetadata.index, rowIndex))
          rowIndex += 1

          newRow
        case rest: InternalRow => // When vectorized Parquet reader is disabled
          // Temporary vector variable used to get DV values from RowIndexFilter
          // Currently the RowIndexFilter only supports writing into a columnar vector
          // and doesn't have methods to get DV value for a specific row index.
          // TODO: This is not efficient, but it is ok given the default reader is vectorized
          isRowDeletedColumnOpt.foreach { columnMetadata =>
            val rowIndexForFiltering = if (useMetadataRowIndex) {
              rest.getLong(rowIndexColumnOpt.get.index)
            } else {
              rowIndex
            }
            rowIndexFilterOpt.get.materializeSingleRowWithRowIndex(rowIndexForFiltering, tempVector)
            rest.setByte(columnMetadata.index, tempVector.getByte(0))
          }

          rowIndexColumnToWriteOpt
            .foreach(columnMetadata => rest.setLong(columnMetadata.index, rowIndex))
          rowIndex += 1
          rest
        case others =>
          throw new RuntimeException(
            s"Parquet reader returned an unknown row type: ${others.getClass.getName}")
      }
    }
  }

  /**
   * Translates the filter to use physical column names instead of logical column names.
   * This is needed when the column mapping mode is set to `NameMapping` or `IdMapping`
   * to match the requested schema that's passed to the [[ParquetFileFormat]].
   */
  private def translateFilterForColumnMapping(
      filter: Filter,
      physicalNameMap: Map[String, String]): Option[Filter] = {
    object PhysicalAttribute {
      def unapply(attribute: String): Option[String] = {
        physicalNameMap.get(attribute)
      }
    }

    filter match {
      case EqualTo(PhysicalAttribute(physicalAttribute), value) =>
        Some(EqualTo(physicalAttribute, value))
      case EqualNullSafe(PhysicalAttribute(physicalAttribute), value) =>
        Some(EqualNullSafe(physicalAttribute, value))
      case GreaterThan(PhysicalAttribute(physicalAttribute), value) =>
        Some(GreaterThan(physicalAttribute, value))
      case GreaterThanOrEqual(PhysicalAttribute(physicalAttribute), value) =>
        Some(GreaterThanOrEqual(physicalAttribute, value))
      case LessThan(PhysicalAttribute(physicalAttribute), value) =>
        Some(LessThan(physicalAttribute, value))
      case LessThanOrEqual(PhysicalAttribute(physicalAttribute), value) =>
        Some(LessThanOrEqual(physicalAttribute, value))
      case In(PhysicalAttribute(physicalAttribute), values) =>
        Some(In(physicalAttribute, values))
      case IsNull(PhysicalAttribute(physicalAttribute)) =>
        Some(IsNull(physicalAttribute))
      case IsNotNull(PhysicalAttribute(physicalAttribute)) =>
        Some(IsNotNull(physicalAttribute))
      case And(left, right) =>
        // Dropping one conjunct of an AND still yields a sound (weaker) pushdown filter.
        val newLeft = translateFilterForColumnMapping(left, physicalNameMap)
        val newRight = translateFilterForColumnMapping(right, physicalNameMap)
        (newLeft, newRight) match {
          case (Some(l), Some(r)) => Some(And(l, r))
          case (Some(l), None) => Some(l)
          case (_, _) => newRight
        }
      case Or(left, right) =>
        // Unlike AND, an OR is only sound if BOTH sides translate; otherwise drop it.
        val newLeft = translateFilterForColumnMapping(left, physicalNameMap)
        val newRight = translateFilterForColumnMapping(right, physicalNameMap)
        (newLeft, newRight) match {
          case (Some(l), Some(r)) => Some(Or(l, r))
          case (_, _) => None
        }
      case Not(child) =>
        translateFilterForColumnMapping(child, physicalNameMap).map(Not)
      case StringStartsWith(PhysicalAttribute(physicalAttribute), value) =>
        Some(StringStartsWith(physicalAttribute, value))
      case StringEndsWith(PhysicalAttribute(physicalAttribute), value) =>
        Some(StringEndsWith(physicalAttribute, value))
      case StringContains(PhysicalAttribute(physicalAttribute), value) =>
        Some(StringContains(physicalAttribute, value))
      case AlwaysTrue() => Some(AlwaysTrue())
      case AlwaysFalse() => Some(AlwaysFalse())
      case _ =>
        logError(log"Failed to translate filter ${MDC(DeltaLogKeys.FILTER, filter)}")
        None
    }
  }
}

object GlutenDeltaParquetFileFormat {
  /**
   * Column name used to identify whether the row read from the parquet file is marked
   * as deleted according to the Delta table deletion vectors
   */
  val IS_ROW_DELETED_COLUMN_NAME = "__delta_internal_is_row_deleted"
  val IS_ROW_DELETED_STRUCT_FIELD = StructField(IS_ROW_DELETED_COLUMN_NAME, ByteType)

  /** Row index for each column */
  val ROW_INDEX_COLUMN_NAME = "__delta_internal_row_index"
  val ROW_INDEX_STRUCT_FIELD = StructField(ROW_INDEX_COLUMN_NAME, LongType)

  /** The key to the encoded row index filter identifier value of the
   * [[PartitionedFile]]'s otherConstantMetadataColumnValues map. */
  val FILE_ROW_INDEX_FILTER_ID_ENCODED = "row_index_filter_id_encoded"

  /** The key to the row index filter type value of the
   * [[PartitionedFile]]'s otherConstantMetadataColumnValues map. */
  val FILE_ROW_INDEX_FILTER_TYPE = "row_index_filter_type"

  /** Utility method to create a new writable vector */
  private def newVector(
      useOffHeapBuffers: Boolean, size: Int, dataType: StructField): WritableColumnVector = {
    if (useOffHeapBuffers) {
      OffHeapColumnVector.allocateColumns(size, Seq(dataType).toArray)(0)
    } else {
      OnHeapColumnVector.allocateColumns(size, Seq(dataType).toArray)(0)
    }
  }

  /** Try the operation, if the operation fails release the created resource */
  private def trySafely[R <: WritableColumnVector, T](
      useOffHeapBuffers: Boolean,
      size: Int,
      columns: Seq[ColumnMetadata])(f: Seq[WritableColumnVector] => T): T = {
    val resources = new ArrayBuffer[WritableColumnVector](columns.size)
    try {
      columns.foreach(col => resources.append(newVector(useOffHeapBuffers, size, col.structField)))
      f(resources.toSeq)
    } catch {
      case NonFatal(e) =>
        resources.foreach(closeQuietly(_))
        throw e
    }
  }

  /** Utility method to quietly close an [[AutoCloseable]] */
  private def closeQuietly(closeable: AutoCloseable): Unit = {
    if (closeable != null) {
      try {
        closeable.close()
      } catch {
        case NonFatal(_) => // ignore
      }
    }
  }

  /**
   * Helper method to replace the vectors in given [[ColumnarBatch]].
   * New vectors and its index in the batch are given as tuples.
   *
   * NOTE(review): the inner loop does not break after a match, so duplicate indices in
   * `indexVectorTuples` would append duplicate vectors; the locals `index`/`vector` below
   * are unused. Harmless as called today (indices are distinct) — candidate for cleanup.
   */
  private def replaceVectors(
      batch: ColumnarBatch,
      indexVectorTuples: (Int, ColumnVector) *): ColumnarBatch = {
    val vectors = ArrayBuffer[ColumnVector]()
    for (i <- 0 until batch.numCols()) {
      var replaced: Boolean = false
      for (indexVectorTuple <- indexVectorTuples) {
        val index = indexVectorTuple._1
        val vector = indexVectorTuple._2
        if (indexVectorTuple._1 == i) {
          vectors += indexVectorTuple._2
          // Make sure to close the existing vector allocated in the Parquet
          batch.column(i).close()
          replaced = true
        }
      }
      if (!replaced) {
        vectors += batch.column(i)
      }
    }
    new ColumnarBatch(vectors.toArray, batch.numRows())
  }

  /** Helper class to encapsulate column info */
  case class ColumnMetadata(index: Int, structField: StructField)
}
// spotless:on
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta + +import org.apache.gluten.config.VeloxDeltaConfig + +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.delta.actions.{AddFile, FileAction} +import org.apache.spark.sql.delta.constraints.{Constraint, Constraints, DeltaInvariantCheckerExec} +import org.apache.spark.sql.delta.files.{GlutenDeltaFileFormatWriter, TransactionalWrite} +import org.apache.spark.sql.delta.hooks.AutoCompact +import org.apache.spark.sql.delta.perf.DeltaOptimizedWriterExec +import org.apache.spark.sql.delta.schema.InnerInvariantViolationException +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.stats.{GlutenDeltaIdentityColumnStatsTracker, GlutenDeltaJobStatisticsTracker} +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.util.ScalaExtensions.OptionExt +import org.apache.spark.util.SerializableConfiguration + +import scala.collection.mutable.ListBuffer + +class GlutenOptimisticTransaction(delegate: OptimisticTransaction) + extends OptimisticTransaction( + delegate.deltaLog, + delegate.catalogTable, + delegate.snapshot + ) { + + override def writeFiles( + inputData: Dataset[_], + writeOptions: Option[DeltaOptions], + isOptimize: Boolean, + additionalConstraints: Seq[Constraint]): Seq[FileAction] = { + hasWritten = true + + val spark = inputData.sparkSession + val veloxDeltaConfig = new VeloxDeltaConfig(spark.sessionState.conf) + + val (data, partitionSchema) = performCDCPartition(inputData) + val outputPath = deltaLog.dataPath + + val (queryExecution, output, generatedColumnConstraints, trackFromData) = + normalizeData(deltaLog, writeOptions, data) + // Use the track set from the transaction if set, + 
// otherwise use the track set from `normalizeData()`. + val trackIdentityHighWaterMarks = trackHighWaterMarks.getOrElse(trackFromData) + + val partitioningColumns = getPartitioningColumns(partitionSchema, output) + + val committer = getCommitter(outputPath) + + val (statsDataSchema, _) = getStatsSchema(output, partitionSchema) + + // If Statistics Collection is enabled, then create a stats tracker that will be injected during + // the FileFormatWriter.write call below and will collect per-file stats using + // StatisticsCollection + val optionalStatsTracker = + getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)._1.map( + new GlutenDeltaJobStatisticsTracker(_)) + + val constraints = + Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints + + val identityTrackerOpt = IdentityColumn + .createIdentityColumnStatsTracker( + spark, + deltaLog.newDeltaHadoopConf(), + outputPath, + metadata.schema, + statsDataSchema, + trackIdentityHighWaterMarks + ) + .map(new GlutenDeltaIdentityColumnStatsTracker(_)) + + SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) { + val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, output) + + val empty2NullPlan = + convertEmptyToNullIfNeeded(queryExecution.executedPlan, partitioningColumns, constraints) + val maybeCheckInvariants = if (constraints.isEmpty) { + // Compared to vanilla Delta, we simply avoid adding the invariant checker + // when the constraint list is empty, to avoid the unnecessary transitions + // added around the invariant checker. + empty2NullPlan + } else { + DeltaInvariantCheckerExec(empty2NullPlan, constraints) + } + // No need to plan optimized write if the write command is OPTIMIZE, which aims to produce + // evenly-balanced data files already. 
+ val physicalPlan = + if ( + !isOptimize && + shouldOptimizeWrite(writeOptions, spark.sessionState.conf) + ) { + DeltaOptimizedWriterExec(maybeCheckInvariants, metadata.partitionColumns, deltaLog) + } else { + maybeCheckInvariants + } + + val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer() + + if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) { + val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker( + new SerializableConfiguration(deltaLog.newDeltaHadoopConf()), + BasicWriteJobStatsTracker.metrics) + registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics) + statsTrackers.append(basicWriteJobStatsTracker) + } + + // Iceberg spec requires partition columns in data files + val writePartitionColumns = IcebergCompat.isAnyEnabled(metadata) + // Retain only a minimal selection of Spark writer options to avoid any potential + // compatibility issues + val options = (writeOptions match { + case None => Map.empty[String, String] + case Some(writeOptions) => + writeOptions.options.filterKeys { + key => + key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) || + key.equalsIgnoreCase(DeltaOptions.COMPRESSION) + }.toMap + }) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString) + + try { + GlutenDeltaFileFormatWriter.write( + sparkSession = spark, + plan = physicalPlan, + fileFormat = new GlutenDeltaParquetFileFormat( + protocol, + metadata + ), // This is changed to Gluten's Delta format. 
+ committer = committer, + outputSpec = outputSpec, + // scalastyle:off deltahadoopconfiguration + hadoopConf = + spark.sessionState.newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options), + // scalastyle:on deltahadoopconfiguration + partitionColumns = partitioningColumns, + bucketSpec = None, + statsTrackers = optionalStatsTracker.toSeq + ++ statsTrackers + ++ identityTrackerOpt.toSeq, + options = options + ) + } catch { + case InnerInvariantViolationException(violationException) => + // Pull an InvariantViolationException up to the top level if it was the root cause. + throw violationException + } + statsTrackers.foreach { + case tracker: BasicWriteJobStatsTracker => + val numOutputRowsOpt = tracker.driverSideMetrics.get("numOutputRows").map(_.value) + IdentityColumn.logTableWrite(snapshot, trackIdentityHighWaterMarks, numOutputRowsOpt) + case _ => () + } + } + + var resultFiles = + (if (optionalStatsTracker.isDefined) { + committer.addedStatuses.map { + a => + a.copy(stats = optionalStatsTracker + .map(_.delegate.recordedStats(a.toPath.getName)) + .getOrElse(a.stats)) + } + } else { + committer.addedStatuses + }) + .filter { + // In some cases, we can write out an empty `inputData`. Some examples of this (though, they + // may be fixed in the future) are the MERGE command when you delete with empty source, or + // empty target, or on disjoint tables. This is hard to catch before the write without + // collecting the DF ahead of time. 
Instead, we can return only the AddFiles that + // a) actually add rows, or + // b) don't have any stats so we don't know the number of rows at all + case a: AddFile => a.numLogicalRecords.forall(_ > 0) + case _ => true + } + + // add [[AddFile.Tags.ICEBERG_COMPAT_VERSION.name]] tags to addFiles + if (IcebergCompatV2.isEnabled(metadata)) { + resultFiles = resultFiles.map { + addFile => + val tags = if (addFile.tags != null) addFile.tags else Map.empty[String, String] + addFile.copy(tags = tags + (AddFile.Tags.ICEBERG_COMPAT_VERSION.name -> "2")) + } + } + + if (resultFiles.nonEmpty && !isOptimize) registerPostCommitHook(AutoCompact) + // Record the updated high water marks to be used during transaction commit. + identityTrackerOpt.ifDefined { + tracker => updatedIdentityHighWaterMarks.appendAll(tracker.delegate.highWaterMarks.toSeq) + } + + resultFiles.toSeq ++ committer.changeFiles + } + + private def shouldOptimizeWrite( + writeOptions: Option[DeltaOptions], + sessionConf: SQLConf): Boolean = { + writeOptions + .flatMap(_.optimizeWrite) + .getOrElse(TransactionalWrite.shouldOptimizeWrite(metadata, sessionConf)) + } +} diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala new file mode 100644 index 000000000000..91bc39bd2e4b --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.delta

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.datasource.GlutenFormatFactory

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.codec.CodecConfig
import org.apache.parquet.hadoop.util.ContextUtil
import org.slf4j.LoggerFactory

/**
 * A [[ParquetFileFormat]] whose write path is backed by Gluten's native Parquet writer.
 *
 * Reads (schema inference etc.) are inherited unchanged from [[ParquetFileFormat]]. For writes,
 * [[prepareWrite]] hands out a native [[OutputWriterFactory]] when the data schema is supported
 * by the native writer, and otherwise falls back to the vanilla Spark Parquet writer.
 */
class GlutenParquetFileFormat
  extends ParquetFileFormat
  with DataSourceRegister
  with Logging
  with Serializable {
  import GlutenParquetFileFormat._

  override def shortName(): String = "gluten-parquet"

  override def toString: String = "GlutenParquet"

  // All instances are interchangeable, so equality is class-based.
  override def hashCode(): Int = getClass.hashCode()

  override def equals(other: Any): Boolean = other.isInstanceOf[GlutenParquetFileFormat]

  /**
   * Returns the writer factory for this job: a native Gluten Parquet writer when
   * `isNativeWritable(dataSchema)` holds, else the inherited vanilla Parquet writer.
   *
   * Note: the redundant `inferSchema` override (which only called `super`) and the
   * early `return` have been removed; behavior is unchanged.
   */
  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    if (isNativeWritable(dataSchema)) {
      // Pass compression to job conf so that the file extension can be aware of it.
      val conf = ContextUtil.getConfiguration(job)
      val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
      conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
      val nativeConf =
        GlutenFormatFactory("parquet")
          .nativeConf(options, parquetOptions.compressionCodecClassName)

      new OutputWriterFactory {
        override def getFileExtension(context: TaskAttemptContext): String =
          CodecConfig.from(context).getCodec.getExtension + ".parquet"

        override def newInstance(
            path: String,
            dataSchema: StructType,
            context: TaskAttemptContext): OutputWriter =
          GlutenFormatFactory("parquet")
            .createOutputWriter(path, dataSchema, context, nativeConf)
      }
    } else {
      // Fall back to the vanilla Spark Parquet writer for unsupported schemas.
      logWarning(
        s"Data schema is unsupported by Gluten Parquet writer: $dataSchema, " +
          s"falling back to the vanilla Spark Parquet writer")
      super.prepareWrite(sparkSession, job, options, dataSchema)
    }
  }
}

object GlutenParquetFileFormat {
  /** True when the Velox backend can natively write all fields of `schema`. */
  def isNativeWritable(schema: StructType): Boolean = {
    BackendsApiManager.getSettings.supportNativeWrite(schema.fields)
  }
}
diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
new file mode 100644
index 000000000000..90c0fa1bffc7
--- /dev/null
+++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
@@ -0,0 +1,602 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.files + +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.backendsapi.velox.VeloxBatchType +import org.apache.gluten.execution._ +import org.apache.gluten.execution.datasource.GlutenFormatFactory +import org.apache.gluten.extension.columnar.transition.Transitions + +import org.apache.spark._ +import org.apache.spark.internal.{LoggingShims, MDC} +import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} +import org.apache.spark.shuffle.FetchFailedException +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.connector.write.WriterCommitMessage +import org.apache.spark.sql.delta.{DeltaOptions, GlutenParquetFileFormat} +import org.apache.spark.sql.delta.logging.DeltaLogKeys +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.FileFormatWriter._ +import 
org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.DataType +import org.apache.spark.util.{SerializableConfiguration, Utils} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileAlreadyExistsException, Path} +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl + +import java.util.{Date, UUID} + +// spotless:off +/** + * A helper object for writing FileFormat data out to a location. + * Logic is copied from FileFormatWriter from Spark 3.5 with added functionality to write partition + * values to data files. Specifically L123-126, L132, and L140 where it adds option + * WRITE_PARTITION_COLUMNS + */ +object GlutenDeltaFileFormatWriter extends LoggingShims { + + /** + * A variable used in tests to check whether the output ordering of the query matches the + * required ordering of the write command. + */ + private var outputOrderingMatched: Boolean = false + + /** + * A variable used in tests to check the final executed plan. + */ + private var executedPlan: Option[SparkPlan] = None + + // scalastyle:off argcount + /** + * Basic work flow of this command is: + * 1. Driver side setup, including output committer initialization and data source specific + * preparation work for the write job to be issued. + * 2. Issues a write job consists of one or more executor side tasks, each of which writes all + * rows within an RDD partition. + * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any + * exception is thrown during task commitment, also aborts that task. + * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is + * thrown during job commitment, also aborts the job. + * 5. If the job is successfully committed, perform post-commit operations such as + * processing statistics. 
+ * @return The set of all partition paths that were updated during this write job. + */ + def write( + sparkSession: SparkSession, + plan: SparkPlan, + fileFormat: FileFormat, + committer: FileCommitProtocol, + outputSpec: OutputSpec, + hadoopConf: Configuration, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + statsTrackers: Seq[WriteJobStatsTracker], + options: Map[String, String], + numStaticPartitionCols: Int = 0): Set[String] = { + require(partitionColumns.size >= numStaticPartitionCols) + + val job = Job.getInstance(hadoopConf) + job.setOutputKeyClass(classOf[Void]) + job.setOutputValueClass(classOf[InternalRow]) + FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath)) + + val partitionSet = AttributeSet(partitionColumns) + // cleanup the internal metadata information of + // the file source metadata attribute if any before write out + val finalOutputSpec = outputSpec.copy( + outputColumns = outputSpec.outputColumns + .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation) + ) + val dataColumns = finalOutputSpec.outputColumns.filterNot(partitionSet.contains) + + val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, dataColumns, options) + val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns) + + val caseInsensitiveOptions = CaseInsensitiveMap(options) + + val dataSchema = dataColumns.toStructType + DataSourceUtils.verifySchema(fileFormat, dataSchema) + DataSourceUtils.checkFieldNames(fileFormat, dataSchema) + // Note: prepareWrite has side effect. It sets "job". 
+ + val outputDataColumns = + if (caseInsensitiveOptions.get(DeltaOptions.WRITE_PARTITION_COLUMNS).contains("true")) { + dataColumns ++ partitionColumns + } else dataColumns + + val outputWriterFactory = + fileFormat.prepareWrite( + sparkSession, + job, + caseInsensitiveOptions, + outputDataColumns.toStructType + ) + + val description = new WriteJobDescription( + uuid = UUID.randomUUID.toString, + serializableHadoopConf = new SerializableConfiguration(job.getConfiguration), + outputWriterFactory = outputWriterFactory, + allColumns = finalOutputSpec.outputColumns, + dataColumns = outputDataColumns, + partitionColumns = partitionColumns, + bucketSpec = writerBucketSpec, + path = finalOutputSpec.outputPath, + customPartitionLocations = finalOutputSpec.customPartitionLocations, + maxRecordsPerFile = caseInsensitiveOptions + .get("maxRecordsPerFile") + .map(_.toLong) + .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile), + timeZoneId = caseInsensitiveOptions + .get(DateTimeUtils.TIMEZONE_OPTION) + .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone), + statsTrackers = statsTrackers + ) + + // We should first sort by dynamic partition columns, then bucket id, and finally sorting + // columns. 
+ val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++ + writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns + val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan) + + // SPARK-40588: when planned writing is disabled and AQE is enabled, + // plan contains an AdaptiveSparkPlanExec, which does not know + // its final plan's ordering, so we have to materialize that plan first + // it is fine to use plan further down as the final plan is cached in that plan + def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match { + case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan + case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan)) + } + + // the sort order doesn't matter + val actualOrdering = writeFilesOpt + .map(_.child) + .getOrElse(materializeAdaptiveSparkPlan(plan)) + .outputOrdering + val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering) + + SQLExecution.checkSQLExecutionId(sparkSession) + + // propagate the description UUID into the jobs, so that committers + // get an ID guaranteed to be unique. + job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid) + + // When `PLANNED_WRITE_ENABLED` is true, the optimizer rule V1Writes will add logical sort + // operator based on the required ordering of the V1 write command. So the output + // ordering of the physical plan should always match the required ordering. Here + // we set the variable to verify this behavior in tests. + // There are two cases where FileFormatWriter still needs to add physical sort: + // 1) When the planned write config is disabled. + // 2) When the concurrent writers are enabled (in this case the required ordering of a + // V1 write command will be empty). 
+ if (Utils.isTesting) outputOrderingMatched = orderingMatched + + if (writeFilesOpt.isDefined) { + // build `WriteFilesSpec` for `WriteFiles` + val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => { + val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec) + createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns) + } + val writeSpec = WriteFilesSpec( + description = description, + committer = committer, + concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc + ) + executeWrite(sparkSession, plan, writeSpec, job) + } else { + executeWrite( + sparkSession, + plan, + job, + description, + committer, + outputSpec, + requiredOrdering, + partitionColumns, + sortColumns, + orderingMatched, + GlutenParquetFileFormat.isNativeWritable(dataSchema) + ) + } + } + // scalastyle:on argcount + + private def executeWrite( + sparkSession: SparkSession, + plan: SparkPlan, + job: Job, + description: WriteJobDescription, + committer: FileCommitProtocol, + outputSpec: OutputSpec, + requiredOrdering: Seq[Expression], + partitionColumns: Seq[Attribute], + sortColumns: Seq[Attribute], + orderingMatched: Boolean, + writeOffloadable: Boolean): Set[String] = { + val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns) + val empty2NullPlan = if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan + + writeAndCommit(job, description, committer) { + val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) { + (empty2NullPlan, None) + } else { + val sortPlan = createSortPlan(empty2NullPlan, requiredOrdering, outputSpec) + val concurrentOutputWriterSpec = + createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns) + if (concurrentOutputWriterSpec.isDefined) { + (empty2NullPlan, concurrentOutputWriterSpec) + } else { + def addNativeSort(input: SparkPlan): SparkPlan = { + val nativeSortPlan = SortExecTransformer(sortPlan.sortOrder, sortPlan.global, child = input) + val validationResult = 
nativeSortPlan.doValidate() + assert(validationResult.ok(), + s"Sort operation for Delta write is not offload-able: ${validationResult.reason()}") + nativeSortPlan + } + val newPlan = sortPlan.child match { + case WholeStageTransformer(wholeStageChild, materializeInput) => + WholeStageTransformer(addNativeSort(wholeStageChild), + materializeInput)(ColumnarCollapseTransformStages.transformStageCounter.incrementAndGet()) + case other => + Transitions.toBatchPlan(sortPlan, VeloxBatchType) + } + (newPlan, None) + } + } + + val wrappedPlanToExecute = if (writeOffloadable) { + BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToCarrierRow(planToExecute) + } else { + planToExecute + } + + // In testing, this is the only way to get hold of the actually executed plan written to file + if (Utils.isTesting) executedPlan = Some(wrappedPlanToExecute) + + val rdd = wrappedPlanToExecute.execute() + + // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single + // partition rdd to make sure we at least set up one write task to write the metadata. 
+ val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) { + sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1) + } else { + rdd + } + + val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date()) + val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length) + val partitionColumnToDataType = description.partitionColumns + .map(attr => (attr.name, attr.dataType)).toMap + sparkSession.sparkContext.runJob( + rddWithNonEmptyPartitions, + (taskContext: TaskContext, iter: Iterator[InternalRow]) => { + executeTask( + description = description, + jobTrackerID = jobTrackerID, + sparkStageId = taskContext.stageId(), + sparkPartitionId = taskContext.partitionId(), + sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE, + committer, + iterator = iter, + concurrentOutputWriterSpec = concurrentOutputWriterSpec, + partitionColumnToDataType + ) + }, + rddWithNonEmptyPartitions.partitions.indices, + (index, res: WriteTaskResult) => { + committer.onTaskCommit(res.commitMsg) + ret(index) = res + } + ) + ret + } + } + + private def writeAndCommit( + job: Job, + description: WriteJobDescription, + committer: FileCommitProtocol)(f: => Array[WriteTaskResult]): Set[String] = { + // This call shouldn't be put into the `try` block below because it only initializes and + // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. + committer.setupJob(job) + try { + val ret = f + val commitMsgs = ret.map(_.commitMsg) + + logInfo(log"Start to commit write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.") + val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) } + logInfo(log"Write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)} committed. 
" + + log"Elapsed time: ${MDC(DeltaLogKeys.DURATION, duration)} ms.") + + processStats(description.statsTrackers, ret.map(_.summary.stats), duration) + logInfo(log"Finished processing stats for write job " + + log"${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.") + + // return a set of all the partition paths that were updated during this job + ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty) + } catch { + case cause: Throwable => + logError(log"Aborting job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}", cause) + committer.abortJob(job) + throw cause + } + } + + /** + * Write files using [[SparkPlan.executeWrite]] + */ + private def executeWrite( + session: SparkSession, + planForWrites: SparkPlan, + writeFilesSpec: WriteFilesSpec, + job: Job): Set[String] = { + val committer = writeFilesSpec.committer + val description = writeFilesSpec.description + + // In testing, this is the only way to get hold of the actually executed plan written to file + if (Utils.isTesting) executedPlan = Some(planForWrites) + + writeAndCommit(job, description, committer) { + val rdd = planForWrites.executeWrite(writeFilesSpec) + val ret = new Array[WriteTaskResult](rdd.partitions.length) + session.sparkContext.runJob( + rdd, + (context: TaskContext, iter: Iterator[WriterCommitMessage]) => { + assert(iter.hasNext) + val commitMessage = iter.next() + assert(!iter.hasNext) + commitMessage + }, + rdd.partitions.indices, + (index, res: WriterCommitMessage) => { + assert(res.isInstanceOf[WriteTaskResult]) + val writeTaskResult = res.asInstanceOf[WriteTaskResult] + committer.onTaskCommit(writeTaskResult.commitMsg) + ret(index) = writeTaskResult + } + ) + ret + } + } + + private def createSortPlan( + plan: SparkPlan, + requiredOrdering: Seq[Expression], + outputSpec: OutputSpec): SortExec = { + // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and + // the physical plan may have different attribute ids due to optimizer removing 
some + // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch. + val orderingExpr = + bindReferences(requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns) + SortExec(orderingExpr, global = false, child = plan) + } + + private def createConcurrentOutputWriterSpec( + sparkSession: SparkSession, + sortPlan: SortExec, + sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] = { + val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters + val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty + if (concurrentWritersEnabled) { + Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())) + } else { + None + } + } + + /** Writes data out in a single Spark task. */ + private def executeTask( + description: WriteJobDescription, + jobTrackerID: String, + sparkStageId: Int, + sparkPartitionId: Int, + sparkAttemptNumber: Int, + committer: FileCommitProtocol, + iterator: Iterator[InternalRow], + concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec], + partitionColumnToDataType: Map[String, DataType]): WriteTaskResult = { + + val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId) + val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) + val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber) + + // Set up the attempt context required to use in the output committer. 
+ val taskAttemptContext: TaskAttemptContext = { + // Set up the configuration object + val hadoopConf = description.serializableHadoopConf.value + hadoopConf.set("mapreduce.job.id", jobId.toString) + hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString) + hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString) + hadoopConf.setBoolean("mapreduce.task.ismap", true) + hadoopConf.setInt("mapreduce.task.partition", 0) + + if (partitionColumnToDataType.isEmpty) { + new TaskAttemptContextImpl(hadoopConf, taskAttemptId) + } else { + new DeltaFileFormatWriter.PartitionedTaskAttemptContextImpl(hadoopConf, taskAttemptId, partitionColumnToDataType) + } + } + + committer.setupTask(taskAttemptContext) + + val dataWriter = + if (sparkPartitionId != 0 && !iterator.hasNext) { + // In case of empty job, leave first partition to save meta for file format like parquet. + new EmptyDirectoryDataWriter(description, taskAttemptContext, committer) + } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) { + new SingleDirectoryDataWriter(description, taskAttemptContext, committer) + } else { + concurrentOutputWriterSpec match { + case Some(spec) => + new DynamicPartitionDataConcurrentWriter( + description, + taskAttemptContext, + committer, + spec + ) + case _ => + // Columnar-based partition writer to divide the input batch by partition values + // and bucket IDs in advance. + new ColumnarDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) + } + } + + try { + Utils.tryWithSafeFinallyAndFailureCallbacks(block = { + // Execute the task to write rows out and commit the task. 
+ dataWriter.writeWithIterator(iterator) + dataWriter.commit() + })(catchBlock = { + // If there is an error, abort the task + dataWriter.abort() + logError(log"Job ${MDC(DeltaLogKeys.JOB_ID, jobId)} aborted.") + }, finallyBlock = { + dataWriter.close() + }) + } catch { + case e: FetchFailedException => + throw e + case f: FileAlreadyExistsException if SQLConf.get.fastFailFileFormatOutput => + // If any output file to write already exists, it does not make sense to re-run this task. + // We throw the exception and let Executor throw ExceptionFailure to abort the job. + throw new TaskOutputFileAlreadyExistException(f) + case t: Throwable => + throw QueryExecutionErrors.taskFailedWhileWritingRowsError(description.path, t) + } + } + + /** + * For every registered [[WriteJobStatsTracker]], call `processStats()` on it, passing it + * the corresponding [[WriteTaskStats]] from all executors. + */ + private def processStats( + statsTrackers: Seq[WriteJobStatsTracker], + statsPerTask: Seq[Seq[WriteTaskStats]], + jobCommitDuration: Long): Unit = { + + val numStatsTrackers = statsTrackers.length + assert( + statsPerTask.forall(_.length == numStatsTrackers), + s"""Every WriteTask should have produced one `WriteTaskStats` object for every tracker. + |There are $numStatsTrackers statsTrackers, but some task returned + |${statsPerTask.find(_.length != numStatsTrackers).get.length} results instead. 
+ """.stripMargin + ) + + val statsPerTracker = if (statsPerTask.nonEmpty) { + statsPerTask.transpose + } else { + statsTrackers.map(_ => Seq.empty) + } + + statsTrackers.zip(statsPerTracker).foreach { + case (statsTracker, stats) => statsTracker.processStats(stats, jobCommitDuration) + } + } + + private class ColumnarDynamicPartitionDataSingleWriter( + description: WriteJobDescription, + taskAttemptContext: TaskAttemptContext, + committer: FileCommitProtocol, + customMetrics: Map[String, SQLMetric] = Map.empty) + extends BaseDynamicPartitionDataWriter( + description, + taskAttemptContext, + committer, + customMetrics) { + + private var currentPartitionValues: Option[UnsafeRow] = None + private var currentBucketId: Option[Int] = None + + private val partitionColIndice: Array[Int] = + description.partitionColumns.flatMap { + pcol => + description.allColumns.zipWithIndex.collect { + case (acol, index) if acol.name == pcol.name && acol.exprId == pcol.exprId => index + } + }.toArray + + private def beforeWrite(record: InternalRow): Unit = { + val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None + val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None + + if (currentPartitionValues != nextPartitionValues || currentBucketId != nextBucketId) { + // See a new partition or bucket - write to a new partition dir (or a new bucket file). 
+ if (isPartitioned && currentPartitionValues != nextPartitionValues) { + currentPartitionValues = Some(nextPartitionValues.get.copy()) + statsTrackers.foreach(_.newPartition(currentPartitionValues.get)) + } + if (isBucketed) { + currentBucketId = nextBucketId + } + + fileCounter = 0 + renewCurrentWriter(currentPartitionValues, currentBucketId, closeCurrentWriter = true) + } else if (description.maxRecordsPerFile > 0 && + recordsInFile >= description.maxRecordsPerFile) { + renewCurrentWriterIfTooManyRecords(currentPartitionValues, currentBucketId) + } + } + + override def write(record: InternalRow): Unit = { + record match { + case carrierRow: BatchCarrierRow => + carrierRow match { + case placeholderRow: PlaceholderRow => + // Do nothing. + case terminalRow: TerminalRow => + val numRows = terminalRow.batch().numRows() + if (numRows > 0) { + val blockStripes = GlutenFormatFactory.rowSplitter + .splitBlockByPartitionAndBucket(terminalRow.batch(), partitionColIndice, + isBucketed) + val iter = blockStripes.iterator() + while (iter.hasNext) { + val blockStripe = iter.next() + val headingRow = blockStripe.getHeadingRow + beforeWrite(headingRow) + val columnBatch = blockStripe.getColumnarBatch + currentWriter.write(terminalRow.withNewBatch(columnBatch)) + columnBatch.close() + } + blockStripes.release() + for (_ <- 0 until numRows) { + statsTrackers.foreach(_.newRow(currentWriter.path, record)) + } + recordsInFile += numRows + } + } + case _ => + beforeWrite(record) + writeRecord(record) + } + } + } +} +// spotless:on diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala new file mode 100644 index 000000000000..30e61730c17b --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.stats + +import org.apache.gluten.execution.{PlaceholderRow, TerminalRow, VeloxColumnarToRowExec} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.delta.DeltaIdentityColumnStatsTracker +import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker} +import org.apache.spark.sql.execution.metric.SQLMetric + +class GlutenDeltaJobStatisticsTracker(val delegate: DeltaJobStatisticsTracker) + extends WriteJobStatsTracker { + import GlutenDeltaJobStatisticsTracker._ + + override def newTaskInstance(): WriteTaskStatsTracker = { + new GlutenDeltaTaskStatisticsTracker( + delegate.newTaskInstance().asInstanceOf[DeltaTaskStatisticsTracker]) + } + + override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = { + delegate.processStats(stats, jobCommitTime) + } +} + +class GlutenDeltaIdentityColumnStatsTracker(override val delegate: DeltaIdentityColumnStatsTracker) + extends GlutenDeltaJobStatisticsTracker(delegate) + +private object GlutenDeltaJobStatisticsTracker { + + /** + * This is a temporary implementation of statistics tracker for Delta Lake. 
It's sub-optimal in + * performance because it internally performs C2R then sends rows to the delegate row-based + * tracker. + * + * TODO: Columnar-based statistics collection. + */ + private class GlutenDeltaTaskStatisticsTracker(delegate: DeltaTaskStatisticsTracker) + extends WriteTaskStatsTracker { + + private val c2r = new VeloxColumnarToRowExec.Converter(new SQLMetric("convertTime")) + + override def newPartition(partitionValues: InternalRow): Unit = { + delegate.newPartition(partitionValues) + } + + override def newFile(filePath: String): Unit = { + delegate.newFile(filePath) + } + + override def closeFile(filePath: String): Unit = { + delegate.closeFile(filePath) + } + + override def newRow(filePath: String, row: InternalRow): Unit = { + row match { + case _: PlaceholderRow => + case t: TerminalRow => + c2r.toRowIterator(t.batch()).foreach(eachRow => delegate.newRow(filePath, eachRow)) + case otherRow => + delegate.newRow(filePath, otherRow) + } + } + + override def getFinalStats(taskCommitTime: Long): WriteTaskStats = { + delegate.getFinalStats(taskCommitTime) + } + } +} diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala new file mode 100644 index 000000000000..832cc084bb49 --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.delta.{GlutenOptimisticTransaction, OptimisticTransaction, TransactionExecutionObserver} +import org.apache.spark.sql.execution.command.LeafRunnableCommand +import org.apache.spark.sql.execution.metric.SQLMetric + +case class GlutenDeltaLeafV2CommandExec(delegate: LeafV2CommandExec) extends LeafV2CommandExec { + + override def metrics: Map[String, SQLMetric] = delegate.metrics + + override protected def run(): Seq[InternalRow] = { + TransactionExecutionObserver.withObserver( + DeltaV2WriteOperators.UseColumnarDeltaTransactionLog) { + delegate.executeCollect() + } + } + + override def output: Seq[Attribute] = { + delegate.output + } + + override def nodeName: String = "GlutenDelta " + delegate.nodeName +} + +case class GlutenDeltaLeafRunnableCommand(delegate: LeafRunnableCommand) + extends LeafRunnableCommand { + override lazy val metrics: Map[String, SQLMetric] = delegate.metrics + + override def output: Seq[Attribute] = { + delegate.output + } + + override def run(sparkSession: SparkSession): Seq[Row] = { + TransactionExecutionObserver.withObserver( + DeltaV2WriteOperators.UseColumnarDeltaTransactionLog) { + delegate.run(sparkSession) + } + } + + override def nodeName: String = "GlutenDelta " + delegate.nodeName +} + +object DeltaV2WriteOperators { + object UseColumnarDeltaTransactionLog extends 
TransactionExecutionObserver { + override def startingTransaction(f: => OptimisticTransaction): OptimisticTransaction = { + val delegate = f + new GlutenOptimisticTransaction(delegate) + } + + override def preparingCommit[T](f: => T): T = f + + override def beginDoCommit(): Unit = () + + override def beginBackfill(): Unit = () + + override def beginPostCommit(): Unit = () + + override def transactionCommitted(): Unit = () + + override def transactionAborted(): Unit = () + + override def createChild(): TransactionExecutionObserver = { + TransactionExecutionObserver.getObserver + } + } +} diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala new file mode 100644 index 000000000000..60865682b322 --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.gluten.config.VeloxDeltaConfig +import org.apache.gluten.extension.columnar.offload.OffloadSingleNode + +import org.apache.spark.sql.delta.catalog.DeltaCatalog +import org.apache.spark.sql.delta.commands.DeleteCommand +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.command.ExecutedCommandExec + +case class OffloadDeltaCommand() extends OffloadSingleNode { + override def offload(plan: SparkPlan): SparkPlan = { + if (!VeloxDeltaConfig.get.enableNativeWrite) { + return plan + } + plan match { + case ExecutedCommandExec(dc: DeleteCommand) => + ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(dc)) + case ctas: AtomicCreateTableAsSelectExec if ctas.catalog.isInstanceOf[DeltaCatalog] => + GlutenDeltaLeafV2CommandExec(ctas) + case rtas: AtomicReplaceTableAsSelectExec if rtas.catalog.isInstanceOf[DeltaCatalog] => + GlutenDeltaLeafV2CommandExec(rtas) + case other => other + } + } +} diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala index 7bd6a59b7b91..d32772f565dd 100644 --- a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala @@ -36,7 +36,10 @@ class DeleteSQLSuite extends DeleteSuiteBase Seq( // FIXME: Excluded by Gluten as results are mismatch. "test delete on temp view - nontrivial projection - SQL TempView", - "test delete on temp view - nontrivial projection - Dataset TempView" + "test delete on temp view - nontrivial projection - Dataset TempView", + // FIXME: Different error messages. 
+ "test delete on temp view - superset cols - SQL TempView", + "test delete on temp view - superset cols - Dataset TempView" ) // For EXPLAIN, which is not supported in OSS diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala index 4e7326c8c15c..8b290e4db7fc 100644 --- a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala @@ -507,14 +507,14 @@ trait DeltaTestUtilsForTempViews expectedErrorClassForDataSetTempView: String = null): Unit = { if (isSQLTempView) { if (expectedErrorMsgForSQLTempView != null) { - errorContains(ex.getMessage, expectedErrorMsgForSQLTempView) + checkError(ex, expectedErrorMsgForSQLTempView) } if (expectedErrorClassForSQLTempView != null) { assert(ex.getErrorClass == expectedErrorClassForSQLTempView) } } else { if (expectedErrorMsgForDataSetTempView != null) { - errorContains(ex.getMessage, expectedErrorMsgForDataSetTempView) + checkError(ex, expectedErrorMsgForDataSetTempView) } if (expectedErrorClassForDataSetTempView != null) { assert(ex.getErrorClass == expectedErrorClassForDataSetTempView, ex.getMessage) diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala index 3d94d2bde33f..bf64858399d9 100644 --- a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.delta.test +import org.apache.gluten.config.VeloxDeltaConfig + import org.apache.spark.SparkConf import org.apache.spark.sql.delta.catalog.DeltaCatalog import 
org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} @@ -47,6 +49,7 @@ trait DeltaSQLCommandTest extends SharedSparkSession { .set("spark.sql.shuffle.partitions", "1") .set("spark.memory.offHeap.size", "2g") .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set(VeloxDeltaConfig.ENABLE_NATIVE_WRITE.key, "true") } } // spotless:on diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala index e722b2767a0d..9ada805c6e3a 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala @@ -74,6 +74,17 @@ object DataGen { def run(spark: SparkSession, source: String) } + object Feature { + def run(spark: SparkSession, source: String, feature: Feature): Unit = { + println(s"Executing feature: ${feature.name()}") + val start = System.nanoTime() + feature.run(spark, source) + val end = System.nanoTime() + println( + s"Finished executing feature: ${feature.name()}, elapsed time: ${(end - start) / 1e6} ms.") + } + } + class FeatureRegistry extends Serializable { private val lookup: mutable.LinkedHashMap[String, Feature] = mutable.LinkedHashMap() diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala index 50404228f521..0c4bf94c7142 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala @@ -137,11 +137,7 @@ class TpcdsDataGen( override def gen(): Unit = { Table.getBaseTables.forEach(t => writeParquetTable(t)) - features.foreach { - feature => - println(s"Execute feature: ${feature.name()}") - feature.run(spark, source) - } + features.foreach(feature => 
DataGen.Feature.run(spark, source, feature)) } } diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala index eaf4b4ded935..aed8653f62f1 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala @@ -58,11 +58,7 @@ class TpchDataGen( generate(dir, "part", partSchema, partitions, partGenerator, partParser) generate(dir, "region", regionSchema, regionGenerator, regionParser) - features.foreach { - feature => - println(s"Execute feature: ${feature.name()}") - feature.run(spark, source) - } + features.foreach(feature => DataGen.Feature.run(spark, source, feature)) } // lineitem