@@ -324,6 +324,8 @@ public static DataType convertToDataType(HoodieSchema hoodieSchema) {
return convertRecord(hoodieSchema);
case UNION:
return convertUnion(hoodieSchema);
case VARIANT:
return convertVariant(hoodieSchema);
default:
throw new IllegalArgumentException("Unsupported HoodieSchemaType: " + type);
}
@@ -445,6 +447,25 @@ private static DataType convertUnion(HoodieSchema schema) {
return nullable ? rawDataType.nullable() : rawDataType;
}

/**
* Converts a Variant schema to Flink's ROW type.
* Variant is represented as ROW<`value` BYTES, `metadata` BYTES> in Flink.
*
* @param schema HoodieSchema to convert (must be a VARIANT type)
* @return DataType representing the Variant as a ROW with binary fields
*/
private static DataType convertVariant(HoodieSchema schema) {
if (schema.getType() != HoodieSchemaType.VARIANT) {
throw new IllegalStateException("Expected HoodieSchema VARIANT type but got: " + schema.getType());
}

// Variant is stored as a struct with two binary fields: value and metadata
return DataTypes.ROW(
DataTypes.FIELD("value", DataTypes.BYTES().notNull()),
DataTypes.FIELD("metadata", DataTypes.BYTES().notNull())
).notNull();
}
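
For context, the sketch below shows how downstream Flink code could read the raw Variant encoding back out of the converted ROW at runtime. It is illustrative only and not part of this change: the helper class name and the assumption that the caller knows the variant column's position are hypothetical, but the RowData accessors are standard Flink API and the field order matches the ROW built above (value at 0, metadata at 1).

import org.apache.flink.table.data.RowData;

// Hypothetical helper (illustration only): extracts the raw Variant encoding from a
// column that convertVariant mapped to ROW<`value` BYTES, `metadata` BYTES>.
public final class VariantRowAccess {
  private VariantRowAccess() {
  }

  /** Returns the Variant value bytes, i.e. field 0 of the nested ROW at variantPos. */
  public static byte[] variantValue(RowData row, int variantPos) {
    RowData variantRow = row.getRow(variantPos, 2); // 2 fields: value, metadata
    return variantRow.getBinary(0);
  }

  /** Returns the Variant metadata bytes, i.e. field 1 of the nested ROW at variantPos. */
  public static byte[] variantMetadata(RowData row, int variantPos) {
    RowData variantRow = row.getRow(variantPos, 2);
    return variantRow.getBinary(1);
  }
}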

/**
* Returns true if all the types are RECORD type with same number of fields.
*/
@@ -18,6 +18,7 @@

package org.apache.hudi.io.storage.row;

import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
@@ -73,6 +74,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

import scala.Enumeration;
import scala.Function1;
@@ -121,6 +123,12 @@ public class HoodieRowParquetWriteSupport extends WriteSupport<InternalRow> {
private final ValueWriter[] rootFieldWriters;
private final HoodieSchema schema;
private final StructType structType;
/**
* The shredded schema. When Variant columns are configured for shredding, this schema has those VariantType columns replaced with their shredded struct schemas.
* <p>
* For non-shredded cases, this is identical to structType.
*/
private final StructType shreddedSchema;
private RecordConsumer recordConsumer;

public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
@@ -139,21 +147,89 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema, config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
// Generate shredded schema if there are shredded Variant columns
Review comment (Contributor): It would be good to note the behavior when there are no shredded columns, like "falls back to provided schema if no shredded Variant columns are present"

this.shreddedSchema = generateShreddedSchema(structType, schema);
ParquetWriteSupport.setSchema(structType, hadoopConf);
this.rootFieldWriters = getFieldWriters(structType, schema);
// Use shreddedSchema for creating writers when shredded Variants are present
this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
this.hadoopConf = hadoopConf;
this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}

/**
* Generates a shredded schema from the given structType and hoodieSchema.
* <p>
* For Variant fields that are configured for shredding (based on HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded struct schema.
*
* @param structType The original Spark StructType
* @param hoodieSchema The HoodieSchema containing shredding information
* @return A StructType with shredded Variant fields replaced by their shredded schemas
*/
private StructType generateShreddedSchema(StructType structType, HoodieSchema hoodieSchema) {
StructField[] fields = structType.fields();
StructField[] shreddedFields = new StructField[fields.length];
boolean hasShredding = false;

for (int i = 0; i < fields.length; i++) {
Review comment (Contributor): This loop only covers top-level fields. Should we recursively inspect the struct fields? If it is possible to have nested Variant fields, let's make sure we have a test for it as well.

StructField field = fields[i];
DataType dataType = field.dataType();

// Check if this is a Variant field that should be shredded
if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
Review comment (Contributor): Could we rely directly on the provided HoodieSchema here?

HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema)
Review comment (Contributor): Is it ever possible for the provided schema to be null? Is there a possibility of it being out of sync with the struct?

.flatMap(s -> s.getField(field.name()))
.map(f -> f.schema())
.orElse(null);

if (fieldHoodieSchema.getType() == HoodieSchemaType.VARIANT) {
HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) fieldHoodieSchema;
if (variantSchema.isShredded() && variantSchema.getTypedValueField().isPresent()) {
Review comment (Contributor): Do we expect a case where isShredded is true but the value field is not present?

Reply (Member, author): Nope. The value field must always be present, from my understanding. By design, data can either be shredded or not, since shredding is a feature on top of variants. The default state of all variant fields is therefore unshredded, so the value field has to be there for any data that cannot be shredded.

Review comment (Contributor): Can we simplify this condition then?

// Use plain types for SparkShreddingUtils (unwraps nested {value, typed_value} structs if present)
HoodieSchema typedValueSchema = variantSchema.getPlainTypedValueSchema().get();
DataType typedValueDataType = HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(typedValueSchema);

// Generate the shredding schema with write metadata using SparkAdapter
StructType markedShreddedStruct = SparkAdapterSupport$.MODULE$.sparkAdapter()
.generateVariantWriteShreddingSchema(typedValueDataType, true, false);

shreddedFields[i] = new StructField(field.name(), markedShreddedStruct, field.nullable(), field.metadata());
hasShredding = true;
continue;
}
}
}

// Not a shredded Variant, keep the original field
shreddedFields[i] = field;
}

return hasShredding ? new StructType(shreddedFields) : structType;
}

/**
* Creates field writers for each field in the schema.
*
* @param schema The schema to create writers for (may contain shredded Variant struct types)
* @param hoodieSchema The HoodieSchema for type information
* @return Array of ValueWriters for each field
*/
private ValueWriter[] getFieldWriters(StructType schema, HoodieSchema hoodieSchema) {
return Arrays.stream(schema.fields()).map(field -> {
StructField[] fields = schema.fields();
ValueWriter[] writers = new ValueWriter[fields.length];

for (int i = 0; i < fields.length; i++) {
StructField field = fields[i];

HoodieSchema fieldSchema = Option.ofNullable(hoodieSchema)
.flatMap(s -> s.getField(field.name()))
// Note: Cannot use HoodieSchemaField::schema method reference due to Java 17 compilation ambiguity
.map(f -> f.schema())
.orElse(null);
return makeWriter(fieldSchema, field.dataType());
}).toArray(ValueWriter[]::new);

writers[i] = makeWriter(fieldSchema, field.dataType());
}

return writers;
}

@Override
@@ -166,7 +242,9 @@ public WriteContext init(Configuration configuration) {
}
Configuration configurationCopy = new Configuration(configuration);
configurationCopy.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, Boolean.toString(writeLegacyListFormat));
MessageType messageType = convert(structType, schema);
// Use shreddedSchema for Parquet schema conversion when shredding is enabled
// This ensures the Parquet file structure includes the shredded typed_value columns
MessageType messageType = convert(shreddedSchema, schema);
return new WriteContext(messageType, metadata);
}

@@ -281,6 +359,18 @@ private ValueWriter makeWriter(HoodieSchema schema, DataType dataType) {
} else if (dataType == DataTypes.BinaryType) {
return (row, ordinal) -> recordConsumer.addBinary(
Binary.fromReusedByteArray(row.getBinary(ordinal)));
} else if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
// Maps VariantType to a group containing 'metadata' and 'value' fields.
// This ensures Spark 4.0 compatibility and supports both Shredded and Unshredded schemas.
// Note: We intentionally omit 'typed_value' for shredded variants as this writer only accesses raw binary blobs.
BiConsumer<SpecializedGetters, Integer> variantWriter = SparkAdapterSupport$.MODULE$.sparkAdapter().createVariantValueWriter(
dataType,
valueBytes -> consumeField("value", 0, () -> recordConsumer.addBinary(Binary.fromReusedByteArray(valueBytes))),
metadataBytes -> consumeField("metadata", 1, () -> recordConsumer.addBinary(Binary.fromReusedByteArray(metadataBytes)))
);
return (row, ordinal) -> {
consumeGroup(() -> variantWriter.accept(row, ordinal));
};
} else if (dataType instanceof DecimalType) {
return (row, ordinal) -> {
int precision = ((DecimalType) dataType).precision();
@@ -337,6 +427,9 @@ private ValueWriter makeWriter(HoodieSchema schema, DataType dataType) {
}
});
};
} else if (dataType instanceof StructType
&& SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantShreddingStruct((StructType) dataType)) {
return makeShreddedVariantWriter((StructType) dataType);
} else if (dataType instanceof StructType) {
StructType structType = (StructType) dataType;
ValueWriter[] fieldWriters = getFieldWriters(structType, resolvedSchema);
@@ -349,6 +442,33 @@ private ValueWriter makeWriter(HoodieSchema schema, DataType dataType) {
}
}

/**
* Creates a ValueWriter for a shredded Variant column.
* This writer converts a Variant value into its shredded components (metadata, value, typed_value) and writes them to Parquet.
*
* @param shreddedStructType The shredded StructType (with shredding metadata)
* @return A ValueWriter that handles shredded Variant writing
*/
private ValueWriter makeShreddedVariantWriter(StructType shreddedStructType) {
// Create writers for the shredded struct fields
// The shreddedStructType contains: metadata (binary), value (binary), typed_value (optional)
ValueWriter[] shreddedFieldWriters = Arrays.stream(shreddedStructType.fields())
.map(field -> makeWriter(null, field.dataType()))
.toArray(ValueWriter[]::new);

// Use the SparkAdapter to create a shredded variant writer that converts Variant to shredded components
BiConsumer<SpecializedGetters, Integer> shreddedWriter = SparkAdapterSupport$.MODULE$.sparkAdapter()
.createShreddedVariantWriter(
shreddedStructType,
shreddedRow -> {
// Write the shredded row as a group
consumeGroup(() -> writeFields(shreddedRow, shreddedStructType, shreddedFieldWriters));
}
);

return shreddedWriter::accept;
}
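
For readers less familiar with Parquet's low-level write API, the sketch below unwinds the consumeGroup/consumeField helpers into raw org.apache.parquet.io.api.RecordConsumer calls for the simpler unshredded case: a group with binary value and metadata fields, as produced by the variant branch in makeWriter above. It assumes consumeGroup/consumeField wrap startGroup/endGroup and startField/endField (as in Spark's ParquetWriteSupport); the helper class itself is illustrative only.

import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;

// Illustration only: the call sequence one unshredded Variant value turns into.
public final class VariantGroupWriteSketch {
  private VariantGroupWriteSketch() {
  }

  public static void writeUnshreddedVariant(RecordConsumer recordConsumer, byte[] valueBytes, byte[] metadataBytes) {
    recordConsumer.startGroup();

    recordConsumer.startField("value", 0);
    recordConsumer.addBinary(Binary.fromReusedByteArray(valueBytes));
    recordConsumer.endField("value", 0);

    recordConsumer.startField("metadata", 1);
    recordConsumer.addBinary(Binary.fromReusedByteArray(metadataBytes));
    recordConsumer.endField("metadata", 1);

    recordConsumer.endGroup();
  }
}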

private ValueWriter twoLevelArrayWriter(String repeatedFieldName, ValueWriter elementWriter) {
return (row, ordinal) -> {
ArrayData array = row.getArray(ordinal);
@@ -510,6 +630,13 @@ private Type convertField(HoodieSchema fieldSchema, StructField structField, Typ
.as(LogicalTypeAnnotation.stringType()).named(structField.name());
} else if (dataType == DataTypes.BinaryType) {
return Types.primitive(BINARY, repetition).named(structField.name());
} else if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
return SparkAdapterSupport$.MODULE$.sparkAdapter().convertVariantFieldToParquetType(
dataType,
structField.name(),
resolvedSchema,
repetition
);
} else if (dataType instanceof DecimalType) {
int precision = ((DecimalType) dataType).precision();
int scale = ((DecimalType) dataType).scale();
@@ -21,7 +21,6 @@ package org.apache.hudi

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap, filterIsSafeForPrimaryKey, getAppliedRequiredSchema}
import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
@@ -18,11 +18,11 @@

package org.apache.spark.sql

import org.apache.hudi.{HoodieUnsafeRDD}
import org.apache.hudi.HoodieUnsafeRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.types.StructType

/**

This file was deleted.

@@ -18,28 +18,31 @@

package org.apache.spark.sql.avro

import org.apache.hudi.common.schema.HoodieSchema.TimePrecision
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.schema.{HoodieJsonProperties, HoodieSchema, HoodieSchemaField, HoodieSchemaType}
import org.apache.hudi.common.schema.HoodieSchema.TimePrecision

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.types.Decimal.minBytesForPrecision
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.Decimal.minBytesForPrecision

import scala.collection.JavaConverters._

/**
* This object contains methods that are used to convert HoodieSchema to Spark SQL schemas and vice versa.
* Object containing methods to convert HoodieSchema to Spark SQL schemas and vice versa.
*
* This provides direct conversion between HoodieSchema and Spark DataType
* without going through Avro Schema intermediary.
*
* Version-specific types (like VariantType in Spark 4.x+) are handled via SparkAdapterSupport.
*
* NOTE: the package of this class is intentionally kept as "org.apache.spark.sql.avro" which is similar to the existing
* Spark Avro connector's SchemaConverters.scala
* (https://github.com/apache/spark/blob/master/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala).
* The reason for this is so that Spark 3.3 is able to access private spark sql type classes like TimestampNTZType.
*/

@DeveloperApi
object HoodieSparkSchemaConverters {
object HoodieSparkSchemaConverters extends SparkAdapterSupport {

/**
* Internal wrapper for SQL data type and nullability.
@@ -118,7 +121,12 @@ object HoodieSparkSchemaConverters {
HoodieSchema.createRecord(recordName, nameSpace, null, fields.asJava)
}

case other => throw new IncompatibleSchemaException(s"Unexpected Spark DataType: $other")
// VARIANT type (Spark 4.x+ only), handled via the SparkAdapter
case other if sparkAdapter.isVariantType(other) =>
HoodieSchema.createVariant(recordName, nameSpace, null)

case other =>
throw new IncompatibleSchemaException(s"Unexpected Spark DataType: $other")
}

// Wrap with null union if nullable (and not already a union)
@@ -129,6 +137,9 @@
}
}

/**
* Helper method to convert HoodieSchema to Catalyst DataType.
*/
private def toSqlTypeHelper(hoodieSchema: HoodieSchema, existingRecordNames: Set[String]): SchemaType = {
hoodieSchema.getType match {
// Primitive types
@@ -241,7 +252,16 @@ object HoodieSparkSchemaConverters {
}
}

case other => throw new IncompatibleSchemaException(s"Unsupported HoodieSchemaType: $other")
// VARIANT type (Spark 4.x+ only), handled via the SparkAdapter
// TODO: Check if internalSchema will throw any errors here: #18021
case HoodieSchemaType.VARIANT =>
sparkAdapter.getVariantDataType match {
case Some(variantType) => SchemaType(variantType, nullable = false)
case None => throw new IncompatibleSchemaException("VARIANT type is only supported in Spark 4.0+")
}

case other =>
throw new IncompatibleSchemaException(s"Unsupported HoodieSchemaType: $other")
}
}
