feat(schema): Add support to write shredded variants for HoodieRecordType.SPARK #18036
Conversation
- Add adapter pattern for Spark 3 and 4
- Clean up invariant issue in SparkSqlWriter
- Add cross-engine test
- Add backward-compatibility test for Spark 3.x
- Add cross-engine read for Flink
Force-pushed from b5bfda3 to d5d986c.
```java
StructType shreddedStruct = SparkAdapterSupport$.MODULE$.sparkAdapter()
    .generateVariantShreddingSchema(typedValueDataType, true, false);

// Add metadata to mark this as a shredding struct
StructType markedShreddedStruct = SparkAdapterSupport$.MODULE$.sparkAdapter()
    .addVariantWriteShreddingMetadata(shreddedStruct);
```
Could we combine these?
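For illustration, a hypothetical combined form; the single helper below is an illustrative name, not an existing adapter method:

```java
// Hypothetical: one adapter call that generates the shredding schema and
// attaches the write-shredding metadata in a single step.
// generateVariantWriteShreddingSchema is an illustrative name only.
StructType markedShreddedStruct = SparkAdapterSupport$.MODULE$.sparkAdapter()
    .generateVariantWriteShreddingSchema(typedValueDataType, true, false);
```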
```java
if (fieldHoodieSchema.getType() == HoodieSchemaType.VARIANT) {
  HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) fieldHoodieSchema;
  if (variantSchema.isShredded() && variantSchema.getTypedValueField().isPresent()) {
```
Do we expect a case where isShredded is true but the value field is not present?
Nope. The value field must always be present, from my understanding.
With unstructured data, any given value can either be shredded or not, and shredding is by design a feature on top of variants. The default state of every variant field is therefore unshredded, so the value field has to be there to hold any data that cannot be shredded.
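As a minimal sketch of what that shape looks like (following the Parquet variant shredding layout; the `LongType` for `typed_value` is an arbitrary example):

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Sketch of a shredded variant group: `value` stays present to hold any data
// that cannot be shredded into `typed_value`; an unshredded variant carries
// only `metadata` and `value`.
StructType shreddedVariant = new StructType(new StructField[] {
    new StructField("metadata", DataTypes.BinaryType, false, Metadata.empty()),
    new StructField("value", DataTypes.BinaryType, true, Metadata.empty()),
    new StructField("typed_value", DataTypes.LongType, true, Metadata.empty())
});
```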
Can we simplify this condition then?
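For example, if `isShredded()` already guarantees the typed value field, the check could hypothetically collapse to:

```java
// Hypothetical simplification, assuming isShredded() implies
// getTypedValueField() is always present:
if (fieldHoodieSchema.getType() == HoodieSchemaType.VARIANT
    && ((HoodieSchema.Variant) fieldHoodieSchema).isShredded()) {
  // ... handle the shredded variant field
}
```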
```java
    && SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantShreddingStruct((StructType) shreddedField.dataType())
    && SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(originalField.dataType())) {
```
Similarly, could this be a single check?
Done.
The isVariantType(originalField.dataType()) check was redundant because generateShreddedSchema only produces shredding structs (with the metadata that isVariantShreddingStruct looks for) from fields that are already Variant types.
Hence, isVariantShreddingStruct being true already implies the original field was a Variant; I'll remove the isVariantType check.
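The resulting check would then read along these lines:

```java
// After dropping the redundant isVariantType check, the shredding-struct
// marker alone identifies a shredded Variant field (sketch):
if (shreddedField.dataType() instanceof StructType
    && SparkAdapterSupport$.MODULE$.sparkAdapter()
        .isVariantShreddingStruct((StructType) shreddedField.dataType())) {
  // ... create the shredded-variant writer
}
```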
```java
}).toArray(ValueWriter[]::new);

// Check if this field is a shredded Variant (shreddedField has shredding struct, originalField has VariantType)
if (shreddedField.dataType() instanceof StructType
```
Nitpick: can we move this into the makeWriter so all the logic for creating the writer methods is contained in one place?
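A rough sketch of the suggested shape, with all writer-selection logic inside `makeWriter`; the helper names below are illustrative:

```java
// Sketch only: makeShreddedVariantWriter and makeDefaultWriter are
// hypothetical names standing in for the existing writer-creation code.
private ValueWriter makeWriter(StructField shreddedField) {
  DataType dataType = shreddedField.dataType();
  if (dataType instanceof StructType
      && SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantShreddingStruct((StructType) dataType)) {
    return makeShreddedVariantWriter((StructType) dataType);
  }
  return makeDefaultWriter(dataType);
}
```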
Done, the code definitely looks a lot cleaner now.
Force-pushed from a9565bb to 17f548b, then from 17f548b to 99eadaf.
```java
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema, config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
// Generate shredded schema if there are shredded Variant columns
```
It would be good to note the behavior when there are no shredded columns, like "falls back to provided schema if no shredded Variant columns are present"
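Something along these lines, for example (variable names are illustrative):

```java
// Generate shredded schema if there are shredded Variant columns; falls back
// to the provided schema if no shredded Variant columns are present.
StructType writeSchema = hasShredding ? shreddedSchema : providedSchema;
```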
```java
DataType dataType = field.dataType();

// Check if this is a Variant field that should be shredded
if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
```
Could we rely directly on the provided HoodieSchema here?
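That is, hypothetically keying off the `HoodieSchema` field type rather than the Catalyst `DataType`; the `getField` accessor here is illustrative:

```java
// Hypothetical alternative: decide based on the HoodieSchema field type directly.
HoodieSchema fieldHoodieSchema = hoodieSchema.getField(field.name()); // illustrative accessor
if (fieldHoodieSchema.getType() == HoodieSchemaType.VARIANT) {
  // ... shredding decision driven by the HoodieSchema, not the Spark DataType
}
```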
```java
// Check if this is a Variant field that should be shredded
if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
  HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema)
```
Is it ever possible for the provided schema to be null? Is there a possibility of it being out of sync with the struct?
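If it can be null, a defensive lookup such as the following sketch might apply (the accessor is illustrative):

```java
// Hypothetical null-safe lookup by field name, so the HoodieSchema and the
// Catalyst struct cannot silently diverge by position:
HoodieSchema fieldHoodieSchema = hoodieSchema == null
    ? null
    : hoodieSchema.getField(field.name()); // illustrative accessor
```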
```java
StructField[] shreddedFields = new StructField[fields.length];
boolean hasShredding = false;

for (int i = 0; i < fields.length; i++) {
```
This loop only covers top-level fields. Should we recursively inspect the struct fields, as sketched below?
If it is possible to have nested Variant fields, let's make sure we have a test for it as well.
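A minimal sketch of the recursive variant being suggested; the names are illustrative:

```java
// Recursively rebuild the struct so nested Variant fields are also considered
// for shredding, not just top-level ones. shredVariantField is a hypothetical
// helper standing in for the existing top-level shredding logic.
private static StructType inspectRecursively(StructType schema) {
  StructField[] out = new StructField[schema.fields().length];
  for (int i = 0; i < schema.fields().length; i++) {
    StructField f = schema.fields()[i];
    if (f.dataType() instanceof StructType) {
      out[i] = new StructField(f.name(),
          inspectRecursively((StructType) f.dataType()), f.nullable(), f.metadata());
    } else {
      out[i] = shredVariantField(f); // hypothetical: applies the top-level logic
    }
  }
  return new StructType(out);
}
```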
Describe the issue this Pull Request addresses
Adds read and write support for the Spark `Variant` data type in Hudi. Since `Variant` is exclusive to Spark 4.0+, this PR introduces an adapter pattern to handle schema conversion differences, ensuring backward compatibility with Spark 3.x while enabling semi-structured data support in Spark 4.x.

Closes: #17747
Note0: Please merge this PR first: #17833
Note1: This only covers
HoodieRecordType.SPARK. AVRO will be covered in another PR.Note2:
This PR is purely to add support for writing shredded variants.
There is no end-2-end flow to allow users to enable shredded variant writes as of now. We will address this in a separate PR to make PRs small and manageable for reviews.
The next PR for this will be to add a parquet-config to allow users to enable/disable shredding and also to force shredding on certain columns for testing.
Summary and Changelog
This PR refactors schema converters behind
SparkAdapterto handle version-specific data types.Updated
HoodieRowParquetWriteSupportto allow support for writing shredded variants.Impact
Variantshredded columns.Risk Level
Low
Documentation Update
None
Contributor's checklist