From f85f35837c53779d279c2ab05734f425ec1b8ffa Mon Sep 17 00:00:00 2001
From: Becker Ewing
Date: Mon, 26 Jan 2026 23:07:35 -0500
Subject: [PATCH] Set data file sort_order_id in manifest for writes from Spark

- Successor to https://github.com/apache/iceberg/pull/14683 &
  https://github.com/apache/iceberg/pull/13636
---
 .../apache/iceberg/util/SortOrderUtil.java    |  17 +++
 .../iceberg/util/TestSortOrderUtil.java       |  64 ++++++++++
 .../apache/iceberg/spark/SparkWriteConf.java  |  32 +++++
 .../iceberg/spark/SparkWriteOptions.java      |   1 +
 .../iceberg/spark/SparkWriteRequirements.java |  31 ++++-
 .../apache/iceberg/spark/SparkWriteUtil.java  | 111 +++++++++++++-----
 .../SparkShufflingFileRewriteRunner.java      |  16 +++
 .../spark/source/SparkPositionDeltaWrite.java |   9 +-
 .../iceberg/spark/source/SparkWrite.java      |   9 +-
 .../spark/source/SparkWriteBuilder.java       |   4 +-
 .../iceberg/spark/TestSparkWriteConf.java     |  22 ++++
 .../actions/TestRewriteDataFilesAction.java   |  62 ++++++++++
 .../spark/source/TestSparkDataWrite.java      |  51 ++++++++
 13 files changed, 392 insertions(+), 37 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java b/core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
index 37e0c1fffab0..4d7a631ab559 100644
--- a/core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
@@ -46,6 +46,23 @@ public static SortOrder buildSortOrder(Table table, SortOrder sortOrder) {
     return buildSortOrder(table.schema(), table.spec(), sortOrder);
   }
 
+  /**
+   * Attempts to match a user-supplied {@link SortOrder} with an equivalent sort order from a
+   * {@link Table}.
+   *
+   * @param table the table whose sort orders should be searched for a match
+   * @param userSuppliedSortOrder the user-supplied sort order to match against the table's sort
+   *     orders
+   * @return the matching {@link SortOrder} from the table (with the orderId set) or {@link
+   *     SortOrder#unsorted()} if no match is found
+   */
+  public static SortOrder maybeFindTableSortOrder(Table table, SortOrder userSuppliedSortOrder) {
+    return table.sortOrders().values().stream()
+        .filter(sortOrder -> sortOrder.sameOrder(userSuppliedSortOrder))
+        .findFirst()
+        .orElseGet(SortOrder::unsorted);
+  }
+
   /**
    * Build a final sort order that satisfies the clustering required by the partition spec.
    *
diff --git a/core/src/test/java/org/apache/iceberg/util/TestSortOrderUtil.java b/core/src/test/java/org/apache/iceberg/util/TestSortOrderUtil.java
index 02c81de93222..3757b70dd334 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestSortOrderUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestSortOrderUtil.java
@@ -287,4 +287,68 @@ public void testSortOrderClusteringWithRedundantPartitionFieldsMissing() {
         .as("Should add spec fields as prefix")
         .isEqualTo(expected);
   }
+
+  @Test
+  public void testFindSortOrderForTable() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    SortOrder order = SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", NULLS_LAST).build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, order, 2);
+
+    SortOrder tableSortOrder = table.sortOrder();
+
+    SortOrder actualOrder = SortOrderUtil.maybeFindTableSortOrder(table, tableSortOrder);
+
+    assertThat(actualOrder).as("Should find current table sort order").isEqualTo(table.sortOrder());
+  }
+
+  @Test
+  public void testFindSortOrderForTableWithoutOrderId() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    SortOrder order = SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", NULLS_LAST).build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, order, 2);
+
+    SortOrder userSuppliedOrder =
+        SortOrder.builderFor(table.schema()).asc("id", NULLS_LAST).build();
+
+    SortOrder actualOrder = SortOrderUtil.maybeFindTableSortOrder(table, userSuppliedOrder);
+
+    assertThat(actualOrder).as("Should find current table sort order").isEqualTo(table.sortOrder());
+  }
+
+  @Test
+  public void testFindSortOrderForTableThatIsNotCurrentOrder() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    SortOrder order = SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", NULLS_LAST).build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, order, 2);
+
+    table.replaceSortOrder().asc("data").desc("ts").commit();
+
+    SortOrder userSuppliedOrder =
+        SortOrder.builderFor(table.schema()).asc("id", NULLS_LAST).build();
+
+    SortOrder actualOrder = SortOrderUtil.maybeFindTableSortOrder(table, userSuppliedOrder);
+
+    assertThat(actualOrder)
+        .as("Should find the matching non-current table sort order")
+        .isEqualTo(table.sortOrders().get(1));
+  }
+
+  @Test
+  public void testReturnsUnsortedForNonMatchingSortOrder() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    SortOrder order = SortOrder.builderFor(SCHEMA).withOrderId(1).asc("id", NULLS_LAST).build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, spec, order, 2);
+
+    table.replaceSortOrder().asc("data").desc("ts").commit();
+
+    SortOrder userSuppliedOrder =
+        SortOrder.builderFor(table.schema()).desc("id", NULLS_LAST).build();
+
+    SortOrder actualOrder = SortOrderUtil.maybeFindTableSortOrder(table, userSuppliedOrder);
+
+    assertThat(actualOrder)
+        .as(
+            "Should return unsorted order if user-supplied order does not match any table sort order")
+        .isEqualTo(SortOrder.unsorted());
+  }
 }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 96131e0e56dd..f85fb0dfb9ff 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -42,6 +42,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.IsolationLevel;
 import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableUtil;
@@ -164,6 +165,22 @@ public int outputSpecId() {
     return outputSpecId;
   }
 
+  public SortOrder outputSortOrder() {
+    int outputSortOrderId =
+        confParser
+            .intConf()
+            .option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID)
+            .defaultValue(SortOrder.unsorted().orderId())
+            .parse();
+
+    Preconditions.checkArgument(
+        table.sortOrders().containsKey(outputSortOrderId),
+        "Output sort order id %s is not a valid sort order id for table",
+        outputSortOrderId);
+
+    return table.sortOrders().get(outputSortOrderId);
+  }
+
   public FileFormat dataFileFormat() {
     String valueAsString =
         confParser
@@ -284,6 +301,21 @@ public SparkWriteRequirements writeRequirements() {
         table, distributionMode(), fanoutWriterEnabled(), dataAdvisoryPartitionSize());
   }
 
+  public SparkWriteRequirements rewriteFilesWriteRequirements() {
+    Preconditions.checkNotNull(
+        rewrittenFileSetId(), "Can only use rewrite file write requirements during a rewrite job");
+
+    SortOrder outputSortOrder = outputSortOrder();
+    if (outputSortOrder.isSorted()) {
+      LOG.info(
+          "Found explicit sort order {} in the job configuration; applying it as the sort-order-id of the rewritten files",
+          Spark3Util.describe(outputSortOrder));
+      return writeRequirements().withTableSortOrder(outputSortOrder);
+    }
+
+    return writeRequirements();
+  }
+
   @VisibleForTesting
   DistributionMode distributionMode() {
     String modeName =
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index 33db70bae587..1be02feaf0c0 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -54,6 +54,7 @@ private SparkWriteOptions() {}
   public static final String REWRITTEN_FILE_SCAN_TASK_SET_ID = "rewritten-file-scan-task-set-id";
 
   public static final String OUTPUT_SPEC_ID = "output-spec-id";
+  public static final String OUTPUT_SORT_ORDER_ID = "output-sort-order-id";
 
   public static final String OVERWRITE_MODE = "overwrite-mode";
 
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteRequirements.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteRequirements.java
index 833e0e44e391..dd4bc863912f 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteRequirements.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteRequirements.java
@@ -26,18 +26,32 @@
 /** A set of requirements such as distribution and ordering reported to Spark during writes.
  */
 public class SparkWriteRequirements {
 
+  public static final long NO_ADVISORY_PARTITION_SIZE = 0;
+
   public static final SparkWriteRequirements EMPTY =
-      new SparkWriteRequirements(Distributions.unspecified(), new SortOrder[0], 0);
+      new SparkWriteRequirements(
+          Distributions.unspecified(),
+          new SortOrder[0],
+          org.apache.iceberg.SortOrder.unsorted(),
+          NO_ADVISORY_PARTITION_SIZE);
 
   private final Distribution distribution;
   private final SortOrder[] ordering;
+  private final org.apache.iceberg.SortOrder icebergOrdering;
   private final long advisoryPartitionSize;
 
   SparkWriteRequirements(
-      Distribution distribution, SortOrder[] ordering, long advisoryPartitionSize) {
+      Distribution distribution,
+      SortOrder[] ordering,
+      org.apache.iceberg.SortOrder icebergOrdering,
+      long advisoryPartitionSize) {
     this.distribution = distribution;
     this.ordering = ordering;
-    this.advisoryPartitionSize = advisoryPartitionSize;
+    this.icebergOrdering = icebergOrdering;
+    // Spark prohibits requesting a particular advisory partition size without distribution
+    this.advisoryPartitionSize =
+        distribution instanceof UnspecifiedDistribution
+            ? NO_ADVISORY_PARTITION_SIZE
+            : advisoryPartitionSize;
   }
 
   public Distribution distribution() {
@@ -48,12 +62,19 @@ public SortOrder[] ordering() {
     return ordering;
   }
 
+  public org.apache.iceberg.SortOrder icebergOrdering() {
+    return icebergOrdering;
+  }
+
   public boolean hasOrdering() {
     return ordering.length != 0;
   }
 
   public long advisoryPartitionSize() {
-    // Spark prohibits requesting a particular advisory partition size without distribution
-    return distribution instanceof UnspecifiedDistribution ? 0 : advisoryPartitionSize;
+    return advisoryPartitionSize;
+  }
+
+  public SparkWriteRequirements withTableSortOrder(org.apache.iceberg.SortOrder sortOrder) {
+    return new SparkWriteRequirements(distribution, ordering, sortOrder, advisoryPartitionSize);
   }
 }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java
index 0d68a0d8cdd0..535674aba977 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java
@@ -55,13 +55,23 @@ public class SparkWriteUtil {
   private static final Expression[] PARTITION_FILE_CLUSTERING =
       clusterBy(SPEC_ID, PARTITION, FILE_PATH);
 
-  private static final SortOrder[] EMPTY_ORDERING = new SortOrder[0];
-  private static final SortOrder[] EXISTING_ROW_ORDERING = orderBy(FILE_PATH, ROW_POSITION);
-  private static final SortOrder[] PARTITION_ORDERING = orderBy(SPEC_ID, PARTITION);
-  private static final SortOrder[] PARTITION_FILE_ORDERING = orderBy(SPEC_ID, PARTITION, FILE_PATH);
-  private static final SortOrder[] POSITION_DELETE_ORDERING =
+  private static final SortOrder[] EMPTY_SPARK_ORDERING = new SortOrder[0];
+  private static final SortOrder[] EXISTING_ROW_SPARK_ORDERING = orderBy(FILE_PATH, ROW_POSITION);
+  private static final SortOrder[] PARTITION_SPARK_ORDERING = orderBy(SPEC_ID, PARTITION);
+  private static final SortOrder[] PARTITION_FILE_SPARK_ORDERING =
+      orderBy(SPEC_ID, PARTITION, FILE_PATH);
+  private static final SortOrder[] POSITION_DELETE_SPARK_ORDERING =
       orderBy(SPEC_ID, PARTITION, FILE_PATH, ROW_POSITION);
 
+  private static final SparkAndIcebergOrdering EXISTING_ROW_ORDERING =
+      SparkAndIcebergOrdering.unsorted().prependOrder(EXISTING_ROW_SPARK_ORDERING);
+  private static final SparkAndIcebergOrdering PARTITION_ORDERING =
+      SparkAndIcebergOrdering.unsorted().prependOrder(PARTITION_SPARK_ORDERING);
+  private static final SparkAndIcebergOrdering PARTITION_FILE_ORDERING =
+      SparkAndIcebergOrdering.unsorted().prependOrder(PARTITION_FILE_SPARK_ORDERING);
+  private static final SparkAndIcebergOrdering POSITION_DELETE_ORDERING =
+      SparkAndIcebergOrdering.unsorted().prependOrder(POSITION_DELETE_SPARK_ORDERING);
+
   private SparkWriteUtil() {}
 
   /** Builds requirements for batch and micro-batch writes such as append or overwrite. */
@@ -69,8 +79,9 @@ public static SparkWriteRequirements writeRequirements(
       Table table, DistributionMode mode, boolean fanoutEnabled, long advisoryPartitionSize) {
     Distribution distribution = writeDistribution(table, mode);
-    SortOrder[] ordering = writeOrdering(table, fanoutEnabled);
-    return new SparkWriteRequirements(distribution, ordering, advisoryPartitionSize);
+    SparkAndIcebergOrdering ordering = writeOrdering(table, fanoutEnabled);
+    return new SparkWriteRequirements(
+        distribution, ordering.sparkOrder(), ordering.icebergOrder(), advisoryPartitionSize);
   }
 
   private static Distribution writeDistribution(Table table, DistributionMode mode) {
@@ -82,7 +93,7 @@ private static Distribution writeDistribution(Table table, DistributionMode mode
         return Distributions.clustered(clustering(table));
 
       case RANGE:
-        return Distributions.ordered(ordering(table));
+        return Distributions.ordered(ordering(table).sparkOrder());
 
       default:
         throw new IllegalArgumentException("Unsupported distribution mode: " + mode);
@@ -99,8 +110,9 @@ public static SparkWriteRequirements copyOnWriteRequirements(
 
     if (command == DELETE || command == UPDATE) {
       Distribution distribution = copyOnWriteDeleteUpdateDistribution(table, mode);
-      SortOrder[] ordering = writeOrdering(table, fanoutEnabled);
-      return new SparkWriteRequirements(distribution, ordering, advisoryPartitionSize);
+      SparkAndIcebergOrdering ordering = writeOrdering(table, fanoutEnabled);
+      return new SparkWriteRequirements(
+          distribution, ordering.sparkOrder(), ordering.icebergOrder(), advisoryPartitionSize);
     } else {
       return writeRequirements(table, mode, fanoutEnabled, advisoryPartitionSize);
     }
@@ -122,9 +134,9 @@ private static Distribution copyOnWriteDeleteUpdateDistribution(
 
       case RANGE:
         if (table.spec().isPartitioned() || table.sortOrder().isSorted()) {
-          return Distributions.ordered(ordering(table));
+          return Distributions.ordered(ordering(table).sparkOrder());
         } else {
-          return Distributions.ordered(EXISTING_ROW_ORDERING);
+          return Distributions.ordered(EXISTING_ROW_ORDERING.sparkOrder());
         }
 
       default:
@@ -142,12 +154,15 @@ public static SparkWriteRequirements positionDeltaRequirements(
 
     if (command == UPDATE || command == MERGE) {
       Distribution distribution = positionDeltaUpdateMergeDistribution(table, mode);
-      SortOrder[] ordering = positionDeltaUpdateMergeOrdering(table, fanoutEnabled);
-      return new SparkWriteRequirements(distribution, ordering, advisoryPartitionSize);
+      SparkAndIcebergOrdering ordering = positionDeltaUpdateMergeOrdering(table, fanoutEnabled);
+      return new SparkWriteRequirements(
+          distribution, ordering.sparkOrder(), ordering.icebergOrder(), advisoryPartitionSize);
     } else {
       Distribution distribution = positionDeltaDeleteDistribution(table, mode);
-      SortOrder[] ordering = fanoutEnabled ? EMPTY_ORDERING : POSITION_DELETE_ORDERING;
-      return new SparkWriteRequirements(distribution, ordering, advisoryPartitionSize);
+      SparkAndIcebergOrdering ordering =
+          fanoutEnabled ? SparkAndIcebergOrdering.unsorted() : POSITION_DELETE_ORDERING;
+      return new SparkWriteRequirements(
+          distribution, ordering.sparkOrder(), ordering.icebergOrder(), advisoryPartitionSize);
     }
   }
 
@@ -167,9 +182,15 @@ private static Distribution positionDeltaUpdateMergeDistribution(
 
       case RANGE:
         if (table.spec().isUnpartitioned()) {
-          return Distributions.ordered(concat(PARTITION_FILE_ORDERING, ordering(table)));
+          return Distributions.ordered(
+              SparkAndIcebergOrdering.forTable(table)
+                  .prependOrder(PARTITION_FILE_SPARK_ORDERING)
+                  .sparkOrder());
         } else {
-          return Distributions.ordered(concat(PARTITION_ORDERING, ordering(table)));
+          return Distributions.ordered(
+              SparkAndIcebergOrdering.forTable(table)
+                  .prependOrder(PARTITION_SPARK_ORDERING)
+                  .sparkOrder());
         }
 
       default:
@@ -177,11 +198,12 @@
     }
   }
 
-  private static SortOrder[] positionDeltaUpdateMergeOrdering(Table table, boolean fanoutEnabled) {
+  private static SparkAndIcebergOrdering positionDeltaUpdateMergeOrdering(
+      Table table, boolean fanoutEnabled) {
     if (fanoutEnabled && table.sortOrder().isUnsorted()) {
-      return EMPTY_ORDERING;
+      return SparkAndIcebergOrdering.unsorted();
    } else {
-      return concat(POSITION_DELETE_ORDERING, ordering(table));
+      return SparkAndIcebergOrdering.forTable(table).prependOrder(POSITION_DELETE_SPARK_ORDERING);
     }
   }
 
@@ -199,9 +221,9 @@ private static Distribution positionDeltaDeleteDistribution(Table table, Distrib
 
       case RANGE:
         if (table.spec().isUnpartitioned()) {
-          return Distributions.ordered(PARTITION_FILE_ORDERING);
+          return Distributions.ordered(PARTITION_FILE_ORDERING.sparkOrder());
         } else {
-          return Distributions.ordered(PARTITION_ORDERING);
+          return Distributions.ordered(PARTITION_ORDERING.sparkOrder());
         }
 
       default:
@@ -213,9 +235,9 @@ private static Distribution positionDeltaDeleteDistribution(Table table, Distrib
   // - there is a defined table sort order, so it is clear how the data should be ordered
   // - the table is partitioned and fanout writers are disabled,
   //   so records for one partition must be co-located within a task
-  private static SortOrder[] writeOrdering(Table table, boolean fanoutEnabled) {
+  private static SparkAndIcebergOrdering writeOrdering(Table table, boolean fanoutEnabled) {
     if (fanoutEnabled && table.sortOrder().isUnsorted()) {
-      return EMPTY_ORDERING;
+      return SparkAndIcebergOrdering.unsorted();
     } else {
       return ordering(table);
     }
@@ -225,8 +247,8 @@ private static Expression[] clustering(Table table) {
     return Spark3Util.toTransforms(table.spec());
   }
 
-  private static SortOrder[] ordering(Table table) {
-    return Spark3Util.toOrdering(SortOrderUtil.buildSortOrder(table));
+  private static SparkAndIcebergOrdering ordering(Table table) {
+    return SparkAndIcebergOrdering.forTable(table);
   }
 
   private static Expression[] concat(Expression[] clustering, Expression... otherClustering) {
@@ -256,4 +278,39 @@ private static SortOrder[] orderBy(Expression... exprs) {
   private static SortOrder sort(Expression expr) {
     return Expressions.sort(expr, SortDirection.ASCENDING);
   }
+
+  private static class SparkAndIcebergOrdering {
+    private static final SparkAndIcebergOrdering UNSORTED =
+        new SparkAndIcebergOrdering(org.apache.iceberg.SortOrder.unsorted(), EMPTY_SPARK_ORDERING);
+
+    private final org.apache.iceberg.SortOrder icebergSortOrder;
+    private final SortOrder[] sparkSortOrder;
+
+    private SparkAndIcebergOrdering(
+        org.apache.iceberg.SortOrder icebergSortOrder, SortOrder[] sparkSortOrder) {
+      this.icebergSortOrder = icebergSortOrder;
+      this.sparkSortOrder = sparkSortOrder;
+    }
+
+    public static SparkAndIcebergOrdering forTable(Table table) {
+      return new SparkAndIcebergOrdering(
+          table.sortOrder(), Spark3Util.toOrdering(SortOrderUtil.buildSortOrder(table)));
+    }
+
+    public static SparkAndIcebergOrdering unsorted() {
+      return UNSORTED;
+    }
+
+    public SparkAndIcebergOrdering prependOrder(SortOrder[] ordering) {
+      return new SparkAndIcebergOrdering(icebergSortOrder, concat(ordering, sparkSortOrder));
+    }
+
+    public org.apache.iceberg.SortOrder icebergOrder() {
+      return icebergSortOrder;
+    }
+
+    public SortOrder[] sparkOrder() {
+      return sparkSortOrder;
+    }
+  }
 }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
index 569eb252cba5..1ba4c7e2fac2 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java
@@ -47,10 +47,14 @@
 import org.apache.spark.sql.connector.expressions.SortOrder;
 import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
 import org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
 
 abstract class SparkShufflingFileRewriteRunner extends SparkDataFileRewriteRunner {
 
+  private static final Logger LOG = LoggerFactory.getLogger(SparkShufflingFileRewriteRunner.class);
+
   /**
    * The number of shuffle partitions to use for each output file. By default, this file rewriter
    * assumes each shuffle partition would become a separate output file. Attempting to generate
@@ -119,6 +123,17 @@ public void doRewrite(String groupId, RewriteFileGroup fileGroup) {
             spec(fileGroup.outputSpecId()),
             fileGroup.expectedOutputFiles()));
 
+    org.apache.iceberg.SortOrder sortOrderInJobSpec = sortOrder();
+
+    org.apache.iceberg.SortOrder maybeMatchingTableSortOrder =
+        SortOrderUtil.maybeFindTableSortOrder(table(), sortOrder());
+
+    if (sortOrderInJobSpec.isSorted() && maybeMatchingTableSortOrder.isUnsorted()) {
+      LOG.warn(
+          "Sort order {} specified for the job does not match any table sort order, so rewritten files will not be marked as sorted in the manifest files",
+          Spark3Util.describe(sortOrderInJobSpec));
+    }
+
     sortedDF
         .write()
         .format("iceberg")
@@ -126,6 +141,7 @@ public void doRewrite(String groupId, RewriteFileGroup fileGroup) {
         .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, fileGroup.maxOutputFileSize())
         .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
         .option(SparkWriteOptions.OUTPUT_SPEC_ID, fileGroup.outputSpecId())
+        .option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, maybeMatchingTableSortOrder.orderId())
         .mode("append")
         .save(groupId);
   }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 6f091eb2a471..9492dd3ec3ab 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -185,7 +185,8 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
           broadcastRewritableDeletes(),
           command,
           context,
-          writeProperties);
+          writeProperties,
+          writeRequirements.icebergOrdering());
     }
 
     private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes() {
@@ -405,18 +406,21 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory {
     private final Command command;
     private final Context context;
     private final Map<String, String> writeProperties;
+    private final org.apache.iceberg.SortOrder sortOrder;
 
     PositionDeltaWriteFactory(
         Broadcast<Table> tableBroadcast,
         Broadcast<Map<String, DeleteFileSet>> rewritableDeletesBroadcast,
         Command command,
         Context context,
-        Map<String, String> writeProperties) {
+        Map<String, String> writeProperties,
+        org.apache.iceberg.SortOrder sortOrder) {
      this.tableBroadcast = tableBroadcast;
      this.rewritableDeletesBroadcast = rewritableDeletesBroadcast;
      this.command = command;
      this.context = context;
      this.writeProperties = writeProperties;
+      this.sortOrder = sortOrder;
     }
 
     @Override
@@ -443,6 +447,7 @@ public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
               .deleteFileFormat(context.deleteFileFormat())
               .positionDeleteSparkType(context.deleteSparkType())
               .writeProperties(writeProperties)
+              .dataSortOrder(sortOrder)
               .build();
 
       if (command == DELETE) {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index e0a05ff11a7e..018ae15a0243 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -207,7 +208,8 @@ private WriterFactory createWriterFactory() {
         writeSchema,
         dsSchema,
         useFanoutWriter,
-        writeProperties);
+        writeProperties,
+        writeRequirements.icebergOrdering());
   }
 
   private void commitOperation(SnapshotUpdate<?> operation, String description) {
@@ -700,6 +701,7 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr
     private final boolean useFanoutWriter;
     private final String queryId;
     private final Map<String, String> writeProperties;
+    private final org.apache.iceberg.SortOrder sortOrder;
 
     protected WriterFactory(
         Broadcast<Table> tableBroadcast,
@@ -710,7 +712,8 @@ protected WriterFactory(
         Schema writeSchema,
         StructType dsSchema,
         boolean useFanoutWriter,
-        Map<String, String> writeProperties) {
+        Map<String, String> writeProperties,
+        org.apache.iceberg.SortOrder sortOrder) {
       this.tableBroadcast = tableBroadcast;
       this.format = format;
       this.outputSpecId = outputSpecId;
@@ -720,6 +723,7 @@ protected WriterFactory(
       this.useFanoutWriter = useFanoutWriter;
       this.queryId = queryId;
       this.writeProperties = writeProperties;
+      this.sortOrder = sortOrder;
     }
 
     @Override
@@ -744,6 +748,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
             .dataSchema(writeSchema)
             .dataSparkType(dsSchema)
             .writeProperties(writeProperties)
+            .dataSortOrder(sortOrder)
             .build();
 
     Function rowLineageExtractor = new ExtractRowLineage(writeSchema);
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
index 89af7740d988..70230a91fc28 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
@@ -190,7 +190,9 @@ public StreamingWrite toStreaming() {
   }
 
   private SparkWriteRequirements writeRequirements() {
-    if (overwriteFiles) {
+    if (rewrittenFileSetId != null) {
+      return writeConf.rewriteFilesWriteRequirements();
+    } else if (overwriteFiles) {
       return writeConf.copyOnWriteRequirements(copyOnWriteCommand);
     } else {
       return writeConf.writeRequirements();
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index 61aacfa4589d..247119523756 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -45,6 +45,7 @@
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE;
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.time.Duration;
@@ -600,6 +601,27 @@ public void testDVWriteConf() {
     assertThat(writeConf.deleteFileFormat()).isEqualTo(FileFormat.PUFFIN);
   }
 
+  @TestTemplate
+  public void testSortOrderWriteConf() {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.replaceSortOrder().asc("id").commit();
+
+    SparkWriteConf writeConf =
+        new SparkWriteConf(
+            spark, table, ImmutableMap.of(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, "1"));
+
+    assertThat(writeConf.outputSortOrder()).isEqualTo(table.sortOrder());
+
+    SparkWriteConf writeConfForUnknownSortOrder =
+        new SparkWriteConf(
+            spark, table, ImmutableMap.of(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, "999"));
+
+    assertThatIllegalArgumentException()
+        .isThrownBy(writeConfForUnknownSortOrder::outputSortOrder)
+        .withMessage("Output sort order id 999 is not a valid sort order id for table");
+  }
+
   private void testWriteProperties(List<Map<String, String>> propertiesSuite) {
     withSQLConf(
         propertiesSuite.get(0),
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 6d965f3dcc62..159744821c96 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -1587,6 +1587,7 @@ public void testSimpleSort() {
     shouldHaveACleanCache(table);
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitSorted(table, "c2");
+    dataFilesSortOrderShouldMatchTableSortOrder(table);
   }
 
   @TestTemplate
@@ -1623,6 +1624,7 @@ public void testSortAfterPartitionChange() {
     shouldHaveACleanCache(table);
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitSorted(table, "c2");
+    dataFilesSortOrderShouldMatchTableSortOrder(table);
   }
 
   @TestTemplate
@@ -1654,6 +1656,7 @@ public void testSortCustomSortOrder() {
     shouldHaveACleanCache(table);
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitSorted(table, "c2");
+    dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
   }
 
   @TestTemplate
@@ -1694,6 +1697,50 @@ public void testSortCustomSortOrderRequiresRepartition() {
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitUnsorted(table, "c2");
     shouldHaveLastCommitSorted(table, "c3");
+    dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
+  }
+
+  @TestTemplate
+  public void testSortPastTableSortOrderGetsAppliedToFiles() {
+    int partitions = 4;
+    Table table = createTable();
+    writeRecords(20, SCALE, partitions);
+    shouldHaveLastCommitUnsorted(table, "c3");
+
+    table.updateSpec().addField("c1").commit();
+
+    table.replaceSortOrder().asc("c3").commit();
+    SortOrder c3SortOrder = table.sortOrder();
+
+    table.replaceSortOrder().asc("c2").commit();
+    shouldHaveFiles(table, 20);
+
+    List<Object[]> originalData = currentData();
+    long dataSizeBefore = testDataSize(table);
+
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .sort(SortOrder.builderFor(table.schema()).asc("c3").build())
+            .option(SizeBasedFileRewritePlanner.REWRITE_ALL, "true")
+            .option(
+                RewriteDataFiles.TARGET_FILE_SIZE_BYTES,
+                Integer.toString(averageFileSize(table) / partitions))
+            .execute();
+
+    assertThat(result.rewriteResults()).as("Should have 1 file group").hasSize(1);
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
+
+    table.refresh();
+
+    List<Object[]> postRewriteData = currentData();
+    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
+
+    shouldHaveSnapshots(table, 2);
+    shouldHaveACleanCache(table);
+    shouldHaveMultipleFiles(table);
+    shouldHaveLastCommitUnsorted(table, "c2");
+    shouldHaveLastCommitSorted(table, "c3");
+    dataFilesShouldHaveSortOrderIdMatching(table, c3SortOrder);
   }
 
   @TestTemplate
@@ -1734,6 +1781,7 @@ public void testAutoSortShuffleOutput() {
     shouldHaveACleanCache(table);
     shouldHaveMultipleFiles(table);
     shouldHaveLastCommitSorted(table, "c2");
+    dataFilesShouldHaveSortOrderIdMatching(table, SortOrder.unsorted());
   }
 
   @TestTemplate
@@ -2657,4 +2705,18 @@ public boolean matches(RewriteFileGroup argument) {
       return groupIDs.contains(argument.info().globalIndex());
     }
   }
+
+  private void dataFilesSortOrderShouldMatchTableSortOrder(Table table) {
+    dataFilesShouldHaveSortOrderIdMatching(table, table.sortOrder());
+  }
+
+  private void dataFilesShouldHaveSortOrderIdMatching(Table table, SortOrder sortOrder) {
+    try (CloseableIterable<FileScanTask> files = table.newScan().planFiles()) {
+      assertThat(files)
+          .extracting(fileScanTask -> fileScanTask.file().sortOrderId())
+          .containsOnly(sortOrder.orderId());
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to close file scan tasks", e);
+    }
+  }
 }
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 94547c2cf8fb..1c9fdd54b633 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -44,6 +44,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
@@ -154,6 +155,7 @@ public void testBasicWrite() {
         assertThat(file.splitOffsets()).as("Split offsets not present").isNotNull();
       }
       assertThat(file.recordCount()).as("Should have reported record count as 1").isEqualTo(1);
+      assertThat(file.sortOrderId()).isEqualTo(SortOrder.unsorted().orderId());
       // TODO: append more metric info
       if (format.equals(FileFormat.PARQUET)) {
         assertThat(file.columnSizes()).as("Column sizes metric not present").isNotNull();
@@ -555,6 +557,55 @@ public void testViewsReturnRecentResults() {
     assertThat(actual2).hasSameSizeAs(expected2).isEqualTo(expected2);
   }
 
+  @TestTemplate
+  public void testWriteDataFilesInTableSortOrder() {
+    File parent = temp.resolve(format.toString()).toFile();
+    File location = new File(parent, "test");
+    String targetLocation = locationWithBranch(location);
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    table.replaceSortOrder().asc("id").commit();
+
+    List<SimpleRecord> expected = Lists.newArrayListWithCapacity(4000);
+    for (int i = 0; i < 4000; i++) {
+      expected.add(new SimpleRecord(i, "a"));
+    }
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data")
+        .write()
+        .format("iceberg")
+        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+        .mode(SaveMode.Append)
+        .save(location.toString());
+
+    createBranch(table);
+    table.refresh();
+
+    Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
+
+    List<SimpleRecord> actual =
+        result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+
+    List<DataFile> files = Lists.newArrayList();
+    for (ManifestFile manifest :
+        SnapshotUtil.latestSnapshot(table, branch).allManifests(table.io())) {
+      for (DataFile file : ManifestFiles.read(manifest, table.io())) {
+        files.add(file);
+      }
+    }
+
+    assertThat(files)
+        .extracting(DataFile::sortOrderId)
+        .as("All DataFiles are written with the table sort order id")
+        .containsOnly(table.sortOrder().orderId());
+  }
+
   public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType option) {
     File parent = temp.resolve(format.toString()).toFile();
     File location = new File(parent, "test");
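
A note for readers, not part of the patch itself: the end-to-end flow introduced here is that SparkShufflingFileRewriteRunner resolves the job's requested sort order against the table's known sort orders via SortOrderUtil.maybeFindTableSortOrder, then forwards the resolved order id through the new "output-sort-order-id" write option so SparkWriteConf can stamp it on the written data files. Below is a minimal, self-contained sketch of that resolution step; the HadoopTables warehouse path and the "id" column are illustrative assumptions, not anything defined by this patch.

import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.util.SortOrderUtil;

public class SortOrderIdResolutionSketch {
  public static void main(String[] args) {
    // Load a table; any catalog works, HadoopTables just keeps the sketch self-contained.
    Table table = new HadoopTables().load("/tmp/warehouse/db/events");

    // A caller-built order that may be equivalent to one of the table's sort orders.
    SortOrder requested = SortOrder.builderFor(table.schema()).asc("id").build();

    // maybeFindTableSortOrder matches by sameOrder(), so a match carries the table's
    // orderId; a non-matching order falls back to SortOrder.unsorted() (order id 0).
    SortOrder resolved = SortOrderUtil.maybeFindTableSortOrder(table, requested);

    // A rewrite job would pass this id as SparkWriteOptions.OUTPUT_SORT_ORDER_ID
    // ("output-sort-order-id") so rewritten data files record the matching
    // sort_order_id in the manifest.
    System.out.println("output-sort-order-id => " + resolved.orderId());
  }
}

Passing an unknown id fails fast: SparkWriteConf.outputSortOrder() rejects any id not present in table.sortOrders(), which is why the rewrite runner only forwards ids it has already matched against the table.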