From 0d442bf554ce6aae44dd5b004c78df7f18a7dc73 Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Wed, 27 Nov 2024 10:28:31 +0800 Subject: [PATCH] Support read Iceberg equality delete file MOR table --- ...VeloxIcebergMOREqualityDeletionSuite.scala | 78 +++++++++++++++++++ .../backendsapi/velox/VeloxBackend.scala | 3 +- .../compute/iceberg/IcebergPlanConverter.cc | 11 ++- ep/build-velox/src/get-velox.sh | 2 +- .../gluten/execution/IcebergSuite.scala | 4 +- 5 files changed, 91 insertions(+), 7 deletions(-) create mode 100644 backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergMOREqualityDeletionSuite.scala diff --git a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergMOREqualityDeletionSuite.scala b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergMOREqualityDeletionSuite.scala new file mode 100644 index 000000000000..6400e57725da --- /dev/null +++ b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergMOREqualityDeletionSuite.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.execution.enhanced + +import org.apache.gluten.execution.{IcebergScanTransformer, WholeStageTransformerSuite} +import org.apache.gluten.tags.EnhancedFeaturesTest + +import org.apache.spark.SparkConf + +import org.apache.commons.io.FileUtils + +import java.io.File + +@EnhancedFeaturesTest +class VeloxIcebergMOREqualityDeletionSuite extends WholeStageTransformerSuite { + protected val rootPath: String = getClass.getResource("/").getPath + override protected val resourcePath: String = "" + override protected val fileFormat: String = "parquet" + + protected val ICEBERG_GENERATED_PATH = "junit6640909127060857423" + protected val ICEBERG_WAREHOUSE = s"/tmp/$ICEBERG_GENERATED_PATH" + protected val equalityDeletedDataPath: String = + "../../../../gluten-iceberg/src/test/resources" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.adaptive.enabled", "true") + .set("spark.sql.shuffle.partitions", "2") + .set("spark.memory.offHeap.size", "2g") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") + .set("spark.sql.catalog.local.type", "hadoop") + .set("spark.sql.catalog.local.warehouse", s"file://$ICEBERG_WAREHOUSE/default") + } + + override def beforeAll(): Unit = { + super.beforeAll() + + val icebergPathDir = new File(ICEBERG_WAREHOUSE) + if (icebergPathDir.exists()) { + FileUtils.forceDelete(icebergPathDir) + } + FileUtils.forceMkdir(icebergPathDir) + val equalityDeletedData = + new File(s"$rootPath/$equalityDeletedDataPath/$ICEBERG_GENERATED_PATH") + FileUtils.copyDirectory(equalityDeletedData, icebergPathDir) + } + + test("iceberg read mor table with equality deletion") { + // The table 
'test_upsert_query' was generated by Flink + Iceberg from the Iceberg unit tests; + // its warehouse root path must be '/tmp/junit6640909127060857423/default'. + val testTableName = "local.db.test_upsert_query" + runQueryAndCompare(s""" + |select * from $testTableName; + |""".stripMargin) { + checkGlutenPlan[IcebergScanTransformer] + } + } +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 91f20dbc77a2..70b859e39478 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -573,7 +573,8 @@ object VeloxBackendSettings extends BackendSettingsApi { override def needPreComputeRangeFrameBoundary(): Boolean = true - override def supportIcebergEqualityDeleteRead(): Boolean = false + override def supportIcebergEqualityDeleteRead(): Boolean = + VeloxConfig.get.enableEnhancedFeatures() override def reorderColumnsForPartitionWrite(): Boolean = true diff --git a/cpp/velox/compute/iceberg/IcebergPlanConverter.cc b/cpp/velox/compute/iceberg/IcebergPlanConverter.cc index 07c40e6e1c7b..00ce40a8a1b1 100644 --- a/cpp/velox/compute/iceberg/IcebergPlanConverter.cc +++ b/cpp/velox/compute/iceberg/IcebergPlanConverter.cc @@ -66,11 +66,16 @@ std::shared_ptr IcebergPlanConverter::parseIcebergSplitInfo( fileContent = FileContent::kEqualityDeletes; break; default: - fileContent = FileContent::kData; - break; + VELOX_NYI("Unsupported Iceberg delete file content."); } + std::vector equalityFieldIds(deleteFile.equalityfieldids().begin(), deleteFile.equalityfieldids().end()); deletes.emplace_back(IcebergDeleteFile( - fileContent, deleteFile.filepath(), format, deleteFile.recordcount(), deleteFile.filesize())); + fileContent, + deleteFile.filepath(), + format, + deleteFile.recordcount(), + deleteFile.filesize(), + equalityFieldIds)); } 
icebergSplitInfo->deleteFilesVec.emplace_back(deletes); } else { diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index 39628132f01e..c02ac839c953 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -22,7 +22,7 @@ VELOX_BRANCH=dft-2025_12_09-1 VELOX_ENHANCED_BRANCH=ibm-2025_12_09-1 VELOX_HOME="" RUN_SETUP_SCRIPT=ON -ENABLE_ENHANCED_FEATURES=OFF +ENABLE_ENHANCED_FEATURES=ON # Developer use only for testing Velox PR. UPSTREAM_VELOX_PR_ID="" diff --git a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala index 9b5dd3ddbe4e..177f9a48b5ad 100644 --- a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala +++ b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.Row abstract class IcebergSuite extends WholeStageTransformerSuite { protected val rootPath: String = getClass.getResource("/").getPath // FIXME: This folder doesn't exist in module gluten-iceberg so should be provided by - // backend modules that rely on this suite. + // backend modules that rely on this suite. override protected val resourcePath: String = "/tpch-data-parquet" override protected val fileFormat: String = "parquet" @@ -39,7 +39,7 @@ abstract class IcebergSuite extends WholeStageTransformerSuite { "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") .set("spark.sql.catalog.spark_catalog.type", "hadoop") - .set("spark.sql.catalog.spark_catalog.warehouse", s"file://$rootPath/tpch-data-iceberg-velox") + .set("spark.sql.catalog.spark_catalog.warehouse", s"file://$rootPath/tpch-data-iceberg") } test("iceberg transformer exists") {