@@ -324,6 +324,8 @@ public static DataType convertToDataType(HoodieSchema hoodieSchema) {
return convertRecord(hoodieSchema);
case UNION:
return convertUnion(hoodieSchema);
case VARIANT:
return convertVariant(hoodieSchema);
default:
throw new IllegalArgumentException("Unsupported HoodieSchemaType: " + type);
}
@@ -445,6 +447,26 @@ private static DataType convertUnion(HoodieSchema schema) {
return nullable ? rawDataType.nullable() : rawDataType;
}

/**
* Converts a Variant schema to Flink's ROW type.
* Variant is represented as ROW<`value` BYTES, `metadata` BYTES> in Flink.
* TODO: Only unshredded variants are supported for now; support shredded variants in the future.
*
* @param schema HoodieSchema to convert (must be a VARIANT type)
* @return DataType representing the Variant as a ROW with binary fields
*/
private static DataType convertVariant(HoodieSchema schema) {
if (schema.getType() != HoodieSchemaType.VARIANT) {
throw new IllegalStateException("Expected VARIANT schema type but got: " + schema.getType());
}

// Variant is stored as a struct with two binary fields: value and metadata
return DataTypes.ROW(
DataTypes.FIELD("value", DataTypes.BYTES().notNull()),
DataTypes.FIELD("metadata", DataTypes.BYTES().notNull())
).notNull();
}

/**
* Returns true if all the types are RECORD type with same number of fields.
*/
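Below is a minimal sketch of how the two pieces fit together: an unshredded variant schema built with HoodieSchema.createVariant (the factory used elsewhere in this PR) and the Flink ROW type that convertVariant maps it to. The column name and namespace are placeholders.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.hudi.common.schema.HoodieSchema;

public class VariantToFlinkTypeSketch {
  public static void main(String[] args) {
    // Unshredded variant schema; "v_col" and "hoodie.example" are illustrative placeholders.
    HoodieSchema variantSchema = HoodieSchema.createVariant("v_col", "hoodie.example", null);

    // The Flink type that convertVariant maps such a schema to:
    // ROW<`value` BYTES NOT NULL, `metadata` BYTES NOT NULL> NOT NULL
    DataType variantRowType = DataTypes.ROW(
        DataTypes.FIELD("value", DataTypes.BYTES().notNull()),
        DataTypes.FIELD("metadata", DataTypes.BYTES().notNull())
    ).notNull();

    System.out.println(variantRowType);
  }
}
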
@@ -73,6 +73,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

import scala.Enumeration;
import scala.Function1;
@@ -281,6 +282,18 @@ private ValueWriter makeWriter(HoodieSchema schema, DataType dataType) {
} else if (dataType == DataTypes.BinaryType) {
return (row, ordinal) -> recordConsumer.addBinary(
Binary.fromReusedByteArray(row.getBinary(ordinal)));
} else if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
Contributor:
Do we need something similar in HoodieAvroWriteSupport?

Member Author:
Don't think so. I have a test that uses HoodieRecordType.{AVRO, SPARK}, which should trigger both write supports, and there are no test failures.

In Avro, a Variant is already an Avro record created by HoodieSchema.createVariant, with fields value (bytes) and metadata (bytes).

IIUC, Parquet's AvroWriteSupport handles this automatically, since it knows how to convert:

  • Avro record -> Parquet group
  • Avro bytes -> Parquet binary

HoodieAvroWriteSupport just wraps AvroWriteSupport to add bloom filter support and does not override the write logic.

In the Spark Row path, custom handling is needed because Spark's VariantType requires special APIs (createVariantValueWriter) to extract the raw bytes; there is no automatic Spark VariantType -> Parquet conversion from what I can see in our code.
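A rough illustration of that Avro shape, built with Avro's SchemaBuilder; the record name and namespace below are placeholders rather than what HoodieSchema.createVariant actually uses.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class VariantAvroShapeSketch {
  public static void main(String[] args) {
    // A record with two bytes fields, matching the value/metadata layout described above.
    Schema variantAvro = SchemaBuilder.record("v_col").namespace("hoodie.example")
        .fields()
        .requiredBytes("value")     // raw variant value blob
        .requiredBytes("metadata")  // variant metadata blob
        .endRecord();

    // AvroWriteSupport writes this record as a Parquet group of two binaries,
    // which is why no variant-specific changes are needed in HoodieAvroWriteSupport.
    System.out.println(variantAvro.toString(true));
  }
}
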

// Maps VariantType to a group containing 'metadata' and 'value' fields.
// This ensures Spark 4.0 compatibility and supports both Shredded and Unshredded schemas.
// Note: We intentionally omit 'typed_value' for shredded variants as this writer only accesses raw binary blobs.
BiConsumer<SpecializedGetters, Integer> variantWriter = SparkAdapterSupport$.MODULE$.sparkAdapter().createVariantValueWriter(
dataType,
valueBytes -> consumeField("value", 0, () -> recordConsumer.addBinary(Binary.fromConstantByteArray(valueBytes))),
metadataBytes -> consumeField("metadata", 1, () -> recordConsumer.addBinary(Binary.fromConstantByteArray(metadataBytes)))
);
return (row, ordinal) -> {
consumeGroup(() -> variantWriter.accept(row, ordinal));
};
} else if (dataType instanceof DecimalType) {
return (row, ordinal) -> {
int precision = ((DecimalType) dataType).precision();
@@ -510,6 +523,13 @@ private Type convertField(HoodieSchema fieldSchema, StructField structField, Typ
.as(LogicalTypeAnnotation.stringType()).named(structField.name());
} else if (dataType == DataTypes.BinaryType) {
return Types.primitive(BINARY, repetition).named(structField.name());
} else if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
return SparkAdapterSupport$.MODULE$.sparkAdapter().convertVariantFieldToParquetType(
dataType,
structField.name(),
resolvedSchema,
repetition
);
} else if (dataType instanceof DecimalType) {
int precision = ((DecimalType) dataType).precision();
int scale = ((DecimalType) dataType).scale();
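For reference, a sketch of the unshredded Parquet layout that convertVariantFieldToParquetType is expected to produce for the writer above — a group of two binary fields named value and metadata — built with parquet-mr's Types API. The group repetition and the column name v_col are illustrative assumptions.

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

public class VariantParquetLayoutSketch {
  public static void main(String[] args) {
    // Two required binary fields, named to match consumeField("value", 0, ...) and
    // consumeField("metadata", 1, ...) in the Spark write path above.
    GroupType variantGroup = Types.optionalGroup()
        .required(BINARY).named("value")
        .required(BINARY).named("metadata")
        .named("v_col");
    System.out.println(variantGroup);
  }
}
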
@@ -21,7 +21,6 @@ package org.apache.hudi

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap, filterIsSafeForPrimaryKey, getAppliedRequiredSchema}
import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
@@ -18,11 +18,11 @@

package org.apache.spark.sql

import org.apache.hudi.{HoodieUnsafeRDD}
import org.apache.hudi.HoodieUnsafeRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.types.StructType

/**

This file was deleted.

@@ -18,28 +18,31 @@

package org.apache.spark.sql.avro

import org.apache.hudi.common.schema.HoodieSchema.TimePrecision
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.schema.{HoodieJsonProperties, HoodieSchema, HoodieSchemaField, HoodieSchemaType}
import org.apache.hudi.common.schema.HoodieSchema.TimePrecision

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.types.Decimal.minBytesForPrecision
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.Decimal.minBytesForPrecision

import scala.collection.JavaConverters._

/**
* This object contains methods that are used to convert HoodieSchema to Spark SQL schemas and vice versa.
* Object containing methods to convert HoodieSchema to Spark SQL schemas and vice versa.
*
* This provides direct conversion between HoodieSchema and Spark DataType
* without going through Avro Schema intermediary.
*
* Version-specific types (like VariantType in Spark 4.0+) are handled via SparkAdapterSupport.
*
* NOTE: the package of this class is intentionally kept as "org.apache.spark.sql.avro" which is similar to the existing
* Spark Avro connector's SchemaConverters.scala
* (https://github.com/apache/spark/blob/master/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala).
* The reason for this is so that Spark 3.3 is able to access private spark sql type classes like TimestampNTZType.
*/

@DeveloperApi
object HoodieSparkSchemaConverters {
object HoodieSparkSchemaConverters extends SparkAdapterSupport {

/**
* Internal wrapper for SQL data type and nullability.
@@ -118,7 +121,12 @@ object HoodieSparkSchemaConverters {
HoodieSchema.createRecord(recordName, nameSpace, null, fields.asJava)
}

case other => throw new IncompatibleSchemaException(s"Unexpected Spark DataType: $other")
// VARIANT type (Spark 4.0+ only), which is handled via SparkAdapter
case other if sparkAdapter.isVariantType(other) =>
HoodieSchema.createVariant(recordName, nameSpace, null)

case other =>
throw new IncompatibleSchemaException(s"Unexpected Spark DataType: $other")
}

// Wrap with null union if nullable (and not already a union)
@@ -129,6 +137,9 @@
}
}

/**
* Helper method to convert HoodieSchema to Catalyst DataType.
*/
private def toSqlTypeHelper(hoodieSchema: HoodieSchema, existingRecordNames: Set[String]): SchemaType = {
hoodieSchema.getType match {
// Primitive types
@@ -241,7 +252,16 @@
}
}

case other => throw new IncompatibleSchemaException(s"Unsupported HoodieSchemaType: $other")
// VARIANT type (Spark 4.0+ only), which is handled via SparkAdapter
Contributor:
For lower spark versions, do we want to just return the underlying struct?

Member Author:
In the current implementation on Spark 3.5, this is dead code, as the variant column will not have a logicalType of variant; it is a record instead.

The reason it's dead code:

  1. The tableSchema is resolved from schemaSpec.map(s => convertToHoodieSchema(s, tableName)) in HoodieBaseHadoopFsRelationFactory.
  2. This converts a struct, where the variant column is a struct of metadata and value, into HoodieSchema.
  3. Hence, by the time the code flow reaches HoodieSparkSchemaConverters, the variant column's HoodieSchema does not carry the variant logical type needed to fall into this code path; it resolves to the RECORD path instead.

This might change if the table has an internalSchema though, which I think we need to investigate. I'll create an issue for this!

#18021

Will inline the issue too.

// TODO: Check if internalSchema will throw any errors here: #18021
case HoodieSchemaType.VARIANT =>
sparkAdapter.getVariantDataType match {
case Some(variantType) => SchemaType(variantType, nullable = false)
case None => throw new IncompatibleSchemaException("VARIANT type is only supported in Spark 4.0+")
}

case other =>
throw new IncompatibleSchemaException(s"Unsupported HoodieSchemaType: $other")
}
}

@@ -19,11 +19,13 @@

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hudi.HoodieSparkUtils

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.spark.sql.HoodieSchemaUtils
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, Attribute, Cast, CreateNamedStruct, CreateStruct, Expression, GetStructField, LambdaFunction, Literal, MapEntries, MapFromEntries, NamedLambdaVariable, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampNTZType}

object HoodieParquetFileFormatHelper {
@@ -46,7 +48,7 @@
val requiredType = f.dataType
if (fileStructMap.contains(f.name) && !isDataTypeEqual(requiredType, fileStructMap(f.name))) {
val readerType = addMissingFields(requiredType, fileStructMap(f.name))
implicitTypeChangeInfo.put(new Integer(requiredSchema.fieldIndex(f.name)), org.apache.hudi.common.util.collection.Pair.of(requiredType, readerType))
implicitTypeChangeInfo.put(Integer.valueOf(requiredSchema.fieldIndex(f.name)), org.apache.hudi.common.util.collection.Pair.of(requiredType, readerType))
Member Author:
The new Integer(...) constructor has been deprecated since Java 9 (and is now marked for removal), so this uses Integer.valueOf instead. This could be a separate PR.

StructField(f.name, readerType, f.nullable)
} else {
f
@@ -55,35 +57,43 @@
(implicitTypeChangeInfo, StructType(sparkRequestStructFields))
}

def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean = (requiredType, fileType) match {
case (requiredType, fileType) if requiredType == fileType => true
def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean = {
// Check if adapter can handle this comparison (e.g., VariantType in Spark 4.0+)
val adapterResult = HoodieSparkUtils.sparkAdapter.isDataTypeEqualForParquet(requiredType, fileType)
if (adapterResult.isDefined) {
adapterResult.get
} else {
(requiredType, fileType) match {
case (requiredType, fileType) if requiredType == fileType => true

// prevent illegal cast
case (TimestampNTZType, LongType) => true
// prevent illegal cast
case (TimestampNTZType, LongType) => true

case (ArrayType(rt, _), ArrayType(ft, _)) =>
// Do not care about nullability as schema evolution require fields to be nullable
isDataTypeEqual(rt, ft)
case (ArrayType(rt, _), ArrayType(ft, _)) =>
// Do not care about nullability as schema evolution require fields to be nullable
isDataTypeEqual(rt, ft)

case (MapType(requiredKey, requiredValue, _), MapType(fileKey, fileValue, _)) =>
// Likewise, do not care about nullability as schema evolution require fields to be nullable
isDataTypeEqual(requiredKey, fileKey) && isDataTypeEqual(requiredValue, fileValue)
case (MapType(requiredKey, requiredValue, _), MapType(fileKey, fileValue, _)) =>
// Likewise, do not care about nullability as schema evolution require fields to be nullable
isDataTypeEqual(requiredKey, fileKey) && isDataTypeEqual(requiredValue, fileValue)

case (StructType(requiredFields), StructType(fileFields)) =>
// Find fields that are in requiredFields and fileFields as they might not be the same during add column + change column operations
val commonFieldNames = requiredFields.map(_.name) intersect fileFields.map(_.name)
case (StructType(requiredFields), StructType(fileFields)) =>
// Find fields that are in requiredFields and fileFields as they might not be the same during add column + change column operations
val commonFieldNames = requiredFields.map(_.name) intersect fileFields.map(_.name)

// Need to match by name instead of StructField as name will stay the same whilst type may change
val fileFilteredFields = fileFields.filter(f => commonFieldNames.contains(f.name)).sortWith(_.name < _.name)
val requiredFilteredFields = requiredFields.filter(f => commonFieldNames.contains(f.name)).sortWith(_.name < _.name)
// Need to match by name instead of StructField as name will stay the same whilst type may change
val fileFilteredFields = fileFields.filter(f => commonFieldNames.contains(f.name)).sortWith(_.name < _.name)
val requiredFilteredFields = requiredFields.filter(f => commonFieldNames.contains(f.name)).sortWith(_.name < _.name)

// Sorting ensures that the same field names are being compared for type differences
requiredFilteredFields.zip(fileFilteredFields).forall {
case (requiredField, fileFilteredField) =>
isDataTypeEqual(requiredField.dataType, fileFilteredField.dataType)
}
// Sorting ensures that the same field names are being compared for type differences
requiredFilteredFields.zip(fileFilteredFields).forall {
case (requiredField, fileFilteredField) =>
isDataTypeEqual(requiredField.dataType, fileFilteredField.dataType)
}

case _ => false
case _ => false
}
}
}

def addMissingFields(requiredType: DataType, fileType: DataType): DataType = (requiredType, fileType) match {
@@ -195,3 +205,4 @@
}
}
}
