Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ case class IcebergDataWriteFactory(
codec: String,
partitionSpec: PartitionSpec,
sortOrder: SortOrder,
field: IcebergNestedField)
field: IcebergNestedField,
queryId: String)
extends ColumnarBatchDataWriterFactory {

/**
Expand All @@ -52,7 +53,7 @@ case class IcebergDataWriteFactory(
* <p> If this method fails (by throwing an exception), the corresponding Spark write task would
* fail and get retried until hitting the maximum retry times.
*/
override def createWriter(): DataWriter[ColumnarBatch] = {
override def createWriter(partitionId: Int, taskId: Long): DataWriter[ColumnarBatch] = {
val fields = partitionSpec
.fields()
.stream()
Expand All @@ -63,8 +64,19 @@ case class IcebergDataWriteFactory(
.setSpecId(partitionSpec.specId())
.addAllFields(fields)
.build()
val epochId = 0
val operationId = queryId + "-" + epochId
val (writerHandle, jniWrapper) =
getJniWrapper(schema, format, directory, codec, specProto, field)
getJniWrapper(
schema,
format,
directory,
codec,
partitionId,
taskId,
operationId,
specProto,
field)
IcebergColumnarBatchDataWriter(writerHandle, jniWrapper, format, partitionSpec, sortOrder)
}

Expand All @@ -73,6 +85,9 @@ case class IcebergDataWriteFactory(
format: Int,
directory: String,
codec: String,
partitionId: Int,
taskId: Long,
operationId: String,
partitionSpec: IcebergPartitionSpec,
field: IcebergNestedField): (Long, IcebergWriteJniWrapper) = {
val schema = SparkArrowUtil.toArrowSchema(localSchema, SQLConf.get.sessionLocalTimeZone)
Expand All @@ -87,6 +102,9 @@ case class IcebergDataWriteFactory(
format,
directory,
codec,
partitionId,
taskId,
operationId,
partitionSpec.toByteArray,
field.toByteArray)
cSchema.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ abstract class AbstractIcebergWriteExec extends IcebergWriteExec {
getCodec,
getPartitionSpec,
IcebergWriteUtil.getSortOrder(write),
nestedField
nestedField,
IcebergWriteUtil.getQueryId(write)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public IcebergWriteJniWrapper(Runtime runtime) {
public native long init(long cSchema, int format,
String directory,
String codec,
int partitionId,
long taskId,
String operationId,
byte[] partitionSpec,
byte[] field);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,25 @@ class VeloxIcebergSuite extends IcebergSuite {
assert(executionMetrics(metrics("numWrittenFiles").id).toLong == 1)
}
}

test("iceberg write file name") {
withTable("iceberg_tbl") {
spark.sql("create table if not exists iceberg_tbl (id int) using iceberg")
spark.sql("insert into iceberg_tbl values 1")

val filePath = spark
.sql("select * from default.iceberg_tbl.files")
.select("file_path")
.collect()
.apply(0)
.getString(0)

val fileName = filePath.split('/').last
// Expected format: {partitionId:05d}-{taskId}-{operationId}-{fileCount:05d}.parquet
// Example: 00000-0-query_id-0-00001.parquet
assert(
fileName.matches("\\d{5}-\\d+-.*-\\d{5}\\.parquet"),
s"File name does not match expected format: $fileName")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw")
val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options)
if (unSupportedCompressions.contains(compressionCodec)) {
Some("Brotli, lzo, lz4raw and lz4_raw compression codec is unsupported in Velox backend.")
Some(s"$compressionCodec compression codec is unsupported in Velox backend.")
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,14 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
override protected val fileFormat: String = "parquet"

// The parquet compression codec extensions
private val parquetCompressionCodecExtensions = if (isSparkVersionGE("3.5")) {
private val parquetCompressionCodecExtensions = {
Map(
"none" -> "",
"uncompressed" -> "",
"snappy" -> ".snappy",
"gzip" -> ".gz",
"lzo" -> ".lzo",
"lz4" -> ".lz4hadoop", // Specific extension for version 3.5
"brotli" -> ".br",
"zstd" -> ".zstd"
)
} else {
Map(
"none" -> "",
"uncompressed" -> "",
"snappy" -> ".snappy",
"gzip" -> ".gz",
"lzo" -> ".lzo",
"lz4" -> ".lz4",
"lz4" -> (if (isSparkVersionGE("3.5")) ".lz4hadoop" else ".lz4"),
"brotli" -> ".br",
"zstd" -> ".zstd"
)
Expand Down
5 changes: 4 additions & 1 deletion cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,16 @@ std::shared_ptr<IcebergWriter> VeloxRuntime::createIcebergWriter(
int32_t format,
const std::string& outputDirectory,
facebook::velox::common::CompressionKind compressionKind,
int32_t partitionId,
int64_t taskId,
const std::string& operationId,
std::shared_ptr<const facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& protoField,
const std::unordered_map<std::string, std::string>& sparkConfs) {
auto veloxPool = memoryManager()->getLeafMemoryPool();
auto connectorPool = memoryManager()->getAggregateMemoryPool();
return std::make_shared<IcebergWriter>(
rowType, format, outputDirectory, compressionKind, spec, protoField, sparkConfs, veloxPool, connectorPool);
rowType, format, outputDirectory, compressionKind, partitionId, taskId, operationId, spec, protoField, sparkConfs, veloxPool, connectorPool);
}
#endif

Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/compute/VeloxRuntime.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ class VeloxRuntime final : public Runtime {
int32_t format,
const std::string& outputDirectory,
facebook::velox::common::CompressionKind compressionKind,
int32_t partitionId,
int64_t taskId,
const std::string& operationId,
std::shared_ptr<const facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& protoField,
const std::unordered_map<std::string, std::string>& sparkConfs);
Expand Down
88 changes: 85 additions & 3 deletions cpp/velox/compute/iceberg/IcebergWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,77 @@ using namespace facebook::velox::connector::hive;
using namespace facebook::velox::connector::hive::iceberg;
namespace {

// Custom Iceberg file name generator for Gluten
class GlutenIcebergFileNameGenerator : public connector::hive::FileNameGenerator {
public:
GlutenIcebergFileNameGenerator(
int32_t partitionId,
int64_t taskId,
const std::string& operationId,
dwio::common::FileFormat fileFormat)
: partitionId_(partitionId),
taskId_(taskId),
operationId_(operationId),
fileFormat_(fileFormat),
fileCount_(0) {}

std::pair<std::string, std::string> gen(
std::optional<uint32_t> bucketId,
const std::shared_ptr<const connector::hive::HiveInsertTableHandle> insertTableHandle,
const connector::ConnectorQueryCtx& connectorQueryCtx,
bool commitRequired) const override {
auto targetFileName = insertTableHandle->locationHandle()->targetFileName();
if (targetFileName.empty()) {
// Generate file name following Iceberg format:
// {partitionId:05d}-{taskId}-{operationId}-{fileCount:05d}{suffix}
fileCount_++;

std::string fileExtension;
switch (fileFormat_) {
case dwio::common::FileFormat::PARQUET:
fileExtension = ".parquet";
break;
case dwio::common::FileFormat::ORC:
fileExtension = ".orc";
break;
default:
fileExtension = ".parquet";
}

char buffer[256];
snprintf(
buffer,
sizeof(buffer),
"%05d-%" PRId64 "-%s-%05d%s",
partitionId_,
taskId_,
operationId_.c_str(),
fileCount_,
fileExtension.c_str());
targetFileName = std::string(buffer);
}

return {targetFileName, targetFileName};
}

folly::dynamic serialize() const override {
VELOX_UNREACHABLE("Unexpected code path, implement serialize() first.");
}

std::string toString() const override {
return fmt::format(
"GlutenIcebergFileNameGenerator(partitionId={}, taskId={}, operationId={})",
partitionId_, taskId_, operationId_);
}

private:
int32_t partitionId_;
int64_t taskId_;
std::string operationId_;
dwio::common::FileFormat fileFormat_;
mutable int32_t fileCount_;
};

iceberg::IcebergNestedField convertToIcebergNestedField(const gluten::IcebergNestedField& protoField) {
IcebergNestedField result;
result.id = protoField.id();
Expand All @@ -48,6 +119,9 @@ std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
const std::string& outputDirectoryPath,
dwio::common::FileFormat fileFormat,
facebook::velox::common::CompressionKind compressionKind,
int32_t partitionId,
int64_t taskId,
const std::string& operationId,
std::shared_ptr<const IcebergPartitionSpec> spec,
const iceberg::IcebergNestedField& nestedField,
facebook::velox::memory::MemoryPool* pool) {
Expand Down Expand Up @@ -80,12 +154,17 @@ std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
nestedField.children[i]));
}
}

auto fileNameGenerator = std::make_shared<const GlutenIcebergFileNameGenerator>(
partitionId, taskId, operationId, fileFormat);

std::shared_ptr<const connector::hive::LocationHandle> locationHandle =
std::make_shared<connector::hive::LocationHandle>(
outputDirectoryPath, outputDirectoryPath, connector::hive::LocationHandle::TableType::kExisting);
const std::vector<IcebergSortingColumn> sortedBy;
const std::unordered_map<std::string, std::string> serdeParameters;
return std::make_shared<connector::hive::iceberg::IcebergInsertTableHandle>(
columnHandles, locationHandle, spec, pool, fileFormat, sortedBy, compressionKind);
columnHandles, locationHandle, spec, pool, fileFormat, sortedBy, compressionKind, serdeParameters, fileNameGenerator);
}

} // namespace
Expand All @@ -96,12 +175,15 @@ IcebergWriter::IcebergWriter(
int32_t format,
const std::string& outputDirectory,
facebook::velox::common::CompressionKind compressionKind,
int32_t partitionId,
int64_t taskId,
const std::string& operationId,
std::shared_ptr<const iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& field,
const std::unordered_map<std::string, std::string>& sparkConfs,
std::shared_ptr<facebook::velox::memory::MemoryPool> memoryPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> connectorPool)
: rowType_(rowType), field_(convertToIcebergNestedField(field)), pool_(memoryPool), connectorPool_(connectorPool), createTimeNs_(getCurrentTimeNano()) {
: rowType_(rowType), field_(convertToIcebergNestedField(field)), partitionId_(partitionId), taskId_(taskId), operationId_(operationId), pool_(memoryPool), connectorPool_(connectorPool), createTimeNs_(getCurrentTimeNano()) {
auto veloxCfg =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(sparkConfs));
connectorSessionProperties_ = createHiveConnectorSessionConfig(veloxCfg);
Expand All @@ -123,7 +205,7 @@ IcebergWriter::IcebergWriter(
dataSink_ = std::make_unique<IcebergDataSink>(
rowType_,
createIcebergInsertTableHandle(
rowType_, outputDirectory, icebergFormatToVelox(format), compressionKind, spec, field_, pool_.get()),
rowType_, outputDirectory, icebergFormatToVelox(format), compressionKind, partitionId_, taskId_, operationId_, spec, field_, pool_.get()),
connectorQueryCtx_.get(),
facebook::velox::connector::CommitStrategy::kNoCommit,
connectorConfig_);
Expand Down
6 changes: 6 additions & 0 deletions cpp/velox/compute/iceberg/IcebergWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class IcebergWriter {
int32_t format,
const std::string& outputDirectory,
facebook::velox::common::CompressionKind compressionKind,
int32_t partitionId,
int64_t taskId,
const std::string& operationId,
std::shared_ptr<const facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& field,
const std::unordered_map<std::string, std::string>& sparkConfs,
Expand All @@ -58,6 +61,9 @@ class IcebergWriter {
private:
facebook::velox::RowTypePtr rowType_;
const facebook::velox::connector::hive::iceberg::IcebergNestedField field_;
int32_t partitionId_;
int64_t taskId_;
std::string operationId_;
std::shared_ptr<facebook::velox::memory::MemoryPool> pool_;
std::shared_ptr<facebook::velox::memory::MemoryPool> connectorPool_;
std::shared_ptr<facebook::velox::connector::hive::HiveConfig> connectorConfig_;
Expand Down
6 changes: 6 additions & 0 deletions cpp/velox/jni/VeloxJniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrapper_
jint format,
jstring directory,
jstring codecJstr,
jint partitionId,
jlong taskId,
jstring operationId,
jbyteArray partition,
jbyteArray fieldBytes) {
JNI_METHOD_START
Expand All @@ -863,6 +866,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrapper_
format,
jStringToCString(env, directory),
facebook::velox::common::stringToCompressionKind(jStringToCString(env, codecJstr)),
partitionId,
taskId,
jStringToCString(env, operationId),
spec,
protoField,
sparkConf));
Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/tests/iceberg/IcebergWriteTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ TEST_F(VeloxIcebergWriteTest, write) {
1,
tmpPath + "/iceberg_write_test_table",
common::CompressionKind::CompressionKind_ZSTD,
0, // partitionId
0, // taskId
folly::to<std::string>(folly::Random::rand64()), // operationId
partitionSpec,
root,
std::unordered_map<std::string, std::string>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ object IcebergWriteUtil {
field
}

private lazy val queryIdField = {
val field = classOf[SparkWrite].getDeclaredField("queryId")
field.setAccessible(true)
field
}

def supportsWrite(write: Write): Boolean = {
write.isInstanceOf[SparkWrite]
}
Expand Down Expand Up @@ -98,6 +104,10 @@ object IcebergWriteUtil {
fileFormatField.get(write).asInstanceOf[FileFormat]
}

def getQueryId(write: Write): String = {
queryIdField.get(write).asInstanceOf[String]
}

def getDirectory(write: Write): String = {
val loc = getTable(write).locationProvider().newDataLocation("")
loc.substring(0, loc.length - 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.gluten.connector.write;

import org.apache.spark.TaskContext;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.DataWriterFactory;
Expand Down Expand Up @@ -43,6 +44,12 @@ public interface ColumnarBatchDataWriterFactory extends Serializable {
*
* <p>If this method fails (by throwing an exception), the corresponding Spark write task would
* fail and get retried until hitting the maximum retry times.
*
* @param partitionId A unique id of the RDD partition that the returned writer will process.
* Usually Spark processes many RDD partitions at the same time, implementations should use
* the partition id to distinguish writers for different partitions.
* @param taskId The task id returned by {@link TaskContext#taskAttemptId()}. Spark may run
* multiple tasks for the same partition (due to speculation or task failures, for example).
*/
DataWriter<ColumnarBatch> createWriter();
DataWriter<ColumnarBatch> createWriter(int partitionId, long taskId);
}
Loading