From e85fa478d4fe273db3a243736aa788fa6c840c1e Mon Sep 17 00:00:00 2001 From: Chang chen Date: Tue, 3 Mar 2026 07:39:55 +0000 Subject: [PATCH 1/6] Fix SPARK-18108: exclude partition columns from HiveTableHandle dataColumns When Gluten creates HiveTableHandle, it was passing all columns (including partition columns) as dataColumns. This caused Velox's convertType() to validate partition column types against the Parquet file's physical types, failing when they differ (e.g., LongType in file vs IntegerType from partition inference). Fix: build dataColumns excluding partition columns (ColumnType::kPartitionKey). Partition column values come from the partition path, not from the file. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 27 ++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 834127e20cc1..f754f4ae0bbf 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -1495,6 +1495,31 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: // The columns present in the table, if not available default to the baseSchema. auto tableSchema = splitInfo->tableSchema ? splitInfo->tableSchema : baseSchema; + // Build dataColumns from tableSchema, excluding partition columns. + // HiveTableHandle::dataColumns() is used as fileSchema for the reader. + // Partition columns should not be validated against the file's physical types + // (their values come from the partition path, not from the file). 
+ std::unordered_set partitionColNames; + for (int idx = 0; idx < colNameList.size(); idx++) { + if (columnTypes[idx] == ColumnType::kPartitionKey) { + partitionColNames.insert(colNameList[idx]); + } + } + RowTypePtr dataColumns; + if (partitionColNames.empty()) { + dataColumns = tableSchema; + } else { + std::vector dataColNames; + std::vector dataColTypes; + for (int idx = 0; idx < tableSchema->size(); idx++) { + if (partitionColNames.find(tableSchema->nameOf(idx)) == partitionColNames.end()) { + dataColNames.push_back(tableSchema->nameOf(idx)); + dataColTypes.push_back(tableSchema->childAt(idx)); + } + } + dataColumns = ROW(std::move(dataColNames), std::move(dataColTypes)); + } + connector::ConnectorTableHandlePtr tableHandle; auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr; auto connectorId = kHiveConnectorId; @@ -1506,7 +1531,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } common::SubfieldFilters subfieldFilters; tableHandle = std::make_shared( - connectorId, "hive_table", std::move(subfieldFilters), remainingFilter, tableSchema); + connectorId, "hive_table", std::move(subfieldFilters), remainingFilter, dataColumns); // Get assignments and out names. std::vector outNames; From ec7a482fd63d8e4aa3c8165e24f162cac41ca369 Mon Sep 17 00:00:00 2001 From: Chang chen Date: Tue, 3 Mar 2026 11:14:13 +0000 Subject: [PATCH 2/6] Update VeloxTestSettings for Velox PR2 With OAP INT narrowing commit replaced by upstream Velox PR #15173: - Remove 2 excludes now passing: LongType->IntegerType, LongType->DateType - Add 2 excludes for new failures: IntegerType->ShortType (OAP removed) Exclude 63 (net unchanged: -2 +2). Test results: 21 pass / 63 ignored. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../org/apache/gluten/utils/velox/VeloxTestSettings.scala | 4 ++-- .../org/apache/gluten/utils/velox/VeloxTestSettings.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 4f7c67daaad6..828e22eb88c7 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -363,11 +363,9 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("unsupported parquet conversion IntegerType -> DecimalType(10,1)") .exclude("unsupported parquet conversion IntegerType -> DecimalType(5,0)") .exclude("unsupported parquet conversion IntegerType -> DecimalType(9,0)") - .exclude("unsupported parquet conversion LongType -> DateType") .exclude("unsupported parquet conversion LongType -> DecimalType(10,0)") .exclude("unsupported parquet conversion LongType -> DecimalType(19,0)") .exclude("unsupported parquet conversion LongType -> DecimalType(20,1)") - .exclude("unsupported parquet conversion LongType -> IntegerType") .exclude("unsupported parquet conversion ShortType -> DecimalType(3,0)") .exclude("unsupported parquet conversion ShortType -> DecimalType(4,0)") .exclude("unsupported parquet conversion ShortType -> DecimalType(5,0)") @@ -379,6 +377,7 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("parquet widening conversion IntegerType -> DecimalType(20,0)") .exclude("parquet widening conversion IntegerType -> DecimalType(38,0)") .exclude("parquet widening conversion IntegerType -> DoubleType") + .exclude("parquet widening conversion IntegerType -> ShortType") .exclude("parquet widening conversion LongType -> DecimalType(20,0)") .exclude("parquet widening 
conversion LongType -> DecimalType(21,1)") .exclude("parquet widening conversion LongType -> DecimalType(38,0)") @@ -386,6 +385,7 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("parquet widening conversion ShortType -> DecimalType(20,0)") .exclude("parquet widening conversion ShortType -> DecimalType(38,0)") .exclude("parquet widening conversion ShortType -> DoubleType") + .exclude("parquet decimal type change IntegerType -> ShortType overflows") enableSuite[GlutenParquetVariantShreddingSuite] // Generated suites for org.apache.spark.sql.execution.datasources.text // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 0dadfa1d0bd8..7fc3704a6271 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -374,11 +374,9 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("unsupported parquet conversion IntegerType -> DecimalType(10,1)") .exclude("unsupported parquet conversion IntegerType -> DecimalType(5,0)") .exclude("unsupported parquet conversion IntegerType -> DecimalType(9,0)") - .exclude("unsupported parquet conversion LongType -> DateType") .exclude("unsupported parquet conversion LongType -> DecimalType(10,0)") .exclude("unsupported parquet conversion LongType -> DecimalType(19,0)") .exclude("unsupported parquet conversion LongType -> DecimalType(20,1)") - .exclude("unsupported parquet conversion LongType -> IntegerType") .exclude("unsupported parquet conversion ShortType -> DecimalType(3,0)") .exclude("unsupported parquet conversion ShortType -> DecimalType(4,0)") .exclude("unsupported parquet conversion ShortType -> DecimalType(5,0)") @@ -390,6 +388,7 @@ class VeloxTestSettings extends 
BackendTestSettings { .exclude("parquet widening conversion IntegerType -> DecimalType(20,0)") .exclude("parquet widening conversion IntegerType -> DecimalType(38,0)") .exclude("parquet widening conversion IntegerType -> DoubleType") + .exclude("parquet widening conversion IntegerType -> ShortType") .exclude("parquet widening conversion LongType -> DecimalType(20,0)") .exclude("parquet widening conversion LongType -> DecimalType(21,1)") .exclude("parquet widening conversion LongType -> DecimalType(38,0)") @@ -397,6 +396,7 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("parquet widening conversion ShortType -> DecimalType(20,0)") .exclude("parquet widening conversion ShortType -> DecimalType(38,0)") .exclude("parquet widening conversion ShortType -> DoubleType") + .exclude("parquet decimal type change IntegerType -> ShortType overflows") // TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure // Generated suites for org.apache.spark.sql.execution.datasources.text // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure From 017e014700603a4d75557456b512b49d063a13d8 Mon Sep 17 00:00:00 2001 From: Chang chen Date: Fri, 6 Mar 2026 15:33:49 +0800 Subject: [PATCH 3/6] Point Velox to PR3 branch with parquet type widening support --- ep/build-velox/src/get-velox.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index c3e3cf0f427a..32ad05d1e7fb 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -17,8 +17,8 @@ set -exu CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) -VELOX_REPO=https://github.com/IBM/velox.git -VELOX_BRANCH=dft-2026_03_10-iceberg +VELOX_REPO=https://github.com/baibaichen/velox.git +VELOX_BRANCH=pr3/parquet-type-widening VELOX_ENHANCED_BRANCH=ibm-2026_03_10 VELOX_HOME="" RUN_SETUP_SCRIPT=ON From c3924be668a7d07a54d69c59a7a2c34a2ce621d4 Mon Sep 17 00:00:00 2001 From: Chang chen Date: Sun, 8 
Mar 2026 01:58:28 +0800 Subject: [PATCH 4/6] Update VeloxTestSettings for Velox type widening support With Velox PR3 type widening (INT->Decimal, INT->Double, Float->Double): - Remove 15 excludes for widening tests now passing Remaining 48 excludes. Test results: 36 pass / 48 ignored. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../gluten/utils/velox/VeloxTestSettings.scala | 15 --------------- .../gluten/utils/velox/VeloxTestSettings.scala | 15 --------------- 2 files changed, 30 deletions(-) diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 828e22eb88c7..8bee91b3b8b4 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -371,21 +371,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("unsupported parquet conversion ShortType -> DecimalType(5,0)") .exclude("unsupported parquet conversion ShortType -> DecimalType(5,1)") .exclude("unsupported parquet conversion ShortType -> DecimalType(6,1)") - .exclude("parquet widening conversion ByteType -> DecimalType(11,1)") - .exclude("parquet widening conversion ByteType -> DecimalType(20,0)") - .exclude("parquet widening conversion IntegerType -> DecimalType(11,1)") - .exclude("parquet widening conversion IntegerType -> DecimalType(20,0)") - .exclude("parquet widening conversion IntegerType -> DecimalType(38,0)") - .exclude("parquet widening conversion IntegerType -> DoubleType") - .exclude("parquet widening conversion IntegerType -> ShortType") - .exclude("parquet widening conversion LongType -> DecimalType(20,0)") - .exclude("parquet widening conversion LongType -> DecimalType(21,1)") - .exclude("parquet widening conversion LongType -> DecimalType(38,0)") - .exclude("parquet widening conversion 
ShortType -> DecimalType(11,1)") - .exclude("parquet widening conversion ShortType -> DecimalType(20,0)") - .exclude("parquet widening conversion ShortType -> DecimalType(38,0)") - .exclude("parquet widening conversion ShortType -> DoubleType") - .exclude("parquet decimal type change IntegerType -> ShortType overflows") enableSuite[GlutenParquetVariantShreddingSuite] // Generated suites for org.apache.spark.sql.execution.datasources.text // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 7fc3704a6271..ef4d38d52483 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -382,21 +382,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("unsupported parquet conversion ShortType -> DecimalType(5,0)") .exclude("unsupported parquet conversion ShortType -> DecimalType(5,1)") .exclude("unsupported parquet conversion ShortType -> DecimalType(6,1)") - .exclude("parquet widening conversion ByteType -> DecimalType(11,1)") - .exclude("parquet widening conversion ByteType -> DecimalType(20,0)") - .exclude("parquet widening conversion IntegerType -> DecimalType(11,1)") - .exclude("parquet widening conversion IntegerType -> DecimalType(20,0)") - .exclude("parquet widening conversion IntegerType -> DecimalType(38,0)") - .exclude("parquet widening conversion IntegerType -> DoubleType") - .exclude("parquet widening conversion IntegerType -> ShortType") - .exclude("parquet widening conversion LongType -> DecimalType(20,0)") - .exclude("parquet widening conversion LongType -> DecimalType(21,1)") - .exclude("parquet widening conversion LongType -> DecimalType(38,0)") - .exclude("parquet widening conversion ShortType -> 
DecimalType(11,1)") - .exclude("parquet widening conversion ShortType -> DecimalType(20,0)") - .exclude("parquet widening conversion ShortType -> DecimalType(38,0)") - .exclude("parquet widening conversion ShortType -> DoubleType") - .exclude("parquet decimal type change IntegerType -> ShortType overflows") // TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure // Generated suites for org.apache.spark.sql.execution.datasources.text // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure From 0e2e491c9df11c188cc2c921859cf25669ee2244 Mon Sep 17 00:00:00 2001 From: Chang chen Date: Sat, 7 Mar 2026 23:04:03 +0800 Subject: [PATCH 5/6] Disable native writer for ParquetTypeWideningSuite This suite tests the READ path only. Disable native writer so Spark's writer produces correct V2 encodings (DELTA_BINARY_PACKED/DELTA_BYTE_ARRAY). - Remove 10 excludes for decimal widening tests now passing Remaining 38 excludes: - 34: Velox native reader rejects incompatible decimal conversions regardless of reader config (no parquet-mr fallback) - 4: Velox does not support DELTA_BYTE_ARRAY encoding Test results: 46 pass / 38 ignored. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../gluten/utils/velox/VeloxTestSettings.scala | 12 ++---------- .../parquet/GlutenParquetTypeWideningSuite.scala | 13 ++++++++++++- .../gluten/utils/velox/VeloxTestSettings.scala | 12 ++---------- .../parquet/GlutenParquetTypeWideningSuite.scala | 13 ++++++++++++- 4 files changed, 28 insertions(+), 22 deletions(-) diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 8bee91b3b8b4..7ddc78efb537 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -323,10 +323,12 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenParquetCommitterSuite] enableSuite[GlutenParquetFieldIdSchemaSuite] enableSuite[GlutenParquetTypeWideningSuite] + // Velox does not support DELTA_BYTE_ARRAY encoding for FIXED_LEN_BYTE_ARRAY decimals. .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(22, 2)") .exclude("parquet decimal precision and scale change Decimal(20, 7) -> Decimal(22, 5)") .exclude("parquet decimal precision and scale change Decimal(20, 5) -> Decimal(22, 8)") .exclude("parquet decimal precision and scale change Decimal(20, 2) -> Decimal(22, 4)") + // Velox native reader aligns with vectorized reader behavior, always rejecting incompatible decimal conversions. 
.exclude("parquet decimal precision and scale change Decimal(10, 4) -> Decimal(12, 7)") .exclude("parquet decimal precision and scale change Decimal(10, 6) -> Decimal(12, 4)") .exclude("parquet decimal precision and scale change Decimal(10, 7) -> Decimal(5, 2)") @@ -338,22 +340,12 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("parquet decimal precision and scale change Decimal(22, 5) -> Decimal(20, 7)") .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(6, 4)") .exclude("parquet decimal precision and scale change Decimal(7, 4) -> Decimal(5, 2)") - .exclude("parquet decimal precision and scale change Decimal(10, 2) -> Decimal(12, 4)") - .exclude("parquet decimal precision and scale change Decimal(10, 2) -> Decimal(20, 12)") - .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(10, 7)") - .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(20, 17)") - .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(7, 4)") .exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(5, 2)") .exclude("parquet decimal precision change Decimal(12, 2) -> Decimal(10, 2)") .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(10, 2)") .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(5, 2)") .exclude("parquet decimal precision change Decimal(22, 2) -> Decimal(20, 2)") .exclude("parquet decimal precision change Decimal(7, 2) -> Decimal(5, 2)") - .exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(12, 2)") - .exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(20, 2)") - .exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(10, 2)") - .exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(20, 2)") - .exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(7, 2)") .exclude("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with 
parquet-mr") .exclude("unsupported parquet conversion ByteType -> DecimalType(1,0)") .exclude("unsupported parquet conversion ByteType -> DecimalType(2,0)") diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala index 2090b70f7727..91d658aafea9 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala @@ -16,6 +16,17 @@ */ package org.apache.spark.sql.execution.datasources.parquet +import org.apache.gluten.config.GlutenConfig + +import org.apache.spark.SparkConf import org.apache.spark.sql.GlutenSQLTestsTrait -class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait {} +class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait { + + // Disable native writer so that writeParquetFiles() uses Spark's Parquet writer. + // This suite tests the READ path. The native writer doesn't produce + // DELTA_BINARY_PACKED/DELTA_BYTE_ARRAY encodings that the parent test's + // V2 encoding assertions expect. 
+ override def sparkConf: SparkConf = + super.sparkConf.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "false") +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index ef4d38d52483..c13884555400 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -334,10 +334,12 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenParquetCommitterSuite] enableSuite[GlutenParquetFieldIdSchemaSuite] enableSuite[GlutenParquetTypeWideningSuite] + // Velox does not support DELTA_BYTE_ARRAY encoding for FIXED_LEN_BYTE_ARRAY decimals. .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(22, 2)") .exclude("parquet decimal precision and scale change Decimal(20, 7) -> Decimal(22, 5)") .exclude("parquet decimal precision and scale change Decimal(20, 5) -> Decimal(22, 8)") .exclude("parquet decimal precision and scale change Decimal(20, 2) -> Decimal(22, 4)") + // Velox native reader aligns with vectorized reader behavior, always rejecting incompatible decimal conversions. 
.exclude("parquet decimal precision and scale change Decimal(10, 4) -> Decimal(12, 7)") .exclude("parquet decimal precision and scale change Decimal(10, 6) -> Decimal(12, 4)") .exclude("parquet decimal precision and scale change Decimal(10, 7) -> Decimal(5, 2)") @@ -349,22 +351,12 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("parquet decimal precision and scale change Decimal(22, 5) -> Decimal(20, 7)") .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(6, 4)") .exclude("parquet decimal precision and scale change Decimal(7, 4) -> Decimal(5, 2)") - .exclude("parquet decimal precision and scale change Decimal(10, 2) -> Decimal(12, 4)") - .exclude("parquet decimal precision and scale change Decimal(10, 2) -> Decimal(20, 12)") - .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(10, 7)") - .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(20, 17)") - .exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(7, 4)") .exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(5, 2)") .exclude("parquet decimal precision change Decimal(12, 2) -> Decimal(10, 2)") .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(10, 2)") .exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(5, 2)") .exclude("parquet decimal precision change Decimal(22, 2) -> Decimal(20, 2)") .exclude("parquet decimal precision change Decimal(7, 2) -> Decimal(5, 2)") - .exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(12, 2)") - .exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(20, 2)") - .exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(10, 2)") - .exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(20, 2)") - .exclude("parquet decimal precision change Decimal(5, 2) -> Decimal(7, 2)") .exclude("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with 
parquet-mr") .exclude("unsupported parquet conversion ByteType -> DecimalType(1,0)") .exclude("unsupported parquet conversion ByteType -> DecimalType(2,0)") diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala index 2090b70f7727..91d658aafea9 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala @@ -16,6 +16,17 @@ */ package org.apache.spark.sql.execution.datasources.parquet +import org.apache.gluten.config.GlutenConfig + +import org.apache.spark.SparkConf import org.apache.spark.sql.GlutenSQLTestsTrait -class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait {} +class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait { + + // Disable native writer so that writeParquetFiles() uses Spark's Parquet writer. + // This suite tests the READ path. The native writer doesn't produce + // DELTA_BINARY_PACKED/DELTA_BYTE_ARRAY encodings that the parent test's + // V2 encoding assertions expect. + override def sparkConf: SparkConf = + super.sparkConf.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "false") +} From b376d6d64742fdc728e5bf8eafc7031403e59d86 Mon Sep 17 00:00:00 2001 From: Chang chen Date: Wed, 11 Mar 2026 16:27:40 +0800 Subject: [PATCH 6/6] Override 33 type widening tests with expectError=true Velox native reader always behaves like Spark's vectorized reader, so tests that rely on parquet-mr behavior (vectorized=false) fail. Instead of just excluding these 33 tests, add testGluten overrides with expectError=true to verify Velox correctly rejects incompatible conversions. 
- 16 unsupported INT->Decimal conversions - 6 decimal precision narrowing cases - 11 decimal precision+scale narrowing/mixed cases VeloxTestSettings: 38 excludes (parent tests) + 33 testGluten overrides Test results: 79 pass / 38 ignored (33 excluded parent + 5 truly excluded) --- .../GlutenParquetTypeWideningSuite.scala | 214 +++++++++++++++++- .../GlutenParquetTypeWideningSuite.scala | 214 +++++++++++++++++- 2 files changed, 424 insertions(+), 4 deletions(-) diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala index 91d658aafea9..b785dce559e3 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala @@ -18,15 +18,225 @@ package org.apache.spark.sql.execution.datasources.parquet import org.apache.gluten.config.GlutenConfig -import org.apache.spark.SparkConf -import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait} +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.DecimalType.{ByteDecimal, IntDecimal, LongDecimal, ShortDecimal} + +import org.apache.hadoop.fs.Path +import org.apache.parquet.column.{Encoding, ParquetProperties} +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat} + +import java.io.File class GlutenParquetTypeWideningSuite extends 
ParquetTypeWideningSuite with GlutenSQLTestsTrait { + import testImplicits._ + // Disable native writer so that writeParquetFiles() uses Spark's Parquet writer. // This suite tests the READ path. The native writer doesn't produce // DELTA_BINARY_PACKED/DELTA_BYTE_ARRAY encodings that the parent test's // V2 encoding assertions expect. override def sparkConf: SparkConf = super.sparkConf.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "false") + + // ====== Private methods copied from ParquetTypeWideningSuite ====== + // These are private in the parent class, so we must copy them to use in overridden tests. + // The key change: removed withAllParquetReaders wrapper since Velox native reader + // always behaves like the vectorized reader. + + private def checkAllParquetReaders( + values: Seq[String], + fromType: DataType, + toType: DataType, + expectError: Boolean): Unit = { + val timestampRebaseModes = toType match { + case _: TimestampNTZType | _: DateType => + Seq(LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.LEGACY) + case _ => + Seq(LegacyBehaviorPolicy.CORRECTED) + } + for { + dictionaryEnabled <- Seq(true, false) + timestampRebaseMode <- timestampRebaseModes + } + withClue( + s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " + + s"'$timestampRebaseMode''") { + withAllParquetWriters { + withTempDir { + dir => + val expected = + writeParquetFiles(dir, values, fromType, dictionaryEnabled, timestampRebaseMode) + if (expectError) { + val exception = intercept[SparkException] { + readParquetFiles(dir, toType).collect() + } + assert( + exception.getCause + .isInstanceOf[SchemaColumnConvertNotSupportedException] || + exception.getCause + .isInstanceOf[org.apache.parquet.io.ParquetDecodingException] || + exception.getCause.getMessage.contains("PARQUET_CONVERSION_FAILURE")) + } else { + checkAnswer(readParquetFiles(dir, toType), expected.select($"a".cast(toType))) + } + } + } + } + } + + private def readParquetFiles(dir: File, dataType: 
DataType): DataFrame = { + spark.read.schema(s"a ${dataType.sql}").parquet(dir.getAbsolutePath) + } + + private def writeParquetFiles( + dir: File, + values: Seq[String], + dataType: DataType, + dictionaryEnabled: Boolean, + timestampRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED) + : DataFrame = { + val repeatedValues = List.fill(if (dictionaryEnabled) 10 else 1)(values).flatten + val df = repeatedValues.toDF("a").select(col("a").cast(dataType)) + withSQLConf( + ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString, + SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> timestampRebaseMode.toString) { + df.write.mode("overwrite").parquet(dir.getAbsolutePath) + } + + if (dictionaryEnabled && !DecimalType.isByteArrayDecimalType(dataType)) { + assertAllParquetFilesDictionaryEncoded(dir) + } + + val isParquetV2 = spark.conf + .getOption(ParquetOutputFormat.WRITER_VERSION) + .contains(ParquetProperties.WriterVersion.PARQUET_2_0.toString) + if (isParquetV2) { + if (dictionaryEnabled) { + assertParquetV2Encoding(dir, Encoding.PLAIN) + } else if (DecimalType.is64BitDecimalType(dataType)) { + assertParquetV2Encoding(dir, Encoding.DELTA_BINARY_PACKED) + } else if (DecimalType.isByteArrayDecimalType(dataType)) { + assertParquetV2Encoding(dir, Encoding.DELTA_BYTE_ARRAY) + } + } + df + } + + private def assertAllParquetFilesDictionaryEncoded(dir: File): Unit = { + dir.listFiles(_.getName.endsWith(".parquet")).foreach { + file => + val parquetMetadata = ParquetFileReader.readFooter( + spark.sessionState.newHadoopConf(), + new Path(dir.toString, file.getName), + ParquetMetadataConverter.NO_FILTER) + parquetMetadata.getBlocks.forEach { + block => + block.getColumns.forEach { + col => + assert( + col.hasDictionaryPage, + "This test covers dictionary encoding but column " + + s"'${col.getPath.toDotString}' in the test data is not dictionary encoded.") + } + } + } + } + + private def assertParquetV2Encoding(dir: File, expected_encoding: Encoding): Unit 
= { + dir.listFiles(_.getName.endsWith(".parquet")).foreach { + file => + val parquetMetadata = ParquetFileReader.readFooter( + spark.sessionState.newHadoopConf(), + new Path(dir.toString, file.getName), + ParquetMetadataConverter.NO_FILTER) + parquetMetadata.getBlocks.forEach { + block => + block.getColumns.forEach { + col => + assert( + col.getEncodings.contains(expected_encoding), + s"Expected column '${col.getPath.toDotString}' " + + s"to use encoding $expected_encoding " + + s"but found ${col.getEncodings}." + ) + } + } + } + } + + // ====== Override tests ====== + // Velox native reader always behaves like Spark's vectorized reader (no parquet-mr fallback). + // In the parent tests, `expectError` is conditional on PARQUET_VECTORIZED_READER_ENABLED: + // parquet-mr allows conversions that the vectorized reader rejects. + // Since Velox always rejects, we override with expectError = true. + + for { + (values: Seq[String], fromType: DataType, toType: DecimalType) <- Seq( + (Seq("1", "2"), ByteType, DecimalType(1, 0)), + (Seq("1", "2"), ByteType, ByteDecimal), + (Seq("1", "2"), ShortType, ByteDecimal), + (Seq("1", "2"), ShortType, ShortDecimal), + (Seq("1", "2"), IntegerType, ShortDecimal), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision + 1, 1)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision + 1, 1)), + (Seq("1", "2"), LongType, IntDecimal), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision - 1, 0)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision - 1, 0)), + (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision - 1, 0)), + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision - 1, 0)), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision, 1)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision, 1)), + (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision, 1)), + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1)) + ) + } + 
testGluten(s"unsupported parquet conversion $fromType -> $toType") { + checkAllParquetReaders(values, fromType, toType, expectError = true) + } + + for { + (fromPrecision, toPrecision) <- + Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20) + } + testGluten( + s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") { + checkAllParquetReaders( + values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, 2), + toType = DecimalType(toPrecision, 2), + expectError = true) + } + + for { + ((fromPrecision, fromScale), (toPrecision, toScale)) <- + // Narrowing precision and scale by the same amount. + Seq( + (7, 4) -> (5, 2), + (10, 7) -> (5, 2), + (20, 17) -> (5, 2), + (12, 4) -> (10, 2), + (20, 17) -> (10, 2), + (22, 4) -> (20, 2)) ++ + // Increasing precision and decreasing scale. + Seq((10, 6) -> (12, 4)) ++ + // Decreasing precision and increasing scale. + Seq((12, 4) -> (10, 6), (22, 5) -> (20, 7)) ++ + // Increasing precision by a smaller amount than scale. 
+ Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7)) + } + testGluten( + s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + + s"Decimal($toPrecision, $toScale)") { + checkAllParquetReaders( + values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, fromScale), + toType = DecimalType(toPrecision, toScale), + expectError = true) + } } diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala index 91d658aafea9..b785dce559e3 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetTypeWideningSuite.scala @@ -18,15 +18,225 @@ package org.apache.spark.sql.execution.datasources.parquet import org.apache.gluten.config.GlutenConfig -import org.apache.spark.SparkConf -import org.apache.spark.sql.GlutenSQLTestsTrait +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait} +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.DecimalType.{ByteDecimal, IntDecimal, LongDecimal, ShortDecimal} + +import org.apache.hadoop.fs.Path +import org.apache.parquet.column.{Encoding, ParquetProperties} +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat} + +import java.io.File class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait { + import testImplicits._ + // Disable 
s"'$timestampRebaseMode'") {
${dataType.sql}").parquet(dir.getAbsolutePath) + } + + private def writeParquetFiles( + dir: File, + values: Seq[String], + dataType: DataType, + dictionaryEnabled: Boolean, + timestampRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED) + : DataFrame = { + val repeatedValues = List.fill(if (dictionaryEnabled) 10 else 1)(values).flatten + val df = repeatedValues.toDF("a").select(col("a").cast(dataType)) + withSQLConf( + ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString, + SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> timestampRebaseMode.toString) { + df.write.mode("overwrite").parquet(dir.getAbsolutePath) + } + + if (dictionaryEnabled && !DecimalType.isByteArrayDecimalType(dataType)) { + assertAllParquetFilesDictionaryEncoded(dir) + } + + val isParquetV2 = spark.conf + .getOption(ParquetOutputFormat.WRITER_VERSION) + .contains(ParquetProperties.WriterVersion.PARQUET_2_0.toString) + if (isParquetV2) { + if (dictionaryEnabled) { + assertParquetV2Encoding(dir, Encoding.PLAIN) + } else if (DecimalType.is64BitDecimalType(dataType)) { + assertParquetV2Encoding(dir, Encoding.DELTA_BINARY_PACKED) + } else if (DecimalType.isByteArrayDecimalType(dataType)) { + assertParquetV2Encoding(dir, Encoding.DELTA_BYTE_ARRAY) + } + } + df + } + + private def assertAllParquetFilesDictionaryEncoded(dir: File): Unit = { + dir.listFiles(_.getName.endsWith(".parquet")).foreach { + file => + val parquetMetadata = ParquetFileReader.readFooter( + spark.sessionState.newHadoopConf(), + new Path(dir.toString, file.getName), + ParquetMetadataConverter.NO_FILTER) + parquetMetadata.getBlocks.forEach { + block => + block.getColumns.forEach { + col => + assert( + col.hasDictionaryPage, + "This test covers dictionary encoding but column " + + s"'${col.getPath.toDotString}' in the test data is not dictionary encoded.") + } + } + } + } + + private def assertParquetV2Encoding(dir: File, expected_encoding: Encoding): Unit = { + 
dir.listFiles(_.getName.endsWith(".parquet")).foreach { + file => + val parquetMetadata = ParquetFileReader.readFooter( + spark.sessionState.newHadoopConf(), + new Path(dir.toString, file.getName), + ParquetMetadataConverter.NO_FILTER) + parquetMetadata.getBlocks.forEach { + block => + block.getColumns.forEach { + col => + assert( + col.getEncodings.contains(expected_encoding), + s"Expected column '${col.getPath.toDotString}' " + + s"to use encoding $expected_encoding " + + s"but found ${col.getEncodings}." + ) + } + } + } + } + + // ====== Override tests ====== + // Velox native reader always behaves like Spark's vectorized reader (no parquet-mr fallback). + // In the parent tests, `expectError` is conditional on PARQUET_VECTORIZED_READER_ENABLED: + // parquet-mr allows conversions that the vectorized reader rejects. + // Since Velox always rejects, we override with expectError = true. + + for { + (values: Seq[String], fromType: DataType, toType: DecimalType) <- Seq( + (Seq("1", "2"), ByteType, DecimalType(1, 0)), + (Seq("1", "2"), ByteType, ByteDecimal), + (Seq("1", "2"), ShortType, ByteDecimal), + (Seq("1", "2"), ShortType, ShortDecimal), + (Seq("1", "2"), IntegerType, ShortDecimal), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision + 1, 1)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision + 1, 1)), + (Seq("1", "2"), LongType, IntDecimal), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision - 1, 0)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision - 1, 0)), + (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision - 1, 0)), + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision - 1, 0)), + (Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision, 1)), + (Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision, 1)), + (Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision, 1)), + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1)) + ) + } + testGluten(s"unsupported 
parquet conversion $fromType -> $toType") { + checkAllParquetReaders(values, fromType, toType, expectError = true) + } + + for { + (fromPrecision, toPrecision) <- + Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20) + } + testGluten( + s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") { + checkAllParquetReaders( + values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, 2), + toType = DecimalType(toPrecision, 2), + expectError = true) + } + + for { + ((fromPrecision, fromScale), (toPrecision, toScale)) <- + // Narrowing precision and scale by the same amount. + Seq( + (7, 4) -> (5, 2), + (10, 7) -> (5, 2), + (20, 17) -> (5, 2), + (12, 4) -> (10, 2), + (20, 17) -> (10, 2), + (22, 4) -> (20, 2)) ++ + // Increasing precision and decreasing scale. + Seq((10, 6) -> (12, 4)) ++ + // Decreasing precision and increasing scale. + Seq((12, 4) -> (10, 6), (22, 5) -> (20, 7)) ++ + // Increasing precision by a smaller amount than scale. + Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7)) + } + testGluten( + s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + + s"Decimal($toPrecision, $toScale)") { + checkAllParquetReaders( + values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, fromScale), + toType = DecimalType(toPrecision, toScale), + expectError = true) + } }