From ff71010ca795cbf6dc731968f8c1b500bd93e624 Mon Sep 17 00:00:00 2001 From: Vinish Reddy Date: Tue, 30 Dec 2025 17:31:26 -0800 Subject: [PATCH 1/2] Handle nested map and array columns in MDT --- .../hudi/SparkHoodieTableFileIndex.scala | 4 +- ...HoodieFileGroupReaderBasedFileFormat.scala | 1 + .../hudi/functional/TestCOWDataSource.scala | 65 +++++++++++++++++++ 3 files changed, 69 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index cd79b78ddb816..7b06d7edb6bf7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -33,7 +33,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.internal.schema.Types.RecordType import org.apache.hudi.internal.schema.utils.Conversions -import org.apache.hudi.keygen.{StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} +import org.apache.hudi.keygen.{CustomAvroKeyGenerator, CustomKeyGenerator, StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} import org.apache.hudi.storage.{StoragePath, StoragePathInfo} import org.apache.hudi.util.JFunction @@ -131,6 +131,8 @@ class SparkHoodieTableFileIndex(spark: SparkSession, val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName) || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) { + // || classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName) + // || classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) { val partitionFields: Array[StructField] = partitionColumns.get().map(column => StructField(column, StringType)) StructType(partitionFields) } else { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index 4dd3f66551de5..e52efa5ce4a35 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -214,6 +214,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, val exclusionFields = new java.util.HashSet[String]() exclusionFields.add("op") partitionSchema.fields.foreach(f => exclusionFields.add(f.name)) + // QQ: How do we add nested partitioned fields to this without any field id's? 
val requestedStructType = StructType(requiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name))) val requestedSchema = HoodieSchemaUtils.pruneDataSchema(schema, HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(requestedStructType, sanitizedTableName), exclusionFields) val dataSchema = HoodieSchemaUtils.pruneDataSchema(schema, HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(dataStructType, sanitizedTableName), exclusionFields) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 6473bb3de3431..05948ee693bab 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -2453,6 +2453,71 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup writeToHudi(opt, firstUpdateDF, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) }) } + + @Test + def testNestedFieldPartition(): Unit = { + // Define schema with nested_record containing level field + val nestedSchema = StructType(Seq( + StructField("nested_int", IntegerType, nullable = false), + StructField("level", StringType, nullable = false) + )) + + val schema = StructType(Seq( + StructField("key", StringType, nullable = false), + StructField("ts", LongType, nullable = false), + StructField("level", StringType, nullable = false), + StructField("int_field", IntegerType, nullable = false), + StructField("string_field", StringType, nullable = true), + StructField("nested_record", nestedSchema, nullable = true) + )) + + // Create test data where top-level 'level' and 'nested_record.level' have DIFFERENT values + // This helps verify we're correctly partitioning/filtering on the nested field + val records = Seq( + Row("key1", 1L, "L1", 1, "str1", Row(10, "INFO")), + Row("key2", 2L, "L2", 2, "str2", Row(20, "ERROR")), + Row("key3", 3L, "L3", 3, "str3", Row(30, "INFO")), + Row("key4", 4L, "L4", 4, "str4", Row(40, "DEBUG")), + Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO")) + ) + + val inputDF = spark.createDataFrame( + spark.sparkContext.parallelize(records), + schema + ) + + // Write to Hudi partitioned by nested_record.level + inputDF.write.format("hudi") + .option("hoodie.insert.shuffle.parallelism", "4") + .option("hoodie.upsert.shuffle.parallelism", "4") + .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "key") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "nested_record.level") + .option(HoodieTableConfig.ORDERING_FIELDS.key, "ts") + .option(HoodieWriteConfig.TBL_NAME.key, "test_nested_partition") + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + + // Read and filter on nested_record.level = 'INFO' + val results = spark.read.format("hudi") + .load(basePath) + .filter("nested_record.level = 'INFO'") + .select("key", "ts", "level", "int_field", "string_field", "nested_record") + .orderBy("key") + .collect() + + // Expected results - 3 records with nested_record.level = 'INFO' + val expectedResults = Array( + Row("key1", 1L, "L1", 1, "str1", Row(10, "INFO")), + Row("key3", 3L, "L3", 3, "str3", Row(30, "INFO")), + Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO")) + ) + + assertEquals(expectedResults.length, results.length) + 
expectedResults.zip(results).foreach { case (expected, actual) => + assertEquals(expected, actual) + } + } } object TestCOWDataSource { From eaf9bd8893e923fb11126821fcc39018a03c3aad Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Sat, 7 Feb 2026 16:56:23 -0800 Subject: [PATCH 2/2] Fix the issue and add tests --- .../hudi/SparkHoodieTableFileIndex.scala | 9 +- ...HoodieFileGroupReaderBasedFileFormat.scala | 1 - .../hudi/functional/TestCOWDataSource.scala | 173 ++++++++++++++++-- 3 files changed, 158 insertions(+), 25 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index 7b06d7edb6bf7..c794889a6c0b4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -33,7 +33,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.internal.schema.Types.RecordType import org.apache.hudi.internal.schema.utils.Conversions -import org.apache.hudi.keygen.{CustomAvroKeyGenerator, CustomKeyGenerator, StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} +import org.apache.hudi.keygen.{StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} import org.apache.hudi.storage.{StoragePath, StoragePathInfo} import org.apache.hudi.util.JFunction @@ -131,13 +131,14 @@ class SparkHoodieTableFileIndex(spark: SparkSession, val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName) || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) { - // || classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName) - // || classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) { val partitionFields: Array[StructField] = partitionColumns.get().map(column => StructField(column, StringType)) StructType(partitionFields) } else { + // Use full partition path (e.g. "nested_record.level") as the partition column name so that + // data schema does not exclude a same-named top-level column (e.g. "level") when partition + // path is a nested field. Otherwise partition value would overwrite the data column on read. 
val partitionFields: Array[StructField] = partitionColumns.get().filter(column => nameFieldMap.contains(column)) - .map(column => nameFieldMap.apply(column)) + .map(column => StructField(column, nameFieldMap.apply(column).dataType)) if (partitionFields.length != partitionColumns.get().length) { val isBootstrapTable = tableConfig.getBootstrapBasePath.isPresent diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index e52efa5ce4a35..4dd3f66551de5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -214,7 +214,6 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, val exclusionFields = new java.util.HashSet[String]() exclusionFields.add("op") partitionSchema.fields.foreach(f => exclusionFields.add(f.name)) - // QQ: How do we add nested partitioned fields to this without any field id's? val requestedStructType = StructType(requiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name))) val requestedSchema = HoodieSchemaUtils.pruneDataSchema(schema, HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(requestedStructType, sanitizedTableName), exclusionFields) val dataSchema = HoodieSchemaUtils.pruneDataSchema(schema, HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(dataStructType, sanitizedTableName), exclusionFields) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 05948ee693bab..87d6f976352aa 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -2454,8 +2454,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup }) } - @Test - def testNestedFieldPartition(): Unit = { + @ParameterizedTest + @CsvSource(Array("COW", "MOR")) + def testNestedFieldPartition(tableType: String): Unit = { // Define schema with nested_record containing level field val nestedSchema = StructType(Seq( StructField("nested_int", IntegerType, nullable = false), @@ -2473,7 +2474,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup // Create test data where top-level 'level' and 'nested_record.level' have DIFFERENT values // This helps verify we're correctly partitioning/filtering on the nested field - val records = Seq( + val recordsCommit1 = Seq( Row("key1", 1L, "L1", 1, "str1", Row(10, "INFO")), Row("key2", 2L, "L2", 2, "str2", Row(20, "ERROR")), Row("key3", 3L, "L3", 3, "str3", Row(30, "INFO")), @@ -2481,40 +2482,172 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO")) ) - val inputDF = spark.createDataFrame( - spark.sparkContext.parallelize(records), - schema + val tableTypeOptVal = if (tableType == "MOR") { + DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL + } else { + 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL + } + + val baseWriteOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "nested_record.level", + HoodieTableConfig.ORDERING_FIELDS.key -> "ts", + HoodieWriteConfig.TBL_NAME.key -> "test_nested_partition", + DataSourceWriteOptions.TABLE_TYPE.key -> tableTypeOptVal ) + val writeOpts = if (tableType == "MOR") { + baseWriteOpts + ("hoodie.compact.inline" -> "false") + } else { + baseWriteOpts + } - // Write to Hudi partitioned by nested_record.level - inputDF.write.format("hudi") - .option("hoodie.insert.shuffle.parallelism", "4") - .option("hoodie.upsert.shuffle.parallelism", "4") - .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "key") - .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "nested_record.level") - .option(HoodieTableConfig.ORDERING_FIELDS.key, "ts") - .option(HoodieWriteConfig.TBL_NAME.key, "test_nested_partition") + // Commit 1 - Initial insert + val inputDF1 = spark.createDataFrame( + spark.sparkContext.parallelize(recordsCommit1), + schema + ) + inputDF1.write.format("hudi") + .options(writeOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .mode(SaveMode.Overwrite) .save(basePath) + val commit1 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Commit 2 - Upsert: update key1 (int_field 1->100), insert key6 (INFO) + val recordsCommit2 = Seq( + Row("key1", 10L, "L1", 100, "str1", Row(10, "INFO")), + Row("key6", 6L, "L6", 6, "str6", Row(60, "INFO")) + ) + val inputDF2 = spark.createDataFrame( + spark.sparkContext.parallelize(recordsCommit2), + schema + ) + inputDF2.write.format("hudi") + .options(writeOpts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + val commit2 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Commit 3 - Upsert: update key3 (int_field 3->300), insert key7 (INFO) + val recordsCommit3 = Seq( + Row("key3", 30L, "L3", 300, "str3", Row(30, "INFO")), + Row("key7", 7L, "L7", 7, "str7", Row(70, "INFO")) + ) + val inputDF3 = spark.createDataFrame( + spark.sparkContext.parallelize(recordsCommit3), + schema + ) + inputDF3.write.format("hudi") + .options(writeOpts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + val commit3 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Snapshot read - filter on nested_record.level = 'INFO' (latest state: 5 records) + val snapshotResults = spark.read.format("hudi") + .load(basePath) + .filter("nested_record.level = 'INFO'") + .select("key", "ts", "level", "int_field", "string_field", "nested_record") + .orderBy("key") + .collect() + + val expectedSnapshot = Array( + Row("key1", 10L, "L1", 100, "str1", Row(10, "INFO")), + Row("key3", 30L, "L3", 300, "str3", Row(30, "INFO")), + Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO")), + Row("key6", 6L, "L6", 6, "str6", Row(60, "INFO")), + Row("key7", 7L, "L7", 7, "str7", Row(70, "INFO")) + ) + assertEquals(expectedSnapshot.length, snapshotResults.length, + s"Snapshot (INFO) count mismatch for $tableType") + expectedSnapshot.zip(snapshotResults).foreach { case (expected, actual) => + assertEquals(expected, actual) + } - // Read and filter on nested_record.level = 'INFO' - 
val results = spark.read.format("hudi") + // Time travel - as of commit1 (only initial 5 records; INFO = key1, key3, key5) + val timeTravelCommit1 = spark.read.format("hudi") + .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commit1) .load(basePath) .filter("nested_record.level = 'INFO'") .select("key", "ts", "level", "int_field", "string_field", "nested_record") .orderBy("key") .collect() - // Expected results - 3 records with nested_record.level = 'INFO' - val expectedResults = Array( + val expectedAfterCommit1 = Array( Row("key1", 1L, "L1", 1, "str1", Row(10, "INFO")), Row("key3", 3L, "L3", 3, "str3", Row(30, "INFO")), Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO")) ) + assertEquals(expectedAfterCommit1.length, timeTravelCommit1.length, + s"Time travel to commit1 (INFO) count mismatch for $tableType") + expectedAfterCommit1.zip(timeTravelCommit1).foreach { case (expected, actual) => + assertEquals(expected, actual) + } + + // Time travel - as of commit2 (after 2nd commit; INFO = key1 updated, key3, key5, key6) + val timeTravelCommit2 = spark.read.format("hudi") + .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commit2) + .load(basePath) + .filter("nested_record.level = 'INFO'") + .select("key", "ts", "level", "int_field", "string_field", "nested_record") + .orderBy("key") + .collect() + + val expectedAfterCommit2 = Array( + Row("key1", 10L, "L1", 100, "str1", Row(10, "INFO")), + Row("key3", 3L, "L3", 3, "str3", Row(30, "INFO")), + Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO")), + Row("key6", 6L, "L6", 6, "str6", Row(60, "INFO")) + ) + assertEquals(expectedAfterCommit2.length, timeTravelCommit2.length, + s"Time travel to commit2 (INFO) count mismatch for $tableType") + expectedAfterCommit2.zip(timeTravelCommit2).foreach { case (expected, actual) => + assertEquals(expected, actual) + } + + // Incremental query - from commit1 to commit2 (only key1 update and key6 insert; both INFO) + val incrementalCommit1To2 = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.START_COMMIT.key, commit1) + .option(DataSourceReadOptions.END_COMMIT.key, commit2) + .load(basePath) + .filter("nested_record.level = 'INFO'") + .select("key", "ts", "level", "int_field", "string_field", "nested_record") + .orderBy("key") + .collect() + + val expectedInc1To2 = Array( + Row("key1", 10L, "L1", 100, "str1", Row(10, "INFO")), + Row("key6", 6L, "L6", 6, "str6", Row(60, "INFO")) + ) + assertEquals(expectedInc1To2.length, incrementalCommit1To2.length, + s"Incremental (commit1->commit2, INFO) count mismatch for $tableType") + expectedInc1To2.zip(incrementalCommit1To2).foreach { case (expected, actual) => + assertEquals(expected, actual) + } - assertEquals(expectedResults.length, results.length) - expectedResults.zip(results).foreach { case (expected, actual) => + // Incremental query - from commit2 to commit3 (only key3 update and key7 insert; both INFO) + val incrementalCommit2To3 = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.START_COMMIT.key, commit2) + .option(DataSourceReadOptions.END_COMMIT.key, commit3) + .load(basePath) + .filter("nested_record.level = 'INFO'") + .select("key", "ts", "level", "int_field", "string_field", "nested_record") + .orderBy("key") + .collect() + + val expectedInc2To3 = Array( + Row("key3", 30L, "L3", 300, "str3", Row(30, "INFO")), + 
Row("key7", 7L, "L7", 7, "str7", Row(70, "INFO")) + ) + assertEquals(expectedInc2To3.length, incrementalCommit2To3.length, + s"Incremental (commit2->commit3, INFO) count mismatch for $tableType") + expectedInc2To3.zip(incrementalCommit2To3).foreach { case (expected, actual) => assertEquals(expected, actual) } }