Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
override def validateScanExec(
format: ReadFileFormat,
fields: Array[StructField],
dataSchema: StructType,
rootPaths: Seq[String],
properties: Map[String, String],
hadoopConf: Configuration): ValidationResult = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
override def genSplitInfo(
partition: InputPartition,
partitionSchema: StructType,
dataSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def validateScanExec(
format: ReadFileFormat,
fields: Array[StructField],
dataSchema: StructType,
rootPaths: Seq[String],
properties: Map[String, String],
hadoopConf: Configuration): ValidationResult = {
Expand All @@ -117,9 +118,11 @@ object VeloxBackendSettings extends BackendSettingsApi {
}

def validateFormat(): Option[String] = {
def validateTypes(validatorFunc: PartialFunction[StructField, String]): Option[String] = {
def validateTypes(
validatorFunc: PartialFunction[StructField, String],
fieldsToValidate: Array[StructField]): Option[String] = {
// Collect unsupported types.
val unsupportedDataTypeReason = fields.collect(validatorFunc)
val unsupportedDataTypeReason = fieldsToValidate.collect(validatorFunc)
if (unsupportedDataTypeReason.nonEmpty) {
Some(
s"Found unsupported data type in $format: ${unsupportedDataTypeReason.mkString(", ")}.")
Expand Down Expand Up @@ -152,7 +155,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
if (!VeloxConfig.get.veloxOrcScanEnabled) {
Some(s"Velox ORC scan is turned off, ${VeloxConfig.VELOX_ORC_SCAN_ENABLED.key}")
} else {
val typeValidator: PartialFunction[StructField, String] = {
val fieldTypeValidator: PartialFunction[StructField, String] = {
case StructField(_, arrayType: ArrayType, _, _)
if arrayType.elementType.isInstanceOf[StructType] =>
"StructType as element in ArrayType"
Expand All @@ -165,12 +168,16 @@ object VeloxBackendSettings extends BackendSettingsApi {
case StructField(_, mapType: MapType, _, _)
if mapType.valueType.isInstanceOf[ArrayType] =>
"ArrayType as Value in MapType"
case StructField(_, TimestampType, _, _) => "TimestampType"
}

val schemaTypeValidator: PartialFunction[StructField, String] = {
case StructField(_, stringType: StringType, _, metadata)
if isCharType(stringType, metadata) =>
CharVarcharUtils.getRawTypeString(metadata) + "(force fallback)"
case StructField(_, TimestampType, _, _) => "TimestampType"
}
validateTypes(typeValidator)
validateTypes(fieldTypeValidator, fields)
.orElse(validateTypes(schemaTypeValidator, dataSchema.fields))
}
case _ => Some(s"Unsupported file format $format.")
}
Expand All @@ -193,10 +200,28 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
}

def validateDataSchema(): Option[String] = {
  // When both Parquet and ORC readers map table columns to file columns by
  // name, the table schema is never handed to Velox, so nothing to validate.
  if (VeloxConfig.get.parquetUseColumnNames && VeloxConfig.get.orcUseColumnNames) {
    None
  } else {
    // If we are using column indices for schema evolution, we need to pass the
    // table schema to Velox, so every type in the table schema must be
    // natively supported.
    val unsupportedReasons =
      dataSchema.fields.flatMap(field => VeloxValidatorApi.validateSchema(field.dataType))
    if (unsupportedReasons.isEmpty) {
      None
    } else {
      Some(s"""Found unsupported data type(s) in file
              |schema: ${unsupportedReasons.mkString(", ")}.""".stripMargin)
    }
  }
}

val validationChecks = Seq(
validateScheme(),
validateFormat(),
validateEncryption()
validateEncryption(),
validateDataSchema()
)

for (check <- validationChecks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ package org.apache.gluten.backendsapi.velox

import org.apache.gluten.backendsapi.{BackendsApiManager, IteratorApi}
import org.apache.gluten.backendsapi.velox.VeloxIteratorApi.unescapePathName
import org.apache.gluten.config.VeloxConfig
import org.apache.gluten.execution._
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.metrics.{IMetrics, IteratorMetricsJniWrapper}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.plan.PlanNode
import org.apache.gluten.substrait.rel.{LocalFilesBuilder, SplitInfo}
import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode, SplitInfo}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.vectorized._

Expand All @@ -49,9 +50,25 @@ import scala.collection.JavaConverters._

class VeloxIteratorApi extends IteratorApi with Logging {

/**
 * Attaches the scan's data schema to the local-files node when Velox must
 * resolve file columns by index, i.e. when name-based column mapping is
 * disabled for the split's file format. Returns the same node for chaining.
 */
private def setFileSchemaForLocalFiles(
    localFilesNode: LocalFilesNode,
    fileSchema: StructType,
    fileFormat: ReadFileFormat): LocalFilesNode = {
  val needsFileSchema = fileFormat match {
    case ReadFileFormat.OrcReadFormat | ReadFileFormat.DwrfReadFormat =>
      !VeloxConfig.get.orcUseColumnNames
    case ReadFileFormat.ParquetReadFormat =>
      !VeloxConfig.get.parquetUseColumnNames
    case _ =>
      false
  }
  if (needsFileSchema) {
    localFilesNode.setFileSchema(fileSchema)
  }
  localFilesNode
}

override def genSplitInfo(
partition: InputPartition,
partitionSchema: StructType,
dataSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
Expand All @@ -69,19 +86,23 @@ class VeloxIteratorApi extends IteratorApi with Logging {
constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
val preferredLocations =
SoftAffinity.getFilePartitionLocations(f)
LocalFilesBuilder.makeLocalFiles(
f.index,
paths,
starts,
lengths,
fileSizes,
modificationTimes,
partitionColumns,
metadataColumns,
fileFormat,
preferredLocations.toList.asJava,
mapAsJavaMap(properties),
otherMetadataColumns
setFileSchemaForLocalFiles(
LocalFilesBuilder.makeLocalFiles(
f.index,
paths,
starts,
lengths,
fileSizes,
modificationTimes,
partitionColumns,
metadataColumns,
fileFormat,
preferredLocations.toList.asJava,
mapAsJavaMap(properties),
otherMetadataColumns
),
dataSchema,
fileFormat
)
case _ =>
throw new UnsupportedOperationException(s"Unsupported input partition.")
Expand All @@ -92,6 +113,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partitionIndex: Int,
partitions: Seq[InputPartition],
partitionSchema: StructType,
dataSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
Expand All @@ -115,19 +137,23 @@ class VeloxIteratorApi extends IteratorApi with Logging {
metadataColumns,
otherMetadataColumns) =
constructSplitInfo(partitionSchema, partitionFiles, metadataColumnNames)
LocalFilesBuilder.makeLocalFiles(
partitionIndex,
paths,
starts,
lengths,
fileSizes,
modificationTimes,
partitionColumns,
metadataColumns,
fileFormat,
locations.toList.asJava,
mapAsJavaMap(properties),
otherMetadataColumns
setFileSchemaForLocalFiles(
LocalFilesBuilder.makeLocalFiles(
partitionIndex,
paths,
starts,
lengths,
fileSizes,
modificationTimes,
partitionColumns,
metadataColumns,
fileFormat,
locations.toList.asJava,
mapAsJavaMap(properties),
otherMetadataColumns
),
dataSchema,
fileFormat
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.task.TaskResources
import scala.collection.JavaConverters._

class VeloxValidatorApi extends ValidatorApi {
import VeloxValidatorApi._

/** For velox backend, key validation is on native side. */
override def doExprValidate(substraitExprName: String, expr: Expression): Boolean =
Expand All @@ -53,6 +54,27 @@ class VeloxValidatorApi extends ValidatorApi {
info.fallbackInfo.asScala.reduce[String] { case (l, r) => l + "\n |- " + r }))
}

/** Delegates schema validation to the shared companion-object helper. */
override def doSchemaValidate(schema: DataType): Option[String] =
  validateSchema(schema)

/**
 * Rejects columnar shuffle when either side of the exchange has an empty
 * schema, then falls back to full schema validation of the child's output.
 */
override def doColumnarShuffleExchangeExecValidate(
    outputAttributes: Seq[Attribute],
    outputPartitioning: Partitioning,
    child: SparkPlan): Option[String] = {
  // See: https://github.com/apache/incubator-gluten/issues/7600.
  if (outputAttributes.isEmpty) {
    Some("Shuffle with empty output schema is not supported")
  } else if (child.output.isEmpty) {
    // See: https://github.com/apache/incubator-gluten/issues/7600.
    Some("Shuffle with empty input schema is not supported")
  } else {
    doSchemaValidate(child.schema)
  }
}
}

object VeloxValidatorApi {
private def isPrimitiveType(dataType: DataType): Boolean = {
dataType match {
case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
Expand All @@ -63,41 +85,26 @@ class VeloxValidatorApi extends ValidatorApi {
}
}

/**
 * Recursively checks whether `schema` is composed solely of types the Velox
 * backend supports. Returns None when fully supported, otherwise a reason
 * string naming the first unsupported type encountered.
 */
def validateSchema(schema: DataType): Option[String] = {
  if (isPrimitiveType(schema)) {
    None
  } else {
    schema match {
      case map: MapType =>
        validateSchema(map.keyType).orElse(validateSchema(map.valueType))
      case struct: StructType =>
        // Lazily report the first unsupported field type, if any, instead of
        // using a nonlocal return out of a foreach lambda.
        struct.fields.view.flatMap(field => validateSchema(field.dataType)).headOption
      case array: ArrayType =>
        validateSchema(array.elementType)
      case _ =>
        Some(s"Schema / data type not supported: $schema")
    }
  }
}

override def doColumnarShuffleExchangeExecValidate(
outputAttributes: Seq[Attribute],
outputPartitioning: Partitioning,
child: SparkPlan): Option[String] = {
if (outputAttributes.isEmpty) {
// See: https://github.com/apache/incubator-gluten/issues/7600.
return Some("Shuffle with empty output schema is not supported")
}
if (child.output.isEmpty) {
// See: https://github.com/apache/incubator-gluten/issues/7600.
return Some("Shuffle with empty input schema is not supported")
}
doSchemaValidate(child.schema)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def veloxPreferredBatchBytes: Long = getConf(COLUMNAR_VELOX_PREFERRED_BATCH_BYTES)

// Whether cuDF-backed table scan is enabled.
def cudfEnableTableScan: Boolean = getConf(CUDF_ENABLE_TABLE_SCAN)

// Whether ORC scans map table fields to file fields by name (true) or by index (false).
def orcUseColumnNames: Boolean = getConf(ORC_USE_COLUMN_NAMES)

// Whether Parquet scans map table fields to file fields by name (true) or by index (false).
def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES)
}

object VeloxConfig extends ConfigRegistry {
Expand Down Expand Up @@ -665,4 +669,16 @@ object VeloxConfig extends ConfigRegistry {
"instance per thread of execution.")
.intConf
.createWithDefault(100)

// When false, ORC columns are resolved by position instead of by name, which
// requires passing the table schema to Velox for index-based schema evolution.
val ORC_USE_COLUMN_NAMES =
buildConf("spark.gluten.sql.columnar.backend.velox.orcUseColumnNames")
.doc("Maps table field names to file field names using names, not indices for ORC files.")
.booleanConf
.createWithDefault(true)

// When false, Parquet columns are resolved by position instead of by name,
// which requires passing the table schema to Velox for index-based schema evolution.
val PARQUET_USE_COLUMN_NAMES =
buildConf("spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames")
.doc("Maps table field names to file field names using names, not indices for Parquet files.")
.booleanConf
.createWithDefault(true)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.config.{GlutenConfig, VeloxConfig}

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarShuffleExchangeExec, SparkPlan}
Expand Down Expand Up @@ -270,4 +270,38 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
}
}
}

// Exercises every combination of format and name/index column-mapping configs
// and checks that index-based mapping forces a fallback for this table schema.
testWithMinSparkVersion("fallback with index based schema evolution", "3.3") {
  val query = "SELECT c2 FROM test"
  for {
    format <- Seq("parquet", "orc")
    useParquetNames <- Seq(true, false)
    useOrcNames <- Seq(true, false)
  } {
    withSQLConf(
      VeloxConfig.PARQUET_USE_COLUMN_NAMES.key -> useParquetNames.toString,
      VeloxConfig.ORC_USE_COLUMN_NAMES.key -> useOrcNames.toString
    ) {
      withTable("test") {
        spark
          .range(100)
          .selectExpr("to_timestamp_ntz(from_unixtime(id % 3)) as c1", "id as c2")
          .write
          .format(format)
          .saveAsTable("test")

        runQueryAndCompare(query) {
          df =>
            // Fallback is expected whenever index-based mapping is enabled
            // for either format (validation ignores the format in use).
            val expectFallback = !useParquetNames || !useOrcNames
            val plan = df.queryExecution.executedPlan
            assert(collect(plan) { case g: GlutenPlan => g }.isEmpty == expectFallback)
        }
      }
    }
  }
}
}
Loading