Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,31 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
// The columns present in the table, if not available default to the baseSchema.
auto tableSchema = splitInfo->tableSchema ? splitInfo->tableSchema : baseSchema;

// Build dataColumns from tableSchema, excluding partition columns.
// HiveTableHandle::dataColumns() is used as fileSchema for the reader.
// Partition columns should not be validated against the file's physical types
// (their values come from the partition path, not from the file).
std::unordered_set<std::string> partitionColNames;
for (int idx = 0; idx < colNameList.size(); idx++) {
if (columnTypes[idx] == ColumnType::kPartitionKey) {
partitionColNames.insert(colNameList[idx]);
}
}
RowTypePtr dataColumns;
if (partitionColNames.empty()) {
dataColumns = tableSchema;
} else {
std::vector<std::string> dataColNames;
std::vector<TypePtr> dataColTypes;
for (int idx = 0; idx < tableSchema->size(); idx++) {
if (partitionColNames.find(tableSchema->nameOf(idx)) == partitionColNames.end()) {
dataColNames.push_back(tableSchema->nameOf(idx));
dataColTypes.push_back(tableSchema->childAt(idx));
}
}
dataColumns = ROW(std::move(dataColNames), std::move(dataColTypes));
}

connector::ConnectorTableHandlePtr tableHandle;
auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr;
auto connectorId = kHiveConnectorId;
Expand All @@ -1468,7 +1493,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
}
common::SubfieldFilters subfieldFilters;
tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
connectorId, "hive_table", filterPushdownEnabled, std::move(subfieldFilters), remainingFilter, tableSchema);
connectorId, "hive_table", filterPushdownEnabled, std::move(subfieldFilters), remainingFilter, dataColumns);

// Get assignments and out names.
std::vector<std::string> outNames;
Expand Down
4 changes: 2 additions & 2 deletions ep/build-velox/src/get-velox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
set -exu

CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
VELOX_REPO=https://github.com/IBM/velox.git
VELOX_BRANCH=dft-2026_02_24
VELOX_REPO=https://github.com/baibaichen/velox.git
VELOX_BRANCH=feature/enable-parquet-type-widening-suite
VELOX_ENHANCED_BRANCH=ibm-2026_02_24
VELOX_HOME=""
RUN_SETUP_SCRIPT=ON
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package org.apache.gluten.vectorized;

import org.apache.gluten.columnarbatch.ColumnarBatches;
import org.apache.gluten.exception.GlutenException;
import org.apache.gluten.iterator.ClosableIterator;
import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.RuntimeAware;

import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.io.IOException;
Expand Down Expand Up @@ -130,6 +132,28 @@ public void requestBarrier() {
nativeRequestBarrier(iterHandle);
}

@Override
protected RuntimeException translateException(Exception e) {
  // Velox signals an unsupported Parquet schema conversion only through its
  // error message; map those messages onto Spark's dedicated exception type so
  // callers (e.g. Spark's task-failure handling) can recognize them.
  final String rootMsg = findRootCauseMessage(e);
  final boolean isSchemaConvertError =
      rootMsg != null
          && (rootMsg.contains("not allowed for requested type")
              || rootMsg.contains("Not a valid type for"));
  if (isSchemaConvertError) {
    // Physical/logical type names are not recoverable from the message, hence "unknown".
    return new SchemaColumnConvertNotSupportedException("unknown", rootMsg, "unknown");
  }
  // Anything else is surfaced as a generic Gluten failure.
  return new GlutenException(e);
}

/**
 * Walks the cause chain of {@code t} and returns the first non-null message encountered,
 * starting from {@code t} itself (i.e. the outermost available message, falling back to
 * deeper causes only when shallower ones carry no message).
 *
 * <p>The walk is bounded so that a (rare but constructible) cyclic cause chain — e.g. two
 * throwables made each other's cause via {@code initCause} — cannot loop forever.
 *
 * @param t the throwable to inspect; may be null
 * @return the first non-null message in the cause chain, or null if none is found
 */
