diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala index 7004fa57ec3b..a9b6ec04c2ab 100644 --- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala +++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala @@ -42,7 +42,8 @@ case class IcebergDataWriteFactory( codec: String, partitionSpec: PartitionSpec, sortOrder: SortOrder, - field: IcebergNestedField) + field: IcebergNestedField, + queryId: String) extends ColumnarBatchDataWriterFactory { /** @@ -52,7 +53,7 @@ case class IcebergDataWriteFactory( *

If this method fails (by throwing an exception), the corresponding Spark write task would * fail and get retried until hitting the maximum retry times. */ - override def createWriter(): DataWriter[ColumnarBatch] = { + override def createWriter(partitionId: Int, taskId: Long): DataWriter[ColumnarBatch] = { val fields = partitionSpec .fields() .stream() @@ -63,8 +64,19 @@ case class IcebergDataWriteFactory( .setSpecId(partitionSpec.specId()) .addAllFields(fields) .build() + val epochId = 0 + val operationId = queryId + "-" + epochId val (writerHandle, jniWrapper) = - getJniWrapper(schema, format, directory, codec, specProto, field) + getJniWrapper( + schema, + format, + directory, + codec, + partitionId, + taskId, + operationId, + specProto, + field) IcebergColumnarBatchDataWriter(writerHandle, jniWrapper, format, partitionSpec, sortOrder) } @@ -73,6 +85,9 @@ case class IcebergDataWriteFactory( format: Int, directory: String, codec: String, + partitionId: Int, + taskId: Long, + operationId: String, partitionSpec: IcebergPartitionSpec, field: IcebergNestedField): (Long, IcebergWriteJniWrapper) = { val schema = SparkArrowUtil.toArrowSchema(localSchema, SQLConf.get.sessionLocalTimeZone) @@ -87,6 +102,9 @@ case class IcebergDataWriteFactory( format, directory, codec, + partitionId, + taskId, + operationId, partitionSpec.toByteArray, field.toByteArray) cSchema.close() diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala index 3ac518cf7665..fb1b25856ce0 100644 --- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala +++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala @@ -36,7 +36,8 @@ abstract class AbstractIcebergWriteExec extends IcebergWriteExec { getCodec, getPartitionSpec, IcebergWriteUtil.getSortOrder(write), - 
nestedField + nestedField, + IcebergWriteUtil.getQueryId(write) ) } } diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java index 54ae50e57813..60bab597862f 100644 --- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java +++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java @@ -31,6 +31,9 @@ public IcebergWriteJniWrapper(Runtime runtime) { public native long init(long cSchema, int format, String directory, String codec, + int partitionId, + long taskId, + String operationId, byte[] partitionSpec, byte[] field); diff --git a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala index 7b91b12b75bc..7139bab4258b 100644 --- a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala +++ b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala @@ -325,4 +325,25 @@ class VeloxIcebergSuite extends IcebergSuite { assert(executionMetrics(metrics("numWrittenFiles").id).toLong == 1) } } + + test("iceberg write file name") { + withTable("iceberg_tbl") { + spark.sql("create table if not exists iceberg_tbl (id int) using iceberg") + spark.sql("insert into iceberg_tbl values 1") + + val filePath = spark + .sql("select * from default.iceberg_tbl.files") + .select("file_path") + .collect() + .apply(0) + .getString(0) + + val fileName = filePath.split('/').last + // Expected format: {partitionId:05d}-{taskId}-{operationId}-{fileCount:05d}.parquet + // Example: 00000-0-query_id-0-00001.parquet + assert( + fileName.matches("\\d{5}-\\d+-.*-\\d{5}\\.parquet"), + s"File name does not match expected format: $fileName") + } + } } 
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 945e245969fc..0698c7242673 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -308,7 +308,7 @@ object VeloxBackendSettings extends BackendSettingsApi { val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw") val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options) if (unSupportedCompressions.contains(compressionCodec)) { - Some("Brotli, lzo, lz4raw and lz4_raw compression codec is unsupported in Velox backend.") + Some(s"$compressionCodec compression codec is unsupported in Velox backend.") } else { None } diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala index 81c9870a5260..e5435487aeed 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala @@ -33,25 +33,14 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite { override protected val fileFormat: String = "parquet" // The parquet compression codec extensions - private val parquetCompressionCodecExtensions = if (isSparkVersionGE("3.5")) { + private val parquetCompressionCodecExtensions = { Map( "none" -> "", "uncompressed" -> "", "snappy" -> ".snappy", "gzip" -> ".gz", "lzo" -> ".lzo", - "lz4" -> ".lz4hadoop", // Specific extension for version 3.5 - "brotli" -> ".br", - "zstd" -> ".zstd" - ) - } else { - Map( - "none" -> "", - "uncompressed" -> "", - "snappy" -> ".snappy", - "gzip" -> ".gz", - "lzo" -> ".lzo", - "lz4" -> ".lz4", + "lz4" -> (if 
(isSparkVersionGE("3.5")) ".lz4hadoop" else ".lz4"), "brotli" -> ".br", "zstd" -> ".zstd" ) diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 966f2af6af7b..ceaf7497f3fb 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -222,13 +222,16 @@ std::shared_ptr VeloxRuntime::createIcebergWriter( int32_t format, const std::string& outputDirectory, facebook::velox::common::CompressionKind compressionKind, + int32_t partitionId, + int64_t taskId, + const std::string& operationId, std::shared_ptr spec, const gluten::IcebergNestedField& protoField, const std::unordered_map& sparkConfs) { auto veloxPool = memoryManager()->getLeafMemoryPool(); auto connectorPool = memoryManager()->getAggregateMemoryPool(); return std::make_shared( - rowType, format, outputDirectory, compressionKind, spec, protoField, sparkConfs, veloxPool, connectorPool); + rowType, format, outputDirectory, compressionKind, partitionId, taskId, operationId, spec, protoField, sparkConfs, veloxPool, connectorPool); } #endif diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index 59eb43028f8b..b39733e83938 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -74,6 +74,9 @@ class VeloxRuntime final : public Runtime { int32_t format, const std::string& outputDirectory, facebook::velox::common::CompressionKind compressionKind, + int32_t partitionId, + int64_t taskId, + const std::string& operationId, std::shared_ptr spec, const gluten::IcebergNestedField& protoField, const std::unordered_map& sparkConfs); diff --git a/cpp/velox/compute/iceberg/IcebergWriter.cc b/cpp/velox/compute/iceberg/IcebergWriter.cc index 2e9aa8660500..1d6e7fa344d9 100644 --- a/cpp/velox/compute/iceberg/IcebergWriter.cc +++ b/cpp/velox/compute/iceberg/IcebergWriter.cc @@ -30,6 +30,77 @@ using namespace facebook::velox::connector::hive; using namespace facebook::velox::connector::hive::iceberg; namespace { 
+// Custom Iceberg file name generator for Gluten +class GlutenIcebergFileNameGenerator : public connector::hive::FileNameGenerator { + public: + GlutenIcebergFileNameGenerator( + int32_t partitionId, + int64_t taskId, + const std::string& operationId, + dwio::common::FileFormat fileFormat) + : partitionId_(partitionId), + taskId_(taskId), + operationId_(operationId), + fileFormat_(fileFormat), + fileCount_(0) {} + + std::pair gen( + std::optional bucketId, + const std::shared_ptr insertTableHandle, + const connector::ConnectorQueryCtx& connectorQueryCtx, + bool commitRequired) const override { + auto targetFileName = insertTableHandle->locationHandle()->targetFileName(); + if (targetFileName.empty()) { + // Generate file name following Iceberg format: + // {partitionId:05d}-{taskId}-{operationId}-{fileCount:05d}{suffix} + fileCount_++; + + std::string fileExtension; + switch (fileFormat_) { + case dwio::common::FileFormat::PARQUET: + fileExtension = ".parquet"; + break; + case dwio::common::FileFormat::ORC: + fileExtension = ".orc"; + break; + default: + fileExtension = ".parquet"; + } + + char buffer[256]; + snprintf( + buffer, + sizeof(buffer), + "%05d-%" PRId64 "-%s-%05d%s", + partitionId_, + taskId_, + operationId_.c_str(), + fileCount_, + fileExtension.c_str()); + targetFileName = std::string(buffer); + } + + return {targetFileName, targetFileName}; + } + + folly::dynamic serialize() const override { + VELOX_UNREACHABLE("Unexpected code path, implement serialize() first."); + } + + std::string toString() const override { + return fmt::format( + "GlutenIcebergFileNameGenerator(partitionId={}, taskId={}, operationId={})", + partitionId_, taskId_, operationId_); + } + + private: + int32_t partitionId_; + int64_t taskId_; + std::string operationId_; + dwio::common::FileFormat fileFormat_; + mutable int32_t fileCount_; +}; + iceberg::IcebergNestedField convertToIcebergNestedField(const gluten::IcebergNestedField& protoField) { IcebergNestedField result; result.id 
= protoField.id(); @@ -48,6 +119,9 @@ std::shared_ptr createIcebergInsertTableHandle( const std::string& outputDirectoryPath, dwio::common::FileFormat fileFormat, facebook::velox::common::CompressionKind compressionKind, + int32_t partitionId, + int64_t taskId, + const std::string& operationId, std::shared_ptr spec, const iceberg::IcebergNestedField& nestedField, facebook::velox::memory::MemoryPool* pool) { @@ -80,12 +154,17 @@ std::shared_ptr createIcebergInsertTableHandle( nestedField.children[i])); } } + + auto fileNameGenerator = std::make_shared( + partitionId, taskId, operationId, fileFormat); + std::shared_ptr locationHandle = std::make_shared( outputDirectoryPath, outputDirectoryPath, connector::hive::LocationHandle::TableType::kExisting); const std::vector sortedBy; + const std::unordered_map serdeParameters; return std::make_shared( - columnHandles, locationHandle, spec, pool, fileFormat, sortedBy, compressionKind); + columnHandles, locationHandle, spec, pool, fileFormat, sortedBy, compressionKind, serdeParameters, fileNameGenerator); } } // namespace @@ -96,12 +175,15 @@ IcebergWriter::IcebergWriter( int32_t format, const std::string& outputDirectory, facebook::velox::common::CompressionKind compressionKind, + int32_t partitionId, + int64_t taskId, + const std::string& operationId, std::shared_ptr spec, const gluten::IcebergNestedField& field, const std::unordered_map& sparkConfs, std::shared_ptr memoryPool, std::shared_ptr connectorPool) - : rowType_(rowType), field_(convertToIcebergNestedField(field)), pool_(memoryPool), connectorPool_(connectorPool), createTimeNs_(getCurrentTimeNano()) { + : rowType_(rowType), field_(convertToIcebergNestedField(field)), partitionId_(partitionId), taskId_(taskId), operationId_(operationId), pool_(memoryPool), connectorPool_(connectorPool), createTimeNs_(getCurrentTimeNano()) { auto veloxCfg = std::make_shared(std::unordered_map(sparkConfs)); connectorSessionProperties_ = createHiveConnectorSessionConfig(veloxCfg); @@ 
-123,7 +205,7 @@ IcebergWriter::IcebergWriter( dataSink_ = std::make_unique( rowType_, createIcebergInsertTableHandle( - rowType_, outputDirectory, icebergFormatToVelox(format), compressionKind, spec, field_, pool_.get()), + rowType_, outputDirectory, icebergFormatToVelox(format), compressionKind, partitionId_, taskId_, operationId_, spec, field_, pool_.get()), connectorQueryCtx_.get(), facebook::velox::connector::CommitStrategy::kNoCommit, connectorConfig_); diff --git a/cpp/velox/compute/iceberg/IcebergWriter.h b/cpp/velox/compute/iceberg/IcebergWriter.h index f5871950708a..2fa13dcd698c 100644 --- a/cpp/velox/compute/iceberg/IcebergWriter.h +++ b/cpp/velox/compute/iceberg/IcebergWriter.h @@ -43,6 +43,9 @@ class IcebergWriter { int32_t format, const std::string& outputDirectory, facebook::velox::common::CompressionKind compressionKind, + int32_t partitionId, + int64_t taskId, + const std::string& operationId, std::shared_ptr spec, const gluten::IcebergNestedField& field, const std::unordered_map& sparkConfs, @@ -58,6 +61,9 @@ class IcebergWriter { private: facebook::velox::RowTypePtr rowType_; const facebook::velox::connector::hive::iceberg::IcebergNestedField field_; + int32_t partitionId_; + int64_t taskId_; + std::string operationId_; std::shared_ptr pool_; std::shared_ptr connectorPool_; std::shared_ptr connectorConfig_; diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 5633084c0446..74adb1dff5fc 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -842,6 +842,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrapper_ jint format, jstring directory, jstring codecJstr, + jint partitionId, + jlong taskId, + jstring operationId, jbyteArray partition, jbyteArray fieldBytes) { JNI_METHOD_START @@ -863,6 +866,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrapper_ format, jStringToCString(env, directory), 
facebook::velox::common::stringToCompressionKind(jStringToCString(env, codecJstr)), + partitionId, + taskId, + jStringToCString(env, operationId), spec, protoField, sparkConf)); diff --git a/cpp/velox/tests/iceberg/IcebergWriteTest.cc b/cpp/velox/tests/iceberg/IcebergWriteTest.cc index 574d7493702c..740fa66f3f3d 100644 --- a/cpp/velox/tests/iceberg/IcebergWriteTest.cc +++ b/cpp/velox/tests/iceberg/IcebergWriteTest.cc @@ -58,6 +58,9 @@ TEST_F(VeloxIcebergWriteTest, write) { 1, tmpPath + "/iceberg_write_test_table", common::CompressionKind::CompressionKind_ZSTD, + 0, // partitionId + 0, // taskId + folly::to(folly::Random::rand64()), // operationId partitionSpec, root, std::unordered_map(), diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala index c7c4c4e672b9..3f7ab278d0b6 100644 --- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala +++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala @@ -57,6 +57,12 @@ object IcebergWriteUtil { field } + private lazy val queryIdField = { + val field = classOf[SparkWrite].getDeclaredField("queryId") + field.setAccessible(true) + field + } + def supportsWrite(write: Write): Boolean = { write.isInstanceOf[SparkWrite] } @@ -98,6 +104,10 @@ object IcebergWriteUtil { fileFormatField.get(write).asInstanceOf[FileFormat] } + def getQueryId(write: Write): String = { + queryIdField.get(write).asInstanceOf[String] + } + def getDirectory(write: Write): String = { val loc = getTable(write).locationProvider().newDataLocation("") loc.substring(0, loc.length - 1) diff --git a/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java index 4254c5baff4f..676be92e6a4f 100644 --- 
a/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java +++ b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java @@ -16,6 +16,7 @@ */ package org.apache.gluten.connector.write; +import org.apache.spark.TaskContext; import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.connector.write.DataWriter; import org.apache.spark.sql.connector.write.DataWriterFactory; @@ -43,6 +44,12 @@ public interface ColumnarBatchDataWriterFactory extends Serializable { * *

If this method fails (by throwing an exception), the corresponding Spark write task would * fail and get retried until hitting the maximum retry times. + * + * @param partitionId A unique id of the RDD partition that the returned writer will process. + * Usually Spark processes many RDD partitions at the same time, implementations should use + * the partition id to distinguish writers for different partitions. + * @param taskId The task id returned by {@link TaskContext#taskAttemptId()}. Spark may run + * multiple tasks for the same partition (due to speculation or task failures, for example). */ - DataWriter<ColumnarBatch> createWriter(); + DataWriter<ColumnarBatch> createWriter(int partitionId, long taskId); } diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala index ef432aa11986..b0d70499553a 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala @@ -46,7 +46,7 @@ trait WritingColumnarBatchSparkTask[W <: DataWriter[ColumnarBatch]] val partId = context.partitionId() val taskId = context.taskAttemptId() val attemptId = context.attemptNumber() - val dataWriter = factory.createWriter().asInstanceOf[W] + val dataWriter = factory.createWriter(partId, taskId).asInstanceOf[W] var count = 0 // write the data and commit this writer.