diff --git a/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index d2966eafee8d..29caf77b631b 100644 --- a/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -15,11 +15,12 @@ * limitations under the License. */ package org.apache.spark.sql.delta.catalog + +import org.apache.spark.Partition import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog, DeltaTimeTravelSpec, Snapshot} import org.apache.spark.sql.delta.actions.Metadata @@ -153,7 +154,7 @@ object ClickHouseTableV2 extends Logging { optionalBucketSet: Option[BitSet], optionalNumCoalescedBuckets: Option[Int], disableBucketedScan: Boolean, - filterExprs: Seq[Expression]): Seq[InputPartition] = { + filterExprs: Seq[Expression]): Seq[Partition] = { val tableV2 = ClickHouseTableV2.getTable(deltaLog) MergeTreePartsPartitionsUtil.getMergeTreePartsPartitions( diff --git a/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index d2966eafee8d..29caf77b631b 100644 --- a/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -15,11 +15,12 @@ * limitations under the License. 
*/ package org.apache.spark.sql.delta.catalog + +import org.apache.spark.Partition import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog, DeltaTimeTravelSpec, Snapshot} import org.apache.spark.sql.delta.actions.Metadata @@ -153,7 +154,7 @@ object ClickHouseTableV2 extends Logging { optionalBucketSet: Option[BitSet], optionalNumCoalescedBuckets: Option[Int], disableBucketedScan: Boolean, - filterExprs: Seq[Expression]): Seq[InputPartition] = { + filterExprs: Seq[Expression]): Seq[Partition] = { val tableV2 = ClickHouseTableV2.getTable(deltaLog) MergeTreePartsPartitionsUtil.getMergeTreePartsPartitions( diff --git a/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index 2c7a57625bc3..f16134471c66 100644 --- a/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.delta.catalog +import org.apache.spark.Partition import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier @@ -24,7 +25,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog.V1Table -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, DeltaTableUtils, DeltaTimeTravelSpec, Snapshot, UnresolvedPathBasedDeltaTable} import org.apache.spark.sql.delta.actions.{Metadata, Protocol} import org.apache.spark.sql.delta.sources.DeltaDataSource @@ -147,7 +147,7 @@ object ClickHouseTableV2 extends Logging { optionalBucketSet: Option[BitSet], optionalNumCoalescedBuckets: Option[Int], disableBucketedScan: Boolean, - filterExprs: Seq[Expression]): Seq[InputPartition] = { + filterExprs: Seq[Expression]): Seq[Partition] = { val tableV2 = ClickHouseTableV2.getTable(deltaLog) MergeTreePartsPartitionsUtil.getMergeTreePartsPartitions( diff --git a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala index e4aff0329075..95de2e0414d4 100644 --- a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala +++ b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala @@ -292,7 +292,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite getExecutedPlan(df).map { case plan: IcebergScanTransformer => assert(plan.getKeyGroupPartitioning.isDefined) - assert(plan.getSplitInfosWithIndex.length == 3) + assert(plan.getSplitInfos.length == 3) case _ => // do nothing } } @@ -372,7 +372,7 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite getExecutedPlan(df).map { case plan: IcebergScanTransformer => assert(plan.getKeyGroupPartitioning.isDefined) - assert(plan.getSplitInfosWithIndex.length == 3) + assert(plan.getSplitInfos.length == 3) case _ => // do nothing } } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 07c3d144097e..5fff386d7258 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -27,14 +27,13 @@ import org.apache.gluten.substrait.rel._ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.vectorized.{BatchIterator, CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator} -import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext} +import org.apache.spark.{InterruptibleIterator, Partition, SparkConf, TaskContext} import org.apache.spark.affinity.CHAffinity import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging import org.apache.spark.shuffle.CHColumnarShuffleWriter import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder import org.apache.spark.sql.execution.datasources.mergetree.PartSerializer @@ -125,12 +124,16 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { } override def genSplitInfo( - partition: InputPartition, + partitionIndex: Int, + partitions: Seq[Partition], partitionSchema: StructType, dataSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], properties: Map[String, String]): SplitInfo = { + // todo: support multi partitions + assert(partitions.size == 1) + val partition = partitions.head partition match { case p: GlutenMergeTreePartition => ExtensionTableBuilder diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala index 9da8eed20217..ddc857182f6e 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala @@ -22,12 +22,12 @@ import org.apache.gluten.execution.{CHHashAggregateExecTransformer, WriteFilesEx import org.apache.gluten.expression.ConverterUtils import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.expression.{BooleanLiteralNode, ExpressionBuilder, ExpressionNode} -import org.apache.gluten.utils.{CHInputPartitionsUtil, ExpressionDocUtil} +import org.apache.gluten.utils.{CHPartitionsUtil, ExpressionDocUtil} +import org.apache.spark.Partition import org.apache.spark.internal.Logging import org.apache.spark.rpc.GlutenDriverEndpoint import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.delta.MergeTreeFileFormat import 
org.apache.spark.sql.delta.catalog.ClickHouseTableV2 import org.apache.spark.sql.delta.files.TahoeFileIndex @@ -51,8 +51,7 @@ import java.util class CHTransformerApi extends TransformerApi with Logging { - /** Generate Seq[InputPartition] for FileSourceScanExecTransformer. */ - def genInputPartitionSeq( + def genPartitionSeq( relation: HadoopFsRelation, requiredSchema: StructType, selectedPartitions: Array[PartitionDirectory], @@ -61,7 +60,7 @@ class CHTransformerApi extends TransformerApi with Logging { optionalBucketSet: Option[BitSet], optionalNumCoalescedBuckets: Option[Int], disableBucketedScan: Boolean, - filterExprs: Seq[Expression]): Seq[InputPartition] = { + filterExprs: Seq[Expression]): Seq[Partition] = { relation.location match { case index: TahoeFileIndex if relation.fileFormat @@ -81,7 +80,7 @@ class CHTransformerApi extends TransformerApi with Logging { ) case _ => // Generate FilePartition for Parquet - CHInputPartitionsUtil( + CHPartitionsUtil( relation, requiredSchema, selectedPartitions, @@ -89,7 +88,7 @@ class CHTransformerApi extends TransformerApi with Logging { bucketedScan, optionalBucketSet, optionalNumCoalescedBuckets, - disableBucketedScan).genInputPartitionSeq() + disableBucketedScan).genPartitionSeq() } } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala index bdb716c67640..fb9fc4b4d831 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala @@ -24,8 +24,8 @@ import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.extensions.ExtensionBuilder import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo} +import org.apache.spark.Partition import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -53,7 +53,7 @@ case class CHRangeExecTransformer( } } - override def getPartitions: Seq[InputPartition] = { + override def getPartitions: Seq[Partition] = { (0 until numSlices).map { sliceIndex => GlutenRangeExecPartition(start, end, step, numSlices, sliceIndex) } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala index 46f40fc25b91..3cbce8423aba 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala @@ -16,6 +16,7 @@ */ package org.apache.gluten.execution +import org.apache.spark.Partition import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.types.StructType @@ -95,7 +96,8 @@ case class GlutenMergeTreePartition( partList: Array[MergeTreePartSplit], tableSchema: StructType, clickhouseTableConfigs: Map[String, String]) - extends InputPartition { + extends Partition + with InputPartition { override def preferredLocations(): Array[String] = { Array.empty[String] } diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala deleted file mode 100644 index cf0508d6cb16..000000000000 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.execution - -import org.apache.gluten.metrics.GlutenTimeMetric -import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator} - -import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.connector.read.InputPartition -import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.vectorized.ColumnarBatch - -import java.util -import java.util.concurrent.TimeUnit.NANOSECONDS - -class NativeFileScanColumnarRDD( - @transient sc: SparkContext, - @transient private val inputPartitions: Seq[InputPartition], - numOutputRows: SQLMetric, - numOutputBatches: SQLMetric, - scanTime: SQLMetric) - extends RDD[ColumnarBatch](sc, Nil) { - - override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { - val inputPartition = castNativePartition(split) - - assert( - inputPartition.isInstanceOf[GlutenPartition], - "NativeFileScanColumnarRDD only accepts GlutenPartition.") - - val splitInfoByteArray = inputPartition - .asInstanceOf[GlutenPartition] - .splitInfos - .map(splitInfo => splitInfo.toProtobuf.toByteArray) - .toArray - - val resIter = GlutenTimeMetric.millis(scanTime) { - _ => - val inBatchIters = new util.ArrayList[ColumnarNativeIterator]() - CHNativeExpressionEvaluator.createKernelWithBatchIterator( - inputPartition.plan, - splitInfoByteArray, - inBatchIters, - false, - split.index - ) - } - TaskContext - .get() - .addTaskFailureListener( - (ctx, _) => { - if (ctx.isInterrupted()) { - resIter.cancel() - } - }) - TaskContext.get().addTaskCompletionListener[Unit](_ => resIter.close()) - val iter: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] { - var scanTotalTime = 0L - var scanTimeAdded = false - - override def hasNext: Boolean = { - val res = GlutenTimeMetric.withNanoTime(resIter.hasNext)(t => scanTotalTime += t) - if (!res && !scanTimeAdded) { - scanTime += NANOSECONDS.toMillis(scanTotalTime) - scanTimeAdded = true - } - res - } - - override def next(): ColumnarBatch = { - GlutenTimeMetric.withNanoTime { - val cb = resIter.next() - numOutputRows += cb.numRows() - numOutputBatches += 1 - cb - }(t => scanTotalTime += t) - } - } - new CloseableCHColumnBatchIterator(iter) - } - - private def castNativePartition(split: Partition): 
BaseGlutenPartition = split match { - case FirstZippedPartitionsPartition(_, p: BaseGlutenPartition, _) => p - case _ => throw new SparkException(s"[BUG] Not a NativeSubstraitPartition: $split") - } - - override def getPreferredLocations(split: Partition): Seq[String] = { - castPartition(split).inputPartition.preferredLocations() - } - - private def castPartition(split: Partition): FirstZippedPartitionsPartition = split match { - case p: FirstZippedPartitionsPartition => p - case _ => throw new SparkException(s"[BUG] Not a NativeSubstraitPartition: $split") - } - - override protected def getPartitions: Array[Partition] = { - inputPartitions.zipWithIndex.map { - case (inputPartition, index) => FirstZippedPartitionsPartition(index, inputPartition) - }.toArray - } -} diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHPartitionsUtil.scala similarity index 94% rename from backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala rename to backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHPartitionsUtil.scala index 39f86429e2a6..6eaab6815832 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHPartitionsUtil.scala @@ -19,9 +19,9 @@ package org.apache.gluten.utils import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.spark.Partition import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.types.StructType import org.apache.spark.util.SparkResourceUtil @@ -31,7 +31,7 @@ import org.apache.hadoop.fs.Path import scala.collection.mutable.ArrayBuffer -case class CHInputPartitionsUtil( +case class CHPartitionsUtil( relation: HadoopFsRelation, requiredSchema: StructType, selectedPartitions: Array[PartitionDirectory], @@ -42,15 +42,15 @@ case class CHInputPartitionsUtil( disableBucketedScan: Boolean) extends Logging { - def genInputPartitionSeq(): Seq[InputPartition] = { + def genPartitionSeq(): Seq[Partition] = { if (bucketedScan) { - genBucketedInputPartitionSeq() + genBucketedPartitionSeq() } else { - genNonBuckedInputPartitionSeq() + genNonBuckedPartitionSeq() } } - private def genNonBuckedInputPartitionSeq(): Seq[InputPartition] = { + private def genNonBuckedPartitionSeq(): Seq[Partition] = { val maxSplitBytes = FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions) @@ -111,7 +111,7 @@ case class CHInputPartitionsUtil( } } - private def genBucketedInputPartitionSeq(): Seq[InputPartition] = { + private def genBucketedPartitionSeq(): Seq[Partition] = { val bucketSpec = relation.bucketSpec.get logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val filesGroupedToBuckets = diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala index 749de3f9d493..96caf5b790af 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala +++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala @@ -26,12 +26,12 @@ import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.extensions.ExtensionBuilder import org.apache.gluten.substrait.rel.RelBuilder +import org.apache.spark.Partition import org.apache.spark.affinity.CHAffinity import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.delta.ClickhouseSnapshot import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 import org.apache.spark.sql.delta.files.TahoeFileIndex @@ -69,7 +69,7 @@ object MergeTreePartsPartitionsUtil extends Logging { optionalBucketSet: Option[BitSet], optionalNumCoalescedBuckets: Option[Int], disableBucketedScan: Boolean, - filterExprs: Seq[Expression]): Seq[InputPartition] = { + filterExprs: Seq[Expression]): Seq[Partition] = { if ( !relation.location.isInstanceOf[TahoeFileIndex] || !relation.fileFormat .isInstanceOf[DeltaMergeTreeFileFormat] @@ -82,7 +82,7 @@ object MergeTreePartsPartitionsUtil extends Logging { val snapshotId = ClickhouseSnapshot.genSnapshotId(table.deltaLog.update(stalenessAcceptable = true)) - val partitions = new ArrayBuffer[InputPartition] + val partitions = new ArrayBuffer[Partition] val (database, tableName) = if (table.catalogTable.isDefined) { (table.catalogTable.get.identifier.database.get, table.catalogTable.get.identifier.table) } else { @@ -95,7 +95,7 @@ object MergeTreePartsPartitionsUtil extends Logging { // bucket table if (table.bucketOption.isDefined && bucketedScan) { - genBucketedInputPartitionSeq( + genBucketedPartitionSeq( engine, database, tableName, @@ -115,7 +115,7 @@ object MergeTreePartsPartitionsUtil extends Logging { sparkSession ) } else { - genInputPartitionSeq( + genPartitionSeq( relation, engine, database, @@ -137,7 +137,7 @@ object MergeTreePartsPartitionsUtil extends Logging { partitions.toSeq } - def genInputPartitionSeq( + def genPartitionSeq( relation: HadoopFsRelation, engine: String, database: String, @@ -148,7 +148,7 @@ object MergeTreePartsPartitionsUtil extends Logging { optionalBucketSet: Option[BitSet], selectedPartitions: Array[PartitionDirectory], tableSchema: StructType, - partitions: ArrayBuffer[InputPartition], + partitions: ArrayBuffer[Partition], table: ClickHouseTableV2, clickhouseTableConfigs: Map[String, String], output: Seq[Attribute], @@ -316,7 +316,7 @@ object MergeTreePartsPartitionsUtil extends Logging { relativeTablePath: String, absoluteTablePath: String, tableSchema: StructType, - partitions: ArrayBuffer[InputPartition], + partitions: ArrayBuffer[Partition], table: ClickHouseTableV2, clickhouseTableConfigs: Map[String, String], splitFiles: Seq[MergeTreePartSplit], @@ -372,7 +372,7 @@ object MergeTreePartsPartitionsUtil extends Logging { relativeTablePath: String, absoluteTablePath: String, tableSchema: StructType, - partitions: ArrayBuffer[InputPartition], + partitions: ArrayBuffer[Partition], table: ClickHouseTableV2, clickhouseTableConfigs: Map[String, String], splitFiles: Seq[MergeTreePartSplit], @@ -441,7 +441,7 @@ object MergeTreePartsPartitionsUtil extends Logging { } /** Generate bucket partition */ - def genBucketedInputPartitionSeq( + def genBucketedPartitionSeq( engine: String, database: String, tableName: String, @@ -453,7 +453,7 @@ object 
MergeTreePartsPartitionsUtil extends Logging { optionalNumCoalescedBuckets: Option[Int], selectedPartitions: Array[PartitionDirectory], tableSchema: StructType, - partitions: ArrayBuffer[InputPartition], + partitions: ArrayBuffer[Partition], table: ClickHouseTableV2, clickhouseTableConfigs: Map[String, String], output: Seq[Attribute], diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala index 8a8c2e321c4e..fe1ca1e6f694 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala @@ -19,8 +19,7 @@ package org.apache.gluten.execution.mergetree import org.apache.gluten.affinity.{CHUTAffinity, CHUTSoftAffinityManager} import org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, GlutenMergeTreePartition, MergeTreePartSplit} -import org.apache.spark.SparkConf -import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.{Partition, SparkConf} import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil import org.apache.spark.sql.types.StructType @@ -61,7 +60,7 @@ class GlutenClickhouseMergetreeSoftAffinitySuite test("Soft Affinity Scheduler with duplicate reading detection") { - val partitions: ArrayBuffer[InputPartition] = new ArrayBuffer[InputPartition]() + val partitions: ArrayBuffer[Partition] = new ArrayBuffer[Partition]() var splitFiles: Seq[MergeTreePartSplit] = Seq() val relativeTablePath = "tmp/" diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 4afad58c9ac2..9bb670f10e9f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -28,12 +28,11 @@ import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode, Split import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.vectorized._ -import org.apache.spark.{SparkConf, TaskContext} +import org.apache.spark.{Partition, SparkConf, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.softaffinity.SoftAffinity import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types._ @@ -66,67 +65,21 @@ class VeloxIteratorApi extends IteratorApi with Logging { } override def genSplitInfo( - partition: InputPartition, - partitionSchema: StructType, - dataSchema: StructType, - fileFormat: ReadFileFormat, - metadataColumnNames: Seq[String], - properties: Map[String, String]): SplitInfo = { - partition match { - case f: FilePartition => - val ( - paths, - starts, - lengths, - fileSizes, - modificationTimes, - partitionColumns, - metadataColumns, - 
otherMetadataColumns) = - constructSplitInfo(partitionSchema, f.files, metadataColumnNames) - val preferredLocations = - SoftAffinity.getFilePartitionLocations(f) - setFileSchemaForLocalFiles( - LocalFilesBuilder.makeLocalFiles( - f.index, - paths, - starts, - lengths, - fileSizes, - modificationTimes, - partitionColumns, - metadataColumns, - fileFormat, - preferredLocations.toList.asJava, - mapAsJavaMap(properties), - otherMetadataColumns - ), - dataSchema, - fileFormat - ) - case _ => - throw new UnsupportedOperationException(s"Unsupported input partition.") - } - } - - override def genSplitInfoForPartitions( partitionIndex: Int, - partitions: Seq[InputPartition], + partitions: Seq[Partition], partitionSchema: StructType, dataSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], properties: Map[String, String]): SplitInfo = { - val partitionFiles = partitions.flatMap { - p => - if (!p.isInstanceOf[FilePartition]) { - throw new UnsupportedOperationException( - s"Unsupported input partition ${p.getClass.getName}.") - } - p.asInstanceOf[FilePartition].files - }.toArray - val locations = - partitions.flatMap(p => SoftAffinity.getFilePartitionLocations(p.asInstanceOf[FilePartition])) + val filePartitions: Seq[FilePartition] = partitions.map { + case p: FilePartition => p + case o => + throw new UnsupportedOperationException( + s"Unsupported input partition: ${o.getClass.getName}") + } + val partitionFiles = filePartitions.flatMap(_.files).toArray + val locations = filePartitions.flatMap(p => SoftAffinity.getFilePartitionLocations(p)) val ( paths, starts, diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala index 76a1a0e3a7fc..5222265beab6 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala @@ -25,12 +25,12 @@ import org.apache.gluten.proto.ConfigMap import org.apache.gluten.runtime.Runtimes import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode} -import org.apache.gluten.utils.InputPartitionsUtil +import org.apache.gluten.utils.PartitionsUtil import org.apache.gluten.vectorized.PlanEvaluatorJniWrapper +import org.apache.spark.Partition import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.hive.execution.HiveFileFormat @@ -44,8 +44,7 @@ import java.util.{Map => JMap} class VeloxTransformerApi extends TransformerApi with Logging { - /** Generate Seq[InputPartition] for FileSourceScanExecTransformer. 
*/ - def genInputPartitionSeq( + def genPartitionSeq( relation: HadoopFsRelation, requiredSchema: StructType, selectedPartitions: Array[PartitionDirectory], @@ -54,8 +53,8 @@ class VeloxTransformerApi extends TransformerApi with Logging { optionalBucketSet: Option[BitSet], optionalNumCoalescedBuckets: Option[Int], disableBucketedScan: Boolean, - filterExprs: Seq[Expression] = Seq.empty): Seq[InputPartition] = { - InputPartitionsUtil( + filterExprs: Seq[Expression] = Seq.empty): Seq[Partition] = { + PartitionsUtil( relation, requiredSchema, selectedPartitions, @@ -64,7 +63,7 @@ class VeloxTransformerApi extends TransformerApi with Logging { optionalBucketSet, optionalNumCoalescedBuckets, disableBucketedScan) - .genInputPartitionSeq() + .genPartitionSeq() } override def postProcessNativeConfig( diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala index 4cdc51fe2aa0..7bfa522a481c 100644 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala @@ -17,16 +17,18 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution.IcebergScanTransformer.{containsMetadataColumn, containsUuidOrFixedType} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.rel.{LocalFilesNode, SplitInfo} import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat +import org.apache.spark.Partition import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeReference, DynamicPruningExpression, Expression, Literal} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.connector.catalog.Table -import org.apache.spark.sql.connector.read.{InputPartition, Scan} +import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.{ArrayType, DataType, StructType} @@ -137,30 +139,11 @@ case class IcebergScanTransformer( override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan) - override def getSplitInfosWithIndex: Seq[SplitInfo] = { - val splitInfos = getPartitionsWithIndex.zipWithIndex.map { - case (partitions, index) => - GlutenIcebergSourceUtil.genSplitInfo(partitions, index, getPartitionSchema) - } - numSplits.add(splitInfos.map(s => s.asInstanceOf[LocalFilesNode].getPaths.size()).sum) - splitInfos - } - - override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = { - val groupedPartitions = SparkShimLoader.getSparkShims - .orderPartitions( - this, - scan, - keyGroupedPartitioning, - filteredPartitions, - outputPartitioning, - commonPartitionValues, - applyPartialClustering, - replicatePartitions) - .flatten - val splitInfos = groupedPartitions.zipWithIndex.map { - case (p, index) => - GlutenIcebergSourceUtil.genSplitInfoForPartition(p, index, getPartitionSchema) + override def getSplitInfosFromPartitions(partitions: Seq[Partition]): Seq[SplitInfo] = { + val splitInfos = partitions.map { + case p: SparkDataSourceRDDPartition => + GlutenIcebergSourceUtil.genSplitInfo(p, getPartitionSchema) + case _ => throw new 
GlutenNotSupportException() } numSplits.add(splitInfos.map(s => s.asInstanceOf[LocalFilesNode].getPaths.size()).sum) splitInfos diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala index 3816499a97a1..2436166e923a 100644 --- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala +++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala @@ -18,12 +18,13 @@ package org.apache.iceberg.spark.source import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.exception.GlutenNotSupportException +import org.apache.gluten.execution.SparkDataSourceRDDPartition import org.apache.gluten.substrait.rel.{IcebergLocalFilesBuilder, SplitInfo} import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.spark.softaffinity.SoftAffinity import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils -import org.apache.spark.sql.connector.read.{InputPartition, Scan} +import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.types.StructType import org.apache.iceberg._ @@ -36,67 +37,17 @@ import scala.collection.JavaConverters._ object GlutenIcebergSourceUtil { - def genSplitInfoForPartition( - inputPartition: InputPartition, - index: Int, - readPartitionSchema: StructType): SplitInfo = inputPartition match { - case partition: SparkInputPartition => - val paths = new JArrayList[String]() - val starts = new JArrayList[JLong]() - val lengths = new JArrayList[JLong]() - val partitionColumns = new JArrayList[JMap[String, String]]() - val deleteFilesList = new JArrayList[JList[DeleteFile]]() - var fileFormat = ReadFileFormat.UnknownFormat - - val tasks = partition.taskGroup[ScanTask]().tasks().asScala - asFileScanTask(tasks.toList).foreach { - task => - paths.add( - BackendsApiManager.getTransformerApiInstance - .encodeFilePathIfNeed(task.file().path().toString)) - starts.add(task.start()) - lengths.add(task.length()) - partitionColumns.add(getPartitionColumns(task, readPartitionSchema)) - deleteFilesList.add(task.deletes()) - val currentFileFormat = convertFileFormat(task.file().format()) - if (fileFormat == ReadFileFormat.UnknownFormat) { - fileFormat = currentFileFormat - } else if (fileFormat != currentFileFormat) { - throw new UnsupportedOperationException( - s"Only one file format is supported, " + - s"find different file format $fileFormat and $currentFileFormat") - } - } - val preferredLoc = SoftAffinity.getFilePartitionLocations( - paths.asScala.toArray, - inputPartition.preferredLocations()) - IcebergLocalFilesBuilder.makeIcebergLocalFiles( - index, - paths, - starts, - lengths, - partitionColumns, - fileFormat, - preferredLoc.toList.asJava, - deleteFilesList - ) - case _ => - throw new UnsupportedOperationException("Only support iceberg SparkInputPartition.") - } - def genSplitInfo( - inputPartitions: Seq[InputPartition], - index: Int, + partition: SparkDataSourceRDDPartition, readPartitionSchema: StructType): SplitInfo = { val paths = new JArrayList[String]() val starts = new JArrayList[JLong]() val lengths = new JArrayList[JLong]() val partitionColumns = new JArrayList[JMap[String, String]]() val deleteFilesList = new JArrayList[JList[DeleteFile]]() - val preferredLocs = new JArrayList[String]() var fileFormat = ReadFileFormat.UnknownFormat - inputPartitions.foreach { + partition.inputPartitions.foreach { case 
partition: SparkInputPartition => val tasks = partition.taskGroup[ScanTask]().tasks().asScala asFileScanTask(tasks.toList).foreach { @@ -117,17 +68,18 @@ object GlutenIcebergSourceUtil { s"find different file format $fileFormat and $currentFileFormat") } } - preferredLocs.addAll(partition.preferredLocations().toList.asJava) + case o => + throw new GlutenNotSupportException(s"Unsupported input partition type: $o") } IcebergLocalFilesBuilder.makeIcebergLocalFiles( - index, + partition.index, paths, starts, lengths, partitionColumns, fileFormat, SoftAffinity - .getFilePartitionLocations(paths.asScala.toArray, preferredLocs.asScala.toArray) + .getFilePartitionLocations(paths.asScala.toArray, partition.preferredLocations()) .toList .asJava, deleteFilesList diff --git a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala index 296db20829c2..890c408658c7 100644 --- a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala +++ b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala @@ -271,7 +271,7 @@ abstract class IcebergSuite extends WholeStageTransformerSuite { getExecutedPlan(df).map { case plan: IcebergScanTransformer => assert(plan.getKeyGroupPartitioning.isDefined) - assert(plan.getSplitInfosWithIndex.length == 3) + assert(plan.getSplitInfos.length == 3) case _ => // do nothing } } @@ -348,7 +348,7 @@ abstract class IcebergSuite extends WholeStageTransformerSuite { getExecutedPlan(df).map { case plan: IcebergScanTransformer => assert(plan.getKeyGroupPartitioning.isDefined) - assert(plan.getSplitInfosWithIndex.length == 3) + assert(plan.getSplitInfos.length == 3) case _ => // do nothing } } diff --git a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala index f6443227643a..2ab458f6f761 100644 --- a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala +++ b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala @@ -20,6 +20,7 @@ import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.rel.{ReadRelNode, SplitInfo} import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat +import org.apache.spark.Partition import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.connector.catalog.Table @@ -67,7 +68,10 @@ case class MicroBatchScanExecTransformer( override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty - override def getPartitions: Seq[InputPartition] = inputPartitionsShim + // todo: consider grouped partitions + override def getPartitions: Seq[Partition] = inputPartitionsShim.zipWithIndex.map { + case (inputPartition, index) => new SparkDataSourceRDDPartition(index, Seq(inputPartition)) + } /** Returns the actual schema of this data source scan. 
*/ override def getDataSchema: StructType = scan.readSchema() @@ -80,7 +84,7 @@ case class MicroBatchScanExecTransformer( MicroBatchScanExecTransformer.supportsBatchScan(scan) } - override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = { + override def getSplitInfosFromPartitions(partitions: Seq[Partition]): Seq[SplitInfo] = { val groupedPartitions = filteredPartitions.flatten groupedPartitions.zipWithIndex.map { case (p, _) => GlutenStreamKafkaSourceUtil.genSplitInfo(p) diff --git a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala index 31eb1970917d..a4911c8263ed 100644 --- a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala +++ b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala @@ -21,6 +21,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.rel.{PaimonLocalFilesBuilder, SplitInfo} import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat +import org.apache.spark.Partition import org.apache.spark.rdd.RDD import org.apache.spark.softaffinity.SoftAffinity import org.apache.spark.sql.catalyst.InternalRow @@ -28,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.expressions.{AttributeReference, DynamicPruningExpression, Expression, Literal} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.connector.catalog.Table -import org.apache.spark.sql.connector.read.{InputPartition, Scan} +import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -119,16 +120,17 @@ case class PaimonScanTransformer( override def doExecuteColumnar(): RDD[ColumnarBatch] = throw new UnsupportedOperationException() - override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = { + override def getSplitInfosFromPartitions(partitions: Seq[Partition]): Seq[SplitInfo] = { val partitionComputer = PaimonScanTransformer.getRowDataPartitionComputer(scan) - getPartitions.zipWithIndex.map { - case (p, index) => - p match { + partitions.map { + case p: SparkDataSourceRDDPartition => + val paths = mutable.ListBuffer.empty[String] + val starts = mutable.ListBuffer.empty[JLong] + val lengths = mutable.ListBuffer.empty[JLong] + val partitionColumns = mutable.ListBuffer.empty[JMap[String, String]] + + p.inputPartitions.foreach { case partition: PaimonInputPartition => - val paths = mutable.ListBuffer.empty[String] - val starts = mutable.ListBuffer.empty[JLong] - val lengths = mutable.ListBuffer.empty[JLong] - val partitionColumns = mutable.ListBuffer.empty[JMap[String, String]] partition.splits.foreach { split => val rawFilesOpt = split.convertToRawFiles() @@ -145,21 +147,24 @@ case class PaimonScanTransformer( "Cannot get raw files from paimon SparkInputPartition.") } } - val preferredLoc = - SoftAffinity.getFilePartitionLocations(paths.toArray, partition.preferredLocations()) - PaimonLocalFilesBuilder.makePaimonLocalFiles( - index, - paths.asJava, - starts.asJava, - lengths.asJava, - partitionColumns.asJava, - fileFormat, - preferredLoc.toList.asJava, - new JHashMap[String, String]() - ) - case _ => - throw new GlutenNotSupportException("Only support paimon 
SparkInputPartition.") + case o => + throw new GlutenNotSupportException(s"Unsupported input partition type: $o") } + + PaimonLocalFilesBuilder.makePaimonLocalFiles( + p.index, + paths.asJava, + starts.asJava, + lengths.asJava, + partitionColumns.asJava, + fileFormat, + SoftAffinity + .getFilePartitionLocations(paths.toArray, p.preferredLocations()) + .toList + .asJava, + new JHashMap[String, String]() + ) + case _ => throw new GlutenNotSupportException() } } diff --git a/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala b/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala index 91493283a000..fddac2123d40 100644 --- a/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala +++ b/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala @@ -94,4 +94,21 @@ abstract class PaimonSuite extends WholeStageTransformerSuite { } } } + + test("paimon transformer exists with bucket table") { + withTable(s"paimon_tbl") { + sql(s""" + |CREATE TABLE paimon_tbl (id INT, name STRING) + |USING paimon + |TBLPROPERTIES ( + | 'bucket' = '1', + | 'bucket-key' = 'id' + |) + |""".stripMargin) + sql(s"INSERT INTO paimon_tbl VALUES (1, 'Bob'), (2, 'Blue'), (3, 'Mike')") + runQueryAndCompare("SELECT * FROM paimon_tbl") { + checkGlutenOperatorMatch[PaimonScanTransformer] + } + } + } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index 34f7f41b786a..52be6c49547e 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -23,7 +23,6 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.substrait.rel.SplitInfo import org.apache.spark._ -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.StructType import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper @@ -32,21 +31,13 @@ import org.apache.spark.sql.vectorized.ColumnarBatch trait IteratorApi { def genSplitInfo( - partition: InputPartition, - partitionSchema: StructType, - dataSchema: StructType, - fileFormat: ReadFileFormat, - metadataColumnNames: Seq[String], - properties: Map[String, String]): SplitInfo - - def genSplitInfoForPartitions( partitionIndex: Int, - partition: Seq[InputPartition], + partition: Seq[Partition], partitionSchema: StructType, dataSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], - properties: Map[String, String]): SplitInfo = throw new UnsupportedOperationException() + properties: Map[String, String]): SplitInfo /** Generate native row partition. 
*/ def genPartitions( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala index 92d6ebd32574..a85553766443 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala @@ -20,8 +20,8 @@ import org.apache.gluten.execution.WriteFilesExecTransformer import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.expression.ExpressionNode +import org.apache.spark.Partition import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory} import org.apache.spark.sql.types.{DataType, DecimalType, StructType} @@ -33,8 +33,8 @@ import java.util trait TransformerApi { - /** Generate Seq[InputPartition] for FileSourceScanExecTransformer. */ - def genInputPartitionSeq( + /** Generate Seq[Partition] for FileSourceScanExecTransformer. */ + def genPartitionSeq( relation: HadoopFsRelation, requiredSchema: StructType, selectedPartitions: Array[PartitionDirectory], @@ -43,7 +43,7 @@ trait TransformerApi { optionalBucketSet: Option[BitSet], optionalNumCoalescedBuckets: Option[Int], disableBucketedScan: Boolean, - filterExprs: Seq[Expression] = Seq.empty): Seq[InputPartition] + filterExprs: Seq[Expression] = Seq.empty): Seq[Partition] /** * Post-process native config, For example, for ClickHouse backend, sync 'spark.executor.cores' to diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index a8524d53f2ee..86b76e7a25ab 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -25,8 +25,8 @@ import org.apache.gluten.substrait.extensions.ExtensionBuilder import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo} import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat +import org.apache.spark.Partition import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.types.{StringType, StructField, StructType} import com.google.protobuf.StringValue @@ -56,21 +56,27 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource /** Returns the file format properties. */ def getProperties: Map[String, String] = Map.empty - /** Returns the split infos that will be processed by the underlying native engine. 
*/ override def getSplitInfos: Seq[SplitInfo] = { getSplitInfosFromPartitions(getPartitions) } - def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = { + def getSplitInfosFromPartitions(partitions: Seq[Partition]): Seq[SplitInfo] = { partitions.map( - BackendsApiManager.getIteratorApiInstance - .genSplitInfo( - _, - getPartitionSchema, - getDataSchema, - fileFormat, - getMetadataColumns().map(_.name), - getProperties)) + p => { + val ps = p match { + case sp: SparkDataSourceRDDPartition => sp.inputPartitions.map(_.asInstanceOf[Partition]) + case o => Seq(o) + } + BackendsApiManager.getIteratorApiInstance + .genSplitInfo( + p.index, + ps, + getPartitionSchema, + getDataSchema, + fileFormat, + getMetadataColumns().map(_.name), + getProperties) + }) } override protected def doValidateInternal(): ValidationResult = { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala index 03a840cccdcf..0b0dc7c52f00 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala @@ -21,15 +21,15 @@ import org.apache.gluten.expression.ExpressionConverter import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat -import org.apache.gluten.substrait.rel.SplitInfo import org.apache.gluten.utils.FileIndexUtil +import org.apache.spark.Partition import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.connector.catalog.Table -import org.apache.spark.sql.connector.read.{InputPartition, Scan} +import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileScan} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.StructType @@ -128,26 +128,7 @@ abstract class BatchScanExecTransformerBase( override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty - // With storage partition join, the return partition type is changed, so as SplitInfo - def getPartitionsWithIndex: Seq[Seq[InputPartition]] = finalPartitions - - def getSplitInfosWithIndex: Seq[SplitInfo] = { - getPartitionsWithIndex.zipWithIndex.map { - case (partitions, index) => - BackendsApiManager.getIteratorApiInstance - .genSplitInfoForPartitions( - index, - partitions, - getPartitionSchema, - getDataSchema, - fileFormat, - getMetadataColumns().map(_.name), - getProperties) - } - } - - // May cannot call for bucket scan - override def getPartitions: Seq[InputPartition] = filteredFlattenPartitions + def getPartitions: Seq[Partition] = finalPartitions override def getPartitionSchema: StructType = scan match { case fileScan: FileScan => fileScan.readPartitionSchema @@ -195,19 +176,21 @@ abstract class BatchScanExecTransformerBase( override def metricsUpdater(): MetricsUpdater = BackendsApiManager.getMetricsApiInstance.genBatchScanTransformerMetricsUpdater(metrics) - @transient protected lazy val filteredFlattenPartitions: Seq[InputPartition] = - filteredPartitions.flatten - - @transient protected lazy val finalPartitions: Seq[Seq[InputPartition]] = - 
SparkShimLoader.getSparkShims.orderPartitions( - this, - scan, - keyGroupedPartitioning, - filteredPartitions, - outputPartitioning, - commonPartitionValues, - applyPartialClustering, - replicatePartitions) + @transient protected lazy val finalPartitions: Seq[Partition] = + SparkShimLoader.getSparkShims + .orderPartitions( + this, + scan, + keyGroupedPartitioning, + filteredPartitions, + outputPartitioning, + commonPartitionValues, + applyPartialClustering, + replicatePartitions) + .zipWithIndex + .map { + case (inputPartitions, index) => new SparkDataSourceRDDPartition(index, inputPartitions) + } @transient override lazy val fileFormat: ReadFileFormat = BackendsApiManager.getSettings.getSubstraitReadFileFormatV2(scan) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala index 61c94747bca2..0584922bca7f 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala @@ -23,11 +23,11 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.utils.FileIndexUtil +import org.apache.spark.Partition import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PlanExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.connector.read.streaming.SparkDataStream import org.apache.spark.sql.execution.FileSourceScanExecShim import org.apache.spark.sql.execution.datasources.HadoopFsRelation @@ -123,18 +123,19 @@ abstract class FileSourceScanExecTransformerBase( override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns - override def getPartitions: Seq[InputPartition] = { - BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq( - relation, - requiredSchema, - getPartitionArray(), - output, - bucketedScan, - optionalBucketSet, - optionalNumCoalescedBuckets, - disableBucketedScan, - filterExprs() - ) + override def getPartitions: Seq[Partition] = { + BackendsApiManager.getTransformerApiInstance + .genPartitionSeq( + relation, + requiredSchema, + getPartitionArray(), + output, + bucketedScan, + optionalBucketSet, + optionalNumCoalescedBuckets, + disableBucketedScan, + filterExprs() + ) } override def getPartitionSchema: StructType = relation.partitionSchema diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala index 263d56720cea..4a00dbb58727 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala @@ -47,13 +47,13 @@ case class GlutenPartition( case class FirstZippedPartitionsPartition( index: Int, - inputPartition: InputPartition, + inputPartition: Partition, inputColumnarRDDPartitions: Seq[Partition] = Seq.empty) extends Partition class GlutenWholeStageColumnarRDD( @transient sc: SparkContext, - @transient private val inputPartitions: Seq[InputPartition], + 
@transient private val inputPartitions: Seq[Partition], var rdds: ColumnarInputRDDsWrapper, pipelineTime: SQLMetric, updateInputMetrics: InputMetricsWrapper => Unit, diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/SparkDataSourceRDDPartition.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/SparkDataSourceRDDPartition.scala new file mode 100644 index 000000000000..e3313c9cd296 --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/SparkDataSourceRDDPartition.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.spark.Partition +import org.apache.spark.sql.connector.read.InputPartition + +/** + * Copied from Spark's [[org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition]] to + * stay compatible with Spark 3.3 and earlier. + */ +class SparkDataSourceRDDPartition(val index: Int, val inputPartitions: Seq[InputPartition]) + extends Partition + with Serializable + with InputPartition { + + override def preferredLocations(): Array[String] = { + inputPartitions.flatMap(_.preferredLocations()).toArray + } +} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index f87b9673cc56..264195f93f5a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -18,7 +18,6 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.config.GlutenConfig -import org.apache.gluten.exception.GlutenException import org.apache.gluten.expression._ import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater} @@ -35,7 +34,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.trees.TreeNodeTag -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper @@ -136,10 +134,12 @@ trait TransformSupport extends ValidatablePlan { trait LeafTransformSupport extends TransformSupport with LeafExecNode { final override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = Seq.empty + + /** Returns the split infos that will be processed by the underlying native engine.
*/ def getSplitInfos: Seq[SplitInfo] /** Returns the partitions generated by this data source scan. */ - def getPartitions: Seq[InputPartition] + def getPartitions: Seq[Partition] } trait UnaryTransformSupport extends TransformSupport with UnaryExecNode { @@ -293,32 +293,38 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f allLeafTransformers.toSeq } + /** + * If this "whole stage" contains a leaf exec transformer, it generates an RDD that itself takes + * care of the [[LeafTransformSupport]]; there won't be any other RDD for the leaf operator. As a + * result, genFirstStageIterator rather than genFinalStageIterator will be invoked. + */ private def generateWholeStageRDD( leafTransformers: Seq[LeafTransformSupport], wsCtx: WholeStageTransformContext, inputRDDs: ColumnarInputRDDsWrapper, pipelineTime: SQLMetric): RDD[ColumnarBatch] = { - val isKeyGroupPartition = leafTransformers.exists { - // TODO: May can apply to BatchScanExecTransformer without key group partitioning - case b: BatchScanExecTransformerBase if b.keyGroupedPartitioning.isDefined => true - case _ => false - } - /** - * If containing leaf exec transformer this "whole stage" generates a RDD which itself takes - * care of [[LeafTransformSupport]] there won't be any other RDD for leaf operator. As a result, - * genFirstStageIterator rather than genFinalStageIterator will be invoked - */ - val allInputPartitions = leafTransformers.map( - leafTransformer => { - if (isKeyGroupPartition) { - leafTransformer.asInstanceOf[BatchScanExecTransformerBase].getPartitionsWithIndex - } else { - Seq(leafTransformer.getPartitions) - } - }) - - val allSplitInfos = getSplitInfosFromPartitions(isKeyGroupPartition, leafTransformers) + // If there are two leaf transformers, they must have the same partitions; + // otherwise, an exchange would have been inserted. We should combine the two leaf + // transformers' partitions with the same index and set them together in + // the substraitContext. + // We use transpose to do that; you can refer to + // the diagram below. + // leaf1 p11 p12 p13 p14 ... p1n + // leaf2 p21 p22 p23 p24 ... p2n + // transpose => + // leaf1 | leaf2 + // p11 | p21 => substraitContext.setSplitInfo([p11, p21]) + // p12 | p22 => substraitContext.setSplitInfo([p12, p22]) + // p13 | p23 ... + // p14 | p24 + // ... + // p1n | p2n => substraitContext.setSplitInfo([p1n, p2n]) + // The data in a partition may be empty. For example, + // if these are two batch scan transformers with keyGroupPartitioning, + // they have the same partitionValues, + // but some partitions may be empty for those partition values that are not present.
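A minimal, hypothetical sketch of the pairing described above (the placeholder values and variable names below are illustrative only, not part of the patch): Scala's Seq.transpose turns the per-leaf sequences of split infos into per-index groups, so each group holds the i-th split of every leaf.

    // Two leaves, each contributing three splits (stand-ins for SplitInfo).
    val perLeaf: Seq[Seq[String]] = Seq(
      Seq("p11", "p12", "p13"), // splits of leaf1
      Seq("p21", "p22", "p23")  // splits of leaf2
    )
    // Group the i-th split of every leaf together; transpose requires all
    // leaves to expose the same number of partitions, matching the comment above.
    val perIndex: Seq[Seq[String]] = perLeaf.transpose
    // perIndex == Seq(Seq("p11", "p21"), Seq("p12", "p22"), Seq("p13", "p23")),
    // i.e. one setSplitInfo group per transposed element.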
+ val allSplitInfos = leafTransformers.map(_.getSplitInfos).transpose if (GlutenConfig.get.enableHdfsViewfs) { val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty @@ -356,6 +362,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f wsCtx.enableCudf ) + val allInputPartitions = leafTransformers.map(_.getPartitions) SoftAffinity.updateFilePartitionLocations(allInputPartitions, rdd.id) leafTransformers.foreach { @@ -367,55 +374,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f rdd } - private def getSplitInfosFromPartitions( - isKeyGroupPartition: Boolean, - leafTransformers: Seq[LeafTransformSupport]): Seq[Seq[SplitInfo]] = { - val allSplitInfos = if (isKeyGroupPartition) { - // If these are two batch scan transformer with keyGroupPartitioning, - // they have same partitionValues, - // but some partitions maybe empty for those partition values that are not present, - // otherwise, exchange will be inserted. We should combine the two leaf - // transformers' partitions with same index, and set them together in - // the substraitContext. We use transpose to do that, You can refer to - // the diagram below. - // leaf1 Seq(p11) Seq(p12, p13) Seq(p14) ... Seq(p1n) - // leaf2 Seq(p21) Seq(p22) Seq() ... Seq(p2n) - // transpose => - // leaf1 | leaf2 - // Seq(p11) | Seq(p21) => substraitContext.setSplitInfo([Seq(p11), Seq(p21)]) - // Seq(p12, p13) | Seq(p22) => substraitContext.setSplitInfo([Seq(p12, p13), Seq(p22)]) - // Seq(p14) | Seq() ... - // ... - // Seq(p1n) | Seq(p2n) => substraitContext.setSplitInfo([Seq(p1n), Seq(p2n)]) - leafTransformers.map(_.asInstanceOf[BatchScanExecTransformerBase].getSplitInfosWithIndex) - } else { - // If these are two leaf transformers, they must have same partitions, - // otherwise, exchange will be inserted. We should combine the two leaf - // transformers' partitions with same index, and set them together in - // the substraitContext. We use transpose to do that, You can refer to - // the diagram below. - // leaf1 p11 p12 p13 p14 ... p1n - // leaf2 p21 p22 p23 p24 ... p2n - // transpose => - // leaf1 | leaf2 - // p11 | p21 => substraitContext.setSplitInfo([p11, p21]) - // p12 | p22 => substraitContext.setSplitInfo([p12, p22]) - // p13 | p23 ... - // p14 | p24 - // ... 
- // p1n | p2n => substraitContext.setSplitInfo([p1n, p2n]) - leafTransformers.map(_.getSplitInfos) - } - - val partitionLength = allSplitInfos.head.size - if (allSplitInfos.exists(_.size != partitionLength)) { - throw new GlutenException( - "The partition length of all the leaf transformer are not the same.") - } - - allSplitInfos.transpose - } - override def doExecuteColumnar(): RDD[ColumnarBatch] = { assert(child.isInstanceOf[TransformSupport]) val pipelineTime: SQLMetric = longMetric("pipelineTime") diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/utils/InputPartitionsUtil.scala b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala similarity index 93% rename from gluten-substrait/src/main/scala/org/apache/gluten/utils/InputPartitionsUtil.scala rename to gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala index b05992fa301d..953f95bf2332 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/InputPartitionsUtil.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala @@ -18,16 +18,16 @@ package org.apache.gluten.utils import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.spark.Partition import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, HadoopFsRelation, PartitionDirectory} import org.apache.spark.sql.types.StructType import org.apache.spark.util.collection.BitSet import org.apache.hadoop.fs.Path -case class InputPartitionsUtil( +case class PartitionsUtil( relation: HadoopFsRelation, requiredSchema: StructType, selectedPartitions: Array[PartitionDirectory], @@ -38,15 +38,15 @@ case class InputPartitionsUtil( disableBucketedScan: Boolean) extends Logging { - def genInputPartitionSeq(): Seq[InputPartition] = { + def genPartitionSeq(): Seq[Partition] = { if (bucketedScan) { - genBucketedInputPartitionSeq() + genBucketedPartitionSeq() } else { - genNonBuckedInputPartitionSeq() + genNonBuckedPartitionSeq() } } - private def genNonBuckedInputPartitionSeq(): Seq[InputPartition] = { + private def genNonBuckedPartitionSeq(): Seq[Partition] = { val openCostInBytes = relation.sparkSession.sessionState.conf.filesOpenCostInBytes val maxSplitBytes = FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions) @@ -99,7 +99,7 @@ case class InputPartitionsUtil( FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) } - private def genBucketedInputPartitionSeq(): Seq[InputPartition] = { + private def genBucketedPartitionSeq(): Seq[Partition] = { val bucketSpec = relation.bucketSpec.get logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val filesGroupedToBuckets = diff --git a/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala b/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala index 7ae47e422bed..714e632d085c 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala @@ -20,9 +20,9 @@ import org.apache.gluten.config.GlutenConfig import org.apache.gluten.logging.LogLevelUtil import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager} +import org.apache.spark.Partition import org.apache.spark.internal.Logging import 
org.apache.spark.scheduler.ExecutorCacheTaskLocation -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.FilePartition abstract class Affinity(val manager: AffinityManager) extends LogLevelUtil with Logging { @@ -80,15 +80,13 @@ abstract class Affinity(val manager: AffinityManager) extends LogLevelUtil with } /** Update the RDD id to SoftAffinityManager */ - def updateFilePartitionLocations( - inputPartitions: Seq[Seq[Seq[InputPartition]]], - rddId: Int): Unit = { + def updateFilePartitionLocations(inputPartitions: Seq[Seq[Partition]], rddId: Int): Unit = { if (SoftAffinityManager.usingSoftAffinity && SoftAffinityManager.detectDuplicateReading) { - inputPartitions.foreach(_.foreach(_.foreach { + inputPartitions.foreach(_.foreach { case f: FilePartition => SoftAffinityManager.updatePartitionMap(f, rddId) case _ => - })) + }) } } } diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala index d83edfb22dad..39500a50e76f 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala @@ -21,11 +21,11 @@ import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat +import org.apache.spark.Partition import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, Expression} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.hive.HiveTableScanExecTransformer._ @@ -67,7 +67,7 @@ case class HiveTableScanExecTransformer( override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty - override def getPartitions: Seq[InputPartition] = partitions + override def getPartitions: Seq[Partition] = partitions override def getPartitionSchema: StructType = relation.tableMeta.partitionSchema @@ -82,7 +82,7 @@ case class HiveTableScanExecTransformer( @transient private lazy val hivePartitionConverter = new HivePartitionConverter(session.sessionState.newHadoopConf(), session) - @transient private lazy val partitions: Seq[InputPartition] = + @transient private lazy val partitions: Seq[Partition] = if (!relation.isPartitioned) { val tableLocation: URI = relation.tableMeta.storage.locationUri.getOrElse { throw new UnsupportedOperationException("Table path not set.") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala index 8b5efa91673f..339bda2a039e 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala @@ -19,10 +19,10 @@ package org.apache.spark.sql.extension import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.FileSourceScanExecTransformerBase +import 
org.apache.spark.Partition import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.HadoopFsRelation import org.apache.spark.sql.types.StructType import org.apache.spark.util.collection.BitSet @@ -49,16 +49,17 @@ case class TestFileSourceScanExecTransformer( tableIdentifier, disableBucketedScan) { - override def getPartitions: Seq[InputPartition] = - BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq( - relation, - requiredSchema, - selectedPartitions, - output, - bucketedScan, - optionalBucketSet, - optionalNumCoalescedBuckets, - disableBucketedScan) + override def getPartitions: Seq[Partition] = + BackendsApiManager.getTransformerApiInstance + .genPartitionSeq( + relation, + requiredSchema, + selectedPartitions, + output, + bucketedScan, + optionalBucketSet, + optionalNumCoalescedBuckets, + disableBucketedScan) override val nodeNamePrefix: String = "TestFile" diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala index 98ef4eff7869..6846f140edaf 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql.extension import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.FileSourceScanExecTransformerBase +import org.apache.spark.Partition import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.HadoopFsRelation import org.apache.spark.sql.types.StructType import org.apache.spark.util.collection.BitSet @@ -47,16 +47,17 @@ case class TestFileSourceScanExecTransformer( dataFilters, tableIdentifier, disableBucketedScan) { - override def getPartitions: Seq[InputPartition] = - BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq( - relation, - requiredSchema, - selectedPartitions, - output, - bucketedScan, - optionalBucketSet, - optionalNumCoalescedBuckets, - disableBucketedScan) + override def getPartitions: Seq[Partition] = + BackendsApiManager.getTransformerApiInstance + .genPartitionSeq( + relation, + requiredSchema, + selectedPartitions, + output, + bucketedScan, + optionalBucketSet, + optionalNumCoalescedBuckets, + disableBucketedScan) override val nodeNamePrefix: String = "TestFile" } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala index 98ef4eff7869..6846f140edaf 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql.extension import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.FileSourceScanExecTransformerBase +import 
org.apache.spark.Partition import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.HadoopFsRelation import org.apache.spark.sql.types.StructType import org.apache.spark.util.collection.BitSet @@ -47,16 +47,17 @@ case class TestFileSourceScanExecTransformer( dataFilters, tableIdentifier, disableBucketedScan) { - override def getPartitions: Seq[InputPartition] = - BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq( - relation, - requiredSchema, - selectedPartitions, - output, - bucketedScan, - optionalBucketSet, - optionalNumCoalescedBuckets, - disableBucketedScan) + override def getPartitions: Seq[Partition] = + BackendsApiManager.getTransformerApiInstance + .genPartitionSeq( + relation, + requiredSchema, + selectedPartitions, + output, + bucketedScan, + optionalBucketSet, + optionalNumCoalescedBuckets, + disableBucketedScan) override val nodeNamePrefix: String = "TestFile" } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala index 98ef4eff7869..6846f140edaf 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql.extension import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.FileSourceScanExecTransformerBase +import org.apache.spark.Partition import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.HadoopFsRelation import org.apache.spark.sql.types.StructType import org.apache.spark.util.collection.BitSet @@ -47,16 +47,17 @@ case class TestFileSourceScanExecTransformer( dataFilters, tableIdentifier, disableBucketedScan) { - override def getPartitions: Seq[InputPartition] = - BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq( - relation, - requiredSchema, - selectedPartitions, - output, - bucketedScan, - optionalBucketSet, - optionalNumCoalescedBuckets, - disableBucketedScan) + override def getPartitions: Seq[Partition] = + BackendsApiManager.getTransformerApiInstance + .genPartitionSeq( + relation, + requiredSchema, + selectedPartitions, + output, + bucketedScan, + optionalBucketSet, + optionalNumCoalescedBuckets, + disableBucketedScan) override val nodeNamePrefix: String = "TestFile" } diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala index 5f62c272f789..bbbd665cd836 100644 --- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala @@ -243,6 +243,10 @@ trait SparkShims { def getCommonPartitionValues(batchScan: BatchScanExec): Option[Seq[(InternalRow, Int)]] = Option(Seq()) + /** + * Most of the code in this method is copied from + * [[org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD]]. + */ def orderPartitions( batchScan: DataSourceV2ScanExecBase, scan: Scan,