private static String findRootCauseMessage(Throwable t) {
  // Bound mirrors the depth limits used by common cause-chain utilities; any
  // realistic exception chain is far shorter than this.
  int remaining = 64;
  while (t != null && remaining-- > 0) {
    String msg = t.getMessage();
    if (msg != null) {
      return msg;
    }
    t = t.getCause();
  }
  return null;
}

@Override
public void close0() {
// To make sure the outputted batches are still accessible after the iterator is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public final boolean hasNext() {
try {
return hasNext0();
} catch (Exception e) {
throw new GlutenException(e);
throw translateException(e);
}
}

Expand All @@ -47,7 +47,7 @@ public final T next() {
try {
return next0();
} catch (Exception e) {
throw new GlutenException(e);
throw translateException(e);
}
}

Expand All @@ -63,4 +63,12 @@ public final void close() {
protected abstract boolean hasNext0() throws Exception;

protected abstract T next0() throws Exception;

/**
 * Translates a native exception into an appropriate Java exception. Subclasses can override this
 * to translate backend-specific exceptions into Spark-compatible exceptions.
 *
 * @param e the exception caught from the underlying iterator call
 * @return the runtime exception to propagate to the caller; by default the original exception
 *     wrapped in a {@code GlutenException}
 */
protected RuntimeException translateException(Exception e) {
  return new GlutenException(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,57 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenParquetAvroCompatibilitySuite]
enableSuite[GlutenParquetCommitterSuite]
enableSuite[GlutenParquetFieldIdSchemaSuite]
// TODO: 4.x enableSuite[GlutenParquetTypeWideningSuite] // 74 failures - MAJOR ISSUE
enableSuite[GlutenParquetTypeWideningSuite]
// Velox always uses native reader (= vectorized). Override tests in
// GlutenParquetTypeWideningSuite set expectError = true for both reader configs.
.exclude("unsupported parquet conversion ByteType -> DecimalType(1,0)")
.exclude("unsupported parquet conversion ByteType -> DecimalType(3,0)")
.exclude("unsupported parquet conversion ShortType -> DecimalType(3,0)")
.exclude("unsupported parquet conversion ShortType -> DecimalType(5,0)")
.exclude("unsupported parquet conversion IntegerType -> DecimalType(5,0)")
.exclude("unsupported parquet conversion ByteType -> DecimalType(4,1)")
.exclude("unsupported parquet conversion ShortType -> DecimalType(6,1)")
.exclude("unsupported parquet conversion LongType -> DecimalType(10,0)")
.exclude("unsupported parquet conversion ByteType -> DecimalType(2,0)")
.exclude("unsupported parquet conversion ShortType -> DecimalType(4,0)")
.exclude("unsupported parquet conversion IntegerType -> DecimalType(9,0)")
.exclude("unsupported parquet conversion LongType -> DecimalType(19,0)")
.exclude("unsupported parquet conversion ByteType -> DecimalType(3,1)")
.exclude("unsupported parquet conversion ShortType -> DecimalType(5,1)")
.exclude("unsupported parquet conversion IntegerType -> DecimalType(10,1)")
.exclude("unsupported parquet conversion LongType -> DecimalType(20,1)")
// Velox does not support DELTA_BYTE_ARRAY encoding used by Spark V2 writer
// for FIXED_LEN_BYTE_ARRAY decimals (precision > 18).
.exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(22, 2)")
// Override tests in GlutenParquetTypeWideningSuite set expectError = true for:
// - Decimal narrowing (same scale): Velox rejects matching vectorized reader.
// - Decimal scale narrowing/mixed: Velox rejects matching vectorized reader.
.exclude("parquet decimal precision change Decimal(7, 2) -> Decimal(5, 2)")
.exclude("parquet decimal precision change Decimal(10, 2) -> Decimal(5, 2)")
.exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(5, 2)")
.exclude("parquet decimal precision change Decimal(12, 2) -> Decimal(10, 2)")
.exclude("parquet decimal precision change Decimal(20, 2) -> Decimal(10, 2)")
.exclude("parquet decimal precision change Decimal(22, 2) -> Decimal(20, 2)")
// Velox does not support DELTA_BYTE_ARRAY encoding for FIXED_LEN_BYTE_ARRAY decimals.
.exclude("parquet decimal precision and scale change Decimal(20, 2) -> Decimal(22, 4)")
.exclude("parquet decimal precision and scale change Decimal(7, 4) -> Decimal(5, 2)")
.exclude("parquet decimal precision and scale change Decimal(10, 7) -> Decimal(5, 2)")
.exclude("parquet decimal precision and scale change Decimal(20, 17) -> Decimal(5, 2)")
.exclude("parquet decimal precision and scale change Decimal(12, 4) -> Decimal(10, 2)")
.exclude("parquet decimal precision and scale change Decimal(20, 17) -> Decimal(10, 2)")
.exclude("parquet decimal precision and scale change Decimal(22, 4) -> Decimal(20, 2)")
.exclude("parquet decimal precision and scale change Decimal(10, 6) -> Decimal(12, 4)")
.exclude("parquet decimal precision and scale change Decimal(20, 7) -> Decimal(22, 5)")
.exclude("parquet decimal precision and scale change Decimal(12, 4) -> Decimal(10, 6)")
.exclude("parquet decimal precision and scale change Decimal(22, 5) -> Decimal(20, 7)")
.exclude("parquet decimal precision and scale change Decimal(5, 2) -> Decimal(6, 4)")
.exclude("parquet decimal precision and scale change Decimal(10, 4) -> Decimal(12, 7)")
.exclude("parquet decimal precision and scale change Decimal(20, 5) -> Decimal(22, 8)")
// Test only exercises parquet-mr reader (vectorized=false) for decimal narrowing overflow→null.
// Spark vectorized reader rejects Decimal(5,2)→Decimal(3,2) in isDecimalTypeMatched()
// (precisionIncrease < 0). Gluten always uses Velox native reader, cannot reproduce
// parquet-mr's overflow→null behavior.
.exclude("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr")
enableSuite[GlutenParquetVariantShreddingSuite]
// Generated suites for org.apache.spark.sql.execution.datasources.text
// TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,167 @@
*/
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.gluten.config.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.SparkException
import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DecimalType.{ByteDecimal, IntDecimal, LongDecimal, ShortDecimal}

import org.apache.parquet.hadoop.ParquetOutputFormat

class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait {

import testImplicits._

// Disable native writer so that writeParquetFiles() uses Spark's Parquet writer.
// This suite tests the READ path (type widening during reads). The native writer
// doesn't produce DELTA_BINARY_PACKED/DELTA_BYTE_ARRAY encodings that the parent
// test's V2 encoding assertions expect.
override def sparkConf: SparkConf =
super.sparkConf.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "false")

// Velox always uses native reader (equivalent to Spark's vectorized reader).
// For INT->Decimal with insufficient precision, Spark's vectorized reader rejects them
// while parquet-mr allows them. Velox now rejects them (matching vectorized reader).
// Override to set expectError = true for both reader config settings.
for {
(values, fromType, toType) <- Seq(
(Seq("1", "2"), ByteType, DecimalType(1, 0)),
(Seq("1", "2"), ByteType, ByteDecimal),
(Seq("1", "2"), ShortType, ByteDecimal),
(Seq("1", "2"), ShortType, ShortDecimal),
(Seq("1", "2"), IntegerType, ShortDecimal),
(Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision + 1, 1)),
(Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision + 1, 1)),
(Seq("1", "2"), LongType, IntDecimal),
(Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision - 1, 0)),
(Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision - 1, 0)),
(Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision - 1, 0)),
(Seq("1", "2"), LongType, DecimalType(LongDecimal.precision - 1, 0)),
(Seq("1", "2"), ByteType, DecimalType(ByteDecimal.precision, 1)),
(Seq("1", "2"), ShortType, DecimalType(ShortDecimal.precision, 1)),
(Seq("1", "2"), IntegerType, DecimalType(IntDecimal.precision, 1)),
(Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1))
)
}
testGluten(s"unsupported parquet conversion $fromType -> $toType") {
for (dictionaryEnabled <- Seq(true, false)) {
withClue(s"with dictionary encoding '$dictionaryEnabled'") {
withAllParquetWriters {
withTempDir {
dir =>
val df = values.toDF("a").select(col("a").cast(fromType))
withSQLConf(ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString) {
df.write.mode("overwrite").parquet(dir.getAbsolutePath)
}
withAllParquetReaders {
val exception = intercept[SparkException] {
spark.read.schema(s"a ${toType.sql}").parquet(dir.getAbsolutePath).collect()
}
assert(
exception.getCause
.isInstanceOf[SchemaColumnConvertNotSupportedException] ||
exception.getCause.getMessage.contains("not allowed for requested type"))
}
}
}
}
}
}

