Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
* limitations under the License.
*/
package org.apache.spark.sql.delta.catalog

import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog, DeltaTimeTravelSpec, Snapshot}
import org.apache.spark.sql.delta.actions.Metadata
Expand Down Expand Up @@ -153,7 +154,7 @@ object ClickHouseTableV2 extends Logging {
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
disableBucketedScan: Boolean,
filterExprs: Seq[Expression]): Seq[InputPartition] = {
filterExprs: Seq[Expression]): Seq[Partition] = {
val tableV2 = ClickHouseTableV2.getTable(deltaLog)

MergeTreePartsPartitionsUtil.getMergeTreePartsPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
* limitations under the License.
*/
package org.apache.spark.sql.delta.catalog

import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog, DeltaTimeTravelSpec, Snapshot}
import org.apache.spark.sql.delta.actions.Metadata
Expand Down Expand Up @@ -153,7 +154,7 @@ object ClickHouseTableV2 extends Logging {
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
disableBucketedScan: Boolean,
filterExprs: Seq[Expression]): Seq[InputPartition] = {
filterExprs: Seq[Expression]): Seq[Partition] = {
val tableV2 = ClickHouseTableV2.getTable(deltaLog)

MergeTreePartsPartitionsUtil.getMergeTreePartsPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.delta.catalog

import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
Expand All @@ -24,7 +25,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.V1Table
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, DeltaTableUtils, DeltaTimeTravelSpec, Snapshot, UnresolvedPathBasedDeltaTable}
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.sources.DeltaDataSource
Expand Down Expand Up @@ -147,7 +147,7 @@ object ClickHouseTableV2 extends Logging {
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
disableBucketedScan: Boolean,
filterExprs: Seq[Expression]): Seq[InputPartition] = {
filterExprs: Seq[Expression]): Seq[Partition] = {
val tableV2 = ClickHouseTableV2.getTable(deltaLog)

MergeTreePartsPartitionsUtil.getMergeTreePartsPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
getExecutedPlan(df).map {
case plan: IcebergScanTransformer =>
assert(plan.getKeyGroupPartitioning.isDefined)
assert(plan.getSplitInfosWithIndex.length == 3)
assert(plan.getSplitInfos.length == 3)
case _ => // do nothing
}
}
Expand Down Expand Up @@ -372,7 +372,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
getExecutedPlan(df).map {
case plan: IcebergScanTransformer =>
assert(plan.getKeyGroupPartitioning.isDefined)
assert(plan.getSplitInfosWithIndex.length == 3)
assert(plan.getSplitInfos.length == 3)
case _ => // do nothing
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,13 @@ import org.apache.gluten.substrait.rel._
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.vectorized.{BatchIterator, CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator}

import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext}
import org.apache.spark.{InterruptibleIterator, Partition, SparkConf, TaskContext}
import org.apache.spark.affinity.CHAffinity
import org.apache.spark.executor.InputMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.CHColumnarShuffleWriter
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
import org.apache.spark.sql.execution.datasources.mergetree.PartSerializer
Expand Down Expand Up @@ -125,12 +124,16 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
}

override def genSplitInfo(
partition: InputPartition,
partitionIndex: Int,
partitions: Seq[Partition],
partitionSchema: StructType,
dataSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
// todo: support multi partitions
assert(partitions.size == 1)
val partition = partitions.head
partition match {
case p: GlutenMergeTreePartition =>
ExtensionTableBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import org.apache.gluten.execution.{CHHashAggregateExecTransformer, WriteFilesEx
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.expression.{BooleanLiteralNode, ExpressionBuilder, ExpressionNode}
import org.apache.gluten.utils.{CHInputPartitionsUtil, ExpressionDocUtil}
import org.apache.gluten.utils.{CHPartitionsUtil, ExpressionDocUtil}

import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.delta.MergeTreeFileFormat
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
Expand All @@ -51,8 +51,7 @@ import java.util

class CHTransformerApi extends TransformerApi with Logging {

/** Generate Seq[InputPartition] for FileSourceScanExecTransformer. */
def genInputPartitionSeq(
def genPartitionSeq(
relation: HadoopFsRelation,
requiredSchema: StructType,
selectedPartitions: Array[PartitionDirectory],
Expand All @@ -61,7 +60,7 @@ class CHTransformerApi extends TransformerApi with Logging {
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
disableBucketedScan: Boolean,
filterExprs: Seq[Expression]): Seq[InputPartition] = {
filterExprs: Seq[Expression]): Seq[Partition] = {
relation.location match {
case index: TahoeFileIndex
if relation.fileFormat
Expand All @@ -81,15 +80,15 @@ class CHTransformerApi extends TransformerApi with Logging {
)
case _ =>
// Generate FilePartition for Parquet
CHInputPartitionsUtil(
CHPartitionsUtil(
relation,
requiredSchema,
selectedPartitions,
output,
bucketedScan,
optionalBucketSet,
optionalNumCoalescedBuckets,
disableBucketedScan).genInputPartitionSeq()
disableBucketedScan).genPartitionSeq()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo}

import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
Expand Down Expand Up @@ -53,7 +53,7 @@ case class CHRangeExecTransformer(
}
}

override def getPartitions: Seq[InputPartition] = {
override def getPartitions: Seq[Partition] = {
(0 until numSlices).map {
sliceIndex => GlutenRangeExecPartition(start, end, step, numSlices, sliceIndex)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution

import org.apache.spark.Partition
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -95,7 +96,8 @@ case class GlutenMergeTreePartition(
partList: Array[MergeTreePartSplit],
tableSchema: StructType,
clickhouseTableConfigs: Map[String, String])
extends InputPartition {
extends Partition
with InputPartition {
override def preferredLocations(): Array[String] = {
Array.empty[String]
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package org.apache.gluten.utils
import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SparkResourceUtil
Expand All @@ -31,7 +31,7 @@ import org.apache.hadoop.fs.Path

import scala.collection.mutable.ArrayBuffer

case class CHInputPartitionsUtil(
case class CHPartitionsUtil(
relation: HadoopFsRelation,
requiredSchema: StructType,
selectedPartitions: Array[PartitionDirectory],
Expand All @@ -42,15 +42,15 @@ case class CHInputPartitionsUtil(
disableBucketedScan: Boolean)
extends Logging {

def genInputPartitionSeq(): Seq[InputPartition] = {
def genPartitionSeq(): Seq[Partition] = {
if (bucketedScan) {
genBucketedInputPartitionSeq()
genBucketedPartitionSeq()
} else {
genNonBuckedInputPartitionSeq()
genNonBuckedPartitionSeq()
}
}

private def genNonBuckedInputPartitionSeq(): Seq[InputPartition] = {
private def genNonBuckedPartitionSeq(): Seq[Partition] = {
val maxSplitBytes =
FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)

Expand Down Expand Up @@ -111,7 +111,7 @@ case class CHInputPartitionsUtil(
}
}

private def genBucketedInputPartitionSeq(): Seq[InputPartition] = {
private def genBucketedPartitionSeq(): Seq[Partition] = {
val bucketSpec = relation.bucketSpec.get
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val filesGroupedToBuckets =
Expand Down
Loading