Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ WholeStageResultIterator::WholeStageResultIterator(
nullptr,
true,
deleteFiles,
std::unordered_map<std::string, std::string>(),
metadataColumn,
properties[idx]);
} else {
auto connectorId = kHiveConnectorId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public static IcebergLocalFilesNode makeIcebergLocalFiles(
List<Long> starts,
List<Long> lengths,
List<Map<String, String>> partitionColumns,
List<Map<String, String>> metadataColumns,
LocalFilesNode.ReadFileFormat fileFormat,
List<String> preferredLocations,
List<List<DeleteFile>> deleteFilesList) {
Expand All @@ -37,6 +38,7 @@ public static IcebergLocalFilesNode makeIcebergLocalFiles(
starts,
lengths,
partitionColumns,
metadataColumns,
fileFormat,
preferredLocations,
deleteFilesList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
List<Long> starts,
List<Long> lengths,
List<Map<String, String>> partitionColumns,
List<Map<String, String>> metadataColumns,
ReadFileFormat fileFormat,
List<String> preferredLocations,
List<List<DeleteFile>> deleteFilesList) {
Expand All @@ -46,7 +47,7 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
new ArrayList<>(),
new ArrayList<>(),
partitionColumns,
new ArrayList<>(),
metadataColumns,
fileFormat,
preferredLocations,
new HashMap<>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ case class IcebergScanTransformer(
IcebergScanTransformer.supportsBatchScan(scan)
}

/**
 * Returns a copy of this transformer whose output attributes are replaced by
 * `newOutput`. Relies on the case-class `copy` so every other field is kept as-is.
 */
override def withNewOutput(newOutput: Seq[AttributeReference]): BatchScanExecTransformerBase =
  copy(output = newOutput)

override def doValidateInternal(): ValidationResult = {
val validationResult = super.doValidateInternal();
if (!validationResult.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.softaffinity.SoftAffinity
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.expressions.{InputFileBlockLength, InputFileBlockStart, InputFileName}
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.types.StructType

Expand All @@ -41,45 +42,7 @@ object GlutenIcebergSourceUtil {
index: Int,
readPartitionSchema: StructType): SplitInfo = inputPartition match {
case partition: SparkInputPartition =>
val paths = new JArrayList[String]()
val starts = new JArrayList[JLong]()
val lengths = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]()
val deleteFilesList = new JArrayList[JList[DeleteFile]]()
var fileFormat = ReadFileFormat.UnknownFormat

val tasks = partition.taskGroup[ScanTask]().tasks().asScala
asFileScanTask(tasks.toList).foreach {
task =>
paths.add(
BackendsApiManager.getTransformerApiInstance
.encodeFilePathIfNeed(task.file().path().toString))
starts.add(task.start())
lengths.add(task.length())
partitionColumns.add(getPartitionColumns(task, readPartitionSchema))
deleteFilesList.add(task.deletes())
val currentFileFormat = convertFileFormat(task.file().format())
if (fileFormat == ReadFileFormat.UnknownFormat) {
fileFormat = currentFileFormat
} else if (fileFormat != currentFileFormat) {
throw new UnsupportedOperationException(
s"Only one file format is supported, " +
s"find different file format $fileFormat and $currentFileFormat")
}
}
val preferredLoc = SoftAffinity.getFilePartitionLocations(
paths.asScala.toArray,
inputPartition.preferredLocations())
IcebergLocalFilesBuilder.makeIcebergLocalFiles(
index,
paths,
starts,
lengths,
partitionColumns,
fileFormat,
preferredLoc.toList.asJava,
deleteFilesList
)
genSplitInfo(Seq(partition), index, readPartitionSchema)
case _ =>
throw new UnsupportedOperationException("Only support iceberg SparkInputPartition.")
}
Expand All @@ -93,6 +56,7 @@ object GlutenIcebergSourceUtil {
val lengths = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]()
val deleteFilesList = new JArrayList[JList[DeleteFile]]()
val metadataColumns = new JArrayList[JMap[String, String]]
val preferredLocs = new JArrayList[String]()
var fileFormat = ReadFileFormat.UnknownFormat

Expand All @@ -101,11 +65,19 @@ object GlutenIcebergSourceUtil {
val tasks = partition.taskGroup[ScanTask]().tasks().asScala
asFileScanTask(tasks.toList).foreach {
task =>
val path = task.file().location()
val start = task.start()
val length = task.length()
paths.add(
BackendsApiManager.getTransformerApiInstance
.encodeFilePathIfNeed(task.file().path().toString))
starts.add(task.start())
lengths.add(task.length())
.encodeFilePathIfNeed(path))
starts.add(start)
lengths.add(length)
val metadataColumn = new JHashMap[String, String]()
metadataColumn.put(InputFileName().prettyName, path)
metadataColumn.put(InputFileBlockStart().prettyName, start.toString)
metadataColumn.put(InputFileBlockLength().prettyName, length.toString)
metadataColumns.add(metadataColumn)
partitionColumns.add(getPartitionColumns(task, readPartitionSchema))
deleteFilesList.add(task.deletes())
val currentFileFormat = convertFileFormat(task.file().format())
Expand All @@ -125,6 +97,7 @@ object GlutenIcebergSourceUtil {
starts,
lengths,
partitionColumns,
metadataColumns,
fileFormat,
SoftAffinity
.getFilePartitionLocations(paths.asScala.toArray, preferredLocs.asScala.toArray)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@
package org.apache.gluten.execution

import org.apache.spark.SparkConf
import org.apache.spark.TaskContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.ProjectExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.functions.{countDistinct, udf}

import org.scalatest.prop.TableDrivenPropertyChecks._

import java.nio.file.{Files, Paths}

abstract class IcebergSuite extends WholeStageTransformerSuite {
protected val rootPath: String = getClass.getResource("/").getPath
Expand Down Expand Up @@ -664,4 +672,231 @@ abstract class IcebergSuite extends WholeStageTransformerSuite {
assert(result.head.getString(1) == "test_data")
}
}

// Verifies that Spark's file-metadata functions (input_file_name,
// input_file_block_start, input_file_block_length) are evaluated by the Gluten
// Iceberg scan path: the plan must contain exactly one IcebergScanTransformer and
// one ProjectExecTransformer, and no vanilla ProjectExec/BatchScanExec fallback.
test("test iceberg input_file_name, input_file_block_start, input_file_block_length") {
// Test both partitioned and non-partitioned tables
val tables =
org.scalatest.prop.TableDrivenPropertyChecks.Table(
("table_name", "is_partitioned"),
("partitioned_table", true),
("non_partitioned_table", false)
)
forAll(tables) {
(tableName: String, isPartitioned: Boolean) =>
withTable(tableName) {
// Create table with or without partitioning
if (isPartitioned) {
spark.sql(s"""
|create table $tableName(id int, name string)
|using iceberg
|partitioned by (p string);
|""".stripMargin)
} else {
spark.sql(s"""
|create table $tableName(id int, name string, p string)
|using iceberg;
|""".stripMargin)
}

// Same insert for both tables; values span partitions p1..p4 so the
// partitioned table produces multiple data files from one insert.
spark.sql(
s"""
|insert into table $tableName values
|(4, 'a5', 'p4'),
|(1, 'a1', 'p1'),
|(1, 'a2', 'p1'),
|(1, 'a2', 'p1'),
|(1, 'a2', 'p1'),
|(1, 'a2', 'p1'),
|(1, 'a2', 'p1'),
|(1, 'a2', 'p1'),
|(1, 'a2', 'p1'),
|(1, 'a2', 'p1'),
|(2, 'a3', 'p2'),
|(1, 'a2', 'p1'),
|(3, 'a4', 'p3'),
|(10, 'a4', 'p3');
|""".stripMargin
)

// Same query and validation for both tables
runQueryAndCompare(s"""
|select input_file_name() as file_name,
|input_file_block_start(),
|input_file_block_length()
|from $tableName
|order by 1, 2, 3;
|""".stripMargin) {
df =>
{
val plan = df.queryExecution.executedPlan
// The metadata columns must be produced by the Gluten Iceberg scan,
// with no fallback to vanilla Spark operators.
assert(
getExecutedPlan(df).count(
plan => {
plan.isInstanceOf[IcebergScanTransformer]
}) == 1)
assert(
getExecutedPlan(df).count(
plan => {
plan.isInstanceOf[ProjectExecTransformer]
}) == 1)
assert(
getExecutedPlan(df).count(
plan => {
plan.isInstanceOf[ProjectExec]
}) == 0)
assert(
getExecutedPlan(df).count(
plan => {
plan.isInstanceOf[BatchScanExec]
}) == 0)
// The scan's own output should carry the three metadata attributes,
// in this exact order, pushed down into the transformer.
// NOTE(review): `foreach(plan) { ... }` — presumably a plan-walking
// helper inherited from the suite (vs. `plan.foreach`); confirm.
foreach(plan) {
case plan: IcebergScanTransformer =>
assert(plan.output.head.name == "input_file_name")
assert(plan.output.apply(1).name == "input_file_block_start")
assert(plan.output.last.name == "input_file_block_length")
case _ => // do nothing
}
// Tag each row with the task that produced it, to check how files
// are grouped into tasks.
val getTaskId = udf(
() => {
val context = TaskContext.get()
assert(context != null)
context.taskAttemptId()
})
val dfWithTaskInfo = df.withColumn("task_id", getTaskId())
val filesPerTask = dfWithTaskInfo
.groupBy("task_id")
.agg(
countDistinct("file_name").alias("distinct_file_count")
)
.collect()
// All rows come from a single task; a partitioned insert yields
// several data files per task, a non-partitioned one yields one.
// NOTE(review): assumes task/file packing is deterministic here —
// confirm this holds across schedulers/configs.
assert(filesPerTask.length == 1)
if (isPartitioned) {
assert(filesPerTask.head.getAs[Long]("distinct_file_count") > 1)
} else {
assert(filesPerTask.head.getAs[Long]("distinct_file_count") == 1)
}
}
}
}
}
}

// Verifies that input_file_name() round-trips file paths containing non-ASCII
// characters: data files are copied under a Korean-prefixed name, registered into
// a second Iceberg table via the add_files procedure, and the returned names must
// preserve the non-ASCII prefix (i.e. no mangling in the native path encoding).
test("test iceberg input file name with non-ascii characters") {
// Test both partitioned and non-partitioned tables
val tables =
org.scalatest.prop.TableDrivenPropertyChecks.Table(
("table_name", "is_partitioned"),
("partitioned_table", true),
("non_partitioned_table", false)
)
forAll(tables) {
(tableName: String, isPartitioned: Boolean) =>
withTable(tableName) {
// Create table with or without partitioning
if (isPartitioned) {
spark.sql(s"""
|create table $tableName(id int, name string)
|using iceberg
|partitioned by (p string);
|""".stripMargin)
} else {
spark.sql(s"""
|create table $tableName(id int, name string, p string)
|using iceberg;
|""".stripMargin)
}

// Same insert for both tables; values span partitions p1..p4.
spark.sql(
s"""
|insert into table $tableName values
|(4, 'a5', 'p4'),
|(1, 'a1', 'p1'),
|(1, 'a2', 'p1'),
|(1, 'a2', 'p1'),
|(1, 'a2', 'p1'),
|(1, 'a2', 'p1'),
|(1, 'a2', 'p1'),
|(1, 'a2', 'p1'),
|(1, 'a2', 'p1'),
|(1, 'a2', 'p1'),
|(2, 'a3', 'p2'),
|(1, 'a2', 'p1'),
|(3, 'a4', 'p3'),
|(10, 'a4', 'p3');
|""".stripMargin
)

// Verify input_file_name() returns non-ascii file names
val newTableName = s"${tableName}_with_non_ascii"
withTable(newTableName) {
if (isPartitioned) {
spark.sql(s"""
CREATE TABLE $newTableName (id int, name string)
USING iceberg
PARTITIONED BY (p string)
""")
} else {
spark.sql(s"""
CREATE TABLE $newTableName (id int, name string, p string)
USING iceberg
""")
}

// Get a parent directory of partition directories
val filePaths = spark
.sql(s"""
|select distinct input_file_name()
|from $tableName
|""".stripMargin)
.collect()
.map(_.getString(0))
// this is a data directory for non-partitioned table
val partitionDir = new java.io.File(filePaths.head).getParent
val parentOfPartitionDirs = new java.io.File(partitionDir).getParent
val nonAsciiPrefix = "논아스키코드" // scalastyle:ignore nonascii
// Process each file path, copy it, and add to the new table.
// Each original data file gets a sibling copy whose name carries the
// non-ASCII prefix, so the directory ends up with pairs of files.
filePaths.foreach {
filePath =>
// Convert file:/ URI to actual path
val path = Paths.get(new java.net.URI(filePath))
val directory = path.getParent
val originalFileName = path.getFileName.toString

// Create new filename with non-ascii prefix
val newFileName = nonAsciiPrefix + originalFileName
val newPath = directory.resolve(newFileName)

// Copy the file
Files.copy(path, newPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING)
}

// Register every parquet file found under the source directory into the
// new Iceberg table: partition root for the partitioned case, the single
// data directory otherwise.
spark.sql(s"""
CALL spark_catalog.system.add_files(
table => '$newTableName',
source_table => '`parquet`.`${if (isPartitioned) parentOfPartitionDirs
else partitionDir}`'
)
""")

val fileNamesFromNewTable = spark
.sql(s"""
SELECT DISTINCT input_file_name() as file_name
FROM $newTableName
""")
.collect()
// Expected file counts: originals plus their prefixed copies.
// NOTE(review): assumes the insert wrote one data file per partition
// (4 partitions -> 4+4=8) and one file for the non-partitioned table
// (1+1=2) — confirm this holds under the suite's write config.
if (isPartitioned) {
assert(fileNamesFromNewTable.length == 8)
} else {
assert(fileNamesFromNewTable.length == 2)
}
// Exactly half the registered files must carry the non-ASCII prefix,
// proving the prefixed names survived the scan unmangled.
val (nonAsciiFiles, asciiFiles) = fileNamesFromNewTable.partition(
_.getString(0)
.contains(nonAsciiPrefix))
assert(nonAsciiFiles.length == asciiFiles.length)
}
}
}
}
}
Loading
Loading