From cda9c62f069514ea041e0edf5c88f287ed6cc38b Mon Sep 17 00:00:00 2001 From: binwei Date: Sat, 14 Sep 2024 02:44:13 +0000 Subject: [PATCH 1/8] Fix for pinterest spark Spark3.2.0 support --- package/pom.xml | 1 + pom.xml | 10 +- .../sql/shims/spark32/Spark32Shims.scala | 8 +- .../datasources/FileFormatDataWriter.scala | 23 +++-- .../InsertIntoHadoopFsRelationCommand.scala | 75 ++++++++++++++- .../parquet/ParquetFileFormat.scala | 93 +++++++------------ .../v2/AbstractBatchScanExec.scala | 19 ++-- .../datasources/v2/BatchScanExecShim.scala | 19 ++-- .../datasources/v2/utils/CatalogUtil.scala | 3 - 9 files changed, 145 insertions(+), 106 deletions(-) diff --git a/package/pom.xml b/package/pom.xml index fc72fe93de84..5abbe5c23dbd 100644 --- a/package/pom.xml +++ b/package/pom.xml @@ -353,6 +353,7 @@ org.apache.spark.memory.TaskMemoryManager com.google.protobuf.* + org.apache.spark.sql.execution.stat.StatFunctions compile diff --git a/pom.xml b/pom.xml index 7dc6f0ec758a..1aa80092d6f1 100644 --- a/pom.xml +++ b/pom.xml @@ -58,10 +58,10 @@ 2.12 2.12.15 3 - 3.4 - 3.4.3 - spark34 - spark-sql-columnar-shims-spark34 + 3.2 + 3.2.0 + spark32 + spark-sql-columnar-shims-spark32 1.5.0 delta-core 2.4.0 @@ -275,7 +275,7 @@ 3.2 spark32 spark-sql-columnar-shims-spark32 - 3.2.2 + 3.2.0 1.3.1 delta-core 2.0.1 diff --git a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala index f62f9031ce9e..3d4ca4fa1389 100644 --- a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala +++ b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala @@ -30,10 +30,9 @@ import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryExpression, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName} import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution} +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, StatefulOpClusteredDistribution} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, SparkPlan} @@ -61,7 +60,8 @@ class Spark32Shims extends SparkShims { override def getDistribution( leftKeys: Seq[Expression], rightKeys: Seq[Expression]): Seq[Distribution] = { - HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil + val dist = StatefulOpClusteredDistribution(leftKeys, 0) + dist :: StatefulOpClusteredDistribution(rightKeys, 0) :: Nil } override def scalarExpressionMappings: Seq[Sig] = Seq(Sig[Empty2Null](ExpressionNames.EMPTY2NULL)) @@ -272,7 +272,7 @@ class Spark32Shims extends SparkShims { conf.parquetFilterPushDownStringStartWith, conf.parquetFilterPushDownInFilterThreshold, caseSensitive.getOrElse(conf.caseSensitiveAnalysis), - RebaseSpec(LegacyBehaviorPolicy.CORRECTED) + LegacyBehaviorPolicy.CORRECTED ) } diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala index e5aaff691168..a7c24bdc0690 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StringType import org.apache.spark.util.{SerializableConfiguration, Utils} +import org.apache.spark.util.Utils.isDirectWriteScheme import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext @@ -161,9 +162,12 @@ class SingleDirectoryDataWriter( releaseResources() val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext) - val currentPath = - committer.newTaskTempFile(taskAttemptContext, None, f"-c$fileCounter%03d" + ext) - + val fileCounterExt = if (description.maxRecordsPerFile > 0) { + f"-c$fileCounter%03d" + } else { + "" + } + val currentPath = committer.newTaskTempFile(taskAttemptContext, None, fileCounterExt + ext) currentWriter = description.outputWriterFactory.newInstance( path = currentPath, dataSchema = description.dataColumns.toStructType, @@ -293,14 +297,21 @@ abstract class BaseDynamicPartitionDataWriter( val bucketIdStr = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") // This must be in a form that matches our bucketing format. See BucketingUtils. - val ext = f"$bucketIdStr.c$fileCounter%03d" + + val ext = if (!bucketIdStr.isEmpty || description.maxRecordsPerFile > 0) { + f"$bucketIdStr.c$fileCounter%03d" + + description.outputWriterFactory.getFileExtension(taskAttemptContext) + } else { description.outputWriterFactory.getFileExtension(taskAttemptContext) - + } val customPath = partDir.flatMap { dir => description.customPartitionLocations.get(PartitioningUtils.parsePathFragment(dir)) } val currentPath = if (customPath.isDefined) { - committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext) + if (isDirectWriteScheme(taskAttemptContext.getConfiguration, customPath.get)) { + committer.newTaskTempFile(taskAttemptContext, None, ext) + } else { + committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext) + } } else { committer.newTaskTempFile(taskAttemptContext, partDir, ext) } diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 9221ecbd1294..e4cda6fad487 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -21,14 +21,15 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Literal} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.util.Utils.isDirectWriteScheme import org.apache.hadoop.fs.{FileSystem, Path} @@ -105,11 +106,73 @@ case class InsertIntoHadoopFsRelationCommand( } val jobId = java.util.UUID.randomUUID().toString + + val isDirectWrite = isDirectWriteScheme(hadoopConf, outputPath.toString) + val isStaticPartitionInsert = + staticPartitions.size == partitionColumns.length && partitionColumns.nonEmpty + + val formattedOutputPath = if (isStaticPartitionInsert) { + // This will not work correctly for cases where spark.sql.storeAssignmentPolicy=LEGACY + // and we insert into a partition where the types don't match i.e + // create table t(a int, b string) using parquet partitioned by (a); + // insert into t partition(a='ansi') values('ansi'); + // The correct behavior is to insert data under a=__HIVE_DEFAULT_PARTITION__ + // since ansi cannot be an integer but this will insert data under a=ansi + // so we extract the value of the partition by analyzing the query. + val formattedStaticPartitions = staticPartitions.map { + case (k, v) => + val index = outputColumnNames.indexOf(k) + val isNull = query match { + case Project(projectList, _) => + projectList(index).asInstanceOf[Alias].child.asInstanceOf[Literal].value == null + case LocalRelation(_, Nil, _) => false + case LocalRelation(_, data, _) => + data.head.isNullAt(index) + case _ => false + } + if (isNull) { + k -> null + } else { + k -> v + } + } + val defaultLocation = outputPath + + "/" + PartitioningUtils.getPathFragment(formattedStaticPartitions, partitionColumns) + new Path(customPartitionLocations.getOrElse(staticPartitions, defaultLocation)) + } else { + outputPath + } + + hadoopConf.setBoolean("pinterest.spark.sql.staticPartitionInsert", isStaticPartitionInsert) + // DataSourceStrategy.scala relies on the s3 committer being set here when checking + // if the writePath == readPath + if (isDirectWrite) { + if ((isStaticPartitionInsert || partitionColumns.isEmpty) && bucketSpec.isEmpty) { + hadoopConf.set( + "spark.sql.sources.outputCommitterClass", + "com.netflix.bdp.s3.S3DirectoryOutputCommitter") + hadoopConf.set( + "spark.sql.parquet.output.committer.class", + "com.netflix.bdp.s3.S3DirectoryOutputCommitter") + } else { + hadoopConf.set( + "spark.sql.sources.outputCommitterClass", + "com.netflix.bdp.s3.S3PartitionedOutputCommitter") + hadoopConf.set( + "spark.sql.parquet.output.committer.class", + "com.netflix.bdp.s3.S3PartitionedOutputCommitter") + } + hadoopConf.set( + "s3.multipart.committer.conflict-mode", + if (mode == SaveMode.Overwrite) "replace" else "append") + } + val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, jobId = jobId, - outputPath = outputPath.toString, - dynamicPartitionOverwrite = dynamicPartitionOverwrite) + outputPath = formattedOutputPath.toString, + dynamicPartitionOverwrite = !isDirectWrite && dynamicPartitionOverwrite + ) val doInsertion = if (mode == SaveMode.Append) { true @@ -125,7 +188,9 @@ case class InsertIntoHadoopFsRelationCommand( // For dynamic partition overwrite, do not delete partition directories ahead. true } else { - deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer) + if (!isDirectWrite) { + deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer) + } true } case (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) => diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 145c36e467df..bc80f2b45cd9 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -260,7 +260,8 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging sparkSession.sessionState.conf.isParquetBinaryAsString) hadoopConf.setBoolean( SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, - sparkSession.sessionState.conf.isParquetINT96AsTimestamp) + sparkSession.sessionState.conf.isParquetINT96AsTimestamp + ) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) @@ -272,8 +273,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging val sqlConf = sparkSession.sessionState.conf val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled val enableVectorizedReader: Boolean = - sqlConf.parquetVectorizedReaderEnabled && - resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) + ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema) val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion val capacity = sqlConf.parquetVectorizedReaderBatchSize @@ -300,7 +300,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging lazy val footerFileMetaData = ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData - val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) // Try to push down filters when filter push-down is enabled. @@ -314,7 +314,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive, - datetimeRebaseSpec) + datetimeRebaseMode) filters // Collects all converted Parquet filter predicates. Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` @@ -340,7 +340,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging None } - val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( + val int96RebaseMode = DataSourceUtils.int96RebaseMode( footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) @@ -357,46 +357,30 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging if (enableVectorizedReader) { val vectorizedReader = new VectorizedParquetRecordReader( convertTz.orNull, - datetimeRebaseSpec.mode.toString, - datetimeRebaseSpec.timeZone, - int96RebaseSpec.mode.toString, - int96RebaseSpec.timeZone, + datetimeRebaseMode.toString, + int96RebaseMode.toString, enableOffHeapColumnVector && taskContext.isDefined, - capacity - ) - // SPARK-37089: We cannot register a task completion listener to close this iterator here - // because downstream exec nodes have already registered their listeners. Since listeners - // are executed in reverse order of registration, a listener registered here would close the - // iterator while downstream exec nodes are still running. When off-heap column vectors are - // enabled, this can cause a use-after-free bug leading to a segfault. - // - // Instead, we use FileScanRDD's task completion listener to close this iterator. + capacity) val iter = new RecordReaderIterator(vectorizedReader) - try { - vectorizedReader.initialize(split, hadoopAttemptContext) - logDebug(s"Appending $partitionSchema ${file.partitionValues}") - vectorizedReader.initBatch(partitionSchema, file.partitionValues) - if (returningBatch) { - vectorizedReader.enableReturningBatches() - } - - // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. - iter.asInstanceOf[Iterator[InternalRow]] - } catch { - case e: Throwable => - // SPARK-23457: In case there is an exception in initialization, close the iterator to - // avoid leaking resources. - iter.close() - throw e + // SPARK-23457 Register a task completion listener before `initialization`. + taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + vectorizedReader.initialize(split, hadoopAttemptContext) + logDebug(s"Appending $partitionSchema ${file.partitionValues}") + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + if (returningBatch) { + vectorizedReader.enableReturningBatches() } + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + iter.asInstanceOf[Iterator[InternalRow]] } else { logDebug(s"Falling back to parquet-mr") // ParquetRecordReader returns InternalRow val readSupport = new ParquetReadSupport( convertTz, enableVectorizedReader = false, - datetimeRebaseSpec, - int96RebaseSpec) + datetimeRebaseMode, + int96RebaseMode) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) @@ -404,25 +388,19 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging new ParquetRecordReader[InternalRow](readSupport) } val iter = new RecordReaderIterator[InternalRow](reader) - try { - reader.initialize(split, hadoopAttemptContext) - - val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes - val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) - - if (partitionSchema.length == 0) { - // There is no partition columns - iter.map(unsafeProjection) - } else { - val joinedRow = new JoinedRow() - iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) - } - } catch { - case e: Throwable => - // SPARK-23457: In case there is an exception in initialization, close the iterator to - // avoid leaking resources. - iter.close() - throw e + // SPARK-23457 Register a task completion listener before `initialization`. + taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + reader.initialize(split, hadoopAttemptContext) + + val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes + val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + if (partitionSchema.length == 0) { + // There is no partition columns + iter.map(unsafeProjection) + } else { + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) } } } @@ -457,7 +435,8 @@ object ParquetFileFormat extends Logging { val converter = new ParquetToSparkSchemaConverter( sparkSession.sessionState.conf.isParquetBinaryAsString, - sparkSession.sessionState.conf.isParquetINT96AsTimestamp) + sparkSession.sessionState.conf.isParquetINT96AsTimestamp, + false) val seen = mutable.HashSet[String]() val finalSchemas: Seq[StructType] = footers.flatMap { diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala index 6b495105d9c6..8eae7995723f 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala @@ -46,9 +46,12 @@ abstract class AbstractBatchScanExec( override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters) - @transient override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions() + @transient override lazy val inputPartitions: Seq[InputPartition] = batch.planInputPartitions() - @transient private lazy val filteredPartitions: Seq[InputPartition] = { + @transient override lazy val partitions: Seq[Seq[InputPartition]] = + batch.planInputPartitions().map(Seq(_)) + + @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = { val dataSourceFilters = runtimeFilters.flatMap { case DynamicPruningExpression(e) => DataSourceStrategy.translateRuntimeFilter(e) case _ => None @@ -64,17 +67,7 @@ abstract class AbstractBatchScanExec( // call toBatch again to get filtered partitions val newPartitions = scan.toBatch.planInputPartitions() - originalPartitioning match { - case p: DataSourcePartitioning if p.numPartitions != newPartitions.size => - throw new SparkException( - "Data source must have preserved the original partitioning during runtime filtering; " + - s"reported num partitions: ${p.numPartitions}, " + - s"num partitions after runtime filtering: ${newPartitions.size}") - case _ => - // no validation is needed as the data source did not report any specific partitioning - } - - newPartitions + newPartitions.map(Seq(_)) } else { partitions } diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala index e445dd33a585..6290e86315f9 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala @@ -69,20 +69,9 @@ abstract class BatchScanExecShim( // call toBatch again to get filtered partitions val newPartitions = scan.toBatch.planInputPartitions() - - originalPartitioning match { - case p: DataSourcePartitioning if p.numPartitions != newPartitions.size => - throw new SparkException( - "Data source must have preserved the original partitioning during runtime filtering; " + - s"reported num partitions: ${p.numPartitions}, " + - s"num partitions after runtime filtering: ${newPartitions.size}") - case _ => - // no validation is needed as the data source did not report any specific partitioning - } - newPartitions.map(Seq(_)) } else { - partitions.map(Seq(_)) + partitions } } @@ -102,5 +91,9 @@ abstract class BatchScanExecShim( } abstract class ArrowBatchScanExecShim(original: BatchScanExec) extends DataSourceV2ScanExecBase { - @transient override lazy val partitions: Seq[InputPartition] = original.partitions + @transient override lazy val partitions: Seq[Seq[InputPartition]] = + original.partitions + @transient override lazy val inputPartitions: Seq[InputPartition] = original.inputPartitions + + override def keyGroupedPartitioning: Option[Seq[Expression]] = original.keyGroupedPartitioning } diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala index 810fb2a05511..f9f64dae194a 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala @@ -32,9 +32,6 @@ object CatalogUtil { case IdentityTransform(FieldReference(Seq(col))) => identityCols += col - case BucketTransform(numBuckets, FieldReference(Seq(col))) => - bucketSpec = Some(BucketSpec(numBuckets, col :: Nil, Nil)) - case transform => throw new UnsupportedOperationException(s"Partitioning by expressions") } From 9ef7af21a52e8df84029a484bb51265316fd304d Mon Sep 17 00:00:00 2001 From: binwei Date: Sat, 14 Sep 2024 02:58:42 +0000 Subject: [PATCH 2/8] update build script --- dev/buildbundle-veloxbe.sh | 22 ++++++++++++---------- dev/builddeps-veloxbe.sh | 11 ++++++++--- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/dev/buildbundle-veloxbe.sh b/dev/buildbundle-veloxbe.sh index 8515f7b12db7..ea530a2b9734 100755 --- a/dev/buildbundle-veloxbe.sh +++ b/dev/buildbundle-veloxbe.sh @@ -5,7 +5,7 @@ source "$BASEDIR/builddeps-veloxbe.sh" function build_for_spark { spark_version=$1 - mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-$spark_version -DskipTests + mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-$spark_version -DskipTests -Dcheckstyle.skip=true -DskipScalastyle=true } function check_supported { @@ -21,14 +21,16 @@ function check_supported { cd $GLUTEN_DIR -check_supported +#check_supported # SPARK_VERSION is defined in builddeps-veloxbe.sh -if [ "$SPARK_VERSION" = "ALL" ]; then - for spark_version in 3.2 3.3 3.4 3.5 - do - build_for_spark $spark_version - done -else - build_for_spark $SPARK_VERSION -fi +#if [ "$SPARK_VERSION" = "ALL" ]; then +# for spark_version in 3.2 3.3 3.4 3.5 +# do +# build_for_spark $spark_version +# done +#else +# build_for_spark $SPARK_VERSION +#fi + +build_for_spark 3.2 diff --git a/dev/builddeps-veloxbe.sh b/dev/builddeps-veloxbe.sh index 7f1ce41a117d..2bcc414e61e8 100755 --- a/dev/builddeps-veloxbe.sh +++ b/dev/builddeps-veloxbe.sh @@ -6,6 +6,10 @@ #################################################################################################### set -exu +export CFLAGS=" -g " +export CXXFLAGS=" -g " + + CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) GLUTEN_DIR="$CURRENT_DIR/.." BUILD_TYPE=Release @@ -20,11 +24,12 @@ ENABLE_QAT=OFF ENABLE_IAA=OFF ENABLE_HBM=OFF ENABLE_GCS=OFF -ENABLE_S3=OFF +ENABLE_S3=ON ENABLE_HDFS=OFF ENABLE_ABFS=OFF -ENABLE_EP_CACHE=OFF -ENABLE_VCPKG=OFF +ENABLE_EP_CACHE=ON +ARROW_ENABLE_CUSTOM_CODEC=OFF +ENABLE_VCPKG=ON RUN_SETUP_SCRIPT=ON VELOX_REPO="" VELOX_BRANCH="" From a4e9a5b75a743b4edd603468bd848f998f5147e6 Mon Sep 17 00:00:00 2001 From: binwei Date: Sat, 14 Sep 2024 02:58:53 +0000 Subject: [PATCH 3/8] fix for pinterest build --- dev/builddeps-veloxbe.sh | 5 ++--- ep/build-velox/src/get_velox.sh | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/dev/builddeps-veloxbe.sh b/dev/builddeps-veloxbe.sh index 2bcc414e61e8..49bfa869d9ab 100755 --- a/dev/builddeps-veloxbe.sh +++ b/dev/builddeps-veloxbe.sh @@ -15,9 +15,9 @@ GLUTEN_DIR="$CURRENT_DIR/.." BUILD_TYPE=Release BUILD_TESTS=OFF BUILD_EXAMPLES=OFF -BUILD_BENCHMARKS=OFF +BUILD_BENCHMARKS=ON BUILD_JEMALLOC=OFF -BUILD_PROTOBUF=OFF +BUILD_PROTOBUF=ON BUILD_VELOX_TESTS=OFF BUILD_VELOX_BENCHMARKS=OFF ENABLE_QAT=OFF @@ -28,7 +28,6 @@ ENABLE_S3=ON ENABLE_HDFS=OFF ENABLE_ABFS=OFF ENABLE_EP_CACHE=ON -ARROW_ENABLE_CUSTOM_CODEC=OFF ENABLE_VCPKG=ON RUN_SETUP_SCRIPT=ON VELOX_REPO="" diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index 7f9312aa7f76..6ec7c75c9afc 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -16,8 +16,8 @@ set -exu -VELOX_REPO=https://github.com/oap-project/velox.git -VELOX_BRANCH=2024_09_13 +VELOX_REPO=https://github.com/yma11/velox.git +VELOX_BRANCH=golden-ptst VELOX_HOME="" OS=`uname -s` From 87edbea8be09ec11509c4240149d7f452177e1b3 Mon Sep 17 00:00:00 2001 From: binwei Date: Sat, 14 Sep 2024 02:59:07 +0000 Subject: [PATCH 4/8] fix arrow build --- .gitignore | 6 ++++++ dev/build_arrow.sh | 4 ++++ 2 files changed, 10 insertions(+) diff --git a/.gitignore b/.gitignore index 224f61bae6ec..f7483cf1c90d 100644 --- a/.gitignore +++ b/.gitignore @@ -95,3 +95,9 @@ dist/ /cpp-ch/local-engine/Parser/*_udf !/cpp-ch/local-engine/Parser/example_udf + + +# build arrow +dev/arrow_ep/ +ep/_ep/ + diff --git a/dev/build_arrow.sh b/dev/build_arrow.sh index e7496350f988..58cc1d44e244 100755 --- a/dev/build_arrow.sh +++ b/dev/build_arrow.sh @@ -23,6 +23,10 @@ VELOX_ARROW_BUILD_VERSION=15.0.0 ARROW_PREFIX=$CURRENT_DIR/../ep/_ep/arrow_ep BUILD_TYPE=Release + +export CFLAGS=" -g " +export CXXFLAGS=" -g " + function prepare_arrow_build() { mkdir -p ${ARROW_PREFIX}/../ && pushd ${ARROW_PREFIX}/../ && sudo rm -rf arrow_ep/ wget_and_untar https://archive.apache.org/dist/arrow/arrow-${VELOX_ARROW_BUILD_VERSION}/apache-arrow-${VELOX_ARROW_BUILD_VERSION}.tar.gz arrow_ep From cf5313e32bad411a4d8dcf4f994b487aef61548b Mon Sep 17 00:00:00 2001 From: binwei Date: Sat, 14 Sep 2024 03:04:57 +0000 Subject: [PATCH 5/8] fix compile --- .../sql/execution/datasources/v2/AbstractBatchScanExec.scala | 1 - .../spark/sql/execution/datasources/v2/BatchScanExecShim.scala | 1 - .../spark/sql/execution/datasources/v2/utils/CatalogUtil.scala | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala index 8eae7995723f..959d07dbe001 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala @@ -16,7 +16,6 @@ */ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala index 6290e86315f9..c599ac4f62c2 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala @@ -16,7 +16,6 @@ */ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala index f9f64dae194a..15d368aea733 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.v2.utils import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform} +import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform} import scala.collection.mutable From e746eb0e6f03cae325eb597113219b572fd6f40b Mon Sep 17 00:00:00 2001 From: binwei Date: Sat, 14 Sep 2024 03:08:22 +0000 Subject: [PATCH 6/8] add udf --- cpp/velox/udf/examples/CMakeLists.txt | 7 + cpp/velox/udf/examples/LinkedIdUDF.cc | 117 +++++++++++++++ .../udf/examples/ListVectorSimilarityUDF.cc | 139 ++++++++++++++++++ 3 files changed, 263 insertions(+) create mode 100644 cpp/velox/udf/examples/LinkedIdUDF.cc create mode 100644 cpp/velox/udf/examples/ListVectorSimilarityUDF.cc diff --git a/cpp/velox/udf/examples/CMakeLists.txt b/cpp/velox/udf/examples/CMakeLists.txt index 32f39425bb77..5d32d08d05e2 100644 --- a/cpp/velox/udf/examples/CMakeLists.txt +++ b/cpp/velox/udf/examples/CMakeLists.txt @@ -18,3 +18,10 @@ target_link_libraries(myudf velox) add_library(myudaf SHARED "MyUDAF.cc") target_link_libraries(myudaf velox) + +add_library(LinkedIdUDF SHARED "LinkedIdUDF.cc") +target_link_libraries(LinkedIdUDF velox) + +add_library(ListVectorSimilarityUDF SHARED "ListVectorSimilarityUDF.cc") +target_link_libraries(ListVectorSimilarityUDF velox) + diff --git a/cpp/velox/udf/examples/LinkedIdUDF.cc b/cpp/velox/udf/examples/LinkedIdUDF.cc new file mode 100644 index 000000000000..a1fb17acd99e --- /dev/null +++ b/cpp/velox/udf/examples/LinkedIdUDF.cc @@ -0,0 +1,117 @@ + +#include +#include +#include +#include +#include "udf/Udf.h" +#include "udf/examples/UdfCommon.h" +#include +#include +#include +#include + +using namespace facebook::velox; +using namespace facebook::velox::exec; + +namespace { + +static const char* kString="string"; + +std::string GetHashKey(const std::string& str) { +if (str.empty()) { +throw std::invalid_argument("Input string cannot be null"); +} +unsigned char digest[MD5_DIGEST_LENGTH]; +MD5(reinterpret_cast(str.c_str()), str.size(), digest); +std::string key; +key.reserve(MD5_DIGEST_LENGTH * 2); +for (unsigned char i : digest) { +char buf[3]; +snprintf(buf, sizeof(buf), "%02x", i); +key.append(buf); +} +return key; +} + +template +struct LinkIdUDF { + VELOX_DEFINE_FUNCTION_TYPES(T); + + +void call(out_type& out, const arg_type& arg1) { + +auto cppArg1 = std::string(arg1); +if (cppArg1.empty()) { +{out = ""; +return;} +} +{out = GetHashKey(cppArg1); +return;} +} + + +}; + +class LinkIdUDFRegisterer final : public gluten::UdfRegisterer { + public: + int getNumUdf() override { + return 1; + } + + void populateUdfEntries(int& index, gluten::UdfEntry* udfEntries) override { + udfEntries[index++] = {name_.c_str(), kString, 1, myArg_}; + } + + void registerSignatures() override { + facebook::velox::registerFunction({name_}); + } + + private: + const std::string name_ = "com.pinterest.hadoop.hive.LinkIdUDF"; + const char* myArg_[1] = {kString}; +}; + +std::vector>& globalRegisters() { + static std::vector> registerers; + return registerers; +} + +void setupRegisterers() { + static bool inited = false; + if (inited) { + return; + } + auto& registerers = globalRegisters(); + + registerers.push_back(std::make_shared()); + + inited = true; +} +} // namespace + +DEFINE_GET_NUM_UDF { + setupRegisterers(); + + int numUdf = 0; + for (const auto& registerer : globalRegisters()) { + numUdf += registerer->getNumUdf(); + } + return numUdf; +} + +DEFINE_GET_UDF_ENTRIES { + setupRegisterers(); + + int index = 0; + for (const auto& registerer : globalRegisters()) { + registerer->populateUdfEntries(index, udfEntries); + } +} + +DEFINE_REGISTER_UDF { + setupRegisterers(); + + for (const auto& registerer : globalRegisters()) { + registerer->registerSignatures(); + } +} diff --git a/cpp/velox/udf/examples/ListVectorSimilarityUDF.cc b/cpp/velox/udf/examples/ListVectorSimilarityUDF.cc new file mode 100644 index 000000000000..2bd4161ebf72 --- /dev/null +++ b/cpp/velox/udf/examples/ListVectorSimilarityUDF.cc @@ -0,0 +1,139 @@ + +#include +#include +#include +#include +#include "udf/Udf.h" +#include "udf/examples/UdfCommon.h" +#include +#include +#include +#include + +using namespace facebook::velox; +using namespace facebook::velox::exec; + +namespace { + +static const char* kString="string"; +static const char* kDouble="double"; + +double cosineSimilarity(const std::vector& vectorA, const std::vector& vectorB) { +double intersectionScore = 0.0; +double aScore = 0.0; +double bScore = 0.0; +for (size_t i = 0; i < vectorA.size(); ++i) { +intersectionScore += vectorA[i] * vectorB[i]; +aScore += std::pow(vectorA[i], 2); +bScore += std::pow(vectorB[i], 2); +} +if (aScore == 0 || bScore == 0) { +return 0.0; +} else { +return intersectionScore / (std::sqrt(aScore) * std::sqrt(bScore)); +} +} +std::vector parseVectorFromString(const std::string& vectorString) { +std::vector vector; +std::stringstream ss(vectorString); +std::string item; +while (std::getline(ss, item, ',')) { +vector.push_back(std::stof(item)); +} +return vector; +} + +template +struct ListVectorSimilarityUDF { + VELOX_DEFINE_FUNCTION_TYPES(T); + + +void call(out_type& out, const arg_type& arg1, const arg_type& arg2, const arg_type& arg3) { + static constexpr const char* COSINE_SIMILARITY = "cosine"; +auto cppArg3 = std::string(arg3); +auto cppArg2 = std::string(arg2); +auto cppArg1 = std::string(arg1); +if (cppArg1.empty() || cppArg2.empty() || cppArg3.empty()) { +throw std::invalid_argument("invalid arguments"); +} +std::vector vectorA = parseVectorFromString(cppArg1); +std::vector vectorB = parseVectorFromString(cppArg2); +if (vectorA.empty() || vectorB.empty()) { +{out = 0.0; +return;} // In C++, we typically don't return null for non-pointer types. +} +if (cppArg3 == COSINE_SIMILARITY) { +{out = cosineSimilarity(vectorA, vectorB); +return;} +} else { +{out = 0.0; +return;} // In C++, we typically don't return null for non-pointer types. +} +} + + +}; + +class ListVectorSimilarityUDFRegisterer final : public gluten::UdfRegisterer { + public: + int getNumUdf() override { + return 1; + } + + void populateUdfEntries(int& index, gluten::UdfEntry* udfEntries) override { + udfEntries[index++] = {name_.c_str(), kDouble, 3, myArg_}; + } + + void registerSignatures() override { + facebook::velox::registerFunction({name_}); + } + + private: + const std::string name_ = "com.pinterest.hadoop.hive.ListVectorSimilarityUDF"; + const char* myArg_[3] = {kString ,kString ,kString}; +}; + +std::vector>& globalRegisters() { + static std::vector> registerers; + return registerers; +} + +void setupRegisterers() { + static bool inited = false; + if (inited) { + return; + } + auto& registerers = globalRegisters(); + + registerers.push_back(std::make_shared()); + + inited = true; +} +} // namespace + +DEFINE_GET_NUM_UDF { + setupRegisterers(); + + int numUdf = 0; + for (const auto& registerer : globalRegisters()) { + numUdf += registerer->getNumUdf(); + } + return numUdf; +} + +DEFINE_GET_UDF_ENTRIES { + setupRegisterers(); + + int index = 0; + for (const auto& registerer : globalRegisters()) { + registerer->populateUdfEntries(index, udfEntries); + } +} + +DEFINE_REGISTER_UDF { + setupRegisterers(); + + for (const auto& registerer : globalRegisters()) { + registerer->registerSignatures(); + } +} \ No newline at end of file From e3d6c23229d7e96cd8f7ddad72e10db1c5f4cae2 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 19 Sep 2024 10:01:04 +0800 Subject: [PATCH 7/8] fixup --- .../gluten/planner/cost/GlabsCostModel.scala | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 backends-velox/src/main/scala/org/apache/gluten/planner/cost/GlabsCostModel.scala diff --git a/backends-velox/src/main/scala/org/apache/gluten/planner/cost/GlabsCostModel.scala b/backends-velox/src/main/scala/org/apache/gluten/planner/cost/GlabsCostModel.scala new file mode 100644 index 000000000000..4c6e1dfd5d93 --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/planner/cost/GlabsCostModel.scala @@ -0,0 +1,134 @@ +package org.apache.gluten.planner.cost + +import org.apache.gluten.execution._ +import org.apache.gluten.extension.columnar.enumerated.RemoveFilter +import org.apache.gluten.extension.columnar.enumerated.RemoveFilter.NoopFilter +import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, RowToColumnarLike} +import org.apache.gluten.planner.plan.GlutenPlanModel.GroupLeafExec +import org.apache.gluten.ras.{Cost, CostModel} +import org.apache.gluten.utils.PlanUtil + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression} +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.execution.window.WindowExec +import org.apache.spark.sql.types.{ArrayType, MapType, StructType} + +class GlabsCostModel extends CostModel[SparkPlan] with Logging { + import GlabsCostModel._ + private val infLongCost = Long.MaxValue + + logInfo(s"Created cost model: ${classOf[GlabsCostModel]}") + + override def costOf(node: SparkPlan): GlutenCost = node match { + case _: GroupLeafExec => throw new IllegalStateException() + case _ => GlutenCost(longCostOf(node)) + } + + private def longCostOf(node: SparkPlan): Long = node match { + case n => + val selfCost = selfLongCostOf(n) + + // Sum with ceil to avoid overflow. + def safeSum(a: Long, b: Long): Long = { + assert(a >= 0) + assert(b >= 0) + val sum = a + b + if (sum < a || sum < b) Long.MaxValue else sum + } + + (n.children.map(longCostOf).toList :+ selfCost).reduce(safeSum) + } + + // A plan next to scan may be able to push runtime filters to scan in Velox. + // TODO: Check join build side + private def isNextToScanTransformer(plan: SparkPlan): Boolean = { + plan.children.exists { + case NoopFilter(_: BasicScanExecTransformer, _) => true + case _: BasicScanExecTransformer => true + case _ => false + } + } + + // A very rough estimation as of now. + private def selfLongCostOf(node: SparkPlan): Long = { + node match { + case _: ProjectExecTransformer => 0L + case _: ProjectExec => 0L + + case _: ShuffleExchangeExec => 3L + case _: VeloxResizeBatchesExec => 0L + case _: ColumnarShuffleExchangeExec => 2L + + // Consider joins, aggregations, windows to be highest priority for Gluten to offload. + case _: BroadcastHashJoinExec => 60L + case j: BroadcastHashJoinExecTransformer if isNextToScanTransformer(j) => 32L + case _: BroadcastHashJoinExecTransformer => 52L + + case _: ShuffledHashJoinExec => 80L + case j: ShuffledHashJoinExecTransformer if isNextToScanTransformer(j) => 32L + case _: ShuffledHashJoinExecTransformer => 52L + + case _: SortMergeJoinExec => 80L + case j: SortMergeJoinExecTransformer if isNextToScanTransformer(j) => 32L + case _: SortMergeJoinExecTransformer => 52L + + case _: HashAggregateExec => 80L + case _: ObjectHashAggregateExec => 80L + case _: SortAggregateExec => 80L + case _: HashAggregateExecTransformer => 52L + + case _: WindowExec => 80L + case _: WindowExecTransformer => 52L + + case r2c: RowToColumnarExecBase if hasComplexTypes(r2c.schema) => + // Avoid moving computation back to native when transition has complex types in schema. + // Such transitions are observed to be extremely expensive as of now. + Long.MaxValue + + // Row-to-Velox is observed much more expensive than Velox-to-row. + case ColumnarToRowExec(child) => 2L + case RowToColumnarExec(child) => 15L + case ColumnarToRowLike(child) => 2L + case RowToColumnarLike(child) => 15L + + case _: RemoveFilter.NoopFilter => + // To make planner choose the tree that has applied rule PushFilterToScan. + 0L + + case p if PlanUtil.isGlutenColumnarOp(p) => 2L + case p if PlanUtil.isVanillaColumnarOp(p) => 3L + // Other row ops. Usually a vanilla row op. + case _ => 5L + } + } + + private def isCheapExpression(ne: NamedExpression): Boolean = ne match { + case Alias(_: Attribute, _) => true + case _: Attribute => true + case _ => false + } + + private def hasComplexTypes(schema: StructType): Boolean = { + schema.exists(_.dataType match { + case _: StructType => true + case _: ArrayType => true + case _: MapType => true + case _ => false + }) + } + + override def costComparator(): Ordering[Cost] = Ordering.Long.on { + case GlutenCost(value) => value + case _ => throw new IllegalStateException("Unexpected cost type") + } + + override def makeInfCost(): Cost = GlutenCost(infLongCost) +} + +object GlabsCostModel { + case class GlutenCost(value: Long) extends Cost +} From ccf27b14e1926c5a122a93ff9a55ab095ae0f046 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 19 Sep 2024 10:01:50 +0800 Subject: [PATCH 8/8] fixup --- .../cost/{GlabsCostModel.scala => VeloxCostModel.scala} | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) rename backends-velox/src/main/scala/org/apache/gluten/planner/cost/{GlabsCostModel.scala => VeloxCostModel.scala} (96%) diff --git a/backends-velox/src/main/scala/org/apache/gluten/planner/cost/GlabsCostModel.scala b/backends-velox/src/main/scala/org/apache/gluten/planner/cost/VeloxCostModel.scala similarity index 96% rename from backends-velox/src/main/scala/org/apache/gluten/planner/cost/GlabsCostModel.scala rename to backends-velox/src/main/scala/org/apache/gluten/planner/cost/VeloxCostModel.scala index 4c6e1dfd5d93..f3922b14cc14 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/planner/cost/GlabsCostModel.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/planner/cost/VeloxCostModel.scala @@ -17,11 +17,11 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHash import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.types.{ArrayType, MapType, StructType} -class GlabsCostModel extends CostModel[SparkPlan] with Logging { - import GlabsCostModel._ +class VeloxCostModel extends CostModel[SparkPlan] with Logging { + import VeloxCostModel._ private val infLongCost = Long.MaxValue - logInfo(s"Created cost model: ${classOf[GlabsCostModel]}") + logInfo(s"Created cost model: ${classOf[VeloxCostModel]}") override def costOf(node: SparkPlan): GlutenCost = node match { case _: GroupLeafExec => throw new IllegalStateException() @@ -129,6 +129,6 @@ class GlabsCostModel extends CostModel[SparkPlan] with Logging { override def makeInfCost(): Cost = GlutenCost(infLongCost) } -object GlabsCostModel { +object VeloxCostModel { case class GlutenCost(value: Long) extends Cost }