[VL] Make the filename written by Iceberg Native consistent with Java #11435
jinchengchenghh merged 2 commits into apache:main
Conversation
CC @jinchengchenghh for a look, thanks
jinchengchenghh left a comment:
Thanks for your enhancement!
The CH CI failure is not related to your PR; let's wait for community feedback. I have raised this issue.
Please rebase to fix the CH CI |
```diff
  val taskId = context.taskAttemptId()
  val attemptId = context.attemptNumber()
- val dataWriter = factory.createWriter().asInstanceOf[W]
+ val dataWriter = factory.createWriter(partId, taskId).asInstanceOf[W]
```
https://github.com/apache/incubator-gluten/blob/main/cpp/velox/compute/VeloxRuntime.h#L45C25-L45C38 — we can get these two variables on the native side via VeloxRuntime.
I tried to obtain them here:

```cpp
std::shared_ptr<IcebergWriter> VeloxRuntime::createIcebergWriter(
    RowTypePtr rowType,
    int32_t format,
    const std::string& outputDirectory,
    facebook::velox::common::CompressionKind compressionKind,
    const std::string& operationId,
    std::shared_ptr<const facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
    const gluten::IcebergNestedField& protoField,
    const std::unordered_map<std::string, std::string>& sparkConfs) {
  GLUTEN_CHECK(taskInfo_.has_value(), "Task info must be set before creating IcebergWriter");
  auto veloxPool = memoryManager()->getLeafMemoryPool();
  auto connectorPool = memoryManager()->getAggregateMemoryPool();
  return std::make_shared<IcebergWriter>(
      rowType,
      format,
      outputDirectory,
      compressionKind,
      taskInfo_->partitionId,
      taskInfo_->taskId,
      operationId,
      spec,
      protoField,
      sparkConfs,
      veloxPool,
      connectorPool);
}
```

but got the error below; perhaps the timing of setting taskInfo is inconsistent with the creation of the Iceberg writer.
```
22:18:47.228 ERROR org.apache.spark.task.TaskResources: Task 0 failed by error:
org.apache.gluten.exception.GlutenException: Task info must be set before creating IcebergWriter
	at org.apache.gluten.execution.IcebergWriteJniWrapper.init(Native Method)
	at org.apache.gluten.connector.write.IcebergDataWriteFactory.getJniWrapper(IcebergDataWriteFactory.scala:103)
	at org.apache.gluten.connector.write.IcebergDataWriteFactory.createWriter(IcebergDataWriteFactory.scala:77)
	at org.apache.spark.sql.datasources.v2.WritingColumnarBatchSparkTask.run(ColumnarWriteToDataSourceV2Exec.scala:49)
	at org.apache.spark.sql.datasources.v2.WritingColumnarBatchSparkTask.run$(ColumnarWriteToDataSourceV2Exec.scala:39)
	at org.apache.spark.sql.datasources.v2.DataWritingColumnarBatchSparkTask$.run(ColumnarWriteToDataSourceV2Exec.scala:93)
```
Actually, I think my implementation may be fine as well; in particular, the adjustment to the ColumnarBatchDataWriterFactory interface now fully aligns it with Spark's DataWriterFactory.
Gluten's ColumnarBatchDataWriterFactory:

```java
public interface ColumnarBatchDataWriterFactory extends Serializable {
  DataWriter<ColumnarBatch> createWriter(int partitionId, long taskId);
}
```

Spark's DataWriterFactory:

```java
public interface DataWriterFactory extends Serializable {
  DataWriter<InternalRow> createWriter(int partitionId, long taskId);
}
```
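To illustrate why the aligned shape works, here is a minimal, self-contained sketch with hypothetical names (the real Gluten interface returns `DataWriter<ColumnarBatch>`; a `String` stands in for the writer here): the factory is built on the driver, serialized to executors, and each task supplies its own partitionId and taskId at `createWriter` time, so the writer can embed them in the output file name without consulting native-side task info.

```java
import java.io.Serializable;

public class FactorySketch {
    // Simplified stand-in for Gluten's ColumnarBatchDataWriterFactory: the real
    // createWriter returns DataWriter<ColumnarBatch>; String is used for brevity.
    interface ColumnarBatchDataWriterFactory extends Serializable {
        String createWriter(int partitionId, long taskId);
    }

    public static void main(String[] args) {
        // The factory captures driver-side configuration; the ids come from the
        // task context, mirroring factory.createWriter(partId, taskId) above.
        ColumnarBatchDataWriterFactory factory =
            (partitionId, taskId) -> "writer(part=" + partitionId + ", task=" + taskId + ")";
        System.out.println(factory.createWriter(0, 42L)); // writer(part=0, task=42)
    }
}
```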
@jinchengchenghh Can you take another look? Thanks.
What changes are proposed in this pull request?
Follow the filename format written by Iceberg Java.
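For reference, Iceberg Java builds data file names from the partition id, task id, an operation UUID, and a per-task file counter. The sketch below shows that general shape; the exact format string and padding may differ between Iceberg versions, so treat it as an assumption rather than the authoritative pattern.

```java
public class IcebergFileNames {
    // Approximation of the Java-side naming scheme:
    // <partitionId, zero-padded>-<taskId>-<operationId>-<fileCount, zero-padded>.<format>
    public static String dataFileName(
            int partitionId, long taskId, String operationId, int fileCount, String format) {
        return String.format("%05d-%d-%s-%05d.%s",
            partitionId, taskId, operationId, fileCount, format);
    }

    public static void main(String[] args) {
        System.out.println(dataFileName(3, 17L, "a1b2c3", 0, "parquet"));
        // 00003-17-a1b2c3-00000.parquet
    }
}
```

With partitionId and taskId now passed through `createWriter`, the native writer can produce names of this shape that match what the Java writer would have emitted.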
How was this patch tested?