@@ -23,14 +23,16 @@ import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.TimestampKeyGeneratorConfig
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.common.util.{StringUtils, Option => HOption}
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorType
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.util.ExceptionWrappingIterator
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.sync.common.model.PartitionValueExtractor

import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -224,11 +226,13 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
partitionPath: String,
tableBasePath: StoragePath,
tableSchema: StructType,
tableConfig: java.util.Map[String, String],
tableConfig: HoodieTableConfig,
timeZoneId: String,
shouldValidatePartitionColumns: Boolean): Array[Object] = {
shouldValidatePartitionColumns: Boolean,
usePartitionValueExtractorOnRead: Boolean): Array[Object] = {
val keyGeneratorClass = KeyGeneratorType.getKeyGeneratorClassName(tableConfig)
val timestampKeyGeneratorType = tableConfig.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
val timestampKeyGeneratorType = tableConfig.propsMap().get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
val partitionValueExtractorClass = tableConfig.getPartitionValueExtractorClass

if (null != keyGeneratorClass
&& null != timestampKeyGeneratorType
@@ -238,10 +242,43 @@
// we couldn't reconstruct initial partition column values from partition paths due to lost data after formatting.
// But the output for these cases is in a string format, so we can pass partitionPath as UTF8String
Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath))
} else if (usePartitionValueExtractorOnRead && !StringUtils.isNullOrEmpty(partitionValueExtractorClass)) {
parsePartitionValuesBasedOnPartitionValueExtractor(partitionValueExtractorClass, partitionPath,
partitionColumns, tableSchema)
} else {
doParsePartitionColumnValues(partitionColumns, partitionPath, tableBasePath, tableSchema, timeZoneId,
shouldValidatePartitionColumns, tableConfig.getOrDefault(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.key,
HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING.defaultValue).toBoolean)
shouldValidatePartitionColumns, tableConfig.getSlashSeparatedDatePartitioning)
}
}

/**
* Parses partition values from the partition path using a custom PartitionValueExtractor.
*
* @param partitionValueExtractorClass Fully qualified class name of the PartitionValueExtractor implementation
* @param partitionPath The partition path to extract values from
* @param partitionColumns Array of partition column names
* @param tableSchema The schema of the table
* @return Array of partition values as Objects, properly typed according to the schema
*/
private def parsePartitionValuesBasedOnPartitionValueExtractor(
partitionValueExtractorClass: String,
partitionPath: String,
partitionColumns: Array[String],
tableSchema: StructType): Array[Object] = {
try {
val partitionValueExtractor = Class.forName(partitionValueExtractorClass)
.getDeclaredConstructor()
.newInstance()
.asInstanceOf[PartitionValueExtractor]
val partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partitionPath).asScala.toArray
val partitionSchema = buildPartitionSchemaForNestedFields(tableSchema, partitionColumns)
val typedValues = partitionValues.zip(partitionSchema.fields).map { case (stringValue, field) =>
castStringToType(stringValue, field.dataType)
}
typedValues.map(_.asInstanceOf[Object])
} catch {
case e: Exception =>
throw new RuntimeException(s"Failed to extract partition values using extractor class $partitionValueExtractorClass", e)
}
}
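
For illustration, a minimal extractor that the reflective Class.forName load above could instantiate. The class name and path layout here are hypothetical, not part of this change:

import java.util.{Arrays => JArrays, List => JList}
import org.apache.hudi.sync.common.model.PartitionValueExtractor

// Hypothetical extractor for partition paths shaped like "region=eu/2024-01-01":
// strips the hive-style "region=" prefix from the first segment and keeps the
// date segment as-is.
class RegionDateExtractor extends PartitionValueExtractor {
  override def extractPartitionValuesInPath(partitionPath: String): JList[String] = {
    val segments = partitionPath.split("/")
    JArrays.asList(segments(0).split("=").last, segments(1))
  }
}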

@@ -336,7 +373,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
).toSeq(partitionSchema)
}

private def buildPartitionSchemaForNestedFields(schema: StructType, partitionColumns: Array[String]): StructType = {
def buildPartitionSchemaForNestedFields(schema: StructType, partitionColumns: Array[String]): StructType = {
val partitionFields = partitionColumns.flatMap { partitionCol =>
extractNestedField(schema, partitionCol)
}
@@ -364,7 +401,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
traverseSchema(schema, pathParts.toList, fieldPath)
}

private def castStringToType(value: String, dataType: org.apache.spark.sql.types.DataType): Any = {
def castStringToType(value: String, dataType: org.apache.spark.sql.types.DataType): Any = {
import org.apache.spark.sql.types._

// handling cases where the value contains path separators or is complex
@@ -335,6 +335,37 @@ public static final String getDefaultPayloadClassName() {
.sinceVersion("1.0.0")
.withDocumentation("Key Generator type to determine key generator class");

public static final ConfigProperty<String> PARTITION_VALUE_EXTRACTOR_CLASS = ConfigProperty
.key("hoodie.table.hive_sync.partition_extractor_class")
.noDefaultValue()
.withInferFunction(cfg -> {

Contributor:
we should not be setting any default here right?
ok to have the infer function.

Contributor Author:
Sure.

Contributor:
we can't use a config key w/ "hive_sync" in the name.
We plan to use this for reading.

Maybe hoodie.table.partition_value_extractor_class.

but we might have to introduce new partition value extractor classes for the read instead of using the same one we use for hive sync.

let me think about it more or chat w/ others to see how we can go about this.

Option<String> partitionFieldsOpt = HoodieTableConfig.getPartitionFieldProp(cfg)
.or(() -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)));

if (!partitionFieldsOpt.isPresent()) {
return Option.of("org.apache.hudi.hive.NonPartitionedExtractor");
}
String partitionFields = partitionFieldsOpt.get();
if (StringUtils.nonEmpty(partitionFields)) {
int numOfPartFields = partitionFields.split(",").length;
if (numOfPartFields == 1) {
if (cfg.contains(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key())
&& cfg.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key()).equals("true")) {
return Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor");
} else {
return Option.of("org.apache.hudi.hive.SinglePartPartitionValueExtractor");
}
} else {
return Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor");
}
} else {
return Option.of("org.apache.hudi.hive.NonPartitionedExtractor");
}
})
.markAdvanced()
.withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, "
+ "default is inferred based on partition configuration.");

// TODO: this has to be UTC. why is it not the default?
public static final ConfigProperty<HoodieTimelineTimeZone> TIMELINE_TIMEZONE = ConfigProperty
.key("hoodie.table.timeline.timezone")
@@ -1208,6 +1239,10 @@ public String getKeyGeneratorClassName() {
return KeyGeneratorType.getKeyGeneratorClassName(this);
}

public String getPartitionValueExtractorClass() {
return getStringOrDefault(PARTITION_VALUE_EXTRACTOR_CLASS, "");
}

public HoodieTimelineTimeZone getTimelineTimezone() {
return HoodieTimelineTimeZone.valueOf(getStringOrDefault(TIMELINE_TIMEZONE));
}
@@ -1070,6 +1070,7 @@ public static class TableBuilder {
private Boolean bootstrapIndexEnable;
private Boolean populateMetaFields;
private String keyGeneratorClassProp;
private String partitionValueExtractorClass;
private String keyGeneratorType;
private Boolean slashSeparatedDatePartitioning;
private Boolean hiveStylePartitioningEnable;
@@ -1243,6 +1244,11 @@ public TableBuilder setSlashSeparatedDatePartitioning(Boolean slashSeparatedDate
return this;
}

public TableBuilder setPartitionValueExtractorClass(String partitionValueExtractorClass) {
this.partitionValueExtractorClass = partitionValueExtractorClass;
return this;
}
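
As a usage note, a hedged Scala sketch of carrying the new property at table initialization; the table name, partition fields, storageConf and basePath are placeholders, and initTable is assumed as the terminal call:

// Sketch: record an explicit partition value extractor at table creation.
HoodieTableMetaClient.newTableBuilder()
  .setTableType(HoodieTableType.COPY_ON_WRITE)
  .setTableName("trips")                                   // placeholder
  .setPartitionFields("region,dt")                         // placeholder
  .setPartitionValueExtractorClass("org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .initTable(storageConf, basePath)                        // assumed terminal call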

public TableBuilder setHiveStylePartitioningEnable(Boolean hiveStylePartitioningEnable) {
this.hiveStylePartitioningEnable = hiveStylePartitioningEnable;
return this;
@@ -1445,6 +1451,9 @@ public TableBuilder fromProperties(Properties properties) {
if (hoodieConfig.contains(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING)) {
setSlashSeparatedDatePartitioning(hoodieConfig.getBoolean(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING));
}
if (hoodieConfig.contains(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS)) {
setPartitionValueExtractorClass(hoodieConfig.getString(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS));
}
if (hoodieConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)) {
setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
}
@@ -1588,6 +1597,9 @@ public Properties build() {
if (null != slashSeparatedDatePartitioning) {
tableConfig.setValue(HoodieTableConfig.SLASH_SEPARATED_DATE_PARTITIONING, Boolean.toString(slashSeparatedDatePartitioning));
}
if (null != partitionValueExtractorClass) {
tableConfig.setValue(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS, partitionValueExtractorClass);
}
if (null != hiveStylePartitioningEnable) {
tableConfig.setValue(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, Boolean.toString(hiveStylePartitioningEnable));
}
@@ -41,6 +41,7 @@
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.sink.overwrite.PartitionOverwriteMode;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.hudi.util.ClientIds;

@@ -657,6 +658,13 @@ public class FlinkOptions extends HoodieConfig {
+ "**Note** This is being actively worked on. Please use "
+ "`hoodie.datasource.write.keygenerator.class` instead.");

@AdvancedConfig
public static final ConfigOption<String> PARTITION_VALUE_EXTRACTOR = ConfigOptions
.key(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key())
.stringType()
.noDefaultValue()
.withDescription("Partition value extractor class helps extract the partition value from partition paths");

public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH";
public static final String PARTITION_FORMAT_DAY = "yyyyMMdd";
public static final String PARTITION_FORMAT_DASHED_DAY = "yyyy-MM-dd";
@@ -314,6 +314,7 @@ public static HoodieTableMetaClient initTableIfNotExists(
.setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD.key(), null))
.setKeyGeneratorClassProp(
conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse(SimpleAvroKeyGenerator.class.getName()))
.setPartitionValueExtractorClass(conf.getString(FlinkOptions.PARTITION_VALUE_EXTRACTOR.key(), null))
.setHiveStylePartitioningEnable(conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(conf.get(FlinkOptions.URL_ENCODE_PARTITIONING))
.setCDCEnabled(conf.get(FlinkOptions.CDC_ENABLED))
@@ -278,6 +278,15 @@ object DataSourceReadOptions {
.sinceVersion("1.1.0")
.withDocumentation("Fully qualified class name of the catalog that is used by the Polaris spark client.")

val USE_PARTITION_VALUE_EXTRACTOR_ON_READ: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.partition.value.using.partion-value-extractor-class")
.defaultValue("false")
.markAdvanced()
.sinceVersion("1.2.0")
.withDocumentation("This config helps whether PartitionValueExtractor interface can be used" +
" for parsing partition values from partition path. When this config is enabled, it uses" +
" PartitionValueExtractor class value stored in the hoodie.properties file.")

/** @deprecated Use {@link QUERY_TYPE} and its methods instead */
@Deprecated
val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
@@ -477,6 +477,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
getPartitionColumnsAsInternalRowInternal(file,
metaClient.getBasePath, extractPartitionValuesFromPartitionPath = true)

protected def usePartitionValueExtractorOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = {
optParams.getOrElse(DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key,
DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.defaultValue).toBoolean ||
ProvidesHoodieConfig.isSchemaEvolutionEnabled(sparkSession)
}

protected def getPartitionColumnsAsInternalRowInternal(file: StoragePathInfo, basePath: StoragePath,
extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
if (extractPartitionValuesFromPartitionPath) {
@@ -489,9 +495,10 @@
relativePath,
basePath,
tableStructSchema,
tableConfig.propsMap,
tableConfig,
timeZoneId,
conf.getBoolean("spark.sql.sources.validatePartitionColumns", true))
conf.getBoolean("spark.sql.sources.validatePartitionColumns", true),
usePartitionValueExtractorOnRead(optParams, sparkSession))
if(rowValues.length != partitionColumns.length) {
throw new HoodieException("Failed to get partition column values from the partition-path:"
+ s"partition column size: ${partitionColumns.length}, parsed partition value size: ${rowValues.length}")
@@ -527,9 +527,17 @@ object HoodieFileIndex extends Logging {
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride)
}

val usePartitionValueExtractorOnRead = getConfigValue(options, sqlConf,
DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key, null)
if (usePartitionValueExtractorOnRead != null) {
properties.setProperty(DataSourceReadOptions.USE_PARTITION_VALUE_EXTRACTOR_ON_READ.key,
usePartitionValueExtractorOnRead)
}

if (tableConfig != null) {
properties.setProperty(RECORDKEY_FIELD.key, tableConfig.getRecordKeyFields.orElse(Array.empty).mkString(","))
properties.setProperty(PARTITIONPATH_FIELD.key, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig).orElse(""))
properties.setProperty(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS.key(), tableConfig.getPartitionValueExtractorClass)

// for simple bucket index, we need to set the INDEX_TYPE, BUCKET_INDEX_HASH_FIELD, BUCKET_INDEX_NUM_BUCKETS
val database = getDatabaseName(tableConfig, spark.catalog.currentDatabase)
@@ -291,6 +291,8 @@ class HoodieSparkSqlWriterInternal {
if (StringUtils.nonEmpty(hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)))
hoodieConfig.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)
else KeyGeneratorType.getKeyGeneratorClassName(hoodieConfig)
val partitionValueExtractorClassName = hoodieConfig
.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), null)
val tableFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.TABLE_FORMAT)
HoodieTableMetaClient.newTableBuilder()
.setTableType(tableType)
@@ -310,6 +312,7 @@
.setCDCEnabled(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED))
.setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
.setKeyGeneratorClassProp(keyGenProp)
.setPartitionValueExtractorClass(partitionValueExtractorClassName)
.set(timestampKeyGeneratorConfigs.asJava.asInstanceOf[java.util.Map[String, Object]])
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
@@ -749,7 +752,8 @@ class HoodieSparkSqlWriterInternal {
String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())
))
val tableFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.TABLE_FORMAT)

val partitionValueExtractorClassName = hoodieConfig
.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), null)
HoodieTableMetaClient.newTableBuilder()
.setTableType(HoodieTableType.valueOf(tableType))
.setTableName(tableName)
Expand All @@ -769,6 +773,7 @@ class HoodieSparkSqlWriterInternal {
.setCDCSupplementalLoggingMode(hoodieConfig.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE))
.setPopulateMetaFields(populateMetaFields)
.setKeyGeneratorClassProp(keyGenProp)
.setPartitionValueExtractorClass(partitionValueExtractorClassName)
.set(timestampKeyGeneratorConfigs.asJava.asInstanceOf[java.util.Map[String, Object]])
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
@@ -323,6 +323,16 @@ object HoodieWriterUtils {
&& currentPartitionFields != tableConfigPartitionFields) {
diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n")
}

// Validate partition value extractor
val currentPartitionValueExtractor = params.getOrElse(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), null)

Contributor:
we should make a new writer property.
let's align before we go ahead w/ more changes.

if (currentPartitionValueExtractor != null) {
val tableConfigPartitionValueExtractor = tableConfig.getString(HoodieTableConfig.PARTITION_VALUE_EXTRACTOR_CLASS)
if (tableConfigPartitionValueExtractor != null &&
!currentPartitionValueExtractor.equals(tableConfigPartitionValueExtractor)) {
diffConfigs.append(s"ParitionValueExtractor:\t$currentPartitionValueExtractor\ttableConfigPartitionValueExtractortabelConfigPartitionValueExtractor\n")
}
}
// The value of `HoodieTableConfig.RECORD_MERGE_STRATEGY_ID` can be NULL or non-NULL.
// The non-NULL value has been validated above in the regular code path.
// Here we check the NULL case since if the value is NULL, the check is skipped above.