From 427ade0e7a8388705dcdf5c8cb9654f48e867fda Mon Sep 17 00:00:00 2001
From: syalla
Date: Tue, 13 Jan 2026 20:54:50 +0000
Subject: [PATCH 1/8] Use PartitionValueExtractor interface on read

Create unit test

Fix the test custom partition value extractor interface
---
 .../org/apache/hudi/HoodieSparkUtils.scala    | 33 +++++++---
 .../hudi/common/table/HoodieTableConfig.java  | 37 +++++++++++
 .../common/table/HoodieTableMetaClient.java   | 12 ++++
 .../hive/sync}/PartitionValueExtractor.java   |  2 +-
 .../org/apache/hudi/util/StreamerUtil.java    |  2 +
 .../org/apache/hudi/DataSourceOptions.scala   |  9 +++
 .../org/apache/hudi/HoodieBaseRelation.scala  | 11 +++-
 .../org/apache/hudi/HoodieFileIndex.scala     |  8 +++
 .../apache/hudi/HoodieSparkSqlWriter.scala    |  4 ++
 .../hudi/SparkHoodieTableFileIndex.scala      |  8 ++-
 .../TestCustomParitionValueExtractor.scala    | 66 +++++++++++++++++++
 ...stCustomSlashPartitionValueExtractor.scala | 40 +++++++++++
 .../datahub/DummyPartitionValueExtractor.java |  2 +-
 .../HiveStylePartitionValueExtractor.java     |  2 +-
 .../hive/MultiPartKeysValueExtractor.java     |  2 +-
 .../hudi/hive/NonPartitionedExtractor.java    |  2 +-
 .../SinglePartPartitionValueExtractor.java    |  2 +-
 ...lashEncodedDayPartitionValueExtractor.java |  2 +-
 ...ashEncodedHourPartitionValueExtractor.java |  2 +-
 .../apache/hudi/hive/ddl/HMSDDLExecutor.java  |  2 +-
 .../hudi/hive/ddl/QueryBasedDDLExecutor.java  |  2 +-
 .../hudi/hive/util/HivePartitionUtil.java     |  2 +-
 .../hive/util/PartitionFilterGenerator.java   |  2 +-
 .../hive/TestPartitionValueExtractor.java     |  2 +-
 .../hudi/sync/common/HoodieSyncClient.java    |  2 +-
 .../hudi/sync/common/HoodieSyncConfig.java    | 38 +----------
 .../sync/common/TestHoodieSyncClient.java     |  2 +-
 .../hudi/utilities/streamer/StreamSync.java   |  3 +
 28 files changed, 237 insertions(+), 64 deletions(-)
 rename {hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model => hudi-common/src/main/java/org/apache/hudi/hive/sync}/PartitionValueExtractor.java (96%)
 create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala
 create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashPartitionValueExtractor.scala

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 08c315b0e78dd..2a7c3ec799c7c 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.schema.HoodieSchema
-import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.common.util.{StringUtils, Option => HOption}
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.storage.StoragePath
@@ -31,6 +31,8 @@ import org.apache.hudi.util.ExceptionWrappingIterator
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.hive.sync.PartitionValueExtractor
+
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@
-224,11 +226,13 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi partitionPath: String, tableBasePath: StoragePath, tableSchema: StructType, - tableConfig: java.util.Map[String, String], + tableConfig: HoodieTableConfig, timeZoneId: String, - shouldValidatePartitionColumns: Boolean): Array[Object] = { + shouldValidatePartitionColumns: Boolean, + usePartitionValueExtractorOnRead: Boolean): Array[Object] = { val keyGeneratorClass = KeyGeneratorType.getKeyGeneratorClassName(tableConfig) - val timestampKeyGeneratorType = tableConfig.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key()) + val timestampKeyGeneratorType = tableConfig.propsMap().get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key()) + val partitionValueExtractorClass = tableConfig.getPartitionValueExtractorClass if (null != keyGeneratorClass && null != timestampKeyGeneratorType @@ -238,10 +242,23 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi // we couldn't reconstruct initial partition column values from partition paths due to lost data after formatting. // But the output for these cases is in a string format, so we can pass partitionPath as UTF8String Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath)) + } else if(usePartitionValueExtractorOnRead && !StringUtils.isNullOrEmpty(partitionValueExtractorClass)) { + try { + val partitionValueExtractor = Class.forName(partitionValueExtractorClass) + .asInstanceOf[PartitionValueExtractor] + val partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partitionPath).asScala.toArray + val partitionSchema = buildPartitionSchemaForNestedFields(tableSchema, partitionColumns) + val typedValues = partitionValues.zip(partitionSchema.fields).map { case (stringValue, field) => + castStringToType(stringValue, field.dataType) + } + typedValues.map(_.asInstanceOf[Object]) + } catch { + case e: Exception => + throw new RuntimeException(s"Failed to extract partition value using $partitionValueExtractorClass class", e) + } } else { doParsePartitionColumnValues(partitionColumns, partitionPath, tableBasePath, tableSchema, timeZoneId, - shouldValidatePartitionColumns, tableConfig.getOrDefault(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.key, - HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.defaultValue).toBoolean) + shouldValidatePartitionColumns, tableConfig.getSlashSeparatedDatePartitioning) } } @@ -336,7 +353,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi ).toSeq(partitionSchema) } - private def buildPartitionSchemaForNestedFields(schema: StructType, partitionColumns: Array[String]): StructType = { + def buildPartitionSchemaForNestedFields(schema: StructType, partitionColumns: Array[String]): StructType = { val partitionFields = partitionColumns.flatMap { partitionCol => extractNestedField(schema, partitionCol) } @@ -364,7 +381,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi traverseSchema(schema, pathParts.toList, fieldPath) } - private def castStringToType(value: String, dataType: org.apache.spark.sql.types.DataType): Any = { + def castStringToType(value: String, dataType: org.apache.spark.sql.types.DataType): Any = { import org.apache.spark.sql.types._ // handling cases where the value contains path separators or is complex diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
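A minimal Scala sketch of the reader-side flow added in the HoodieSparkUtils hunk above (illustrative only, not part of the diff). It assumes the configured extractor has a public no-arg constructor (which is exactly how patch 4 below instantiates it) and reuses the patch's own castStringToType helper, assumed in scope:

    // Sketch: turn a stored partition path back into typed partition values.
    import scala.collection.JavaConverters._
    import org.apache.hudi.hive.sync.PartitionValueExtractor
    import org.apache.spark.sql.types.StructType

    def extractTypedPartitionValues(extractorClass: String,
                                    partitionPath: String,
                                    partitionSchema: StructType): Array[Object] = {
      val extractor = Class.forName(extractorClass)
        .getDeclaredConstructor()
        .newInstance()                        // reflective, no-arg instantiation
        .asInstanceOf[PartitionValueExtractor]
      // e.g. "2024/01/01/USA/CA/SFO" -> ["2024-01-01", "USA", "CA", "SFO"] with the test extractor
      val stringValues = extractor.extractPartitionValuesInPath(partitionPath).asScala
      // Pair each extracted string with its partition field and cast to the field's Spark type.
      stringValues.zip(partitionSchema.fields)
        .map { case (value, field) => castStringToType(value, field.dataType).asInstanceOf[Object] }
        .toArray
    }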
index 3de45e1754ee6..21db6dcc7c586 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -335,6 +335,39 @@ public static final String getDefaultPayloadClassName() { .sinceVersion("1.0.0") .withDocumentation("Key Generator type to determine key generator class"); + public static final boolean DEFAULT_USE_PARTITION_VALUE_EXTRACTOR_FOR_READERS = false; + + public static final ConfigProperty PARTITION_VALUE_EXTRACTOR_CLASS = ConfigProperty + .key("hoodie.datasource.hive_sync.partition_extractor_class") + .defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor") + .withInferFunction(cfg -> { + Option partitionFieldsOpt = HoodieTableConfig.getPartitionFieldProp(cfg) + .or(() -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))); + + if (!partitionFieldsOpt.isPresent()) { + return Option.empty(); + } + String partitionFields = partitionFieldsOpt.get(); + if (StringUtils.nonEmpty(partitionFields)) { + int numOfPartFields = partitionFields.split(",").length; + if (numOfPartFields == 1) { + if (cfg.contains(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key()) + && cfg.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key()).equals("true")) { + return Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor"); + } else { + return Option.of("org.apache.hudi.hive.SinglePartPartitionValueExtractor"); + } + } else { + return Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor"); + } + } else { + return Option.of("org.apache.hudi.hive.NonPartitionedExtractor"); + } + }) + .markAdvanced() + .withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, " + + "default 'org.apache.hudi.hive.MultiPartKeysValueExtractor'."); + // TODO: this has to be UTC. why is it not the default? 
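Restated as a compact Scala sketch (illustrative only, not code from the patch), the infer function of PARTITION_VALUE_EXTRACTOR_CLASS above picks the extractor from the table's partition layout:

    // Decision ladder implemented by the infer function above.
    def inferExtractorClass(partitionFields: String, hiveStylePartitioning: Boolean): String =
      if (partitionFields == null || partitionFields.isEmpty) {
        "org.apache.hudi.hive.NonPartitionedExtractor"            // no partition columns
      } else if (partitionFields.split(",").length > 1) {
        "org.apache.hudi.hive.MultiPartKeysValueExtractor"        // several partition columns
      } else if (hiveStylePartitioning) {
        "org.apache.hudi.hive.HiveStylePartitionValueExtractor"   // one column, col=value style paths
      } else {
        "org.apache.hudi.hive.SinglePartPartitionValueExtractor"  // one column, plain value paths
      }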
public static final ConfigProperty TIMELINE_TIMEZONE = ConfigProperty .key("hoodie.table.timeline.timezone") @@ -1208,6 +1241,10 @@ public String getKeyGeneratorClassName() { return KeyGeneratorType.getKeyGeneratorClassName(this); } + public String getPartitionValueExtractorClass() { + return getStringOrDefault(PARTITION_VALUE_EXTRACTOR_CLASS); + } + public HoodieTimelineTimeZone getTimelineTimezone() { return HoodieTimelineTimeZone.valueOf(getStringOrDefault(TIMELINE_TIMEZONE)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 6ddd459e3d6b8..ceb7fcedf9e25 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -1070,6 +1070,7 @@ public static class TableBuilder { private Boolean bootstrapIndexEnable; private Boolean populateMetaFields; private String keyGeneratorClassProp; + private String partitionValueExtractorClass; private String keyGeneratorType; private Boolean slashSeparatedDatePartitioning; private Boolean hiveStylePartitioningEnable; @@ -1243,6 +1244,11 @@ public TableBuilder setSlashSeparatedDatePartitioning(Boolean slashSeparatedDate return this; } + public TableBuilder setPartitionValueExtractorClass(String partitionValueExtractorClass) { + this.partitionValueExtractorClass = partitionValueExtractorClass; + return this; + } + public TableBuilder setHiveStylePartitioningEnable(Boolean hiveStylePartitioningEnable) { this.hiveStylePartitioningEnable = hiveStylePartitioningEnable; return this; @@ -1445,6 +1451,9 @@ public TableBuilder fromProperties(Properties properties) { if (hoodieConfig.contains(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING)) { setSlashSeparatedDatePartitioning(hoodieConfig.getBoolean(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING)); } + if (hoodieConfig.contains(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS)) { + setPartitionValueExtractorClass(hoodieConfig.getString(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS)); + } if (hoodieConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)) { setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)); } @@ -1588,6 +1597,9 @@ public Properties build() { if (null != slashSeparatedDatePartitioning) { tableConfig.setValue(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING, Boolean.toString(slashSeparatedDatePartitioning)); } + if (null != partitionValueExtractorClass) { + tableConfig.setValue(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS, partitionValueExtractorClass); + } if (null != hiveStylePartitioningEnable) { tableConfig.setValue(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, Boolean.toString(hiveStylePartitioningEnable)); } diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java b/hudi-common/src/main/java/org/apache/hudi/hive/sync/PartitionValueExtractor.java similarity index 96% rename from hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java rename to hudi-common/src/main/java/org/apache/hudi/hive/sync/PartitionValueExtractor.java index 60d080a0ffdd8..d3ebab77225d0 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/hive/sync/PartitionValueExtractor.java
@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.apache.hudi.sync.common.model;
+package org.apache.hudi.hive.sync;

 import java.io.Serializable;
 import java.util.List;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index ecf7b73e312dd..abb1cd4a64d3c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -314,6 +314,8 @@ public static HoodieTableMetaClient initTableIfNotExists(
         .setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD.key(), null))
         .setKeyGeneratorClassProp(
             conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse(SimpleAvroKeyGenerator.class.getName()))
+        .setPartitionValueExtractorClass(conf.getString(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key(),
+            HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.defaultValue()))
         .setHiveStylePartitioningEnable(conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING))
         .setUrlEncodePartitioning(conf.get(FlinkOptions.URL_ENCODE_PARTITIONING))
         .setCDCEnabled(conf.get(FlinkOptions.CDC_ENABLED))
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 237afaf4379f0..86423073c5c3b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -278,6 +278,15 @@ object DataSourceReadOptions {
     .sinceVersion("1.1.0")
     .withDocumentation("Fully qualified class name of the catalog that is used by the Polaris spark client.")

+  val USE_PARTITION_VALUE_EXTRACTOR_ON_READ: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.read.partition.value.using.partition-value-extractor-class")
+    .defaultValue("false")
+    .markAdvanced()
+    .sinceVersion("1.2.0")
+    .withDocumentation("This config controls whether the PartitionValueExtractor interface is used" +
+      " for parsing partition values from the partition path. 
When this config is enabled, it uses" + + " PartitionValueExtractor class value stored in the hoodie.properties file.") + /** @deprecated Use {@link QUERY_TYPE} and its methods instead */ @Deprecated val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key() diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 56280da8a9d7d..2af9bcfb66796 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -477,6 +477,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, getPartitionColumnsAsInternalRowInternal(file, metaClient.getBasePath, extractPartitionValuesFromPartitionPath = true) + protected def usePartitionValueExtractorOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = { + optParams.getOrElse(DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key, + DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.defaultValue.toString).toBoolean || + ProvidesHoodieConfig.isSchemaEvolutionEnabled(sparkSession) + } + protected def getPartitionColumnsAsInternalRowInternal(file: StoragePathInfo, basePath: StoragePath, extractPartitionValuesFromPartitionPath: Boolean): InternalRow = { if (extractPartitionValuesFromPartitionPath) { @@ -489,9 +495,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, relativePath, basePath, tableStructSchema, - tableConfig.propsMap, + tableConfig, timeZoneId, - conf.getBoolean("spark.sql.sources.validatePartitionColumns", true)) + conf.getBoolean("spark.sql.sources.validatePartitionColumns", true), + usePartitionValueExtractorOnRead(optParams, sparkSession)) if(rowValues.length != partitionColumns.length) { throw new HoodieException("Failed to get partition column values from the partition-path:" + s"partition column size: ${partitionColumns.length}, parsed partition value size: ${rowValues.length}") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 9658fd451ec81..eb1bacbb8f9ec 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -527,9 +527,17 @@ object HoodieFileIndex extends Logging { properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride) } + val usePartitionValueExtractorForReaders = getConfigValue(options, sqlConf, + DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key, null) + if (usePartitionValueExtractorForReaders != null) { + properties.setProperty(DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key, + usePartitionValueExtractorForReaders) + } + if (tableConfig != null) { properties.setProperty(RECORDKEY_FIELD.key, tableConfig.getRecordKeyFields.orElse(Array.empty).mkString(",")) properties.setProperty(PARTITIONPATH_FIELD.key, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig).orElse("")) + properties.setProperty(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key(), tableConfig.getPartitionValueExtractorClass) // for simple bucket index, we need to set the INDEX_TYPE, BUCKET_INDEX_HASH_FIELD, 
BUCKET_INDEX_NUM_BUCKETS val database = getDatabaseName(tableConfig, spark.catalog.currentDatabase) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index f7db780e504ce..e037b46da5251 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -310,6 +310,8 @@ class HoodieSparkSqlWriterInternal { .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED)) .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)) .setKeyGeneratorClassProp(keyGenProp) + .setPartitionValueExtractorClass(hoodieConfig.getStringOrDefault(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key(), + HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.defaultValue())) .set(timestampKeyGeneratorConfigs.asJava.asInstanceOf[java.util.Map[String, Object]]) .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) @@ -769,6 +771,8 @@ class HoodieSparkSqlWriterInternal { .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)) .setPopulateMetaFields(populateMetaFields) .setKeyGeneratorClassProp(keyGenProp) + .setPartitionValueExtractorClass(hoodieConfig.getStringOrDefault(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key(), + HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.defaultValue())) .set(timestampKeyGeneratorConfigs.asJava.asInstanceOf[java.util.Map[String, Object]]) .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index 567053c618a71..7fd77914ff6f8 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -118,6 +118,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession, protected lazy val shouldFastBootstrap = configProperties.getBoolean(DATA_QUERIES_ONLY.key, false) + protected lazy val usePartitionValueExtractorOnRead = configProperties.getBoolean( + DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key(), false) + /** * Get the partition schema from the hoodie.properties. 
*/ @@ -432,9 +435,10 @@ class SparkHoodieTableFileIndex(spark: SparkSession, partitionPath, getBasePath, schema, - metaClient.getTableConfig.propsMap, + metaClient.getTableConfig, configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone), - shouldValidatePartitionColumns(spark)) + shouldValidatePartitionColumns(spark), + usePartitionValueExtractorOnRead) } private def arePartitionPathsUrlEncoded: Boolean = diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala new file mode 100644 index 0000000000000..f2177e26cbd1b --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.common + +import org.apache.hudi.sync.common.HoodieSyncConfig + +class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { + test("Test custom partition value extractor interface") { + withTempDir { tmp => + val targetTable = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$targetTable" + + spark.sql( + s""" + |create table $targetTable ( + | `id` string, + | `name` string, + | `ts` bigint, + | `datestr` string, + | `country` string, + | `state` string, + | `city` string + |) using hudi + | tblproperties ( + | 'primaryKey' = 'id', + | 'type' = 'COW', + | 'preCombineField'='ts', + | 'hoodie.datasource.write.hive_style_partitioning'='false', + | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor' + | ) + | partitioned by (`datestr`, `country`, `state`, `city`) + | location '$tablePath' + """.stripMargin) + // yyyy/mm/dd + spark.sql( + s""" + | insert into $targetTable values + | (1, 'a1', 1000, '2024-01-01' "USA", "CA", "SFO"), + | (2, 'a2', 2000, '2024-01-01', "USA", "CA", "LA") + """.stripMargin) + val catalogTable = spark.sessionState.catalog.externalCatalog.getTable("default", targetTable) + // catalogTable.storage. 
+ + // check result after insert and merge data into target table + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable")( + Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq("2", "a2", 2000, "2024-01-01", "USA", "CA", "LA") + ) + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashPartitionValueExtractor.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashPartitionValueExtractor.scala new file mode 100644 index 0000000000000..437fb7611e5e9 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashPartitionValueExtractor.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hudi.common + +import org.apache.hudi.hive.sync.PartitionValueExtractor + +import java.util + +class TestCustomSlashPartitionValueExtractor extends PartitionValueExtractor { + + /** + * + * This method will extract Partition values from the partition path + * and provide date, country, state, city values separately. 
+ * @param partitionPath PartitionPath provided will be in the format yyyy/mm/dd/country/state/city
+ * @return a list of string values such as yyyy-mm-dd, country, state, city
+ */
+ override def extractPartitionValuesInPath(partitionPath: String): util.List[String] = {
+ val partitionSplitsSeq = partitionPath.split("/")
+ val date = s"${partitionSplitsSeq(0)}-${partitionSplitsSeq(1)}-${partitionSplitsSeq(2)}"
+ val country = partitionSplitsSeq(3)
+ val state = partitionSplitsSeq(4)
+ val city = partitionSplitsSeq(5)
+ java.util.Arrays.asList(date, country, state, city)
+ }
+}
diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java
index 3c00e313a990e..7943d3267b705 100644
--- a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java
@@ -19,7 +19,7 @@

 package org.apache.hudi.sync.datahub;

-import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+import org.apache.hudi.hive.sync.PartitionValueExtractor;

 import java.util.Collections;
 import java.util.List;
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveStylePartitionValueExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveStylePartitionValueExtractor.java
index 11098698e8aeb..9291ff4d58699 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveStylePartitionValueExtractor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveStylePartitionValueExtractor.java
@@ -18,7 +18,7 @@

 package org.apache.hudi.hive;

-import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+import org.apache.hudi.hive.sync.PartitionValueExtractor;

 import java.util.Collections;
 import java.util.List;
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java
index dd356638a47e6..fb42f16c50c24 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java
@@ -20,7 +20,7 @@

 package org.apache.hudi.hive;

 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+import org.apache.hudi.hive.sync.PartitionValueExtractor;

 import java.util.Arrays;
 import java.util.Collections;
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/NonPartitionedExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/NonPartitionedExtractor.java
index 37b15c6a61bd0..47b7fe5a7382e 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/NonPartitionedExtractor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/NonPartitionedExtractor.java
@@ -19,7 +19,7 @@

 package org.apache.hudi.hive;

-import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+import org.apache.hudi.hive.sync.PartitionValueExtractor;

 import java.util.ArrayList;
 import java.util.List;
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SinglePartPartitionValueExtractor.java 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SinglePartPartitionValueExtractor.java index abbccfcc53763..ae2965adfba79 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SinglePartPartitionValueExtractor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SinglePartPartitionValueExtractor.java @@ -19,7 +19,7 @@ package org.apache.hudi.hive; -import org.apache.hudi.sync.common.model.PartitionValueExtractor; +import org.apache.hudi.hive.sync.PartitionValueExtractor; import java.util.Collections; import java.util.List; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java index 85fda472754e7..89255d2ffafea 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java @@ -19,7 +19,7 @@ package org.apache.hudi.hive; -import org.apache.hudi.sync.common.model.PartitionValueExtractor; +import org.apache.hudi.hive.sync.PartitionValueExtractor; import java.time.LocalDateTime; import java.time.ZoneId; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedHourPartitionValueExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedHourPartitionValueExtractor.java index a30296b403d71..da23cf1b25ea6 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedHourPartitionValueExtractor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedHourPartitionValueExtractor.java @@ -19,7 +19,7 @@ package org.apache.hudi.hive; -import org.apache.hudi.sync.common.model.PartitionValueExtractor; +import org.apache.hudi.hive.sync.PartitionValueExtractor; import java.time.LocalDateTime; import java.time.ZoneId; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java index 81ba15660f395..06227a3d852e3 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java @@ -28,7 +28,7 @@ import org.apache.hudi.hive.util.HivePartitionUtil; import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hudi.storage.StorageSchemes; -import org.apache.hudi.sync.common.model.PartitionValueExtractor; +import org.apache.hudi.hive.sync.PartitionValueExtractor; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.fs.Path; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java index 472bcedd328a2..c602457b6355a 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java @@ -28,7 +28,7 @@ import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hudi.storage.StorageSchemes; -import org.apache.hudi.sync.common.model.PartitionValueExtractor; +import org.apache.hudi.hive.sync.PartitionValueExtractor; import lombok.extern.slf4j.Slf4j; import 
org.apache.hadoop.fs.Path; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java index 3e75582266dff..294964f5375e9 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java @@ -22,7 +22,7 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; -import org.apache.hudi.sync.common.model.PartitionValueExtractor; +import org.apache.hudi.hive.sync.PartitionValueExtractor; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.hive.metastore.IMetaStoreClient; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java index d1b934988d2f3..8bc8991d3f0dd 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java @@ -30,7 +30,7 @@ import org.apache.hudi.internal.schema.Types; import org.apache.hudi.sync.common.model.FieldSchema; import org.apache.hudi.sync.common.model.Partition; -import org.apache.hudi.sync.common.model.PartitionValueExtractor; +import org.apache.hudi.hive.sync.PartitionValueExtractor; import java.util.Arrays; import java.util.Comparator; diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestPartitionValueExtractor.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestPartitionValueExtractor.java index 075542d596717..784f30bd87e79 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestPartitionValueExtractor.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestPartitionValueExtractor.java @@ -18,7 +18,7 @@ package org.apache.hudi.hive; -import org.apache.hudi.sync.common.model.PartitionValueExtractor; +import org.apache.hudi.hive.sync.PartitionValueExtractor; import org.junit.jupiter.api.Test; diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java index 04a6e4a10db17..35605ab1955b3 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java @@ -32,7 +32,7 @@ import org.apache.hudi.storage.StoragePath; import org.apache.hudi.sync.common.model.Partition; import org.apache.hudi.sync.common.model.PartitionEvent; -import org.apache.hudi.sync.common.model.PartitionValueExtractor; +import org.apache.hudi.hive.sync.PartitionValueExtractor; import lombok.Getter; import lombok.extern.slf4j.Slf4j; diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java index 890c9712f714f..61d8f9b28312f 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java @@ -116,43 +116,7 @@ public class HoodieSyncConfig 
extends HoodieConfig { .markAdvanced() .withDocumentation("Field in the table to use for determining hive partition columns."); - public static final ConfigProperty META_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigProperty - .key("hoodie.datasource.hive_sync.partition_extractor_class") - .defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor") - .withInferFunction(cfg -> { - Option partitionFieldsOpt; - if (StringUtils.nonEmpty(cfg.getString(META_SYNC_PARTITION_FIELDS))) { - partitionFieldsOpt = Option.ofNullable(cfg.getString(META_SYNC_PARTITION_FIELDS)); - } else { - partitionFieldsOpt = HoodieTableConfig.getPartitionFieldProp(cfg) - .or(() -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))); - } - if (!partitionFieldsOpt.isPresent()) { - return Option.empty(); - } - String partitionFields = partitionFieldsOpt.get(); - if (StringUtils.nonEmpty(partitionFields)) { - int numOfPartFields = partitionFields.split(",").length; - if (numOfPartFields == 1) { - if (cfg.contains(HIVE_STYLE_PARTITIONING_ENABLE) - && cfg.getString(HIVE_STYLE_PARTITIONING_ENABLE).equals("true")) { - return Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor"); - } else if (cfg.contains(SLASH_SEPARATED_DATE_PARTITIONING) - && cfg.getString(SLASH_SEPARATED_DATE_PARTITIONING).equals("true")) { - return Option.of("org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor"); - } else { - return Option.of("org.apache.hudi.hive.SinglePartPartitionValueExtractor"); - } - } else { - return Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor"); - } - } else { - return Option.of("org.apache.hudi.hive.NonPartitionedExtractor"); - } - }) - .markAdvanced() - .withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, " - + "default 'org.apache.hudi.hive.MultiPartKeysValueExtractor'."); + public static final ConfigProperty META_SYNC_PARTITION_EXTRACTOR_CLASS = HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS; public static final ConfigProperty META_SYNC_DECODE_PARTITION = ConfigProperty .key("hoodie.meta.sync.decode_partition") diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncClient.java index 14efd1df0be8c..dc941a7dbd9ab 100644 --- a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncClient.java +++ b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncClient.java @@ -19,7 +19,7 @@ package org.apache.hudi.sync.common; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.sync.common.model.PartitionValueExtractor; +import org.apache.hudi.hive.sync.PartitionValueExtractor; import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.BeforeEach; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index d1db2855a6b33..7ecfd482937be 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -466,6 +466,9 @@ HoodieTableMetaClient initializeEmptyTable(HoodieTableMetaClient.TableBuilder ta .setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())) 
.setKeyGeneratorClassProp(keyGenClassName)
+        .setPartitionValueExtractorClass(props.getString(
+            HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key(),
+            HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.defaultValue()))
         .setOrderingFields(cfg.sourceOrderingFields)
         .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
             HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))

From 04fd2065280a72ba8dbdd9c82bc27517eca0a2f7 Mon Sep 17 00:00:00 2001
From: syalla
Date: Sun, 1 Feb 2026 22:55:11 +0000
Subject: [PATCH 2/8] Remove unused imports

---
 .../main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
index 61d8f9b28312f..f557b7db54d3e 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -51,8 +51,6 @@
 import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
 import static org.apache.hudi.common.table.HoodieTableConfig.BASE_FILE_FORMAT;
 import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
-import static org.apache.hudi.common.table.HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING;
-import static org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
 import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_TABLE_NAME_KEY;
 import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_WRITE_TABLE_NAME_KEY;
 import static org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING;

From 08ff52fca7009faa82e5f5d2127417408e278575 Mon Sep 17 00:00:00 2001
From: syalla
Date: Mon, 2 Feb 2026 00:43:03 +0000
Subject: [PATCH 3/8] Create unit tests and remove unnecessary variables

---
 .../hudi/common/table/HoodieTableConfig.java  |   2 -
 .../org/apache/hudi/DataSourceOptions.scala   |   2 +-
 .../org/apache/hudi/HoodieBaseRelation.scala  |   2 +-
 .../hudi/SparkHoodieTableFileIndex.scala      |   3 +-
 .../TestCustomParitionValueExtractor.scala    | 210 +++++++++++++++++-
 5 files changed, 207 insertions(+), 12 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 21db6dcc7c586..0975056eb3003 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -335,8 +335,6 @@ public static final String getDefaultPayloadClassName() {
       .sinceVersion("1.0.0")
       .withDocumentation("Key Generator type to determine key generator class");

-  public static final boolean DEFAULT_USE_PARTITION_VALUE_EXTRACTOR_FOR_READERS = false;
-
   public static final ConfigProperty PARTITION_VALUE_EXTRACTOR_CLASS = ConfigProperty
       .key("hoodie.datasource.hive_sync.partition_extractor_class")
       .defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 86423073c5c3b..22d7f1c1bfdcb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -280,7 +280,7 @@ object DataSourceReadOptions {

   val USE_PARTITION_VALUE_EXTRACTOR_ON_READ: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.read.partition.value.using.partition-value-extractor-class")
-    .defaultValue("false")
+    .defaultValue("true")
     .markAdvanced()
     .sinceVersion("1.2.0")
     .withDocumentation("This config controls whether the PartitionValueExtractor interface is used" +
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 2af9bcfb66796..15bb47ebbb69c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -479,7 +479,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

   protected def usePartitionValueExtractorOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = {
     optParams.getOrElse(DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key,
-      DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.defaultValue.toString).toBoolean ||
+      DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.defaultValue).toBoolean ||
       ProvidesHoodieConfig.isSchemaEvolutionEnabled(sparkSession)
   }

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 7fd77914ff6f8..1674d5627762e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -119,7 +119,8 @@ class SparkHoodieTableFileIndex(spark: SparkSession,

   protected lazy val shouldFastBootstrap = configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)

   protected lazy val usePartitionValueExtractorOnRead = configProperties.getBoolean(
-    DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key(), false)
+    DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key(),
+    DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.defaultValue().toBoolean)

   /**
    * Get the partition schema from the hoodie.properties. 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala index f2177e26cbd1b..7714a286b4581 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala @@ -17,8 +17,16 @@ package org.apache.spark.sql.hudi.common +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.hadoop.fs.HadoopFSUtils +import org.apache.hudi.metadata.HoodieBackedTableMetadata +import org.apache.hudi.storage.StoragePath import org.apache.hudi.sync.common.HoodieSyncConfig +import org.junit.jupiter.api.Assertions.assertTrue + class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { test("Test custom partition value extractor interface") { withTempDir { tmp => @@ -46,21 +54,209 @@ class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { | partitioned by (`datestr`, `country`, `state`, `city`) | location '$tablePath' """.stripMargin) - // yyyy/mm/dd + // yyyy/mm/dd/country/state/city spark.sql( s""" | insert into $targetTable values - | (1, 'a1', 1000, '2024-01-01' "USA", "CA", "SFO"), - | (2, 'a2', 2000, '2024-01-01', "USA", "CA", "LA") + | (1, 'a1', 1000, '2024-01-01', "USA", "CA", "SFO"), + | (2, 'a2', 2000, '2024-01-01', "USA", "TX", "AU"), + | (3, 'a3', 3000, '2024-01-02', "USA", "CA", "LA"), + | (4, 'a4', 4000, '2024-01-02', "USA", "WA", "SEA"), + | (5, 'a5', 5000, '2024-01-03', "USA", "CA", "SFO") """.stripMargin) - val catalogTable = spark.sessionState.catalog.externalCatalog.getTable("default", targetTable) - // catalogTable.storage. 
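As a usage sketch (not part of the test above): with this patch flipping the default to "true", reads parse partition values through the configured extractor unless explicitly disabled, and the option can also be set per query; the table path below is a placeholder:

    // Sketch: enable extractor-based partition value parsing for a single read.
    val df = spark.read.format("hudi")
      .option(DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key, "true")
      .load("/path/to/hudi/table")  // placeholder base path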
// check result after insert and merge data into target table - checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable")( + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable" + + s" where state = 'CA'")( + Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq(3, "a3", 3000, "2024-01-02", "USA", "CA", "LA"), + Seq(5, "a5", 5000, "2024-01-03", "USA", "CA", "SFO") + ) + + // Verify table config has custom partition value extractor class set + val metaClient = HoodieTableMetaClient.builder() + .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration)) + .setBasePath(tablePath) + .build() + val tableConfig = metaClient.getTableConfig + val partitionExtractorClass = tableConfig.getProps.getProperty( + HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()) + assertTrue(partitionExtractorClass == "org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor", + s"Table config should have custom partition value extractor class set to TestCustomSlashPartitionValueExtractor, but got $partitionExtractorClass") + + // Verify that partition paths are created with slash separated format (yyyy/MM/dd/country/state/city) + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/01/USA/CA/SFO")), + s"Partition path 2024/01/01/USA/CA/SFO should exist") + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/01/USA/TX/AU")), + s"Partition path 2024/01/01/USA/TX/AU should exist") + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/02/USA/CA/LA")), + s"Partition path 2024/01/02/USA/CA/LA should exist") + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/02/USA/WA/SEA")), + s"Partition path 2024/01/02/USA/WA/SEA should exist") + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/03/USA/CA/SFO")), + s"Partition path 2024/01/03/USA/CA/SFO should exist") + + val engine = new HoodieSparkEngineContext(spark.sparkContext) + val storage = metaClient.getStorage() + val metadataConfig = HoodieMetadataConfig.newBuilder().build() + val metadataTable = new HoodieBackedTableMetadata(engine, storage, metadataConfig, tablePath) + val partitionPaths = metadataTable.getAllPartitionPaths + assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO")) + assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU")) + assertTrue(partitionPaths.contains("2024/01/02/USA/CA/LA")) + assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA")) + assertTrue(partitionPaths.contains("2024/01/03/USA/CA/SFO")) + metadataTable.close() + } + } + + test("Test custom partition value extractor with partition pruning and filtering") { + withTempDir { tmp => + val targetTable = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$targetTable" + + spark.sql( + s""" + |create table $targetTable ( + | `id` string, + | `name` string, + | `ts` bigint, + | `datestr` string, + | `country` string, + | `state` string, + | `city` string + |) using hudi + | tblproperties ( + | 'primaryKey' = 'id', + | 'type' = 'COW', + | 'preCombineField'='ts', + | 'hoodie.datasource.write.hive_style_partitioning'='false', + | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor' + | ) + | partitioned by (`datestr`, `country`, `state`, `city`) + | location '$tablePath' + """.stripMargin) + + // Insert data across multiple partitions + spark.sql( + s""" + | insert 
into $targetTable values + | (1, 'a1', 1000, '2024-01-01', "USA", "CA", "SFO"), + | (2, 'a2', 2000, '2024-01-01', "USA", "CA", "LA"), + | (3, 'a3', 3000, '2024-01-01', "USA", "TX", "AU"), + | (4, 'a4', 4000, '2024-01-02', "USA", "CA", "SFO"), + | (5, 'a5', 5000, '2024-01-02', "USA", "WA", "SEA"), + | (6, 'a6', 6000, '2024-01-03', "USA", "CA", "LA"), + | (7, 'a7', 7000, '2024-01-03', "CAN", "ON", "TOR"), + | (8, 'a8', 8000, '2024-01-04', "USA", "NY", "NYC") + """.stripMargin) + + // Test partition pruning with single partition column filter (state) + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where state = 'CA' order by id")( + Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"), + Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO"), + Seq(6, "a6", 6000, "2024-01-03", "USA", "CA", "LA") + ) + + // Test partition pruning with multiple partition column filters + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where state = 'CA' and city = 'SFO' order by id")( + Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO") + ) + + // Test partition pruning with date filter + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where datestr = '2024-01-01' order by id")( + Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"), + Seq(3, "a3", 3000, "2024-01-01", "USA", "TX", "AU") + ) + + // Test partition pruning with country filter + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where country = 'CAN' order by id")( + Seq(7, "a7", 7000, "2024-01-03", "CAN", "ON", "TOR") + ) + + // Test partition pruning with combined date and state filter + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where datestr = '2024-01-02' and state = 'CA' order by id")( + Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO") + ) + + // Test partition pruning with IN clause + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where state IN ('CA', 'NY') order by id")( Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), - Seq("2", "a2", 2000, "2024-01-01", "USA", "CA", "LA") + Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"), + Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO"), + Seq(6, "a6", 6000, "2024-01-03", "USA", "CA", "LA"), + Seq(8, "a8", 8000, "2024-01-04", "USA", "NY", "NYC") + ) + + // Test reading with _hoodie_partition_path to verify custom partition format + checkAnswer(s"select id, _hoodie_partition_path from $targetTable where state = 'CA' order by id")( + Seq("1", "2024/01/01/USA/CA/SFO"), + Seq("2", "2024/01/01/USA/CA/LA"), + Seq("4", "2024/01/02/USA/CA/SFO"), + Seq("6", "2024/01/03/USA/CA/LA") ) + + // Create DataFrame and analyze query plan to verify partition pruning + val dfWithStateFilter = spark.sql(s"select * from $targetTable where state = 'CA'") + val planWithStateFilter = dfWithStateFilter.queryExecution.executedPlan.toString() + // Verify partition filters are pushed down + assertTrue(planWithStateFilter.contains("PartitionFilters") || planWithStateFilter.contains("PushedFilters"), + s"Query plan should contain partition filters for state column") + + // Test DataFrame API with multiple partition filters + val dfWithMultipleFilters = spark.table(targetTable) + .filter("state = 'CA' and datestr = '2024-01-01'") + val 
planWithMultipleFilters = dfWithMultipleFilters.queryExecution.executedPlan.toString() + assertTrue(planWithMultipleFilters.contains("PartitionFilters") || planWithMultipleFilters.contains("PushedFilters"), + s"Query plan should contain partition filters for multiple columns") + + // Verify the filtered results + val multiFilterResults = dfWithMultipleFilters.select("id", "name", "state", "datestr").orderBy("id").collect() + assertTrue(multiFilterResults.length == 2, s"Expected 2 rows but got ${multiFilterResults.length}") + assertTrue(multiFilterResults(0).getString(0) == "1", s"First row id should be 1") + assertTrue(multiFilterResults(0).getString(1) == "a1", s"First row name should be a1") + assertTrue(multiFilterResults(0).getString(2) == "CA", s"First row state should be CA") + assertTrue(multiFilterResults(0).getString(3) == "2024-01-01", s"First row datestr should be 2024-01-01") + assertTrue(multiFilterResults(1).getString(0) == "2", s"Second row id should be 2") + assertTrue(multiFilterResults(1).getString(1) == "a2", s"Second row name should be a2") + + // Test DataFrame with country filter + val dfWithCountryFilter = spark.table(targetTable).filter("country = 'CAN'") + val planWithCountryFilter = dfWithCountryFilter.queryExecution.executedPlan.toString() + assertTrue(planWithCountryFilter.contains("PartitionFilters") || planWithCountryFilter.contains("PushedFilters"), + s"Query plan should contain partition filters for country column") + + val countryFilterResults = dfWithCountryFilter.select("id", "country").orderBy("id").collect() + assertTrue(countryFilterResults.length == 1, s"Expected 1 row but got ${countryFilterResults.length}") + assertTrue(countryFilterResults(0).getString(0) == "7", s"Row id should be 7") + assertTrue(countryFilterResults(0).getString(1) == "CAN", s"Row country should be CAN") + + // Verify all partitions exist as expected + val metaClient = HoodieTableMetaClient.builder() + .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration)) + .setBasePath(tablePath) + .build() + + val engine = new HoodieSparkEngineContext(spark.sparkContext) + val storage = metaClient.getStorage() + val metadataConfig = HoodieMetadataConfig.newBuilder().build() + val metadataTable = new HoodieBackedTableMetadata(engine, storage, metadataConfig, tablePath) + val partitionPaths = metadataTable.getAllPartitionPaths + + // Verify expected partition paths + assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO")) + assertTrue(partitionPaths.contains("2024/01/01/USA/CA/LA")) + assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU")) + assertTrue(partitionPaths.contains("2024/01/02/USA/CA/SFO")) + assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA")) + assertTrue(partitionPaths.contains("2024/01/03/USA/CA/LA")) + assertTrue(partitionPaths.contains("2024/01/03/CAN/ON/TOR")) + assertTrue(partitionPaths.contains("2024/01/04/USA/NY/NYC")) + + metadataTable.close() } } } From 58c4a76a6ddf3277026912c72c6f0d51a1d41cca Mon Sep 17 00:00:00 2001 From: syalla Date: Mon, 2 Feb 2026 02:14:02 +0000 Subject: [PATCH 4/8] Create unit tests --- .../org/apache/hudi/HoodieSparkUtils.scala | 2 + .../TestCustomParitionValueExtractor.scala | 3 + .../common/TestCustomSlashKeyGenerator.scala | 134 ++++++++++++++++++ 3 files changed, 139 insertions(+) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashKeyGenerator.scala diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 2a7c3ec799c7c..5c17c1e9efe37 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -245,6 +245,8 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi } else if(usePartitionValueExtractorOnRead && !StringUtils.isNullOrEmpty(partitionValueExtractorClass)) { try { val partitionValueExtractor = Class.forName(partitionValueExtractorClass) + .getDeclaredConstructor() + .newInstance() .asInstanceOf[PartitionValueExtractor] val partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partitionPath).asScala.toArray val partitionSchema = buildPartitionSchemaForNestedFields(tableSchema, partitionColumns) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala index 7714a286b4581..d04c215c06f52 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala @@ -49,6 +49,8 @@ class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { | 'type' = 'COW', | 'preCombineField'='ts', | 'hoodie.datasource.write.hive_style_partitioning'='false', + | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator', + | 'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator', | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor' | ) | partitioned by (`datestr`, `country`, `state`, `city`) @@ -131,6 +133,7 @@ class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { | 'type' = 'COW', | 'preCombineField'='ts', | 'hoodie.datasource.write.hive_style_partitioning'='false', + | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator', | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor' | ) | partitioned by (`datestr`, `country`, `state`, `city`) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashKeyGenerator.scala new file mode 100644 index 0000000000000..825ffa98dc551 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashKeyGenerator.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.common + +import org.apache.hudi.common.config.TypedProperties +import org.apache.hudi.keygen.{BuiltinKeyGenerator, ComplexAvroKeyGenerator, KeyGenUtils} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions + +import org.apache.avro.generic.GenericRecord +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +import java.util.Arrays + +import scala.jdk.CollectionConverters._ + +/** + * Key generator that converts partition values to slash-separated partition paths. + * + * This generator is useful when you have partition columns like: + * - datestr: "yyyy-mm-dd" format + * - country, state, city: regular string values + * + * And you want to create partition paths like: yyyy/mm/dd/country/state/city + * + * The first partition field (typically a date) will have its hyphens replaced with slashes. + * All partition fields are then combined with "/" as the separator. + */ +class TestCustomSlashKeyGenerator(props: TypedProperties) extends BuiltinKeyGenerator(props) { + + private val complexAvroKeyGenerator: ComplexAvroKeyGenerator = new ComplexAvroKeyGenerator(props) + + this.recordKeyFields = KeyGenUtils.getRecordKeyFields(props) + this.partitionPathFields = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) + .split(",") + .map(_.trim) + .filter(_.nonEmpty) + .toList + .asJava + + override def getRecordKey(record: GenericRecord): String = { + complexAvroKeyGenerator.getRecordKey(record) + } + + override def getPartitionPath(record: GenericRecord): String = { + complexAvroKeyGenerator.getPartitionPath(record) + } + + override def getRecordKey(row: Row): String = { + tryInitRowAccessor(row.schema) + combineRecordKey(getRecordKeyFieldNames, Arrays.asList(rowAccessor.getRecordKeyParts(row): _*)) + } + + override def getRecordKey(internalRow: InternalRow, schema: StructType): UTF8String = { + tryInitRowAccessor(schema) + combineRecordKeyUnsafe(getRecordKeyFieldNames, Arrays.asList(rowAccessor.getRecordKeyParts(internalRow): _*)) + } + + override def getPartitionPath(row: Row): String = { + tryInitRowAccessor(row.schema) + val partitionValues = rowAccessor.getRecordPartitionPathValues(row) + formatPartitionPath(partitionValues) + } + + override def getPartitionPath(row: InternalRow, schema: StructType): UTF8String = { + tryInitRowAccessor(schema) + val partitionValues = rowAccessor.getRecordPartitionPathValues(row) + UTF8String.fromString(formatPartitionPath(partitionValues)) + } + + /** + * Formats the partition path by: + * 1. Converting the first partition value (date) from "yyyy-mm-dd" to "yyyy/mm/dd" + * 2. 
Combining all partition values with "/" separator + * + * @param partitionValues Array of partition field values + * @return Formatted partition path like "yyyy/mm/dd/country/state/city" + */ + private def formatPartitionPath(partitionValues: Array[Object]): String = { + if (partitionValues == null || partitionValues.length == 0) { + "" + } else { + val partitionPath = new StringBuilder() + + for (i <- partitionValues.indices) { + if (i > 0) { + partitionPath.append("/") + } + + var value = getPartitionValue(partitionValues(i)) + + // For the first partition field (typically the date), replace hyphens with slashes + if (i == 0 && value.contains("-")) { + value = value.replace("-", "/") + } + + partitionPath.append(value) + } + + partitionPath.toString() + } + } + + /** + * Extracts the string value from a partition field value object. + * + * @param value The partition field value + * @return String representation of the value + */ + private def getPartitionValue(value: Object): String = { + value match { + case null => "" + case utf8: UTF8String => utf8.toString + case _ => String.valueOf(value) + } + } +} From 0ac8d9215d3f849a8fc1b656783cd5d84c20beb1 Mon Sep 17 00:00:00 2001 From: syalla Date: Sun, 8 Feb 2026 18:16:15 +0000 Subject: [PATCH 5/8] WIP --- .../spark/sql/hudi/common/TestCustomParitionValueExtractor.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala index d04c215c06f52..28c802f29ac47 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala @@ -134,6 +134,7 @@ class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { | 'preCombineField'='ts', | 'hoodie.datasource.write.hive_style_partitioning'='false', | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator', + | 'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator', | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor' | ) | partitioned by (`datestr`, `country`, `state`, `city`) From db26b6eec97edf55f3c138512130d42f57687f98 Mon Sep 17 00:00:00 2001 From: syalla Date: Mon, 9 Feb 2026 05:56:07 +0000 Subject: [PATCH 6/8] Address review comments --- .../org/apache/hudi/HoodieSparkUtils.scala | 50 ++-- .../hudi/common/table/HoodieTableConfig.java | 10 +- .../model}/PartitionValueExtractor.java | 2 +- .../org/apache/hudi/DataSourceOptions.scala | 2 +- .../org/apache/hudi/HoodieFileIndex.scala | 6 +- .../apache/hudi/HoodieSparkSqlWriter.scala | 4 +- .../org/apache/hudi/HoodieWriterUtils.scala | 10 + .../command/CreateHoodieTableCommand.scala | 1 + ...ator.scala => MockSlashKeyGenerator.scala} | 4 +- ...=> MockSlashPartitionValueExtractor.scala} | 4 +- .../TestCustomParitionValueExtractor.scala | 232 +++++++++++++----- .../datahub/DummyPartitionValueExtractor.java | 2 +- .../HiveStylePartitionValueExtractor.java | 2 +- .../hive/MultiPartKeysValueExtractor.java | 2 +- .../hudi/hive/NonPartitionedExtractor.java | 2 +- .../SinglePartPartitionValueExtractor.java | 2 +- 
...lashEncodedDayPartitionValueExtractor.java | 2 +- ...ashEncodedHourPartitionValueExtractor.java | 2 +- .../apache/hudi/hive/ddl/HMSDDLExecutor.java | 2 +- .../hudi/hive/ddl/QueryBasedDDLExecutor.java | 2 +- .../hudi/hive/util/HivePartitionUtil.java | 2 +- .../hive/util/PartitionFilterGenerator.java | 2 +- .../hive/TestPartitionValueExtractor.java | 2 +- .../hudi/sync/common/HoodieSyncClient.java | 2 +- .../hudi/sync/common/HoodieSyncConfig.java | 36 ++- .../sync/common/TestHoodieSyncClient.java | 2 +- .../hudi/utilities/streamer/StreamSync.java | 8 +- 27 files changed, 290 insertions(+), 107 deletions(-) rename hudi-common/src/main/java/org/apache/hudi/{hive/sync => sync/common/model}/PartitionValueExtractor.java (96%) rename hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/{TestCustomSlashKeyGenerator.scala => MockSlashKeyGenerator.scala} (96%) rename hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/{TestCustomSlashPartitionValueExtractor.scala => MockSlashPartitionValueExtractor.scala} (92%) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 5c17c1e9efe37..fad6bcfbe4339 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -31,7 +31,7 @@ import org.apache.hudi.util.ExceptionWrappingIterator import org.apache.avro.generic.GenericRecord import org.apache.hadoop.fs.Path import org.apache.hudi.common.table.HoodieTableConfig -import org.apache.hudi.hive.sync.PartitionValueExtractor +import org.apache.hudi.sync.common.model.PartitionValueExtractor import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging @@ -243,27 +243,45 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi // But the output for these cases is in a string format, so we can pass partitionPath as UTF8String Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath)) } else if(usePartitionValueExtractorOnRead && !StringUtils.isNullOrEmpty(partitionValueExtractorClass)) { - try { - val partitionValueExtractor = Class.forName(partitionValueExtractorClass) - .getDeclaredConstructor() - .newInstance() - .asInstanceOf[PartitionValueExtractor] - val partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partitionPath).asScala.toArray - val partitionSchema = buildPartitionSchemaForNestedFields(tableSchema, partitionColumns) - val typedValues = partitionValues.zip(partitionSchema.fields).map { case (stringValue, field) => - castStringToType(stringValue, field.dataType) - } - typedValues.map(_.asInstanceOf[Object]) - } catch { - case e: Exception => - throw new RuntimeException(s"Failed to extract partition value using $partitionValueExtractorClass class", e) - } + parsePartitionValuesBasedOnPartitionValueExtractor(partitionValueExtractorClass, partitionPath, + partitionColumns, tableSchema) } else { doParsePartitionColumnValues(partitionColumns, partitionPath, tableBasePath, tableSchema, timeZoneId, shouldValidatePartitionColumns, tableConfig.getSlashSeparatedDatePartitioning) } } + /** + * Parses partition values from partition path using a custom PartitionValueExtractor. 
+   *
+   * @param partitionValueExtractorClass Fully qualified class name of the PartitionValueExtractor implementation
+   * @param partitionPath The partition path to extract values from
+   * @param partitionColumns Array of partition column names
+   * @param tableSchema The schema of the table
+   * @return Array of partition values as Objects, properly typed according to the schema
+   */
+  private def parsePartitionValuesBasedOnPartitionValueExtractor(partitionValueExtractorClass: String,
+                                                                 partitionPath: String,
+                                                                 partitionColumns: Array[String],
+                                                                 tableSchema: StructType): Array[Object] = {
+    try {
+      val partitionValueExtractor = Class.forName(partitionValueExtractorClass)
+        .getDeclaredConstructor()
+        .newInstance()
+        .asInstanceOf[PartitionValueExtractor]
+      val partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partitionPath).asScala.toArray
+      val partitionSchema = buildPartitionSchemaForNestedFields(tableSchema, partitionColumns)
+      val typedValues = partitionValues.zip(partitionSchema.fields).map { case (stringValue, field) =>
+        castStringToType(stringValue, field.dataType)
+      }
+      typedValues.map(_.asInstanceOf[Object])
+    } catch {
+      case e: Exception =>
+        throw new RuntimeException(s"Failed to extract partition value using $partitionValueExtractorClass class", e)
+    }
+  }
+
   def doParsePartitionColumnValues(partitionColumns: Array[String],
                                    partitionPath: String,
                                    basePath: StoragePath,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 0975056eb3003..0eb2e4fc1a617 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -336,14 +336,14 @@ public static final String getDefaultPayloadClassName() {
       .withDocumentation("Key Generator type to determine key generator class");
 
   public static final ConfigProperty<String> PARTITION_VALUE_EXTRACTOR_CLASS = ConfigProperty
-      .key("hoodie.datasource.hive_sync.partition_extractor_class")
-      .defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor")
+      .key("hoodie.table.hive_sync.partition_extractor_class")
+      .noDefaultValue()
       .withInferFunction(cfg -> {
         Option<String> partitionFieldsOpt = HoodieTableConfig.getPartitionFieldProp(cfg)
             .or(() -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)));
         if (!partitionFieldsOpt.isPresent()) {
-          return Option.empty();
+          return Option.of("org.apache.hudi.hive.NonPartitionedExtractor");
         }
         String partitionFields = partitionFieldsOpt.get();
         if (StringUtils.nonEmpty(partitionFields)) {
@@ -364,7 +364,7 @@ public static final String getDefaultPayloadClassName() {
       })
       .markAdvanced()
       .withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, "
-          + "default 'org.apache.hudi.hive.MultiPartKeysValueExtractor'.");
+          + "default is inferred based on partition configuration.");
 
   // TODO: this has to be UTC. why is it not the default?
public static final ConfigProperty TIMELINE_TIMEZONE = ConfigProperty @@ -1240,7 +1240,7 @@ public String getKeyGeneratorClassName() { } public String getPartitionValueExtractorClass() { - return getStringOrDefault(PARTITION_VALUE_EXTRACTOR_CLASS); + return getStringOrDefault(PARTITION_VALUE_EXTRACTOR_CLASS, ""); } public HoodieTimelineTimeZone getTimelineTimezone() { diff --git a/hudi-common/src/main/java/org/apache/hudi/hive/sync/PartitionValueExtractor.java b/hudi-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java similarity index 96% rename from hudi-common/src/main/java/org/apache/hudi/hive/sync/PartitionValueExtractor.java rename to hudi-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java index d3ebab77225d0..60d080a0ffdd8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/hive/sync/PartitionValueExtractor.java +++ b/hudi-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.hudi.hive.sync; +package org.apache.hudi.sync.common.model; import java.io.Serializable; import java.util.List; diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 22d7f1c1bfdcb..86423073c5c3b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -280,7 +280,7 @@ object DataSourceReadOptions { val USE_PARTITION_VALUE_EXTRACTOR_ON_READ: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.partition.value.using.partion-value-extractor-class") - .defaultValue("true") + .defaultValue("false") .markAdvanced() .sinceVersion("1.2.0") .withDocumentation("This config helps whether PartitionValueExtractor interface can be used" + diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index eb1bacbb8f9ec..31f317161c72a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -527,11 +527,11 @@ object HoodieFileIndex extends Logging { properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride) } - val usePartitionValueExtractorForReaders = getConfigValue(options, sqlConf, + val usePartitionValueExtractorOnRead = getConfigValue(options, sqlConf, DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key, null) - if (usePartitionValueExtractorForReaders != null) { + if (usePartitionValueExtractorOnRead != null) { properties.setProperty(DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key, - usePartitionValueExtractorForReaders) + usePartitionValueExtractorOnRead) } if (tableConfig != null) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index e037b46da5251..6f4bead55e2d1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -291,6 +291,7 @@ class HoodieSparkSqlWriterInternal {
       if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)))
         hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)
       else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig)
+    val partitionValueExtractorClassName = hoodieConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
     val tableFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.TABLE_FORMAT)
     HoodieTableMetaClient.newTableBuilder()
       .setTableType(tableType)
@@ -310,8 +311,7 @@ class HoodieSparkSqlWriterInternal {
       .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
       .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
       .setKeyGeneratorClassProp(keyGenProp)
-      .setPartitionValueExtractorClass(hoodieConfig.getStringOrDefault(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key(),
-        HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.defaultValue()))
+      .setPartitionValueExtractorClass(partitionValueExtractorClassName)
       .set(timestampKeyGeneratorConfigs.asJava.asInstanceOf[java.util.Map[String, Object]])
      .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
      .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index fdc41bcfbd51a..2386d2b535e3c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -323,6 +323,16 @@ object HoodieWriterUtils {
       && currentPartitionFields != tableConfigPartitionFields) {
       diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n")
     }
+
+    // Validate partition value extractor
+    val currentPartitionValueExtractor = params.getOrElse(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), null)
+    if (currentPartitionValueExtractor != null) {
+      val tableConfigPartitionValueExtractor = tableConfig.getString(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS)
+      if (tableConfigPartitionValueExtractor != null &&
+        !currentPartitionValueExtractor.equals(tableConfigPartitionValueExtractor)) {
+        diffConfigs.append(s"PartitionValueExtractor:\t$currentPartitionValueExtractor\t$tableConfigPartitionValueExtractor\n")
+      }
+    }
     // The value of `HoodieTableConfig.RECORD_MERGE_STRATEGY_ID` can be NULL or non-NULL.
     // The non-NULL value has been validated above in the regular code path.
     // Here we check the NULL case since if the value is NULL, the check is skipped above.
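All of the extractor classes wired through these configs implement the PartitionValueExtractor interface that this series relocates to org.apache.hudi.sync.common.model. The bodies of the concrete test extractors are elided by the high-similarity renames below, so the following is a minimal, hypothetical sketch of what a slash-style implementation could look like; the class name, the require message, and the exact date handling are illustrative and are not code from this series:

    import org.apache.hudi.sync.common.model.PartitionValueExtractor

    import java.util
    import java.util.Arrays

    // Hypothetical extractor that inverts paths shaped like "2024/01/02/USA/WA/SEA"
    // back into the original column values ("2024-01-02", "USA", "WA", "SEA").
    class ExampleSlashPartitionValueExtractor extends PartitionValueExtractor {
      override def extractPartitionValuesInPath(partitionPath: String): util.List[String] = {
        val parts = partitionPath.split("/")
        require(parts.length >= 4, s"Unexpected partition path: $partitionPath")
        // The first three segments are yyyy/MM/dd; re-join them with hyphens
        // so they match the datestr column value the key generator started from.
        val date = s"${parts(0)}-${parts(1)}-${parts(2)}"
        Arrays.asList((date +: parts.drop(3)): _*)
      }
    }

On the read path, parsePartitionValuesBasedOnPartitionValueExtractor instantiates the configured class reflectively through a no-arg constructor and zips the returned strings positionally against the partition schema, so an implementation needs a public no-arg constructor and must return exactly one value per partition column, in column order.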
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala index 90dfb4f83d729..034a56ded321b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala @@ -135,6 +135,7 @@ object CreateHoodieTableCommand { checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.URL_ENCODE_PARTITIONING.key) checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.key) + checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashKeyGenerator.scala similarity index 96% rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashKeyGenerator.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashKeyGenerator.scala index 825ffa98dc551..a49c1c3566404 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashKeyGenerator.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashKeyGenerator.scala @@ -43,7 +43,7 @@ import scala.jdk.CollectionConverters._ * The first partition field (typically a date) will have its hyphens replaced with slashes. * All partition fields are then combined with "/" as the separator. 
*/ -class TestCustomSlashKeyGenerator(props: TypedProperties) extends BuiltinKeyGenerator(props) { +class MockSlashKeyGenerator(props: TypedProperties) extends BuiltinKeyGenerator(props) { private val complexAvroKeyGenerator: ComplexAvroKeyGenerator = new ComplexAvroKeyGenerator(props) @@ -60,7 +60,7 @@ class TestCustomSlashKeyGenerator(props: TypedProperties) extends BuiltinKeyGene } override def getPartitionPath(record: GenericRecord): String = { - complexAvroKeyGenerator.getPartitionPath(record) + complexAvroKeyGenerator.getPartitionPath(record).replace('-', '/') } override def getRecordKey(row: Row): String = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashPartitionValueExtractor.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashPartitionValueExtractor.scala similarity index 92% rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashPartitionValueExtractor.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashPartitionValueExtractor.scala index 437fb7611e5e9..b922473c729cc 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashPartitionValueExtractor.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashPartitionValueExtractor.scala @@ -16,11 +16,11 @@ */ package org.apache.spark.sql.hudi.common -import org.apache.hudi.hive.sync.PartitionValueExtractor +import org.apache.hudi.sync.common.model.PartitionValueExtractor import java.util -class TestCustomSlashPartitionValueExtractor extends PartitionValueExtractor { +class MockSlashPartitionValueExtractor extends PartitionValueExtractor { /** * diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala index 28c802f29ac47..1996e89cf2801 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.common import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.HoodieMetadataConfig -import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.metadata.HoodieBackedTableMetadata import org.apache.hudi.storage.StoragePath @@ -27,12 +27,18 @@ import org.apache.hudi.sync.common.HoodieSyncConfig import org.junit.jupiter.api.Assertions.assertTrue +import java.util.stream.Collectors + class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { + test("Test custom partition value extractor interface") { withTempDir { tmp => val targetTable = generateTableName val tablePath = s"${tmp.getCanonicalPath}/$targetTable" + // Enable partition value extractor on read + spark.conf.set("hoodie.datasource.read.partition.value.using.partion-value-extractor-class", "true") + spark.sql( s""" |create table $targetTable ( @@ -49,30 +55,35 @@ class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { | 
'type' = 'COW', | 'preCombineField'='ts', | 'hoodie.datasource.write.hive_style_partitioning'='false', - | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator', - | 'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator', - | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor' + | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator', + | 'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator', + | '${HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor', + | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor' | ) | partitioned by (`datestr`, `country`, `state`, `city`) | location '$tablePath' """.stripMargin) // yyyy/mm/dd/country/state/city - spark.sql( - s""" - | insert into $targetTable values - | (1, 'a1', 1000, '2024-01-01', "USA", "CA", "SFO"), - | (2, 'a2', 2000, '2024-01-01', "USA", "TX", "AU"), - | (3, 'a3', 3000, '2024-01-02', "USA", "CA", "LA"), - | (4, 'a4', 4000, '2024-01-02', "USA", "WA", "SEA"), - | (5, 'a5', 5000, '2024-01-03', "USA", "CA", "SFO") - """.stripMargin) + import spark.implicits._ + val df = Seq( + ("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + ("2", "a2", 2000, "2024-01-01", "USA", "TX", "AU"), + ("3", "a3", 3000, "2024-01-02", "USA", "CA", "LA"), + ("4", "a4", 4000, "2024-01-02", "USA", "WA", "SEA"), + ("5", "a5", 5000, "2024-01-03", "USA", "CA", "SFO") + ).toDF("id", "name", "ts", "datestr", "country", "state", "city") + + df.write + .format("hudi") + .mode("append") + .save(tablePath) // check result after insert and merge data into target table checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable" + s" where state = 'CA'")( - Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), - Seq(3, "a3", 3000, "2024-01-02", "USA", "CA", "LA"), - Seq(5, "a5", 5000, "2024-01-03", "USA", "CA", "SFO") + Seq("1", "a1", 1000L, "2024-01-01", "USA", "CA", "SFO"), + Seq("3", "a3", 3000L, "2024-01-02", "USA", "CA", "LA"), + Seq("5", "a5", 5000L, "2024-01-03", "USA", "CA", "SFO") ) // Verify table config has custom partition value extractor class set @@ -82,9 +93,9 @@ class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { .build() val tableConfig = metaClient.getTableConfig val partitionExtractorClass = tableConfig.getProps.getProperty( - HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()) - assertTrue(partitionExtractorClass == "org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor", - s"Table config should have custom partition value extractor class set to TestCustomSlashPartitionValueExtractor, but got $partitionExtractorClass") + HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key()) + assertTrue(partitionExtractorClass == "org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor", + s"Table config should have custom partition value extractor class set to MockSlashPartitionValueExtractor, but got $partitionExtractorClass") // Verify that partition paths are created with slash separated format (yyyy/MM/dd/country/state/city) assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/01/USA/CA/SFO")), @@ -117,6 +128,9 @@ class TestCustomParitionValueExtractor 
extends HoodieSparkSqlTestBase { val targetTable = generateTableName val tablePath = s"${tmp.getCanonicalPath}/$targetTable" + // Enable partition value extractor on read + spark.conf.set("hoodie.datasource.read.partition.value.using.partion-value-extractor-class", "true") + spark.sql( s""" |create table $targetTable ( @@ -133,66 +147,71 @@ class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { | 'type' = 'COW', | 'preCombineField'='ts', | 'hoodie.datasource.write.hive_style_partitioning'='false', - | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator', - | 'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator', - | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor' + | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator', + | 'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator', + | '${HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor', + | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor' | ) | partitioned by (`datestr`, `country`, `state`, `city`) | location '$tablePath' """.stripMargin) // Insert data across multiple partitions - spark.sql( - s""" - | insert into $targetTable values - | (1, 'a1', 1000, '2024-01-01', "USA", "CA", "SFO"), - | (2, 'a2', 2000, '2024-01-01', "USA", "CA", "LA"), - | (3, 'a3', 3000, '2024-01-01', "USA", "TX", "AU"), - | (4, 'a4', 4000, '2024-01-02', "USA", "CA", "SFO"), - | (5, 'a5', 5000, '2024-01-02', "USA", "WA", "SEA"), - | (6, 'a6', 6000, '2024-01-03', "USA", "CA", "LA"), - | (7, 'a7', 7000, '2024-01-03', "CAN", "ON", "TOR"), - | (8, 'a8', 8000, '2024-01-04', "USA", "NY", "NYC") - """.stripMargin) + import spark.implicits._ + val df = Seq( + ("1", "a1", 1000L, "2024-01-01", "USA", "CA", "SFO"), + ("2", "a2", 2000L, "2024-01-01", "USA", "CA", "LA"), + ("3", "a3", 3000L, "2024-01-01", "USA", "TX", "AU"), + ("4", "a4", 4000L, "2024-01-02", "USA", "CA", "SFO"), + ("5", "a5", 5000L, "2024-01-02", "USA", "WA", "SEA"), + ("6", "a6", 6000L, "2024-01-03", "USA", "CA", "LA"), + ("7", "a7", 7000L, "2024-01-03", "CAN", "ON", "TOR"), + ("8", "a8", 8000L, "2024-01-04", "USA", "NY", "NYC") + ).toDF("id", "name", "ts", "datestr", "country", "state", "city") + + df.write + .format("hudi") + .mode("append") + .save(tablePath) // Test partition pruning with single partition column filter (state) checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where state = 'CA' order by id")( - Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), - Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"), - Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO"), - Seq(6, "a6", 6000, "2024-01-03", "USA", "CA", "LA") + Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq("2", "a2", 2000, "2024-01-01", "USA", "CA", "LA"), + Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO"), + Seq("6", "a6", 6000, "2024-01-03", "USA", "CA", "LA") ) // Test partition pruning with multiple partition column filters checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where state = 'CA' and city = 'SFO' order by id")( - Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), - Seq(4, "a4", 4000, 
"2024-01-02", "USA", "CA", "SFO") + Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO") ) // Test partition pruning with date filter checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where datestr = '2024-01-01' order by id")( - Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), - Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"), - Seq(3, "a3", 3000, "2024-01-01", "USA", "TX", "AU") + Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq("2", "a2", 2000, "2024-01-01", "USA", "CA", "LA"), + Seq("3", "a3", 3000, "2024-01-01", "USA", "TX", "AU") ) // Test partition pruning with country filter checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where country = 'CAN' order by id")( - Seq(7, "a7", 7000, "2024-01-03", "CAN", "ON", "TOR") + Seq("7", "a7", 7000, "2024-01-03", "CAN", "ON", "TOR") ) // Test partition pruning with combined date and state filter checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where datestr = '2024-01-02' and state = 'CA' order by id")( - Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO") + Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO") ) // Test partition pruning with IN clause checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where state IN ('CA', 'NY') order by id")( - Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), - Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"), - Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO"), - Seq(6, "a6", 6000, "2024-01-03", "USA", "CA", "LA"), - Seq(8, "a8", 8000, "2024-01-04", "USA", "NY", "NYC") + Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq("2", "a2", 2000, "2024-01-01", "USA", "CA", "LA"), + Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO"), + Seq("6", "a6", 6000, "2024-01-03", "USA", "CA", "LA"), + Seq("8", "a8", 8000, "2024-01-04", "USA", "NY", "NYC") ) // Test reading with _hoodie_partition_path to verify custom partition format @@ -203,6 +222,44 @@ class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { Seq("6", "2024/01/03/USA/CA/LA") ) + // Verify partition pruning works by corrupting a parquet file in a partition that won't be queried + // We'll corrupt a file in the WA/SEA partition and query for CA - if partition pruning works, the query succeeds + val metaClient = HoodieTableMetaClient.builder() + .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration)) + .setBasePath(tablePath) + .build() + + // Find and corrupt a parquet file in the 2024/01/02/USA/WA/SEA partition + val partitionPathForCorruption = new StoragePath(tablePath, "2024/01/02/USA/WA/SEA") + val storage = metaClient.getStorage + val parquetFiles = storage.listDirectEntries(partitionPathForCorruption).stream() + .filter(fileStatus => fileStatus.getPath.getName.endsWith(".parquet") && !fileStatus.getPath.getName.startsWith(".")) + .collect(Collectors.toList()) + + assertTrue(!parquetFiles.isEmpty, "Should have at least one parquet file in WA/SEA partition") + + // Corrupt the first parquet file by writing garbage data + val fileToCorrupt = parquetFiles.get(0).getPath + val outputStream = storage.create(fileToCorrupt, true) + try { + outputStream.write("CORRUPTED_DATA".getBytes()) + } finally { + outputStream.close() + } + + // Query for state = 'CA' should still succeed because partition pruning avoids the corrupted WA partition + checkAnswer(s"select id, name, state from 
$targetTable where state = 'CA' order by id")( + Seq("1", "a1", "CA"), + Seq("2", "a2", "CA"), + Seq("4", "a4", "CA"), + Seq("6", "a6", "CA") + ) + + // Similarly, query for state = 'TX' should succeed + checkAnswer(s"select id, name, state from $targetTable where state = 'TX' order by id")( + Seq("3", "a3", "TX") + ) + // Create DataFrame and analyze query plan to verify partition pruning val dfWithStateFilter = spark.sql(s"select * from $targetTable where state = 'CA'") val planWithStateFilter = dfWithStateFilter.queryExecution.executedPlan.toString() @@ -239,13 +296,7 @@ class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { assertTrue(countryFilterResults(0).getString(1) == "CAN", s"Row country should be CAN") // Verify all partitions exist as expected - val metaClient = HoodieTableMetaClient.builder() - .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration)) - .setBasePath(tablePath) - .build() - val engine = new HoodieSparkEngineContext(spark.sparkContext) - val storage = metaClient.getStorage() val metadataConfig = HoodieMetadataConfig.newBuilder().build() val metadataTable = new HoodieBackedTableMetadata(engine, storage, metadataConfig, tablePath) val partitionPaths = metadataTable.getAllPartitionPaths @@ -263,4 +314,73 @@ class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { metadataTable.close() } } + + test("Test custom partition value extractor with URL encoding") { + withTempDir { tmp => + val targetTable = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$targetTable" + + // Enable partition value extractor on read + spark.conf.set("hoodie.datasource.read.partition.value.using.partion-value-extractor-class", "true") + + spark.sql( + s""" + |create table $targetTable ( + | `id` string, + | `name` string, + | `ts` bigint, + | `datestr` string, + | `country` string, + | `state` string, + | `city` string + |) using hudi + | tblproperties ( + | 'primaryKey' = 'id', + | 'type' = 'COW', + | 'preCombineField'='ts', + | 'hoodie.datasource.write.hive_style_partitioning'='false', + | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator', + | 'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator', + | '${HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor', + | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor', + | 'hoodie.datasource.write.partitionpath.urlencode'='true' + | ) + | partitioned by (`datestr`, `country`, `state`, `city`) + | location '$tablePath' + """.stripMargin) + + // Insert data with special characters that will be URL encoded + import spark.implicits._ + val df = Seq( + ("1", "a1", 1000L, "2024-01-01", "USA", "CA", "San Francisco"), + ("2", "a2", 2000L, "2024-01-01", "USA", "TX", "Austin+Dallas"), + ("3", "a3", 3000L, "2024-01-02", "USA", "CA", "Los Angeles") + ).toDF("id", "name", "ts", "datestr", "country", "state", "city") + + df.write + .format("hudi") + .mode("append") + .save(tablePath) + + // Verify that data can be read back correctly with URL-encoded partition paths + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where city = 'San Francisco' order by id")( + Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "San Francisco") + ) + + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable 
where city = 'Austin+Dallas' order by id")( + Seq("2", "a2", 2000, "2024-01-01", "USA", "TX", "Austin+Dallas") + ) + + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where city = 'Los Angeles' order by id")( + Seq("3", "a3", 3000, "2024-01-02", "USA", "CA", "Los Angeles") + ) + + // Verify all data can be read + val allData = spark.sql(s"select id, city from $targetTable order by id").collect() + assertTrue(allData.length == 3, s"Expected 3 rows but got ${allData.length}") + assertTrue(allData(0).getString(1) == "San Francisco", "First row city should be San Francisco") + assertTrue(allData(1).getString(1) == "Austin+Dallas", "Second row city should be Austin+Dallas") + assertTrue(allData(2).getString(1) == "Los Angeles", "Third row city should be Los Angeles") + } + } } diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java index 7943d3267b705..3c00e313a990e 100644 --- a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java +++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java @@ -19,7 +19,7 @@ package org.apache.hudi.sync.datahub; -import org.apache.hudi.hive.sync.PartitionValueExtractor; +import org.apache.hudi.sync.common.model.PartitionValueExtractor; import java.util.Collections; import java.util.List; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveStylePartitionValueExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveStylePartitionValueExtractor.java index 9291ff4d58699..11098698e8aeb 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveStylePartitionValueExtractor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveStylePartitionValueExtractor.java @@ -18,7 +18,7 @@ package org.apache.hudi.hive; -import org.apache.hudi.hive.sync.PartitionValueExtractor; +import org.apache.hudi.sync.common.model.PartitionValueExtractor; import java.util.Collections; import java.util.List; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java index fb42f16c50c24..dd356638a47e6 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java @@ -20,7 +20,7 @@ package org.apache.hudi.hive; import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.hive.sync.PartitionValueExtractor; +import org.apache.hudi.sync.common.model.PartitionValueExtractor; import java.util.Arrays; import java.util.Collections; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/NonPartitionedExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/NonPartitionedExtractor.java index 47b7fe5a7382e..37b15c6a61bd0 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/NonPartitionedExtractor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/NonPartitionedExtractor.java @@ -19,7 +19,7 @@ package org.apache.hudi.hive; -import org.apache.hudi.hive.sync.PartitionValueExtractor; +import org.apache.hudi.sync.common.model.PartitionValueExtractor; 
import java.util.ArrayList; import java.util.List; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SinglePartPartitionValueExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SinglePartPartitionValueExtractor.java index ae2965adfba79..abbccfcc53763 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SinglePartPartitionValueExtractor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SinglePartPartitionValueExtractor.java @@ -19,7 +19,7 @@ package org.apache.hudi.hive; -import org.apache.hudi.hive.sync.PartitionValueExtractor; +import org.apache.hudi.sync.common.model.PartitionValueExtractor; import java.util.Collections; import java.util.List; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java index 89255d2ffafea..85fda472754e7 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java @@ -19,7 +19,7 @@ package org.apache.hudi.hive; -import org.apache.hudi.hive.sync.PartitionValueExtractor; +import org.apache.hudi.sync.common.model.PartitionValueExtractor; import java.time.LocalDateTime; import java.time.ZoneId; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedHourPartitionValueExtractor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedHourPartitionValueExtractor.java index da23cf1b25ea6..a30296b403d71 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedHourPartitionValueExtractor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SlashEncodedHourPartitionValueExtractor.java @@ -19,7 +19,7 @@ package org.apache.hudi.hive; -import org.apache.hudi.hive.sync.PartitionValueExtractor; +import org.apache.hudi.sync.common.model.PartitionValueExtractor; import java.time.LocalDateTime; import java.time.ZoneId; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java index 06227a3d852e3..81ba15660f395 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java @@ -28,7 +28,7 @@ import org.apache.hudi.hive.util.HivePartitionUtil; import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hudi.storage.StorageSchemes; -import org.apache.hudi.hive.sync.PartitionValueExtractor; +import org.apache.hudi.sync.common.model.PartitionValueExtractor; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.fs.Path; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java index c602457b6355a..472bcedd328a2 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java @@ -28,7 +28,7 @@ import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hudi.storage.StorageSchemes; -import 
org.apache.hudi.hive.sync.PartitionValueExtractor; +import org.apache.hudi.sync.common.model.PartitionValueExtractor; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.fs.Path; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java index 294964f5375e9..3e75582266dff 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java @@ -22,7 +22,7 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; -import org.apache.hudi.hive.sync.PartitionValueExtractor; +import org.apache.hudi.sync.common.model.PartitionValueExtractor; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.hive.metastore.IMetaStoreClient; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java index 8bc8991d3f0dd..d1b934988d2f3 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java @@ -30,7 +30,7 @@ import org.apache.hudi.internal.schema.Types; import org.apache.hudi.sync.common.model.FieldSchema; import org.apache.hudi.sync.common.model.Partition; -import org.apache.hudi.hive.sync.PartitionValueExtractor; +import org.apache.hudi.sync.common.model.PartitionValueExtractor; import java.util.Arrays; import java.util.Comparator; diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestPartitionValueExtractor.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestPartitionValueExtractor.java index 784f30bd87e79..075542d596717 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestPartitionValueExtractor.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestPartitionValueExtractor.java @@ -18,7 +18,7 @@ package org.apache.hudi.hive; -import org.apache.hudi.hive.sync.PartitionValueExtractor; +import org.apache.hudi.sync.common.model.PartitionValueExtractor; import org.junit.jupiter.api.Test; diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java index 35605ab1955b3..04a6e4a10db17 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java @@ -32,7 +32,7 @@ import org.apache.hudi.storage.StoragePath; import org.apache.hudi.sync.common.model.Partition; import org.apache.hudi.sync.common.model.PartitionEvent; -import org.apache.hudi.hive.sync.PartitionValueExtractor; +import org.apache.hudi.sync.common.model.PartitionValueExtractor; import lombok.Getter; import lombok.extern.slf4j.Slf4j; diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java index f557b7db54d3e..b35cce58cd7c5 100644 --- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -114,7 +114,41 @@ public class HoodieSyncConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Field in the table to use for determining hive partition columns.");
 
-  public static final ConfigProperty<String> META_SYNC_PARTITION_EXTRACTOR_CLASS = HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS;
+  public static final ConfigProperty<String> META_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigProperty
+      .key("hoodie.datasource.hive_sync.partition_extractor_class")
+      .defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor")
+      .withInferFunction(cfg -> {
+        Option<String> partitionFieldsOpt;
+        if (StringUtils.nonEmpty(cfg.getString(META_SYNC_PARTITION_FIELDS))) {
+          partitionFieldsOpt = Option.ofNullable(cfg.getString(META_SYNC_PARTITION_FIELDS));
+        } else {
+          partitionFieldsOpt = HoodieTableConfig.getPartitionFieldProp(cfg)
+              .or(() -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)));
+        }
+
+        if (!partitionFieldsOpt.isPresent()) {
+          return Option.of("org.apache.hudi.hive.NonPartitionedExtractor");
+        }
+        String partitionFields = partitionFieldsOpt.get();
+        if (StringUtils.nonEmpty(partitionFields)) {
+          int numOfPartFields = partitionFields.split(",").length;
+          if (numOfPartFields == 1) {
+            if (cfg.contains(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key())
+                && cfg.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key()).equals("true")) {
+              return Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor");
+            } else {
+              return Option.of("org.apache.hudi.hive.SinglePartPartitionValueExtractor");
+            }
+          } else {
+            return Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor");
+          }
+        } else {
+          return Option.of("org.apache.hudi.hive.NonPartitionedExtractor");
+        }
+      })
+      .markAdvanced()
+      .withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, "
+          + "default is inferred based on partition configuration.");
 
   public static final ConfigProperty<Boolean> META_SYNC_DECODE_PARTITION = ConfigProperty
       .key("hoodie.meta.sync.decode_partition")
diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncClient.java
index dc941a7dbd9ab..14efd1df0be8c 100644
--- a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncClient.java
+++ b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/TestHoodieSyncClient.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.sync.common;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.hive.sync.PartitionValueExtractor;
+import org.apache.hudi.sync.common.model.PartitionValueExtractor;
 
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.BeforeEach;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 7ecfd482937be..c6c33b87dc19f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -91,6 +91,7 @@
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
org.apache.hudi.storage.StoragePath; +import org.apache.hudi.sync.common.HoodieSyncConfig; import org.apache.hudi.sync.common.util.SyncUtilHelpers; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.util.JavaScalaConverters; @@ -452,7 +453,8 @@ HoodieTableMetaClient initializeEmptyTable(HoodieTableMetaClient.TableBuilder ta payloadClass = overridingMergeConfigs.get().getMiddle(); mergeStrategyId = overridingMergeConfigs.get().getRight(); } - + String partitionValueExtractorClassName = props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), + HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.defaultValue()); return tableBuilder.setTableType(cfg.tableType) .setTableName(cfg.targetTableName) .setArchiveLogFolder(TIMELINE_HISTORY_PATH.defaultValue()) @@ -466,9 +468,7 @@ HoodieTableMetaClient initializeEmptyTable(HoodieTableMetaClient.TableBuilder ta .setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())) .setKeyGeneratorClassProp(keyGenClassName) - .setPartitionValueExtractorClass(props.getString( - HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key(), - HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.defaultValue())) + .setPartitionValueExtractorClass(partitionValueExtractorClassName) .setOrderingFields(cfg.sourceOrderingFields) .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())) From de1f91121d7d3eeb4d194b73920927f7b5efe4f7 Mon Sep 17 00:00:00 2001 From: syalla Date: Mon, 9 Feb 2026 17:06:53 +0000 Subject: [PATCH 7/8] Fix failed unit test --- .../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 6f4bead55e2d1..c25215275d0e1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -751,7 +751,7 @@ class HoodieSparkSqlWriterInternal { String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()) )) val tableFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.TABLE_FORMAT) - + val partitionValueExtractorClassName = hoodieConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS) HoodieTableMetaClient.newTableBuilder() .setTableType(HoodieTableType.valueOf(tableType)) .setTableName(tableName) @@ -771,8 +771,7 @@ class HoodieSparkSqlWriterInternal { .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)) .setPopulateMetaFields(populateMetaFields) .setKeyGeneratorClassProp(keyGenProp) - .setPartitionValueExtractorClass(hoodieConfig.getStringOrDefault(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key(), - HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.defaultValue())) + .setPartitionValueExtractorClass(partitionValueExtractorClassName) .set(timestampKeyGeneratorConfigs.asJava.asInstanceOf[java.util.Map[String, Object]]) .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) From 
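
Note on the extractor inference: the withInferFunction added to META_SYNC_PARTITION_EXTRACTOR_CLASS earlier in this series reduces to a small decision tree over the configured partition fields. The standalone sketch below restates that tree for readability; the class and method names here are illustrative only and are not part of the patch.

    // Illustrative restatement of the extractor-class inference above.
    // Hypothetical helper; not part of this series.
    public class ExtractorInference {

      static String inferExtractorClass(String partitionFields, boolean hiveStylePartitioning) {
        // No partition fields configured: the table is unpartitioned.
        if (partitionFields == null || partitionFields.isEmpty()) {
          return "org.apache.hudi.hive.NonPartitionedExtractor";
        }
        // One field: hive-style paths ("dt=2026-01-13") and plain paths
        // ("2026-01-13") need different extractors.
        if (partitionFields.split(",").length == 1) {
          return hiveStylePartitioning
              ? "org.apache.hudi.hive.HiveStylePartitionValueExtractor"
              : "org.apache.hudi.hive.SinglePartPartitionValueExtractor";
        }
        // Multiple comma-separated fields fall back to the multi-part
        // extractor, which is also the property's declared default.
        return "org.apache.hudi.hive.MultiPartKeysValueExtractor";
      }

      public static void main(String[] args) {
        System.out.println(inferExtractorClass("dt", true));          // HiveStylePartitionValueExtractor
        System.out.println(inferExtractorClass("year,month", false)); // MultiPartKeysValueExtractor
        System.out.println(inferExtractorClass(null, false));         // NonPartitionedExtractor
      }
    }

In short: a single hive-style partition column resolves to HiveStylePartitionValueExtractor, a single plain column to SinglePartPartitionValueExtractor, multiple columns to MultiPartKeysValueExtractor, and no columns to NonPartitionedExtractor.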
From f3e6402e77defadfd63520887d3db95a099945bb Mon Sep 17 00:00:00 2001
From: syalla
Date: Mon, 9 Feb 2026 18:19:32 +0000
Subject: [PATCH 8/8] Fix flink test failures

---
 .../java/org/apache/hudi/configuration/FlinkOptions.java  | 8 ++++++++
 .../src/main/java/org/apache/hudi/util/StreamerUtil.java  | 3 +--
 .../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 6 ++++--
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 5dee4b8b1213b..eb2bf02e71923 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -41,6 +41,7 @@
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.sink.overwrite.PartitionOverwriteMode;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
 import org.apache.hudi.util.ClientIds;
 
@@ -657,6 +658,13 @@ public class FlinkOptions extends HoodieConfig {
           + "**Note** This is being actively worked on. Please use "
           + "`hoodie.datasource.write.keygenerator.class` instead.");
 
+  @AdvancedConfig
+  public static final ConfigOption<String> PARTITION_VALUE_EXTRACTOR = ConfigOptions
+      .key(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key())
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Partition value extractor class helps extract the partition value from partition paths");
+
   public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH";
   public static final String PARTITION_FORMAT_DAY = "yyyyMMdd";
   public static final String PARTITION_FORMAT_DASHED_DAY = "yyyy-MM-dd";
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index abb1cd4a64d3c..ffe94fcfd5585 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -314,8 +314,7 @@ public static HoodieTableMetaClient initTableIfNotExists(
           .setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD.key(), null))
           .setKeyGeneratorClassProp(
               conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse(SimpleAvroKeyGenerator.class.getName()))
-          .setPartitionValueExtractorClass(conf.getString(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key(),
-              HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.defaultValue()))
+          .setPartitionValueExtractorClass(conf.getString(FlinkOptions.PARTITION_VALUE_EXTRACTOR.key(), null))
           .setHiveStylePartitioningEnable(conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING))
           .setUrlEncodePartitioning(conf.get(FlinkOptions.URL_ENCODE_PARTITIONING))
           .setCDCEnabled(conf.get(FlinkOptions.CDC_ENABLED))
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c25215275d0e1..94b58bd4213f6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -291,7 +291,8 @@
       if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)))
        hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)
       else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig)
-    val partitionValueExtractorClassName = hoodieConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+    val partitionValueExtractorClassName = hoodieConfig
+      .getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), null)
     val tableFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.TABLE_FORMAT)
     HoodieTableMetaClient.newTableBuilder()
       .setTableType(tableType)
@@ -751,7 +752,8 @@ class HoodieSparkSqlWriterInternal {
         String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())
       ))
     val tableFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.TABLE_FORMAT)
-    val partitionValueExtractorClassName = hoodieConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+    val partitionValueExtractorClassName = hoodieConfig
+      .getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), null)
     HoodieTableMetaClient.newTableBuilder()
       .setTableType(HoodieTableType.valueOf(tableType))
       .setTableName(tableName)
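
With patch 8/8, the Flink PARTITION_VALUE_EXTRACTOR option carries no default, so the table property is only written when a user explicitly supplies an extractor class. For a sense of what such a user-supplied class looks like, here is a minimal sketch of a custom extractor, assuming the PartitionValueExtractor interface at org.apache.hudi.sync.common.model used throughout this series, whose single method maps a partition path to a list of partition values; the class name and the yyyy/MM/dd path format are illustrative only.

    import org.apache.hudi.sync.common.model.PartitionValueExtractor;

    import java.util.Collections;
    import java.util.List;

    // Hypothetical custom extractor: collapses a "yyyy/MM/dd" partition path
    // into a single dashed date value, e.g. "2026/01/13" -> ["2026-01-13"].
    public class SlashDayPartitionValueExtractor implements PartitionValueExtractor {

      @Override
      public List<String> extractPartitionValuesInPath(String partitionPath) {
        String[] parts = partitionPath.split("/");
        if (parts.length != 3) {
          throw new IllegalArgumentException(
              "Partition path " + partitionPath + " is not in yyyy/MM/dd form");
        }
        return Collections.singletonList(parts[0] + "-" + parts[1] + "-" + parts[2]);
      }
    }

Such a class would be wired in through hoodie.datasource.hive_sync.partition_extractor_class, the key shared by META_SYNC_PARTITION_EXTRACTOR_CLASS and the new Flink option.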