feat: Add Unshredded Variant read & write support #17833
base: master
Conversation
voonhous
left a comment
Added some self-reviews
 * </pre>
 */
@Test
public void testWriteUnshreddedVariant() throws IOException {
These tests aren't really meaningful; they pass without the variant read + write code changes anyway.
Correction, they are meaningful in helping ensure that our definition of Variant is correct.
.../hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala (resolved)
class TestVariantDataType extends HoodieSparkSqlTestBase {

  test("Test COW Table with Variant Data Type") {
Let's add a MOR test to see if there's anything missing too.
import java.nio.file.{Files, Path}

class TestHoodieFileGroupReaderOnSparkVariant extends SparkAdapterSupport {
This can be removed; I added it for debugging to allow finer-grained control. It is almost identical to TestVariantDataType.scala, except that it instantiates a CloseableInternalRowIterator for row reading.
if (fileStructMap.contains(f.name) && !isDataTypeEqual(requiredType, fileStructMap(f.name))) {
  val readerType = addMissingFields(requiredType, fileStructMap(f.name))
-  implicitTypeChangeInfo.put(new Integer(requiredSchema.fieldIndex(f.name)), org.apache.hudi.common.util.collection.Pair.of(requiredType, readerType))
+  implicitTypeChangeInfo.put(Integer.valueOf(requiredSchema.fieldIndex(f.name)), org.apache.hudi.common.util.collection.Pair.of(requiredType, readerType))
new Integer(...) has been deprecated (and later marked for removal) since Java 9. Using Integer#valueOf instead. This could be a separate PR.
Force-pushed 1858ee3 to 4d76cb4
Force-pushed 936d6cd to 345450a
Force-pushed 3b62c5a to 157fa96
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java (resolved)
...di-spark-client/src/main/java/org/apache/hudi/client/utils/SparkInternalSchemaConverter.java (resolved)
...ent/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java (resolved)
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala (resolved)
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java (resolved)
List<Types.Field> variantFields = new ArrayList<>();
// Assign field IDs: these are used for schema evolution tracking
// Use negative IDs: indicate these are system-generated for Variant type
// TODO (voon): Check if we can remove the magic numbers?
Who can we reach out to in order to figure out how we should handle this?
Not very sure; the author of this is @xiarixiaoyao, but I don't think they are active anymore.
// Check for precision and scale if the schema has a logical decimal type.
// VARIANT (unshredded) type is excluded because it stores semi-structured data as opaque binary blobs,
// making min/max statistics meaningless
// TODO: For shredded, we are able to store colstats, explore that
Let's make a GH Issue and then link it here so we don't forget
Done
#17988
can you link it inline as well?
Done
hudi-common/src/test/java/org/apache/hudi/common/testutils/ZipTestUtils.java (resolved)
Force-pushed 157fa96 to 97ef2e3
} else if (dataType == DataTypes.BinaryType) {
  return (row, ordinal) -> recordConsumer.addBinary(
      Binary.fromReusedByteArray(row.getBinary(ordinal)));
} else if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
Do we need something similar in HoodieAvroWriteSupport?
I don't think so; I have a test that uses HoodieRecordType.{AVRO, SPARK}. It should exercise both write supports, and there are no test failures.
In Avro, Variant is already an Avro record created by HoodieSchema.createVariant, with fields value (bytes) and metadata (bytes).
IIUC, Parquet's AvroWriteSupport handles this automatically since it knows how to convert:
- Avro record -> Parquet group
- Avro bytes -> Parquet binary
HoodieAvroWriteSupport just wraps AvroWriteSupport to add bloom filter support and does not override the write logic.
In the Spark Row path, custom handling is needed because Spark's VariantType requires special handling (createVariantValueWriter) to extract the raw bytes; there is no automatic Spark VariantType -> Parquet conversion from what I can see in our code.
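To illustrate the Avro path, here is a minimal sketch of the record shape described above; the field names/order ("value", "metadata") are assumptions taken from this thread rather than copied from HoodieSchema.createVariant, which may differ in details:

```scala
import org.apache.avro.SchemaBuilder

// Sketch only: approximates the unshredded Variant record shape discussed above.
val unshreddedVariantAvroSchema = SchemaBuilder.record("variant_col")
  .fields()
  .requiredBytes("value")     // raw Variant value blob
  .requiredBytes("metadata")  // raw Variant metadata blob
  .endRecord()

// AvroWriteSupport already maps: Avro record -> Parquet group, Avro bytes -> Parquet binary,
// which is why no custom handling is needed on the Avro write path.
```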
| "CREATE TABLE variant_table (" | ||
| + " id INT," | ||
| + " name STRING," | ||
| + " v ROW<`value` BYTES, metadata BYTES>," |
Is there a way to create the table with the HoodieSchema so the type is annotated as Variant?
Not sure what is meant by this. Do you mean some sort of utility function like HoodieSchema.getVariantTypeSQLStringForFlink that checks whether the current version of Flink supports Variant natively and applies the relevant Variant type?
Yes, something along those lines. Similar to how you have a test with the Variant type in TestVariantDataType.
I just want to clarify one thing: the table on disk will have Variant in the Hudi schema for this table, right?
Yes. It will have a variant logical type in HoodieSchema.
I'm using the physical type here as I am not really sure how to wire this up for both Flink 2.0+ and Flink 1.20; Variant is only supported in Flink 2.0+.
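For reference, a rough sketch of the helper idea floated above; the object and method names are hypothetical, and it assumes native VARIANT exists only on Flink 2.0+:

```scala
import scala.util.Try

// Hypothetical helper (sketch): choose the DDL type string for the variant column
// based on the Flink version the test is running against.
object VariantDdl {
  def variantTypeSqlString(flinkVersion: String): String = {
    val major = flinkVersion.split("\\.").headOption
      .flatMap(s => Try(s.toInt).toOption)
      .getOrElse(1)
    if (major >= 2) "VARIANT"                      // native Variant on Flink 2.0+
    else "ROW<`value` BYTES, metadata BYTES>"      // physical fallback on Flink 1.x
  }
}

// e.g. s"CREATE TABLE variant_table (id INT, name STRING, v ${VariantDdl.variantTypeSqlString("1.20")})"
```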
// Create parquet reader
val hadoopConf = new Configuration(spark.sparkContext.hadoopConfiguration)
val reader = sparkAdapter.createParquetFileReader(
Instead of using a parquetFileReader directly, can we use a FileGroupReader? I'm thinking later it will be useful when MoR is incorporated.
Let me remove this test; I added it to debug a SPECIFIC flow while developing the read/write support. It is not required, as the coverage here is provided by TestVariantDataType.scala.
See: #17833 (comment)
This test may still be useful, so I've pasted it in our issue for future reference.
It can be found here: #17746 (comment)
...c/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSparkVariant.scala (resolved)
...ource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala (resolved)
...ource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala (resolved)
// Handle VariantType comparisons
(requiredType, fileType) match {
  case (_: VariantType, s: StructType) if isVariantPhysicalSchema(s) => Some(true)
Why wouldn't the file's type also be Variant?
Files written before Spark 4.0 (or by older Hudi versions) have the struct representation.
When reading the Parquet file's schema directly (without Spark's logical type inference), we also get the physical struct type.
IIRC, this piece of code is here to address the issue of variant columns being read out as base64 strings:
Our Spark40ParquetReader (and the readers for other Spark versions) implicitly compares the requested schema, i.e. the requestedSchema from the user, against the fileSchema in order to do projection as a small optimization.
Because of the discrepancy between how Variant is represented as a DataType versus a MessageType, HoodieParquetFileFormatHelper builds an implicitTypeChangeInfo map for the unsafeProjection that looks something like this (fieldIndex -> (requestedSchema type, fileSchema type)):
7 -> Pair(VariantType, StructType(size = 2))
This unsafeProjection causes the variant bytes to be read out as:
{"metadata":"AQIAAwdrZXlsaXN0","value":"AgIAAQAHExl2YWx1ZTIDAwACBAYMAQwCDAM="}
instead of the technically equivalent raw-bytes form:
{'value': b'\x02\x02\x00\x01\x00\x07\x13\x19value2\x03\x03\x00\x02\x04\x06\x0c\x01\x0c\x02\x0c\x03', 'metadata': b'\x01\x02\x00\x03\x07keylist'}
The former is a base64 string representation, which impedes further evaluation/representation of the Variant, which should ultimately render (if Variant is supported) as:
{"key":"value2","list":[1,2,3]}
This only affects HoodieRecordType.SPARK.
That's why we need this here in Spark 4.0 and why the fileType is a StructType.
Hope this makes sense, especially with the in-memory snapshot of the unsafeProjection mapping copied above.
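To make the comparison concrete, here is a small sketch of the structural check; the helper name isVariantPhysicalSchema comes from the diff above, but the PR's actual implementation may differ:

```scala
import org.apache.spark.sql.types.{BinaryType, StructType}

// Sketch: the Parquet footer only exposes a group of two binary columns, so the
// physical Variant shape has to be detected structurally, not via a logical type.
def isVariantPhysicalSchema(s: StructType): Boolean =
  s.fields.length == 2 &&
    s.fieldNames.toSet == Set("value", "metadata") &&
    s.fields.forall(_.dataType == BinaryType)
```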
Moving this commit: To:
Force-pushed a88a018 to 18cb139
Rebased
} else if (logicalType == LogicalTypes.uuid()) {
  return UUID;
- } else if (logicalType instanceof VariantLogicalType) {
+ } else if (logicalType == VariantLogicalType.variant()) {
Added the comparison against the singleton here.
}

case other => throw new IncompatibleSchemaException(s"Unsupported HoodieSchemaType: $other")
// VARIANT type (Spark >4.x only), which will be handled via SparkAdapter
For lower Spark versions, do we want to just return the underlying struct?
In the current implementation in Spark 3.5, this is dead code, as the variant column will not have a logicalType of variant; it's a record instead.
The reason why it's dead code is:
- The tableSchema is resolved from schemaSpec.map(s => convertToHoodieSchema(s, tableName)) in HoodieBaseHadoopFsRelationFactory. This converts a struct where the variant column is a struct of metadata and value into HoodieSchema.
- Hence, when the code flow reaches HoodieSparkSchemaConverters, the variant column's HoodieSchema will not have the variant logical type and will not fall into this code path. It will resolve to the RECORD path instead.
This might change if the table has an internalSchema though, which I think we need to investigate. I'll create an issue for this!
Will inline the issue too.
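For illustration, this is roughly what the variant column looks like by the time it reaches HoodieSparkSchemaConverters on Spark 3.5 (field names/order and nullability are assumptions), which is why it resolves via the RECORD path:

```scala
import org.apache.spark.sql.types.{BinaryType, StructField, StructType}

// Sketch: on Spark 3.x the column is just this struct; no Variant logical type is
// attached, so the VARIANT branch above is never taken.
val variantColumnOnSpark3x = StructType(Seq(
  StructField("metadata", BinaryType, nullable = true),
  StructField("value", BinaryType, nullable = true)))
```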
import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampNTZType}

- object HoodieParquetFileFormatHelper {
+ trait HoodieParquetFileFormatHelperTrait {
Do we need this switch from object to trait?
Nope, reverting.
// Maps VariantType to a group containing 'metadata' and 'value' fields.
// This ensures Spark 4.0 compatibility and supports both Shredded and Unshredded schemas.
// Note: We intentionally omit 'typed_value' for shredded variants as this writer only accesses raw binary blobs.
final byte[][] variantBytes = new byte[2][]; // [0] = value, [1] = metadata
Instead of reusing these byte arrays, can we just return the pair of byte arrays from the variant data to avoid the extra copy?
I've made the code here leaner. Instead of storing the bytes and reading them back, the Parquet writing logic is now passed directly into the consumers.
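A minimal sketch of that leaner shape, assuming the unshredded group layout (value at index 0, metadata at index 1); the actual PR wires this through createVariantValueWriter rather than a standalone method:

```scala
import org.apache.parquet.io.api.{Binary, RecordConsumer}

// Sketch: write the Variant's value/metadata blobs straight into the Parquet group,
// with no intermediate byte[2][] staging buffer.
def writeUnshreddedVariant(consumer: RecordConsumer,
                           valueBytes: Array[Byte],
                           metadataBytes: Array[Byte]): Unit = {
  consumer.startGroup()
  consumer.startField("value", 0)
  consumer.addBinary(Binary.fromConstantByteArray(valueBytes))
  consumer.endField("value", 0)
  consumer.startField("metadata", 1)
  consumer.addBinary(Binary.fromConstantByteArray(metadataBytes))
  consumer.endField("metadata", 1)
  consumer.endGroup()
}
```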
| "CREATE TABLE variant_table (" | ||
| + " id INT," | ||
| + " name STRING," | ||
| + " v ROW<`value` BYTES, metadata BYTES>," |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just want to clarify one thing, the table on disk will have Variant in the Hudi schema for this table, right?
    break;
  case UNION:
    return convertUnion(fieldName, schema, repetition, schemaPath);
  case VARIANT:
Can you update the TestAvroSchemaConverter to cover this branch?
Done
package org.apache.spark.sql.hudi.command

import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.hudi.SparkAdapterSupport.sparkAdapter
Remove changes to this file?
Done
// Note: We intentionally omit 'typed_value' for shredded variants as this writer only accesses raw binary blobs.
BiConsumer<SpecializedGetters, Integer> variantWriter = SparkAdapterSupport$.MODULE$.sparkAdapter().createVariantValueWriter(
    dataType,
    valueBytes -> consumeField("value", 0, () -> recordConsumer.addBinary(Binary.fromReusedByteArray(valueBytes))),
My understanding is that the valueBytes are not part of a reused byte array. They are already copied when the variant object is read, so you can skip this copy.
I traced the code a little and I think you're right.
CMIIW if this does not align with your mental model: the Variant is created from org.apache.spark.sql.catalyst.expressions.UnsafeRow#getVariant:
@Override
public VariantVal getVariant(int ordinal) {
  if (isNullAt(ordinal)) return null;
  return VariantVal.readFromUnsafeRow(getLong(ordinal), baseObject, baseOffset);
}
Looking at org.apache.spark.unsafe.types.VariantVal#readFromUnsafeRow, new bytes are allocated for both metadata and value.
So these are essentially copies.
I will change fromReusedByteArray to fromConstantByteArray then.
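A sketch of the intended swap (not the exact PR diff): since readFromUnsafeRow hands back freshly allocated arrays, fromConstantByteArray can wrap them without the defensive copy that fromReusedByteArray implies:

```scala
import org.apache.parquet.io.api.Binary

// Before: Binary.fromReusedByteArray(valueBytes)   // signals the array may be mutated, so Parquet copies
// After:  Binary.fromConstantByteArray(valueBytes) // wraps the already-copied array as-is
def variantBlobToBinary(valueBytes: Array[Byte]): Binary =
  Binary.fromConstantByteArray(valueBytes)
```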
}

override def isVariantType(dataType: DataType): Boolean = {
  import org.apache.spark.sql.types.VariantType
One style question: why are the imports inline with the method instead of at the top of the file?
I was encountering some import issues while debugging and implementing the different version switches. I'll check again whether we can position them at the top of the file!
If it is possible, let's clean up all the places this is done in this file.
Moved them to the top of the file.
    fieldName: String,
    fieldSchema: HoodieSchema,
    repetition: Repetition
): org.apache.parquet.schema.Type = {
Nit: can we import the parquet type?
Done
- Add adapter pattern for Spark 3 and 4
- Cleanup invariant issue in SparkSqlWriter
- Add cross engine test
- Add backward compatibility test for Spark 3.x
- Add cross engine read for Flink
Force-pushed 51c5108 to 8756466
vinothchandar
left a comment
Love to take a pass over this. Is this ready for review?
Yes, it's in a reviewable state; just a few minor stylistic imports to move around.
Describe the issue this Pull Request addresses
Adds read and write support for Spark Variant data types in Hudi. Since Variant is exclusive to Spark 4.0+, this PR introduces an adapter pattern to handle schema conversion differences, ensuring backward compatibility with Spark 3.x while enabling semi-structured data support in Spark 4.x.
Note: Please merge this PR first: #17751
Summary and Changelog
This PR refactors schema converters behind SparkAdapter to handle version-specific data types:
- Refactors HoodieSparkSchemaConverters and SchemaConverters into traits; logic is now delegated via SparkAdapter.
- Adds Variant support mapping Spark VariantType <-> Avro Record (logicalType: variant) <-> Parquet Struct (value/metadata binaries).
- Updates HoodieRowParquetWriteSupport to handle physical writing of Variant structs.
- Updates AvroSchemaUtils to ensure logicalType and metadata are preserved during schema pruning.
Impact
Adds support for reading and writing Variant columns in Hudi tables (COW and MOR).
Risk Level
Low. The adapter pattern isolates the new logic; only tables that use Variant data are affected.
Documentation Update
None
Contributor's checklist