From 82cced9368a07b9ecb0484f0eee67f21cb0d06c9 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Thu, 8 Jan 2026 17:34:54 -0700 Subject: [PATCH 01/25] Core: Merge DVs referencing the same data files as a safeguard --- .../iceberg/MergingSnapshotProducer.java | 88 ++++++++++++++++++- .../iceberg/deletes/BaseDVFileWriter.java | 11 +++ .../apache/iceberg/deletes/DVFileWriter.java | 16 ++++ .../org/apache/iceberg/deletes/Deletes.java | 19 ++++ .../java/org/apache/iceberg/io/IOUtil.java | 20 +++++ .../apache/iceberg/io/OutputFileFactory.java | 25 +++++- .../java/org/apache/iceberg/TestRowDelta.java | 60 +++++++++++++ 7 files changed, 234 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 51d17fbdd0f2..9f47c4856438 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -30,6 +30,11 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; +import org.apache.iceberg.deletes.Deletes; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.events.CreateSnapshotEvent; import org.apache.iceberg.exceptions.ValidationException; @@ -37,7 +42,9 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.DeleteWriteResult; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Predicate; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -55,6 +62,7 @@ import org.apache.iceberg.util.PartitionSet; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -269,7 +277,6 @@ private void addInternal(DeleteFile file) { DeleteFileSet deleteFiles = newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DeleteFileSet.create()); if (deleteFiles.add(file)) { - addedFilesSummary.addedFile(spec, file); hasNewDeleteFiles = true; if (ContentFileUtil.isDV(file)) { newDVRefs.add(file.referencedDataFile()); @@ -1063,7 +1070,9 @@ private List newDeleteFilesAsManifests() { newDeleteFilesBySpec.forEach( (specId, deleteFiles) -> { PartitionSpec spec = ops().current().spec(specId); - List newDeleteManifests = writeDeleteManifests(deleteFiles, spec); + DeleteFileSet dedupedAndMergedDeletes = mergeDVsAndUpdateSummary(deleteFiles, spec); + List newDeleteManifests = + writeDeleteManifests(dedupedAndMergedDeletes, spec); cachedNewDeleteManifests.addAll(newDeleteManifests); }); @@ -1073,6 +1082,81 @@ private List newDeleteFilesAsManifests() { return cachedNewDeleteManifests; } + private DeleteFileSet mergeDVsAndUpdateSummary(DeleteFileSet deleteFiles, PartitionSpec spec) { + // Filter out DVs and group them by referenced data file + Map> dvsByReferencedLocation = + deleteFiles.stream() + .filter(ContentFileUtil::isDV) + .collect( + Collectors.toMap( + DeleteFile::referencedDataFile, + Lists::newArrayList, + (existingDVs, 
newDVs) -> { + existingDVs.addAll(newDVs); + return existingDVs; + })); + + // Merge DVs + Map> dvsThatNeedMerging = + dvsByReferencedLocation.entrySet().stream() + .filter(e -> e.getValue().size() > 1) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + List newDVs = Lists.newArrayList(); + Tasks.foreach(dvsThatNeedMerging.entrySet()) + .executeWith(ThreadPools.getDeleteWorkerPool()) + .stopOnFailure() + .throwFailureWhenFinished() + .run( + dvsToMergeForDataFile -> + newDVs.add( + mergeAndWriteDV( + dvsToMergeForDataFile.getKey(), dvsToMergeForDataFile.getValue(), spec))); + + // Remove the merged DVs from the tracking set + for (List dvsThatWereMerged : dvsThatNeedMerging.values()) { + dvsThatWereMerged.forEach(deleteFiles::remove); + } + + // Add the new DVs to the tracking set + deleteFiles.addAll(newDVs); + + // Update summaries for all delete files including eq. deletes for this partition spec + deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file)); + return deleteFiles; + } + + private DeleteFile mergeAndWriteDV( + String referencedDataFile, List dvs, PartitionSpec spec) { + DeleteFile firstDV = dvs.get(0); + PositionDeleteIndex positionDeleteIndex = + Deletes.readDV(firstDV, ops().io(), ops().encryption()); + for (int i = 1; i < dvs.size(); i++) { + DeleteFile dv = dvs.get(i); + Preconditions.checkArgument( + Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()), + "Cannot merge added DVs when data sequence numbers are different, expected all to be added with sequence %s, but got %s", + firstDV.dataSequenceNumber(), + dv.dataSequenceNumber()); + positionDeleteIndex.merge(Deletes.readDV(dvs.get(i), ops().io(), ops().encryption())); + } + + try { + DVFileWriter dvFileWriter = + new BaseDVFileWriter( + OutputFileFactory.builderFor(ops(), spec(firstDV.specId()), FileFormat.PUFFIN, 1, 1) + .build(), + path -> null); + dvFileWriter.delete(referencedDataFile, positionDeleteIndex, spec, firstDV.partition()); + dvFileWriter.close(); + DeleteWriteResult result = dvFileWriter.result(); + return Delegates.pendingDeleteFile( + Iterables.getOnlyElement(result.deleteFiles()), firstDV.dataSequenceNumber()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private class DataFileFilterManager extends ManifestFilterManager { private DataFileFilterManager() { super(ops().current().specsById(), MergingSnapshotProducer.this::workerPool); diff --git a/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java b/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java index 6eabd64514df..5bd4c84fe04c 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java @@ -71,6 +71,17 @@ public void delete(String path, long pos, PartitionSpec spec, StructLike partiti positions.delete(pos); } + @Override + public void delete( + String path, + PositionDeleteIndex positionDeleteIndex, + PartitionSpec spec, + StructLike partition) { + Deletes deletes = + deletesByPath.computeIfAbsent(path, key -> new Deletes(path, spec, partition)); + deletes.positions().merge(positionDeleteIndex); + } + @Override public DeleteWriteResult result() { Preconditions.checkState(result != null, "Cannot get result from unclosed writer"); diff --git a/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java b/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java index 2561f7be3d34..a2289b190b76 100644 --- 
a/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java @@ -43,4 +43,20 @@ public interface DVFileWriter extends Closeable { * @return the writer result */ DeleteWriteResult result(); + + /** + * Marks every position that is deleted in positionDeleteIndex as deleted in the given data file. + * + * @param path the data file path + * @param positionDeleteIndex the position delete index containing all the positions to delete + * @param spec the data file partition spec + * @param partition the data file partition + */ + default void delete( + String path, + PositionDeleteIndex positionDeleteIndex, + PartitionSpec spec, + StructLike partition) { + throw new UnsupportedOperationException("Delete with positionDeleteIndex is not supported"); + } } diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 46df91982ab7..81c4b0c62271 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -29,13 +29,20 @@ import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptingFileIO; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.IOUtil; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.CharSequenceMap; +import org.apache.iceberg.util.ContentFileUtil; import org.apache.iceberg.util.Filter; import org.apache.iceberg.util.SortedMerge; import org.apache.iceberg.util.StructLikeSet; @@ -126,6 +133,18 @@ public static CharSequenceMap toPosi return toPositionIndexes(posDeletes, null /* unknown delete file */); } + public static PositionDeleteIndex readDV( + DeleteFile deleteFile, FileIO fileIO, EncryptionManager encryptionManager) { + Preconditions.checkArgument( + ContentFileUtil.isDV(deleteFile), "Delete file must be a deletion vector"); + InputFile inputFile = + EncryptingFileIO.combine(fileIO, encryptionManager).newInputFile(deleteFile); + long offset = deleteFile.contentOffset(); + int length = deleteFile.contentSizeInBytes().intValue(); + byte[] bytes = IOUtil.readBytes(inputFile, offset, length); + return PositionDeleteIndex.deserialize(bytes, deleteFile); + } + /** * Builds a map of position delete indexes by path. 
* diff --git a/core/src/main/java/org/apache/iceberg/io/IOUtil.java b/core/src/main/java/org/apache/iceberg/io/IOUtil.java index 37962d322d87..979234997226 100644 --- a/core/src/main/java/org/apache/iceberg/io/IOUtil.java +++ b/core/src/main/java/org/apache/iceberg/io/IOUtil.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; +import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; public class IOUtil { // not meant to be instantiated @@ -49,6 +51,24 @@ public static void readFully(InputStream stream, byte[] bytes, int offset, int l } } + public static byte[] readBytes(InputFile inputFile, long offset, int length) { + try (SeekableInputStream stream = inputFile.newStream()) { + byte[] bytes = new byte[length]; + + if (stream instanceof RangeReadable) { + RangeReadable rangeReadable = (RangeReadable) stream; + rangeReadable.readFully(offset, bytes); + } else { + stream.seek(offset); + ByteStreams.readFully(stream, bytes); + } + + return bytes; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + /** Writes a buffer into a stream, making multiple write calls if necessary. */ public static void writeFully(OutputStream outputStream, ByteBuffer buffer) throws IOException { if (!buffer.hasRemaining()) { diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index 50e84143ffb7..b40fbb4ca62a 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -29,6 +29,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionManager; @@ -90,6 +91,11 @@ public static Builder builderFor(Table table, int partitionId, long taskId) { return new Builder(table, partitionId, taskId); } + public static Builder builderFor( + TableOperations ops, PartitionSpec spec, FileFormat format, int partitionId, long taskId) { + return new Builder(ops, spec, format, partitionId, taskId); + } + private String generateFilename() { return format.addExtension( String.format( @@ -121,7 +127,6 @@ public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partitio } public static class Builder { - private final Table table; private final int partitionId; private final long taskId; private PartitionSpec defaultSpec; @@ -129,6 +134,8 @@ public static class Builder { private FileFormat format; private String suffix; private Supplier ioSupplier; + private Table table; + private TableOperations ops; private Builder(Table table, int partitionId, long taskId) { this.table = table; @@ -143,6 +150,17 @@ private Builder(Table table, int partitionId, long taskId) { this.ioSupplier = table::io; } + private Builder( + TableOperations ops, PartitionSpec spec, FileFormat format, int partitionId, long taskId) { + this.ops = ops; + this.ioSupplier = ops::io; + this.partitionId = partitionId; + this.taskId = taskId; + this.defaultSpec = spec; + this.operationId = UUID.randomUUID().toString(); + this.format = format; + } + public Builder defaultSpec(PartitionSpec newDefaultSpec) { this.defaultSpec = newDefaultSpec; return this; @@ -176,8 +194,9 @@ public Builder ioSupplier(Supplier newIoSupplier) { } 
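// Illustrative usage sketch (editorial note, not a line of the patch): the new
// TableOperations-based builderFor(...) overload added above lets MergingSnapshotProducer
// obtain a Puffin file factory without a Table handle, roughly as this patch does:
//
//   OutputFileFactory fileFactory =
//       OutputFileFactory.builderFor(ops, spec, FileFormat.PUFFIN, 1, 1).build();
//   DVFileWriter writer = new BaseDVFileWriter(fileFactory, path -> null);
//
// Accordingly, build() below resolves the LocationProvider and EncryptionManager from
// whichever entry point was used, the Table or the TableOperations; patch 05/25 later
// reworks this again to take an explicit LocationProvider, EncryptionManager, and
// Supplier<FileIO> so the factory no longer depends on HasTableOperations.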
public OutputFileFactory build() { - LocationProvider locations = table.locationProvider(); - EncryptionManager encryption = table.encryption(); + LocationProvider locations = + table != null ? table.locationProvider() : ops.locationProvider(); + EncryptionManager encryption = table != null ? table.encryption() : ops.encryption(); return new OutputFileFactory( defaultSpec, format, diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 4eff3a400f72..0e54ce9f29e3 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -39,10 +39,16 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.ManifestEntry.Status; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; +import org.apache.iceberg.deletes.Deletes; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -1882,6 +1888,46 @@ public void testConcurrentDVsForSameDataFile() { .hasMessageContaining("Found concurrently added DV for %s", dataFile.location()); } + @TestTemplate + public void testDVsAreMergedForDuplicateReferenceFiles() throws IOException { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + DataFile dataFile = newDataFile("data_bucket=0"); + commit(table, table.newRowDelta().addRows(dataFile), branch); + + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + List> firstDeletes = Lists.newArrayList(); + // First delete deletes positions 0 and 1 + for (int i = 0; i < 2; i++) { + firstDeletes.add(PositionDelete.create().set(dataFile.location(), i)); + } + + DeleteFile deleteFile1 = writeDV(firstDeletes, dataFile.partition(), fileFactory); + List> secondDeletes = Lists.newArrayList(); + // Second delete deletes positions 2 and 3 for same data file + for (int i = 2; i < 4; i++) { + secondDeletes.add(PositionDelete.create().set(dataFile.location(), i)); + } + + DeleteFile deleteFile2 = writeDV(secondDeletes, dataFile.partition(), fileFactory); + RowDelta rowDelta1 = table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2); + + commit(table, rowDelta1, branch); + + Iterable addedDeleteFiles = + latestSnapshot(table, branch).addedDeleteFiles(table.io()); + assertThat(Iterables.size(addedDeleteFiles)).isEqualTo(1); + DeleteFile mergedDV = Iterables.getOnlyElement(addedDeleteFiles); + assertThat(mergedDV).isNotNull(); + assertThat(mergedDV.recordCount()).as("The cardinality of the DV should be 4").isEqualTo(4); + PositionDeleteIndex positionDeleteIndex = + Deletes.readDV(mergedDV, table.io(), table.encryption()); + for (int i = 0; i < 4; i++) { + assertThat(positionDeleteIndex.isDeleted(i)).isTrue(); + } + } + @TestTemplate public void testManifestMergingAfterUpgradeToV3() { assumeThat(formatVersion).isEqualTo(2); @@ -1959,4 +2005,18 @@ private List planFiles() { throw new RuntimeException(e); } 
} + + private DeleteFile writeDV( + List> deletes, StructLike partition, OutputFileFactory fileFactory) + throws IOException { + + DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null); + try (DVFileWriter closeableWriter = writer) { + for (PositionDelete delete : deletes) { + closeableWriter.delete(delete.path().toString(), delete.pos(), table.spec(), partition); + } + } + + return Iterables.getOnlyElement(writer.result().deleteFiles()); + } } From e41943d2dfee131771d296830572e03e439f8961 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Fri, 9 Jan 2026 15:57:51 -0700 Subject: [PATCH 02/25] Fix dangling delete tests --- .../spark/actions/TestRemoveDanglingDeleteAction.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java index 76084c2b9402..1e4c21d214a8 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java @@ -77,7 +77,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_A2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-a.parquet") + .withPath("/path/to/data-a2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=a") // easy way to set partition data for now .withRecordCount(1) @@ -91,7 +91,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_B2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-b.parquet") + .withPath("/path/to/data-b2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=b") // easy way to set partition data for now .withRecordCount(1) @@ -105,7 +105,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_C2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-c.parquet") + .withPath("/path/to/data-c2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=c") // easy way to set partition data for now .withRecordCount(1) @@ -370,7 +370,6 @@ public void testPartitionedDeletesWithEqSeqNo() { // Add Data Files with EQ and POS deletes DeleteFile fileADeletes = fileADeletes(); DeleteFile fileA2Deletes = fileA2Deletes(); - DeleteFile fileBDeletes = fileBDeletes(); DeleteFile fileB2Deletes = fileB2Deletes(); table .newRowDelta() @@ -382,7 +381,6 @@ public void testPartitionedDeletesWithEqSeqNo() { .addDeletes(fileA2Deletes) .addDeletes(FILE_A_EQ_DELETES) .addDeletes(FILE_A2_EQ_DELETES) - .addDeletes(fileBDeletes) .addDeletes(fileB2Deletes) .addDeletes(FILE_B_EQ_DELETES) .addDeletes(FILE_B2_EQ_DELETES) @@ -400,7 +398,6 @@ public void testPartitionedDeletesWithEqSeqNo() { Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()), Tuple2.apply(2L, fileA2Deletes.location()), Tuple2.apply(2L, FILE_B_EQ_DELETES.location()), - Tuple2.apply(2L, fileBDeletes.location()), Tuple2.apply(2L, FILE_B2.location()), Tuple2.apply(2L, FILE_B2_EQ_DELETES.location()), Tuple2.apply(2L, fileB2Deletes.location()), @@ -433,7 +430,6 @@ public void testPartitionedDeletesWithEqSeqNo() { Tuple2.apply(2L, FILE_A2.location()), Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()), Tuple2.apply(2L, fileA2Deletes.location()), - Tuple2.apply(2L, fileBDeletes.location()), Tuple2.apply(2L, FILE_B2.location()), Tuple2.apply(2L, fileB2Deletes.location()), 
Tuple2.apply(2L, FILE_C2.location()), From 76e24e404106b96dfc41018f0516330c52ca9c94 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Fri, 9 Jan 2026 16:10:25 -0700 Subject: [PATCH 03/25] Simplification in OutputFileFactory --- .../apache/iceberg/io/OutputFileFactory.java | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index b40fbb4ca62a..1fc6dd3ed7ed 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; @@ -88,7 +89,15 @@ private OutputFileFactory( } public static Builder builderFor(Table table, int partitionId, long taskId) { - return new Builder(table, partitionId, taskId); + String formatAsString = + table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + PartitionSpec spec = table.spec(); + return builderFor( + ((HasTableOperations) table).operations(), + spec, + FileFormat.fromString(formatAsString), + partitionId, + taskId); } public static Builder builderFor( @@ -134,22 +143,8 @@ public static class Builder { private FileFormat format; private String suffix; private Supplier ioSupplier; - private Table table; private TableOperations ops; - private Builder(Table table, int partitionId, long taskId) { - this.table = table; - this.partitionId = partitionId; - this.taskId = taskId; - this.defaultSpec = table.spec(); - this.operationId = UUID.randomUUID().toString(); - - String formatAsString = - table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); - this.format = FileFormat.fromString(formatAsString); - this.ioSupplier = table::io; - } - private Builder( TableOperations ops, PartitionSpec spec, FileFormat format, int partitionId, long taskId) { this.ops = ops; @@ -194,9 +189,8 @@ public Builder ioSupplier(Supplier newIoSupplier) { } public OutputFileFactory build() { - LocationProvider locations = - table != null ? table.locationProvider() : ops.locationProvider(); - EncryptionManager encryption = table != null ? 
table.encryption() : ops.encryption(); + LocationProvider locations = ops.locationProvider(); + EncryptionManager encryption = ops.encryption(); return new OutputFileFactory( defaultSpec, format, From a740ff91afa7c00d5b1aa987c9328fc447080e8d Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Fri, 9 Jan 2026 16:37:44 -0700 Subject: [PATCH 04/25] minor optimization --- .../iceberg/MergingSnapshotProducer.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 9f47c4856438..1317a11cc3e1 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -111,6 +111,7 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { private boolean hasNewDeleteFiles = false; private boolean caseSensitive = true; + private boolean foundDuplicateDVs = false; MergingSnapshotProducer(String tableName, TableOperations ops) { super(ops); @@ -279,7 +280,9 @@ private void addInternal(DeleteFile file) { if (deleteFiles.add(file)) { hasNewDeleteFiles = true; if (ContentFileUtil.isDV(file)) { - newDVRefs.add(file.referencedDataFile()); + if (!newDVRefs.add(file.referencedDataFile())) { + this.foundDuplicateDVs = true; + } } } } @@ -1070,19 +1073,25 @@ private List newDeleteFilesAsManifests() { newDeleteFilesBySpec.forEach( (specId, deleteFiles) -> { PartitionSpec spec = ops().current().spec(specId); - DeleteFileSet dedupedAndMergedDeletes = mergeDVsAndUpdateSummary(deleteFiles, spec); - List newDeleteManifests = - writeDeleteManifests(dedupedAndMergedDeletes, spec); + if (foundDuplicateDVs) { + mergeDVsAndUpdateDeleteFiles(deleteFiles, spec); + } + + // Update summaries for all delete files including eq. deletes for this partition spec + deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file)); + + List newDeleteManifests = writeDeleteManifests(deleteFiles, spec); cachedNewDeleteManifests.addAll(newDeleteManifests); }); this.hasNewDeleteFiles = false; + this.foundDuplicateDVs = false; } return cachedNewDeleteManifests; } - private DeleteFileSet mergeDVsAndUpdateSummary(DeleteFileSet deleteFiles, PartitionSpec spec) { + private void mergeDVsAndUpdateDeleteFiles(DeleteFileSet deleteFiles, PartitionSpec spec) { // Filter out DVs and group them by referenced data file Map> dvsByReferencedLocation = deleteFiles.stream() @@ -1120,10 +1129,6 @@ private DeleteFileSet mergeDVsAndUpdateSummary(DeleteFileSet deleteFiles, Partit // Add the new DVs to the tracking set deleteFiles.addAll(newDVs); - - // Update summaries for all delete files including eq. 
deletes for this partition spec - deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file)); - return deleteFiles; } private DeleteFile mergeAndWriteDV( From 11ffc2f41b309e177eec786600e680402ed2930e Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Fri, 9 Jan 2026 17:15:28 -0700 Subject: [PATCH 05/25] cleanup, make outputfilefactory take in more fields so that we don't require HasTableOPerations --- .../iceberg/MergingSnapshotProducer.java | 34 +++++++++++++++--- .../apache/iceberg/io/OutputFileFactory.java | 35 +++++++++++++------ 2 files changed, 54 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 1317a11cc3e1..4ac0c0e0729b 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -1140,23 +1140,49 @@ private DeleteFile mergeAndWriteDV( DeleteFile dv = dvs.get(i); Preconditions.checkArgument( Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()), - "Cannot merge added DVs when data sequence numbers are different, expected all to be added with sequence %s, but got %s", + "Cannot merge duplicate added DVs when data sequence numbers are different," + + "expected all to be added with sequence %s, but got %s", firstDV.dataSequenceNumber(), dv.dataSequenceNumber()); + + Preconditions.checkArgument( + Objects.equals(dv.partition(), firstDV.partition()), + "Cannot merge duplicate added DVs when partition tuples are different"); positionDeleteIndex.merge(Deletes.readDV(dvs.get(i), ops().io(), ops().encryption())); } + return writeDV( + referencedDataFile, + positionDeleteIndex, + spec, + firstDV.partition(), + firstDV.dataSequenceNumber()); + } + + private DeleteFile writeDV( + String referencedDataFile, + PositionDeleteIndex positionDeleteIndex, + PartitionSpec spec, + StructLike partition, + Long dataSequenceNumber) { try { DVFileWriter dvFileWriter = new BaseDVFileWriter( - OutputFileFactory.builderFor(ops(), spec(firstDV.specId()), FileFormat.PUFFIN, 1, 1) + OutputFileFactory.builderFor( + ops().locationProvider(), + ops().encryption(), + ops()::io, + spec, + FileFormat.PUFFIN, + 1, + 1) .build(), path -> null); - dvFileWriter.delete(referencedDataFile, positionDeleteIndex, spec, firstDV.partition()); + dvFileWriter.delete(referencedDataFile, positionDeleteIndex, spec, partition); dvFileWriter.close(); DeleteWriteResult result = dvFileWriter.result(); return Delegates.pendingDeleteFile( - Iterables.getOnlyElement(result.deleteFiles()), firstDV.dataSequenceNumber()); + Iterables.getOnlyElement(result.deleteFiles()), dataSequenceNumber); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index 1fc6dd3ed7ed..d8ada8d20615 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -26,11 +26,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; -import org.apache.iceberg.TableOperations; import org.apache.iceberg.encryption.EncryptedOutputFile; import 
org.apache.iceberg.encryption.EncryptionManager; @@ -93,7 +91,9 @@ public static Builder builderFor(Table table, int partitionId, long taskId) { table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); PartitionSpec spec = table.spec(); return builderFor( - ((HasTableOperations) table).operations(), + table.locationProvider(), + table.encryption(), + table::io, spec, FileFormat.fromString(formatAsString), partitionId, @@ -101,8 +101,15 @@ public static Builder builderFor(Table table, int partitionId, long taskId) { } public static Builder builderFor( - TableOperations ops, PartitionSpec spec, FileFormat format, int partitionId, long taskId) { - return new Builder(ops, spec, format, partitionId, taskId); + LocationProvider locationProvider, + EncryptionManager encryptionManager, + Supplier ioSupplier, + PartitionSpec spec, + FileFormat format, + int partitionId, + long taskId) { + return new Builder( + locationProvider, encryptionManager, ioSupplier, spec, format, partitionId, taskId); } private String generateFilename() { @@ -138,17 +145,25 @@ public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partitio public static class Builder { private final int partitionId; private final long taskId; + private final LocationProvider locations; + private final EncryptionManager encryption; private PartitionSpec defaultSpec; private String operationId; private FileFormat format; private String suffix; private Supplier ioSupplier; - private TableOperations ops; private Builder( - TableOperations ops, PartitionSpec spec, FileFormat format, int partitionId, long taskId) { - this.ops = ops; - this.ioSupplier = ops::io; + LocationProvider locationProvider, + EncryptionManager encryptionManager, + Supplier ioSupplier, + PartitionSpec spec, + FileFormat format, + int partitionId, + long taskId) { + this.locations = locationProvider; + this.encryption = encryptionManager; + this.ioSupplier = ioSupplier; this.partitionId = partitionId; this.taskId = taskId; this.defaultSpec = spec; @@ -189,8 +204,6 @@ public Builder ioSupplier(Supplier newIoSupplier) { } public OutputFileFactory build() { - LocationProvider locations = ops.locationProvider(); - EncryptionManager encryption = ops.encryption(); return new OutputFileFactory( defaultSpec, format, From 772e3c20908cc4cf55f3f0b825d14f8741e10eb6 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Sat, 10 Jan 2026 12:20:16 -0700 Subject: [PATCH 06/25] change the duplicate tracking algorithm, fix spark tests --- .../iceberg/MergingSnapshotProducer.java | 121 ++++++++++-------- .../source/TestPositionDeletesTable.java | 8 +- .../TestRemoveDanglingDeleteAction.java | 12 +- .../source/TestPositionDeletesTable.java | 8 +- .../source/TestPositionDeletesTable.java | 8 +- 5 files changed, 85 insertions(+), 72 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 4ac0c0e0729b..b4452bfe24a4 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -26,11 +26,11 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; import org.apache.iceberg.deletes.BaseDVFileWriter; import org.apache.iceberg.deletes.DVFileWriter; import 
org.apache.iceberg.deletes.Deletes; @@ -95,6 +95,7 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { private final Map newDataFilesBySpec = Maps.newHashMap(); private Long newDataFilesDataSequenceNumber; private final Map newDeleteFilesBySpec = Maps.newHashMap(); + private final Map duplicateDVsForDataFile = Maps.newHashMap(); private final Set newDVRefs = Sets.newHashSet(); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); @@ -111,7 +112,6 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { private boolean hasNewDeleteFiles = false; private boolean caseSensitive = true; - private boolean foundDuplicateDVs = false; MergingSnapshotProducer(String tableName, TableOperations ops) { super(ops); @@ -281,7 +281,24 @@ private void addInternal(DeleteFile file) { hasNewDeleteFiles = true; if (ContentFileUtil.isDV(file)) { if (!newDVRefs.add(file.referencedDataFile())) { - this.foundDuplicateDVs = true; + DeleteFileSet duplicateDVs = + duplicateDVsForDataFile.computeIfAbsent( + file.referencedDataFile(), + referencedFile -> { + // Find any delete file that references the same data file. + // There would necessarily be at least one here in this condition + DeleteFile firstMatchingDV = + deleteFiles.stream() + .filter(df -> df.referencedDataFile().equals(referencedFile)) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "Expected at least one delete file for " + referencedFile)); + + return DeleteFileSet.of(List.of(firstMatchingDV)); + }); + duplicateDVs.add(file); } } } @@ -1070,85 +1087,80 @@ private List newDeleteFilesAsManifests() { } if (cachedNewDeleteManifests.isEmpty()) { + // Found duplicates, merge them and update newDeleteFilesBySpec to remove duplicates and add + // the new merged one + if (!duplicateDVsForDataFile.isEmpty()) { + Map mergedDVs = mergeDuplicateDVs(); + for (Map.Entry mergedDV : mergedDVs.entrySet()) { + String referencedFile = mergedDV.getKey(); + DeleteFile newDV = mergedDV.getValue(); + DeleteFileSet duplicateDVs = duplicateDVsForDataFile.get(referencedFile); + DeleteFileSet allDeleteFilesForSpec = newDeleteFilesBySpec.get(newDV.specId()); + allDeleteFilesForSpec.removeAll(duplicateDVs); + allDeleteFilesForSpec.add(newDV); + } + } + newDeleteFilesBySpec.forEach( (specId, deleteFiles) -> { PartitionSpec spec = ops().current().spec(specId); - if (foundDuplicateDVs) { - mergeDVsAndUpdateDeleteFiles(deleteFiles, spec); - } - - // Update summaries for all delete files including eq. deletes for this partition spec + // Update summaries for all added delete files including eq. 
deletes for this partition + // spec deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file)); - List newDeleteManifests = writeDeleteManifests(deleteFiles, spec); cachedNewDeleteManifests.addAll(newDeleteManifests); }); this.hasNewDeleteFiles = false; - this.foundDuplicateDVs = false; + this.duplicateDVsForDataFile.clear(); } return cachedNewDeleteManifests; } - private void mergeDVsAndUpdateDeleteFiles(DeleteFileSet deleteFiles, PartitionSpec spec) { - // Filter out DVs and group them by referenced data file - Map> dvsByReferencedLocation = - deleteFiles.stream() - .filter(ContentFileUtil::isDV) - .collect( - Collectors.toMap( - DeleteFile::referencedDataFile, - Lists::newArrayList, - (existingDVs, newDVs) -> { - existingDVs.addAll(newDVs); - return existingDVs; - })); - - // Merge DVs - Map> dvsThatNeedMerging = - dvsByReferencedLocation.entrySet().stream() - .filter(e -> e.getValue().size() > 1) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - List newDVs = Lists.newArrayList(); - Tasks.foreach(dvsThatNeedMerging.entrySet()) + // Find duplicate DVs for a given partition spec, and return a Pair of the new DVs and the DVs + // that were merged + private Map mergeDuplicateDVs() { + Map mergedDVs = Maps.newConcurrentMap(); + Tasks.foreach(duplicateDVsForDataFile.entrySet()) .executeWith(ThreadPools.getDeleteWorkerPool()) .stopOnFailure() .throwFailureWhenFinished() .run( - dvsToMergeForDataFile -> - newDVs.add( - mergeAndWriteDV( - dvsToMergeForDataFile.getKey(), dvsToMergeForDataFile.getValue(), spec))); - - // Remove the merged DVs from the tracking set - for (List dvsThatWereMerged : dvsThatNeedMerging.values()) { - dvsThatWereMerged.forEach(deleteFiles::remove); - } - - // Add the new DVs to the tracking set - deleteFiles.addAll(newDVs); + dvsToMergeForDataFile -> { + String referencedLocation = dvsToMergeForDataFile.getKey(); + mergedDVs.put( + referencedLocation, + mergeAndWriteDV(referencedLocation, dvsToMergeForDataFile.getValue())); + }); + + return mergedDVs; } - private DeleteFile mergeAndWriteDV( - String referencedDataFile, List dvs, PartitionSpec spec) { - DeleteFile firstDV = dvs.get(0); + private DeleteFile mergeAndWriteDV(String referencedDataFile, DeleteFileSet dvs) { + Iterator dvIterator = dvs.iterator(); + DeleteFile firstDV = dvIterator.next(); PositionDeleteIndex positionDeleteIndex = Deletes.readDV(firstDV, ops().io(), ops().encryption()); - for (int i = 1; i < dvs.size(); i++) { - DeleteFile dv = dvs.get(i); + PartitionSpec spec = spec(firstDV.specId()); + while (dvIterator.hasNext()) { + DeleteFile dv = dvIterator.next(); Preconditions.checkArgument( Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()), "Cannot merge duplicate added DVs when data sequence numbers are different," + "expected all to be added with sequence %s, but got %s", firstDV.dataSequenceNumber(), dv.dataSequenceNumber()); - + Preconditions.checkArgument( + dv.specId() == firstDV.specId(), + "Cannot merge duplicate added DVs when partition specs are different," + + "expected all to be added with spec %s, but got %s", + firstDV.specId(), + dv.specId()); Preconditions.checkArgument( Objects.equals(dv.partition(), firstDV.partition()), "Cannot merge duplicate added DVs when partition tuples are different"); - positionDeleteIndex.merge(Deletes.readDV(dvs.get(i), ops().io(), ops().encryption())); + positionDeleteIndex.merge(Deletes.readDV(dv, ops().io(), ops().encryption())); } return writeDV( @@ -1165,8 +1177,9 @@ private DeleteFile writeDV( 
PartitionSpec spec, StructLike partition, Long dataSequenceNumber) { + DVFileWriter dvFileWriter; try { - DVFileWriter dvFileWriter = + dvFileWriter = new BaseDVFileWriter( OutputFileFactory.builderFor( ops().locationProvider(), @@ -1185,6 +1198,10 @@ private DeleteFile writeDV( Iterables.getOnlyElement(result.deleteFiles()), dataSequenceNumber); } catch (IOException e) { throw new UncheckedIOException(e); + } finally { + if (dvFileWriter != null) { + dvFileWriter.close(); + } } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java index c30a730917ae..81954c19b504 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java @@ -409,7 +409,7 @@ public void testPartitionFilter() throws IOException { // Add position deletes for both partitions Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); @@ -455,7 +455,7 @@ public void testPartitionTransformFilter() throws IOException { Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"}); Pair>, DeleteFile> deletesB = - deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"}); + deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"}); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Prepare expected values @@ -496,7 +496,7 @@ public void testPartitionEvolutionReplace() throws Exception { DataFile dataFileB = dataFile(tab, "b"); tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit(); Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Switch partition spec from (data) to (id) @@ -508,7 +508,7 @@ public void testPartitionEvolutionReplace() throws Exception { tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit(); Pair>, DeleteFile> deletes10 = deleteFile(tab, dataFile10, 10); - Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile10, 99); + Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile99, 99); tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit(); // Query partition of old spec diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java index 76084c2b9402..4df99ca1998b 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java @@ -77,7 +77,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_A2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-a.parquet") + .withPath("/path/to/data-a2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=a") // easy way to set 
partition data for now .withRecordCount(1) @@ -91,7 +91,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_B2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-b.parquet") + .withPath("/path/to/data-b2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=b") // easy way to set partition data for now .withRecordCount(1) @@ -105,7 +105,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_C2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-c.parquet") + .withPath("/path/to/data-c2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=c") // easy way to set partition data for now .withRecordCount(1) @@ -119,7 +119,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_D2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-d.parquet") + .withPath("/path/to/data-d2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=d") // easy way to set partition data for now .withRecordCount(1) @@ -370,7 +370,6 @@ public void testPartitionedDeletesWithEqSeqNo() { // Add Data Files with EQ and POS deletes DeleteFile fileADeletes = fileADeletes(); DeleteFile fileA2Deletes = fileA2Deletes(); - DeleteFile fileBDeletes = fileBDeletes(); DeleteFile fileB2Deletes = fileB2Deletes(); table .newRowDelta() @@ -382,7 +381,6 @@ public void testPartitionedDeletesWithEqSeqNo() { .addDeletes(fileA2Deletes) .addDeletes(FILE_A_EQ_DELETES) .addDeletes(FILE_A2_EQ_DELETES) - .addDeletes(fileBDeletes) .addDeletes(fileB2Deletes) .addDeletes(FILE_B_EQ_DELETES) .addDeletes(FILE_B2_EQ_DELETES) @@ -400,7 +398,6 @@ public void testPartitionedDeletesWithEqSeqNo() { Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()), Tuple2.apply(2L, fileA2Deletes.location()), Tuple2.apply(2L, FILE_B_EQ_DELETES.location()), - Tuple2.apply(2L, fileBDeletes.location()), Tuple2.apply(2L, FILE_B2.location()), Tuple2.apply(2L, FILE_B2_EQ_DELETES.location()), Tuple2.apply(2L, fileB2Deletes.location()), @@ -433,7 +430,6 @@ public void testPartitionedDeletesWithEqSeqNo() { Tuple2.apply(2L, FILE_A2.location()), Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()), Tuple2.apply(2L, fileA2Deletes.location()), - Tuple2.apply(2L, fileBDeletes.location()), Tuple2.apply(2L, FILE_B2.location()), Tuple2.apply(2L, fileB2Deletes.location()), Tuple2.apply(2L, FILE_C2.location()), diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java index 87cbbe3cea5f..8032b0b782f4 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java @@ -409,7 +409,7 @@ public void testPartitionFilter() throws IOException { // Add position deletes for both partitions Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); @@ -455,7 +455,7 @@ public void testPartitionTransformFilter() throws IOException { Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"}); Pair>, DeleteFile> deletesB = - deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"}); + deleteFile(tab, 
dataFileB, new Object[] {"bb"}, new Object[] {"b"}); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Prepare expected values @@ -496,7 +496,7 @@ public void testPartitionEvolutionReplace() throws Exception { DataFile dataFileB = dataFile(tab, "b"); tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit(); Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Switch partition spec from (data) to (id) @@ -508,7 +508,7 @@ public void testPartitionEvolutionReplace() throws Exception { tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit(); Pair>, DeleteFile> deletes10 = deleteFile(tab, dataFile10, 10); - Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile10, 99); + Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile99, 99); tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit(); // Query partition of old spec diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java index 7892fd65b405..f5456db8e4b0 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java @@ -409,7 +409,7 @@ public void testPartitionFilter() throws IOException { // Add position deletes for both partitions Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); @@ -455,7 +455,7 @@ public void testPartitionTransformFilter() throws IOException { Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"}); Pair>, DeleteFile> deletesB = - deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"}); + deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"}); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Prepare expected values @@ -496,7 +496,7 @@ public void testPartitionEvolutionReplace() throws Exception { DataFile dataFileB = dataFile(tab, "b"); tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit(); Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Switch partition spec from (data) to (id) @@ -508,7 +508,7 @@ public void testPartitionEvolutionReplace() throws Exception { tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit(); Pair>, DeleteFile> deletes10 = deleteFile(tab, dataFile10, 10); - Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile10, 99); + Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile99, 99); tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit(); // Query partition of old spec From 3404a860857e8dc835f5b59a3ce8c7d6f4b72467 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar 
Date: Sun, 11 Jan 2026 12:49:37 -0700 Subject: [PATCH 07/25] Add more tests for multiple DVs and w equality deletes --- .../iceberg/MergingSnapshotProducer.java | 18 +-- .../java/org/apache/iceberg/TestRowDelta.java | 147 +++++++++++++++--- 2 files changed, 132 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index b4452bfe24a4..04a9ff115ea6 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -289,13 +289,12 @@ private void addInternal(DeleteFile file) { // There would necessarily be at least one here in this condition DeleteFile firstMatchingDV = deleteFiles.stream() - .filter(df -> df.referencedDataFile().equals(referencedFile)) + .filter(ContentFileUtil::isDV) + .filter( + deleteFile -> + Objects.equals(referencedFile, deleteFile.referencedDataFile())) .findFirst() - .orElseThrow( - () -> - new IllegalStateException( - "Expected at least one delete file for " + referencedFile)); - + .get(); return DeleteFileSet.of(List.of(firstMatchingDV)); }); duplicateDVs.add(file); @@ -1177,9 +1176,8 @@ private DeleteFile writeDV( PartitionSpec spec, StructLike partition, Long dataSequenceNumber) { - DVFileWriter dvFileWriter; try { - dvFileWriter = + DVFileWriter dvFileWriter = new BaseDVFileWriter( OutputFileFactory.builderFor( ops().locationProvider(), @@ -1198,10 +1196,6 @@ private DeleteFile writeDV( Iterables.getOnlyElement(result.deleteFiles()), dataSequenceNumber); } catch (IOException e) { throw new UncheckedIOException(e); - } finally { - if (dvFileWriter != null) { - dvFileWriter.close(); - } } } diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 0e54ce9f29e3..9e4d156c9663 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -35,8 +35,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.LongStream; import java.util.stream.Stream; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.deletes.BaseDVFileWriter; @@ -54,6 +56,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.ContentFileUtil; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -1889,7 +1892,7 @@ public void testConcurrentDVsForSameDataFile() { } @TestTemplate - public void testDVsAreMergedForDuplicateReferenceFiles() throws IOException { + public void testDuplicateDVsAreMerged() throws IOException { assumeThat(formatVersion).isGreaterThanOrEqualTo(3); DataFile dataFile = newDataFile("data_bucket=0"); @@ -1897,20 +1900,9 @@ public void testDVsAreMergedForDuplicateReferenceFiles() throws IOException { OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); - List> firstDeletes = Lists.newArrayList(); - // First delete deletes positions 0 and 1 - for (int i = 0; i < 2; i++) { - firstDeletes.add(PositionDelete.create().set(dataFile.location(), i)); - } - - DeleteFile deleteFile1 = 
writeDV(firstDeletes, dataFile.partition(), fileFactory); - List> secondDeletes = Lists.newArrayList(); - // Second delete deletes positions 2 and 3 for same data file - for (int i = 2; i < 4; i++) { - secondDeletes.add(PositionDelete.create().set(dataFile.location(), i)); - } - DeleteFile deleteFile2 = writeDV(secondDeletes, dataFile.partition(), fileFactory); + DeleteFile deleteFile1 = dvWithPositions(dataFile, fileFactory, 0, 2); + DeleteFile deleteFile2 = dvWithPositions(dataFile, fileFactory, 2, 4); RowDelta rowDelta1 = table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2); commit(table, rowDelta1, branch); @@ -1919,13 +1911,104 @@ public void testDVsAreMergedForDuplicateReferenceFiles() throws IOException { latestSnapshot(table, branch).addedDeleteFiles(table.io()); assertThat(Iterables.size(addedDeleteFiles)).isEqualTo(1); DeleteFile mergedDV = Iterables.getOnlyElement(addedDeleteFiles); - assertThat(mergedDV).isNotNull(); - assertThat(mergedDV.recordCount()).as("The cardinality of the DV should be 4").isEqualTo(4); - PositionDeleteIndex positionDeleteIndex = - Deletes.readDV(mergedDV, table.io(), table.encryption()); - for (int i = 0; i < 4; i++) { - assertThat(positionDeleteIndex.isDeleted(i)).isTrue(); - } + + assertDVHasDeletedPositions(mergedDV, LongStream.range(0, 4).boxed()::iterator); + } + + @TestTemplate + public void testDuplicateDVsAreMergedForMultipleReferenceFiles() throws IOException { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + DataFile dataFile1 = newDataFile("data_bucket=0"); + DataFile dataFile2 = newDataFile("data_bucket=0"); + commit(table, table.newRowDelta().addRows(dataFile1).addRows(dataFile2), branch); + + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + + // For each data file, create two DVs covering positions [0,2) and [2,4) + DeleteFile deleteFile1a = dvWithPositions(dataFile1, fileFactory, 0, 2); + DeleteFile deleteFile1b = dvWithPositions(dataFile1, fileFactory, 2, 4); + DeleteFile deleteFile2a = dvWithPositions(dataFile2, fileFactory, 0, 2); + DeleteFile deleteFile2b = dvWithPositions(dataFile2, fileFactory, 2, 4); + + // Commit all four duplicate DVs + RowDelta rowDelta = + table + .newRowDelta() + .addDeletes(deleteFile1a) + .addDeletes(deleteFile1b) + .addDeletes(deleteFile2a) + .addDeletes(deleteFile2b); + + commit(table, rowDelta, branch); + + // Expect two merged DVs, one per data file + Iterable addedDeleteFiles = + latestSnapshot(table, branch).addedDeleteFiles(table.io()); + List mergedDVs = Lists.newArrayList(addedDeleteFiles); + + assertThat(mergedDVs).hasSize(2); + + DeleteFile committedDVForDataFile1 = + Iterables.getOnlyElement( + mergedDVs.stream() + .filter(dv -> Objects.equals(dv.referencedDataFile(), dataFile1.location())) + .collect(Collectors.toList())); + assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0, 4).boxed()::iterator); + + DeleteFile committedDVForDataFile2 = + Iterables.getOnlyElement( + mergedDVs.stream() + .filter(dv -> Objects.equals(dv.referencedDataFile(), dataFile2.location())) + .collect(Collectors.toList())); + assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0, 4).boxed()::iterator); + } + + @TestTemplate + public void testDuplicateDVsAreMergedAndEqDelete() throws IOException { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + DataFile dataFile = newDataFile("data_bucket=0"); + commit(table, table.newRowDelta().addRows(dataFile), branch); + + OutputFileFactory 
fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + + // Two DVs for the same data file: [0,2) and [2,4) => 4 deleted positions total + DeleteFile dv1 = dvWithPositions(dataFile, fileFactory, 0, 2); + DeleteFile dv2 = dvWithPositions(dataFile, fileFactory, 2, 4); + + // One equality delete file for the same partition + DeleteFile eqDelete = + newEqualityDeleteFile( + table.spec().specId(), + "data_bucket=0", + table.schema().asStruct().fields().get(0).fieldId()); + + RowDelta rowDelta = table.newRowDelta().addDeletes(eqDelete).addDeletes(dv1).addDeletes(dv2); + + commit(table, rowDelta, branch); + + Iterable addedDeleteFiles = + latestSnapshot(table, branch).addedDeleteFiles(table.io()); + List committedDeletes = Lists.newArrayList(addedDeleteFiles); + + // 1 DV + 1 equality delete + assertThat(committedDeletes).hasSize(2); + + DeleteFile committedDV = + Iterables.getOnlyElement( + committedDeletes.stream().filter(ContentFileUtil::isDV).collect(Collectors.toList())); + assertDVHasDeletedPositions(committedDV, LongStream.range(0, 4).boxed()::iterator); + + DeleteFile committedEqDelete = + Iterables.getOnlyElement( + committedDeletes.stream() + .filter(df -> df.content() == FileContent.EQUALITY_DELETES) + .collect(Collectors.toList())); + assertThat(committedEqDelete).isNotNull(); + assertThat(committedEqDelete.content()).isEqualTo(FileContent.EQUALITY_DELETES); } @TestTemplate @@ -2006,6 +2089,28 @@ private List planFiles() { } } + private DeleteFile dvWithPositions( + DataFile dataFile, OutputFileFactory fileFactory, int fromInclusive, int toExclusive) + throws IOException { + + List> deletes = Lists.newArrayList(); + for (int i = fromInclusive; i < toExclusive; i++) { + deletes.add(PositionDelete.create().set(dataFile.location(), i)); + } + return writeDV(deletes, dataFile.partition(), fileFactory); + } + + private void assertDVHasDeletedPositions(DeleteFile dv, Iterable positions) + throws IOException { + assertThat(dv).isNotNull(); + + PositionDeleteIndex index = Deletes.readDV(dv, table.io(), table.encryption()); + + for (long pos : positions) { + assertThat(index.isDeleted(pos)).as("Expected position %s to be deleted", pos).isTrue(); + } + } + private DeleteFile writeDV( List> deletes, StructLike partition, OutputFileFactory fileFactory) throws IOException { From c04d0e0e9920627b2dd245b08ae48a478ecd5045 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Sun, 11 Jan 2026 13:02:42 -0700 Subject: [PATCH 08/25] Rebase and fix spark 4.1 tests --- .../actions/TestRemoveDanglingDeleteAction.java | 12 ++++-------- .../spark/source/TestPositionDeletesTable.java | 8 ++++---- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java index 76084c2b9402..4df99ca1998b 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java @@ -77,7 +77,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_A2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-a.parquet") + .withPath("/path/to/data-a2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=a") // easy way to set partition data for now .withRecordCount(1) @@ -91,7 +91,7 @@ public class 
TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_B2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-b.parquet") + .withPath("/path/to/data-b2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=b") // easy way to set partition data for now .withRecordCount(1) @@ -105,7 +105,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_C2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-c.parquet") + .withPath("/path/to/data-c2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=c") // easy way to set partition data for now .withRecordCount(1) @@ -119,7 +119,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_D2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-d.parquet") + .withPath("/path/to/data-d2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=d") // easy way to set partition data for now .withRecordCount(1) @@ -370,7 +370,6 @@ public void testPartitionedDeletesWithEqSeqNo() { // Add Data Files with EQ and POS deletes DeleteFile fileADeletes = fileADeletes(); DeleteFile fileA2Deletes = fileA2Deletes(); - DeleteFile fileBDeletes = fileBDeletes(); DeleteFile fileB2Deletes = fileB2Deletes(); table .newRowDelta() @@ -382,7 +381,6 @@ public void testPartitionedDeletesWithEqSeqNo() { .addDeletes(fileA2Deletes) .addDeletes(FILE_A_EQ_DELETES) .addDeletes(FILE_A2_EQ_DELETES) - .addDeletes(fileBDeletes) .addDeletes(fileB2Deletes) .addDeletes(FILE_B_EQ_DELETES) .addDeletes(FILE_B2_EQ_DELETES) @@ -400,7 +398,6 @@ public void testPartitionedDeletesWithEqSeqNo() { Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()), Tuple2.apply(2L, fileA2Deletes.location()), Tuple2.apply(2L, FILE_B_EQ_DELETES.location()), - Tuple2.apply(2L, fileBDeletes.location()), Tuple2.apply(2L, FILE_B2.location()), Tuple2.apply(2L, FILE_B2_EQ_DELETES.location()), Tuple2.apply(2L, fileB2Deletes.location()), @@ -433,7 +430,6 @@ public void testPartitionedDeletesWithEqSeqNo() { Tuple2.apply(2L, FILE_A2.location()), Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()), Tuple2.apply(2L, fileA2Deletes.location()), - Tuple2.apply(2L, fileBDeletes.location()), Tuple2.apply(2L, FILE_B2.location()), Tuple2.apply(2L, fileB2Deletes.location()), Tuple2.apply(2L, FILE_C2.location()), diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java index 7892fd65b405..f5456db8e4b0 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java @@ -409,7 +409,7 @@ public void testPartitionFilter() throws IOException { // Add position deletes for both partitions Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); @@ -455,7 +455,7 @@ public void testPartitionTransformFilter() throws IOException { Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"}); Pair>, DeleteFile> deletesB = - deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"}); + deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"}); 
tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Prepare expected values @@ -496,7 +496,7 @@ public void testPartitionEvolutionReplace() throws Exception { DataFile dataFileB = dataFile(tab, "b"); tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit(); Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Switch partition spec from (data) to (id) @@ -508,7 +508,7 @@ public void testPartitionEvolutionReplace() throws Exception { tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit(); Pair>, DeleteFile> deletes10 = deleteFile(tab, dataFile10, 10); - Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile10, 99); + Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile99, 99); tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit(); // Query partition of old spec From a39b0737f653653551fbe39ef359478570c677ca Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Sun, 11 Jan 2026 14:35:11 -0700 Subject: [PATCH 09/25] more cleanup, put dvfilewriter in try w resources --- .../apache/iceberg/MergingSnapshotProducer.java | 17 ++++------------- .../apache/iceberg/io/OutputFileFactory.java | 13 ++++--------- 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 04a9ff115ea6..602a9dd1cc7f 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -1176,19 +1176,10 @@ private DeleteFile writeDV( PartitionSpec spec, StructLike partition, Long dataSequenceNumber) { - try { - DVFileWriter dvFileWriter = - new BaseDVFileWriter( - OutputFileFactory.builderFor( - ops().locationProvider(), - ops().encryption(), - ops()::io, - spec, - FileFormat.PUFFIN, - 1, - 1) - .build(), - path -> null); + try (DVFileWriter dvFileWriter = + new BaseDVFileWriter( + OutputFileFactory.builderFor(ops(), spec, FileFormat.PUFFIN, 1, 1).build(), + path -> null)) { dvFileWriter.delete(referencedDataFile, positionDeleteIndex, spec, partition); dvFileWriter.close(); DeleteWriteResult result = dvFileWriter.result(); diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index d8ada8d20615..4930d2f3aa34 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -29,6 +29,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionManager; @@ -90,7 +91,7 @@ public static Builder builderFor(Table table, int partitionId, long taskId) { String formatAsString = table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); PartitionSpec spec = table.spec(); - return builderFor( + return new Builder( table.locationProvider(), table.encryption(), table::io, @@ -101,15 +102,9 @@ public static Builder builderFor(Table table, int partitionId, long taskId) { } public static 
Builder builderFor( - LocationProvider locationProvider, - EncryptionManager encryptionManager, - Supplier ioSupplier, - PartitionSpec spec, - FileFormat format, - int partitionId, - long taskId) { + TableOperations ops, PartitionSpec spec, FileFormat format, int partitionId, long taskId) { return new Builder( - locationProvider, encryptionManager, ioSupplier, spec, format, partitionId, taskId); + ops.locationProvider(), ops.encryption(), ops::io, spec, format, partitionId, taskId); } private String generateFilename() { From a079d2231be8a4604a834de88a71f8f6b6e3fa48 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Sun, 11 Jan 2026 19:13:21 -0700 Subject: [PATCH 10/25] Add logging, some more cleanup --- .../iceberg/MergingSnapshotProducer.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 602a9dd1cc7f..5b36f09fb99f 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -1090,21 +1090,23 @@ private List newDeleteFilesAsManifests() { // the new merged one if (!duplicateDVsForDataFile.isEmpty()) { Map mergedDVs = mergeDuplicateDVs(); - for (Map.Entry mergedDV : mergedDVs.entrySet()) { - String referencedFile = mergedDV.getKey(); - DeleteFile newDV = mergedDV.getValue(); - DeleteFileSet duplicateDVs = duplicateDVsForDataFile.get(referencedFile); - DeleteFileSet allDeleteFilesForSpec = newDeleteFilesBySpec.get(newDV.specId()); - allDeleteFilesForSpec.removeAll(duplicateDVs); - allDeleteFilesForSpec.add(newDV); - } + mergedDVs.forEach( + (referencedFile, newDV) -> { + DeleteFileSet duplicateDVs = duplicateDVsForDataFile.get(referencedFile); + DeleteFileSet allDeleteFilesForSpec = newDeleteFilesBySpec.get(newDV.specId()); + LOG.warn( + "Merged {} duplicate deletion vectors for data file {} in table {}. The merged DVs are orphaned, and writers should merge DVs per file before committing", + duplicateDVs.size(), + referencedFile, + tableName); + allDeleteFilesForSpec.removeAll(duplicateDVs); + allDeleteFilesForSpec.add(newDV); + }); } newDeleteFilesBySpec.forEach( (specId, deleteFiles) -> { PartitionSpec spec = ops().current().spec(specId); - // Update summaries for all added delete files including eq. 
deletes for this partition - // spec deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file)); List newDeleteManifests = writeDeleteManifests(deleteFiles, spec); cachedNewDeleteManifests.addAll(newDeleteManifests); From d7eadb0056433b555ba6174f1acb92a945cf0d55 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Sun, 11 Jan 2026 20:08:49 -0700 Subject: [PATCH 11/25] more cleanup --- .../iceberg/MergingSnapshotProducer.java | 61 +++++++++++-------- 1 file changed, 36 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 5b36f09fb99f..d02b15d999d5 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.deletes.BaseDVFileWriter; import org.apache.iceberg.deletes.DVFileWriter; import org.apache.iceberg.deletes.Deletes; @@ -277,30 +278,39 @@ private void addInternal(DeleteFile file) { DeleteFileSet deleteFiles = newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DeleteFileSet.create()); - if (deleteFiles.add(file)) { - hasNewDeleteFiles = true; - if (ContentFileUtil.isDV(file)) { - if (!newDVRefs.add(file.referencedDataFile())) { - DeleteFileSet duplicateDVs = - duplicateDVsForDataFile.computeIfAbsent( - file.referencedDataFile(), - referencedFile -> { - // Find any delete file that references the same data file. - // There would necessarily be at least one here in this condition - DeleteFile firstMatchingDV = - deleteFiles.stream() - .filter(ContentFileUtil::isDV) - .filter( - deleteFile -> - Objects.equals(referencedFile, deleteFile.referencedDataFile())) - .findFirst() - .get(); - return DeleteFileSet.of(List.of(firstMatchingDV)); - }); - duplicateDVs.add(file); - } - } + + if (!deleteFiles.add(file)) { + return; } + + hasNewDeleteFiles = true; + if (ContentFileUtil.isDV(file)) { + addDV(file, deleteFiles); + } + } + + private void addDV(DeleteFile newDV, DeleteFileSet deleteFiles) { + String referencedFile = newDV.referencedDataFile(); + + if (newDVRefs.add(referencedFile)) { + return; + } + // newDV is a duplicate DV for the given referenced file + DeleteFileSet duplicateDVs = + duplicateDVsForDataFile.computeIfAbsent( + referencedFile, + referencedDataFile -> { + DeleteFile previouslyAddedDV = + Iterables.getOnlyElement( + deleteFiles.stream() + .filter(ContentFileUtil::isDV) + .filter(dv -> !dv.equals(newDV)) // exclude the new DV that was just added + .filter(dv -> Objects.equals(referencedDataFile, dv.referencedDataFile())) + .collect(Collectors.toList())); + return DeleteFileSet.of(List.of(previouslyAddedDV)); + }); + + duplicateDVs.add(newDV); } protected void validateNewDeleteFile(DeleteFile file) { @@ -1119,8 +1129,7 @@ private List newDeleteFilesAsManifests() { return cachedNewDeleteManifests; } - // Find duplicate DVs for a given partition spec, and return a Pair of the new DVs and the DVs - // that were merged + // Returns the merged DV for a given data file that had duplicate DVs private Map mergeDuplicateDVs() { Map mergedDVs = Maps.newConcurrentMap(); Tasks.foreach(duplicateDVsForDataFile.entrySet()) @@ -1138,6 +1147,8 @@ private Map mergeDuplicateDVs() { return mergedDVs; } + // Merges the set of DVs for a given referenced files into a single DV + // and produces a single 
Puffin file private DeleteFile mergeAndWriteDV(String referencedDataFile, DeleteFileSet dvs) { Iterator dvIterator = dvs.iterator(); DeleteFile firstDV = dvIterator.next(); From 0a053a6cf74bb3a3fa04ff2497f0454d0781f555 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Thu, 15 Jan 2026 08:56:13 -0700 Subject: [PATCH 12/25] Make dv refs a multimap, group by partition to write single puffin for merged duplicates, add tests for multiple specs --- .../iceberg/MergingSnapshotProducer.java | 208 ++++++++++-------- .../java/org/apache/iceberg/TestRowDelta.java | 103 ++++++++- 2 files changed, 211 insertions(+), 100 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index d02b15d999d5..7dc9c5afbaef 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -96,8 +97,7 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { private final Map newDataFilesBySpec = Maps.newHashMap(); private Long newDataFilesDataSequenceNumber; private final Map newDeleteFilesBySpec = Maps.newHashMap(); - private final Map duplicateDVsForDataFile = Maps.newHashMap(); - private final Set newDVRefs = Sets.newHashSet(); + private final Map dvsByReferencedFile = Maps.newHashMap(); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder(); @@ -278,39 +278,15 @@ private void addInternal(DeleteFile file) { DeleteFileSet deleteFiles = newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DeleteFileSet.create()); - - if (!deleteFiles.add(file)) { - return; - } - - hasNewDeleteFiles = true; - if (ContentFileUtil.isDV(file)) { - addDV(file, deleteFiles); - } - } - - private void addDV(DeleteFile newDV, DeleteFileSet deleteFiles) { - String referencedFile = newDV.referencedDataFile(); - - if (newDVRefs.add(referencedFile)) { - return; + if (deleteFiles.add(file)) { + hasNewDeleteFiles = true; + if (ContentFileUtil.isDV(file)) { + DeleteFileSet deletesForReferencedFile = + dvsByReferencedFile.computeIfAbsent( + file.referencedDataFile(), newFile -> DeleteFileSet.create()); + deletesForReferencedFile.add(file); + } } - // newDV is a duplicate DV for the given referenced file - DeleteFileSet duplicateDVs = - duplicateDVsForDataFile.computeIfAbsent( - referencedFile, - referencedDataFile -> { - DeleteFile previouslyAddedDV = - Iterables.getOnlyElement( - deleteFiles.stream() - .filter(ContentFileUtil::isDV) - .filter(dv -> !dv.equals(newDV)) // exclude the new DV that was just added - .filter(dv -> Objects.equals(referencedDataFile, dv.referencedDataFile())) - .collect(Collectors.toList())); - return DeleteFileSet.of(List.of(previouslyAddedDV)); - }); - - duplicateDVs.add(newDV); } protected void validateNewDeleteFile(DeleteFile file) { @@ -850,7 +826,7 @@ protected void validateAddedDVs( Expression conflictDetectionFilter, Snapshot parent) { // skip if there is no current table state or this operation doesn't add new DVs - if (parent == null || newDVRefs.isEmpty()) { + if (parent == null || dvsByReferencedFile.isEmpty()) { return; } @@ -883,7 
+859,7 @@ private void validateAddedDVs( DeleteFile file = entry.file(); if (newSnapshotIds.contains(entry.snapshotId()) && ContentFileUtil.isDV(file)) { ValidationException.check( - !newDVRefs.contains(file.referencedDataFile()), + !dvsByReferencedFile.containsKey(file.referencedDataFile()), "Found concurrently added DV for %s: %s", file.referencedDataFile(), ContentFileUtil.dvDesc(file)); @@ -1096,23 +1072,7 @@ private List newDeleteFilesAsManifests() { } if (cachedNewDeleteManifests.isEmpty()) { - // Found duplicates, merge them and update newDeleteFilesBySpec to remove duplicates and add - // the new merged one - if (!duplicateDVsForDataFile.isEmpty()) { - Map mergedDVs = mergeDuplicateDVs(); - mergedDVs.forEach( - (referencedFile, newDV) -> { - DeleteFileSet duplicateDVs = duplicateDVsForDataFile.get(referencedFile); - DeleteFileSet allDeleteFilesForSpec = newDeleteFilesBySpec.get(newDV.specId()); - LOG.warn( - "Merged {} duplicate deletion vectors for data file {} in table {}. The merged DVs are orphaned, and writers should merge DVs per file before committing", - duplicateDVs.size(), - referencedFile, - tableName); - allDeleteFilesForSpec.removeAll(duplicateDVs); - allDeleteFilesForSpec.add(newDV); - }); - } + mergeDVsAndWrite(); newDeleteFilesBySpec.forEach( (specId, deleteFiles) -> { @@ -1123,38 +1083,120 @@ private List newDeleteFilesAsManifests() { }); this.hasNewDeleteFiles = false; - this.duplicateDVsForDataFile.clear(); } return cachedNewDeleteManifests; } - // Returns the merged DV for a given data file that had duplicate DVs - private Map mergeDuplicateDVs() { - Map mergedDVs = Maps.newConcurrentMap(); - Tasks.foreach(duplicateDVsForDataFile.entrySet()) + // Merge duplicates, internally takes care of updating newDeleteFilesBySpec to remove + // duplicates and add the newly merged DV + private void mergeDVsAndWrite() { + Map> mergedIndicesBySpec = Maps.newConcurrentMap(); + + Tasks.foreach(dvsByReferencedFile.entrySet()) .executeWith(ThreadPools.getDeleteWorkerPool()) .stopOnFailure() .throwFailureWhenFinished() .run( - dvsToMergeForDataFile -> { - String referencedLocation = dvsToMergeForDataFile.getKey(); - mergedDVs.put( - referencedLocation, - mergeAndWriteDV(referencedLocation, dvsToMergeForDataFile.getValue())); + entry -> { + String referencedLocation = entry.getKey(); + DeleteFileSet dvsToMerge = entry.getValue(); + // Nothing to merge + if (dvsToMerge.size() < 2) { + return; + } + + MergedDVContent merged = mergePositions(referencedLocation, dvsToMerge); + + mergedIndicesBySpec + .computeIfAbsent( + merged.specId, spec -> Collections.synchronizedList(Lists.newArrayList())) + .add(merged); }); - return mergedDVs; + // Update newDeleteFilesBySpec to remove all the duplicates + mergedIndicesBySpec.forEach( + (specId, mergedDVContent) -> { + mergedDVContent.stream() + .map(content -> content.mergedDVs) + .forEach(duplicateDVs -> newDeleteFilesBySpec.get(specId).removeAll(duplicateDVs)); + }); + + writeMergedDVs(mergedIndicesBySpec); + } + + // Produces a Puffin per partition spec containing the merged DVs for that spec + private void writeMergedDVs(Map> mergedDVContentBySpec) { + Map mergedDVsBySpec = Maps.newHashMap(); + + mergedDVContentBySpec.forEach( + (specId, mergedDVsForSpec) -> { + try (DVFileWriter dvFileWriter = + new BaseDVFileWriter( + OutputFileFactory.builderFor(ops(), spec(specId), FileFormat.PUFFIN, 1, 1) + .build(), + path -> null)) { + + for (MergedDVContent mergedDV : mergedDVsForSpec) { + LOG.warn( + "Merged {} duplicate deletion vectors for data 
file {} in table {}. The merged DVs are orphaned, and writers should merge DVs per file before committing", + mergedDV.mergedDVs.size(), + mergedDV.referencedLocation, + tableName); + dvFileWriter.delete( + mergedDV.referencedLocation, + mergedDV.mergedPositions, + spec(mergedDV.specId), + mergedDV.partition); + } + + dvFileWriter.close(); + DeleteWriteResult result = dvFileWriter.result(); + + DeleteFileSet dvsForSpec = + mergedDVsBySpec.computeIfAbsent(specId, k -> DeleteFileSet.create()); + dvsForSpec.addAll( + result.deleteFiles().stream() + .map(file -> Delegates.pendingDeleteFile(file, file.dataSequenceNumber())) + .collect(Collectors.toList())); + + // Add the merged DV to the delete files by spec + newDeleteFilesBySpec.get(specId).addAll(dvsForSpec); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + + // Data class for referenced file, DVs that were merged, the merged position delete index, + // partition spec and tuple + private static class MergedDVContent { + private DeleteFileSet mergedDVs; + private String referencedLocation; + private PositionDeleteIndex mergedPositions; + private int specId; + private StructLike partition; + + MergedDVContent( + String referencedLocation, + DeleteFileSet mergedDVs, + PositionDeleteIndex mergedPositions, + int specId, + StructLike partition) { + this.referencedLocation = referencedLocation; + this.mergedDVs = mergedDVs; + this.mergedPositions = mergedPositions; + this.specId = specId; + this.partition = partition; + } } // Merges the set of DVs for a given referenced files into a single DV // and produces a single Puffin file - private DeleteFile mergeAndWriteDV(String referencedDataFile, DeleteFileSet dvs) { + private MergedDVContent mergePositions(String referencedLocation, DeleteFileSet dvs) { Iterator dvIterator = dvs.iterator(); DeleteFile firstDV = dvIterator.next(); - PositionDeleteIndex positionDeleteIndex = - Deletes.readDV(firstDV, ops().io(), ops().encryption()); - PartitionSpec spec = spec(firstDV.specId()); + PositionDeleteIndex mergedPositions = Deletes.readDV(firstDV, ops().io(), ops().encryption()); while (dvIterator.hasNext()) { DeleteFile dv = dvIterator.next(); Preconditions.checkArgument( @@ -1172,35 +1214,11 @@ private DeleteFile mergeAndWriteDV(String referencedDataFile, DeleteFileSet dvs) Preconditions.checkArgument( Objects.equals(dv.partition(), firstDV.partition()), "Cannot merge duplicate added DVs when partition tuples are different"); - positionDeleteIndex.merge(Deletes.readDV(dv, ops().io(), ops().encryption())); + mergedPositions.merge(Deletes.readDV(dv, ops().io(), ops().encryption())); } - return writeDV( - referencedDataFile, - positionDeleteIndex, - spec, - firstDV.partition(), - firstDV.dataSequenceNumber()); - } - - private DeleteFile writeDV( - String referencedDataFile, - PositionDeleteIndex positionDeleteIndex, - PartitionSpec spec, - StructLike partition, - Long dataSequenceNumber) { - try (DVFileWriter dvFileWriter = - new BaseDVFileWriter( - OutputFileFactory.builderFor(ops(), spec, FileFormat.PUFFIN, 1, 1).build(), - path -> null)) { - dvFileWriter.delete(referencedDataFile, positionDeleteIndex, spec, partition); - dvFileWriter.close(); - DeleteWriteResult result = dvFileWriter.result(); - return Delegates.pendingDeleteFile( - Iterables.getOnlyElement(result.deleteFiles()), dataSequenceNumber); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + return new MergedDVContent( + referencedLocation, dvs, mergedPositions, firstDV.specId(), 
firstDV.partition()); } private class DataFileFilterManager extends ManifestFilterManager { diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 9e4d156c9663..2f7b2a7265b4 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -1903,7 +1903,9 @@ public void testDuplicateDVsAreMerged() throws IOException { DeleteFile deleteFile1 = dvWithPositions(dataFile, fileFactory, 0, 2); DeleteFile deleteFile2 = dvWithPositions(dataFile, fileFactory, 2, 4); - RowDelta rowDelta1 = table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2); + DeleteFile deleteFile3 = dvWithPositions(dataFile, fileFactory, 4, 8); + RowDelta rowDelta1 = + table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).addDeletes(deleteFile3); commit(table, rowDelta1, branch); @@ -1912,7 +1914,91 @@ public void testDuplicateDVsAreMerged() throws IOException { assertThat(Iterables.size(addedDeleteFiles)).isEqualTo(1); DeleteFile mergedDV = Iterables.getOnlyElement(addedDeleteFiles); - assertDVHasDeletedPositions(mergedDV, LongStream.range(0, 4).boxed()::iterator); + assertDVHasDeletedPositions(mergedDV, LongStream.range(0, 8).boxed()::iterator); + } + + @TestTemplate + public void testDuplicateDVsMergedMultipleSpecs() throws IOException { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + // append a partitioned data file + DataFile firstSnapshotDataFile = newDataFile("data_bucket=0"); + commit(table, table.newAppend().appendFile(firstSnapshotDataFile), branch); + + // remove the only partition field to make the spec unpartitioned + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + + // append an unpartitioned data file + DataFile secondSnapshotDataFile = newDataFile(""); + commit(table, table.newAppend().appendFile(secondSnapshotDataFile), branch); + + // evolve the spec and add a new partition field + table.updateSpec().addField("data").commit(); + + // append a data file with the new spec + DataFile thirdSnapshotDataFile = newDataFile("data=abc"); + commit(table, table.newAppend().appendFile(thirdSnapshotDataFile), branch); + + assertThat(table.specs()).hasSize(3); + + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + + DataFile dataFile = newDataFile("data=xyz"); + // For each data file, create two DVs covering positions [0,2) and [2,4) + DeleteFile deleteFile1a = dvWithPositions(firstSnapshotDataFile, fileFactory, 0, 2); + DeleteFile deleteFile1b = dvWithPositions(firstSnapshotDataFile, fileFactory, 2, 4); + DeleteFile deleteFile2a = dvWithPositions(secondSnapshotDataFile, fileFactory, 0, 2); + DeleteFile deleteFile2b = dvWithPositions(secondSnapshotDataFile, fileFactory, 2, 4); + DeleteFile deleteFile3a = dvWithPositions(thirdSnapshotDataFile, fileFactory, 0, 2); + DeleteFile deleteFile3b = dvWithPositions(thirdSnapshotDataFile, fileFactory, 2, 4); + + commit( + table, + table + .newRowDelta() + .addRows(dataFile) + .addDeletes(deleteFile1a) + .addDeletes(deleteFile1b) + .addDeletes(deleteFile2a) + .addDeletes(deleteFile2b) + .addDeletes(deleteFile3a) + .addDeletes(deleteFile3b), + branch); + + Snapshot snapshot = latestSnapshot(table, branch); + // Expect 3 merged DVs, one per data file + Iterable addedDeleteFiles = snapshot.addedDeleteFiles(table.io()); + List mergedDVs = Lists.newArrayList(addedDeleteFiles); + assertThat(mergedDVs).hasSize(3); + // 
Should be a Puffin produced per merged DV spec + assertThat(mergedDVs.stream().map(ContentFile::location).collect(Collectors.toSet())) + .hasSize(3); + + DeleteFile committedDVForDataFile1 = + Iterables.getOnlyElement( + mergedDVs.stream() + .filter( + dv -> Objects.equals(dv.referencedDataFile(), firstSnapshotDataFile.location())) + .collect(Collectors.toList())); + assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0, 4).boxed()::iterator); + + DeleteFile committedDVForDataFile2 = + Iterables.getOnlyElement( + mergedDVs.stream() + .filter( + dv -> + Objects.equals(dv.referencedDataFile(), secondSnapshotDataFile.location())) + .collect(Collectors.toList())); + assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0, 4).boxed()::iterator); + + DeleteFile committedDVForDataFile3 = + Iterables.getOnlyElement( + mergedDVs.stream() + .filter( + dv -> Objects.equals(dv.referencedDataFile(), thirdSnapshotDataFile.location())) + .collect(Collectors.toList())); + assertDVHasDeletedPositions(committedDVForDataFile3, LongStream.range(0, 4).boxed()::iterator); } @TestTemplate @@ -1949,6 +2035,9 @@ public void testDuplicateDVsAreMergedForMultipleReferenceFiles() throws IOExcept List mergedDVs = Lists.newArrayList(addedDeleteFiles); assertThat(mergedDVs).hasSize(2); + // Should be a single Puffin produced + assertThat(mergedDVs.stream().map(ContentFile::location).collect(Collectors.toSet())) + .hasSize(1); DeleteFile committedDVForDataFile1 = Iterables.getOnlyElement( @@ -2097,7 +2186,7 @@ private DeleteFile dvWithPositions( for (int i = fromInclusive; i < toExclusive; i++) { deletes.add(PositionDelete.create().set(dataFile.location(), i)); } - return writeDV(deletes, dataFile.partition(), fileFactory); + return writeDV(deletes, dataFile.specId(), dataFile.partition(), fileFactory); } private void assertDVHasDeletedPositions(DeleteFile dv, Iterable positions) @@ -2112,13 +2201,17 @@ private void assertDVHasDeletedPositions(DeleteFile dv, Iterable positions } private DeleteFile writeDV( - List> deletes, StructLike partition, OutputFileFactory fileFactory) + List> deletes, + int specId, + StructLike partition, + OutputFileFactory fileFactory) throws IOException { DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null); try (DVFileWriter closeableWriter = writer) { for (PositionDelete delete : deletes) { - closeableWriter.delete(delete.path().toString(), delete.pos(), table.spec(), partition); + closeableWriter.delete( + delete.path().toString(), delete.pos(), table.specs().get(specId), partition); } } From 6b04dd989374e3ffbaa040209721d5264948c73e Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Thu, 15 Jan 2026 12:12:55 -0700 Subject: [PATCH 13/25] Filter files with duplicates before sifting through them and merging --- .../iceberg/MergingSnapshotProducer.java | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 7dc9c5afbaef..f09a5c0fd95f 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -1092,22 +1092,20 @@ private List newDeleteFilesAsManifests() { // duplicates and add the newly merged DV private void mergeDVsAndWrite() { Map> mergedIndicesBySpec = Maps.newConcurrentMap(); + Map dataFilesWithDuplicateDVs = + dvsByReferencedFile.entrySet().stream() + .filter(entry -> 
entry.getValue().size() > 1) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - Tasks.foreach(dvsByReferencedFile.entrySet()) + Tasks.foreach(dataFilesWithDuplicateDVs.entrySet()) .executeWith(ThreadPools.getDeleteWorkerPool()) .stopOnFailure() .throwFailureWhenFinished() .run( entry -> { String referencedLocation = entry.getKey(); - DeleteFileSet dvsToMerge = entry.getValue(); - // Nothing to merge - if (dvsToMerge.size() < 2) { - return; - } - - MergedDVContent merged = mergePositions(referencedLocation, dvsToMerge); - + DeleteFileSet duplicateDVs = entry.getValue(); + MergedDVContent merged = mergePositions(referencedLocation, duplicateDVs); mergedIndicesBySpec .computeIfAbsent( merged.specId, spec -> Collections.synchronizedList(Lists.newArrayList())) @@ -1118,7 +1116,7 @@ private void mergeDVsAndWrite() { mergedIndicesBySpec.forEach( (specId, mergedDVContent) -> { mergedDVContent.stream() - .map(content -> content.mergedDVs) + .map(content -> content.duplicateDVs) .forEach(duplicateDVs -> newDeleteFilesBySpec.get(specId).removeAll(duplicateDVs)); }); @@ -1139,8 +1137,8 @@ private void writeMergedDVs(Map> mergedDVContentB for (MergedDVContent mergedDV : mergedDVsForSpec) { LOG.warn( - "Merged {} duplicate deletion vectors for data file {} in table {}. The merged DVs are orphaned, and writers should merge DVs per file before committing", - mergedDV.mergedDVs.size(), + "Merged {} duplicate deletion vectors for data file {} in table {}. The duplicate DVs are orphaned, and writers should merge DVs per file before committing", + mergedDV.duplicateDVs.size(), mergedDV.referencedLocation, tableName); dvFileWriter.delete( @@ -1171,7 +1169,7 @@ private void writeMergedDVs(Map> mergedDVContentB // Data class for referenced file, DVs that were merged, the merged position delete index, // partition spec and tuple private static class MergedDVContent { - private DeleteFileSet mergedDVs; + private DeleteFileSet duplicateDVs; private String referencedLocation; private PositionDeleteIndex mergedPositions; private int specId; @@ -1179,12 +1177,12 @@ private static class MergedDVContent { MergedDVContent( String referencedLocation, - DeleteFileSet mergedDVs, + DeleteFileSet duplicateDVs, PositionDeleteIndex mergedPositions, int specId, StructLike partition) { this.referencedLocation = referencedLocation; - this.mergedDVs = mergedDVs; + this.duplicateDVs = duplicateDVs; this.mergedPositions = mergedPositions; this.specId = specId; this.partition = partition; From a50fb32106dcb8a3b52634935d8cf934f651a3c3 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Thu, 15 Jan 2026 12:14:40 -0700 Subject: [PATCH 14/25] update old comment --- .../java/org/apache/iceberg/MergingSnapshotProducer.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index f09a5c0fd95f..314256d23b6a 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -1189,10 +1189,9 @@ private static class MergedDVContent { } } - // Merges the set of DVs for a given referenced files into a single DV - // and produces a single Puffin file - private MergedDVContent mergePositions(String referencedLocation, DeleteFileSet dvs) { - Iterator dvIterator = dvs.iterator(); + // Merges the position indices for the duplicate DVs for a given referenced file + private 
MergedDVContent mergePositions(String referencedLocation, DeleteFileSet duplicateDVs) { + Iterator dvIterator = duplicateDVs.iterator(); DeleteFile firstDV = dvIterator.next(); PositionDeleteIndex mergedPositions = Deletes.readDV(firstDV, ops().io(), ops().encryption()); while (dvIterator.hasNext()) { @@ -1216,7 +1215,7 @@ private MergedDVContent mergePositions(String referencedLocation, DeleteFileSet } return new MergedDVContent( - referencedLocation, dvs, mergedPositions, firstDV.specId(), firstDV.partition()); + referencedLocation, duplicateDVs, mergedPositions, firstDV.specId(), firstDV.partition()); } private class DataFileFilterManager extends ManifestFilterManager { From 301f0fe7abded5f181a41067017ce7aa93fe3fc8 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Fri, 16 Jan 2026 15:16:36 -0700 Subject: [PATCH 15/25] Use an unpartitioned spec for the output file factory for the final puffin, don't produce a puffin per spec --- .../iceberg/MergingSnapshotProducer.java | 91 ++++++++----------- .../java/org/apache/iceberg/TestRowDelta.java | 2 +- 2 files changed, 40 insertions(+), 53 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 314256d23b6a..9e7950609c99 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -1091,7 +1091,7 @@ private List newDeleteFilesAsManifests() { // Merge duplicates, internally takes care of updating newDeleteFilesBySpec to remove // duplicates and add the newly merged DV private void mergeDVsAndWrite() { - Map> mergedIndicesBySpec = Maps.newConcurrentMap(); + List mergedDVs = Collections.synchronizedList(Lists.newArrayList()); Map dataFilesWithDuplicateDVs = dvsByReferencedFile.entrySet().stream() .filter(entry -> entry.getValue().size() > 1) @@ -1105,68 +1105,55 @@ private void mergeDVsAndWrite() { entry -> { String referencedLocation = entry.getKey(); DeleteFileSet duplicateDVs = entry.getValue(); - MergedDVContent merged = mergePositions(referencedLocation, duplicateDVs); - mergedIndicesBySpec - .computeIfAbsent( - merged.specId, spec -> Collections.synchronizedList(Lists.newArrayList())) - .add(merged); + mergedDVs.add(mergePositions(referencedLocation, duplicateDVs)); }); // Update newDeleteFilesBySpec to remove all the duplicates - mergedIndicesBySpec.forEach( - (specId, mergedDVContent) -> { - mergedDVContent.stream() - .map(content -> content.duplicateDVs) - .forEach(duplicateDVs -> newDeleteFilesBySpec.get(specId).removeAll(duplicateDVs)); - }); + mergedDVs.forEach( + mergedDV -> newDeleteFilesBySpec.get(mergedDV.specId).removeAll(mergedDV.duplicateDVs)); - writeMergedDVs(mergedIndicesBySpec); + writeMergedDVs(mergedDVs); } // Produces a Puffin per partition spec containing the merged DVs for that spec - private void writeMergedDVs(Map> mergedDVContentBySpec) { - Map mergedDVsBySpec = Maps.newHashMap(); - - mergedDVContentBySpec.forEach( - (specId, mergedDVsForSpec) -> { - try (DVFileWriter dvFileWriter = - new BaseDVFileWriter( - OutputFileFactory.builderFor(ops(), spec(specId), FileFormat.PUFFIN, 1, 1) - .build(), - path -> null)) { - - for (MergedDVContent mergedDV : mergedDVsForSpec) { - LOG.warn( - "Merged {} duplicate deletion vectors for data file {} in table {}. 
The duplicate DVs are orphaned, and writers should merge DVs per file before committing", - mergedDV.duplicateDVs.size(), - mergedDV.referencedLocation, - tableName); - dvFileWriter.delete( - mergedDV.referencedLocation, - mergedDV.mergedPositions, - spec(mergedDV.specId), - mergedDV.partition); - } - - dvFileWriter.close(); - DeleteWriteResult result = dvFileWriter.result(); + private void writeMergedDVs(List mergedDVs) { + try (DVFileWriter dvFileWriter = + new BaseDVFileWriter( + // Use an unpartitioned spec for the location provider for the puffin containing + // all the merged DVs + OutputFileFactory.builderFor( + ops(), PartitionSpec.unpartitioned(), FileFormat.PUFFIN, 1, 1) + .build(), + path -> null)) { + + for (MergedDVContent mergedDV : mergedDVs) { + LOG.warn( + "Merged {} duplicate deletion vectors for data file {} in table {}. The duplicate DVs are orphaned, and writers should merge DVs per file before committing", + mergedDV.duplicateDVs.size(), + mergedDV.referencedLocation, + tableName); + dvFileWriter.delete( + mergedDV.referencedLocation, + mergedDV.mergedPositions, + spec(mergedDV.specId), + mergedDV.partition); + } - DeleteFileSet dvsForSpec = - mergedDVsBySpec.computeIfAbsent(specId, k -> DeleteFileSet.create()); - dvsForSpec.addAll( - result.deleteFiles().stream() - .map(file -> Delegates.pendingDeleteFile(file, file.dataSequenceNumber())) - .collect(Collectors.toList())); + dvFileWriter.close(); + DeleteWriteResult result = dvFileWriter.result(); + result.deleteFiles().forEach(this::addPendingDelete); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } - // Add the merged DV to the delete files by spec - newDeleteFilesBySpec.get(specId).addAll(dvsForSpec); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); + private void addPendingDelete(DeleteFile file) { + newDeleteFilesBySpec + .get(file.specId()) + .add(Delegates.pendingDeleteFile(file, file.dataSequenceNumber())); } - // Data class for referenced file, DVs that were merged, the merged position delete index, + // Data class for referenced file, the duplicate DVs, the merged position delete index, // partition spec and tuple private static class MergedDVContent { private DeleteFileSet duplicateDVs; diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 2f7b2a7265b4..f3a2b03972b5 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -1973,7 +1973,7 @@ public void testDuplicateDVsMergedMultipleSpecs() throws IOException { assertThat(mergedDVs).hasSize(3); // Should be a Puffin produced per merged DV spec assertThat(mergedDVs.stream().map(ContentFile::location).collect(Collectors.toSet())) - .hasSize(3); + .hasSize(1); DeleteFile committedDVForDataFile1 = Iterables.getOnlyElement( From 112b086a8aba4ffa76b45c2c54b9a49e8d5ca099 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Fri, 16 Jan 2026 15:24:47 -0700 Subject: [PATCH 16/25] address feedback --- .../apache/iceberg/MergingSnapshotProducer.java | 14 +++++++------- .../org/apache/iceberg/io/OutputFileFactory.java | 8 +++----- .../java/org/apache/iceberg/TestRowDelta.java | 15 ++++++++------- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 9e7950609c99..900be023922b 100644 --- 
a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -1156,11 +1156,11 @@ private void addPendingDelete(DeleteFile file) { // Data class for referenced file, the duplicate DVs, the merged position delete index, // partition spec and tuple private static class MergedDVContent { - private DeleteFileSet duplicateDVs; - private String referencedLocation; - private PositionDeleteIndex mergedPositions; - private int specId; - private StructLike partition; + private final DeleteFileSet duplicateDVs; + private final String referencedLocation; + private final PositionDeleteIndex mergedPositions; + private final int specId; + private final StructLike partition; MergedDVContent( String referencedLocation, @@ -1185,13 +1185,13 @@ private MergedDVContent mergePositions(String referencedLocation, DeleteFileSet DeleteFile dv = dvIterator.next(); Preconditions.checkArgument( Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()), - "Cannot merge duplicate added DVs when data sequence numbers are different," + "Cannot merge duplicate added DVs when data sequence numbers are different, " + "expected all to be added with sequence %s, but got %s", firstDV.dataSequenceNumber(), dv.dataSequenceNumber()); Preconditions.checkArgument( dv.specId() == firstDV.specId(), - "Cannot merge duplicate added DVs when partition specs are different," + "Cannot merge duplicate added DVs when partition specs are different, " + "expected all to be added with spec %s, but got %s", firstDV.specId(), dv.specId()); diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index 4930d2f3aa34..2f5263db2330 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -88,15 +88,13 @@ private OutputFileFactory( } public static Builder builderFor(Table table, int partitionId, long taskId) { - String formatAsString = - table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); - PartitionSpec spec = table.spec(); return new Builder( table.locationProvider(), table.encryption(), table::io, - spec, - FileFormat.fromString(formatAsString), + table.spec(), + FileFormat.fromString( + table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)), partitionId, taskId); } diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index f3a2b03972b5..a2f5c277cca3 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -2186,18 +2186,19 @@ private DeleteFile dvWithPositions( for (int i = fromInclusive; i < toExclusive; i++) { deletes.add(PositionDelete.create().set(dataFile.location(), i)); } + return writeDV(deletes, dataFile.specId(), dataFile.partition(), fileFactory); } - private void assertDVHasDeletedPositions(DeleteFile dv, Iterable positions) - throws IOException { + private void assertDVHasDeletedPositions(DeleteFile dv, Iterable positions) { assertThat(dv).isNotNull(); - PositionDeleteIndex index = Deletes.readDV(dv, table.io(), table.encryption()); - - for (long pos : positions) { - assertThat(index.isDeleted(pos)).as("Expected position %s to be deleted", pos).isTrue(); - } + assertThat(positions) + .allSatisfy( + pos -> + assertThat(index.isDeleted(pos)) + .as("Expected 
position %s to be deleted", pos) + .isTrue()); } private DeleteFile writeDV( From eecacad84ea334d2b0f05158d90ff38d1862f5c3 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Fri, 16 Jan 2026 15:26:16 -0700 Subject: [PATCH 17/25] Add some spacing to make precondition checks more readable --- .../main/java/org/apache/iceberg/MergingSnapshotProducer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 900be023922b..3c9ea151c514 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -1189,12 +1189,14 @@ private MergedDVContent mergePositions(String referencedLocation, DeleteFileSet + "expected all to be added with sequence %s, but got %s", firstDV.dataSequenceNumber(), dv.dataSequenceNumber()); + Preconditions.checkArgument( dv.specId() == firstDV.specId(), "Cannot merge duplicate added DVs when partition specs are different, " + "expected all to be added with spec %s, but got %s", firstDV.specId(), dv.specId()); + Preconditions.checkArgument( Objects.equals(dv.partition(), firstDV.partition()), "Cannot merge duplicate added DVs when partition tuples are different"); From 9673f85932ce502e32f062b04e312d93fd9321c3 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Fri, 16 Jan 2026 15:27:31 -0700 Subject: [PATCH 18/25] more style stuff --- .../main/java/org/apache/iceberg/MergingSnapshotProducer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 3c9ea151c514..57908589dda9 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -1091,12 +1091,12 @@ private List newDeleteFilesAsManifests() { // Merge duplicates, internally takes care of updating newDeleteFilesBySpec to remove // duplicates and add the newly merged DV private void mergeDVsAndWrite() { - List mergedDVs = Collections.synchronizedList(Lists.newArrayList()); Map dataFilesWithDuplicateDVs = dvsByReferencedFile.entrySet().stream() .filter(entry -> entry.getValue().size() > 1) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + List mergedDVs = Collections.synchronizedList(Lists.newArrayList()); Tasks.foreach(dataFilesWithDuplicateDVs.entrySet()) .executeWith(ThreadPools.getDeleteWorkerPool()) .stopOnFailure() From 41040972979f5ee36d8185fd603a712c3684c74b Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Fri, 16 Jan 2026 15:29:32 -0700 Subject: [PATCH 19/25] Update delete loader to use IOUtil.readDV API --- .../apache/iceberg/data/BaseDeleteLoader.java | 24 ++----------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java index 99f5c742d37c..ac8775eacc17 100644 --- a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java +++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java @@ -41,9 +41,8 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.IOUtil; import org.apache.iceberg.io.InputFile; -import 
org.apache.iceberg.io.RangeReadable; -import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.orc.OrcRowReader; import org.apache.iceberg.parquet.Parquet; @@ -51,7 +50,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.CharSequenceMap; import org.apache.iceberg.util.ContentFileUtil; @@ -183,7 +181,7 @@ private PositionDeleteIndex readDV(DeleteFile dv) { InputFile inputFile = loadInputFile.apply(dv); long offset = dv.contentOffset(); int length = dv.contentSizeInBytes().intValue(); - byte[] bytes = readBytes(inputFile, offset, length); + byte[] bytes = IOUtil.readBytes(inputFile, offset, length); return PositionDeleteIndex.deserialize(bytes, dv); } @@ -322,22 +320,4 @@ private void validateDV(DeleteFile dv, CharSequence filePath) { filePath, dv.referencedDataFile()); } - - private static byte[] readBytes(InputFile inputFile, long offset, int length) { - try (SeekableInputStream stream = inputFile.newStream()) { - byte[] bytes = new byte[length]; - - if (stream instanceof RangeReadable) { - RangeReadable rangeReadable = (RangeReadable) stream; - rangeReadable.readFully(offset, bytes); - } else { - stream.seek(offset); - ByteStreams.readFully(stream, bytes); - } - - return bytes; - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } } From 9972ce0631d4b1837499725f303b96a9ad6202a2 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Fri, 16 Jan 2026 15:53:42 -0700 Subject: [PATCH 20/25] Update interface documentation --- core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java b/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java index a2289b190b76..208b174a3942 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java @@ -46,6 +46,7 @@ public interface DVFileWriter extends Closeable { /** * Marks every position that is deleted in positionDeleteIndex as deleted in the given data file. + * Implementations should merge with existing position indices for the provided path * * @param path the data file path * @param positionDeleteIndex the position delete index containing all the positions to delete From e75bb033698e4ba71f00b3ad63698b804ebe338d Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Sun, 25 Jan 2026 16:57:20 -0700 Subject: [PATCH 21/25] address Ryan's feedback --- .../main/java/org/apache/iceberg/DVUtil.java | 146 ++++++++++++++ .../iceberg/MergingSnapshotProducer.java | 179 ++++-------------- .../java/org/apache/iceberg/TestRowDelta.java | 51 +++++ 3 files changed, 233 insertions(+), 143 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/DVUtil.java diff --git a/core/src/main/java/org/apache/iceberg/DVUtil.java b/core/src/main/java/org/apache/iceberg/DVUtil.java new file mode 100644 index 000000000000..2f43a9374fab --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/DVUtil.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; +import org.apache.iceberg.deletes.Deletes; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.io.DeleteWriteResult; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class DVUtil { + private static final Logger LOG = LoggerFactory.getLogger(DVUtil.class); + + private DVUtil() {} + + /** + * Merges duplicate DVs for the same data file and writes the merged DV Puffin files. + * + * @param duplicateDVsByReferencedFile map of data file location to duplicate DVs (all entries + * must have size > 1) + * @return newly merged DVs + */ + static List mergeDVsAndWrite( + TableOperations ops, + Map> duplicateDVsByReferencedFile, + String tableName, + Map specs, + ExecutorService threadpool) { + Map mergedIndices = + duplicateDVsByReferencedFile.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, entry -> mergePositions(ops, entry.getValue(), threadpool))); + + return writeMergedDVs(mergedIndices, duplicateDVsByReferencedFile, ops, tableName, specs); + } + + // Merges the position indices for the duplicate DVs for a given referenced file + private static PositionDeleteIndex mergePositions( + TableOperations ops, List dvsForFile, ExecutorService pool) { + Preconditions.checkArgument(dvsForFile.size() > 1, "Expected more than 1 DV"); + PositionDeleteIndex[] duplicateDVIndices = new PositionDeleteIndex[dvsForFile.size()]; + Tasks.range(dvsForFile.size()) + .executeWith(pool) + .stopOnFailure() + .throwFailureWhenFinished() + .run( + i -> { + duplicateDVIndices[i] = Deletes.readDV(dvsForFile.get(i), ops.io(), ops.encryption()); + }); + PositionDeleteIndex mergedPositions = duplicateDVIndices[0]; + DeleteFile firstDV = dvsForFile.get(0); + for (int i = 1; i < duplicateDVIndices.length; i++) { + DeleteFile dv = dvsForFile.get(i); + Preconditions.checkArgument( + Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()), + "Cannot merge duplicate added DVs when data sequence numbers are different, " + + "expected all to be added with sequence %s, but got %s", + firstDV.dataSequenceNumber(), + dv.dataSequenceNumber()); + + Preconditions.checkArgument( + dv.specId() == firstDV.specId(), + "Cannot merge duplicate added DVs when partition specs are different, " + + "expected all to be added with spec %s, but got %s", + 
+          firstDV.specId(),
+          dv.specId());
+
+      Preconditions.checkArgument(
+          Objects.equals(dv.partition(), firstDV.partition()),
+          "Cannot merge duplicate added DVs when partition tuples are different");
+      mergedPositions.merge(duplicateDVIndices[i]);
+    }
+
+    return mergedPositions;
+  }
+
+  // Produces a Puffin per partition spec containing the merged DVs for that spec
+  private static List<DeleteFile> writeMergedDVs(
+      Map<String, PositionDeleteIndex> mergedIndices,
+      Map<String, List<DeleteFile>> dataFilesWithDuplicateDVs,
+      TableOperations ops,
+      String tableName,
+      Map<Integer, PartitionSpec> specsById) {
+    try (DVFileWriter dvFileWriter =
+        new BaseDVFileWriter(
+            // Use an unpartitioned spec for the location provider for the puffin containing
+            // all the merged DVs
+            OutputFileFactory.builderFor(
+                    ops, PartitionSpec.unpartitioned(), FileFormat.PUFFIN, 1, 1)
+                .build(),
+            path -> null)) {
+
+      for (Map.Entry<String, PositionDeleteIndex> entry : mergedIndices.entrySet()) {
+        String referencedLocation = entry.getKey();
+        PositionDeleteIndex mergedPositions = entry.getValue();
+        List<DeleteFile> duplicateDVs = dataFilesWithDuplicateDVs.get(referencedLocation);
+        DeleteFile firstDV = duplicateDVs.get(0);
+        LOG.warn(
+            "Merged {} DVs for data file {}. These will be orphaned DVs in table {}",
+            duplicateDVs.size(),
+            referencedLocation,
+            tableName);
+        dvFileWriter.delete(
+            referencedLocation,
+            mergedPositions,
+            specsById.get(firstDV.specId()),
+            firstDV.partition());
+      }
+
+      dvFileWriter.close();
+      DeleteWriteResult writeResult = dvFileWriter.result();
+      return writeResult.deleteFiles();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 57908589dda9..8b59a1ce7f29 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -26,17 +26,11 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.iceberg.deletes.BaseDVFileWriter;
-import org.apache.iceberg.deletes.DVFileWriter;
-import org.apache.iceberg.deletes.Deletes;
-import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.events.CreateSnapshotEvent;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -44,9 +38,7 @@
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.DeleteWriteResult;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Predicate;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -56,6 +48,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.DataFileSet;
@@ -96,8 +89,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   // update data
   private final Map<Integer, DataFileSet> newDataFilesBySpec = Maps.newHashMap();
   private Long newDataFilesDataSequenceNumber;
-  private final Map<Integer, DeleteFileSet> newDeleteFilesBySpec = Maps.newHashMap();
-  private final Map<String, DeleteFileSet> dvsByReferencedFile = Maps.newHashMap();
+  private final List<DeleteFile> positionAndEqualityDeletes = Lists.newArrayList();
+  private final Map<String, List<DeleteFile>> dvsByReferencedFile = Maps.newHashMap();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
   private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
@@ -232,7 +225,7 @@ protected boolean addsDataFiles() {
   }

   protected boolean addsDeleteFiles() {
-    return !newDeleteFilesBySpec.isEmpty();
+    return !positionAndEqualityDeletes.isEmpty() || !dvsByReferencedFile.isEmpty();
   }

   /** Add a data file to the new snapshot. */
@@ -275,17 +268,14 @@ private void addInternal(DeleteFile file) {
         "Cannot find partition spec %s for delete file: %s",
         file.specId(),
         file.location());
-
-    DeleteFileSet deleteFiles =
-        newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DeleteFileSet.create());
-    if (deleteFiles.add(file)) {
-      hasNewDeleteFiles = true;
-      if (ContentFileUtil.isDV(file)) {
-        DeleteFileSet deletesForReferencedFile =
-            dvsByReferencedFile.computeIfAbsent(
-                file.referencedDataFile(), newFile -> DeleteFileSet.create());
-        deletesForReferencedFile.add(file);
-      }
+    hasNewDeleteFiles = true;
+    if (ContentFileUtil.isDV(file)) {
+      List<DeleteFile> dvsForReferencedFile =
+          dvsByReferencedFile.computeIfAbsent(
+              file.referencedDataFile(), newFile -> Lists.newArrayList());
+      dvsForReferencedFile.add(file);
+    } else {
+      positionAndEqualityDeletes.add(file);
     }
   }
@@ -1054,7 +1044,7 @@ private List<ManifestFile> newDataFilesAsManifests() {
   }

   private Iterable<ManifestFile> prepareDeleteManifests() {
-    if (newDeleteFilesBySpec.isEmpty()) {
+    if (!addsDeleteFiles()) {
       return ImmutableList.of();
     }

@@ -1072,8 +1062,30 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
     }

     if (cachedNewDeleteManifests.isEmpty()) {
-      mergeDVsAndWrite();
+      Map<String, List<DeleteFile>> duplicateDVs = Maps.newHashMap();
+      List<DeleteFile> validDVs = Lists.newArrayList();
+      for (Map.Entry<String, List<DeleteFile>> entry : dvsByReferencedFile.entrySet()) {
+        if (entry.getValue().size() > 1) {
+          duplicateDVs.put(entry.getKey(), entry.getValue());
+        } else {
+          validDVs.addAll(entry.getValue());
+        }
+      }
+
+      List<DeleteFile> mergedDVs =
+          duplicateDVs.isEmpty()
+              ? ImmutableList.of()
+              : DVUtil.mergeDVsAndWrite(
+                  ops(),
+                  duplicateDVs,
+                  tableName,
+                  ops().current().specsById(),
+                  ThreadPools.getDeleteWorkerPool());
+
+      Map<Integer, List<DeleteFile>> newDeleteFilesBySpec =
+          Streams.stream(Iterables.concat(mergedDVs, validDVs, positionAndEqualityDeletes))
+              .map(file -> Delegates.pendingDeleteFile(file, file.dataSequenceNumber()))
+              .collect(Collectors.groupingBy(ContentFile::specId));
       newDeleteFilesBySpec.forEach(
           (specId, deleteFiles) -> {
             PartitionSpec spec = ops().current().spec(specId);
@@ -1088,125 +1100,6 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
     return cachedNewDeleteManifests;
   }
-
-  // Merge duplicates, internally takes care of updating newDeleteFilesBySpec to remove
-  // duplicates and add the newly merged DV
-  private void mergeDVsAndWrite() {
-    Map<String, DeleteFileSet> dataFilesWithDuplicateDVs =
-        dvsByReferencedFile.entrySet().stream()
-            .filter(entry -> entry.getValue().size() > 1)
-            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
-    List<MergedDVContent> mergedDVs = Collections.synchronizedList(Lists.newArrayList());
-    Tasks.foreach(dataFilesWithDuplicateDVs.entrySet())
-        .executeWith(ThreadPools.getDeleteWorkerPool())
-        .stopOnFailure()
-        .throwFailureWhenFinished()
-        .run(
-            entry -> {
-              String referencedLocation = entry.getKey();
-              DeleteFileSet duplicateDVs = entry.getValue();
-              mergedDVs.add(mergePositions(referencedLocation, duplicateDVs));
-            });
-
-    // Update newDeleteFilesBySpec to remove all the duplicates
-    mergedDVs.forEach(
-        mergedDV -> newDeleteFilesBySpec.get(mergedDV.specId).removeAll(mergedDV.duplicateDVs));
-
-    writeMergedDVs(mergedDVs);
-  }
-
-  // Produces a Puffin per partition spec containing the merged DVs for that spec
-  private void writeMergedDVs(List<MergedDVContent> mergedDVs) {
-    try (DVFileWriter dvFileWriter =
-        new BaseDVFileWriter(
-            // Use an unpartitioned spec for the location provider for the puffin containing
-            // all the merged DVs
-            OutputFileFactory.builderFor(
-                    ops(), PartitionSpec.unpartitioned(), FileFormat.PUFFIN, 1, 1)
-                .build(),
-            path -> null)) {
-
-      for (MergedDVContent mergedDV : mergedDVs) {
-        LOG.warn(
-            "Merged {} duplicate deletion vectors for data file {} in table {}. "
-                + "The duplicate DVs are orphaned, and writers should merge DVs per file before committing",
-            mergedDV.duplicateDVs.size(),
-            mergedDV.referencedLocation,
-            tableName);
-        dvFileWriter.delete(
-            mergedDV.referencedLocation,
-            mergedDV.mergedPositions,
-            spec(mergedDV.specId),
-            mergedDV.partition);
-      }
-
-      dvFileWriter.close();
-      DeleteWriteResult result = dvFileWriter.result();
-      result.deleteFiles().forEach(this::addPendingDelete);
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-  }
-
-  private void addPendingDelete(DeleteFile file) {
-    newDeleteFilesBySpec
-        .get(file.specId())
-        .add(Delegates.pendingDeleteFile(file, file.dataSequenceNumber()));
-  }
-
-  // Data class for referenced file, the duplicate DVs, the merged position delete index,
-  // partition spec and tuple
-  private static class MergedDVContent {
-    private final DeleteFileSet duplicateDVs;
-    private final String referencedLocation;
-    private final PositionDeleteIndex mergedPositions;
-    private final int specId;
-    private final StructLike partition;
-
-    MergedDVContent(
-        String referencedLocation,
-        DeleteFileSet duplicateDVs,
-        PositionDeleteIndex mergedPositions,
-        int specId,
-        StructLike partition) {
-      this.referencedLocation = referencedLocation;
-      this.duplicateDVs = duplicateDVs;
-      this.mergedPositions = mergedPositions;
-      this.specId = specId;
-      this.partition = partition;
-    }
-  }
-
-  // Merges the position indices for the duplicate DVs for a given referenced file
-  private MergedDVContent mergePositions(String referencedLocation, DeleteFileSet duplicateDVs) {
-    Iterator<DeleteFile> dvIterator = duplicateDVs.iterator();
-    DeleteFile firstDV = dvIterator.next();
-    PositionDeleteIndex mergedPositions = Deletes.readDV(firstDV, ops().io(), ops().encryption());
-    while (dvIterator.hasNext()) {
-      DeleteFile dv = dvIterator.next();
-      Preconditions.checkArgument(
-          Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()),
-          "Cannot merge duplicate added DVs when data sequence numbers are different, "
-              + "expected all to be added with sequence %s, but got %s",
-          firstDV.dataSequenceNumber(),
-          dv.dataSequenceNumber());
-
-      Preconditions.checkArgument(
-          dv.specId() == firstDV.specId(),
-          "Cannot merge duplicate added DVs when partition specs are different, "
-              + "expected all to be added with spec %s, but got %s",
-          firstDV.specId(),
-          dv.specId());
-
-      Preconditions.checkArgument(
-          Objects.equals(dv.partition(), firstDV.partition()),
-          "Cannot merge duplicate added DVs when partition tuples are different");
-      mergedPositions.merge(Deletes.readDV(dv, ops().io(), ops().encryption()));
-    }
-
-    return new MergedDVContent(
-        referencedLocation, duplicateDVs, mergedPositions, firstDV.specId(), firstDV.partition());
-  }
-
   private class DataFileFilterManager extends ManifestFilterManager<DataFile> {
     private DataFileFilterManager() {
       super(ops().current().specsById(), MergingSnapshotProducer.this::workerPool);
diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
index a2f5c277cca3..77737b6f1fcf 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -2054,6 +2054,57 @@ public void testDuplicateDVsAreMergedForMultipleReferenceFiles() throws IOExcept
     assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0, 4).boxed()::iterator);
   }

+  @TestTemplate
+  public void testDuplicateDVsAndValidDV() throws IOException {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    DataFile dataFile1 = newDataFile("data_bucket=0");
+    DataFile dataFile2 = newDataFile("data_bucket=0");
+    commit(table, table.newRowDelta().addRows(dataFile1).addRows(dataFile2), branch);
+
+    OutputFileFactory fileFactory =
+        OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
+
+    // dataFile1 has duplicate DVs that need merging
+    DeleteFile deleteFile1a = dvWithPositions(dataFile1, fileFactory, 0, 2);
+    DeleteFile deleteFile1b = dvWithPositions(dataFile1, fileFactory, 2, 4);
+
+    // dataFile2 has a valid DV
+    DeleteFile deleteFile2 = dvWithPositions(dataFile2, fileFactory, 0, 3);
+
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(deleteFile1a)
+            .addDeletes(deleteFile1b)
+            .addDeletes(deleteFile2);
+
+    commit(table, rowDelta, branch);
+
+    // Expect two DVs: the merged DV for dataFile1 and deleteFile2 for dataFile2
+    Iterable<DeleteFile> addedDeleteFiles =
+        latestSnapshot(table, branch).addedDeleteFiles(table.io());
+    List<DeleteFile> committedDVs = Lists.newArrayList(addedDeleteFiles);
+
+    assertThat(committedDVs).hasSize(2);
+
+    // Verify merged DV for dataFile1 has positions [0,4)
+    DeleteFile committedDVForDataFile1 =
+        Iterables.getOnlyElement(
+            committedDVs.stream()
+                .filter(dv -> Objects.equals(dv.referencedDataFile(), dataFile1.location()))
+                .collect(Collectors.toList()));
+    assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0, 4).boxed()::iterator);
+
+    // Verify deleteFile2 state
+    DeleteFile committedDVForDataFile2 =
+        Iterables.getOnlyElement(
+            committedDVs.stream()
+                .filter(dv -> Objects.equals(dv.referencedDataFile(), dataFile2.location()))
+                .collect(Collectors.toList()));
+    assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0, 3).boxed()::iterator);
+  }
+
   @TestTemplate
   public void testDuplicateDVsAreMergedAndEqDelete() throws IOException {
     assumeThat(formatVersion).isGreaterThanOrEqualTo(3);

From 6bccc52ec130702e10518fbc3ed311d4b6f92ffd Mon Sep 17 00:00:00 2001
From: Amogh Jahagirdar
Date: Sun, 25 Jan 2026 17:18:37 -0700
Subject: [PATCH 22/25] Dedupe pos/equality deletes to preserve previous
 behavior, prevent any duplicates in metadata

---
 .../java/org/apache/iceberg/MergingSnapshotProducer.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 8b59a1ce7f29..2e43ca747df2 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -1081,9 +1081,11 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
                   tableName,
                   ops().current().specsById(),
                   ThreadPools.getDeleteWorkerPool());
-
+      // Prevent committing duplicate V2 deletes by deduping them
       Map<Integer, List<DeleteFile>> newDeleteFilesBySpec =
-          Streams.stream(Iterables.concat(mergedDVs, validDVs, positionAndEqualityDeletes))
+          Streams.stream(
+                  Iterables.concat(
+                      mergedDVs, validDVs, DeleteFileSet.of(positionAndEqualityDeletes)))
               .map(file -> Delegates.pendingDeleteFile(file, file.dataSequenceNumber()))
               .collect(Collectors.groupingBy(ContentFile::specId));
       newDeleteFilesBySpec.forEach(

From 9bb9c56c8cefdbcb89d518071ea0422486dcbc10 Mon Sep 17 00:00:00 2001
From: Amogh Jahagirdar
Date: Mon, 26 Jan 2026 19:04:01 -0700
Subject: [PATCH 23/25] Make DV tracking a linked hashmap to preserve ordering
 of entries

---
 .../main/java/org/apache/iceberg/MergingSnapshotProducer.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 2e43ca747df2..6c9c13f17d2e 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -90,7 +90,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   private final Map<Integer, DataFileSet> newDataFilesBySpec = Maps.newHashMap();
   private Long newDataFilesDataSequenceNumber;
   private final List<DeleteFile> positionAndEqualityDeletes = Lists.newArrayList();
-  private final Map<String, List<DeleteFile>> dvsByReferencedFile = Maps.newHashMap();
+  private final Map<String, List<DeleteFile>> dvsByReferencedFile = Maps.newLinkedHashMap();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
   private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();

From 85801f1f741597470b29c2a8d4bd599d66f6c7fb Mon Sep 17 00:00:00 2001
From: Amogh Jahagirdar
Date: Mon, 26 Jan 2026 19:40:11 -0700
Subject: [PATCH 24/25] Remove passing in specs to util

---
 core/src/main/java/org/apache/iceberg/DVUtil.java         | 4 ++--
 .../java/org/apache/iceberg/MergingSnapshotProducer.java  | 6 +-----
 2 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/DVUtil.java b/core/src/main/java/org/apache/iceberg/DVUtil.java
index 2f43a9374fab..1bc8cbec10db 100644
--- a/core/src/main/java/org/apache/iceberg/DVUtil.java
+++ b/core/src/main/java/org/apache/iceberg/DVUtil.java
@@ -52,7 +52,6 @@ static List<DeleteFile> mergeDVsAndWrite(
       TableOperations ops,
       Map<String, List<DeleteFile>> duplicateDVsByReferencedFile,
       String tableName,
-      Map<Integer, PartitionSpec> specs,
       ExecutorService threadpool) {
     Map<String, PositionDeleteIndex> mergedIndices =
         duplicateDVsByReferencedFile.entrySet().stream()
@@ -60,7 +59,8 @@ Collectors.toMap(
                     Map.Entry::getKey, entry -> mergePositions(ops, entry.getValue(), threadpool)));

-    return writeMergedDVs(mergedIndices, duplicateDVsByReferencedFile, ops, tableName, specs);
+    return writeMergedDVs(
+        mergedIndices, duplicateDVsByReferencedFile, ops, tableName, ops.current().specsById());
   }

   // Merges the position indices for the duplicate DVs for a given referenced file
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 6c9c13f17d2e..9e2e549f3378 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -1076,11 +1076,7 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
           duplicateDVs.isEmpty()
              ? ImmutableList.of()
              : DVUtil.mergeDVsAndWrite(
-                  ops(),
-                  duplicateDVs,
-                  tableName,
-                  ops().current().specsById(),
-                  ThreadPools.getDeleteWorkerPool());
+                  ops(), duplicateDVs, tableName, ThreadPools.getDeleteWorkerPool());
       // Prevent committing duplicate V2 deletes by deduping them
       Map<Integer, List<DeleteFile>> newDeleteFilesBySpec =

From 669f1258068a64c6c52978a117cb8d41c7317788 Mon Sep 17 00:00:00 2001
From: Amogh Jahagirdar
Date: Tue, 27 Jan 2026 09:36:02 -0700
Subject: [PATCH 25/25] more cleanup

---
 .../main/java/org/apache/iceberg/DVUtil.java  | 38 ++++++++++++-------
 1 file changed, 24 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/DVUtil.java b/core/src/main/java/org/apache/iceberg/DVUtil.java
index 1bc8cbec10db..15a7b9271b0e 100644
--- a/core/src/main/java/org/apache/iceberg/DVUtil.java
+++ b/core/src/main/java/org/apache/iceberg/DVUtil.java
@@ -57,28 +57,22 @@ static List<DeleteFile> mergeDVsAndWrite(
         duplicateDVsByReferencedFile.entrySet().stream()
             .collect(
                 Collectors.toMap(
-                    Map.Entry::getKey, entry -> mergePositions(ops, entry.getValue(), threadpool)));
+                    Map.Entry::getKey,
+                    entry -> readDVsAndMerge(ops, entry.getValue(), threadpool)));

     return writeMergedDVs(
         mergedIndices, duplicateDVsByReferencedFile, ops, tableName, ops.current().specsById());
   }

   // Merges the position indices for the duplicate DVs for a given referenced file
-  private static PositionDeleteIndex mergePositions(
+  private static PositionDeleteIndex readDVsAndMerge(
       TableOperations ops, List<DeleteFile> dvsForFile, ExecutorService pool) {
     Preconditions.checkArgument(dvsForFile.size() > 1, "Expected more than 1 DV");
-    PositionDeleteIndex[] duplicateDVIndices = new PositionDeleteIndex[dvsForFile.size()];
-    Tasks.range(dvsForFile.size())
-        .executeWith(pool)
-        .stopOnFailure()
-        .throwFailureWhenFinished()
-        .run(
-            i -> {
-              duplicateDVIndices[i] = Deletes.readDV(dvsForFile.get(i), ops.io(), ops.encryption());
-            });
-    PositionDeleteIndex mergedPositions = duplicateDVIndices[0];
+    PositionDeleteIndex[] dvIndices = readDVs(dvsForFile, pool, ops);
+    PositionDeleteIndex mergedPositions = dvIndices[0];
     DeleteFile firstDV = dvsForFile.get(0);
-    for (int i = 1; i < duplicateDVIndices.length; i++) {
+
+    for (int i = 1; i < dvIndices.length; i++) {
       DeleteFile dv = dvsForFile.get(i);
       Preconditions.checkArgument(
           Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()),
@@ -97,12 +91,28 @@ private static PositionDeleteIndex mergePositions(
       Preconditions.checkArgument(
           Objects.equals(dv.partition(), firstDV.partition()),
           "Cannot merge duplicate added DVs when partition tuples are different");
-      mergedPositions.merge(duplicateDVIndices[i]);
+
+      mergedPositions.merge(dvIndices[i]);
     }

     return mergedPositions;
   }

+  private static PositionDeleteIndex[] readDVs(
+      List<DeleteFile> dvs, ExecutorService pool, TableOperations ops) {
+    PositionDeleteIndex[] dvIndices = new PositionDeleteIndex[dvs.size()];
+    Tasks.range(dvIndices.length)
+        .executeWith(pool)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .run(
+            i -> {
+              dvIndices[i] = Deletes.readDV(dvs.get(i), ops.io(), ops.encryption());
+            });
+
+    return dvIndices;
+  }
+
   // Produces a Puffin per partition spec containing the merged DVs for that spec
   private static List<DeleteFile> writeMergedDVs(
       Map<String, PositionDeleteIndex> mergedIndices,