diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 24d08a57920d..c64cc3d273a2 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -203,6 +203,14 @@ object VeloxBackendSettings extends BackendSettingsApi {
         return None
       }
       val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit
+      // Variant annotation check is always performed (not gated by
+      // parquetMetadataValidationEnabled) because it is a correctness issue.
+      val variantAnnotationResult =
+        ParquetMetadataUtils.validateVariantAnnotation(rootPaths, hadoopConf, fileLimit)
+      if (variantAnnotationResult.isDefined) {
+        return variantAnnotationResult.map(
+          reason => s"Detected unsupported metadata in parquet files: $reason")
+      }
       val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
       val parquetMetadataValidationResult =
         ParquetMetadataUtils.validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index ab76cba4aa5d..dcb1051e664d 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -162,6 +162,65 @@ object ParquetMetadataUtils extends Logging {
     None
   }
 
+  /**
+   * Checks whether Parquet files contain variant logical type annotations that require fallback to
+   * vanilla Spark. This check is always performed (not gated by parquetMetadataValidationEnabled)
+   * because it is a correctness issue: Velox native reader does not check variant annotations.
+   *
+   * Each root path is listed recursively and at most `fileLimit` of its files are inspected.
+   * Files whose footer cannot be read are skipped. A failure while listing a root path is
+   * logged and that root path is treated as containing no variant annotation.
+   *
+   * @param rootPaths
+   *   root paths of the scan
+   * @param hadoopConf
+   *   Hadoop configuration used to resolve the file system and to read footers
+   * @param fileLimit
+   *   maximum number of files to inspect per root path
+   * @return
+   *   a fallback reason if any inspected footer requires fallback, otherwise None
+   */
+  def validateVariantAnnotation(
+      rootPaths: Seq[String],
+      hadoopConf: Configuration,
+      fileLimit: Int
+  ): Option[String] = {
+    // `exists` stops at the first root path that contains a variant annotation, so no non-local
+    // `return` out of a closure (and through the catch handlers below) is needed.
+    val variantFound = rootPaths.exists {
+      rootPath =>
+        val fs = new Path(rootPath).getFileSystem(hadoopConf)
+        try {
+          val filesIterator = fs.listFiles(new Path(rootPath), true)
+          var checkedFileCount = 0
+          var found = false
+          while (!found && filesIterator.hasNext && checkedFileCount < fileLimit) {
+            val fileStatus = filesIterator.next()
+            checkedFileCount += 1
+            found =
+              try {
+                val footer = ParquetFooterReaderShim.readFooter(
+                  hadoopConf,
+                  fileStatus,
+                  ParquetMetadataConverter.NO_FILTER)
+                SparkShimLoader.getSparkShims.shouldFallbackForParquetVariantAnnotation(footer)
+              } catch {
+                case e: Exception =>
+                  // Best effort: the footer could not be read or checked, so skip this file.
+                  logDebug(s"Skip variant annotation check for ${fileStatus.getPath}", e)
+                  false
+              }
+          }
+          found
+        } catch {
+          case e: Exception =>
+            logWarning("Catch exception when checking variant annotation", e)
+            false
+        }
+    }
+    if (variantFound) Some("Variant annotation detected in Parquet file.") else None
+  }
+
   private def isTimezoneFoundInMetadata(
       footer: ParquetMetadata,
       parquetOptions: ParquetOptions): Option[String] = {
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 0dadfa1d0bd8..8b4886d376c7 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -397,7 +397,7 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("parquet widening conversion ShortType -> DecimalType(20,0)")
     .exclude("parquet widening conversion ShortType -> DecimalType(38,0)")
     .exclude("parquet widening conversion ShortType -> DoubleType")
-  // TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure
+  enableSuite[GlutenParquetVariantShreddingSuite]
   // Generated suites for org.apache.spark.sql.execution.datasources.text
   // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure
   // TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure
diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 1f6d015393f1..2f5350f38a08 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -237,6 +237,9 @@ trait SparkShims {
 
   def isParquetFileEncrypted(footer: ParquetMetadata): Boolean
 
+  /** Returns true if the footer's variant annotation requires fallback; defaults to false. */
+  def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = false
+
   def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
     Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]]
 
diff --git a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
index 0e3e752f9970..de894073e823 100644
--- a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
+++ b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types._
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.parquet.hadoop.metadata.{CompressionCodecName, ParquetMetadata}
 import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType}
 
 import java.time.ZoneOffset
 import java.util.{Map => JMap}
@@ -571,6 +571,25 @@ class Spark41Shims extends SparkShims {
     }
   }
 
+  /**
+   * Returns true when the footer schema contains a variant logical type annotation. Always
+   * returns false when `SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION` is enabled.
+   */
+  override def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = {
+    !SQLConf.get.getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION) &&
+    containsVariantAnnotation(footer.getFileMetaData.getSchema)
+  }
+
+  /** Recursively checks the fields nested under `groupType` for a variant annotation. */
+  private def containsVariantAnnotation(groupType: GroupType): Boolean = {
+    groupType.getFields.asScala.exists {
+      field =>
+        Option(field.getLogicalTypeAnnotation)
+          .exists(_.isInstanceOf[LogicalTypeAnnotation.VariantLogicalTypeAnnotation]) ||
+        (!field.isPrimitive && containsVariantAnnotation(field.asGroupType()))
+    }
+  }
+
   override def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
     file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String, Object]]
 