diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java index 5e211c4851a0d..e89662a0a4222 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java @@ -324,6 +324,8 @@ public static DataType convertToDataType(HoodieSchema hoodieSchema) { return convertRecord(hoodieSchema); case UNION: return convertUnion(hoodieSchema); + case VARIANT: + return convertVariant(hoodieSchema); default: throw new IllegalArgumentException("Unsupported HoodieSchemaType: " + type); } @@ -445,6 +447,25 @@ private static DataType convertUnion(HoodieSchema schema) { return nullable ? rawDataType.nullable() : rawDataType; } + /** + * Converts a Variant schema to Flink's ROW type. + * Variant is represented as ROW<`value` BYTES, `metadata` BYTES> in Flink. + * + * @param schema HoodieSchema to convert (must be a VARIANT type) + * @return DataType representing the Variant as a ROW with binary fields + */ + private static DataType convertVariant(HoodieSchema schema) { + if (schema.getType() != HoodieSchemaType.VARIANT) { + throw new IllegalStateException("Expected HoodieSchema.Variant but got: " + schema.getClass()); + } + + // Variant is stored as a struct with two binary fields: value and metadata + return DataTypes.ROW( + DataTypes.FIELD("value", DataTypes.BYTES().notNull()), + DataTypes.FIELD("metadata", DataTypes.BYTES().notNull()) + ).notNull(); + } + /** * Returns true if all the types are RECORD type with same number of fields. */ diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java index 9d0da7f534afa..7171d2cc6868c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java @@ -18,6 +18,7 @@ package org.apache.hudi.io.storage.row; +import org.apache.hudi.HoodieSchemaConversionUtils; import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.avro.HoodieBloomFilterWriteSupport; @@ -73,6 +74,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.function.BiConsumer; import scala.Enumeration; import scala.Function1; @@ -121,6 +123,12 @@ public class HoodieRowParquetWriteSupport extends WriteSupport { private final ValueWriter[] rootFieldWriters; private final HoodieSchema schema; private final StructType structType; + /** + * The shredded schema. When Variant columns are configured for shredding, this schema has those VariantType columns replaced with their shredded struct schemas. + *

+ * For non-shredded cases, this is identical to structType. + */ + private final StructType shreddedSchema; private RecordConsumer recordConsumer; public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option bloomFilterOpt, HoodieConfig config) { @@ -139,21 +147,89 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O HoodieSchema parsedSchema = HoodieSchema.parse(schemaString); return HoodieSchemaUtils.addMetadataFields(parsedSchema, config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD)); }); + // Generate shredded schema if there are shredded Variant columns + this.shreddedSchema = generateShreddedSchema(structType, schema); ParquetWriteSupport.setSchema(structType, hadoopConf); - this.rootFieldWriters = getFieldWriters(structType, schema); + // Use shreddedSchema for creating writers when shredded Variants are present + this.rootFieldWriters = getFieldWriters(shreddedSchema, schema); this.hadoopConf = hadoopConf; this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new); } + /** + * Generates a shredded schema from the given structType and hoodieSchema. + *

+ * For Variant fields that are configured for shredding (based on HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded struct schema. + * + * @param structType The original Spark StructType + * @param hoodieSchema The HoodieSchema containing shredding information + * @return A StructType with shredded Variant fields replaced by their shredded schemas + */ + private StructType generateShreddedSchema(StructType structType, HoodieSchema hoodieSchema) { + StructField[] fields = structType.fields(); + StructField[] shreddedFields = new StructField[fields.length]; + boolean hasShredding = false; + + for (int i = 0; i < fields.length; i++) { + StructField field = fields[i]; + DataType dataType = field.dataType(); + + // Check if this is a Variant field that should be shredded + if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) { + HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema) + .flatMap(s -> s.getField(field.name())) + .map(f -> f.schema()) + .orElse(null); + + if (fieldHoodieSchema.getType() == HoodieSchemaType.VARIANT) { + HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) fieldHoodieSchema; + if (variantSchema.isShredded() && variantSchema.getTypedValueField().isPresent()) { + // Use plain types for SparkShreddingUtils (unwraps nested {value, typed_value} structs if present) + HoodieSchema typedValueSchema = variantSchema.getPlainTypedValueSchema().get(); + DataType typedValueDataType = HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(typedValueSchema); + + // Generate the shredding schema with write metadata using SparkAdapter + StructType markedShreddedStruct = SparkAdapterSupport$.MODULE$.sparkAdapter() + .generateVariantWriteShreddingSchema(typedValueDataType, true, false); + + shreddedFields[i] = new StructField(field.name(), markedShreddedStruct, field.nullable(), field.metadata()); + hasShredding = true; + continue; + } + } + } + + // Not a shredded Variant, keep the original field + shreddedFields[i] = field; + } + + return hasShredding ? new StructType(shreddedFields) : structType; + } + + /** + * Creates field writers for each field in the schema. 
+ * + * @param schema The schema to create writers for (may contain shredded Variant struct types) + * @param hoodieSchema The HoodieSchema for type information + * @return Array of ValueWriters for each field + */ private ValueWriter[] getFieldWriters(StructType schema, HoodieSchema hoodieSchema) { - return Arrays.stream(schema.fields()).map(field -> { + StructField[] fields = schema.fields(); + ValueWriter[] writers = new ValueWriter[fields.length]; + + for (int i = 0; i < fields.length; i++) { + StructField field = fields[i]; + HoodieSchema fieldSchema = Option.ofNullable(hoodieSchema) .flatMap(s -> s.getField(field.name())) // Note: Cannot use HoodieSchemaField::schema method reference due to Java 17 compilation ambiguity .map(f -> f.schema()) .orElse(null); - return makeWriter(fieldSchema, field.dataType()); - }).toArray(ValueWriter[]::new); + + writers[i] = makeWriter(fieldSchema, field.dataType()); + } + + return writers; } @Override @@ -166,7 +242,9 @@ public WriteContext init(Configuration configuration) { } Configuration configurationCopy = new Configuration(configuration); configurationCopy.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, Boolean.toString(writeLegacyListFormat)); - MessageType messageType = convert(structType, schema); + // Use shreddedSchema for Parquet schema conversion when shredding is enabled + // This ensures the Parquet file structure includes the shredded typed_value columns + MessageType messageType = convert(shreddedSchema, schema); return new WriteContext(messageType, metadata); } @@ -281,6 +359,18 @@ private ValueWriter makeWriter(HoodieSchema schema, DataType dataType) { } else if (dataType == DataTypes.BinaryType) { return (row, ordinal) -> recordConsumer.addBinary( Binary.fromReusedByteArray(row.getBinary(ordinal))); + } else if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) { + // Maps VariantType to a group containing 'metadata' and 'value' fields. + // This ensures Spark 4.0 compatibility and supports both Shredded and Unshredded schemas. + // Note: We intentionally omit 'typed_value' for shredded variants as this writer only accesses raw binary blobs. + BiConsumer variantWriter = SparkAdapterSupport$.MODULE$.sparkAdapter().createVariantValueWriter( + dataType, + valueBytes -> consumeField("value", 0, () -> recordConsumer.addBinary(Binary.fromReusedByteArray(valueBytes))), + metadataBytes -> consumeField("metadata", 1, () -> recordConsumer.addBinary(Binary.fromReusedByteArray(metadataBytes))) + ); + return (row, ordinal) -> { + consumeGroup(() -> variantWriter.accept(row, ordinal)); + }; } else if (dataType instanceof DecimalType) { return (row, ordinal) -> { int precision = ((DecimalType) dataType).precision(); @@ -337,6 +427,9 @@ private ValueWriter makeWriter(HoodieSchema schema, DataType dataType) { } }); }; + } else if (dataType instanceof StructType + && SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantShreddingStruct((StructType) dataType)) { + return makeShreddedVariantWriter((StructType) dataType); } else if (dataType instanceof StructType) { StructType structType = (StructType) dataType; ValueWriter[] fieldWriters = getFieldWriters(structType, resolvedSchema); @@ -349,6 +442,33 @@ private ValueWriter makeWriter(HoodieSchema schema, DataType dataType) { } } + /** + * Creates a ValueWriter for a shredded Variant column. + * This writer converts a Variant value into its shredded components (metadata, value, typed_value) and writes them to Parquet. 
+ * + * @param shreddedStructType The shredded StructType (with shredding metadata) + * @return A ValueWriter that handles shredded Variant writing + */ + private ValueWriter makeShreddedVariantWriter(StructType shreddedStructType) { + // Create writers for the shredded struct fields + // The shreddedStructType contains: metadata (binary), value (binary), typed_value (optional) + ValueWriter[] shreddedFieldWriters = Arrays.stream(shreddedStructType.fields()) + .map(field -> makeWriter(null, field.dataType())) + .toArray(ValueWriter[]::new); + + // Use the SparkAdapter to create a shredded variant writer that converts Variant to shredded components + BiConsumer shreddedWriter = SparkAdapterSupport$.MODULE$.sparkAdapter() + .createShreddedVariantWriter( + shreddedStructType, + shreddedRow -> { + // Write the shredded row as a group + consumeGroup(() -> writeFields(shreddedRow, shreddedStructType, shreddedFieldWriters)); + } + ); + + return shreddedWriter::accept; + } + private ValueWriter twoLevelArrayWriter(String repeatedFieldName, ValueWriter elementWriter) { return (row, ordinal) -> { ArrayData array = row.getArray(ordinal); @@ -510,6 +630,13 @@ private Type convertField(HoodieSchema fieldSchema, StructField structField, Typ .as(LogicalTypeAnnotation.stringType()).named(structField.name()); } else if (dataType == DataTypes.BinaryType) { return Types.primitive(BINARY, repetition).named(structField.name()); + } else if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) { + return SparkAdapterSupport$.MODULE$.sparkAdapter().convertVariantFieldToParquetType( + dataType, + structField.name(), + resolvedSchema, + repetition + ); } else if (dataType instanceof DecimalType) { int precision = ((DecimalType) dataType).precision(); int scale = ((DecimalType) dataType).scale(); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala index e4d97a2df6a29..142786567f2a5 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala @@ -21,7 +21,6 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap, filterIsSafeForPrimaryKey, getAppliedRequiredSchema} -import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils} import org.apache.hudi.common.engine.HoodieReaderContext import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieRecord diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala index 4055206cfc5f5..04f18c1c2a881 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql -import org.apache.hudi.{HoodieUnsafeRDD} +import org.apache.hudi.HoodieUnsafeRDD import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} -import org.apache.spark.sql.catalyst.plans.physical.{Partitioning} +import 
org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.types.StructType /** diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSchemaConverters.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSchemaConverters.scala deleted file mode 100644 index 9b068afac83d2..0000000000000 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSchemaConverters.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.avro - -import org.apache.avro.Schema -import org.apache.spark.sql.types.DataType - -/** - * Allows to convert Avro schema into Spark's Catalyst one - */ -trait HoodieAvroSchemaConverters { - - def toSqlType(avroSchema: Schema): (DataType, Boolean) - - def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String = ""): Schema - -} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala index 11451f3b7ce08..a3bb13ff75753 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala @@ -18,28 +18,31 @@ package org.apache.spark.sql.avro -import org.apache.hudi.common.schema.HoodieSchema.TimePrecision +import org.apache.hudi.SparkAdapterSupport import org.apache.hudi.common.schema.{HoodieJsonProperties, HoodieSchema, HoodieSchemaField, HoodieSchemaType} +import org.apache.hudi.common.schema.HoodieSchema.TimePrecision + import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.types.Decimal.minBytesForPrecision import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.Decimal.minBytesForPrecision import scala.collection.JavaConverters._ /** - * This object contains methods that are used to convert HoodieSchema to Spark SQL schemas and vice versa. + * Object containing methods to convert HoodieSchema to Spark SQL schemas and vice versa. * * This provides direct conversion between HoodieSchema and Spark DataType * without going through Avro Schema intermediary. * + * Version-specific types (like VariantType in Spark >4.x) are handled via SparkAdapterSupport. 
+ * * NOTE: the package of this class is intentionally kept as "org.apache.spark.sql.avro" which is similar to the existing * Spark Avro connector's SchemaConverters.scala * (https://github.com/apache/spark/blob/master/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala). * The reason for this is so that Spark 3.3 is able to access private spark sql type classes like TimestampNTZType. */ - @DeveloperApi -object HoodieSparkSchemaConverters { +object HoodieSparkSchemaConverters extends SparkAdapterSupport { /** * Internal wrapper for SQL data type and nullability. @@ -118,7 +121,12 @@ object HoodieSparkSchemaConverters { HoodieSchema.createRecord(recordName, nameSpace, null, fields.asJava) } - case other => throw new IncompatibleSchemaException(s"Unexpected Spark DataType: $other") + // VARIANT type (Spark >4.x only), which will be handled via SparkAdapter + case other if sparkAdapter.isVariantType(other) => + HoodieSchema.createVariant(recordName, nameSpace, null) + + case other => + throw new IncompatibleSchemaException(s"Unexpected Spark DataType: $other") } // Wrap with null union if nullable (and not already a union) @@ -129,6 +137,9 @@ object HoodieSparkSchemaConverters { } } + /** + * Helper method to convert HoodieSchema to Catalyst DataType. + */ private def toSqlTypeHelper(hoodieSchema: HoodieSchema, existingRecordNames: Set[String]): SchemaType = { hoodieSchema.getType match { // Primitive types @@ -241,7 +252,16 @@ object HoodieSparkSchemaConverters { } } - case other => throw new IncompatibleSchemaException(s"Unsupported HoodieSchemaType: $other") + // VARIANT type (Spark >4.x only), which will be handled via SparkAdapter + // TODO: Check if internalSchema will throw any errors here: #18021 + case HoodieSchemaType.VARIANT => + sparkAdapter.getVariantDataType match { + case Some(variantType) => SchemaType(variantType, nullable = false) + case None => throw new IncompatibleSchemaException("VARIANT type is only supported in Spark 4.0+") + } + + case other => + throw new IncompatibleSchemaException(s"Unsupported HoodieSchemaType: $other") } } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala index abf805a430f33..496a87071aa96 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala @@ -19,11 +19,13 @@ package org.apache.spark.sql.execution.datasources.parquet +import org.apache.hudi.HoodieSparkUtils + import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.metadata.FileMetaData import org.apache.spark.sql.HoodieSchemaUtils -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, Attribute, Cast, CreateNamedStruct, CreateStruct, Expression, GetStructField, LambdaFunction, Literal, MapEntries, MapFromEntries, NamedLambdaVariable, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType, 
TimestampNTZType} object HoodieParquetFileFormatHelper { @@ -46,7 +48,7 @@ object HoodieParquetFileFormatHelper { val requiredType = f.dataType if (fileStructMap.contains(f.name) && !isDataTypeEqual(requiredType, fileStructMap(f.name))) { val readerType = addMissingFields(requiredType, fileStructMap(f.name)) - implicitTypeChangeInfo.put(new Integer(requiredSchema.fieldIndex(f.name)), org.apache.hudi.common.util.collection.Pair.of(requiredType, readerType)) + implicitTypeChangeInfo.put(Integer.valueOf(requiredSchema.fieldIndex(f.name)), org.apache.hudi.common.util.collection.Pair.of(requiredType, readerType)) StructField(f.name, readerType, f.nullable) } else { f @@ -55,35 +57,43 @@ object HoodieParquetFileFormatHelper { (implicitTypeChangeInfo, StructType(sparkRequestStructFields)) } - def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean = (requiredType, fileType) match { - case (requiredType, fileType) if requiredType == fileType => true + def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean = { + // Check if adapter can handle this comparison (e.g., VariantType in Spark 4.0+) + val adapterResult = HoodieSparkUtils.sparkAdapter.isDataTypeEqualForParquet(requiredType, fileType) + if (adapterResult.isDefined) { + adapterResult.get + } else { + (requiredType, fileType) match { + case (requiredType, fileType) if requiredType == fileType => true - // prevent illegal cast - case (TimestampNTZType, LongType) => true + // prevent illegal cast + case (TimestampNTZType, LongType) => true - case (ArrayType(rt, _), ArrayType(ft, _)) => - // Do not care about nullability as schema evolution require fields to be nullable - isDataTypeEqual(rt, ft) + case (ArrayType(rt, _), ArrayType(ft, _)) => + // Do not care about nullability as schema evolution require fields to be nullable + isDataTypeEqual(rt, ft) - case (MapType(requiredKey, requiredValue, _), MapType(fileKey, fileValue, _)) => - // Likewise, do not care about nullability as schema evolution require fields to be nullable - isDataTypeEqual(requiredKey, fileKey) && isDataTypeEqual(requiredValue, fileValue) + case (MapType(requiredKey, requiredValue, _), MapType(fileKey, fileValue, _)) => + // Likewise, do not care about nullability as schema evolution require fields to be nullable + isDataTypeEqual(requiredKey, fileKey) && isDataTypeEqual(requiredValue, fileValue) - case (StructType(requiredFields), StructType(fileFields)) => - // Find fields that are in requiredFields and fileFields as they might not be the same during add column + change column operations - val commonFieldNames = requiredFields.map(_.name) intersect fileFields.map(_.name) + case (StructType(requiredFields), StructType(fileFields)) => + // Find fields that are in requiredFields and fileFields as they might not be the same during add column + change column operations + val commonFieldNames = requiredFields.map(_.name) intersect fileFields.map(_.name) - // Need to match by name instead of StructField as name will stay the same whilst type may change - val fileFilteredFields = fileFields.filter(f => commonFieldNames.contains(f.name)).sortWith(_.name < _.name) - val requiredFilteredFields = requiredFields.filter(f => commonFieldNames.contains(f.name)).sortWith(_.name < _.name) + // Need to match by name instead of StructField as name will stay the same whilst type may change + val fileFilteredFields = fileFields.filter(f => commonFieldNames.contains(f.name)).sortWith(_.name < _.name) + val requiredFilteredFields = requiredFields.filter(f => 
commonFieldNames.contains(f.name)).sortWith(_.name < _.name) - // Sorting ensures that the same field names are being compared for type differences - requiredFilteredFields.zip(fileFilteredFields).forall { - case (requiredField, fileFilteredField) => - isDataTypeEqual(requiredField.dataType, fileFilteredField.dataType) - } + // Sorting ensures that the same field names are being compared for type differences + requiredFilteredFields.zip(fileFilteredFields).forall { + case (requiredField, fileFilteredField) => + isDataTypeEqual(requiredField.dataType, fileFilteredField.dataType) + } - case _ => false + case _ => false + } + } } def addMissingFields(requiredType: DataType, fileType: DataType): DataType = (requiredType, fileType) match { @@ -195,3 +205,4 @@ object HoodieParquetFileFormatHelper { } } } + diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 1afcc78df058e..848f5151c399f 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -21,19 +21,21 @@ package org.apache.spark.sql.hudi import org.apache.hudi.{HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping} import org.apache.hudi.client.model.HoodieInternalRow import org.apache.hudi.common.model.FileSlice +import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit import org.apache.hudi.storage.StorageConfiguration + import org.apache.hadoop.conf.Configuration -import org.apache.hudi.common.schema.HoodieSchema -import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.{MessageType, Type} +import org.apache.parquet.schema.Type.Repetition import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate, SpecializedGetters} import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface} import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} import org.apache.spark.sql.catalyst.trees.Origin @@ -46,11 +48,12 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.parser.HoodieExtendedParserInterface import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.{DataType, Metadata, StructType} -import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String import java.util.{Locale, TimeZone} +import java.util.function.{BiConsumer, Consumer} /** * Interface adapting discrepancies and incompatibilities between different Spark versions @@ -374,4 +377,110 @@ trait SparkAdapter extends Serializable { * @return A streaming [[DataFrame]] */ def createStreamingDataFrame(sqlContext: SQLContext, relation: 
HadoopFsRelation, requiredSchema: StructType): DataFrame + + /** + * Gets the VariantType DataType if supported by this Spark version. + * Spark 3.x returns None (VariantType not supported). + * Spark 4.x returns Some(VariantType). + * + * @return Option[DataType] - Some(VariantType) for Spark 4.x, None for Spark 3.x + */ + def getVariantDataType: Option[DataType] + + /** + * Checks if two data types are equal for Parquet file format purposes. + * This handles version-specific types like VariantType (Spark 4.0+). + * + * Returns Some(true) if types are equal, Some(false) if not equal, or None if + * this adapter doesn't handle this specific type comparison (fallback to default logic). + * + * @param requiredType The required/expected data type + * @param fileType The data type from the file + * @return Option[Boolean] - Some(result) if handled by adapter, None otherwise + */ + def isDataTypeEqualForParquet(requiredType: DataType, fileType: DataType): Option[Boolean] + + /** + * Checks if the given DataType is a VariantType. + * Spark 3.x returns false, Spark 4.x checks for VariantType. + * + * @param dataType The data type to check + * @return true if it's a VariantType, false otherwise + */ + def isVariantType(dataType: DataType): Boolean + + /** + * Creates a ValueWriter function for VariantType if the data type is VariantType. + * Returns null for Spark 3.x or if the data type is not VariantType. + * + * The returned function accepts (SpecializedGetters, ordinal) and writes variant data. + * + * @param dataType The data type to create a writer for + * @param writeValue Function to write the variant value binary + * @param writeMetadata Function to write the variant metadata binary + * @return BiConsumer function or null + */ + def createVariantValueWriter( + dataType: DataType, + writeValue: Consumer[Array[Byte]], + writeMetadata: Consumer[Array[Byte]] + ): BiConsumer[SpecializedGetters, Integer] + + /** + * Converts a VariantType field to Parquet Type. + * Returns null for Spark 3.x or if the data type is not VariantType. + * + * @param dataType The data type to convert + * @param fieldName The field name + * @param fieldSchema The HoodieSchema for the field (to determine shredded vs unshredded) + * @param repetition The Parquet repetition type + * @return Parquet Type or null + */ + def convertVariantFieldToParquetType( + dataType: DataType, + fieldName: String, + fieldSchema: HoodieSchema, + repetition: Repetition + ): Type + + /** + * Checks if a StructType represents a shredded Variant schema (has special shredding metadata). + * This is used during writing to identify columns that need special shredding handling. + * + * For Spark 4.x, this uses SparkShreddingUtils.isVariantShreddingStruct. + * For Spark 3.x, this always returns false. + * + * @param structType The StructType to check + * @return true if this is a shredded Variant schema + */ + def isVariantShreddingStruct(structType: StructType): Boolean + + /** + * Generates a shredded Variant schema and marks it with write shredding metadata. + * + * For Spark 4.x, this uses SparkShreddingUtils to generate the schema and add metadata. + * For Spark 3.x, this throws UnsupportedOperationException. 
+ * + * @param dataType The data type to generate the shredding schema for + * @param isTopLevel Whether this is the top-level schema + * @param isObjectField Whether this is an object field (affects value field nullability) + * @return The shredded schema with shredding metadata added + */ + def generateVariantWriteShreddingSchema(dataType: DataType, isTopLevel: Boolean, isObjectField: Boolean): StructType + + /** + * Creates a ValueWriter for a shredded Variant StructType. + * This writer converts a Variant value into its shredded components and writes them. + * + * For Spark 4.x, this uses SparkShreddingUtils.castShredded. + * For Spark 3.x, this throws UnsupportedOperationException. + * + * @param shreddedStructType The shredded StructType schema + * @param writeStruct Function to write the shredded InternalRow + * @return BiConsumer function that reads Variant and writes shredded components + */ + def createShreddedVariantWriter( + shreddedStructType: StructType, + writeStruct: Consumer[InternalRow] + ): BiConsumer[SpecializedGetters, Integer] } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java index f4c6189a50041..63fd807b5c979 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieAvroSchemaException; import org.apache.hudi.exception.HoodieIOException; @@ -28,8 +29,6 @@ import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.hudi.common.util.collection.Pair; - import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; @@ -542,7 +541,8 @@ public static HoodieSchema.Variant createVariant(String name, String namespace, null ); - List fields = Arrays.asList(metadataField, valueField); + // IMPORTANT: Field order must match VariantVal(value, metadata) constructor + List fields = Arrays.asList(valueField, metadataField); Schema recordSchema = Schema.createRecord(variantName, doc, namespace, false); List avroFields = fields.stream() @@ -581,15 +581,8 @@ public static HoodieSchema.Variant createVariantShredded(String name, String nam List fields = new ArrayList<>(); - // Create metadata field (required bytes) - fields.add(HoodieSchemaField.of( - Variant.VARIANT_METADATA_FIELD, - HoodieSchema.create(HoodieSchemaType.BYTES), - "Variant metadata component", - null - )); - - // Create value field (nullable bytes for shredded) + // IMPORTANT: Field order must match VariantVal(value, metadata) constructor + // Create value field first (nullable bytes for shredded) fields.add(HoodieSchemaField.of( Variant.VARIANT_VALUE_FIELD, HoodieSchema.createNullable(HoodieSchemaType.BYTES), @@ -597,6 +590,14 @@ public static HoodieSchema.Variant createVariantShredded(String name, String nam NULL_VALUE )); + // Create metadata field second (required bytes) + fields.add(HoodieSchemaField.of( + Variant.VARIANT_METADATA_FIELD, + HoodieSchema.create(HoodieSchemaType.BYTES), + "Variant metadata component", + null + )); + // Add typed_value field if provided if (typedValueSchema != null) { fields.add(HoodieSchemaField.of( @@ -619,6 +620,112 @@ public static HoodieSchema.Variant createVariantShredded(String name, String nam return new 
HoodieSchema.Variant(recordSchema); } + /** + * Creates a shredded field struct per the Parquet variant shredding spec. The returned struct contains two nullable fields: + *

{@code value} (the fallback binary representation) and {@code typed_value} (the typed representation).
+ *
+ * Example output structure:
+ * <pre>
+   *   fieldName: struct
+   *     |-- value: binary (nullable)
+   *     |-- typed_value: fieldType (nullable)
+   * </pre>

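+   * A minimal usage sketch (mirroring the call exercised in the new TestHoodieSchema test):
+   * <pre>{@code
+   * HoodieSchema fieldStruct = HoodieSchema.createShreddedFieldStruct("age", HoodieSchema.create(HoodieSchemaType.INT));
+   * }</pre>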
+ * + * @param fieldName the name for the record (used as the Avro record name) + * @param fieldType the schema for the typed_value within this field + * @return a new HoodieSchema representing the shredded field struct + */ + public static HoodieSchema createShreddedFieldStruct(String fieldName, HoodieSchema fieldType) { + ValidationUtils.checkArgument(fieldName != null && !fieldName.isEmpty(), "Field name cannot be null or empty"); + ValidationUtils.checkArgument(fieldType != null, "Field type cannot be null"); + List fields = Arrays.asList( + HoodieSchemaField.of( + Variant.VARIANT_VALUE_FIELD, + HoodieSchema.createNullable(HoodieSchemaType.BYTES), + "Fallback binary representation", + NULL_VALUE + ), + HoodieSchemaField.of( + Variant.VARIANT_TYPED_VALUE_FIELD, + HoodieSchema.createNullable(fieldType), + "Typed value representation", + NULL_VALUE + ) + ); + return HoodieSchema.createRecord(fieldName, null, null, fields); + } + + /** + * Creates a shredded Variant schema for an object type following the Parquet variant shredding spec. Each field in shreddedFields is wrapped in a struct with + * {@code {value: nullable binary, typed_value: nullable type}}. + * + *

Example usage:
+ * <pre>{@code
+   * Map<String, HoodieSchema> fields = new LinkedHashMap<>();
+   * fields.put("a", HoodieSchema.create(HoodieSchemaType.INT));
+   * fields.put("b", HoodieSchema.create(HoodieSchemaType.STRING));
+   * fields.put("c", HoodieSchema.createDecimal(15, 1));
+   * HoodieSchema.Variant variant = HoodieSchema.createVariantShreddedObject(fields);
+   * }</pre>
+ *
+ * Produces the following structure:
+ * <pre>

+   * variant
+   *  |-- value: binary (nullable)
+   *  |-- metadata: binary
+   *  |-- typed_value: struct
+   *  |    |-- a: struct (nullable)
+   *  |    |    |-- value: binary (nullable)
+   *  |    |    |-- typed_value: integer (nullable)
+   *  |    |-- b: struct (nullable)
+   *  |    |    |-- value: binary (nullable)
+   *  |    |    |-- typed_value: string (nullable)
+   *  |    |-- c: struct (nullable)
+   *  |    |    |-- value: binary (nullable)
+   *  |    |    |-- typed_value: decimal(15,1) (nullable)
+   * </pre>

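+ * Internally, each entry is wrapped via {@code createShreddedFieldStruct} and the resulting typed_value record is passed to {@code createVariantShredded} to build the final Variant schema.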
+ * + * @param shreddedFields Map of field names to their typed value schemas. Use LinkedHashMap for ordered fields. + * @return a new HoodieSchema.Variant with properly nested typed_value + */ + public static HoodieSchema.Variant createVariantShreddedObject(Map shreddedFields) { + return createVariantShreddedObject(null, null, null, shreddedFields); + } + + /** + * Creates a shredded Variant schema for an object type with custom name, namespace, and documentation. + * + * @param name the variant record name (can be null, defaults to "variant") + * @param namespace the namespace (can be null) + * @param doc the documentation (can be null) + * @param shreddedFields Map of field names to their typed value schemas. Use LinkedHashMap for ordered fields. + * @return a new HoodieSchema.Variant with properly nested typed_value + */ + public static HoodieSchema.Variant createVariantShreddedObject( + String name, String namespace, String doc, Map shreddedFields) { + ValidationUtils.checkArgument(shreddedFields != null && !shreddedFields.isEmpty(), + "Shredded fields cannot be null or empty"); + + // Build typed_value fields, each wrapped in the spec-compliant {value, typed_value} struct + List typedValueFields = new ArrayList<>(); + for (Map.Entry entry : shreddedFields.entrySet()) { + HoodieSchema fieldStruct = createShreddedFieldStruct(entry.getKey(), entry.getValue()); + typedValueFields.add(HoodieSchemaField.of( + entry.getKey(), + HoodieSchema.createNullable(fieldStruct), + null, + NULL_VALUE + )); + } + + HoodieSchema typedValueSchema = HoodieSchema.createRecord( + Variant.VARIANT_TYPED_VALUE_FIELD, null, namespace, typedValueFields); + return createVariantShredded(name, namespace, doc, typedValueSchema); + } + /** * Returns the Hudi schema version information. * @@ -1825,6 +1932,60 @@ public Option getTypedValueField() { return typedValueSchema; } + /** + * Returns the typed_value schema with plain (unwrapped) types suitable for Spark shredding utilities, i.e. essentially removing the `value` field + * + *

If the typed_value follows the variant shredding spec (each field is a struct with
+ * {@code {value: bytes, typed_value: type}}), this extracts only the inner typed_value types and returns a record schema containing just those plain types.
+ *
+ * If the typed_value is already in plain form (created with {@code createVariantShredded}),
+ * returns the schema as-is.

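+ * For example (illustrative, matching the shapes above): a nested typed_value of
+ * {@code {a: {value: bytes, typed_value: int}, b: {value: bytes, typed_value: string}}}
+ * is returned as the plain record {@code {a: int, b: string}}.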
+ * + * @return Option containing the plain typed_value schema, or Option.empty() if not present + */ + public Option getPlainTypedValueSchema() { + if (!typedValueSchema.isPresent()) { + return Option.empty(); + } + HoodieSchema tvSchema = typedValueSchema.get(); + if (tvSchema.getType() != HoodieSchemaType.RECORD) { + return typedValueSchema; + } + + List fields = tvSchema.getFields(); + // Check if all fields follow the nested shredding pattern: each field is a record with {value, typed_value} + boolean isNestedForm = !fields.isEmpty() && fields.stream().allMatch(field -> { + HoodieSchema fieldSchema = field.schema(); + if (fieldSchema.isNullable()) { + fieldSchema = fieldSchema.getNonNullType(); + } + if (fieldSchema.getType() != HoodieSchemaType.RECORD) { + return false; + } + Option valueSubField = fieldSchema.getField(VARIANT_VALUE_FIELD); + Option typedValueSubField = fieldSchema.getField(VARIANT_TYPED_VALUE_FIELD); + return valueSubField.isPresent() && typedValueSubField.isPresent() + && fieldSchema.getFields().size() == 2; + }); + + if (!isNestedForm) { + return typedValueSchema; + } + + // Extract the plain types from the nested form + List plainFields = new ArrayList<>(); + for (HoodieSchemaField field : fields) { + HoodieSchema fieldSchema = field.schema(); + if (fieldSchema.isNullable()) { + fieldSchema = fieldSchema.getNonNullType(); + } + HoodieSchema innerTypedValue = fieldSchema.getField(VARIANT_TYPED_VALUE_FIELD).get().schema(); + plainFields.add(HoodieSchemaField.of(field.name(), innerTypedValue)); + } + return Option.of(HoodieSchema.createRecord( + tvSchema.getAvroSchema().getName() + "_plain", null, null, plainFields)); + } + @Override public String getName() { return VARIANT_DEFAULT_NAME; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaType.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaType.java index 81b7f94ace0d9..bcc1b4079f3e6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaType.java @@ -154,7 +154,7 @@ public static HoodieSchemaType fromAvro(Schema avroSchema) { return DATE; } else if (logicalType == LogicalTypes.uuid()) { return UUID; - } else if (logicalType instanceof VariantLogicalType) { + } else if (logicalType == VariantLogicalType.variant()) { return VARIANT; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java index 9783ecac1b5a5..f6db0eca699b3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java @@ -346,6 +346,18 @@ private static Type visitPrimitiveToBuildInternalType(HoodieSchema schema) { return Types.DateType.get(); case NULL: return null; + case VARIANT: + // Variant is represented as a record with value and metadata binary fields + // Convert it to the internal schema representation as a RecordType + // Since Variant is treated as a primitive here but needs to be a record, + // we return a RecordType with the appropriate structure + List variantFields = new ArrayList<>(2); + // Assign field IDs: these are used for schema evolution tracking + // Use negative IDs: indicate these are system-generated for Variant type + // TODO (voon): Check if we can remove the magic 
numbers? + variantFields.add(Types.Field.get(-1, false, "value", Types.BinaryType.get(), "Variant value component")); + variantFields.add(Types.Field.get(-2, false, "metadata", Types.BinaryType.get(), "Variant metadata component")); + return Types.RecordType.get(variantFields); default: throw new UnsupportedOperationException("Unsupported primitive type: " + schema.getType()); } @@ -441,6 +453,38 @@ private static HoodieSchema visitInternalSchemaToBuildHoodieSchema(Type type, Ma */ private static HoodieSchema visitInternalRecordToBuildHoodieRecord(Types.RecordType recordType, List fieldSchemas, String recordNameFallback) { List fields = recordType.fields(); + + // Check if this RecordType is actually a Variant type + // Unshredded Variant types are marked by having exactly 2 fields with negative IDs and specific names + if (fields.size() == 2) { + Types.Field field0 = fields.get(0); + Types.Field field1 = fields.get(1); + + // Check if both fields have negative IDs (system-generated for Variant) + boolean hasNegativeIds = field0.fieldId() < 0 && field1.fieldId() < 0; + + // Check if fields are named "value" and "metadata" (order may vary) + boolean hasVariantFields = (field0.name().equals("value") && field1.name().equals("metadata")) + || (field0.name().equals("metadata") && field1.name().equals("value")); + + if (hasNegativeIds && hasVariantFields) { + // Variant type: Determine if it is shredded or unshredded based on value field's optionality + // TODO (voon): This is incomplete for now, we are only handling unshredded, fields size of == 2 should always mean this is unshredded + String recordName = Option.ofNullable(recordType.name()).orElse(recordNameFallback); + Types.Field valueField = field0.name().equals("value") ? field0 : field1; + + if (valueField.isOptional()) { + // Optional value field indicates shredded variant + // Note: We don't have the typed_value schema here, so pass null for typedValueSchema + return HoodieSchema.createVariantShredded(recordName, null, null, null); + } else { + // Required value field indicates unshredded variant + return HoodieSchema.createVariant(recordName, null, null); + } + } + } + + // Not a Variant, create regular record List schemaFields = new ArrayList<>(fields.size()); for (int i = 0; i < fields.size(); i++) { Types.Field f = fields.get(i); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index ebf323243150d..31da3124e98c4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -2059,8 +2059,12 @@ private static boolean isColumnTypeSupportedV1(HoodieSchema schema, Option getInflightMetadataPartitions(HoodieTableConfig tableConfig) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java index 3a374b706ff87..b844cf1e73240 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java @@ -36,7 +36,9 @@ import java.io.ObjectOutputStream; import java.util.Arrays; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertFalse; @@ -1075,8 +1077,8 @@ public void testCreateUnshreddedVariant() { // Verify fields List fields = variantSchema.getFields(); assertEquals(2, fields.size()); - assertEquals("metadata", fields.get(0).name()); - assertEquals("value", fields.get(1).name()); + assertEquals("value", fields.get(0).name()); + assertEquals("metadata", fields.get(1).name()); // Verify field types assertEquals(HoodieSchemaType.BYTES, fields.get(0).schema().getType()); @@ -1118,12 +1120,12 @@ public void testCreateShreddedVariant() { // Verify fields List fields = variantSchema.getFields(); assertEquals(3, fields.size()); - assertEquals("metadata", fields.get(0).name()); - assertEquals("value", fields.get(1).name()); + assertEquals("value", fields.get(0).name()); + assertEquals("metadata", fields.get(1).name()); assertEquals("typed_value", fields.get(2).name()); // Value field should be nullable for shredded - assertTrue(fields.get(1).schema().isNullable()); + assertTrue(fields.get(0).schema().isNullable()); // Verify typed_value schema HoodieSchema retrievedTypedValueSchema = variantSchema.getTypedValueField().get(); @@ -1141,11 +1143,11 @@ public void testCreateShreddedVariantWithoutTypedValue() { // Verify fields should have metadata and nullable value, but no typed_value List fields = variantSchema.getFields(); assertEquals(2, fields.size()); - assertEquals("metadata", fields.get(0).name()); - assertEquals("value", fields.get(1).name()); + assertEquals("value", fields.get(0).name()); + assertEquals("metadata", fields.get(1).name()); // Value field should be nullable even without typed_value - assertTrue(fields.get(1).schema().isNullable()); + assertTrue(fields.get(0).schema().isNullable()); } @Test @@ -1651,4 +1653,204 @@ public void testVariantLogicalTypeSingleton() { // Verify the logical type name assertEquals("variant", instance1.getName()); } + + @Test + public void testCreateShreddedFieldStruct() { + HoodieSchema fieldStruct = HoodieSchema.createShreddedFieldStruct("age", HoodieSchema.create(HoodieSchemaType.INT)); + + assertNotNull(fieldStruct); + assertEquals(HoodieSchemaType.RECORD, fieldStruct.getType()); + assertEquals("age", fieldStruct.getAvroSchema().getName()); + + List fields = fieldStruct.getFields(); + assertEquals(2, fields.size()); + + // value: nullable bytes + assertEquals("value", fields.get(0).name()); + assertTrue(fields.get(0).schema().isNullable()); + assertEquals(HoodieSchemaType.BYTES, fields.get(0).schema().getNonNullType().getType()); + + // typed_value: nullable int + assertEquals("typed_value", fields.get(1).name()); + assertTrue(fields.get(1).schema().isNullable()); + assertEquals(HoodieSchemaType.INT, fields.get(1).schema().getNonNullType().getType()); + } + + @Test + public void testCreateShreddedFieldStructWithDecimal() { + HoodieSchema decimalSchema = HoodieSchema.createDecimal(15, 1); + HoodieSchema fieldStruct = HoodieSchema.createShreddedFieldStruct("price", decimalSchema); + + assertNotNull(fieldStruct); + List fields = fieldStruct.getFields(); + assertEquals(2, fields.size()); + + // typed_value: nullable decimal(15,1) + HoodieSchema typedValueSchema = fields.get(1).schema().getNonNullType(); + assertInstanceOf(HoodieSchema.Decimal.class, typedValueSchema); + assertEquals(15, ((HoodieSchema.Decimal) typedValueSchema).getPrecision()); + assertEquals(1, ((HoodieSchema.Decimal) typedValueSchema).getScale()); + } + + @Test + public void testCreateVariantShreddedObject() { + Map shreddedFields = new LinkedHashMap<>(); + 
shreddedFields.put("a", HoodieSchema.create(HoodieSchemaType.INT)); + shreddedFields.put("b", HoodieSchema.create(HoodieSchemaType.STRING)); + shreddedFields.put("c", HoodieSchema.createDecimal(15, 1)); + + HoodieSchema.Variant variant = HoodieSchema.createVariantShreddedObject(shreddedFields); + + assertNotNull(variant); + assertInstanceOf(HoodieSchema.Variant.class, variant); + assertTrue(variant.isShredded()); + assertTrue(variant.getTypedValueField().isPresent()); + + // Top-level fields: value, metadata, typed_value + List topFields = variant.getFields(); + assertEquals(3, topFields.size()); + assertEquals("value", topFields.get(0).name()); + assertEquals("metadata", topFields.get(1).name()); + assertEquals("typed_value", topFields.get(2).name()); + + // typed_value is a RECORD containing the shredded fields + HoodieSchema typedValueSchema = variant.getTypedValueField().get(); + assertEquals(HoodieSchemaType.RECORD, typedValueSchema.getType()); + List typedValueFields = typedValueSchema.getFields(); + assertEquals(3, typedValueFields.size()); + + // Verify field "a": nullable struct { value: nullable bytes, typed_value: nullable int } + HoodieSchemaField aField = typedValueFields.get(0); + assertEquals("a", aField.name()); + assertTrue(aField.schema().isNullable()); + HoodieSchema aStruct = aField.schema().getNonNullType(); + assertEquals(HoodieSchemaType.RECORD, aStruct.getType()); + List aSubFields = aStruct.getFields(); + assertEquals(2, aSubFields.size()); + assertEquals("value", aSubFields.get(0).name()); + assertTrue(aSubFields.get(0).schema().isNullable()); + assertEquals(HoodieSchemaType.BYTES, aSubFields.get(0).schema().getNonNullType().getType()); + assertEquals("typed_value", aSubFields.get(1).name()); + assertTrue(aSubFields.get(1).schema().isNullable()); + assertEquals(HoodieSchemaType.INT, aSubFields.get(1).schema().getNonNullType().getType()); + + // Verify field "b": nullable struct { value: nullable bytes, typed_value: nullable string } + HoodieSchemaField bField = typedValueFields.get(1); + assertEquals("b", bField.name()); + HoodieSchema bStruct = bField.schema().getNonNullType(); + List bSubFields = bStruct.getFields(); + assertEquals("typed_value", bSubFields.get(1).name()); + assertEquals(HoodieSchemaType.STRING, bSubFields.get(1).schema().getNonNullType().getType()); + + // Verify field "c": nullable struct { value: nullable bytes, typed_value: nullable decimal(15,1) } + HoodieSchemaField cField = typedValueFields.get(2); + assertEquals("c", cField.name()); + HoodieSchema cStruct = cField.schema().getNonNullType(); + List cSubFields = cStruct.getFields(); + HoodieSchema cTypedValue = cSubFields.get(1).schema().getNonNullType(); + assertInstanceOf(HoodieSchema.Decimal.class, cTypedValue); + assertEquals(15, ((HoodieSchema.Decimal) cTypedValue).getPrecision()); + assertEquals(1, ((HoodieSchema.Decimal) cTypedValue).getScale()); + } + + @Test + public void testCreateVariantShreddedObjectWithCustomName() { + Map shreddedFields = new LinkedHashMap<>(); + shreddedFields.put("age", HoodieSchema.create(HoodieSchemaType.INT)); + + HoodieSchema.Variant variant = HoodieSchema.createVariantShreddedObject( + "my_variant", "org.apache.hudi", "A shredded variant", shreddedFields); + + assertNotNull(variant); + assertEquals("my_variant", variant.getAvroSchema().getName()); + assertEquals("org.apache.hudi", variant.getAvroSchema().getNamespace()); + assertTrue(variant.isShredded()); + assertTrue(TestHoodieSchema.isVariantSchema(variant.getAvroSchema())); + } + + @Test + public 
void testCreateVariantShreddedObjectRoundTrip() { + Map shreddedFields = new LinkedHashMap<>(); + shreddedFields.put("a", HoodieSchema.create(HoodieSchemaType.INT)); + shreddedFields.put("b", HoodieSchema.create(HoodieSchemaType.STRING)); + + HoodieSchema.Variant original = HoodieSchema.createVariantShreddedObject(shreddedFields); + String jsonSchema = original.toString(); + + // Parse back from JSON + HoodieSchema parsed = HoodieSchema.parse(jsonSchema); + assertInstanceOf(HoodieSchema.Variant.class, parsed); + HoodieSchema.Variant parsedVariant = (HoodieSchema.Variant) parsed; + + assertTrue(parsedVariant.isShredded()); + assertTrue(parsedVariant.getTypedValueField().isPresent()); + + // Verify typed_value structure is preserved + HoodieSchema typedValueSchema = parsedVariant.getTypedValueField().get(); + assertEquals(HoodieSchemaType.RECORD, typedValueSchema.getType()); + List fields = typedValueSchema.getFields(); + assertEquals(2, fields.size()); + assertEquals("a", fields.get(0).name()); + assertEquals("b", fields.get(1).name()); + + // Verify inner struct structure is preserved + HoodieSchema aStruct = fields.get(0).schema().getNonNullType(); + assertEquals(HoodieSchemaType.RECORD, aStruct.getType()); + assertEquals(2, aStruct.getFields().size()); + assertEquals("value", aStruct.getFields().get(0).name()); + assertEquals("typed_value", aStruct.getFields().get(1).name()); + } + + @Test + public void testGetPlainTypedValueSchemaFromNestedForm() { + // Create a variant using createVariantShreddedObject (nested form) + Map shreddedFields = new LinkedHashMap<>(); + shreddedFields.put("a", HoodieSchema.create(HoodieSchemaType.INT)); + shreddedFields.put("b", HoodieSchema.create(HoodieSchemaType.STRING)); + shreddedFields.put("c", HoodieSchema.createDecimal(15, 1)); + + HoodieSchema.Variant variant = HoodieSchema.createVariantShreddedObject(shreddedFields); + + // getPlainTypedValueSchema should unwrap the nested {value, typed_value} structs + Option plainOpt = variant.getPlainTypedValueSchema(); + assertTrue(plainOpt.isPresent()); + HoodieSchema plainSchema = plainOpt.get(); + assertEquals(HoodieSchemaType.RECORD, plainSchema.getType()); + + List fields = plainSchema.getFields(); + assertEquals(3, fields.size()); + assertEquals("a", fields.get(0).name()); + assertEquals(HoodieSchemaType.INT, fields.get(0).schema().getNonNullType().getType()); + assertEquals("b", fields.get(1).name()); + assertEquals(HoodieSchemaType.STRING, fields.get(1).schema().getNonNullType().getType()); + assertEquals("c", fields.get(2).name()); + assertInstanceOf(HoodieSchema.Decimal.class, fields.get(2).schema().getNonNullType()); + } + + @Test + public void testGetPlainTypedValueSchemaFromPlainForm() { + // Create a variant using createVariantShredded (plain form) + HoodieSchema typedValueSchema = HoodieSchema.createRecord("TypedValue", null, null, + Collections.singletonList(HoodieSchemaField.of("data", HoodieSchema.create(HoodieSchemaType.STRING)))); + HoodieSchema.Variant variant = HoodieSchema.createVariantShredded(typedValueSchema); + + // getPlainTypedValueSchema should return as-is since it's already in plain form + Option plainOpt = variant.getPlainTypedValueSchema(); + assertTrue(plainOpt.isPresent()); + HoodieSchema plainSchema = plainOpt.get(); + assertEquals(HoodieSchemaType.RECORD, plainSchema.getType()); + assertEquals(1, plainSchema.getFields().size()); + assertEquals("data", plainSchema.getFields().get(0).name()); + } + + @Test + public void testGetPlainTypedValueSchemaEmpty() { + // Shredded 
variant without typed_value + HoodieSchema.Variant variant = HoodieSchema.createVariantShredded(null); + assertFalse(variant.getPlainTypedValueSchema().isPresent()); + + // Unshredded variant + HoodieSchema.Variant unshreddedVariant = HoodieSchema.createVariant(); + assertFalse(unshreddedVariant.getPlainTypedValueSchema().isPresent()); + } } diff --git a/hudi-common/src/test/resources/variant_backward_compat/README.md b/hudi-common/src/test/resources/variant_backward_compat/README.md new file mode 100644 index 0000000000000..94d45541b0bb1 --- /dev/null +++ b/hudi-common/src/test/resources/variant_backward_compat/README.md @@ -0,0 +1,41 @@ + + + +# Generation of test files + +These files are generated by using the test: +`org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala:31` + +Spark4.0 is required for this. + +The shell commands used to generate this are: + +```shell +cd /path/to/test/files/ +zip -r zip_file.zip htestvariantdatatype_1 +``` + +The test runs on these four arguments: +1. COW + HoodieRecordType.AVRO +2. COW + HoodieRecordType.SPARK +3. MOR + HoodieRecordType.AVRO +4. MOR + HoodieRecordType.SPARK + +COW tables generated are the same for both AVRO/SPARK. But for MOR, the log files metadata are +different. Hence, we only need to generate test files for either 1/2, 3 and 4, hence, 3 test +resource files. diff --git a/hudi-common/src/test/resources/variant_backward_compat/variant_cow.zip b/hudi-common/src/test/resources/variant_backward_compat/variant_cow.zip new file mode 100644 index 0000000000000..e1efe0f447f6c Binary files /dev/null and b/hudi-common/src/test/resources/variant_backward_compat/variant_cow.zip differ diff --git a/hudi-common/src/test/resources/variant_backward_compat/variant_mor_avro.zip b/hudi-common/src/test/resources/variant_backward_compat/variant_mor_avro.zip new file mode 100644 index 0000000000000..59244af7dd226 Binary files /dev/null and b/hudi-common/src/test/resources/variant_backward_compat/variant_mor_avro.zip differ diff --git a/hudi-common/src/test/resources/variant_backward_compat/variant_mor_spark.zip b/hudi-common/src/test/resources/variant_backward_compat/variant_mor_spark.zip new file mode 100644 index 0000000000000..b1ab29842049b Binary files /dev/null and b/hudi-common/src/test/resources/variant_backward_compat/variant_mor_spark.zip differ diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java new file mode 100644 index 0000000000000..d22d5fae7ab51 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table; + +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.utils.FlinkMiniCluster; +import org.apache.hudi.utils.TestTableEnvs; + +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** + * Integration test for cross-engine compatibility - verifying that Flink can read Variant tables written by Spark 4.0. + */ +@ExtendWith(FlinkMiniCluster.class) +public class ITTestVariantCrossEngineCompatibility { + + @TempDir + Path tempDir; + + /** + * Helper method to verify that Flink can read Spark 4.0 Variant tables. + * Variant data is represented as ROW in Flink. + */ + private void verifyFlinkCanReadSparkVariantTable(String tablePath, String tableType, String testDescription) throws Exception { + TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv(); + + // Create a Hudi table pointing to the Spark-written data + // In Flink, Variant is represented as ROW + // NOTE: value is a reserved keyword + String createTableDdl = String.format( + "CREATE TABLE variant_table (" + + " id INT," + + " name STRING," + + " v ROW<`value` BYTES, metadata BYTES>," + + " ts BIGINT," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'hudi'," + + " 'path' = '%s'," + + " 'table.type' = '%s'" + + ")", + tablePath, tableType); + + tableEnv.executeSql(createTableDdl); + + // Query the table to verify Flink can read the data + TableResult result = tableEnv.executeSql("SELECT id, name, v, ts FROM variant_table ORDER BY id"); + List rows = CollectionUtil.iteratorToList(result.collect()); + + // Verify we got the expected row (after Spark 4.0 delete operation, only 1 row remains) + assertEquals(1, rows.size(), "Should have 1 row after delete operation in Spark 4.0 (" + testDescription + ")"); + + Row row = rows.get(0); + assertEquals(1, row.getField(0), "First column should be id=1"); + assertEquals("row1", row.getField(1), "Second column should be name=row1"); + assertEquals(1000L, row.getField(3), "Fourth column should be ts=1000"); + + // Verify the variant column is readable as a ROW with binary fields + Row variantRow = (Row) row.getField(2); + assertNotNull(variantRow, "Variant column should not be null"); + + byte[] valueBytes = (byte[]) variantRow.getField(0); + byte[] metadataBytes = (byte[]) variantRow.getField(1); + + // Expected byte values from Spark 4.0 Variant representation: {"updated": true, "new_field": 123} + byte[] expectedValueBytes = new byte[]{0x02, 0x02, 0x01, 0x00, 0x01, 0x00, 0x03, 0x04, 0x0C, 0x7B}; + byte[] expectedMetadataBytes = new byte[]{0x01, 0x02, 0x00, 0x07, 0x10, 0x75, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x64, 0x6E, 0x65, 0x77, 0x5F, 0x66, 0x69, 0x65, 0x6C, 0x64}; + + assertArrayEquals(expectedValueBytes, valueBytes, + String.format("Variant value bytes mismatch (%s). 
Expected: %s, Got: %s", + testDescription, + Arrays.toString(StringUtils.encodeHex(expectedValueBytes)), + Arrays.toString(StringUtils.encodeHex(valueBytes)))); + + assertArrayEquals(expectedMetadataBytes, metadataBytes, + String.format("Variant metadata bytes mismatch (%s). Expected: %s, Got: %s", + testDescription, + Arrays.toString(StringUtils.encodeHex(expectedMetadataBytes)), + Arrays.toString(StringUtils.encodeHex(metadataBytes)))); + + tableEnv.executeSql("DROP TABLE variant_table"); + } + + @Test + public void testFlinkReadSparkVariantCOWTable() throws Exception { + // Test that Flink can read a COW table with Variant data written by Spark 4.0 + Path cowTargetDir = tempDir.resolve("cow"); + HoodieTestUtils.extractZipToDirectory("variant_backward_compat/variant_cow.zip", cowTargetDir, getClass()); + String cowPath = cowTargetDir.resolve("variant_cow").toString(); + verifyFlinkCanReadSparkVariantTable(cowPath, "COPY_ON_WRITE", "COW table"); + } + + @Test + public void testFlinkReadSparkVariantMORTableWithAvro() throws Exception { + // Test that Flink can read a MOR table with AVRO record type and Variant data written by Spark 4.0 + Path morAvroTargetDir = tempDir.resolve("mor_avro"); + HoodieTestUtils.extractZipToDirectory("variant_backward_compat/variant_mor_avro.zip", morAvroTargetDir, getClass()); + String morAvroPath = morAvroTargetDir.resolve("variant_mor_avro").toString(); + verifyFlinkCanReadSparkVariantTable(morAvroPath, "MERGE_ON_READ", "MOR table with AVRO record type"); + } + + @Test + public void testFlinkReadSparkVariantMORTableWithSpark() throws Exception { + // Test that Flink can read a MOR table with SPARK record type and Variant data written by Spark 4.0 + Path morSparkTargetDir = tempDir.resolve("mor_spark"); + HoodieTestUtils.extractZipToDirectory("variant_backward_compat/variant_mor_spark.zip", morSparkTargetDir, getClass()); + String morSparkPath = morSparkTargetDir.resolve("variant_mor_spark").toString(); + verifyFlinkCanReadSparkVariantTable(morSparkPath, "MERGE_ON_READ", "MOR table with SPARK record type"); + } +} \ No newline at end of file diff --git a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java index 4f8d88d0f6d91..ce68c4a50a189 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java +++ b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java @@ -250,6 +250,10 @@ private Type convertField(String fieldName, HoodieSchema schema, Type.Repetition break; case UNION: return convertUnion(fieldName, schema, repetition, schemaPath); + case VARIANT: + // Variant is represented as a record with value and metadata binary fields + // Convert the variant schema's fields to Parquet types + return new GroupType(repetition, fieldName, convertFields(schema.getFields(), schemaPath)); default: throw new UnsupportedOperationException("Cannot convert Avro type " + type); } diff --git a/hudi-hadoop-common/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/hudi-hadoop-common/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java index 50977e573b6c1..f9c80548a12ec 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java +++ b/hudi-hadoop-common/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java @@ -860,6 +860,53 @@ public void 
testUUIDTypeWithParquetUUID() throws Exception { "message myrecord {\n" + " required fixed_len_byte_array(16) uuid (UUID);\n" + "}\n"); } + @Test + public void testUnshreddedVariantType() throws Exception { + HoodieSchema variant = HoodieSchema.createVariant(); + HoodieSchema schema = HoodieSchema.createRecord("myrecord", null, null, false, + Collections.singletonList(HoodieSchemaField.of("myvariant", variant, null, null))); + testAvroToParquetConversion( + schema, + "message myrecord {\n" + + " required group myvariant {\n" + + " required binary value;\n" + + " required binary metadata;\n" + + " }\n" + + "}\n"); + } + + @Test + public void testShreddedVariantType() throws Exception { + HoodieSchema variant = HoodieSchema.createVariantShredded(HoodieSchema.create(HoodieSchemaType.INT)); + HoodieSchema schema = HoodieSchema.createRecord("myrecord", null, null, false, + Collections.singletonList(HoodieSchemaField.of("myvariant", variant, null, null))); + testAvroToParquetConversion( + schema, + "message myrecord {\n" + + " required group myvariant {\n" + + " optional binary value;\n" + + " required binary metadata;\n" + + " required int32 typed_value;\n" + + " }\n" + + "}\n"); + } + + @Test + public void testOptionalVariantType() throws Exception { + HoodieSchema variant = HoodieSchema.createVariant(); + HoodieSchema schema = HoodieSchema.createRecord("myrecord", null, null, false, + Collections.singletonList( + HoodieSchemaField.of("myvariant", HoodieSchema.createNullable(variant), null, HoodieSchema.NULL_VALUE))); + testAvroToParquetConversion( + schema, + "message myrecord {\n" + + " optional group myvariant {\n" + + " required binary value;\n" + + " required binary metadata;\n" + + " }\n" + + "}\n"); + } + @Test public void testAvroFixed12AsParquetInt96Type() throws Exception { HoodieSchema schema = getSchemaFromResource(TestAvroSchemaConverter.class, "/parquet-java/fixedToInt96.avsc"); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 0612dae85b266..2b7ade43817e3 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -520,12 +520,18 @@ class HoodieSparkSqlWriterInternal { // if table has undergone upgrade, we need to reload table config tableMetaClient.reloadTableConfig() tableConfig = tableMetaClient.getTableConfig - // Convert to RDD[HoodieRecord] - val hoodieRecords = Try(HoodieCreateRecordUtils.createHoodieRecordRdd( + // Convert to RDD[HoodieRecord] and force type immediately + val hoodieRecords: JavaRDD[HoodieRecord[_]] = Try(HoodieCreateRecordUtils.createHoodieRecordRdd( HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema, processedDataSchema, operation, instantTime, preppedSparkSqlWrites, preppedSparkSqlMergeInto, preppedWriteOperation, tableConfig))) match { - case Success(recs) => recs + // CAST EXPLANATION: + // This cast is required due to Scala/Java generic variance mismatch. + // 1. Java returns JavaRDD[HoodieRecord[_ <: Object]] (wildcard maps to Object bound) + // 2. Scala expects JavaRDD[HoodieRecord[_]] (existential type maps to Any) + // 3. JavaRDD is Invariant in Scala, so these types are not compatible without a cast. 
+ // Please do not remove, even if IDE marks it as redundant. + case Success(recs) => recs.asInstanceOf[JavaRDD[HoodieRecord[_]]] case Failure(e) => throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala new file mode 100644 index 0000000000000..0d830fde66a21 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.dml.schema + +import org.apache.hudi.HoodieSparkUtils +import org.apache.hudi.common.testutils.HoodieTestUtils +import org.apache.hudi.common.util.StringUtils + +import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase + + +class TestVariantDataType extends HoodieSparkSqlTestBase { + + test(s"Test Table with Variant Data Type") { + // Variant type is only supported in Spark 4.0+ + assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher") + + Seq("cow", "mor").foreach { tableType => + withRecordType()(withTempDir { tmp => + val tableName = generateTableName + // Create a table with a Variant column + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | v variant, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}' + | tblproperties ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // Insert data with Variant values using parse_json (Spark 4.0+) + spark.sql( + s""" + |insert into $tableName + |values + | (1, 'row1', parse_json('{"key": "value1", "num": 1}'), 1000), + | (2, 'row2', parse_json('{"key": "value2", "list": [1, 2, 3]}'), 1000) + """.stripMargin) + + // Verify the data by casting Variant to String for deterministic comparison + checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")( + Seq(1, "row1", "{\"key\":\"value1\",\"num\":1}", 1000), + Seq(2, "row2", "{\"key\":\"value2\",\"list\":[1,2,3]}", 1000) + ) + + // Test Updates on Variant column, MOR will generate logs + spark.sql( + s""" + |update $tableName + |set v = parse_json('{"updated": true, "new_field": 123}') + |where id = 1 + """.stripMargin) + + checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")( + Seq(1, "row1", "{\"new_field\":123,\"updated\":true}", 1000), + Seq(2, "row2", "{\"key\":\"value2\",\"list\":[1,2,3]}", 1000) + ) + + // Test Delete + spark.sql(s"delete from $tableName where id = 2") + + checkAnswer(s"select id, name, 
cast(v as string), ts from $tableName order by id")( + Seq(1, "row1", "{\"new_field\":123,\"updated\":true}", 1000) + ) + }) + } + } + + test(s"Test Backward Compatibility: Read Spark 4.0 Variant Table in Spark 3.x") { + // This test only runs on Spark 3.x to verify backward compatibility + assume(HoodieSparkUtils.isSpark3, "This test verifies Spark 3.x can read Spark 4.0 Variant tables") + + withTempDir { tmpDir => + // Test COW table - record type does not affect file metadata for COW, only need one test + HoodieTestUtils.extractZipToDirectory("variant_backward_compat/variant_cow.zip", tmpDir.toPath, getClass) + val cowPath = tmpDir.toPath.resolve("variant_cow").toString + verifyVariantBackwardCompatibility(cowPath, "cow", "COW table") + + // Test MOR table with AVRO record type + HoodieTestUtils.extractZipToDirectory("variant_backward_compat/variant_mor_avro.zip", tmpDir.toPath, getClass) + val morAvroPath = tmpDir.toPath.resolve("variant_mor_avro").toString + verifyVariantBackwardCompatibility(morAvroPath, "mor", "MOR table with AVRO record type") + + // Test MOR table with SPARK record type + HoodieTestUtils.extractZipToDirectory("variant_backward_compat/variant_mor_spark.zip", tmpDir.toPath, getClass) + val morSparkPath = tmpDir.toPath.resolve("variant_mor_spark").toString + verifyVariantBackwardCompatibility(morSparkPath, "mor", "MOR table with SPARK record type") + } + } + + /** + * Helper method to verify backward compatibility of reading Spark 4.0 Variant tables in Spark 3.x + */ + private def verifyVariantBackwardCompatibility(resourcePath: String, tableType: String, testDescription: String): Unit = { + val tableName = generateTableName + + // Create a Hudi table pointing to the saved data location + // In Spark 3.x, we define the Variant column as a struct with binary fields since Variant type is not available + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | v struct, + | ts long + |) using hudi + |location '$resourcePath' + |tblproperties ( + | primaryKey = 'id', + | tableType = '$tableType', + | preCombineField = 'ts' + |) + """.stripMargin) + + // Verify we can read the basic columns + checkAnswer(s"select id, name, ts from $tableName order by id")(Seq(1, "row1", 1000)) + + // Read and verify the variant column as a struct with binary fields + val rows = spark.sql(s"select id, v from $tableName order by id").collect() + assert(rows.length == 1, s"Should have 1 row after delete operation in Spark 4.0 ($testDescription)") + assert(rows(0).getInt(0) == 1, "First column should be id=1") + assert(!rows(0).isNullAt(1), "Variant column should not be null") + + val variantStruct = rows(0).getStruct(1) + assert(variantStruct.size == 2, "Variant struct should have 2 fields: value and metadata") + + val valueBytes = variantStruct.getAs[Array[Byte]](0) + val metadataBytes = variantStruct.getAs[Array[Byte]](1) + + // Expected byte values from Spark 4.0 Variant representation: {"updated": true, "new_field": 123} + val expectedValueBytes = Array[Byte](0x02, 0x02, 0x01, 0x00, 0x01, 0x00, 0x03, 0x04, 0x0C, 0x7B) + val expectedMetadataBytes = Array[Byte](0x01, 0x02, 0x00, 0x07, 0x10, 0x75, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x64, 0x6E, 0x65, 0x77, 0x5F, 0x66, 0x69, 0x65, 0x6C, 0x64) + + assert(valueBytes.sameElements(expectedValueBytes), + s"Variant value bytes mismatch ($testDescription). 
" + + s"Expected: ${StringUtils.encodeHex(expectedValueBytes).mkString("Array(", ", ", ")")}, " + + s"Got: ${StringUtils.encodeHex(valueBytes).mkString("Array(", ", ", ")")}") + + assert(metadataBytes.sameElements(expectedMetadataBytes), + s"Variant metadata bytes mismatch ($testDescription). " + + s"Expected: ${StringUtils.encodeHex(expectedMetadataBytes).mkString("Array(", ", ", ")")}, " + + s"Got: ${StringUtils.encodeHex(metadataBytes).mkString("Array(", ", ", ")")}") + + // Verify we can select all columns without errors + assert(spark.sql(s"select * from $tableName").count() == 1, "Should be able to read all columns including variant") + + spark.sql(s"drop table $tableName") + } +} diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala index 4f2bbb4b5b75c..dca4967d692ea 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala @@ -26,6 +26,8 @@ import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit import org.apache.hudi.common.util.JsonUtils import org.apache.hudi.spark.internal.ReflectUtil +import org.apache.parquet.schema.Type +import org.apache.parquet.schema.Type.Repetition import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD @@ -34,7 +36,7 @@ import org.apache.spark.sql.FileFormatUtilsForFileGroupReader.applyFiltersToPlan import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Predicate} +import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Predicate, SpecializedGetters} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -45,7 +47,7 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.HoodieFormatTrait import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.sources.{BaseRelation, Filter} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String @@ -53,6 +55,7 @@ import org.apache.spark.unsafe.types.UTF8String import java.time.ZoneId import java.util.TimeZone import java.util.concurrent.ConcurrentHashMap +import java.util.function.{BiConsumer, Consumer} import scala.collection.JavaConverters._ @@ -176,4 +179,51 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging { Dataset.ofRows(sqlContext.sparkSession, applyFiltersToPlan(logicalRelation, requiredSchema, resolvedSchema, relation.fileFormat.asInstanceOf[HoodieFormatTrait].getRequiredFilters)) } + + override def getVariantDataType: Option[DataType] = { + // Spark 3.x does not support VariantType + None + } + + override def isDataTypeEqualForParquet(requiredType: DataType, fileType: DataType): Option[Boolean] = { + // Spark 3.x does not 
support VariantType, so return None to use default logic + None + } + + override def isVariantType(dataType: DataType): Boolean = { + // Spark 3.x does not support VariantType + false + } + + override def createVariantValueWriter( + dataType: DataType, + writeValue: Consumer[Array[Byte]], + writeMetadata: Consumer[Array[Byte]] + ): BiConsumer[SpecializedGetters, Integer] = { + throw new UnsupportedOperationException("Spark 3.x does not support VariantType") + } + + override def convertVariantFieldToParquetType( + dataType: DataType, + fieldName: String, + fieldSchema: HoodieSchema, + repetition: Repetition + ): Type = { + throw new UnsupportedOperationException("Spark 3.x does not support VariantType") + } + override def isVariantShreddingStruct(structType: StructType): Boolean = { + // Spark 3.x does not support Variant shredding + false + } + + override def generateVariantWriteShreddingSchema(dataType: DataType, isTopLevel: Boolean, isObjectField: Boolean): StructType = { + throw new UnsupportedOperationException("Spark 3.x does not support Variant shredding") + } + + override def createShreddedVariantWriter( + shreddedStructType: StructType, + writeStruct: Consumer[InternalRow] + ): BiConsumer[SpecializedGetters, Integer] = { + throw new UnsupportedOperationException("Spark 3.x does not support Variant shredding") + } } diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala index 58ed3eb5b88c9..30639d37f88ed 100644 --- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala +++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala @@ -27,7 +27,8 @@ import org.apache.hudi.common.util.JsonUtils import org.apache.hudi.spark.internal.ReflectUtil import org.apache.hudi.storage.StorageConfiguration -import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.{MessageType, Type} +import org.apache.parquet.schema.Type.Repetition import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD @@ -36,7 +37,7 @@ import org.apache.spark.sql.FileFormatUtilsForFileGroupReader.applyFiltersToPlan import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Predicate} +import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Predicate, SpecializedGetters} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -45,18 +46,20 @@ import org.apache.spark.sql.catalyst.util.DateFormatter import org.apache.spark.sql.classic.ColumnConversions import org.apache.spark.sql.execution.{PartitionedFileUtil, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, ParquetFilters} +import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, ParquetFilters, SparkShreddingUtils} import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.sources.{BaseRelation, Filter} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{BinaryType, DataType, StructType, VariantType} import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.storage.StorageLevel +import org.apache.spark.types.variant.Variant import org.apache.spark.unsafe.types.UTF8String import java.time.ZoneId import java.util.TimeZone import java.util.concurrent.ConcurrentHashMap +import java.util.function.{BiConsumer, Consumer} import scala.collection.JavaConverters._ @@ -196,4 +199,109 @@ abstract class BaseSpark4Adapter extends SparkAdapter with Logging { storageConf.getBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis), getRebaseSpec("CORRECTED")) } + + override def getVariantDataType: Option[DataType] = { + Some(VariantType) + } + + override def isDataTypeEqualForParquet(requiredType: DataType, fileType: DataType): Option[Boolean] = { + /** + * Checks if a StructType is the physical representation of VariantType in Parquet. + * VariantType is stored in Parquet as a struct with two binary fields: "value" and "metadata". + */ + def isVariantPhysicalSchema(structType: StructType): Boolean = { + if (structType.fields.length != 2) { + false + } else { + val fieldMap = structType.fields.map(f => (f.name, f.dataType)).toMap + fieldMap.contains("value") && fieldMap.contains("metadata") && + fieldMap("value") == BinaryType && fieldMap("metadata") == BinaryType + } + } + + // Handle VariantType comparisons + (requiredType, fileType) match { + case (_: VariantType, s: StructType) if isVariantPhysicalSchema(s) => Some(true) + case (s: StructType, _: VariantType) if isVariantPhysicalSchema(s) => Some(true) + case _ => None // Not a VariantType comparison, use default logic + } + } + + override def isVariantType(dataType: DataType): Boolean = { + import org.apache.spark.sql.types.VariantType + dataType.isInstanceOf[VariantType] + } + + override def createVariantValueWriter( + dataType: DataType, + writeValue: Consumer[Array[Byte]], + writeMetadata: Consumer[Array[Byte]] + ): BiConsumer[SpecializedGetters, Integer] = { + import org.apache.spark.sql.types.VariantType + + if (!dataType.isInstanceOf[VariantType]) { + throw new IllegalArgumentException(s"Expected VariantType but got $dataType") + } + + (row: SpecializedGetters, ordinal: Integer) => { + val variant = row.getVariant(ordinal) + writeValue.accept(variant.getValue) + writeMetadata.accept(variant.getMetadata) + } + } + + override def convertVariantFieldToParquetType( + dataType: DataType, + fieldName: String, + fieldSchema: HoodieSchema, + repetition: Repetition + ): Type = { + import org.apache.parquet.schema.{PrimitiveType, Types} + import org.apache.spark.sql.types.VariantType + + if (!dataType.isInstanceOf[VariantType]) { + throw new IllegalArgumentException(s"Expected VariantType but got $dataType") + } + + // Determine if this is a shredded variant + val isShredded = fieldSchema match { + case variant: HoodieSchema.Variant => variant.isShredded + case _ => false + } + + // For shredded variants, the value field is OPTIONAL (nullable) + // For unshredded variants, the value field is REQUIRED + val valueRepetition = if (isShredded) Repetition.OPTIONAL else Repetition.REQUIRED + + // VariantType is always stored in Parquet as a struct with separate value and metadata binary fields. + // This matches how the HoodieRowParquetWriteSupport writes variant data. 
+ // Note: We intentionally omit 'typed_value' for shredded variants as this writer only accesses raw binary blobs. + Types.buildGroup(repetition) + .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, valueRepetition).named("value")) + .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Repetition.REQUIRED).named("metadata")) + .named(fieldName) + } + + override def isVariantShreddingStruct(structType: StructType): Boolean = { + SparkShreddingUtils.isVariantShreddingStruct(structType) + } + + override def generateVariantWriteShreddingSchema(dataType: DataType, isTopLevel: Boolean, isObjectField: Boolean): StructType = { + SparkShreddingUtils.addWriteShreddingMetadata( + SparkShreddingUtils.variantShreddingSchema(dataType, isTopLevel, isObjectField)) + } + + override def createShreddedVariantWriter( + shreddedStructType: StructType, + writeStruct: Consumer[InternalRow] + ): BiConsumer[SpecializedGetters, Integer] = { + val variantShreddingSchema = SparkShreddingUtils.buildVariantSchema(shreddedStructType) + + (row: SpecializedGetters, ordinal: Integer) => { + val variantVal = row.getVariant(ordinal) + val variant = new Variant(variantVal.getValue, variantVal.getMetadata) + val shreddedValues = SparkShreddingUtils.castShredded(variant, variantShreddingSchema) + writeStruct.accept(shreddedValues) + } + } } diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala index cbf0b2de8bb44..0e0b5be82f591 100644 --- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala +++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.adapter import org.apache.hudi.Spark40HoodieFileScanRDD import org.apache.hudi.common.schema.HoodieSchema -import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql._ diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index 8623d24c6f580..7bd4c54588c1f 100644 --- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.LegacyBehaviorPolicy import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.unsafe.types.{UTF8String, VariantVal} import java.math.BigDecimal import java.nio.ByteBuffer @@ -218,6 +218,37 @@ private[sql] class AvroDeserializer(rootAvroType: Schema, val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale) updater.setDecimal(ordinal, decimal) + case (RECORD, VariantType) if avroType.getProp("logicalType") == "variant" => + // Validation & Pre-calculation with fail fast logic + val valueField = avroType.getField("value") + val metadataField = avroType.getField("metadata") + + if (valueField == null || metadataField == null) { + throw new 
IncompatibleSchemaException(incompatibleMsg + + ": Variant logical type requires 'value' and 'metadata' fields") + } + + val valueIdx = valueField.pos() + val metadataIdx = metadataField.pos() + + // Variant types are stored as records with "value" and "metadata" binary fields + // Deserialize them back to VariantVal + (updater, ordinal, value) => + val record = value.asInstanceOf[IndexedRecord] + + val valueBuffer = record.get(valueIdx).asInstanceOf[ByteBuffer] + val valueBytes = new Array[Byte](valueBuffer.remaining) + valueBuffer.get(valueBytes) + valueBuffer.rewind() + + val metadataBuffer = record.get(metadataIdx).asInstanceOf[ByteBuffer] + val metadataBytes = new Array[Byte](metadataBuffer.remaining) + metadataBuffer.get(metadataBytes) + metadataBuffer.rewind() + + val variant = new VariantVal(valueBytes, metadataBytes) + updater.set(ordinal, variant) + case (RECORD, st: StructType) => // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328. // We can always return `false` from `applyFilters` for nested records. diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala index 968cc8ff02c67..c6e131fd26ac1 100644 --- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala +++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala @@ -22,6 +22,7 @@ import org.apache.avro.Conversions.DecimalConversion import org.apache.avro.LogicalTypes.{LocalTimestampMicros, LocalTimestampMillis, TimestampMicros, TimestampMillis} import org.apache.avro.Schema.Type import org.apache.avro.Schema.Type._ +import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed, Record} import org.apache.avro.util.Utf8 import org.apache.spark.internal.Logging @@ -221,6 +222,32 @@ private[sql] class AvroSerializer(rootCatalystType: DataType, java.util.Arrays.asList(result: _*) } + case (VariantType, RECORD) if avroType.getProp("logicalType") == "variant" => + // Fail fast if schema is mismatched + val valueField = avroType.getField("value") + val metadataField = avroType.getField("metadata") + + if (valueField == null || metadataField == null) { + throw new IncompatibleSchemaException(errorPrefix + + s"Avro schema with 'variant' logical type must have 'value' and 'metadata' fields. 
" + + s"Found: ${avroType.getFields.asScala.map(_.name()).mkString(", ")}") + } + + // Pre-calculation: Cache indices for performance + val valueIdx = valueField.pos() + val metadataIdx = metadataField.pos() + + // Variant types are stored as records with "value" and "metadata" binary fields + // This matches the schema created in SchemaConverters.toAvroType + (getter, ordinal) => + val variant = getter.getVariant(ordinal) + val record = new GenericData.Record(avroType) + + // Use positional access in serialization loop + record.put(valueIdx, ByteBuffer.wrap(variant.getValue)) + record.put(metadataIdx, ByteBuffer.wrap(variant.getMetadata)) + record + case (st: StructType, RECORD) => val structConverter = newStructConverter(st, avroType, catalystPath, avroPath) val numFields = st.length diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowParquetWriteSupportVariant.java b/hudi-spark-datasource/hudi-spark4.0.x/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowParquetWriteSupportVariant.java new file mode 100644 index 0000000000000..323d0f0c7e5d7 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark4.0.x/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowParquetWriteSupportVariant.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io.storage.row; + +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.VariantVal; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for Variant type support in {@link HoodieRowParquetWriteSupport}. + * Verifies that Spark VariantType data is correctly written to Parquet groups with 'metadata' and 'value' binary fields, + * respecting shredded vs unshredded schemas. + */ +public class TestHoodieRowParquetWriteSupportVariant { + + @TempDir + public File tempDir; + + /** + * Tests that an Unshredded Variant (defined by HoodieSchema) is written as: + *
+   *   group {
+   *     required binary metadata;
+   *     required binary value;
+   *   }
+   * 
+ */ + @Test + public void testWriteUnshreddedVariant() throws IOException { + + // Setup Hoodie Schema: Unshredded + HoodieSchema.Variant variantSchema = HoodieSchema.createVariant("v", null, "Unshredded variant"); + HoodieSchema recordSchema = HoodieSchema.createRecord("record", null, null, + Collections.singletonList(HoodieSchemaField.of("v", variantSchema, null, null))); + + // Spark Schema: Uses actual VariantType to trigger the correct path in WriteSupport + StructType sparkSchema = new StructType(new StructField[] { + new StructField("v", DataTypes.VariantType, false, Metadata.empty()) + }); + + // Setup Data + // Note: This relies on the test environment having a constructible VariantVal + VariantVal variantVal = getSampleVariantVal(); + InternalRow row = new GenericInternalRow(new Object[] {variantVal}); + + // Write to Parquet + File outputFile = new File(tempDir, "unshredded.parquet"); + writeRows(outputFile, sparkSchema, recordSchema, row); + + // Verify Parquet Structure + MessageType parquetSchema = readParquetSchema(outputFile); + GroupType vGroup = parquetSchema.getType("v").asGroupType(); + + assertEquals(2, vGroup.getFieldCount(), "Unshredded variant should have exactly 2 fields"); + + // Metadata (Required) + Type metaField = vGroup.getType("metadata"); + assertEquals(BINARY, metaField.asPrimitiveType().getPrimitiveTypeName()); + assertEquals(REQUIRED, metaField.getRepetition()); + + // Value (Required for Unshredded) + Type valueField = vGroup.getType("value"); + assertEquals(BINARY, valueField.asPrimitiveType().getPrimitiveTypeName()); + assertEquals(REQUIRED, valueField.getRepetition(), "Unshredded variant value must be REQUIRED"); + } + + /** + * Tests that a Shredded Variant (defined by HoodieSchema) is written as: + *
+   *   group {
+   *     required binary metadata;
+   *     optional binary value;
+   *   }
+   *
+   * The schema used in this test is as follows:
+   *   root
+   *    |-- v: struct (nullable = true)
+   *    |    |-- metadata: binary (nullable = true)
+   *    |    |-- value: binary (nullable = true)
+   *    |    |-- typed_value: struct (nullable = true)
+   *    |    |    |-- a: struct (nullable = true)
+   *    |    |    |    |-- value: binary (nullable = true)
+   *    |    |    |    |-- typed_value: integer (nullable = true)
+   *    |    |    |-- b: struct (nullable = true)
+   *    |    |    |    |-- value: binary (nullable = true)
+   *    |    |    |    |-- typed_value: string (nullable = true)
+   *    |    |    |-- c: struct (nullable = true)
+   *    |    |    |    |-- value: binary (nullable = true)
+   *    |    |    |    |-- typed_value: decimal(15,1) (nullable = true)
+   * 
+ * Note: Even though typed_value is not populated, the schema must indicate 'value' is OPTIONAL. + */ + @Test + public void testWriteShreddedVariant() throws IOException { + HoodieSchema.Variant variantSchema = HoodieSchema.createVariantShreddedObject("v", null, "Shredded variant", + // These does not need to be nullable, #createVariantShreddedObject will handle the nullable coercion + Map.of("a", HoodieSchema.create(HoodieSchemaType.INT), + "b", HoodieSchema.create(HoodieSchemaType.STRING), + "c", HoodieSchema.createDecimal(15, 1))); + HoodieSchema recordSchema = HoodieSchema.createRecord("record", null, null, + Collections.singletonList(HoodieSchemaField.of("v", variantSchema, null, null))); + + // Spark Schema: Uses actual VariantType + StructType sparkSchema = new StructType(new StructField[] { + new StructField("v", DataTypes.VariantType, false, Metadata.empty()) + }); + + // Setup Data + VariantVal variantVal = getSampleVariantVal(); + InternalRow row = new GenericInternalRow(new Object[] {variantVal}); + + // Write to Parquet + File outputFile = new File(tempDir, "shredded.parquet"); + writeRows(outputFile, sparkSchema, recordSchema, row); + + // Verify Parquet Structure + MessageType parquetSchema = readParquetSchema(outputFile); + GroupType vGroup = parquetSchema.getType("v").asGroupType(); + + // Metadata (Required) + Type metaField = vGroup.getType("metadata"); + assertEquals(BINARY, metaField.asPrimitiveType().getPrimitiveTypeName()); + assertEquals(REQUIRED, metaField.getRepetition()); + + // Value (OPTIONAL for Shredded) + Type valueField = vGroup.getType("value"); + assertEquals(BINARY, valueField.asPrimitiveType().getPrimitiveTypeName()); + assertEquals(OPTIONAL, valueField.getRepetition(), "Shredded variant value must be OPTIONAL"); + + boolean hasTypedValue = vGroup.getFields().stream().anyMatch(f -> f.getName().equals("typed_value")); + assertTrue(hasTypedValue, "typed_value field should be omitted in this writer implementation"); + + // Check parquet metadata in the footer that shredded schemas are created + ParquetMetadata fileFooter = ParquetFileReader.readFooter(new Configuration(), new Path(outputFile.toURI()), ParquetMetadataConverter.NO_FILTER); + MessageType footerSchema = fileFooter.getFileMetaData().getSchema(); + assertEquals(INT32, footerSchema.getType("v", "typed_value", "a", "typed_value").asPrimitiveType().getPrimitiveTypeName()); + assertEquals(LogicalTypeAnnotation.stringType(), footerSchema.getType("v", "typed_value", "b", "typed_value").getLogicalTypeAnnotation()); + assertEquals(LogicalTypeAnnotation.decimalType(1, 15), footerSchema.getType("v", "typed_value", "c", "typed_value").getLogicalTypeAnnotation()); + } + + /** + * Tests that a root-level scalar shredded Variant is written as: + *
+   *   group {
+   *     required binary metadata;
+   *     optional binary value;
+   *     optional int64 typed_value;
+   *   }
+   * 
+ * This verifies that scalar typed_value (e.g. LONG) is correctly shredded at the root level, + * as opposed to object shredding where typed_value is a nested struct. + */ + @Test + public void testWriteShreddedVariantRootLevelScalar() throws IOException { + HoodieSchema.Variant variantSchema = HoodieSchema.createVariantShredded("v", null, "Scalar shredded variant", + HoodieSchema.create(HoodieSchemaType.LONG)); + HoodieSchema recordSchema = HoodieSchema.createRecord("record", null, null, + Collections.singletonList(HoodieSchemaField.of("v", variantSchema, null, null))); + + // Spark Schema: Uses actual VariantType + StructType sparkSchema = new StructType(new StructField[] { + new StructField("v", DataTypes.VariantType, false, Metadata.empty()) + }); + + // Setup Data: integer variant value 100 + VariantVal variantVal = getSampleVariantValWithRootLevelScalar(); + InternalRow row = new GenericInternalRow(new Object[] {variantVal}); + + // Write to Parquet + File outputFile = new File(tempDir, "shredded_scalar.parquet"); + writeRows(outputFile, sparkSchema, recordSchema, row); + + // Verify Parquet Structure + ParquetMetadata fileFooter = ParquetFileReader.readFooter(new Configuration(), new Path(outputFile.toURI()), ParquetMetadataConverter.NO_FILTER); + MessageType footerSchema = fileFooter.getFileMetaData().getSchema(); + GroupType vGroup = footerSchema.getType("v").asGroupType(); + + // Should have 3 fields: metadata, value, typed_value + assertEquals(3, vGroup.getFieldCount(), "Scalar shredded variant should have 3 fields"); + + // Metadata (Required) + Type metaField = vGroup.getType("metadata"); + assertEquals(BINARY, metaField.asPrimitiveType().getPrimitiveTypeName()); + assertEquals(REQUIRED, metaField.getRepetition()); + + // Value (OPTIONAL for Shredded) + Type valueField = vGroup.getType("value"); + assertEquals(BINARY, valueField.asPrimitiveType().getPrimitiveTypeName()); + assertEquals(OPTIONAL, valueField.getRepetition(), "Shredded variant value must be OPTIONAL"); + + // typed_value (OPTIONAL, INT64 for LONG) + Type typedValueField = vGroup.getType("typed_value"); + assertEquals(INT64, typedValueField.asPrimitiveType().getPrimitiveTypeName()); + assertEquals(OPTIONAL, typedValueField.getRepetition(), "Scalar typed_value must be OPTIONAL"); + } + + private void writeRows(File outputFile, StructType sparkSchema, HoodieSchema recordSchema, InternalRow row) throws IOException { + Configuration conf = new Configuration(); + TypedProperties props = new TypedProperties(); + props.setProperty(HoodieWriteConfig.AVRO_SCHEMA_STRING.key(), recordSchema.toString()); + + // Create config and write support + HoodieConfig hoodieConfig = new HoodieConfig(props); + HoodieRowParquetWriteSupport writeSupport = new HoodieRowParquetWriteSupport( + conf, sparkSchema, Option.empty(), hoodieConfig); + + try (ParquetWriter writer = new ParquetWriter<>(new Path(outputFile.getAbsolutePath()), writeSupport)) { + writer.write(row); + } + } + + private MessageType readParquetSchema(File file) throws IOException { + Configuration conf = new Configuration(); + ParquetMetadata metadata = ParquetFileReader.readFooter(conf, new Path(file.getAbsolutePath())); + return metadata.getFileMetaData().getSchema(); + } + + /** + * Helper method to create a VariantVal with sample data derived from Spark 4.0 investigation. + * Represents JSON: + *
+   * {"a":1,"b":"2","c":3.3,"d":4.4}
+   *
+ * The values are obtained by following these steps: + * how to obtain raw bytes + */ + private static VariantVal getSampleVariantVal() { + byte[] metadata = new byte[] {0x01, 0x04, 0x00, 0x01, 0x02, 0x03, 0x04, 0x61, 0x62, 0x63, 0x64}; + byte[] value = new byte[] {0x02, 0x04, 0x00, 0x01, 0x02, 0x03, 0x00, 0x02, 0x04, 0x0a, 0x10, 0x0c, + 0x01, 0x05, 0x32, 0x20, 0x01, 0x21, 0x00, 0x00, 0x00, 0x20, 0x01, 0x2c, 0x00, 0x00, 0x00}; + return new VariantVal(value, metadata); + } + + /** + * Helper method to create a VariantVal with sample data derived from Spark 4.0 investigation. + * Represents JSON: + *

+   * 100
+   *
+ * The values are obtained by following these steps: + * how to obtain raw bytes + */ + private static VariantVal getSampleVariantValWithRootLevelScalar() { + byte[] metadata = new byte[] {0x01, 0x00, 0x00}; + byte[] value = new byte[] {0x0c, 0x64}; + return new VariantVal(value, metadata); + } +}
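For reference, a minimal sketch of how hard-coded fixtures like the ones in `getSampleVariantVal()` can be inspected. This is an illustration only, not part of the patch: it assumes a Spark 4.0 `spark-shell` session (so `spark` is in scope) and that collecting a variant column yields `org.apache.spark.unsafe.types.VariantVal`.

```scala
// Sketch: reproduce raw variant bytes on Spark 4.0 (assumes spark-shell, `spark` in scope).
import org.apache.spark.unsafe.types.VariantVal

// Build a single-row DataFrame with a variant column via parse_json (Spark 4.0+).
val row = spark.sql("""select parse_json('{"a":1,"b":"2","c":3.3,"d":4.4}') as v""").head()
val v = row.getAs[VariantVal]("v")

// Raw encoding consumed by the Avro/Parquet writers and readers in this patch.
val valueBytes: Array[Byte] = v.getValue
val metadataBytes: Array[Byte] = v.getMetadata

println(valueBytes.mkString("Array(", ", ", ")"))
println(metadataBytes.mkString("Array(", ", ", ")"))
```

The same approach applies to the scalar case (`parse_json('100')`) used by `getSampleVariantValWithRootLevelScalar()`.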