diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 2ab3af7ceaad..227896b33105 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -208,6 +208,14 @@ object VeloxBackendSettings extends BackendSettingsApi { return None } val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit + // Variant annotation check is always performed (not gated by + // parquetMetadataValidationEnabled) because it is a correctness issue. + val variantAnnotationResult = + ParquetMetadataUtils.validateVariantAnnotation(rootPaths, hadoopConf, fileLimit) + if (variantAnnotationResult.isDefined) { + return variantAnnotationResult.map( + reason => s"Detected unsupported metadata in parquet files: $reason") + } val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get) val parquetMetadataValidationResult = ParquetMetadataUtils.validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 338bef20dfe5..787e8b599480 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -977,6 +977,13 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging { ExpressionNames.TO_JSON, ToJsonRestrictions.NOT_SUPPORT_WITH_OPTIONS) } + expr.child.dataType match { + case _: StructType | _: MapType | _: ArrayType => + case _ => + GlutenExceptionUtil.throwsNotFullySupported( + ExpressionNames.TO_JSON, + ToJsonRestrictions.NOT_SUPPORT_UNSUPPORTED_CHILD_TYPE) + } if ( !SQLConf.get.caseSensitiveAnalysis && ExpressionUtils.hasUppercaseStructFieldName(child.dataType) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala index 8b2193b58042..0ba86c16c782 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala @@ -121,6 +121,15 @@ object VeloxValidatorApi { case map: MapType => validateSchema(map.keyType).orElse(validateSchema(map.valueType)) case struct: StructType => + // Detect variant shredded struct produced by Spark's PushVariantIntoScan. + // These structs have all fields annotated with __VARIANT_METADATA_KEY metadata. + // Velox cannot read the variant shredding encoding in Parquet files. + if ( + struct.fields.nonEmpty && + struct.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY")) + ) { + return Some(s"Variant shredded struct is not supported: $struct") + } struct.foreach { field => val reason = validateSchema(field.dataType) diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/ExpressionRestrictions.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/ExpressionRestrictions.scala index 04f776fa4266..6af6b3992252 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/expression/ExpressionRestrictions.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/ExpressionRestrictions.scala @@ -68,10 +68,16 @@ object ToJsonRestrictions extends ExpressionRestrictions { s"When 'spark.sql.caseSensitive = false', ${ExpressionNames.TO_JSON} produces unexpected" + s" result for struct field with uppercase name" + val NOT_SUPPORT_UNSUPPORTED_CHILD_TYPE: String = + s"${ExpressionNames.TO_JSON} only supports StructType, MapType and ArrayType child in Velox" + override val functionName: String = ExpressionNames.TO_JSON override val restrictionMessages: Array[String] = - Array(NOT_SUPPORT_WITH_OPTIONS, NOT_SUPPORT_UPPERCASE_STRUCT) + Array( + NOT_SUPPORT_WITH_OPTIONS, + NOT_SUPPORT_UPPERCASE_STRUCT, + NOT_SUPPORT_UNSUPPORTED_CHILD_TYPE) } object Unbase64Restrictions extends ExpressionRestrictions { diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala index ab76cba4aa5d..dcb1051e664d 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala @@ -162,6 +162,48 @@ object ParquetMetadataUtils extends Logging { None } + /** + * Checks whether Parquet files contain variant logical type annotations that require fallback to + * vanilla Spark. This check is always performed (not gated by parquetMetadataValidationEnabled) + * because it is a correctness issue: Velox native reader does not check variant annotations. + */ + def validateVariantAnnotation( + rootPaths: Seq[String], + hadoopConf: Configuration, + fileLimit: Int + ): Option[String] = { + rootPaths.foreach { + rootPath => + val fs = new Path(rootPath).getFileSystem(hadoopConf) + try { + val filesIterator = fs.listFiles(new Path(rootPath), true) + var checkedFileCount = 0 + while (filesIterator.hasNext && checkedFileCount < fileLimit) { + val fileStatus = filesIterator.next() + checkedFileCount += 1 + try { + val footer = ParquetFooterReaderShim.readFooter( + hadoopConf, + fileStatus, + ParquetMetadataConverter.NO_FILTER) + if ( + SparkShimLoader.getSparkShims + .shouldFallbackForParquetVariantAnnotation(footer) + ) { + return Some("Variant annotation detected in Parquet file.") + } + } catch { + case _: Exception => // ignore + } + } + } catch { + case e: Exception => + logWarning("Catch exception when checking variant annotation", e) + } + } + None + } + private def isTimezoneFoundInMetadata( footer: ParquetMetadata, parquetOptions: ParquetOptions): Option[String] = { diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 4f7c67daaad6..a898566fc5ee 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -853,8 +853,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenUDTRegistrationSuite] enableSuite[GlutenUnsafeRowSuite] enableSuite[GlutenUserDefinedTypeSuite] - // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures - // TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures + enableSuite[GlutenVariantEndToEndSuite] + enableSuite[GlutenVariantShreddingSuite] enableSuite[GlutenVariantSuite] enableSuite[GlutenVariantWriteShreddingSuite] // TODO: 4.x enableSuite[GlutenXmlFunctionsSuite] // 10 failures diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 0dadfa1d0bd8..7b2e158e0f38 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -397,7 +397,7 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("parquet widening conversion ShortType -> DecimalType(20,0)") .exclude("parquet widening conversion ShortType -> DecimalType(38,0)") .exclude("parquet widening conversion ShortType -> DoubleType") - // TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure + enableSuite[GlutenParquetVariantShreddingSuite] // Generated suites for org.apache.spark.sql.execution.datasources.text // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure // TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure @@ -818,8 +818,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenUDTRegistrationSuite] enableSuite[GlutenUnsafeRowSuite] enableSuite[GlutenUserDefinedTypeSuite] - // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures - // TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures + enableSuite[GlutenVariantEndToEndSuite] + enableSuite[GlutenVariantShreddingSuite] enableSuite[GlutenVariantSuite] enableSuite[GlutenVariantWriteShreddingSuite] // TODO: 4.x enableSuite[GlutenXmlFunctionsSuite] // 10 failures diff --git a/pom.xml b/pom.xml index df2c3a1a8eb4..d0c5398f27f1 100644 --- a/pom.xml +++ b/pom.xml @@ -167,6 +167,7 @@ --add-opens=java.base/sun.util.calendar=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true + -Dfile.encoding=UTF-8 file:src/test/resources/log4j2.properties diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala index 1f6d015393f1..2f5350f38a08 100644 --- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala @@ -237,6 +237,8 @@ trait SparkShims { def isParquetFileEncrypted(footer: ParquetMetadata): Boolean + def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = false + def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] = Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]] diff --git a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala index 0e3e752f9970..de894073e823 100644 --- a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala +++ b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala @@ -53,7 +53,7 @@ import org.apache.spark.sql.types._ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.parquet.hadoop.metadata.{CompressionCodecName, ParquetMetadata} import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType -import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType} import java.time.ZoneOffset import java.util.{Map => JMap} @@ -571,6 +571,22 @@ class Spark41Shims extends SparkShims { } } + override def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = { + if (SQLConf.get.getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION)) { + return false + } + containsVariantAnnotation(footer.getFileMetaData.getSchema) + } + + private def containsVariantAnnotation(groupType: GroupType): Boolean = { + groupType.getFields.asScala.exists { + field => + Option(field.getLogicalTypeAnnotation) + .exists(_.isInstanceOf[LogicalTypeAnnotation.VariantLogicalTypeAnnotation]) || + (!field.isPrimitive && containsVariantAnnotation(field.asGroupType())) + } + } + override def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] = file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String, Object]]