diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 08c315b0e78dd..fad6bcfbe4339 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -23,7 +23,7 @@ import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.config.TimestampKeyGeneratorConfig import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.schema.HoodieSchema -import org.apache.hudi.common.util.{Option => HOption} +import org.apache.hudi.common.util.{StringUtils, Option => HOption} import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator import org.apache.hudi.keygen.constant.KeyGeneratorType import org.apache.hudi.storage.StoragePath @@ -31,6 +31,8 @@ import org.apache.hudi.util.ExceptionWrappingIterator import org.apache.avro.generic.GenericRecord import org.apache.hadoop.fs.Path import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.sync.common.model.PartitionValueExtractor + import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD @@ -224,11 +226,13 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi partitionPath: String, tableBasePath: StoragePath, tableSchema: StructType, - tableConfig: java.util.Map[String, String], + tableConfig: HoodieTableConfig, timeZoneId: String, - shouldValidatePartitionColumns: Boolean): Array[Object] = { + shouldValidatePartitionColumns: Boolean, + usePartitionValueExtractorOnRead: Boolean): Array[Object] = { val keyGeneratorClass = KeyGeneratorType.getKeyGeneratorClassName(tableConfig) - val timestampKeyGeneratorType = tableConfig.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key()) + val timestampKeyGeneratorType = tableConfig.propsMap().get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key()) + val partitionValueExtractorClass = tableConfig.getPartitionValueExtractorClass if (null != keyGeneratorClass && null != timestampKeyGeneratorType @@ -238,10 +242,43 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi // we couldn't reconstruct initial partition column values from partition paths due to lost data after formatting. // But the output for these cases is in a string format, so we can pass partitionPath as UTF8String Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath)) + } else if(usePartitionValueExtractorOnRead && !StringUtils.isNullOrEmpty(partitionValueExtractorClass)) { + parsePartitionValuesBasedOnPartitionValueExtractor(partitionValueExtractorClass, partitionPath, + partitionColumns, tableSchema) } else { doParsePartitionColumnValues(partitionColumns, partitionPath, tableBasePath, tableSchema, timeZoneId, - shouldValidatePartitionColumns, tableConfig.getOrDefault(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.key, - HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.defaultValue).toBoolean) + shouldValidatePartitionColumns, tableConfig.getSlashSeparatedDatePartitioning) + } + } + + /** + * Parses partition values from partition path using a custom PartitionValueExtractor. 
+ * + * @param partitionValueExtractorClass Fully qualified class name of the PartitionValueExtractor implementation + * @param partitionPath The partition path to extract values from + * @param partitionColumns Array of partition column names + * @param tableSchema The schema of the table + * @return Array of partition values as Objects, properly typed according to the schema + */ + private def parsePartitionValuesBasedOnPartitionValueExtractor( + partitionValueExtractorClass: String, + partitionPath: String, + partitionColumns: Array[String], + tableSchema: StructType): Array[Object] = { + try { + val partitionValueExtractor = Class.forName(partitionValueExtractorClass) + .getDeclaredConstructor() + .newInstance() + .asInstanceOf[PartitionValueExtractor] + val partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partitionPath).asScala.toArray + val partitionSchema = buildPartitionSchemaForNestedFields(tableSchema, partitionColumns) + val typedValues = partitionValues.zip(partitionSchema.fields).map { case (stringValue, field) => + castStringToType(stringValue, field.dataType) + } + typedValues.map(_.asInstanceOf[Object]) + } catch { + case e: Exception => + throw new RuntimeException(s"Failed to extract partition value using $partitionValueExtractorClass class", e) } } @@ -336,7 +373,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi ).toSeq(partitionSchema) } - private def buildPartitionSchemaForNestedFields(schema: StructType, partitionColumns: Array[String]): StructType = { + def buildPartitionSchemaForNestedFields(schema: StructType, partitionColumns: Array[String]): StructType = { val partitionFields = partitionColumns.flatMap { partitionCol => extractNestedField(schema, partitionCol) } @@ -364,7 +401,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi traverseSchema(schema, pathParts.toList, fieldPath) } - private def castStringToType(value: String, dataType: org.apache.spark.sql.types.DataType): Any = { + def castStringToType(value: String, dataType: org.apache.spark.sql.types.DataType): Any = { import org.apache.spark.sql.types._ // handling cases where the value contains path separators or is complex diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 3de45e1754ee6..0eb2e4fc1a617 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -335,6 +335,37 @@ public static final String getDefaultPayloadClassName() { .sinceVersion("1.0.0") .withDocumentation("Key Generator type to determine key generator class"); + public static final ConfigProperty PARTITION_VALUE_EXTRACTOR_CLASS = ConfigProperty + .key("hoodie.table.hive_sync.partition_extractor_class") + .noDefaultValue() + .withInferFunction(cfg -> { + Option partitionFieldsOpt = HoodieTableConfig.getPartitionFieldProp(cfg) + .or(() -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))); + + if (!partitionFieldsOpt.isPresent()) { + return Option.of("org.apache.hudi.hive.NonPartitionedExtractor"); + } + String partitionFields = partitionFieldsOpt.get(); + if (StringUtils.nonEmpty(partitionFields)) { + int numOfPartFields = partitionFields.split(",").length; + if (numOfPartFields == 1) { + if (cfg.contains(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key()) + && 
cfg.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key()).equals("true")) { + return Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor"); + } else { + return Option.of("org.apache.hudi.hive.SinglePartPartitionValueExtractor"); + } + } else { + return Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor"); + } + } else { + return Option.of("org.apache.hudi.hive.NonPartitionedExtractor"); + } + }) + .markAdvanced() + .withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, " + + "default is inferred based on partition configuration."); + // TODO: this has to be UTC. why is it not the default? public static final ConfigProperty TIMELINE_TIMEZONE = ConfigProperty .key("hoodie.table.timeline.timezone") @@ -1208,6 +1239,10 @@ public String getKeyGeneratorClassName() { return KeyGeneratorType.getKeyGeneratorClassName(this); } + public String getPartitionValueExtractorClass() { + return getStringOrDefault(PARTITION_VALUE_EXTRACTOR_CLASS, ""); + } + public HoodieTimelineTimeZone getTimelineTimezone() { return HoodieTimelineTimeZone.valueOf(getStringOrDefault(TIMELINE_TIMEZONE)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 6ddd459e3d6b8..ceb7fcedf9e25 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -1070,6 +1070,7 @@ public static class TableBuilder { private Boolean bootstrapIndexEnable; private Boolean populateMetaFields; private String keyGeneratorClassProp; + private String partitionValueExtractorClass; private String keyGeneratorType; private Boolean slashSeparatedDatePartitioning; private Boolean hiveStylePartitioningEnable; @@ -1243,6 +1244,11 @@ public TableBuilder setSlashSeparatedDatePartitioning(Boolean slashSeparatedDate return this; } + public TableBuilder setPartitionValueExtractorClass(String partitionValueExtractorClass) { + this.partitionValueExtractorClass = partitionValueExtractorClass; + return this; + } + public TableBuilder setHiveStylePartitioningEnable(Boolean hiveStylePartitioningEnable) { this.hiveStylePartitioningEnable = hiveStylePartitioningEnable; return this; @@ -1445,6 +1451,9 @@ public TableBuilder fromProperties(Properties properties) { if (hoodieConfig.contains(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING)) { setSlashSeparatedDatePartitioning(hoodieConfig.getBoolean(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING)); } + if (hoodieConfig.contains(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS)) { + setPartitionValueExtractorClass(hoodieConfig.getString(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS)); + } if (hoodieConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)) { setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)); } @@ -1588,6 +1597,9 @@ public Properties build() { if (null != slashSeparatedDatePartitioning) { tableConfig.setValue(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING, Boolean.toString(slashSeparatedDatePartitioning)); } + if (null != partitionValueExtractorClass) { + tableConfig.setValue(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS, partitionValueExtractorClass); + } if (null != hiveStylePartitioningEnable) { tableConfig.setValue(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, 
Boolean.toString(hiveStylePartitioningEnable)); } diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java b/hudi-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java similarity index 100% rename from hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java rename to hudi-common/src/main/java/org/apache/hudi/sync/common/model/PartitionValueExtractor.java diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 5dee4b8b1213b..eb2bf02e71923 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -41,6 +41,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.sink.overwrite.PartitionOverwriteMode; +import org.apache.hudi.sync.common.HoodieSyncConfig; import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; import org.apache.hudi.util.ClientIds; @@ -657,6 +658,13 @@ public class FlinkOptions extends HoodieConfig { + "**Note** This is being actively worked on. Please use " + "`hoodie.datasource.write.keygenerator.class` instead."); + @AdvancedConfig + public static final ConfigOption PARTITION_VALUE_EXTRACTOR = ConfigOptions + .key(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()) + .stringType() + .noDefaultValue() + .withDescription("Partition value extractor class helps extract the partition value from partition paths"); + public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH"; public static final String PARTITION_FORMAT_DAY = "yyyyMMdd"; public static final String PARTITION_FORMAT_DASHED_DAY = "yyyy-MM-dd"; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index ecf7b73e312dd..ffe94fcfd5585 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -314,6 +314,7 @@ public static HoodieTableMetaClient initTableIfNotExists( .setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD.key(), null)) .setKeyGeneratorClassProp( conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse(SimpleAvroKeyGenerator.class.getName())) + .setPartitionValueExtractorClass(conf.getString(FlinkOptions.PARTITION_VALUE_EXTRACTOR.key(), null)) .setHiveStylePartitioningEnable(conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING)) .setUrlEncodePartitioning(conf.get(FlinkOptions.URL_ENCODE_PARTITIONING)) .setCDCEnabled(conf.get(FlinkOptions.CDC_ENABLED)) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 237afaf4379f0..86423073c5c3b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -278,6 +278,15 @@ object DataSourceReadOptions { .sinceVersion("1.1.0") 
.withDocumentation("Fully qualified class name of the catalog that is used by the Polaris spark client.") + val USE_PARTITION_VALUE_EXTRACTOR_ON_READ: ConfigProperty[String] = ConfigProperty + .key("hoodie.datasource.read.partition.value.using.partion-value-extractor-class") + .defaultValue("false") + .markAdvanced() + .sinceVersion("1.2.0") + .withDocumentation("This config helps whether PartitionValueExtractor interface can be used" + + " for parsing partition values from partition path. When this config is enabled, it uses" + + " PartitionValueExtractor class value stored in the hoodie.properties file.") + /** @deprecated Use {@link QUERY_TYPE} and its methods instead */ @Deprecated val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key() diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 56280da8a9d7d..15bb47ebbb69c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -477,6 +477,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, getPartitionColumnsAsInternalRowInternal(file, metaClient.getBasePath, extractPartitionValuesFromPartitionPath = true) + protected def usePartitionValueExtractorOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = { + optParams.getOrElse(DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key, + DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.defaultValue).toBoolean || + ProvidesHoodieConfig.isSchemaEvolutionEnabled(sparkSession) + } + protected def getPartitionColumnsAsInternalRowInternal(file: StoragePathInfo, basePath: StoragePath, extractPartitionValuesFromPartitionPath: Boolean): InternalRow = { if (extractPartitionValuesFromPartitionPath) { @@ -489,9 +495,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, relativePath, basePath, tableStructSchema, - tableConfig.propsMap, + tableConfig, timeZoneId, - conf.getBoolean("spark.sql.sources.validatePartitionColumns", true)) + conf.getBoolean("spark.sql.sources.validatePartitionColumns", true), + usePartitionValueExtractorOnRead(optParams, sparkSession)) if(rowValues.length != partitionColumns.length) { throw new HoodieException("Failed to get partition column values from the partition-path:" + s"partition column size: ${partitionColumns.length}, parsed partition value size: ${rowValues.length}") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 9658fd451ec81..31f317161c72a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -527,9 +527,17 @@ object HoodieFileIndex extends Logging { properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride) } + val usePartitionValueExtractorOnRead = getConfigValue(options, sqlConf, + DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key, null) + if (usePartitionValueExtractorOnRead != null) { + properties.setProperty(DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key, + usePartitionValueExtractorOnRead) + } + if (tableConfig 
!= null) { properties.setProperty(RECORDKEY_FIELD.key, tableConfig.getRecordKeyFields.orElse(Array.empty).mkString(",")) properties.setProperty(PARTITIONPATH_FIELD.key, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig).orElse("")) + properties.setProperty(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key(), tableConfig.getPartitionValueExtractorClass) // for simple bucket index, we need to set the INDEX_TYPE, BUCKET_INDEX_HASH_FIELD, BUCKET_INDEX_NUM_BUCKETS val database = getDatabaseName(tableConfig, spark.catalog.currentDatabase) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index f7db780e504ce..94b58bd4213f6 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -291,6 +291,8 @@ class HoodieSparkSqlWriterInternal { if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME))) hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME) else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig) + val partitionValueExtractorClassName = hoodieConfig + .getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), null) val tableFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.TABLE_FORMAT) HoodieTableMetaClient.newTableBuilder() .setTableType(tableType) @@ -310,6 +312,7 @@ class HoodieSparkSqlWriterInternal { .setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED)) .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)) .setKeyGeneratorClassProp(keyGenProp) + .setPartitionValueExtractorClass(partitionValueExtractorClassName) .set(timestampKeyGeneratorConfigs.asJava.asInstanceOf[java.util.Map[String, Object]]) .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) @@ -749,7 +752,8 @@ class HoodieSparkSqlWriterInternal { String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()) )) val tableFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.TABLE_FORMAT) - + val partitionValueExtractorClassName = hoodieConfig + .getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), null) HoodieTableMetaClient.newTableBuilder() .setTableType(HoodieTableType.valueOf(tableType)) .setTableName(tableName) @@ -769,6 +773,7 @@ class HoodieSparkSqlWriterInternal { .setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE)) .setPopulateMetaFields(populateMetaFields) .setKeyGeneratorClassProp(keyGenProp) + .setPartitionValueExtractorClass(partitionValueExtractorClassName) .set(timestampKeyGeneratorConfigs.asJava.asInstanceOf[java.util.Map[String, Object]]) .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index fdc41bcfbd51a..2386d2b535e3c 100644 --- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -323,6 +323,16 @@ object HoodieWriterUtils { && currentPartitionFields != tableConfigPartitionFields) { diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n") } + + // Validate partition value extractor + val currentPartitionValueExtractor = params.getOrElse(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), null) + if (currentPartitionValueExtractor != null) { + val tableConfigPartitionValueExtractor = tableConfig.getString(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS) + if (tableConfigPartitionValueExtractor != null && + !currentPartitionValueExtractor.equals(tableConfigPartitionValueExtractor)) { + diffConfigs.append(s"PartitionValueExtractor:\t$currentPartitionValueExtractor\t$tableConfigPartitionValueExtractor\n") + } + } // The value of `HoodieTableConfig.RECORD_MERGE_STRATEGY_ID` can be NULL or non-NULL. // The non-NULL value has been validated above in the regular code path. // Here we check the NULL case since if the value is NULL, the check is skipped above. diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index 567053c618a71..1674d5627762e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -118,6 +118,10 @@ class SparkHoodieTableFileIndex(spark: SparkSession, protected lazy val shouldFastBootstrap = configProperties.getBoolean(DATA_QUERIES_ONLY.key, false) + protected lazy val usePartitionValueExtractorOnRead = configProperties.getBoolean( + DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key(), + DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.defaultValue().toBoolean) + /** * Get the partition schema from the hoodie.properties.
*/ @@ -432,9 +436,10 @@ class SparkHoodieTableFileIndex(spark: SparkSession, partitionPath, getBasePath, schema, - metaClient.getTableConfig.propsMap, + metaClient.getTableConfig, configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone), - shouldValidatePartitionColumns(spark)) + shouldValidatePartitionColumns(spark), + usePartitionValueExtractorOnRead) } private def arePartitionPathsUrlEncoded: Boolean = diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala index 90dfb4f83d729..034a56ded321b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala @@ -135,6 +135,7 @@ object CreateHoodieTableCommand { checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.URL_ENCODE_PARTITIONING.key) checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.key) + checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashKeyGenerator.scala new file mode 100644 index 0000000000000..a49c1c3566404 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashKeyGenerator.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.common + +import org.apache.hudi.common.config.TypedProperties +import org.apache.hudi.keygen.{BuiltinKeyGenerator, ComplexAvroKeyGenerator, KeyGenUtils} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions + +import org.apache.avro.generic.GenericRecord +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +import java.util.Arrays + +import scala.jdk.CollectionConverters._ + +/** + * Key generator that converts partition values to slash-separated partition paths. 
+ * + * This generator is useful when you have partition columns like: + * - datestr: "yyyy-mm-dd" format + * - country, state, city: regular string values + * + * And you want to create partition paths like: yyyy/mm/dd/country/state/city + * + * The first partition field (typically a date) will have its hyphens replaced with slashes. + * All partition fields are then combined with "/" as the separator. + */ +class MockSlashKeyGenerator(props: TypedProperties) extends BuiltinKeyGenerator(props) { + + private val complexAvroKeyGenerator: ComplexAvroKeyGenerator = new ComplexAvroKeyGenerator(props) + + this.recordKeyFields = KeyGenUtils.getRecordKeyFields(props) + this.partitionPathFields = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) + .split(",") + .map(_.trim) + .filter(_.nonEmpty) + .toList + .asJava + + override def getRecordKey(record: GenericRecord): String = { + complexAvroKeyGenerator.getRecordKey(record) + } + + override def getPartitionPath(record: GenericRecord): String = { + complexAvroKeyGenerator.getPartitionPath(record).replace('-', '/') + } + + override def getRecordKey(row: Row): String = { + tryInitRowAccessor(row.schema) + combineRecordKey(getRecordKeyFieldNames, Arrays.asList(rowAccessor.getRecordKeyParts(row): _*)) + } + + override def getRecordKey(internalRow: InternalRow, schema: StructType): UTF8String = { + tryInitRowAccessor(schema) + combineRecordKeyUnsafe(getRecordKeyFieldNames, Arrays.asList(rowAccessor.getRecordKeyParts(internalRow): _*)) + } + + override def getPartitionPath(row: Row): String = { + tryInitRowAccessor(row.schema) + val partitionValues = rowAccessor.getRecordPartitionPathValues(row) + formatPartitionPath(partitionValues) + } + + override def getPartitionPath(row: InternalRow, schema: StructType): UTF8String = { + tryInitRowAccessor(schema) + val partitionValues = rowAccessor.getRecordPartitionPathValues(row) + UTF8String.fromString(formatPartitionPath(partitionValues)) + } + + /** + * Formats the partition path by: + * 1. Converting the first partition value (date) from "yyyy-mm-dd" to "yyyy/mm/dd" + * 2. Combining all partition values with "/" separator + * + * @param partitionValues Array of partition field values + * @return Formatted partition path like "yyyy/mm/dd/country/state/city" + */ + private def formatPartitionPath(partitionValues: Array[Object]): String = { + if (partitionValues == null || partitionValues.length == 0) { + "" + } else { + val partitionPath = new StringBuilder() + + for (i <- partitionValues.indices) { + if (i > 0) { + partitionPath.append("/") + } + + var value = getPartitionValue(partitionValues(i)) + + // For the first partition field (typically the date), replace hyphens with slashes + if (i == 0 && value.contains("-")) { + value = value.replace("-", "/") + } + + partitionPath.append(value) + } + + partitionPath.toString() + } + } + + /** + * Extracts the string value from a partition field value object. 
+ * + * @param value The partition field value + * @return String representation of the value + */ + private def getPartitionValue(value: Object): String = { + value match { + case null => "" + case utf8: UTF8String => utf8.toString + case _ => String.valueOf(value) + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashPartitionValueExtractor.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashPartitionValueExtractor.scala new file mode 100644 index 0000000000000..b922473c729cc --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/MockSlashPartitionValueExtractor.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hudi.common + +import org.apache.hudi.sync.common.model.PartitionValueExtractor + +import java.util + +class MockSlashPartitionValueExtractor extends PartitionValueExtractor { + + /** + * + * This method will extract Partition values from the partition path + * and provide date, country, state, city values separately. + * @param partitionPath PartitionPath provided will be in the format yyyy/mm/dd/country/stat/city + * @return is a List of string with values template such as yyyy-mm-dd, country, state, city + */ + override def extractPartitionValuesInPath(partitionPath: String): util.List[String] = { + val partitionSplitsSeq = partitionPath.split("/") + val date = s"${partitionSplitsSeq(0)}-${partitionSplitsSeq(1)}-${partitionSplitsSeq(2)}" + val country = partitionSplitsSeq(3) + val state = partitionSplitsSeq(4) + val city = partitionSplitsSeq(5) + java.util.Arrays.asList(date, country, state, city) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala new file mode 100644 index 0000000000000..1996e89cf2801 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.common + +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.hadoop.fs.HadoopFSUtils +import org.apache.hudi.metadata.HoodieBackedTableMetadata +import org.apache.hudi.storage.StoragePath +import org.apache.hudi.sync.common.HoodieSyncConfig + +import org.junit.jupiter.api.Assertions.assertTrue + +import java.util.stream.Collectors + +class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { + + test("Test custom partition value extractor interface") { + withTempDir { tmp => + val targetTable = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$targetTable" + + // Enable partition value extractor on read + spark.conf.set("hoodie.datasource.read.partition.value.using.partion-value-extractor-class", "true") + + spark.sql( + s""" + |create table $targetTable ( + | `id` string, + | `name` string, + | `ts` bigint, + | `datestr` string, + | `country` string, + | `state` string, + | `city` string + |) using hudi + | tblproperties ( + | 'primaryKey' = 'id', + | 'type' = 'COW', + | 'preCombineField'='ts', + | 'hoodie.datasource.write.hive_style_partitioning'='false', + | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator', + | 'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator', + | '${HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor', + | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor' + | ) + | partitioned by (`datestr`, `country`, `state`, `city`) + | location '$tablePath' + """.stripMargin) + // yyyy/mm/dd/country/state/city + import spark.implicits._ + val df = Seq( + ("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + ("2", "a2", 2000, "2024-01-01", "USA", "TX", "AU"), + ("3", "a3", 3000, "2024-01-02", "USA", "CA", "LA"), + ("4", "a4", 4000, "2024-01-02", "USA", "WA", "SEA"), + ("5", "a5", 5000, "2024-01-03", "USA", "CA", "SFO") + ).toDF("id", "name", "ts", "datestr", "country", "state", "city") + + df.write + .format("hudi") + .mode("append") + .save(tablePath) + + // check result after insert and merge data into target table + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable" + + s" where state = 'CA'")( + Seq("1", "a1", 1000L, "2024-01-01", "USA", "CA", "SFO"), + Seq("3", "a3", 3000L, "2024-01-02", "USA", "CA", "LA"), + Seq("5", "a5", 5000L, "2024-01-03", "USA", "CA", "SFO") + ) + + // Verify table config has custom partition value extractor class set + val metaClient = HoodieTableMetaClient.builder() + .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration)) + .setBasePath(tablePath) + .build() + val tableConfig = metaClient.getTableConfig + val partitionExtractorClass = tableConfig.getProps.getProperty( + 
HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key()) + assertTrue(partitionExtractorClass == "org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor", + s"Table config should have custom partition value extractor class set to MockSlashPartitionValueExtractor, but got $partitionExtractorClass") + + // Verify that partition paths are created with slash separated format (yyyy/MM/dd/country/state/city) + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/01/USA/CA/SFO")), + s"Partition path 2024/01/01/USA/CA/SFO should exist") + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/01/USA/TX/AU")), + s"Partition path 2024/01/01/USA/TX/AU should exist") + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/02/USA/CA/LA")), + s"Partition path 2024/01/02/USA/CA/LA should exist") + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/02/USA/WA/SEA")), + s"Partition path 2024/01/02/USA/WA/SEA should exist") + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/03/USA/CA/SFO")), + s"Partition path 2024/01/03/USA/CA/SFO should exist") + + val engine = new HoodieSparkEngineContext(spark.sparkContext) + val storage = metaClient.getStorage() + val metadataConfig = HoodieMetadataConfig.newBuilder().build() + val metadataTable = new HoodieBackedTableMetadata(engine, storage, metadataConfig, tablePath) + val partitionPaths = metadataTable.getAllPartitionPaths + assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO")) + assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU")) + assertTrue(partitionPaths.contains("2024/01/02/USA/CA/LA")) + assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA")) + assertTrue(partitionPaths.contains("2024/01/03/USA/CA/SFO")) + metadataTable.close() + } + } + + test("Test custom partition value extractor with partition pruning and filtering") { + withTempDir { tmp => + val targetTable = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$targetTable" + + // Enable partition value extractor on read + spark.conf.set("hoodie.datasource.read.partition.value.using.partion-value-extractor-class", "true") + + spark.sql( + s""" + |create table $targetTable ( + | `id` string, + | `name` string, + | `ts` bigint, + | `datestr` string, + | `country` string, + | `state` string, + | `city` string + |) using hudi + | tblproperties ( + | 'primaryKey' = 'id', + | 'type' = 'COW', + | 'preCombineField'='ts', + | 'hoodie.datasource.write.hive_style_partitioning'='false', + | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator', + | 'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator', + | '${HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor', + | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor' + | ) + | partitioned by (`datestr`, `country`, `state`, `city`) + | location '$tablePath' + """.stripMargin) + + // Insert data across multiple partitions + import spark.implicits._ + val df = Seq( + ("1", "a1", 1000L, "2024-01-01", "USA", "CA", "SFO"), + ("2", "a2", 2000L, "2024-01-01", "USA", "CA", "LA"), + ("3", "a3", 3000L, "2024-01-01", "USA", "TX", "AU"), + ("4", "a4", 4000L, "2024-01-02", "USA", "CA", "SFO"), + ("5", "a5", 5000L, "2024-01-02", "USA", "WA", "SEA"), + ("6", "a6", 6000L, "2024-01-03", "USA", 
"CA", "LA"), + ("7", "a7", 7000L, "2024-01-03", "CAN", "ON", "TOR"), + ("8", "a8", 8000L, "2024-01-04", "USA", "NY", "NYC") + ).toDF("id", "name", "ts", "datestr", "country", "state", "city") + + df.write + .format("hudi") + .mode("append") + .save(tablePath) + + // Test partition pruning with single partition column filter (state) + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where state = 'CA' order by id")( + Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq("2", "a2", 2000, "2024-01-01", "USA", "CA", "LA"), + Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO"), + Seq("6", "a6", 6000, "2024-01-03", "USA", "CA", "LA") + ) + + // Test partition pruning with multiple partition column filters + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where state = 'CA' and city = 'SFO' order by id")( + Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO") + ) + + // Test partition pruning with date filter + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where datestr = '2024-01-01' order by id")( + Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq("2", "a2", 2000, "2024-01-01", "USA", "CA", "LA"), + Seq("3", "a3", 3000, "2024-01-01", "USA", "TX", "AU") + ) + + // Test partition pruning with country filter + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where country = 'CAN' order by id")( + Seq("7", "a7", 7000, "2024-01-03", "CAN", "ON", "TOR") + ) + + // Test partition pruning with combined date and state filter + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where datestr = '2024-01-02' and state = 'CA' order by id")( + Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO") + ) + + // Test partition pruning with IN clause + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where state IN ('CA', 'NY') order by id")( + Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq("2", "a2", 2000, "2024-01-01", "USA", "CA", "LA"), + Seq("4", "a4", 4000, "2024-01-02", "USA", "CA", "SFO"), + Seq("6", "a6", 6000, "2024-01-03", "USA", "CA", "LA"), + Seq("8", "a8", 8000, "2024-01-04", "USA", "NY", "NYC") + ) + + // Test reading with _hoodie_partition_path to verify custom partition format + checkAnswer(s"select id, _hoodie_partition_path from $targetTable where state = 'CA' order by id")( + Seq("1", "2024/01/01/USA/CA/SFO"), + Seq("2", "2024/01/01/USA/CA/LA"), + Seq("4", "2024/01/02/USA/CA/SFO"), + Seq("6", "2024/01/03/USA/CA/LA") + ) + + // Verify partition pruning works by corrupting a parquet file in a partition that won't be queried + // We'll corrupt a file in the WA/SEA partition and query for CA - if partition pruning works, the query succeeds + val metaClient = HoodieTableMetaClient.builder() + .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration)) + .setBasePath(tablePath) + .build() + + // Find and corrupt a parquet file in the 2024/01/02/USA/WA/SEA partition + val partitionPathForCorruption = new StoragePath(tablePath, "2024/01/02/USA/WA/SEA") + val storage = metaClient.getStorage + val parquetFiles = storage.listDirectEntries(partitionPathForCorruption).stream() + .filter(fileStatus => fileStatus.getPath.getName.endsWith(".parquet") && !fileStatus.getPath.getName.startsWith(".")) + .collect(Collectors.toList()) + + assertTrue(!parquetFiles.isEmpty, "Should have at 
least one parquet file in WA/SEA partition") + + // Corrupt the first parquet file by writing garbage data + val fileToCorrupt = parquetFiles.get(0).getPath + val outputStream = storage.create(fileToCorrupt, true) + try { + outputStream.write("CORRUPTED_DATA".getBytes()) + } finally { + outputStream.close() + } + + // Query for state = 'CA' should still succeed because partition pruning avoids the corrupted WA partition + checkAnswer(s"select id, name, state from $targetTable where state = 'CA' order by id")( + Seq("1", "a1", "CA"), + Seq("2", "a2", "CA"), + Seq("4", "a4", "CA"), + Seq("6", "a6", "CA") + ) + + // Similarly, query for state = 'TX' should succeed + checkAnswer(s"select id, name, state from $targetTable where state = 'TX' order by id")( + Seq("3", "a3", "TX") + ) + + // Create DataFrame and analyze query plan to verify partition pruning + val dfWithStateFilter = spark.sql(s"select * from $targetTable where state = 'CA'") + val planWithStateFilter = dfWithStateFilter.queryExecution.executedPlan.toString() + // Verify partition filters are pushed down + assertTrue(planWithStateFilter.contains("PartitionFilters") || planWithStateFilter.contains("PushedFilters"), + s"Query plan should contain partition filters for state column") + + // Test DataFrame API with multiple partition filters + val dfWithMultipleFilters = spark.table(targetTable) + .filter("state = 'CA' and datestr = '2024-01-01'") + val planWithMultipleFilters = dfWithMultipleFilters.queryExecution.executedPlan.toString() + assertTrue(planWithMultipleFilters.contains("PartitionFilters") || planWithMultipleFilters.contains("PushedFilters"), + s"Query plan should contain partition filters for multiple columns") + + // Verify the filtered results + val multiFilterResults = dfWithMultipleFilters.select("id", "name", "state", "datestr").orderBy("id").collect() + assertTrue(multiFilterResults.length == 2, s"Expected 2 rows but got ${multiFilterResults.length}") + assertTrue(multiFilterResults(0).getString(0) == "1", s"First row id should be 1") + assertTrue(multiFilterResults(0).getString(1) == "a1", s"First row name should be a1") + assertTrue(multiFilterResults(0).getString(2) == "CA", s"First row state should be CA") + assertTrue(multiFilterResults(0).getString(3) == "2024-01-01", s"First row datestr should be 2024-01-01") + assertTrue(multiFilterResults(1).getString(0) == "2", s"Second row id should be 2") + assertTrue(multiFilterResults(1).getString(1) == "a2", s"Second row name should be a2") + + // Test DataFrame with country filter + val dfWithCountryFilter = spark.table(targetTable).filter("country = 'CAN'") + val planWithCountryFilter = dfWithCountryFilter.queryExecution.executedPlan.toString() + assertTrue(planWithCountryFilter.contains("PartitionFilters") || planWithCountryFilter.contains("PushedFilters"), + s"Query plan should contain partition filters for country column") + + val countryFilterResults = dfWithCountryFilter.select("id", "country").orderBy("id").collect() + assertTrue(countryFilterResults.length == 1, s"Expected 1 row but got ${countryFilterResults.length}") + assertTrue(countryFilterResults(0).getString(0) == "7", s"Row id should be 7") + assertTrue(countryFilterResults(0).getString(1) == "CAN", s"Row country should be CAN") + + // Verify all partitions exist as expected + val engine = new HoodieSparkEngineContext(spark.sparkContext) + val metadataConfig = HoodieMetadataConfig.newBuilder().build() + val metadataTable = new HoodieBackedTableMetadata(engine, storage, metadataConfig, tablePath) + 
val partitionPaths = metadataTable.getAllPartitionPaths + + // Verify expected partition paths + assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO")) + assertTrue(partitionPaths.contains("2024/01/01/USA/CA/LA")) + assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU")) + assertTrue(partitionPaths.contains("2024/01/02/USA/CA/SFO")) + assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA")) + assertTrue(partitionPaths.contains("2024/01/03/USA/CA/LA")) + assertTrue(partitionPaths.contains("2024/01/03/CAN/ON/TOR")) + assertTrue(partitionPaths.contains("2024/01/04/USA/NY/NYC")) + + metadataTable.close() + } + } + + test("Test custom partition value extractor with URL encoding") { + withTempDir { tmp => + val targetTable = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$targetTable" + + // Enable partition value extractor on read + spark.conf.set("hoodie.datasource.read.partition.value.using.partion-value-extractor-class", "true") + + spark.sql( + s""" + |create table $targetTable ( + | `id` string, + | `name` string, + | `ts` bigint, + | `datestr` string, + | `country` string, + | `state` string, + | `city` string + |) using hudi + | tblproperties ( + | 'primaryKey' = 'id', + | 'type' = 'COW', + | 'preCombineField'='ts', + | 'hoodie.datasource.write.hive_style_partitioning'='false', + | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator', + | 'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.MockSlashKeyGenerator', + | '${HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor', + | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.MockSlashPartitionValueExtractor', + | 'hoodie.datasource.write.partitionpath.urlencode'='true' + | ) + | partitioned by (`datestr`, `country`, `state`, `city`) + | location '$tablePath' + """.stripMargin) + + // Insert data with special characters that will be URL encoded + import spark.implicits._ + val df = Seq( + ("1", "a1", 1000L, "2024-01-01", "USA", "CA", "San Francisco"), + ("2", "a2", 2000L, "2024-01-01", "USA", "TX", "Austin+Dallas"), + ("3", "a3", 3000L, "2024-01-02", "USA", "CA", "Los Angeles") + ).toDF("id", "name", "ts", "datestr", "country", "state", "city") + + df.write + .format("hudi") + .mode("append") + .save(tablePath) + + // Verify that data can be read back correctly with URL-encoded partition paths + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where city = 'San Francisco' order by id")( + Seq("1", "a1", 1000, "2024-01-01", "USA", "CA", "San Francisco") + ) + + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where city = 'Austin+Dallas' order by id")( + Seq("2", "a2", 2000, "2024-01-01", "USA", "TX", "Austin+Dallas") + ) + + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where city = 'Los Angeles' order by id")( + Seq("3", "a3", 3000, "2024-01-02", "USA", "CA", "Los Angeles") + ) + + // Verify all data can be read + val allData = spark.sql(s"select id, city from $targetTable order by id").collect() + assertTrue(allData.length == 3, s"Expected 3 rows but got ${allData.length}") + assertTrue(allData(0).getString(1) == "San Francisco", "First row city should be San Francisco") + assertTrue(allData(1).getString(1) == "Austin+Dallas", "Second row city should be Austin+Dallas") + assertTrue(allData(2).getString(1) == "Los 
Angeles", "Third row city should be Los Angeles") + } + } +} diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java index 890c9712f714f..b35cce58cd7c5 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java @@ -51,8 +51,6 @@ import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS; import static org.apache.hudi.common.table.HoodieTableConfig.BASE_FILE_FORMAT; import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME; -import static org.apache.hudi.common.table.HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING; -import static org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE; import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_TABLE_NAME_KEY; import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_WRITE_TABLE_NAME_KEY; import static org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING; @@ -127,19 +125,17 @@ public class HoodieSyncConfig extends HoodieConfig { partitionFieldsOpt = HoodieTableConfig.getPartitionFieldProp(cfg) .or(() -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))); } + if (!partitionFieldsOpt.isPresent()) { - return Option.empty(); + return Option.of("org.apache.hudi.hive.NonPartitionedExtractor"); } String partitionFields = partitionFieldsOpt.get(); if (StringUtils.nonEmpty(partitionFields)) { int numOfPartFields = partitionFields.split(",").length; if (numOfPartFields == 1) { - if (cfg.contains(HIVE_STYLE_PARTITIONING_ENABLE) - && cfg.getString(HIVE_STYLE_PARTITIONING_ENABLE).equals("true")) { + if (cfg.contains(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key()) + && cfg.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key()).equals("true")) { return Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor"); - } else if (cfg.contains(SLASH_SEPARATED_DATE_PARTITIONING) - && cfg.getString(SLASH_SEPARATED_DATE_PARTITIONING).equals("true")) { - return Option.of("org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor"); } else { return Option.of("org.apache.hudi.hive.SinglePartPartitionValueExtractor"); } @@ -152,7 +148,7 @@ public class HoodieSyncConfig extends HoodieConfig { }) .markAdvanced() .withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, " - + "default 'org.apache.hudi.hive.MultiPartKeysValueExtractor'."); + + "default is inferred based on partition configuration."); public static final ConfigProperty META_SYNC_DECODE_PARTITION = ConfigProperty .key("hoodie.meta.sync.decode_partition") diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index d1db2855a6b33..c6c33b87dc19f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -91,6 +91,7 @@ import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.sync.common.HoodieSyncConfig; import org.apache.hudi.sync.common.util.SyncUtilHelpers; 
import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.util.JavaScalaConverters; @@ -452,7 +453,8 @@ HoodieTableMetaClient initializeEmptyTable(HoodieTableMetaClient.TableBuilder ta payloadClass = overridingMergeConfigs.get().getMiddle(); mergeStrategyId = overridingMergeConfigs.get().getRight(); } - + String partitionValueExtractorClassName = props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), + HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.defaultValue()); return tableBuilder.setTableType(cfg.tableType) .setTableName(cfg.targetTableName) .setArchiveLogFolder(TIMELINE_HISTORY_PATH.defaultValue()) @@ -466,6 +468,7 @@ HoodieTableMetaClient initializeEmptyTable(HoodieTableMetaClient.TableBuilder ta .setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())) .setKeyGeneratorClassProp(keyGenClassName) + .setPartitionValueExtractorClass(partitionValueExtractorClassName) .setOrderingFields(cfg.sourceOrderingFields) .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
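
Usage note (not part of the patch): a minimal sketch of how the new read path is meant to be exercised, assuming the PartitionValueExtractor contract and the DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ option introduced above. The extractor class, table path, and partition layout below are illustrative only, and `spark` is assumed to be an active SparkSession (e.g. spark-shell); the table is expected to carry the extractor class in hoodie.properties via hoodie.table.hive_sync.partition_extractor_class.

    import java.util

    import org.apache.hudi.DataSourceReadOptions
    import org.apache.hudi.sync.common.model.PartitionValueExtractor

    // Illustrative extractor: rebuilds the (datestr, country) values from a
    // slash-separated partition path such as "2024/01/01/USA".
    class DateCountryPartitionValueExtractor extends PartitionValueExtractor {
      override def extractPartitionValuesInPath(partitionPath: String): util.List[String] = {
        val parts = partitionPath.split("/")
        // The first three segments form the original "yyyy-MM-dd" value, the last one is the country.
        util.Arrays.asList(s"${parts(0)}-${parts(1)}-${parts(2)}", parts(3))
      }
    }

    // Enable extractor-based partition-value parsing on read; the extractor class itself
    // is looked up from the table's hoodie.properties, not from this option.
    val df = spark.read.format("hudi")
      .option(DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key, "true")
      .load("/tmp/hudi/example_table") // hypothetical table location
    df.filter("country = 'USA'").show()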