From b860aa7912ee06d248525fbcc05310c178357487 Mon Sep 17 00:00:00 2001
From: Chang chen
Date: Mon, 9 Mar 2026 05:34:41 +0000
Subject: [PATCH] [GLUTEN-11550][VL] Enable GlutenParquetVariantShreddingSuite for Spark 4.1

Fall back to vanilla Spark when Parquet files contain variant logical type
annotations and spark.sql.parquet.ignoreVariantAnnotation is false (default).

Spark 4.1 introduced Parquet variant logical type annotations. When reading
a variant-annotated file with a non-variant schema, Spark's
ParquetSchemaConverter validates the annotation and throws an error. Velox
native reader does not check variant annotations, so the scan must fall back
to Spark for correct behavior.

Changes:
- Add shouldFallbackForParquetVariantAnnotation shim method (SparkShims)
- Implement variant annotation detection in Spark41Shims
- Add validateVariantAnnotation in ParquetMetadataUtils (not gated by
  parquetMetadataValidationEnabled since it is a correctness issue)
- Enable GlutenParquetVariantShreddingSuite in spark41 VeloxTestSettings

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
 .../backendsapi/velox/VeloxBackend.scala      |  8 +++
 .../gluten/utils/ParquetMetadataUtils.scala   | 54 +++++++++++++++++++
 .../utils/velox/VeloxTestSettings.scala       |  2 +-
 .../apache/gluten/sql/shims/SparkShims.scala  |  6 +++
 .../sql/shims/spark41/Spark41Shims.scala      | 22 +++++++-
 5 files changed, 90 insertions(+), 2 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 24d08a57920d..c64cc3d273a2 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -203,6 +203,14 @@ object VeloxBackendSettings extends BackendSettingsApi {
         return None
       }
       val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit
+      // Variant annotation check is always performed (not gated by
+      // parquetMetadataValidationEnabled) because it is a correctness issue.
+      val variantAnnotationResult =
+        ParquetMetadataUtils.validateVariantAnnotation(rootPaths, hadoopConf, fileLimit)
+      if (variantAnnotationResult.isDefined) {
+        return variantAnnotationResult.map(
+          reason => s"Detected unsupported metadata in parquet files: $reason")
+      }
       val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
       val parquetMetadataValidationResult =
         ParquetMetadataUtils.validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index ab76cba4aa5d..dcb1051e664d 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -162,6 +162,54 @@ object ParquetMetadataUtils extends Logging {
     None
   }
 
+  /**
+   * Checks whether Parquet files contain variant logical type annotations that require fallback to
+   * vanilla Spark. This check is always performed (not gated by parquetMetadataValidationEnabled)
+   * because it is a correctness issue: Velox native reader does not check variant annotations.
+   *
+   * At most `fileLimit` files are inspected under each root path. A file whose footer cannot be
+   * read is skipped (logged at debug level) but still counts towards the limit.
+   *
+   * @param rootPaths root paths of the scan; each one is listed recursively
+   * @param hadoopConf Hadoop configuration used to resolve file systems and read footers
+   * @param fileLimit maximum number of files inspected under each root path
+   * @return the fallback reason if an inspected footer requires fallback, otherwise None
+   */
+  def validateVariantAnnotation(
+      rootPaths: Seq[String],
+      hadoopConf: Configuration,
+      fileLimit: Int
+  ): Option[String] = {
+    val requiresFallback = rootPaths.exists {
+      rootPath =>
+        val root = new Path(rootPath)
+        val fs = root.getFileSystem(hadoopConf)
+        try {
+          val filesIterator = fs.listFiles(root, true)
+          var checkedFileCount = 0
+          var found = false
+          // Test the limit before hasNext so no further listing is done once it is reached.
+          while (!found && checkedFileCount < fileLimit && filesIterator.hasNext) {
+            val fileStatus = filesIterator.next()
+            checkedFileCount += 1
+            found =
+              try {
+                val footer = ParquetFooterReaderShim.readFooter(
+                  hadoopConf,
+                  fileStatus,
+                  ParquetMetadataConverter.NO_FILTER)
+                SparkShimLoader.getSparkShims.shouldFallbackForParquetVariantAnnotation(footer)
+              } catch {
+                case e: Exception =>
+                  // Best effort: skip files whose footer cannot be read, but keep a trace.
+                  logDebug(s"Skip variant annotation check for ${fileStatus.getPath}", e)
+                  false
+              }
+          }
+          found
+        } catch {
+          case e: Exception =>
+            logWarning("Catch exception when checking variant annotation", e)
+            false
+        }
+    }
+    if (requiresFallback) Some("Variant annotation detected in Parquet file.") else None
+  }
+
   private def isTimezoneFoundInMetadata(
       footer: ParquetMetadata,
       parquetOptions: ParquetOptions): Option[String] = {
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 0dadfa1d0bd8..8b4886d376c7 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -397,7 +397,7 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("parquet widening conversion ShortType -> DecimalType(20,0)")
     .exclude("parquet widening conversion ShortType -> DecimalType(38,0)")
     .exclude("parquet widening conversion ShortType -> DoubleType")
-  // TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure
+  enableSuite[GlutenParquetVariantShreddingSuite]
   // Generated suites for org.apache.spark.sql.execution.datasources.text
   // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure
   // TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure
diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 1f6d015393f1..2f5350f38a08 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -237,6 +237,12 @@ trait SparkShims {
 
   def isParquetFileEncrypted(footer: ParquetMetadata): Boolean
 
+  /**
+   * Whether a scan of the Parquet file with this footer must fall back to vanilla Spark because
+   * of variant logical type annotations. Defaults to false; Spark41Shims overrides it.
+   */
+  def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = false
+
   def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
     Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]]
 
diff --git a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
index 0e3e752f9970..de894073e823 100644
--- a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
+++ b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types._
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.parquet.hadoop.metadata.{CompressionCodecName, ParquetMetadata}
 import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType}
 
 import java.time.ZoneOffset
 import java.util.{Map => JMap}
@@ -571,6 +571,26 @@ class Spark41Shims extends SparkShims {
     }
   }
 
+  /**
+   * Requests fallback when the footer schema carries a variant logical type annotation, unless
+   * spark.sql.parquet.ignoreVariantAnnotation is enabled.
+   */
+  override def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = {
+    !SQLConf.get.getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION) &&
+    containsVariantAnnotation(footer.getFileMetaData.getSchema)
+  }
+
+  /** Recursively checks whether any field nested under `groupType` is annotated as variant. */
+  private def containsVariantAnnotation(groupType: GroupType): Boolean = {
+    groupType.getFields.asScala.exists {
+      field =>
+        field.getLogicalTypeAnnotation match {
+          case _: LogicalTypeAnnotation.VariantLogicalTypeAnnotation => true
+          case _ => !field.isPrimitive && containsVariantAnnotation(field.asGroupType())
+        }
+    }
+  }
+
   override def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
     file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String, Object]]
 