This project implements a flat User-Defined Type (UDT) framework for Apache Spark. It simplifies working with complex types in Spark SQL by flattening them into structured formats, enabling efficient serialization and deserialization into Catalyst's internal representation.
To include this library in your project, clone this repository and install it locally.
For Maven users, add the following dependency to your pom.xml:
```xml
<dependency>
  <groupId>github.avinoamn</groupId>
  <artifactId>spark-flat-udt_${spark.major.version}_${scala.major.version}</artifactId>
  <version>1.0.0</version>
  <scope>compile</scope>
</dependency>
```

- Flattened Representation: Converts nested types into flat Spark SQL schemas for easier manipulation.
- Custom Serialization: Full control over serialization and deserialization of complex Scala/Java types.
- Ease of Use: Designed to simplify UDT definitions for intricate types, such as nested `Either` constructs.
- `FlatUDTType`: Represents a single field in a `FlatUDT`.
- `FlatUDTTypeFactory`: Factory for creating `FlatUDTType` instances.
- `FlatUDT`: Abstract class that aggregates multiple `FlatUDTType` instances, defining schemas and handling serialization and deserialization.
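The relationship between these pieces can be captured in a simplified mental model. The trait names mirror the API described above, but this is a sketch, not the library's actual source:

```scala
// A simplified mental model of the three abstractions; the real library's
// signatures may differ.
trait FlatUDTType[T, V] {
  def ctor: V => T          // wrap a raw value into the user type
  def dtor: T => Option[V]  // try to extract the raw value back out
}

trait FlatUDTTypeFactory[T] {
  def typesCount: Int                        // number of supported variants
  def defaultCtor[V]: Option[V => T]         // shared constructor for all variants
  def defaultDtor[V]: Option[T => Option[V]] // shared destructor for all variants
}

abstract class FlatUDT[T] {
  // One FlatUDTType per variant; schema and (de)serialization derive from it.
  def types: Array[FlatUDTType[T, _]]
}
```

The factory centralizes per-variant construction/destruction logic, so the UDT itself only needs to aggregate the resulting `FlatUDTType` array.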
Let's walk through how to use this library with a complex type:
```scala
type Property = Either[Either[String, Boolean], Either[Long, Double]]
```

We will create a UDT to handle this type in three steps.
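Before involving Spark, note how the nesting encodes four possible payloads. A standalone Scala sketch (no Spark required) shows how values of this type are built and taken apart:

```scala
type Property = Either[Either[String, Boolean], Either[Long, Double]]

// Each leaf of the nested Either carries one of the four supported types.
val asString: Property = Left(Left("hello"))
val asLong: Property   = Right(Left(42L))

// Pattern matching recovers the underlying value and its kind.
def describe(p: Property): String = p match {
  case Left(Left(s))   => s"String: $s"
  case Left(Right(b))  => s"Boolean: $b"
  case Right(Left(l))  => s"Long: $l"
  case Right(Right(d)) => s"Double: $d"
}
```

The factory and UDT below follow exactly this four-way branching.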
Create a factory to generate `FlatUDTType` instances for each supported type in `Property`.
```scala
import github.avinoamn.spark.sql.UDT.flat.{FlatUDTType, FlatUDTTypeFactory}

object PropertyTypeFactory extends FlatUDTTypeFactory[Property] {
  override val typesCount: Int = 4

  override def defaultCtor[V]: Option[V => Property] = Some {
    case v: String => Left(Left(v))
    case v: Boolean => Left(Right(v))
    case v: Long => Right(Left(v))
    case v: Double => Right(Right(v))
  }

  override def defaultDtor[V]: Option[Property => Option[V]] = Some {
    case Left(Left(v: V)) => Some(v)
    case Left(Right(v: V)) => Some(v)
    case Right(Left(v: V)) => Some(v)
    case Right(Right(v: V)) => Some(v)
    case _ => None
  }
}
```

Create a UDT class that uses the factory to define a schema and serialization logic.
```scala
import github.avinoamn.spark.sql.UDT.flat.{FlatUDT, FlatUDTType}
import org.apache.spark.sql.types.UDTRegistration

class PropertyUDT extends FlatUDT[Property] {
  override def types: Array[FlatUDTType[Property, _]] = PropertyUDT.types
}

object PropertyUDT {
  val types: Array[FlatUDTType[Property, _]] = Array(
    PropertyTypeFactory.createType[String](),
    PropertyTypeFactory.createType[Boolean](),
    PropertyTypeFactory.createType[Long](),
    PropertyTypeFactory.createType[Double]()
  )

  def register(): Unit =
    UDTRegistration.register(classOf[Property].getName, classOf[PropertyUDT].getName)
}
```

Now you can register and use the UDT in your Spark application.
```scala
import org.apache.spark.sql.{Dataset, SparkSession}

case class Data(property: Property)

object MainApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PropertyUDT Example")
      .master("local[*]")
      .getOrCreate()

    // Register the UDT before creating Datasets that use it
    PropertyUDT.register()

    import spark.implicits._

    // Sample data covering all four variants
    val data: Seq[Data] = Seq(
      Data(Left(Left("example string"))),
      Data(Left(Right(true))),
      Data(Right(Left(123L))),
      Data(Right(Right(45.67)))
    )

    // Convert data to a Dataset
    val ds: Dataset[Data] = data.toDS()
    ds.show()

    // Serialize data using the UDT
    val df = ds.toDF("property")
    df.printSchema()
    df.show()
  }
}
```

The FlatUDT framework represents the UDT as a `StructType`. For `Property`, the schema looks like this:
```scala
StructType(Array(
  StructField("typeIndex", StringType, true),
  StructField("0", StringType, true),
  StructField("1", BooleanType, true),
  StructField("2", LongType, true),
  StructField("3", DoubleType, true)
))
```
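To make the flat layout concrete, here is a hypothetical illustration of how each variant would populate the row. The exact `typeIndex` encoding and null handling are assumptions, not confirmed by the library; `Option` stands in for nullable columns in this plain-Scala sketch:

```scala
type Property = Either[Either[String, Boolean], Either[Long, Double]]

// Hypothetical flattening: typeIndex is assumed to hold the matched slot's
// position as a string; every other column stays null (None here).
def flatten(p: Property): (String, Option[String], Option[Boolean], Option[Long], Option[Double]) =
  p match {
    case Left(Left(s))   => ("0", Some(s), None, None, None)
    case Left(Right(b))  => ("1", None, Some(b), None, None)
    case Right(Left(l))  => ("2", None, None, Some(l), None)
    case Right(Right(d)) => ("3", None, None, None, Some(d))
  }

// Under this assumption, Right(Left(123L)) fills only the "2" column.
```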
- `typeIndex`: Identifies the variant of the `Either` type.
- Additional Fields: One field per type variant.
- Type Mapping: Maps Scala types to Spark SQL Catalyst types using `ScalaReflection`.
- Serialization: Converts user-defined types into Catalyst's `InternalRow`.
- Deserialization: Reconstructs user-defined types from Catalyst's `InternalRow`.
- Dynamic Schema: Automatically generates a schema for the given types.
The `fromCatalystWrapper` function is a hook provided in the `FlatUDTTypeFactory` class that allows additional transformation or processing of values when deserializing data from Catalyst's internal format.
Why is it needed?
- Spark's Catalyst engine uses its own optimized format for data processing, which may not map directly to Scala types.
- Some complex types (e.g., custom objects or nested data) may require additional conversion logic after being deserialized from the Catalyst format.
How does it work?
- It takes the result of Spark's internal deserialization (`fromCatalyst`) and applies any custom logic defined by the user.
- By default, it returns the value unchanged, but users can override it to provide custom handling.
Example
If the data stored in Catalyst format is a `UTF8String`, you can convert it to a Scala `String` during deserialization using `fromCatalystWrapper`:
```scala
override def fromCatalystWrapper(fromCatalyst: Any): Any = {
  fromCatalyst match {
    case utf8: org.apache.spark.unsafe.types.UTF8String => utf8.toString
    case other => other
  }
}
```

This lets you adapt Catalyst's internal representations to your application's requirements seamlessly.