diff --git a/core/src/main/java/org/apache/iceberg/DVUtil.java b/core/src/main/java/org/apache/iceberg/DVUtil.java new file mode 100644 index 000000000000..15a7b9271b0e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/DVUtil.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; +import org.apache.iceberg.deletes.Deletes; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.io.DeleteWriteResult; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class DVUtil { + private static final Logger LOG = LoggerFactory.getLogger(DVUtil.class); + + private DVUtil() {} + + /** + * Merges duplicate DVs for the same data file and writes the merged DV Puffin files. 
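+ * The position indices of the duplicate DVs for each referenced data file are read in parallel,
+ * merged, and rewritten so that a single DV remains per referenced data file.
+ *
+ * <p>A rough usage sketch (variable names here are illustrative; the arguments mirror how
+ * {@code MergingSnapshotProducer} invokes this utility):
+ *
+ * <pre>{@code
+ * // referenced data file location -> its duplicate DVs (each list has more than one entry)
+ * Map<String, List<DeleteFile>> duplicates = ...;
+ * List<DeleteFile> merged =
+ *     DVUtil.mergeDVsAndWrite(ops, duplicates, tableName, ThreadPools.getDeleteWorkerPool());
+ * }</pre>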
+ * + * @param duplicateDVsByReferencedFile map of data file location to duplicate DVs (all entries + * must have size > 1) + * @return newly merged DVs + */ + static List mergeDVsAndWrite( + TableOperations ops, + Map> duplicateDVsByReferencedFile, + String tableName, + ExecutorService threadpool) { + Map mergedIndices = + duplicateDVsByReferencedFile.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> readDVsAndMerge(ops, entry.getValue(), threadpool))); + + return writeMergedDVs( + mergedIndices, duplicateDVsByReferencedFile, ops, tableName, ops.current().specsById()); + } + + // Merges the position indices for the duplicate DVs for a given referenced file + private static PositionDeleteIndex readDVsAndMerge( + TableOperations ops, List dvsForFile, ExecutorService pool) { + Preconditions.checkArgument(dvsForFile.size() > 1, "Expected more than 1 DV"); + PositionDeleteIndex[] dvIndices = readDVs(dvsForFile, pool, ops); + PositionDeleteIndex mergedPositions = dvIndices[0]; + DeleteFile firstDV = dvsForFile.get(0); + + for (int i = 1; i < dvIndices.length; i++) { + DeleteFile dv = dvsForFile.get(i); + Preconditions.checkArgument( + Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()), + "Cannot merge duplicate added DVs when data sequence numbers are different, " + + "expected all to be added with sequence %s, but got %s", + firstDV.dataSequenceNumber(), + dv.dataSequenceNumber()); + + Preconditions.checkArgument( + dv.specId() == firstDV.specId(), + "Cannot merge duplicate added DVs when partition specs are different, " + + "expected all to be added with spec %s, but got %s", + firstDV.specId(), + dv.specId()); + + Preconditions.checkArgument( + Objects.equals(dv.partition(), firstDV.partition()), + "Cannot merge duplicate added DVs when partition tuples are different"); + + mergedPositions.merge(dvIndices[i]); + } + + return mergedPositions; + } + + private static PositionDeleteIndex[] readDVs( + List dvs, ExecutorService pool, TableOperations ops) { + PositionDeleteIndex[] dvIndices = new PositionDeleteIndex[dvs.size()]; + Tasks.range(dvIndices.length) + .executeWith(pool) + .stopOnFailure() + .throwFailureWhenFinished() + .run( + i -> { + dvIndices[i] = Deletes.readDV(dvs.get(i), ops.io(), ops.encryption()); + }); + + return dvIndices; + } + + // Produces a Puffin per partition spec containing the merged DVs for that spec + private static List writeMergedDVs( + Map mergedIndices, + Map> dataFilesWithDuplicateDVs, + TableOperations ops, + String tableName, + Map specsById) { + try (DVFileWriter dvFileWriter = + new BaseDVFileWriter( + // Use an unpartitioned spec for the location provider for the puffin containing + // all the merged DVs + OutputFileFactory.builderFor( + ops, PartitionSpec.unpartitioned(), FileFormat.PUFFIN, 1, 1) + .build(), + path -> null)) { + + for (Map.Entry entry : mergedIndices.entrySet()) { + String referencedLocation = entry.getKey(); + PositionDeleteIndex mergedPositions = entry.getValue(); + List duplicateDVs = dataFilesWithDuplicateDVs.get(referencedLocation); + DeleteFile firstDV = duplicateDVs.get(0); + LOG.warn( + "Merged {} DVs for data file {}. 
These will be orphaned DVs in table {}", + duplicateDVs.size(), + referencedLocation, + tableName); + dvFileWriter.delete( + referencedLocation, + mergedPositions, + specsById.get(firstDV.specId()), + firstDV.partition()); + } + + dvFileWriter.close(); + DeleteWriteResult writeResult = dvFileWriter.result(); + return writeResult.deleteFiles(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 51d17fbdd0f2..9e2e549f3378 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.events.CreateSnapshotEvent; import org.apache.iceberg.exceptions.ValidationException; @@ -47,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.ContentFileUtil; import org.apache.iceberg.util.DataFileSet; @@ -55,6 +57,7 @@ import org.apache.iceberg.util.PartitionSet; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,8 +89,8 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { // update data private final Map newDataFilesBySpec = Maps.newHashMap(); private Long newDataFilesDataSequenceNumber; - private final Map newDeleteFilesBySpec = Maps.newHashMap(); - private final Set newDVRefs = Sets.newHashSet(); + private final List positionAndEqualityDeletes = Lists.newArrayList(); + private final Map> dvsByReferencedFile = Maps.newLinkedHashMap(); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder(); @@ -222,7 +225,7 @@ protected boolean addsDataFiles() { } protected boolean addsDeleteFiles() { - return !newDeleteFilesBySpec.isEmpty(); + return !positionAndEqualityDeletes.isEmpty() || !dvsByReferencedFile.isEmpty(); } /** Add a data file to the new snapshot. 
*/ @@ -265,15 +268,14 @@ private void addInternal(DeleteFile file) { "Cannot find partition spec %s for delete file: %s", file.specId(), file.location()); - - DeleteFileSet deleteFiles = - newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DeleteFileSet.create()); - if (deleteFiles.add(file)) { - addedFilesSummary.addedFile(spec, file); - hasNewDeleteFiles = true; - if (ContentFileUtil.isDV(file)) { - newDVRefs.add(file.referencedDataFile()); - } + hasNewDeleteFiles = true; + if (ContentFileUtil.isDV(file)) { + List dvsForReferencedFile = + dvsByReferencedFile.computeIfAbsent( + file.referencedDataFile(), newFile -> Lists.newArrayList()); + dvsForReferencedFile.add(file); + } else { + positionAndEqualityDeletes.add(file); } } @@ -814,7 +816,7 @@ protected void validateAddedDVs( Expression conflictDetectionFilter, Snapshot parent) { // skip if there is no current table state or this operation doesn't add new DVs - if (parent == null || newDVRefs.isEmpty()) { + if (parent == null || dvsByReferencedFile.isEmpty()) { return; } @@ -847,7 +849,7 @@ private void validateAddedDVs( DeleteFile file = entry.file(); if (newSnapshotIds.contains(entry.snapshotId()) && ContentFileUtil.isDV(file)) { ValidationException.check( - !newDVRefs.contains(file.referencedDataFile()), + !dvsByReferencedFile.containsKey(file.referencedDataFile()), "Found concurrently added DV for %s: %s", file.referencedDataFile(), ContentFileUtil.dvDesc(file)); @@ -1042,7 +1044,7 @@ private List newDataFilesAsManifests() { } private Iterable prepareDeleteManifests() { - if (newDeleteFilesBySpec.isEmpty()) { + if (!addsDeleteFiles()) { return ImmutableList.of(); } @@ -1060,9 +1062,32 @@ private List newDeleteFilesAsManifests() { } if (cachedNewDeleteManifests.isEmpty()) { + Map> duplicateDVs = Maps.newHashMap(); + List validDVs = Lists.newArrayList(); + for (Map.Entry> entry : dvsByReferencedFile.entrySet()) { + if (entry.getValue().size() > 1) { + duplicateDVs.put(entry.getKey(), entry.getValue()); + } else { + validDVs.addAll(entry.getValue()); + } + } + + List mergedDVs = + duplicateDVs.isEmpty() + ? 
ImmutableList.of() + : DVUtil.mergeDVsAndWrite( + ops(), duplicateDVs, tableName, ThreadPools.getDeleteWorkerPool()); + // Prevent committing duplicate V2 deletes by deduping them + Map<Integer, List<DeleteFile>> newDeleteFilesBySpec = + Streams.stream( + Iterables.concat( + mergedDVs, validDVs, DeleteFileSet.of(positionAndEqualityDeletes))) + .map(file -> Delegates.pendingDeleteFile(file, file.dataSequenceNumber())) + .collect(Collectors.groupingBy(ContentFile::specId)); newDeleteFilesBySpec.forEach( (specId, deleteFiles) -> { PartitionSpec spec = ops().current().spec(specId); + deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file)); List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec); cachedNewDeleteManifests.addAll(newDeleteManifests); }); diff --git a/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java b/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java index 6eabd64514df..5bd4c84fe04c 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/BaseDVFileWriter.java @@ -71,6 +71,17 @@ public void delete(String path, long pos, PartitionSpec spec, StructLike partiti positions.delete(pos); } + @Override + public void delete( + String path, + PositionDeleteIndex positionDeleteIndex, + PartitionSpec spec, + StructLike partition) { + Deletes deletes = + deletesByPath.computeIfAbsent(path, key -> new Deletes(path, spec, partition)); + deletes.positions().merge(positionDeleteIndex); + } + @Override public DeleteWriteResult result() { Preconditions.checkState(result != null, "Cannot get result from unclosed writer"); diff --git a/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java b/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java index 2561f7be3d34..208b174a3942 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java @@ -43,4 +43,21 @@ public interface DVFileWriter extends Closeable { * @return the writer result */ DeleteWriteResult result(); + + /** + * Marks every position that is deleted in positionDeleteIndex as deleted in the given data file. 
+ * Implementations should merge with existing position indices for the provided path + * + * @param path the data file path + * @param positionDeleteIndex the position delete index containing all the positions to delete + * @param spec the data file partition spec + * @param partition the data file partition + */ + default void delete( + String path, + PositionDeleteIndex positionDeleteIndex, + PartitionSpec spec, + StructLike partition) { + throw new UnsupportedOperationException("Delete with positionDeleteIndex is not supported"); + } } diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 46df91982ab7..81c4b0c62271 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -29,13 +29,20 @@ import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptingFileIO; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.IOUtil; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.CharSequenceMap; +import org.apache.iceberg.util.ContentFileUtil; import org.apache.iceberg.util.Filter; import org.apache.iceberg.util.SortedMerge; import org.apache.iceberg.util.StructLikeSet; @@ -126,6 +133,18 @@ public static CharSequenceMap toPosi return toPositionIndexes(posDeletes, null /* unknown delete file */); } + public static PositionDeleteIndex readDV( + DeleteFile deleteFile, FileIO fileIO, EncryptionManager encryptionManager) { + Preconditions.checkArgument( + ContentFileUtil.isDV(deleteFile), "Delete file must be a deletion vector"); + InputFile inputFile = + EncryptingFileIO.combine(fileIO, encryptionManager).newInputFile(deleteFile); + long offset = deleteFile.contentOffset(); + int length = deleteFile.contentSizeInBytes().intValue(); + byte[] bytes = IOUtil.readBytes(inputFile, offset, length); + return PositionDeleteIndex.deserialize(bytes, deleteFile); + } + /** * Builds a map of position delete indexes by path. 
* diff --git a/core/src/main/java/org/apache/iceberg/io/IOUtil.java b/core/src/main/java/org/apache/iceberg/io/IOUtil.java index 37962d322d87..979234997226 100644 --- a/core/src/main/java/org/apache/iceberg/io/IOUtil.java +++ b/core/src/main/java/org/apache/iceberg/io/IOUtil.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; +import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; public class IOUtil { // not meant to be instantiated @@ -49,6 +51,24 @@ public static void readFully(InputStream stream, byte[] bytes, int offset, int l } } + public static byte[] readBytes(InputFile inputFile, long offset, int length) { + try (SeekableInputStream stream = inputFile.newStream()) { + byte[] bytes = new byte[length]; + + if (stream instanceof RangeReadable) { + RangeReadable rangeReadable = (RangeReadable) stream; + rangeReadable.readFully(offset, bytes); + } else { + stream.seek(offset); + ByteStreams.readFully(stream, bytes); + } + + return bytes; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + /** Writes a buffer into a stream, making multiple write calls if necessary. */ public static void writeFully(OutputStream outputStream, ByteBuffer buffer) throws IOException { if (!buffer.hasRemaining()) { diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index 50e84143ffb7..2f5263db2330 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -29,6 +29,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionManager; @@ -87,7 +88,21 @@ private OutputFileFactory( } public static Builder builderFor(Table table, int partitionId, long taskId) { - return new Builder(table, partitionId, taskId); + return new Builder( + table.locationProvider(), + table.encryption(), + table::io, + table.spec(), + FileFormat.fromString( + table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)), + partitionId, + taskId); + } + + public static Builder builderFor( + TableOperations ops, PartitionSpec spec, FileFormat format, int partitionId, long taskId) { + return new Builder( + ops.locationProvider(), ops.encryption(), ops::io, spec, format, partitionId, taskId); } private String generateFilename() { @@ -121,26 +136,32 @@ public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partitio } public static class Builder { - private final Table table; private final int partitionId; private final long taskId; + private final LocationProvider locations; + private final EncryptionManager encryption; private PartitionSpec defaultSpec; private String operationId; private FileFormat format; private String suffix; private Supplier ioSupplier; - private Builder(Table table, int partitionId, long taskId) { - this.table = table; + private Builder( + LocationProvider locationProvider, + EncryptionManager encryptionManager, + Supplier ioSupplier, + PartitionSpec spec, + FileFormat format, + int partitionId, + long taskId) { + this.locations = locationProvider; + this.encryption = encryptionManager; + this.ioSupplier = ioSupplier; this.partitionId = 
partitionId; this.taskId = taskId; - this.defaultSpec = table.spec(); + this.defaultSpec = spec; this.operationId = UUID.randomUUID().toString(); - - String formatAsString = - table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); - this.format = FileFormat.fromString(formatAsString); - this.ioSupplier = table::io; + this.format = format; } public Builder defaultSpec(PartitionSpec newDefaultSpec) { @@ -176,8 +197,6 @@ public Builder ioSupplier(Supplier newIoSupplier) { } public OutputFileFactory build() { - LocationProvider locations = table.locationProvider(); - EncryptionManager encryption = table.encryption(); return new OutputFileFactory( defaultSpec, format, diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 4eff3a400f72..77737b6f1fcf 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -35,19 +35,28 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.LongStream; import java.util.stream.Stream; import org.apache.iceberg.ManifestEntry.Status; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; +import org.apache.iceberg.deletes.Deletes; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.ContentFileUtil; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -1882,6 +1891,266 @@ public void testConcurrentDVsForSameDataFile() { .hasMessageContaining("Found concurrently added DV for %s", dataFile.location()); } + @TestTemplate + public void testDuplicateDVsAreMerged() throws IOException { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + DataFile dataFile = newDataFile("data_bucket=0"); + commit(table, table.newRowDelta().addRows(dataFile), branch); + + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + + DeleteFile deleteFile1 = dvWithPositions(dataFile, fileFactory, 0, 2); + DeleteFile deleteFile2 = dvWithPositions(dataFile, fileFactory, 2, 4); + DeleteFile deleteFile3 = dvWithPositions(dataFile, fileFactory, 4, 8); + RowDelta rowDelta1 = + table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).addDeletes(deleteFile3); + + commit(table, rowDelta1, branch); + + Iterable addedDeleteFiles = + latestSnapshot(table, branch).addedDeleteFiles(table.io()); + assertThat(Iterables.size(addedDeleteFiles)).isEqualTo(1); + DeleteFile mergedDV = Iterables.getOnlyElement(addedDeleteFiles); + + assertDVHasDeletedPositions(mergedDV, LongStream.range(0, 8).boxed()::iterator); + } + + @TestTemplate + public void 
testDuplicateDVsMergedMultipleSpecs() throws IOException { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + // append a partitioned data file + DataFile firstSnapshotDataFile = newDataFile("data_bucket=0"); + commit(table, table.newAppend().appendFile(firstSnapshotDataFile), branch); + + // remove the only partition field to make the spec unpartitioned + table.updateSpec().removeField(Expressions.bucket("data", 16)).commit(); + + // append an unpartitioned data file + DataFile secondSnapshotDataFile = newDataFile(""); + commit(table, table.newAppend().appendFile(secondSnapshotDataFile), branch); + + // evolve the spec and add a new partition field + table.updateSpec().addField("data").commit(); + + // append a data file with the new spec + DataFile thirdSnapshotDataFile = newDataFile("data=abc"); + commit(table, table.newAppend().appendFile(thirdSnapshotDataFile), branch); + + assertThat(table.specs()).hasSize(3); + + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + + DataFile dataFile = newDataFile("data=xyz"); + // For each data file, create two DVs covering positions [0,2) and [2,4) + DeleteFile deleteFile1a = dvWithPositions(firstSnapshotDataFile, fileFactory, 0, 2); + DeleteFile deleteFile1b = dvWithPositions(firstSnapshotDataFile, fileFactory, 2, 4); + DeleteFile deleteFile2a = dvWithPositions(secondSnapshotDataFile, fileFactory, 0, 2); + DeleteFile deleteFile2b = dvWithPositions(secondSnapshotDataFile, fileFactory, 2, 4); + DeleteFile deleteFile3a = dvWithPositions(thirdSnapshotDataFile, fileFactory, 0, 2); + DeleteFile deleteFile3b = dvWithPositions(thirdSnapshotDataFile, fileFactory, 2, 4); + + commit( + table, + table + .newRowDelta() + .addRows(dataFile) + .addDeletes(deleteFile1a) + .addDeletes(deleteFile1b) + .addDeletes(deleteFile2a) + .addDeletes(deleteFile2b) + .addDeletes(deleteFile3a) + .addDeletes(deleteFile3b), + branch); + + Snapshot snapshot = latestSnapshot(table, branch); + // Expect 3 merged DVs, one per data file + Iterable addedDeleteFiles = snapshot.addedDeleteFiles(table.io()); + List mergedDVs = Lists.newArrayList(addedDeleteFiles); + assertThat(mergedDVs).hasSize(3); + // Should be a Puffin produced per merged DV spec + assertThat(mergedDVs.stream().map(ContentFile::location).collect(Collectors.toSet())) + .hasSize(1); + + DeleteFile committedDVForDataFile1 = + Iterables.getOnlyElement( + mergedDVs.stream() + .filter( + dv -> Objects.equals(dv.referencedDataFile(), firstSnapshotDataFile.location())) + .collect(Collectors.toList())); + assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0, 4).boxed()::iterator); + + DeleteFile committedDVForDataFile2 = + Iterables.getOnlyElement( + mergedDVs.stream() + .filter( + dv -> + Objects.equals(dv.referencedDataFile(), secondSnapshotDataFile.location())) + .collect(Collectors.toList())); + assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0, 4).boxed()::iterator); + + DeleteFile committedDVForDataFile3 = + Iterables.getOnlyElement( + mergedDVs.stream() + .filter( + dv -> Objects.equals(dv.referencedDataFile(), thirdSnapshotDataFile.location())) + .collect(Collectors.toList())); + assertDVHasDeletedPositions(committedDVForDataFile3, LongStream.range(0, 4).boxed()::iterator); + } + + @TestTemplate + public void testDuplicateDVsAreMergedForMultipleReferenceFiles() throws IOException { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + DataFile dataFile1 = newDataFile("data_bucket=0"); + DataFile 
dataFile2 = newDataFile("data_bucket=0"); + commit(table, table.newRowDelta().addRows(dataFile1).addRows(dataFile2), branch); + + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + + // For each data file, create two DVs covering positions [0,2) and [2,4) + DeleteFile deleteFile1a = dvWithPositions(dataFile1, fileFactory, 0, 2); + DeleteFile deleteFile1b = dvWithPositions(dataFile1, fileFactory, 2, 4); + DeleteFile deleteFile2a = dvWithPositions(dataFile2, fileFactory, 0, 2); + DeleteFile deleteFile2b = dvWithPositions(dataFile2, fileFactory, 2, 4); + + // Commit all four duplicate DVs + RowDelta rowDelta = + table + .newRowDelta() + .addDeletes(deleteFile1a) + .addDeletes(deleteFile1b) + .addDeletes(deleteFile2a) + .addDeletes(deleteFile2b); + + commit(table, rowDelta, branch); + + // Expect two merged DVs, one per data file + Iterable addedDeleteFiles = + latestSnapshot(table, branch).addedDeleteFiles(table.io()); + List mergedDVs = Lists.newArrayList(addedDeleteFiles); + + assertThat(mergedDVs).hasSize(2); + // Should be a single Puffin produced + assertThat(mergedDVs.stream().map(ContentFile::location).collect(Collectors.toSet())) + .hasSize(1); + + DeleteFile committedDVForDataFile1 = + Iterables.getOnlyElement( + mergedDVs.stream() + .filter(dv -> Objects.equals(dv.referencedDataFile(), dataFile1.location())) + .collect(Collectors.toList())); + assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0, 4).boxed()::iterator); + + DeleteFile committedDVForDataFile2 = + Iterables.getOnlyElement( + mergedDVs.stream() + .filter(dv -> Objects.equals(dv.referencedDataFile(), dataFile2.location())) + .collect(Collectors.toList())); + assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0, 4).boxed()::iterator); + } + + @TestTemplate + public void testDuplicateDVsAndValidDV() throws IOException { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + DataFile dataFile1 = newDataFile("data_bucket=0"); + DataFile dataFile2 = newDataFile("data_bucket=0"); + commit(table, table.newRowDelta().addRows(dataFile1).addRows(dataFile2), branch); + + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + + // dataFile1 has duplicate DVs that need merging + DeleteFile deleteFile1a = dvWithPositions(dataFile1, fileFactory, 0, 2); + DeleteFile deleteFile1b = dvWithPositions(dataFile1, fileFactory, 2, 4); + + // dataFile2 has a valid DV + DeleteFile deleteFile2 = dvWithPositions(dataFile2, fileFactory, 0, 3); + + RowDelta rowDelta = + table + .newRowDelta() + .addDeletes(deleteFile1a) + .addDeletes(deleteFile1b) + .addDeletes(deleteFile2); + + commit(table, rowDelta, branch); + + // Expect two DVs: one merged for dataFile1 and deleteFile2 + Iterable addedDeleteFiles = + latestSnapshot(table, branch).addedDeleteFiles(table.io()); + List committedDVs = Lists.newArrayList(addedDeleteFiles); + + assertThat(committedDVs).hasSize(2); + + // Verify merged DV for dataFile1 has positions [0,4) + DeleteFile committedDVForDataFile1 = + Iterables.getOnlyElement( + committedDVs.stream() + .filter(dv -> Objects.equals(dv.referencedDataFile(), dataFile1.location())) + .collect(Collectors.toList())); + assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0, 4).boxed()::iterator); + + // Verify deleteFile2 state + DeleteFile committedDVForDataFile2 = + Iterables.getOnlyElement( + committedDVs.stream() + .filter(dv -> Objects.equals(dv.referencedDataFile(), 
dataFile2.location())) + .collect(Collectors.toList())); + assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0, 3).boxed()::iterator); + } + + @TestTemplate + public void testDuplicateDVsAreMergedAndEqDelete() throws IOException { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + DataFile dataFile = newDataFile("data_bucket=0"); + commit(table, table.newRowDelta().addRows(dataFile), branch); + + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + + // Two DVs for the same data file: [0,2) and [2,4) => 4 deleted positions total + DeleteFile dv1 = dvWithPositions(dataFile, fileFactory, 0, 2); + DeleteFile dv2 = dvWithPositions(dataFile, fileFactory, 2, 4); + + // One equality delete file for the same partition + DeleteFile eqDelete = + newEqualityDeleteFile( + table.spec().specId(), + "data_bucket=0", + table.schema().asStruct().fields().get(0).fieldId()); + + RowDelta rowDelta = table.newRowDelta().addDeletes(eqDelete).addDeletes(dv1).addDeletes(dv2); + + commit(table, rowDelta, branch); + + Iterable addedDeleteFiles = + latestSnapshot(table, branch).addedDeleteFiles(table.io()); + List committedDeletes = Lists.newArrayList(addedDeleteFiles); + + // 1 DV + 1 equality delete + assertThat(committedDeletes).hasSize(2); + + DeleteFile committedDV = + Iterables.getOnlyElement( + committedDeletes.stream().filter(ContentFileUtil::isDV).collect(Collectors.toList())); + assertDVHasDeletedPositions(committedDV, LongStream.range(0, 4).boxed()::iterator); + + DeleteFile committedEqDelete = + Iterables.getOnlyElement( + committedDeletes.stream() + .filter(df -> df.content() == FileContent.EQUALITY_DELETES) + .collect(Collectors.toList())); + assertThat(committedEqDelete).isNotNull(); + assertThat(committedEqDelete.content()).isEqualTo(FileContent.EQUALITY_DELETES); + } + @TestTemplate public void testManifestMergingAfterUpgradeToV3() { assumeThat(formatVersion).isEqualTo(2); @@ -1959,4 +2228,45 @@ private List planFiles() { throw new RuntimeException(e); } } + + private DeleteFile dvWithPositions( + DataFile dataFile, OutputFileFactory fileFactory, int fromInclusive, int toExclusive) + throws IOException { + + List> deletes = Lists.newArrayList(); + for (int i = fromInclusive; i < toExclusive; i++) { + deletes.add(PositionDelete.create().set(dataFile.location(), i)); + } + + return writeDV(deletes, dataFile.specId(), dataFile.partition(), fileFactory); + } + + private void assertDVHasDeletedPositions(DeleteFile dv, Iterable positions) { + assertThat(dv).isNotNull(); + PositionDeleteIndex index = Deletes.readDV(dv, table.io(), table.encryption()); + assertThat(positions) + .allSatisfy( + pos -> + assertThat(index.isDeleted(pos)) + .as("Expected position %s to be deleted", pos) + .isTrue()); + } + + private DeleteFile writeDV( + List> deletes, + int specId, + StructLike partition, + OutputFileFactory fileFactory) + throws IOException { + + DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null); + try (DVFileWriter closeableWriter = writer) { + for (PositionDelete delete : deletes) { + closeableWriter.delete( + delete.path().toString(), delete.pos(), table.specs().get(specId), partition); + } + } + + return Iterables.getOnlyElement(writer.result().deleteFiles()); + } } diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java index 99f5c742d37c..ac8775eacc17 100644 --- 
a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java +++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java @@ -41,9 +41,8 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.IOUtil; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.io.RangeReadable; -import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.orc.OrcRowReader; import org.apache.iceberg.parquet.Parquet; @@ -51,7 +50,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.CharSequenceMap; import org.apache.iceberg.util.ContentFileUtil; @@ -183,7 +181,7 @@ private PositionDeleteIndex readDV(DeleteFile dv) { InputFile inputFile = loadInputFile.apply(dv); long offset = dv.contentOffset(); int length = dv.contentSizeInBytes().intValue(); - byte[] bytes = readBytes(inputFile, offset, length); + byte[] bytes = IOUtil.readBytes(inputFile, offset, length); return PositionDeleteIndex.deserialize(bytes, dv); } @@ -322,22 +320,4 @@ private void validateDV(DeleteFile dv, CharSequence filePath) { filePath, dv.referencedDataFile()); } - - private static byte[] readBytes(InputFile inputFile, long offset, int length) { - try (SeekableInputStream stream = inputFile.newStream()) { - byte[] bytes = new byte[length]; - - if (stream instanceof RangeReadable) { - RangeReadable rangeReadable = (RangeReadable) stream; - rangeReadable.readFully(offset, bytes); - } else { - stream.seek(offset); - ByteStreams.readFully(stream, bytes); - } - - return bytes; - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java index c30a730917ae..81954c19b504 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java @@ -409,7 +409,7 @@ public void testPartitionFilter() throws IOException { // Add position deletes for both partitions Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); @@ -455,7 +455,7 @@ public void testPartitionTransformFilter() throws IOException { Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"}); Pair>, DeleteFile> deletesB = - deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"}); + deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"}); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Prepare expected values @@ -496,7 +496,7 @@ public void testPartitionEvolutionReplace() throws Exception { DataFile dataFileB = dataFile(tab, "b"); tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit(); Pair>, 
DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Switch partition spec from (data) to (id) @@ -508,7 +508,7 @@ public void testPartitionEvolutionReplace() throws Exception { tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit(); Pair>, DeleteFile> deletes10 = deleteFile(tab, dataFile10, 10); - Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile10, 99); + Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile99, 99); tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit(); // Query partition of old spec diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java index 76084c2b9402..4df99ca1998b 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java @@ -77,7 +77,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_A2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-a.parquet") + .withPath("/path/to/data-a2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=a") // easy way to set partition data for now .withRecordCount(1) @@ -91,7 +91,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_B2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-b.parquet") + .withPath("/path/to/data-b2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=b") // easy way to set partition data for now .withRecordCount(1) @@ -105,7 +105,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_C2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-c.parquet") + .withPath("/path/to/data-c2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=c") // easy way to set partition data for now .withRecordCount(1) @@ -119,7 +119,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_D2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-d.parquet") + .withPath("/path/to/data-d2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=d") // easy way to set partition data for now .withRecordCount(1) @@ -370,7 +370,6 @@ public void testPartitionedDeletesWithEqSeqNo() { // Add Data Files with EQ and POS deletes DeleteFile fileADeletes = fileADeletes(); DeleteFile fileA2Deletes = fileA2Deletes(); - DeleteFile fileBDeletes = fileBDeletes(); DeleteFile fileB2Deletes = fileB2Deletes(); table .newRowDelta() @@ -382,7 +381,6 @@ public void testPartitionedDeletesWithEqSeqNo() { .addDeletes(fileA2Deletes) .addDeletes(FILE_A_EQ_DELETES) .addDeletes(FILE_A2_EQ_DELETES) - .addDeletes(fileBDeletes) .addDeletes(fileB2Deletes) .addDeletes(FILE_B_EQ_DELETES) .addDeletes(FILE_B2_EQ_DELETES) @@ -400,7 +398,6 @@ public void testPartitionedDeletesWithEqSeqNo() { Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()), Tuple2.apply(2L, fileA2Deletes.location()), Tuple2.apply(2L, FILE_B_EQ_DELETES.location()), - Tuple2.apply(2L, fileBDeletes.location()), Tuple2.apply(2L, FILE_B2.location()), Tuple2.apply(2L, FILE_B2_EQ_DELETES.location()), 
Tuple2.apply(2L, fileB2Deletes.location()), @@ -433,7 +430,6 @@ public void testPartitionedDeletesWithEqSeqNo() { Tuple2.apply(2L, FILE_A2.location()), Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()), Tuple2.apply(2L, fileA2Deletes.location()), - Tuple2.apply(2L, fileBDeletes.location()), Tuple2.apply(2L, FILE_B2.location()), Tuple2.apply(2L, fileB2Deletes.location()), Tuple2.apply(2L, FILE_C2.location()), diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java index 87cbbe3cea5f..8032b0b782f4 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java @@ -409,7 +409,7 @@ public void testPartitionFilter() throws IOException { // Add position deletes for both partitions Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); @@ -455,7 +455,7 @@ public void testPartitionTransformFilter() throws IOException { Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"}); Pair>, DeleteFile> deletesB = - deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"}); + deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"}); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Prepare expected values @@ -496,7 +496,7 @@ public void testPartitionEvolutionReplace() throws Exception { DataFile dataFileB = dataFile(tab, "b"); tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit(); Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Switch partition spec from (data) to (id) @@ -508,7 +508,7 @@ public void testPartitionEvolutionReplace() throws Exception { tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit(); Pair>, DeleteFile> deletes10 = deleteFile(tab, dataFile10, 10); - Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile10, 99); + Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile99, 99); tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit(); // Query partition of old spec diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java index 76084c2b9402..1e4c21d214a8 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java @@ -77,7 +77,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_A2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-a.parquet") + .withPath("/path/to/data-a2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=a") // easy way to set partition data for now .withRecordCount(1) @@ -91,7 +91,7 @@ public class 
TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_B2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-b.parquet") + .withPath("/path/to/data-b2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=b") // easy way to set partition data for now .withRecordCount(1) @@ -105,7 +105,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_C2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-c.parquet") + .withPath("/path/to/data-c2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=c") // easy way to set partition data for now .withRecordCount(1) @@ -370,7 +370,6 @@ public void testPartitionedDeletesWithEqSeqNo() { // Add Data Files with EQ and POS deletes DeleteFile fileADeletes = fileADeletes(); DeleteFile fileA2Deletes = fileA2Deletes(); - DeleteFile fileBDeletes = fileBDeletes(); DeleteFile fileB2Deletes = fileB2Deletes(); table .newRowDelta() @@ -382,7 +381,6 @@ public void testPartitionedDeletesWithEqSeqNo() { .addDeletes(fileA2Deletes) .addDeletes(FILE_A_EQ_DELETES) .addDeletes(FILE_A2_EQ_DELETES) - .addDeletes(fileBDeletes) .addDeletes(fileB2Deletes) .addDeletes(FILE_B_EQ_DELETES) .addDeletes(FILE_B2_EQ_DELETES) @@ -400,7 +398,6 @@ public void testPartitionedDeletesWithEqSeqNo() { Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()), Tuple2.apply(2L, fileA2Deletes.location()), Tuple2.apply(2L, FILE_B_EQ_DELETES.location()), - Tuple2.apply(2L, fileBDeletes.location()), Tuple2.apply(2L, FILE_B2.location()), Tuple2.apply(2L, FILE_B2_EQ_DELETES.location()), Tuple2.apply(2L, fileB2Deletes.location()), @@ -433,7 +430,6 @@ public void testPartitionedDeletesWithEqSeqNo() { Tuple2.apply(2L, FILE_A2.location()), Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()), Tuple2.apply(2L, fileA2Deletes.location()), - Tuple2.apply(2L, fileBDeletes.location()), Tuple2.apply(2L, FILE_B2.location()), Tuple2.apply(2L, fileB2Deletes.location()), Tuple2.apply(2L, FILE_C2.location()), diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java index 7892fd65b405..f5456db8e4b0 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java @@ -409,7 +409,7 @@ public void testPartitionFilter() throws IOException { // Add position deletes for both partitions Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); @@ -455,7 +455,7 @@ public void testPartitionTransformFilter() throws IOException { Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"}); Pair>, DeleteFile> deletesB = - deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"}); + deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"}); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Prepare expected values @@ -496,7 +496,7 @@ public void testPartitionEvolutionReplace() throws Exception { DataFile dataFileB = dataFile(tab, "b"); tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit(); Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); 
- Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Switch partition spec from (data) to (id) @@ -508,7 +508,7 @@ public void testPartitionEvolutionReplace() throws Exception { tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit(); Pair>, DeleteFile> deletes10 = deleteFile(tab, dataFile10, 10); - Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile10, 99); + Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile99, 99); tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit(); // Query partition of old spec diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java index 76084c2b9402..4df99ca1998b 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java @@ -77,7 +77,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_A2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-a.parquet") + .withPath("/path/to/data-a2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=a") // easy way to set partition data for now .withRecordCount(1) @@ -91,7 +91,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_B2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-b.parquet") + .withPath("/path/to/data-b2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=b") // easy way to set partition data for now .withRecordCount(1) @@ -105,7 +105,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_C2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-c.parquet") + .withPath("/path/to/data-c2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=c") // easy way to set partition data for now .withRecordCount(1) @@ -119,7 +119,7 @@ public class TestRemoveDanglingDeleteAction extends TestBase { .build(); static final DataFile FILE_D2 = DataFiles.builder(SPEC) - .withPath("/path/to/data-d.parquet") + .withPath("/path/to/data-d2.parquet") .withFileSizeInBytes(10) .withPartitionPath("c1=d") // easy way to set partition data for now .withRecordCount(1) @@ -370,7 +370,6 @@ public void testPartitionedDeletesWithEqSeqNo() { // Add Data Files with EQ and POS deletes DeleteFile fileADeletes = fileADeletes(); DeleteFile fileA2Deletes = fileA2Deletes(); - DeleteFile fileBDeletes = fileBDeletes(); DeleteFile fileB2Deletes = fileB2Deletes(); table .newRowDelta() @@ -382,7 +381,6 @@ public void testPartitionedDeletesWithEqSeqNo() { .addDeletes(fileA2Deletes) .addDeletes(FILE_A_EQ_DELETES) .addDeletes(FILE_A2_EQ_DELETES) - .addDeletes(fileBDeletes) .addDeletes(fileB2Deletes) .addDeletes(FILE_B_EQ_DELETES) .addDeletes(FILE_B2_EQ_DELETES) @@ -400,7 +398,6 @@ public void testPartitionedDeletesWithEqSeqNo() { Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()), Tuple2.apply(2L, fileA2Deletes.location()), Tuple2.apply(2L, FILE_B_EQ_DELETES.location()), - Tuple2.apply(2L, fileBDeletes.location()), Tuple2.apply(2L, FILE_B2.location()), Tuple2.apply(2L, FILE_B2_EQ_DELETES.location()), Tuple2.apply(2L, fileB2Deletes.location()), @@ -433,7 +430,6 
@@ public void testPartitionedDeletesWithEqSeqNo() { Tuple2.apply(2L, FILE_A2.location()), Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()), Tuple2.apply(2L, fileA2Deletes.location()), - Tuple2.apply(2L, fileBDeletes.location()), Tuple2.apply(2L, FILE_B2.location()), Tuple2.apply(2L, fileB2Deletes.location()), Tuple2.apply(2L, FILE_C2.location()), diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java index 7892fd65b405..f5456db8e4b0 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java @@ -409,7 +409,7 @@ public void testPartitionFilter() throws IOException { // Add position deletes for both partitions Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); @@ -455,7 +455,7 @@ public void testPartitionTransformFilter() throws IOException { Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"}); Pair>, DeleteFile> deletesB = - deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"}); + deleteFile(tab, dataFileB, new Object[] {"bb"}, new Object[] {"b"}); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Prepare expected values @@ -496,7 +496,7 @@ public void testPartitionEvolutionReplace() throws Exception { DataFile dataFileB = dataFile(tab, "b"); tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit(); Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a"); - Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b"); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b"); tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); // Switch partition spec from (data) to (id) @@ -508,7 +508,7 @@ public void testPartitionEvolutionReplace() throws Exception { tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit(); Pair>, DeleteFile> deletes10 = deleteFile(tab, dataFile10, 10); - Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile10, 99); + Pair>, DeleteFile> deletes99 = deleteFile(tab, dataFile99, 99); tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit(); // Query partition of old spec