diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 308b3fa4f5c7..51b5f65dedcf 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -130,7 +130,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String]): SplitInfo = {
+      properties: Map[String, String],
+      dataSchema: StructType): SplitInfo = {
     partition match {
       case p: GlutenMergeTreePartition =>
         ExtensionTableBuilder
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 1be7b8d7356d..6a01caad4e63 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -55,7 +55,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String]): SplitInfo = {
+      properties: Map[String, String],
+      dataSchema: StructType): SplitInfo = {
     partition match {
       case f: FilePartition =>
         val (
@@ -69,7 +70,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
           constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
         val preferredLocations =
           SoftAffinity.getFilePartitionLocations(f)
-        LocalFilesBuilder.makeLocalFiles(
+        val localFile = LocalFilesBuilder.makeLocalFiles(
           f.index,
           paths,
           starts,
@@ -82,6 +83,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
           preferredLocations.toList.asJava,
           mapAsJavaMap(properties)
         )
+        localFile.setFileSchema(dataSchema)
+        localFile
       case _ =>
         throw new UnsupportedOperationException(s"Unsupported input partition.")
     }
@@ -168,26 +171,28 @@ class VeloxIteratorApi extends IteratorApi with Logging {
         SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames)
       metadataColumns.add(metadataColumn)
       val partitionColumn = new JHashMap[String, String]()
-      for (i <- 0 until file.partitionValues.numFields) {
-        val partitionColumnValue = if (file.partitionValues.isNullAt(i)) {
-          ExternalCatalogUtils.DEFAULT_PARTITION_NAME
-        } else {
-          val pn = file.partitionValues.get(i, schema.fields(i).dataType)
-          schema.fields(i).dataType match {
-            case _: BinaryType =>
-              new String(pn.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8)
-            case _: DateType =>
-              DateFormatter.apply().format(pn.asInstanceOf[Integer])
-            case _: DecimalType =>
-              pn.asInstanceOf[Decimal].toJavaBigInteger.toString
-            case _: TimestampType =>
-              TimestampFormatter
-                .getFractionFormatter(ZoneOffset.UTC)
-                .format(pn.asInstanceOf[java.lang.Long])
-            case _ => pn.toString
+      if (file.partitionValues != null) {
+        for (i <- 0 until file.partitionValues.numFields) {
+          val partitionColumnValue = if (file.partitionValues.isNullAt(i)) {
+            ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+          } else {
+            val pn = file.partitionValues.get(i, schema.fields(i).dataType)
+            schema.fields(i).dataType match {
+              case _: BinaryType =>
+                new String(pn.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8)
+              case _: DateType =>
+                DateFormatter.apply().format(pn.asInstanceOf[Integer])
+              case _: DecimalType =>
+                pn.asInstanceOf[Decimal].toJavaBigInteger.toString
+              case _: TimestampType =>
+                TimestampFormatter
+                  .getFractionFormatter(ZoneOffset.UTC)
+                  .format(pn.asInstanceOf[java.lang.Long])
+              case _ => pn.toString
+            }
           }
+          partitionColumn.put(schema.names(i), partitionColumnValue)
         }
-        partitionColumn.put(schema.names(i), partitionColumnValue)
       }
       partitionColumns.add(partitionColumn)
     }
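
Note on the constructSplitInfo hunk above: besides the new null guard on file.partitionValues, the per-type rendering of partition values is unchanged. A minimal plain-Scala sketch of that rendering, with hypothetical type tags standing in for Spark's DataType hierarchy (none of these names are Gluten APIs):

```scala
import java.nio.charset.StandardCharsets
import java.time.{Instant, LocalDate, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical stand-ins for the Spark DataType cases matched above.
sealed trait PartitionType
case object BinaryT extends PartitionType
case object DateT extends PartitionType
case object DecimalT extends PartitionType
case object TimestampT extends PartitionType
case object OtherT extends PartitionType

def renderPartitionValue(value: Any, tpe: PartitionType): String = tpe match {
  // Binary partition values are decoded as UTF-8 text.
  case BinaryT => new String(value.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8)
  // Dates arrive as days-since-epoch integers.
  case DateT => LocalDate.ofEpochDay(value.asInstanceOf[Int].toLong).toString
  // Mirrors Decimal.toJavaBigInteger, which keeps only the integral part.
  case DecimalT => value.asInstanceOf[java.math.BigDecimal].toBigInteger.toString
  // Timestamps arrive as microseconds-since-epoch and are formatted in UTC.
  case TimestampT =>
    val micros = value.asInstanceOf[Long]
    DateTimeFormatter.ISO_LOCAL_DATE_TIME
      .withZone(ZoneOffset.UTC)
      .format(Instant.ofEpochSecond(micros / 1000000L, (micros % 1000000L) * 1000L))
  case OtherT => value.toString
}
```

Null-valued partitions fall back to ExternalCatalogUtils.DEFAULT_PARTITION_NAME before this rendering is reached.
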
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcForcePositionalSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcForcePositionalSuite.scala
new file mode 100644
index 000000000000..272b9a87210f
--- /dev/null
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcForcePositionalSuite.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+
+class VeloxOrcForcePositionalSuite extends VeloxWholeStageTransformerSuite {
+  override protected val resourcePath: String = "/tpch-data-parquet"
+  override protected val fileFormat: String = "parquet"
+
+  override protected def sparkConf: SparkConf =
+    super.sparkConf
+      .set("spark.gluten.sql.columnar.backend.velox.glogSeverityLevel", "0")
+      .set("spark.gluten.sql.columnar.backend.velox.glogVerboseLevel", "1")
+      .set("spark.executorEnv.GLOG_v", "1")
+      .set("spark.gluten.sql.debug", "true")
+      .set("orc.force.positional.evolution", "true")
+
+  import testImplicits._
+
+  test("rename root columns") {
+    withTempPath {
+      tmp =>
+        val path = tmp.getCanonicalPath
+        val df1 = Seq((1, 2), (4, 5), (8, 9)).toDF("col1", "col2")
+        val df2 = Seq((10, 12), (15, 19), (22, 40)).toDF("col1", "col3")
+
+        val dir1 = s"file://$path/part=one"
+        val dir2 = s"file://$path/part=two"
+
+        df1.write.format("orc").save(dir1)
+        df2.write.format("orc").save(dir2)
+
+        spark.read
+          .schema(df2.schema)
+          .format("orc")
+          .load(s"file://$path")
+          .createOrReplaceTempView("test")
+
+        runQueryAndCompare("select * from test") { _ => }
+    }
+  }
+
+  test("rename nested columns") {
+    withTempPath {
+      tmp =>
+        val path = tmp.getCanonicalPath
+        val df1 = spark.createDataFrame(
+          spark.sparkContext.parallelize(Row(1, Row("abc", 2)) :: Nil),
+          schema = StructType(
+            Array(
+              StructField("col1", IntegerType, nullable = true),
+              StructField(
+                "col2",
+                StructType(
+                  Array(
+                    StructField("a", StringType, nullable = true),
+                    StructField("b", IntegerType, nullable = true)
+                  )))
+            ))
+        )
+        val df2 = spark.createDataFrame(
+          spark.sparkContext.parallelize(Row(20, Row("EFG", 10)) :: Nil),
+          schema = StructType(
+            Array(
+              StructField("col1", IntegerType, nullable = true),
+              StructField(
+                "col2",
+                StructType(
+                  Array(
+                    StructField("a", StringType, nullable = true),
+                    StructField("c", IntegerType, nullable = true)
+                  )))
+            ))
+        )
+
+        df1.write.mode("overwrite").format("orc").save(s"file://$path/part=one")
+        df2.write.mode("overwrite").format("orc").save(s"file://$path/part=two")
+
+        spark.read
+          .schema(df2.schema)
+          .format("orc")
+          .load(s"file://$path")
+          .createOrReplaceTempView("test")
+
+        runQueryAndCompare("select * from test") { _ => }
+    }
+  }
+
+  test("prune nested schema") {
+    withTempPath {
+      tmp =>
+        val path = tmp.getCanonicalPath
+
+        val df1 = spark.createDataFrame(
+          spark.sparkContext.parallelize(Row(1, 5, Row("abc", 2)) :: Nil),
+          schema = StructType(
+            Array(
+              StructField("col1", IntegerType, nullable = true),
+              StructField("col2", IntegerType, nullable = true),
+              StructField(
+                "col3",
+                StructType(
+                  Array(
+                    StructField("a", StringType, nullable = true),
+                    StructField("b", IntegerType, nullable = true)
+                  )))
+            ))
+        )
+        df1.write.format("orc").save(s"file://$path")
+
+        spark.read
+          .format("orc")
+          .schema(StructType(Array(
+            StructField("col1", IntegerType, nullable = true),
+            StructField("col2", IntegerType, nullable = true),
+            StructField(
+              "col3",
+              StructType(Array(
+                StructField("b", IntegerType, nullable = true)
+              )))
+          )))
+          .load(s"file://$path")
+          .createOrReplaceTempView("test")
+
+        runQueryAndCompare("select * from test") { _ => }
+    }
+  }
+}
+
+class VeloxOrcForcePositionalOffSuite extends VeloxOrcForcePositionalSuite {
+  override protected def sparkConf: SparkConf =
+    super.sparkConf
+      .set("orc.force.positional.evolution", "false")
+}
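
The suite above drives the positional path end to end; the Off subclass reruns the same cases with by-name matching. For a quick manual check outside the test harness, a sketch along these lines should hit the same code path (the path and session setup are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Assumes a Gluten-enabled Spark build; the config key matches the one the
// suite sets and that GlutenDriverPlugin forwards to the native backend.
val spark = SparkSession.builder()
  .master("local[1]")
  .config("orc.force.positional.evolution", "true")
  .getOrCreate()
import spark.implicits._

val path = "/tmp/orc_positional_demo" // illustrative location
Seq((1, 2), (4, 5)).toDF("col1", "col2").write.mode("overwrite").format("orc").save(path)

// The requested schema renames the second column; with positional evolution
// the file's second column is still matched to "col3" by ordinal.
spark.read.schema("col1 INT, col3 INT").format("orc").load(path).show()
```
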
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcSchemaEvolutionSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcSchemaEvolutionSuite.scala
new file mode 100644
index 000000000000..81a9d2f4aa6a
--- /dev/null
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcSchemaEvolutionSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+class VeloxOrcSchemaEvolutionSuite extends VeloxWholeStageTransformerSuite {
+  override protected val resourcePath: String = "/tpch-data-parquet"
+  override protected val fileFormat: String = "parquet"
+
+  import testImplicits._
+
+  test("read ORC with column names all starting with '_col'") {
+    withTempPath {
+      tmp =>
+        val df = Seq((1, 2, 3), (4, 5, 6), (7, 8, 9)).toDF("_col0", "_col1", "_col2")
+        df.write.format("orc").save(s"file://${tmp.getCanonicalPath}")
+
+        withTempView("test") {
+          spark.read
+            .format("orc")
+            .schema(
+              StructType(
+                Array(
+                  StructField("a", IntegerType, nullable = true),
+                  StructField("b", IntegerType, nullable = true),
+                  StructField("c", IntegerType, nullable = true)
+                )))
+            .load(s"file://${tmp.getCanonicalPath}")
+            .createOrReplaceTempView("test")
+
+          runQueryAndCompare("select a, b, c from test") {
+            df =>
+              checkAnswer(
+                df,
+                Row(1, 2, 3) ::
+                  Row(4, 5, 6) ::
+                  Row(7, 8, 9) ::
+                  Nil)
+          }
+        }
+    }
+  }
+}
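
Background for the `_col` test above: Hive historically writes ORC files whose root fields are named _col0, _col1, ... regardless of the table schema, so pure by-name matching would resolve nothing. A toy predicate for that condition (the assumption being that the reader falls back to positional matching when it holds, as Spark's vanilla ORC reader does):

```scala
// True when every root field carries only a Hive default name (_col<ordinal>),
// i.e. the file has no meaningful names to match against.
def hasHiveDefaultColumnNames(fieldNames: Seq[String]): Boolean =
  fieldNames.nonEmpty && fieldNames.zipWithIndex.forall {
    case (name, ordinal) => name == s"_col$ordinal"
  }

assert(hasHiveDefaultColumnNames(Seq("_col0", "_col1", "_col2")))
assert(!hasHiveDefaultColumnNames(Seq("a", "_col1", "_col2")))
```
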
diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc
index f3d05d7afd29..38f431bba874 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -271,6 +271,11 @@ void VeloxBackend::initConnector() {
   connectorConfMap[velox::connector::hive::HiveConfig::kFilePreloadThreshold] =
       backendConf_->get<std::string>(kFilePreloadThreshold, "1048576"); // 1M
 
+  // Map table schema to file schema using name
+  connectorConfMap[velox::connector::hive::HiveConfig::kParquetUseColumnNames] = "true";
+  connectorConfMap[velox::connector::hive::HiveConfig::kOrcUseColumnNames] = "true";
+  connectorConfMap[velox::connector::hive::HiveConfig::kOrcUseNestedColumnNames] = "true";
+
   // read as UTC
   connectorConfMap[velox::connector::hive::HiveConfig::kReadTimestampPartitionValueAsLocalTime] =
       "false";
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc
index ed2545c78114..6b4b3b470a20 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -51,6 +51,10 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
   splitInfo->partitionColumns.reserve(fileList.size());
   splitInfo->properties.reserve(fileList.size());
   splitInfo->metadataColumns.reserve(fileList.size());
+
+  std::vector<std::string> colNames;
+  std::vector<TypePtr> veloxTypes;
+
   for (const auto& file : fileList) {
     // Expect all Partitions share the same index.
     splitInfo->partitionIndex = file.partition_index();
@@ -71,6 +75,16 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
     splitInfo->starts.emplace_back(file.start());
     splitInfo->lengths.emplace_back(file.length());
 
+    if (colNames.empty() && file.has_schema()) {
+      const auto& tableSchema = file.schema();
+      colNames.reserve(tableSchema.names().size());
+      for (const auto& name : tableSchema.names()) {
+        colNames.emplace_back(name);
+      }
+      veloxTypes = SubstraitParser::parseNamedStruct(tableSchema);
+    }
+    splitInfo->fileSchema = ROW(std::move(colNames), std::move(veloxTypes));
+
     facebook::velox::FileProperties fileProps;
     if (file.has_properties()) {
       fileProps.fileSize = file.properties().filesize();
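
The parseScanSplitInfo hunk above rebuilds the scan's table schema on the native side from the protobuf NamedStruct that setFileSchema attaches to each LocalFiles message. A Spark-side sketch of the shape being shipped (toNamedLists is illustrative, not a Gluten helper):

```scala
import org.apache.spark.sql.types._

// The schema travels as parallel name/type lists; natively they are zipped
// back together as ROW(names, types) and become SplitInfo::fileSchema.
def toNamedLists(schema: StructType): (Seq[String], Seq[DataType]) =
  (schema.fields.toSeq.map(_.name), schema.fields.toSeq.map(_.dataType))

val dataSchema = StructType(Seq(
  StructField("col1", IntegerType),
  StructField("col2", StructType(Seq(
    StructField("a", StringType),
    StructField("b", IntegerType))))))

val (names, types) = toNamedLists(dataSchema)
assert(names == Seq("col1", "col2"))
```
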
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index 57157711663c..9c7cec69ecd9 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -598,6 +598,12 @@ std::shared_ptr<velox::config::ConfigBase> WholeStageResultIterator::createConne
       std::to_string(veloxCfg_->get<int32_t>(kMaxPartitions, 10000));
   configs[velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession] =
       std::to_string(veloxCfg_->get<bool>(kIgnoreMissingFiles, false));
+  configs[velox::connector::hive::HiveConfig::kParquetUseColumnNamesSession] = "true";
+  if (veloxCfg_->get<bool>(kOrcForcePositionalEvolution, false)) {
+    configs[velox::connector::hive::HiveConfig::kOrcUseNestedColumnNamesSession] = "true";
+  } else {
+    configs[velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession] = "true";
+  }
 
   return std::make_shared<velox::config::ConfigBase>(std::move(configs));
 }
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index dfe471a68cc7..5c67f77bca1a 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -127,6 +127,7 @@
 const std::string kLoadQuantum = "spark.gluten.sql.columnar.backend.velox.loadQuantum";
 const std::string kMaxCoalescedDistance = "spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance";
 const std::string kMaxCoalescedBytes = "spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes";
 const std::string kCachePrefetchMinPct = "spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct";
+const std::string kOrcForcePositionalEvolution = "spark.hadoop.orc.force.positional.evolution";
 
 // write fies
 const std::string kMaxPartitions = "spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession";
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 4e0e36cefb0b..ce57ec569521 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -1284,7 +1284,12 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
   std::shared_ptr<connector::hive::HiveTableHandle> tableHandle;
   if (!readRel.has_filter()) {
     tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
-        kHiveConnectorId, "hive_table", filterPushdownEnabled, common::SubfieldFilters{}, nullptr);
+        kHiveConnectorId,
+        "hive_table",
+        filterPushdownEnabled,
+        common::SubfieldFilters{},
+        nullptr,
+        splitInfo->fileSchema);
   } else {
     common::SubfieldFilters subfieldFilters;
     auto names = colNameList;
@@ -1292,7 +1297,12 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
     auto remainingFilter = exprConverter_->toVeloxExpr(readRel.filter(), ROW(std::move(names), std::move(types)));
     tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
-        kHiveConnectorId, "hive_table", filterPushdownEnabled, std::move(subfieldFilters), remainingFilter);
+        kHiveConnectorId,
+        "hive_table",
+        filterPushdownEnabled,
+        std::move(subfieldFilters),
+        remainingFilter,
+        splitInfo->fileSchema);
   }
 
   // Get assignments and out names.
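
A plain-Scala mirror of the session-level branch above, to make the decision explicit (the map keys are illustrative stand-ins for Velox's HiveConfig session properties, and the comments reflect my reading of the intended semantics):

```scala
// Parquet always maps by name; for ORC, the forwarded
// orc.force.positional.evolution flag selects which mapping mode is enabled.
def orcNameMappingConfigs(forcePositionalEvolution: Boolean): Map[String, String] = {
  val base = Map("parquet-use-column-names" -> "true")
  if (forcePositionalEvolution) {
    // Root columns are matched by position; nested fields still by name.
    base + ("orc-use-nested-column-names" -> "true")
  } else {
    // Root columns are matched by name.
    base + ("orc-use-column-names" -> "true")
  }
}

assert(orcNameMappingConfigs(false).contains("orc-use-column-names"))
assert(orcNameMappingConfigs(true).contains("orc-use-nested-column-names"))
```
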
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 9636f6615f96..759ffaaa2fd1 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -55,6 +55,9 @@ struct SplitInfo {
   /// The file sizes and modification times of the files to be scanned.
   std::vector<facebook::velox::FileProperties> properties;
 
+  /// The file schema
+  RowTypePtr fileSchema;
+
   /// Make SplitInfo polymorphic
   virtual ~SplitInfo() = default;
 };
diff --git a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
index cccc619a86ff..7ba5526a3010 100644
--- a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
+++ b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
@@ -257,7 +257,7 @@ TEST_F(Substrait2VeloxPlanConversionTest, ifthenTest) {
   // Convert to Velox PlanNode.
   auto planNode = planConverter_->toVeloxPlan(substraitPlan, std::vector<::substrait::ReadRel_LocalFiles>{split});
   ASSERT_EQ(
-      "-- Project[1][expressions: ] -> \n  -- TableScan[0][table: hive_table, remaining filter: (and(and(and(and(isnotnull(\"hd_vehicle_count\"),or(equalto(\"hd_buy_potential\",\">10000\"),equalto(\"hd_buy_potential\",\"unknown\"))),greaterthan(\"hd_vehicle_count\",0)),if(greaterthan(\"hd_vehicle_count\",0),greaterthan(divide(cast \"hd_dep_count\" as DOUBLE,cast \"hd_vehicle_count\" as DOUBLE),1.2))),isnotnull(\"hd_demo_sk\")))] -> n0_0:BIGINT, n0_1:VARCHAR, n0_2:BIGINT, n0_3:BIGINT\n",
+      "-- Project[1][expressions: ] -> \n  -- TableScan[0][table: hive_table, remaining filter: (and(and(and(and(isnotnull(\"hd_vehicle_count\"),or(equalto(\"hd_buy_potential\",\">10000\"),equalto(\"hd_buy_potential\",\"unknown\"))),greaterthan(\"hd_vehicle_count\",0)),if(greaterthan(\"hd_vehicle_count\",0),greaterthan(divide(cast \"hd_dep_count\" as DOUBLE,cast \"hd_vehicle_count\" as DOUBLE),1.2))),isnotnull(\"hd_demo_sk\"))), data columns: ROW<>] -> n0_0:BIGINT, n0_1:VARCHAR, n0_2:BIGINT, n0_3:BIGINT\n",
       planNode->toString(true, true));
 }
@@ -273,7 +273,7 @@ TEST_F(Substrait2VeloxPlanConversionTest, filterUpper) {
   // Convert to Velox PlanNode.
   auto planNode = planConverter_->toVeloxPlan(substraitPlan, std::vector<::substrait::ReadRel_LocalFiles>{split});
   ASSERT_EQ(
-      "-- Project[1][expressions: ] -> \n  -- TableScan[0][table: hive_table, remaining filter: (and(isnotnull(\"key\"),lessthan(\"key\",3)))] -> n0_0:INTEGER\n",
+      "-- Project[1][expressions: ] -> \n  -- TableScan[0][table: hive_table, remaining filter: (and(isnotnull(\"key\"),lessthan(\"key\",3))), data columns: ROW<>] -> n0_0:INTEGER\n",
       planNode->toString(true, true));
 }
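
Why both expected plan strings gain "data columns: ROW<>": the LocalFiles messages built by these unit tests carry no schema field, so the fileSchema reconstructed in parseScanSplitInfo is an empty row type, and the table handle now prints it. A toy rendering of the notation:

```scala
// Mimics Velox's ROW<name:TYPE,...> display for a row type.
def rowTypeString(names: Seq[String], types: Seq[String]): String =
  names.zip(types).map { case (n, t) => s"$n:$t" }.mkString("ROW<", ",", ">")

assert(rowTypeString(Nil, Nil) == "ROW<>")
assert(rowTypeString(Seq("key"), Seq("INTEGER")) == "ROW<key:INTEGER>")
```
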
diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh
index 66f0033dd385..0db7d58234c6 100755
--- a/ep/build-velox/src/get_velox.sh
+++ b/ep/build-velox/src/get_velox.sh
@@ -16,8 +16,8 @@
 
 set -exu
 
-VELOX_REPO=https://github.com/oap-project/velox.git
-VELOX_BRANCH=2025_03_02
+VELOX_REPO=https://github.com/ccat3z/velox.git
+VELOX_BRANCH=feat/orc-positional-oap
 VELOX_HOME=""
 
 OS=`uname -s`
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index e886e0a517a5..e2f773c8fd2b 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -36,6 +36,8 @@ import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS
 import org.apache.spark.task.TaskResources
 import org.apache.spark.util.SparkResourceUtil
 
+import org.apache.orc.OrcConf
+
 import java.util
 import java.util.Collections
 
@@ -259,6 +261,10 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
       conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "false")
       conf.set(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key, "false")
     }
+
+    conf.set(
+      GlutenConfig.ORC_FORCE_POSITIONAL_EVOLUTION,
+      conf.get(OrcConf.FORCE_POSITIONAL_EVOLUTION.getAttribute, "false"))
   }
 }
diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
index bebc6b1e95ef..4ace9601c17a 100644
--- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
+++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
@@ -151,15 +151,17 @@ public ReadRel.LocalFiles toProtobuf() {
       if (index != null) {
         fileBuilder.setPartitionIndex(index);
       }
-      Map<String, String> partitionColumn = partitionColumns.get(i);
-      if (!partitionColumn.isEmpty()) {
-        partitionColumn.forEach(
-            (key, value) -> {
-              ReadRel.LocalFiles.FileOrFiles.partitionColumn.Builder pcBuilder =
-                  ReadRel.LocalFiles.FileOrFiles.partitionColumn.newBuilder();
-              pcBuilder.setKey(key).setValue(value);
-              fileBuilder.addPartitionColumns(pcBuilder.build());
-            });
+      if (partitionColumns != null && partitionColumns.size() == paths.size()) {
+        Map<String, String> partitionColumn = partitionColumns.get(i);
+        if (!partitionColumn.isEmpty()) {
+          partitionColumn.forEach(
+              (key, value) -> {
+                ReadRel.LocalFiles.FileOrFiles.partitionColumn.Builder pcBuilder =
+                    ReadRel.LocalFiles.FileOrFiles.partitionColumn.newBuilder();
+                pcBuilder.setKey(key).setValue(value);
+                fileBuilder.addPartitionColumns(pcBuilder.build());
+              });
+        }
       }
       fileBuilder.setLength(lengths.get(i));
       fileBuilder.setStart(starts.get(i));
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index efb793839f6a..c9573f57adf2 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -37,7 +37,8 @@ trait IteratorApi {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
      metadataColumnNames: Seq[String],
-      properties: Map[String, String]): SplitInfo
+      properties: Map[String, String],
+      dataSchema: StructType): SplitInfo
 
   def genSplitInfoForPartitions(
       partitionIndex: Int,
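
The GlutenPlugin.scala hunk above copies the Hadoop-level ORC option under the spark.hadoop. prefix so it survives the whitelist of keys handed to the native backend (extended in the GlutenConfig.scala hunk below). A minimal standalone sketch of that forwarding, using only SparkConf:

```scala
import org.apache.spark.SparkConf

// Reads the bare Hadoop key (OrcConf.FORCE_POSITIONAL_EVOLUTION's attribute)
// and republishes it under the whitelisted spark.hadoop.-prefixed name.
def forwardOrcPositionalEvolution(conf: SparkConf): SparkConf =
  conf.set(
    "spark.hadoop.orc.force.positional.evolution",
    conf.get("orc.force.positional.evolution", "false"))

val conf = new SparkConf(loadDefaults = false)
  .set("orc.force.positional.evolution", "true")
assert(forwardOrcPositionalEvolution(conf)
  .get("spark.hadoop.orc.force.positional.evolution") == "true")
```
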
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index b5d3c03fc6ef..bb88b45a2879 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -74,8 +74,9 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
           _,
           getPartitionSchema,
           fileFormat,
-          getMetadataColumns().map(_.name),
-          getProperties))
+          getMetadataColumns.map(_.name),
+          getProperties,
+          getDataSchema))
   }
 
   val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration(
@@ -94,6 +95,14 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
       case _ =>
     }
 
+    val dataSchemaValidateResult = BackendsApiManager.getValidatorApiInstance
+      .doSchemaValidate(this.getDataSchema)
+      .map(ValidationResult.failed(_))
+      .getOrElse(ValidationResult.succeeded)
+    if (!dataSchemaValidateResult.ok()) {
+      return dataSchemaValidateResult
+    }
+
     val validationResult = BackendsApiManager.getSettings
       .validateScanExec(
         fileFormat,
diff --git a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index e82acda34078..2b26b80b6055 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -375,6 +375,7 @@ object GlutenConfig {
   val PARQUET_GZIP_WINDOW_SIZE: String = "parquet.gzip.windowSize"
   // Hadoop config
   val HADOOP_PREFIX = "spark.hadoop."
+  val ORC_FORCE_POSITIONAL_EVOLUTION = HADOOP_PREFIX + "orc.force.positional.evolution"
 
   // S3 config
   val S3A_PREFIX = "fs.s3a."
@@ -466,6 +467,7 @@ object GlutenConfig {
     "spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems",
     "spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits",
     "spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits",
+    ORC_FORCE_POSITIONAL_EVOLUTION,
     // s3 config
     SPARK_S3_ACCESS_KEY,
     SPARK_S3_SECRET_KEY,