// Velox rejects Decimal->Decimal narrowing (matching Spark vectorized reader behavior).
// Override to set expectError = true for both reader configs.
for {
(fromPrecision, toPrecision) <-
// Narrowing precision (same scale=2): Velox rejects like vectorized reader.
Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20)
}
testGluten(
s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)"
) {
for (dictionaryEnabled <- Seq(true, false)) {
withClue(s"with dictionary encoding '$dictionaryEnabled'") {
withAllParquetWriters {
withTempDir {
dir =>
val df = Seq("1.23", "10.34")
.toDF("a")
.select(col("a").cast(DecimalType(fromPrecision, 2)))
withSQLConf(ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString) {
df.write.mode("overwrite").parquet(dir.getAbsolutePath)
}
withAllParquetReaders {
val exception = intercept[SparkException] {
spark.read
.schema(s"a ${DecimalType(toPrecision, 2).sql}")
.parquet(dir.getAbsolutePath)
.collect()
}
assert(
exception.getCause
.isInstanceOf[SchemaColumnConvertNotSupportedException] ||
exception.getCause.getMessage.contains("not allowed for requested type"))
}
}
}
}
}
}

class GlutenParquetTypeWideningSuite extends ParquetTypeWideningSuite with GlutenSQLTestsTrait {}
// Velox rejects Decimal->Decimal scale narrowing and mixed scale changes
// (convertType() enforces scaleIncrease >= 0 && precisionIncrease >= scaleIncrease).
// Override to set expectError = true for both reader configs.
for {
((fromPrecision, fromScale), (toPrecision, toScale)) <-
// Narrowing precision and scale by the same amount.
Seq(
(7, 4) -> (5, 2),
(10, 7) -> (5, 2),
(20, 17) -> (5, 2),
(12, 4) -> (10, 2),
(20, 17) -> (10, 2),
(22, 4) -> (20, 2)) ++
// Increasing precision and decreasing scale.
Seq((10, 6) -> (12, 4), (20, 7) -> (22, 5)) ++
// Decreasing precision and increasing scale.
Seq((12, 4) -> (10, 6), (22, 5) -> (20, 7)) ++
// Increasing precision by a smaller amount than scale.
Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8))
}
testGluten(
s"parquet decimal precision and scale change " +
s"Decimal($fromPrecision, $fromScale) -> Decimal($toPrecision, $toScale)"
) {
for (dictionaryEnabled <- Seq(true, false)) {
withClue(s"with dictionary encoding '$dictionaryEnabled'") {
withAllParquetWriters {
withTempDir {
dir =>
val df = Seq("1.23", "10.34")
.toDF("a")
.select(col("a").cast(DecimalType(fromPrecision, fromScale)))
withSQLConf(ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString) {
df.write.mode("overwrite").parquet(dir.getAbsolutePath)
}
withAllParquetReaders {
val exception = intercept[SparkException] {
spark.read
.schema(s"a ${DecimalType(toPrecision, toScale).sql}")
.parquet(dir.getAbsolutePath)
.collect()
}
assert(
exception.getCause
.isInstanceOf[SchemaColumnConvertNotSupportedException] ||
exception.getCause.getMessage.contains("not allowed for requested type"))
}
}
}
}
}
}
}
Loading
Loading