Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.{DeltaExcludedTestMixin, DeltaSQLCommandTest}

import org.scalatest.Ignore

// spotless:off
class DeleteSQLSuite extends DeleteSuiteBase
with DeltaExcludedTestMixin
Expand Down Expand Up @@ -95,9 +93,6 @@ class DeleteSQLSuite extends DeleteSuiteBase
}
}

// FIXME: Enable the test.
// Skipping as function input_file_name doesn't get correctly resolved.
@Ignore
class DeleteSQLNameColumnMappingSuite extends DeleteSQLSuite
with DeltaColumnMappingEnableNameMode {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import scala.collection.mutable.ListBuffer

object DeltaPostTransformRules {
def rules: Seq[Rule[SparkPlan]] =
RemoveTransitions :: columnMappingRule :: pushDownInputFileExprRule :: Nil
RemoveTransitions :: pushDownInputFileExprRule :: columnMappingRule :: Nil

private val COLUMN_MAPPING_RULE_TAG: TreeNodeTag[String] =
TreeNodeTag[String]("org.apache.gluten.delta.column.mapping")
Expand Down Expand Up @@ -76,6 +76,16 @@ object DeltaPostTransformRules {
}
}

private def isInputFileRelatedAttribute(attr: Attribute): Boolean = {
attr match {
case AttributeReference(name, _, _, _) =>
Seq(InputFileName(), InputFileBlockStart(), InputFileBlockLength())
.map(_.prettyName)
.contains(name)
case _ => false
}
}

private[gluten] def containsIncrementMetricExpr(expr: Expression): Boolean = {
expr match {
case e if e.prettyName == "increment_metric" => true
Expand Down Expand Up @@ -108,12 +118,14 @@ object DeltaPostTransformRules {
val originColumnNames = ListBuffer.empty[String]
val transformedAttrs = ListBuffer.empty[Attribute]
def mapAttribute(attr: Attribute) = {
val newAttr = if (!plan.isMetadataColumn(attr)) {
val newAttr = if (plan.isMetadataColumn(attr)) {
attr
} else if (isInputFileRelatedAttribute(attr)) {
attr
} else {
DeltaColumnMapping
.createPhysicalAttributes(Seq(attr), fmt.referenceSchema, fmt.columnMappingMode)
.head
} else {
attr
}
if (!originColumnNames.contains(attr.name)) {
transformedAttrs += newAttr
Expand Down