From aa450a41e44a86bbb8a8575b543af91f0b343e45 Mon Sep 17 00:00:00 2001 From: Kevin Wilfong Date: Thu, 11 Sep 2025 11:35:40 -0700 Subject: [PATCH] [VL] Support mapping columns by index for ORC and Parquet files --- .../backendsapi/clickhouse/CHBackend.scala | 1 + .../clickhouse/CHIteratorApi.scala | 1 + .../backendsapi/velox/VeloxBackend.scala | 37 ++++++- .../backendsapi/velox/VeloxIteratorApi.scala | 80 +++++++++----- .../backendsapi/velox/VeloxValidatorApi.scala | 45 ++++---- .../apache/gluten/config/VeloxConfig.scala | 16 +++ .../gluten/execution/FallbackSuite.scala | 36 +++++- .../gluten/execution/VeloxScanSuite.scala | 103 +++++++++++++++++- cpp/velox/compute/VeloxPlanConverter.cc | 33 +++++- cpp/velox/compute/VeloxPlanConverter.h | 1 + cpp/velox/compute/WholeStageResultIterator.cc | 4 + cpp/velox/config/VeloxConfig.h | 2 + cpp/velox/substrait/SubstraitToVeloxPlan.cc | 11 +- cpp/velox/substrait/SubstraitToVeloxPlan.h | 3 + cpp/velox/utils/ConfigExtractor.cc | 4 - docs/velox-configuration.md | 4 +- .../gluten/substrait/rel/LocalFilesNode.java | 22 ++-- .../backendsapi/BackendSettingsApi.scala | 5 +- .../gluten/backendsapi/IteratorApi.scala | 2 + .../execution/BasicScanExecTransformer.scala | 17 +-- .../execution/BatchScanExecTransformer.scala | 1 + 21 files changed, 336 insertions(+), 92 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 141cdaa24c7c..0573d0cf3809 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -181,6 +181,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { override def validateScanExec( format: ReadFileFormat, fields: Array[StructField], + dataSchema: StructType, rootPaths: Seq[String], properties: Map[String, String], hadoopConf: Configuration): ValidationResult = { diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 8d22bd2b1cbf..07c3d144097e 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -127,6 +127,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { override def genSplitInfo( partition: InputPartition, partitionSchema: StructType, + dataSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], properties: Map[String, String]): SplitInfo = { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 3967ab68d4ac..3da306e1e9f5 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -99,6 +99,7 @@ object VeloxBackendSettings extends BackendSettingsApi { override def validateScanExec( format: ReadFileFormat, fields: Array[StructField], + dataSchema: StructType, rootPaths: Seq[String], properties: Map[String, String], hadoopConf: Configuration): ValidationResult = { @@ -117,9 +118,11 @@ object VeloxBackendSettings extends BackendSettingsApi { } def validateFormat(): Option[String] = { - def validateTypes(validatorFunc: PartialFunction[StructField, String]): Option[String] = { + def validateTypes( + validatorFunc: PartialFunction[StructField, String], + fieldsToValidate: Array[StructField]): Option[String] = { // Collect unsupported types. - val unsupportedDataTypeReason = fields.collect(validatorFunc) + val unsupportedDataTypeReason = fieldsToValidate.collect(validatorFunc) if (unsupportedDataTypeReason.nonEmpty) { Some( s"Found unsupported data type in $format: ${unsupportedDataTypeReason.mkString(", ")}.") @@ -152,7 +155,7 @@ object VeloxBackendSettings extends BackendSettingsApi { if (!VeloxConfig.get.veloxOrcScanEnabled) { Some(s"Velox ORC scan is turned off, ${VeloxConfig.VELOX_ORC_SCAN_ENABLED.key}") } else { - val typeValidator: PartialFunction[StructField, String] = { + val fieldTypeValidator: PartialFunction[StructField, String] = { case StructField(_, arrayType: ArrayType, _, _) if arrayType.elementType.isInstanceOf[StructType] => "StructType as element in ArrayType" @@ -165,12 +168,16 @@ object VeloxBackendSettings extends BackendSettingsApi { case StructField(_, mapType: MapType, _, _) if mapType.valueType.isInstanceOf[ArrayType] => "ArrayType as Value in MapType" + case StructField(_, TimestampType, _, _) => "TimestampType" + } + + val schemaTypeValidator: PartialFunction[StructField, String] = { case StructField(_, stringType: StringType, _, metadata) if isCharType(stringType, metadata) => CharVarcharUtils.getRawTypeString(metadata) + "(force fallback)" - case StructField(_, TimestampType, _, _) => "TimestampType" } - validateTypes(typeValidator) + validateTypes(fieldTypeValidator, fields) + .orElse(validateTypes(schemaTypeValidator, dataSchema.fields)) } case _ => Some(s"Unsupported file format $format.") } @@ -193,10 +200,28 @@ object VeloxBackendSettings extends BackendSettingsApi { } } + def validateDataSchema(): Option[String] = { + if (VeloxConfig.get.parquetUseColumnNames && VeloxConfig.get.orcUseColumnNames) { + return None + } + + // If we are using column indices for schema evolution, we need to pass the table schema to + // Velox. We need to ensure all types in the table schema are supported. + val validationResults = + dataSchema.fields.flatMap(field => VeloxValidatorApi.validateSchema(field.dataType)) + if (validationResults.nonEmpty) { + Some(s"""Found unsupported data type(s) in file + |schema: ${validationResults.mkString(", ")}.""".stripMargin) + } else { + None + } + } + val validationChecks = Seq( validateScheme(), validateFormat(), - validateEncryption() + validateEncryption(), + validateDataSchema() ) for (check <- validationChecks) { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index f603f454ac04..4afad58c9ac2 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -18,12 +18,13 @@ package org.apache.gluten.backendsapi.velox import org.apache.gluten.backendsapi.{BackendsApiManager, IteratorApi} import org.apache.gluten.backendsapi.velox.VeloxIteratorApi.unescapePathName +import org.apache.gluten.config.VeloxConfig import org.apache.gluten.execution._ import org.apache.gluten.iterator.Iterators import org.apache.gluten.metrics.{IMetrics, IteratorMetricsJniWrapper} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.plan.PlanNode -import org.apache.gluten.substrait.rel.{LocalFilesBuilder, SplitInfo} +import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode, SplitInfo} import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.vectorized._ @@ -49,9 +50,25 @@ import scala.collection.JavaConverters._ class VeloxIteratorApi extends IteratorApi with Logging { + private def setFileSchemaForLocalFiles( + localFilesNode: LocalFilesNode, + fileSchema: StructType, + fileFormat: ReadFileFormat): LocalFilesNode = { + if ( + ((fileFormat == ReadFileFormat.OrcReadFormat || fileFormat == ReadFileFormat.DwrfReadFormat) + && !VeloxConfig.get.orcUseColumnNames) + || (fileFormat == ReadFileFormat.ParquetReadFormat && !VeloxConfig.get.parquetUseColumnNames) + ) { + localFilesNode.setFileSchema(fileSchema) + } + + localFilesNode + } + override def genSplitInfo( partition: InputPartition, partitionSchema: StructType, + dataSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], properties: Map[String, String]): SplitInfo = { @@ -69,19 +86,23 @@ class VeloxIteratorApi extends IteratorApi with Logging { constructSplitInfo(partitionSchema, f.files, metadataColumnNames) val preferredLocations = SoftAffinity.getFilePartitionLocations(f) - LocalFilesBuilder.makeLocalFiles( - f.index, - paths, - starts, - lengths, - fileSizes, - modificationTimes, - partitionColumns, - metadataColumns, - fileFormat, - preferredLocations.toList.asJava, - mapAsJavaMap(properties), - otherMetadataColumns + setFileSchemaForLocalFiles( + LocalFilesBuilder.makeLocalFiles( + f.index, + paths, + starts, + lengths, + fileSizes, + modificationTimes, + partitionColumns, + metadataColumns, + fileFormat, + preferredLocations.toList.asJava, + mapAsJavaMap(properties), + otherMetadataColumns + ), + dataSchema, + fileFormat ) case _ => throw new UnsupportedOperationException(s"Unsupported input partition.") @@ -92,6 +113,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { partitionIndex: Int, partitions: Seq[InputPartition], partitionSchema: StructType, + dataSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], properties: Map[String, String]): SplitInfo = { @@ -115,19 +137,23 @@ class VeloxIteratorApi extends IteratorApi with Logging { metadataColumns, otherMetadataColumns) = constructSplitInfo(partitionSchema, partitionFiles, metadataColumnNames) - LocalFilesBuilder.makeLocalFiles( - partitionIndex, - paths, - starts, - lengths, - fileSizes, - modificationTimes, - partitionColumns, - metadataColumns, - fileFormat, - locations.toList.asJava, - mapAsJavaMap(properties), - otherMetadataColumns + setFileSchemaForLocalFiles( + LocalFilesBuilder.makeLocalFiles( + partitionIndex, + paths, + starts, + lengths, + fileSizes, + modificationTimes, + partitionColumns, + metadataColumns, + fileFormat, + locations.toList.asJava, + mapAsJavaMap(properties), + otherMetadataColumns + ), + dataSchema, + fileFormat ) } diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala index 15c367628e05..7b9cf91112b9 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala @@ -31,6 +31,7 @@ import org.apache.spark.task.TaskResources import scala.collection.JavaConverters._ class VeloxValidatorApi extends ValidatorApi { + import VeloxValidatorApi._ /** For velox backend, key validation is on native side. */ override def doExprValidate(substraitExprName: String, expr: Expression): Boolean = @@ -53,6 +54,27 @@ class VeloxValidatorApi extends ValidatorApi { info.fallbackInfo.asScala.reduce[String] { case (l, r) => l + "\n |- " + r })) } + override def doSchemaValidate(schema: DataType): Option[String] = { + validateSchema(schema) + } + + override def doColumnarShuffleExchangeExecValidate( + outputAttributes: Seq[Attribute], + outputPartitioning: Partitioning, + child: SparkPlan): Option[String] = { + if (outputAttributes.isEmpty) { + // See: https://github.com/apache/incubator-gluten/issues/7600. + return Some("Shuffle with empty output schema is not supported") + } + if (child.output.isEmpty) { + // See: https://github.com/apache/incubator-gluten/issues/7600. + return Some("Shuffle with empty input schema is not supported") + } + doSchemaValidate(child.schema) + } +} + +object VeloxValidatorApi { private def isPrimitiveType(dataType: DataType): Boolean = { dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | @@ -63,41 +85,26 @@ class VeloxValidatorApi extends ValidatorApi { } } - override def doSchemaValidate(schema: DataType): Option[String] = { + def validateSchema(schema: DataType): Option[String] = { if (isPrimitiveType(schema)) { return None } schema match { case map: MapType => - doSchemaValidate(map.keyType).orElse(doSchemaValidate(map.valueType)) + validateSchema(map.keyType).orElse(validateSchema(map.valueType)) case struct: StructType => struct.foreach { field => - val reason = doSchemaValidate(field.dataType) + val reason = validateSchema(field.dataType) if (reason.isDefined) { return reason } } None case array: ArrayType => - doSchemaValidate(array.elementType) + validateSchema(array.elementType) case _ => Some(s"Schema / data type not supported: $schema") } } - - override def doColumnarShuffleExchangeExecValidate( - outputAttributes: Seq[Attribute], - outputPartitioning: Partitioning, - child: SparkPlan): Option[String] = { - if (outputAttributes.isEmpty) { - // See: https://github.com/apache/incubator-gluten/issues/7600. - return Some("Shuffle with empty output schema is not supported") - } - if (child.output.isEmpty) { - // See: https://github.com/apache/incubator-gluten/issues/7600. - return Some("Shuffle with empty input schema is not supported") - } - doSchemaValidate(child.schema) - } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 1be6749e08e2..8aa290354042 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -77,6 +77,10 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { def veloxPreferredBatchBytes: Long = getConf(COLUMNAR_VELOX_PREFERRED_BATCH_BYTES) def cudfEnableTableScan: Boolean = getConf(CUDF_ENABLE_TABLE_SCAN) + + def orcUseColumnNames: Boolean = getConf(ORC_USE_COLUMN_NAMES) + + def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES) } object VeloxConfig extends ConfigRegistry { @@ -665,4 +669,16 @@ object VeloxConfig extends ConfigRegistry { "instance per thread of execution.") .intConf .createWithDefault(100) + + val ORC_USE_COLUMN_NAMES = + buildConf("spark.gluten.sql.columnar.backend.velox.orcUseColumnNames") + .doc("Maps table field names to file field names using names, not indices for ORC files.") + .booleanConf + .createWithDefault(true) + + val PARQUET_USE_COLUMN_NAMES = + buildConf("spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames") + .doc("Maps table field names to file field names using names, not indices for Parquet files.") + .booleanConf + .createWithDefault(true) } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala index 83958537f1c7..c357e488b68a 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.execution -import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.config.{GlutenConfig, VeloxConfig} import org.apache.spark.SparkConf import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarShuffleExchangeExec, SparkPlan} @@ -270,4 +270,38 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl } } } + + testWithMinSparkVersion("fallback with index based schema evolution", "3.3") { + val query = "SELECT c2 FROM test" + Seq("parquet", "orc").foreach { + format => + Seq("true", "false").foreach { + parquetUseColumnNames => + Seq("true", "false").foreach { + orcUseColumnNames => + withSQLConf( + VeloxConfig.PARQUET_USE_COLUMN_NAMES.key -> parquetUseColumnNames, + VeloxConfig.ORC_USE_COLUMN_NAMES.key -> orcUseColumnNames + ) { + withTable("test") { + spark + .range(100) + .selectExpr("to_timestamp_ntz(from_unixtime(id % 3)) as c1", "id as c2") + .write + .format(format) + .saveAsTable("test") + + runQueryAndCompare(query) { + df => + val plan = df.queryExecution.executedPlan + val fallback = parquetUseColumnNames == "false" || + orcUseColumnNames == "false" + assert(collect(plan) { case g: GlutenPlan => g }.isEmpty == fallback) + } + } + } + } + } + } + } } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala index 5caf3df83202..050fdaf7bfd4 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala @@ -18,7 +18,7 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.velox.VeloxBackendSettings import org.apache.gluten.benchmarks.RandomParquetDataGenerator -import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.config.{GlutenConfig, VeloxConfig} import org.apache.gluten.utils.VeloxFileSystemValidationJniWrapper import org.apache.spark.SparkConf @@ -28,6 +28,8 @@ import org.apache.spark.sql.execution.ScalarSubquery import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import scala.reflect.ClassTag + class VeloxScanSuite extends VeloxWholeStageTransformerSuite { protected val rootPath: String = getClass.getResource("/").getPath override protected val resourcePath: String = "/tpch-data-parquet" @@ -44,6 +46,12 @@ class VeloxScanSuite extends VeloxWholeStageTransformerSuite { super.beforeAll() } + def checkQuery[T <: GlutenPlan: ClassTag](query: String, expectedResults: Seq[Row]): Unit = { + val df = sql(query) + checkAnswer(df, expectedResults) + checkGlutenOperatorMatch[T](df) + } + test("tpch q22 subquery filter pushdown - v1") { createTPCHNotNullTables() runTPCHQuery(22, tpchQueries, queriesResults, compareResult = false, noFallBack = false) { @@ -202,9 +210,98 @@ class VeloxScanSuite extends VeloxWholeStageTransformerSuite { withTable("test") { sql("create table test (a long, b string) using parquet options (path '" + path + "')") - val df = sql("select b from test group by b order by b") - checkAnswer(df, Seq(Row("10"), Row("11"))) + checkQuery[FileSourceScanExecTransformer]( + "select b from test group by b order by b", + Seq(Row("10"), Row("11"))) } } } + + test("parquet index based schema evolution") { + withSQLConf( + VeloxConfig.PARQUET_USE_COLUMN_NAMES.key -> "false", + "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") { + withTempDir { + dir => + val path = dir.getCanonicalPath + spark + .range(2) + .selectExpr("id as a", "cast(id + 10 as string) as b") + .write + .mode("overwrite") + .parquet(path) + + withTable("test") { + sql(s"""create table test (c long, d string, e float) using parquet options + |(path '$path')""".stripMargin) + + checkQuery[FileSourceScanExecTransformer]( + "select c, d from test", + Seq(Row(0L, "10"), Row(1L, "11"))) + + checkQuery[FileSourceScanExecTransformer]( + "select d from test", + Seq(Row("10"), Row("11"))) + + checkQuery[FileSourceScanExecTransformer]("select c from test", Seq(Row(0L), Row(1L))) + + checkQuery[FileSourceScanExecTransformer]( + "select d, c from test", + Seq(Row("10", 0L), Row("11", 1L))) + + checkQuery[FileSourceScanExecTransformer]( + "select c, d, e from test", + Seq(Row(0L, "10", null), Row(1L, "11", null))) + + checkQuery[FileSourceScanExecTransformer]( + "select e, d, c from test", + Seq(Row(null, "10", 0L), Row(null, "11", 1L))) + } + } + } + } + + test("ORC index based schema evolution") { + withSQLConf( + VeloxConfig.ORC_USE_COLUMN_NAMES.key -> "false", + "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") { + withTempDir { + dir => + val path = dir.getCanonicalPath + spark + .range(2) + .selectExpr("id as a", "cast(id + 10 as string) as b") + .write + .mode("overwrite") + .orc(path) + + withTable("test") { + sql(s"""create table test (c long, d string, e float) using orc options + |(path '$path')""".stripMargin) + + checkQuery[FileSourceScanExecTransformer]( + "select c, d from test", + Seq(Row(0L, "10"), Row(1L, "11"))) + + checkQuery[FileSourceScanExecTransformer]( + "select d from test", + Seq(Row("10"), Row("11"))) + + checkQuery[FileSourceScanExecTransformer]("select c from test", Seq(Row(0L), Row(1L))) + + checkQuery[FileSourceScanExecTransformer]( + "select d, c from test", + Seq(Row("10", 0L), Row("11", 1L))) + + checkQuery[FileSourceScanExecTransformer]( + "select c, d, e from test", + Seq(Row(0L, "10", null), Row(1L, "11", null))) + + checkQuery[FileSourceScanExecTransformer]( + "select e, d, c from test", + Seq(Row(null, "10", 0L), Row(null, "11", 1L))) + } + } + } + } } diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 7b58584c344d..88eceb3a74e4 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -36,12 +36,15 @@ VeloxPlanConverter::VeloxPlanConverter( const std::optional writeFileName, bool validationMode) : validationMode_(validationMode), + veloxCfg_(veloxCfg), substraitVeloxPlanConverter_(veloxPool, veloxCfg, writeFilesTempPath, writeFileName, validationMode) { + VELOX_USER_CHECK_NOT_NULL(veloxCfg_); substraitVeloxPlanConverter_.setInputIters(std::move(inputIters)); } namespace { std::shared_ptr parseScanSplitInfo( + const facebook::velox::config::ConfigBase* veloxCfg, const google::protobuf::RepeatedPtrField& fileList) { using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase; @@ -98,18 +101,44 @@ std::shared_ptr parseScanSplitInfo( splitInfo->format = dwio::common::FileFormat::UNKNOWN; break; } + + // The schema in file represents the table schema, it is set when the TableScan requires the + // table schema to be present, currently when the option is set to map columns by index rather + // than by name in Parquet or ORC files. Since the table schema should be the same for all + // files, we set it in the SplitInfo based on the first file we encounter with the schema set. + if (!splitInfo->tableSchema && file.has_schema()) { + const auto& schema = file.schema(); + + std::vector names; + std::vector types; + names.reserve(schema.names().size()); + + const bool asLowerCase = !veloxCfg->get(kCaseSensitive, false); + for (const auto& name : schema.names()) { + std::string fieldName = name; + if (asLowerCase) { + folly::toLowerAscii(fieldName); + } + names.emplace_back(std::move(fieldName)); + } + types = SubstraitParser::parseNamedStruct(schema, asLowerCase); + + splitInfo->tableSchema = ROW(std::move(names), std::move(types)); + } } return splitInfo; } void parseLocalFileNodes( SubstraitToVeloxPlanConverter* planConverter, + const facebook::velox::config::ConfigBase* veloxCfg, std::vector<::substrait::ReadRel_LocalFiles>& localFiles) { std::vector> splitInfos; splitInfos.reserve(localFiles.size()); for (const auto& localFile : localFiles) { const auto& fileList = localFile.items(); - splitInfos.push_back(parseScanSplitInfo(fileList)); + + splitInfos.push_back(parseScanSplitInfo(veloxCfg, fileList)); } planConverter->setSplitInfos(std::move(splitInfos)); @@ -120,7 +149,7 @@ std::shared_ptr VeloxPlanConverter::toVel const ::substrait::Plan& substraitPlan, std::vector<::substrait::ReadRel_LocalFiles> localFiles) { if (!validationMode_) { - parseLocalFileNodes(&substraitVeloxPlanConverter_, localFiles); + parseLocalFileNodes(&substraitVeloxPlanConverter_, veloxCfg_, localFiles); } return substraitVeloxPlanConverter_.toVeloxPlan(substraitPlan); diff --git a/cpp/velox/compute/VeloxPlanConverter.h b/cpp/velox/compute/VeloxPlanConverter.h index 7a14693cb7ff..4678dccea74a 100644 --- a/cpp/velox/compute/VeloxPlanConverter.h +++ b/cpp/velox/compute/VeloxPlanConverter.h @@ -48,6 +48,7 @@ class VeloxPlanConverter { private: bool validationMode_; + const facebook::velox::config::ConfigBase* veloxCfg_; SubstraitToVeloxPlanConverter substraitVeloxPlanConverter_; }; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 08047b34fbc2..4aacc43b7f76 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -699,6 +699,10 @@ std::shared_ptr WholeStageResultIterator::createConne std::to_string(veloxCfg_->get(kMaxPartitions, 10000)); configs[velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession] = std::to_string(veloxCfg_->get(kIgnoreMissingFiles, false)); + configs[velox::connector::hive::HiveConfig::kParquetUseColumnNamesSession] = + std::to_string(veloxCfg_->get(kParquetUseColumnNames, true)); + configs[velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession] = + std::to_string(veloxCfg_->get(kOrcUseColumnNames, true)); return std::make_shared(std::move(configs)); } diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index 3d3a7d36bf60..2f3aad218746 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -147,6 +147,8 @@ const std::string kMaxCoalescedBytes = "spark.gluten.sql.columnar.backend.velox. const std::string kCachePrefetchMinPct = "spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct"; const std::string kMemoryPoolCapacityTransferAcrossTasks = "spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks"; +const std::string kOrcUseColumnNames = "spark.gluten.sql.columnar.backend.velox.orcUseColumnNames"; +const std::string kParquetUseColumnNames = "spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames"; // write fies const std::string kMaxPartitions = "spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession"; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 7e4cd1f51476..5b76c22d8c24 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -1330,9 +1330,14 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: bool filterPushdownEnabled = true; auto names = colNameList; auto types = veloxTypeList; - auto dataColumns = ROW(std::move(names), std::move(types)); + + // The columns we project from the file. + auto baseSchema = ROW(std::move(names), std::move(types)); + // The columns present in the table, if not available default to the baseSchema. + auto tableSchema = splitInfo->tableSchema ? splitInfo->tableSchema : baseSchema; + connector::ConnectorTableHandlePtr tableHandle; - auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), dataColumns) : nullptr; + auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr; auto connectorId = kHiveConnectorId; if (useCudfTableHandle(splitInfos_) && veloxCfg_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && veloxCfg_->get(kCudfEnabled, kCudfEnabledDefault)) { @@ -1342,7 +1347,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } common::SubfieldFilters subfieldFilters; tableHandle = std::make_shared( - connectorId, "hive_table", filterPushdownEnabled, std::move(subfieldFilters), remainingFilter, dataColumns); + connectorId, "hive_table", filterPushdownEnabled, std::move(subfieldFilters), remainingFilter, tableSchema); // Get assignments and out names. std::vector outNames; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index b00c3447f712..e1c6cee63cb1 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -55,6 +55,9 @@ struct SplitInfo { /// The file sizes and modification times of the files to be scanned. std::vector> properties; + /// The schema of the table being scanned. + RowTypePtr tableSchema; + /// Make SplitInfo polymorphic virtual ~SplitInfo() = default; diff --git a/cpp/velox/utils/ConfigExtractor.cc b/cpp/velox/utils/ConfigExtractor.cc index 96da5069a2ec..1d4cd7f8597b 100644 --- a/cpp/velox/utils/ConfigExtractor.cc +++ b/cpp/velox/utils/ConfigExtractor.cc @@ -277,10 +277,6 @@ std::shared_ptr getHiveConfig( // read as UTC hiveConfMap[facebook::velox::connector::hive::HiveConfig::kReadTimestampPartitionValueAsLocalTime] = "false"; - // Maps table field names to file field names using names, not indices. - hiveConfMap[facebook::velox::connector::hive::HiveConfig::kParquetUseColumnNames] = "true"; - hiveConfMap[facebook::velox::connector::hive::HiveConfig::kOrcUseColumnNames] = "true"; - return std::make_shared(std::move(hiveConfMap)); } diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index 160f0c88ae5e..44996a5cd49e 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -9,7 +9,7 @@ nav_order: 16 ## Gluten Velox backend configurations -| Key | Default | Description | +| Key | Default | Description | |----------------------------------------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | spark.gluten.sql.columnar.backend.velox.IOThreads | <undefined> | The Size of the IO thread pool in the Connector. This thread pool is used for split preloading and DirectBufferedInput. By default, the value is the same as the maximum task slots per Spark executor. | | spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver | 2 | The split preload per task | @@ -48,6 +48,8 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks | true | Whether to allow memory capacity transfer between memory pools from different tasks. | | spark.gluten.sql.columnar.backend.velox.memoryUseHugePages | false | Use explicit huge pages for Velox memory allocation. | | spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | true | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. | +| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames | true | Maps table field names to file field names using names, not indices for ORC files. If this is set to false Gluten will fallback to vanilla Spark if it does not support all column types present in any of the schemas of the tables being read, at this time unsupported types include TimestampNTZ and Char. | +| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames | true | Maps table field names to file field names using names, not indices for Parquet files. If this is set to false Gluten will fallback to vanilla Spark if it does not support all column types present in any of the schemas of the tables being read, at this time unsupported types include TimestampNTZ and Char. | | spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | 1 | Set the prefetch row groups for velox file scan | | spark.gluten.sql.columnar.backend.velox.queryTraceEnabled | false | Enable query tracing flag. | | spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs | 3600000ms | The max time in ms to wait for memory reclaim. | diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java index 7a17372ec273..5a1b106db8e6 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java @@ -109,15 +109,14 @@ public void setFileSchema(StructType schema) { private NamedStruct buildNamedStruct() { NamedStruct.Builder namedStructBuilder = NamedStruct.newBuilder(); - if (fileSchema != null) { - Type.Struct.Builder structBuilder = Type.Struct.newBuilder(); - for (StructField field : fileSchema.fields()) { - structBuilder.addTypes( - ConverterUtils.getTypeNode(field.dataType(), field.nullable()).toProtobuf()); - namedStructBuilder.addNames(ConverterUtils.normalizeColName(field.name())); - } - namedStructBuilder.setStruct(structBuilder.build()); + Type.Struct.Builder structBuilder = Type.Struct.newBuilder(); + for (StructField field : fileSchema.fields()) { + structBuilder.addTypes( + ConverterUtils.getTypeNode(field.dataType(), field.nullable()).toProtobuf()); + namedStructBuilder.addNames(ConverterUtils.normalizeColName(field.name())); } + namedStructBuilder.setStruct(structBuilder.build()); + return namedStructBuilder.build(); } @@ -195,8 +194,11 @@ public ReadRel.LocalFiles toProtobuf() { ReadRel.LocalFiles.FileOrFiles.metadataColumn.newBuilder(); fileBuilder.addMetadataColumns(mcBuilder.build()); } - NamedStruct namedStruct = buildNamedStruct(); - fileBuilder.setSchema(namedStruct); + + if (fileSchema != null) { + NamedStruct namedStruct = buildNamedStruct(); + fileBuilder.setSchema(namedStruct); + } if (!otherMetadataColumns.isEmpty()) { Map otherMetadatas = otherMetadataColumns.get(i); diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index eea34e8c447b..7c84218681b2 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand} -import org.apache.spark.sql.types.StructField +import org.apache.spark.sql.types.{StructField, StructType} import org.apache.hadoop.conf.Configuration @@ -39,7 +39,8 @@ trait BackendSettingsApi { def validateScanExec( format: ReadFileFormat, - fields: Array[StructField], + fields: Array[StructField], // the fields to be output + dataSchema: StructType, // the schema of the table rootPaths: Seq[String], properties: Map[String, String], hadoopConf: Configuration): ValidationResult = diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index 0a19d2207d26..34f7f41b786a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -34,6 +34,7 @@ trait IteratorApi { def genSplitInfo( partition: InputPartition, partitionSchema: StructType, + dataSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], properties: Map[String, String]): SplitInfo @@ -42,6 +43,7 @@ trait IteratorApi { partitionIndex: Int, partition: Seq[InputPartition], partitionSchema: StructType, + dataSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], properties: Map[String, String]): SplitInfo = throw new UnsupportedOperationException() diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index 79671224243c..a8524d53f2ee 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -27,7 +27,6 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.connector.read.InputPartition -import org.apache.spark.sql.hive.HiveTableScanExecTransformer import org.apache.spark.sql.types.{StringType, StructField, StructType} import com.google.protobuf.StringValue @@ -68,28 +67,18 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource .genSplitInfo( _, getPartitionSchema, + getDataSchema, fileFormat, getMetadataColumns().map(_.name), getProperties)) } override protected def doValidateInternal(): ValidationResult = { - var fields = schema.fields - - this match { - case transformer: FileSourceScanExecTransformer => - fields = appendStringFields(transformer.relation.schema, fields) - case transformer: HiveTableScanExecTransformer => - fields = appendStringFields(transformer.getDataSchema, fields) - case transformer: BatchScanExecTransformer => - fields = appendStringFields(transformer.getDataSchema, fields) - case _ => - } - val validationResult = BackendsApiManager.getSettings .validateScanExec( fileFormat, - fields, + schema.fields, + getDataSchema, getRootFilePaths, getProperties, sparkContext.hadoopConfiguration) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala index ae13078f9946..03a840cccdcf 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala @@ -139,6 +139,7 @@ abstract class BatchScanExecTransformerBase( index, partitions, getPartitionSchema, + getDataSchema, fileFormat, getMetadataColumns().map(_.name), getProperties)