From 4fc0f6243d4930033959ead044e6f823850d7adf Mon Sep 17 00:00:00 2001 From: Lingfeng Zhang Date: Fri, 28 Feb 2025 17:09:29 +0800 Subject: [PATCH 1/6] Passthrough table schema from scan transformer to velox --- .../clickhouse/CHIteratorApi.scala | 3 +- .../backendsapi/velox/VeloxIteratorApi.scala | 45 ++++++++++--------- cpp/velox/compute/VeloxBackend.cc | 5 +++ cpp/velox/compute/VeloxPlanConverter.cc | 14 ++++++ cpp/velox/substrait/SubstraitToVeloxPlan.cc | 16 ++++++- cpp/velox/substrait/SubstraitToVeloxPlan.h | 6 +++ .../gluten/substrait/rel/LocalFilesNode.java | 28 ++++++++---- .../gluten/backendsapi/IteratorApi.scala | 3 +- .../execution/BasicScanExecTransformer.scala | 5 ++- 9 files changed, 90 insertions(+), 35 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 308b3fa4f5c7..51b5f65dedcf 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -130,7 +130,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { partitionSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], - properties: Map[String, String]): SplitInfo = { + properties: Map[String, String], + dataSchema: StructType): SplitInfo = { partition match { case p: GlutenMergeTreePartition => ExtensionTableBuilder diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 1be7b8d7356d..6a01caad4e63 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -55,7 +55,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { partitionSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], - properties: Map[String, String]): SplitInfo = { + properties: Map[String, String], + dataSchema: StructType): SplitInfo = { partition match { case f: FilePartition => val ( @@ -69,7 +70,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { constructSplitInfo(partitionSchema, f.files, metadataColumnNames) val preferredLocations = SoftAffinity.getFilePartitionLocations(f) - LocalFilesBuilder.makeLocalFiles( + val localFile = LocalFilesBuilder.makeLocalFiles( f.index, paths, starts, @@ -82,6 +83,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { preferredLocations.toList.asJava, mapAsJavaMap(properties) ) + localFile.setFileSchema(dataSchema) + localFile case _ => throw new UnsupportedOperationException(s"Unsupported input partition.") } @@ -168,26 +171,28 @@ class VeloxIteratorApi extends IteratorApi with Logging { SparkShimLoader.getSparkShims.generateMetadataColumns(file, metadataColumnNames) metadataColumns.add(metadataColumn) val partitionColumn = new JHashMap[String, String]() - for (i <- 0 until file.partitionValues.numFields) { - val partitionColumnValue = if (file.partitionValues.isNullAt(i)) { - ExternalCatalogUtils.DEFAULT_PARTITION_NAME - } else { - val pn = file.partitionValues.get(i, schema.fields(i).dataType) - schema.fields(i).dataType match { - case _: BinaryType => - new String(pn.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8) - case _: DateType => - DateFormatter.apply().format(pn.asInstanceOf[Integer]) - case _: DecimalType => - pn.asInstanceOf[Decimal].toJavaBigInteger.toString - case _: TimestampType => - TimestampFormatter - .getFractionFormatter(ZoneOffset.UTC) - .format(pn.asInstanceOf[java.lang.Long]) - case _ => pn.toString + if (file.partitionValues != null) { + for (i <- 0 until file.partitionValues.numFields) { + val partitionColumnValue = if (file.partitionValues.isNullAt(i)) { + ExternalCatalogUtils.DEFAULT_PARTITION_NAME + } else { + val pn = file.partitionValues.get(i, schema.fields(i).dataType) + schema.fields(i).dataType match { + case _: BinaryType => + new String(pn.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8) + case _: DateType => + DateFormatter.apply().format(pn.asInstanceOf[Integer]) + case _: DecimalType => + pn.asInstanceOf[Decimal].toJavaBigInteger.toString + case _: TimestampType => + TimestampFormatter + .getFractionFormatter(ZoneOffset.UTC) + .format(pn.asInstanceOf[java.lang.Long]) + case _ => pn.toString + } } + partitionColumn.put(schema.names(i), partitionColumnValue) } - partitionColumn.put(schema.names(i), partitionColumnValue) } partitionColumns.add(partitionColumn) } diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index e808006983da..0c33d4a6bafa 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -272,6 +272,11 @@ void VeloxBackend::initConnector() { connectorConfMap[velox::connector::hive::HiveConfig::kFilePreloadThreshold] = backendConf_->get(kFilePreloadThreshold, "1048576"); // 1M + // Map table schema to file schema using name + connectorConfMap[velox::connector::hive::HiveConfig::kParquetUseColumnNames] = "true"; + connectorConfMap[velox::connector::hive::HiveConfig::kOrcUseColumnNames] = "true"; + connectorConfMap[velox::connector::hive::HiveConfig::kOrcUseNestedColumnNames] = "true"; + // read as UTC connectorConfMap[velox::connector::hive::HiveConfig::kReadTimestampPartitionValueAsLocalTime] = "false"; diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index ed2545c78114..6b4b3b470a20 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -51,6 +51,10 @@ std::shared_ptr parseScanSplitInfo( splitInfo->partitionColumns.reserve(fileList.size()); splitInfo->properties.reserve(fileList.size()); splitInfo->metadataColumns.reserve(fileList.size()); + + std::vector colNames; + std::vector veloxTypes; + for (const auto& file : fileList) { // Expect all Partitions share the same index. splitInfo->partitionIndex = file.partition_index(); @@ -71,6 +75,16 @@ std::shared_ptr parseScanSplitInfo( splitInfo->starts.emplace_back(file.start()); splitInfo->lengths.emplace_back(file.length()); + if (colNames.empty() && file.has_schema()) { + const auto& tableSchema = file.schema(); + colNames.reserve(tableSchema.names().size()); + for (const auto& name : tableSchema.names()) { + colNames.emplace_back(name); + } + veloxTypes = SubstraitParser::parseNamedStruct(tableSchema); + } + splitInfo->fileSchema = ROW(std::move(colNames), std::move(veloxTypes)); + facebook::velox::FileProperties fileProps; if (file.has_properties()) { fileProps.fileSize = file.properties().filesize(); diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 4e0e36cefb0b..7b50171f9c53 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -1284,7 +1284,13 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: std::shared_ptr tableHandle; if (!readRel.has_filter()) { tableHandle = std::make_shared( - kHiveConnectorId, "hive_table", filterPushdownEnabled, common::SubfieldFilters{}, nullptr); + kHiveConnectorId, + "hive_table", + filterPushdownEnabled, + common::SubfieldFilters{}, + nullptr, + splitInfo->fileSchema, + splitInfo->properties); } else { common::SubfieldFilters subfieldFilters; auto names = colNameList; @@ -1292,7 +1298,13 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: auto remainingFilter = exprConverter_->toVeloxExpr(readRel.filter(), ROW(std::move(names), std::move(types))); tableHandle = std::make_shared( - kHiveConnectorId, "hive_table", filterPushdownEnabled, std::move(subfieldFilters), remainingFilter); + kHiveConnectorId, + "hive_table", + filterPushdownEnabled, + std::move(subfieldFilters), + remainingFilter, + splitInfo->fileSchema, + splitInfo->properties); } // Get assignments and out names. diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 9636f6615f96..4e180aba4896 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -55,6 +55,12 @@ struct SplitInfo { /// The file sizes and modification times of the files to be scanned. std::vector> properties; + /// The file read options + std::unordered_map readOptions; + + /// The file schema + RowTypePtr fileSchema; + /// Make SplitInfo polymorphic virtual ~SplitInfo() = default; }; diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java index bebc6b1e95ef..92d6263b731f 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java @@ -151,15 +151,17 @@ public ReadRel.LocalFiles toProtobuf() { if (index != null) { fileBuilder.setPartitionIndex(index); } - Map partitionColumn = partitionColumns.get(i); - if (!partitionColumn.isEmpty()) { - partitionColumn.forEach( - (key, value) -> { - ReadRel.LocalFiles.FileOrFiles.partitionColumn.Builder pcBuilder = - ReadRel.LocalFiles.FileOrFiles.partitionColumn.newBuilder(); - pcBuilder.setKey(key).setValue(value); - fileBuilder.addPartitionColumns(pcBuilder.build()); - }); + if (partitionColumns != null && partitionColumns.size() == paths.size()) { + Map partitionColumn = partitionColumns.get(i); + if (!partitionColumn.isEmpty()) { + partitionColumn.forEach( + (key, value) -> { + ReadRel.LocalFiles.FileOrFiles.partitionColumn.Builder pcBuilder = + ReadRel.LocalFiles.FileOrFiles.partitionColumn.newBuilder(); + pcBuilder.setKey(key).setValue(value); + fileBuilder.addPartitionColumns(pcBuilder.build()); + }); + } } fileBuilder.setLength(lengths.get(i)); fileBuilder.setStart(starts.get(i)); @@ -238,6 +240,14 @@ public ReadRel.LocalFiles toProtobuf() { .build(); fileBuilder.setJson(jsonReadOptions); break; + case HidiReadFormat: + String compatValues = fileReadProperties.getOrDefault("compact_values", "true"); + ReadRel.LocalFiles.FileOrFiles.HidiReadOptions hidiReadOptions = + ReadRel.LocalFiles.FileOrFiles.HidiReadOptions.newBuilder() + .setCompactValues(compatValues) + .build(); + fileBuilder.setHidi(hidiReadOptions); + break; default: break; } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index efb793839f6a..c9573f57adf2 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -37,7 +37,8 @@ trait IteratorApi { partitionSchema: StructType, fileFormat: ReadFileFormat, metadataColumnNames: Seq[String], - properties: Map[String, String]): SplitInfo + properties: Map[String, String], + dataSchema: StructType): SplitInfo def genSplitInfoForPartitions( partitionIndex: Int, diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index b5d3c03fc6ef..4a09b429d5af 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -74,8 +74,9 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource _, getPartitionSchema, fileFormat, - getMetadataColumns().map(_.name), - getProperties)) + getMetadataColumns.map(_.name), + getProperties, + getDataSchema)) } val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration( From 34a95c1a37cd1b2be538c2a3511d1e203cb2ed16 Mon Sep 17 00:00:00 2001 From: Lingfeng Zhang Date: Fri, 28 Feb 2025 19:33:35 +0800 Subject: [PATCH 2/6] Add ut for schema evolution --- .../VeloxOrcSchemaEvolutionSuite.scala | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcSchemaEvolutionSuite.scala diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcSchemaEvolutionSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcSchemaEvolutionSuite.scala new file mode 100644 index 000000000000..81a9d2f4aa6a --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcSchemaEvolutionSuite.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +class VeloxOrcSchemaEvolutionSuite extends VeloxWholeStageTransformerSuite { + override protected val resourcePath: String = "/tpch-data-parquet" + override protected val fileFormat: String = "parquet" + + import testImplicits._ + + test("read ORC with column names all starting with '_col'") { + withTempPath { + tmp => + val df = Seq((1, 2, 3), (4, 5, 6), (7, 8, 9)).toDF("_col0", "_col1", "_col2") + df.write.format("orc").save(s"file://${tmp.getCanonicalPath}") + + withTempView("test") { + spark.read + .format("orc") + .schema( + StructType( + Array( + StructField("a", IntegerType, nullable = true), + StructField("b", IntegerType, nullable = true), + StructField("c", IntegerType, nullable = true) + ))) + .load(s"file://${tmp.getCanonicalPath}") + .createOrReplaceTempView("test") + + runQueryAndCompare("select a, b, c from test") { + df => + checkAnswer( + df, + Row(1, 2, 3) :: + Row(4, 5, 6) :: + Row(7, 8, 9) :: + Nil) + } + } + } + } +} From d99b46361dabde7fa69511b369cfd07272fae71a Mon Sep 17 00:00:00 2001 From: Lingfeng Zhang Date: Mon, 3 Mar 2025 14:12:33 +0800 Subject: [PATCH 3/6] Set UseColumnNames --- cpp/velox/compute/WholeStageResultIterator.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 55d07a76b45c..b3769c98705d 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -597,6 +597,8 @@ std::shared_ptr WholeStageResultIterator::createConne std::to_string(veloxCfg_->get(kMaxPartitions, 10000)); configs[velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession] = std::to_string(veloxCfg_->get(kIgnoreMissingFiles, false)); + configs[velox::connector::hive::HiveConfig::kParquetUseColumnNamesSession] = "true"; + configs[velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession] = "true"; return std::make_shared(std::move(configs)); } From 77bd5be0c8f41dac3ef57b4ca2aafeff4309e658 Mon Sep 17 00:00:00 2001 From: Lingfeng Zhang Date: Fri, 28 Feb 2025 19:18:49 +0800 Subject: [PATCH 4/6] [DNM] Use velox ccat3z/feat/old-orc-oap --- ep/build-velox/src/get_velox.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index 5452b0363ff3..ffc8d85927a7 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -16,8 +16,8 @@ set -exu -VELOX_REPO=https://github.com/oap-project/velox.git -VELOX_BRANCH=2025_02_22 +VELOX_REPO=https://github.com/ccat3z/velox.git +VELOX_BRANCH=feat/old-orc-oap VELOX_HOME="" OS=`uname -s` From 3c9aff68075eeee7b154bdf907fd9a5b88a015bb Mon Sep 17 00:00:00 2001 From: Lingfeng Zhang Date: Fri, 21 Feb 2025 10:10:18 +0800 Subject: [PATCH 5/6] Support orc.force.positional.evolution --- .../VeloxOrcForcePositionalSuite.scala | 234 ++++++++++++++++++ cpp/velox/compute/WholeStageResultIterator.cc | 6 +- cpp/velox/config/VeloxConfig.h | 1 + .../org/apache/gluten/GlutenPlugin.scala | 6 + .../apache/gluten/config/GlutenConfig.scala | 2 + 5 files changed, 248 insertions(+), 1 deletion(-) create mode 100644 backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcForcePositionalSuite.scala diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcForcePositionalSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcForcePositionalSuite.scala new file mode 100644 index 000000000000..2bd2ff64d0c2 --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxOrcForcePositionalSuite.scala @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.config.GlutenConfig + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} + +class VeloxOrcForcePositionalSuite extends VeloxWholeStageTransformerSuite { + override protected val resourcePath: String = "/tpch-data-parquet" + override protected val fileFormat: String = "parquet" + + override protected def sparkConf: SparkConf = + super.sparkConf + .set("spark.gluten.sql.columnar.backend.velox.glogSeverityLevel", "0") + .set("spark.gluten.sql.columnar.backend.velox.glogVerboseLevel", "1") + .set("spark.executorEnv.GLOG_v", "1") + .set("spark.gluten.sql.debug", "true") + + import testImplicits._ + + def withForcePositional(f: => Unit): Unit = { + withSQLConf( + GlutenConfig.ORC_FORCE_POSITIONAL_EVOLUTION -> "true", + "orc.force.positional.evolution" -> "true" + ) { + f + } + } + + def testRenameRootColumn(customCheck: DataFrame => Unit): Unit = { + withTempPath { + tmp => + val path = tmp.getCanonicalPath + val df1 = Seq((1, 2), (4, 5), (8, 9)).toDF("col1", "col2") + val df2 = Seq((10, 12), (15, 19), (22, 40)).toDF("col1", "col3") + + val dir1 = s"file://$path/part=one" + val dir2 = s"file://$path/part=two" + + df1.write.format("orc").save(dir1) + df2.write.format("orc").save(dir2) + + spark.read + .schema(df2.schema) + .format("orc") + .load(s"file://$path") + .createOrReplaceTempView("test") + + runQueryAndCompare("select * from test")(customCheck) + } + } + + test("rename root columns") { + testRenameRootColumn { + df => + checkAnswer( + df, + Seq( + Row(1, null, "one"), + Row(4, null, "one"), + Row(8, null, "one"), + Row(10, 12, "two"), + Row(15, 19, "two"), + Row(22, 40, "two") + )) + } + } + + test("rename root columns - top level column should be replaced by index") { + withForcePositional { + testRenameRootColumn { + df => + checkAnswer( + df, + Seq( + Row(1, 2, "one"), + Row(4, 5, "one"), + Row(8, 9, "one"), + Row(10, 12, "two"), + Row(15, 19, "two"), + Row(22, 40, "two") + ) + ) + } + } + } + + def testRenameNestedColumn(customCheck: DataFrame => Unit): Unit = { + withTempPath { + tmp => + val path = tmp.getCanonicalPath + val df1 = spark.createDataFrame( + spark.sparkContext.parallelize(Row(1, Row("abc", 2)) :: Nil), + schema = StructType( + Array( + StructField("col1", IntegerType, nullable = true), + StructField( + "col2", + StructType( + Array( + StructField("a", StringType, nullable = true), + StructField("b", IntegerType, nullable = true) + ))) + )) + ) + val df2 = spark.createDataFrame( + spark.sparkContext.parallelize(Row(20, Row("EFG", 10)) :: Nil), + schema = StructType( + Array( + StructField("col1", IntegerType, nullable = true), + StructField( + "col2", + StructType( + Array( + StructField("a", StringType, nullable = true), + StructField("c", IntegerType, nullable = true) + ))) + )) + ) + + df1.write.mode("overwrite").format("orc").save(s"file://$path/part=one") + df2.write.mode("overwrite").format("orc").save(s"file://$path/part=two") + + spark.read + .schema(df2.schema) + .format("orc") + .load(s"file://$path") + .createOrReplaceTempView("test") + + runQueryAndCompare("select * from test")(customCheck) + } + } + + test("rename nested columns") { + testRenameNestedColumn { + df => + checkAnswer( + df, + Row(1, Row("abc", null), "one") :: Row(20, Row("EFG", 10), "two") :: Nil + ) + } + } + + test("rename nested columns - only top level column should be replaced") { + withForcePositional { + testRenameNestedColumn { + df => + checkAnswer( + df, + Row(1, Row("abc", null), "one") :: Row(20, Row("EFG", 10), "two") :: Nil + ) + } + } + } + + def testPruneNestedSchema(customCheck: DataFrame => Unit): Unit = { + withTempPath { + tmp => + val path = tmp.getCanonicalPath + + val df1 = spark.createDataFrame( + spark.sparkContext.parallelize(Row(1, 5, Row("abc", 2)) :: Nil), + schema = StructType( + Array( + StructField("col1", IntegerType, nullable = true), + StructField("col2", IntegerType, nullable = true), + StructField( + "col3", + StructType( + Array( + StructField("a", StringType, nullable = true), + StructField("b", IntegerType, nullable = true) + ))) + )) + ) + df1.write.format("orc").save(s"file://$path") + + spark.read + .format("orc") + .schema(StructType(Array( + StructField("col1", IntegerType, nullable = true), + StructField("col2", IntegerType, nullable = true), + StructField( + "col3", + StructType(Array( + StructField("b", IntegerType, nullable = true) + ))) + ))) + .load(s"file://$path") + .createOrReplaceTempView("test") + + runQueryAndCompare("select * from test")(customCheck) + } + } + + test("prune nested schema") { + testPruneNestedSchema { + df => + checkAnswer( + df, + Row(1, 5, Row(2)) :: Nil + ) + } + } + + test("prune nested schema - only top level column should be replaced") { + withForcePositional { + testPruneNestedSchema { + df => + checkAnswer( + df, + Row(1, 5, Row(2)) :: Nil + ) + } + } + } +} diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index b3769c98705d..dd5406e92450 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -598,7 +598,11 @@ std::shared_ptr WholeStageResultIterator::createConne configs[velox::connector::hive::HiveConfig::kIgnoreMissingFilesSession] = std::to_string(veloxCfg_->get(kIgnoreMissingFiles, false)); configs[velox::connector::hive::HiveConfig::kParquetUseColumnNamesSession] = "true"; - configs[velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession] = "true"; + if (veloxCfg_->get(kOrcForcePositionalEvolution, false)) { + configs[velox::connector::hive::HiveConfig::kOrcUseNestedColumnNamesSession] = "true"; + } else { + configs[velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession] = "true"; + } return std::make_shared(std::move(configs)); } diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index dfe471a68cc7..5c67f77bca1a 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -127,6 +127,7 @@ const std::string kLoadQuantum = "spark.gluten.sql.columnar.backend.velox.loadQu const std::string kMaxCoalescedDistance = "spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance"; const std::string kMaxCoalescedBytes = "spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes"; const std::string kCachePrefetchMinPct = "spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct"; +const std::string kOrcForcePositionalEvolution = "spark.hadoop.orc.force.positional.evolution"; // write fies const std::string kMaxPartitions = "spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession"; diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala index e886e0a517a5..e2f773c8fd2b 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala @@ -36,6 +36,8 @@ import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS import org.apache.spark.task.TaskResources import org.apache.spark.util.SparkResourceUtil +import org.apache.orc.OrcConf + import java.util import java.util.Collections @@ -259,6 +261,10 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "false") conf.set(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key, "false") } + + conf.set( + GlutenConfig.ORC_FORCE_POSITIONAL_EVOLUTION, + conf.get(OrcConf.FORCE_POSITIONAL_EVOLUTION.getAttribute, "false")) } } diff --git a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index e82acda34078..2b26b80b6055 100644 --- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -375,6 +375,7 @@ object GlutenConfig { val PARQUET_GZIP_WINDOW_SIZE: String = "parquet.gzip.windowSize" // Hadoop config val HADOOP_PREFIX = "spark.hadoop." + val ORC_FORCE_POSITIONAL_EVOLUTION = HADOOP_PREFIX + "orc.force.positional.evolution" // S3 config val S3A_PREFIX = "fs.s3a." @@ -466,6 +467,7 @@ object GlutenConfig { "spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems", "spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits", "spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits", + ORC_FORCE_POSITIONAL_EVOLUTION, // s3 config SPARK_S3_ACCESS_KEY, SPARK_S3_SECRET_KEY, From 9ba00d125517652331c96a0450546c31aae7e590 Mon Sep 17 00:00:00 2001 From: Lingfeng Zhang Date: Fri, 28 Feb 2025 17:45:30 +0800 Subject: [PATCH 6/6] [DNM] Change velox repo for test --- ep/build-velox/src/get_velox.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index ffc8d85927a7..0db7d58234c6 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -17,7 +17,7 @@ set -exu VELOX_REPO=https://github.com/ccat3z/velox.git -VELOX_BRANCH=feat/old-orc-oap +VELOX_BRANCH=feat/orc-positional-oap VELOX_HOME="" OS=`uname -s`