Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@ object VeloxBackendSettings extends BackendSettingsApi {
return None
}
val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit
// Variant annotation check is always performed (not gated by
// parquetMetadataValidationEnabled) because it is a correctness issue.
val variantAnnotationResult =
ParquetMetadataUtils.validateVariantAnnotation(rootPaths, hadoopConf, fileLimit)
if (variantAnnotationResult.isDefined) {
return variantAnnotationResult.map(
reason => s"Detected unsupported metadata in parquet files: $reason")
}
val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
val parquetMetadataValidationResult =
ParquetMetadataUtils.validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,13 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
ExpressionNames.TO_JSON,
ToJsonRestrictions.NOT_SUPPORT_WITH_OPTIONS)
}
expr.child.dataType match {
case _: StructType | _: MapType | _: ArrayType =>
case _ =>
GlutenExceptionUtil.throwsNotFullySupported(
ExpressionNames.TO_JSON,
ToJsonRestrictions.NOT_SUPPORT_UNSUPPORTED_CHILD_TYPE)
}
if (
!SQLConf.get.caseSensitiveAnalysis &&
ExpressionUtils.hasUppercaseStructFieldName(child.dataType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ object VeloxValidatorApi {
case map: MapType =>
validateSchema(map.keyType).orElse(validateSchema(map.valueType))
case struct: StructType =>
// Detect variant shredded struct produced by Spark's PushVariantIntoScan.
// These structs have all fields annotated with __VARIANT_METADATA_KEY metadata.
// Velox cannot read the variant shredding encoding in Parquet files.
if (
struct.fields.nonEmpty &&
struct.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY"))
) {
return Some(s"Variant shredded struct is not supported: $struct")
}
struct.foreach {
field =>
val reason = validateSchema(field.dataType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,16 @@ object ToJsonRestrictions extends ExpressionRestrictions {
s"When 'spark.sql.caseSensitive = false', ${ExpressionNames.TO_JSON} produces unexpected" +
s" result for struct field with uppercase name"

// Restriction message: Velox's to_json only handles complex-typed children
// (struct/map/array); any other child type triggers fallback to vanilla Spark.
val NOT_SUPPORT_UNSUPPORTED_CHILD_TYPE: String =
s"${ExpressionNames.TO_JSON} only supports StructType, MapType and ArrayType child in Velox"

override val functionName: String = ExpressionNames.TO_JSON

override val restrictionMessages: Array[String] =
Array(NOT_SUPPORT_WITH_OPTIONS, NOT_SUPPORT_UPPERCASE_STRUCT)
Array(
NOT_SUPPORT_WITH_OPTIONS,
NOT_SUPPORT_UPPERCASE_STRUCT,
NOT_SUPPORT_UNSUPPORTED_CHILD_TYPE)
}

object Unbase64Restrictions extends ExpressionRestrictions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,48 @@ object ParquetMetadataUtils extends Logging {
None
}

/**
 * Checks whether any Parquet file under `rootPaths` carries a variant logical type annotation
 * that requires falling back to vanilla Spark.
 *
 * This check is always performed (it is NOT gated by `parquetMetadataValidationEnabled`)
 * because it is a correctness issue: the Velox native reader does not check variant
 * annotations.
 *
 * The check is best effort: at most `fileLimit` files are inspected per root path, and an
 * unreadable footer or inaccessible path is logged and skipped rather than failing
 * validation.
 *
 * @param rootPaths  root directories (or files) of the scan
 * @param hadoopConf Hadoop configuration used to resolve the file system and read footers
 * @param fileLimit  maximum number of files to inspect per root path
 * @return Some(reason) when a variant annotation is detected, None otherwise
 */
def validateVariantAnnotation(
    rootPaths: Seq[String],
    hadoopConf: Configuration,
    fileLimit: Int
): Option[String] = {
  // Iterate with while loops rather than foreach lambdas so that `return` below is a plain
  // method return instead of a NonLocalReturnControl throw, which the catch clauses could
  // otherwise interact with in surprising ways.
  val pathIterator = rootPaths.iterator
  while (pathIterator.hasNext) {
    val rootPath = pathIterator.next()
    try {
      // Resolving the file system is inside the try so an inaccessible root path is treated
      // as best-effort (logged and skipped) like every other failure in this method.
      val fs = new Path(rootPath).getFileSystem(hadoopConf)
      val filesIterator = fs.listFiles(new Path(rootPath), true)
      var checkedFileCount = 0
      while (filesIterator.hasNext && checkedFileCount < fileLimit) {
        val fileStatus = filesIterator.next()
        checkedFileCount += 1
        try {
          val footer = ParquetFooterReaderShim.readFooter(
            hadoopConf,
            fileStatus,
            ParquetMetadataConverter.NO_FILTER)
          if (
            SparkShimLoader.getSparkShims
              .shouldFallbackForParquetVariantAnnotation(footer)
          ) {
            return Some("Variant annotation detected in Parquet file.")
          }
        } catch {
          // NonFatal: an unreadable footer must not fail validation, but fatal errors
          // (OOM, interrupts, control throwables) are allowed to propagate.
          case scala.util.control.NonFatal(e) =>
            logDebug(s"Failed to read Parquet footer of ${fileStatus.getPath}", e)
        }
      }
    } catch {
      case scala.util.control.NonFatal(e) =>
        logWarning(s"Exception while checking variant annotation under $rootPath", e)
    }
  }
  None
}

private def isTimezoneFoundInMetadata(
footer: ParquetMetadata,
parquetOptions: ParquetOptions): Option[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -853,8 +853,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenUDTRegistrationSuite]
enableSuite[GlutenUnsafeRowSuite]
enableSuite[GlutenUserDefinedTypeSuite]
// TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures
// TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures
enableSuite[GlutenVariantEndToEndSuite]
enableSuite[GlutenVariantShreddingSuite]
enableSuite[GlutenVariantSuite]
enableSuite[GlutenVariantWriteShreddingSuite]
// TODO: 4.x enableSuite[GlutenXmlFunctionsSuite] // 10 failures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("parquet widening conversion ShortType -> DecimalType(20,0)")
.exclude("parquet widening conversion ShortType -> DecimalType(38,0)")
.exclude("parquet widening conversion ShortType -> DoubleType")
// TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure
enableSuite[GlutenParquetVariantShreddingSuite]
// Generated suites for org.apache.spark.sql.execution.datasources.text
// TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure
// TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure
Expand Down Expand Up @@ -818,8 +818,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenUDTRegistrationSuite]
enableSuite[GlutenUnsafeRowSuite]
enableSuite[GlutenUserDefinedTypeSuite]
// TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures
// TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures
enableSuite[GlutenVariantEndToEndSuite]
enableSuite[GlutenVariantShreddingSuite]
enableSuite[GlutenVariantSuite]
enableSuite[GlutenVariantWriteShreddingSuite]
// TODO: 4.x enableSuite[GlutenXmlFunctionsSuite] // 10 failures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ trait SparkShims {

def isParquetFileEncrypted(footer: ParquetMetadata): Boolean

def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = false

// Default: no extra constant metadata column values. Shims for Spark versions that expose
// them on PartitionedFile override this (returned as a Java map for interop).
def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import org.apache.spark.sql.types._
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.hadoop.metadata.{CompressionCodecName, ParquetMetadata}
import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType}

import java.time.ZoneOffset
import java.util.{Map => JMap}
Expand Down Expand Up @@ -571,6 +571,22 @@ class Spark41Shims extends SparkShims {
}
}

/**
 * Falls back when the footer's schema contains a variant logical type annotation, unless the
 * user opted out via `SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION`.
 */
override def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean =
  !SQLConf.get.getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION) &&
    containsVariantAnnotation(footer.getFileMetaData.getSchema)

// Recursively scans a Parquet group type for any field annotated with the variant logical
// type. Descends into nested group fields; primitive fields are checked for the annotation
// only (getLogicalTypeAnnotation may be null, which the wildcard case handles).
private def containsVariantAnnotation(groupType: GroupType): Boolean =
  groupType.getFields.asScala.exists {
    field =>
      field.getLogicalTypeAnnotation match {
        case _: LogicalTypeAnnotation.VariantLogicalTypeAnnotation => true
        case _ => !field.isPrimitive && containsVariantAnnotation(field.asGroupType())
      }
  }

// Spark 4.1 exposes constant metadata column values directly on PartitionedFile; convert the
// Scala map to a Java map for the interface's JMap[String, Object] contract.
override def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String, Object]]

Expand Down
Loading