Core: Track duplicate DVs for data file and merge them before committing #15006
base: main
Changes from all commits
82cced9
e41943d
76e24e4
a740ff9
11ffc2f
772e3c2
3404a86
c04d0e0
a39b073
a079d22
d7eadb0
0a053a6
6b04dd9
a50fb32
301f0fe
112b086
eecacad
9673f85
4104097
9972ce0
e75bb03
6bccc52
9bb9c56
85801f1
669f125
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,156 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg; | ||
|
|
||
| import java.io.IOException; | ||
| import java.io.UncheckedIOException; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.iceberg.deletes.BaseDVFileWriter; | ||
| import org.apache.iceberg.deletes.DVFileWriter; | ||
| import org.apache.iceberg.deletes.Deletes; | ||
| import org.apache.iceberg.deletes.PositionDeleteIndex; | ||
| import org.apache.iceberg.io.DeleteWriteResult; | ||
| import org.apache.iceberg.io.OutputFileFactory; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.util.Tasks; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| class DVUtil { | ||
| private static final Logger LOG = LoggerFactory.getLogger(DVUtil.class); | ||
|
|
||
| private DVUtil() {} | ||
|
|
||
| /** | ||
| * Merges duplicate DVs for the same data file and writes the merged DV Puffin files. | ||
| * | ||
| * @param duplicateDVsByReferencedFile map of data file location to duplicate DVs (all entries | ||
| * must have size > 1) | ||
| * @return newly merged DVs | ||
| */ | ||
| static List<DeleteFile> mergeDVsAndWrite( | ||
| TableOperations ops, | ||
| Map<String, List<DeleteFile>> duplicateDVsByReferencedFile, | ||
| String tableName, | ||
| ExecutorService threadpool) { | ||
| Map<String, PositionDeleteIndex> mergedIndices = | ||
| duplicateDVsByReferencedFile.entrySet().stream() | ||
| .collect( | ||
| Collectors.toMap( | ||
| Map.Entry::getKey, | ||
| entry -> readDVsAndMerge(ops, entry.getValue(), threadpool))); | ||
|
|
||
| return writeMergedDVs( | ||
| mergedIndices, duplicateDVsByReferencedFile, ops, tableName, ops.current().specsById()); | ||
| } | ||
|
|
||
| // Merges the position indices for the duplicate DVs for a given referenced file | ||
| private static PositionDeleteIndex readDVsAndMerge( | ||
| TableOperations ops, List<DeleteFile> dvsForFile, ExecutorService pool) { | ||
| Preconditions.checkArgument(dvsForFile.size() > 1, "Expected more than 1 DV"); | ||
| PositionDeleteIndex[] dvIndices = readDVs(dvsForFile, pool, ops); | ||
| PositionDeleteIndex mergedPositions = dvIndices[0]; | ||
| DeleteFile firstDV = dvsForFile.get(0); | ||
|
|
||
| for (int i = 1; i < dvIndices.length; i++) { | ||
| DeleteFile dv = dvsForFile.get(i); | ||
| Preconditions.checkArgument( | ||
| Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()), | ||
| "Cannot merge duplicate added DVs when data sequence numbers are different, " | ||
| + "expected all to be added with sequence %s, but got %s", | ||
| firstDV.dataSequenceNumber(), | ||
| dv.dataSequenceNumber()); | ||
|
|
||
| Preconditions.checkArgument( | ||
| dv.specId() == firstDV.specId(), | ||
| "Cannot merge duplicate added DVs when partition specs are different, " | ||
| + "expected all to be added with spec %s, but got %s", | ||
| firstDV.specId(), | ||
| dv.specId()); | ||
|
|
||
| Preconditions.checkArgument( | ||
| Objects.equals(dv.partition(), firstDV.partition()), | ||
| "Cannot merge duplicate added DVs when partition tuples are different"); | ||
|
|
||
| mergedPositions.merge(dvIndices[i]); | ||
| } | ||
|
|
||
| return mergedPositions; | ||
| } | ||
|
|
||
| private static PositionDeleteIndex[] readDVs( | ||
| List<DeleteFile> dvs, ExecutorService pool, TableOperations ops) { | ||
| PositionDeleteIndex[] dvIndices = new PositionDeleteIndex[dvs.size()]; | ||
| Tasks.range(dvIndices.length) | ||
| .executeWith(pool) | ||
| .stopOnFailure() | ||
| .throwFailureWhenFinished() | ||
| .run( | ||
| i -> { | ||
| dvIndices[i] = Deletes.readDV(dvs.get(i), ops.io(), ops.encryption()); | ||
| }); | ||
|
|
||
| return dvIndices; | ||
| } | ||
|
|
||
| // Produces a Puffin per partition spec containing the merged DVs for that spec | ||
| private static List<DeleteFile> writeMergedDVs( | ||
| Map<String, PositionDeleteIndex> mergedIndices, | ||
| Map<String, List<DeleteFile>> dataFilesWithDuplicateDVs, | ||
| TableOperations ops, | ||
| String tableName, | ||
| Map<Integer, PartitionSpec> specsById) { | ||
| try (DVFileWriter dvFileWriter = | ||
| new BaseDVFileWriter( | ||
| // Use an unpartitioned spec for the location provider for the puffin containing | ||
| // all the merged DVs | ||
| OutputFileFactory.builderFor( | ||
| ops, PartitionSpec.unpartitioned(), FileFormat.PUFFIN, 1, 1) | ||
| .build(), | ||
| path -> null)) { | ||
|
|
||
| for (Map.Entry<String, PositionDeleteIndex> entry : mergedIndices.entrySet()) { | ||
| String referencedLocation = entry.getKey(); | ||
| PositionDeleteIndex mergedPositions = entry.getValue(); | ||
| List<DeleteFile> duplicateDVs = dataFilesWithDuplicateDVs.get(referencedLocation); | ||
| DeleteFile firstDV = duplicateDVs.get(0); | ||
| LOG.warn( | ||
| "Merged {} DVs for data file {}. These will be orphaned DVs in table {}", | ||
| duplicateDVs.size(), | ||
| referencedLocation, | ||
| tableName); | ||
| dvFileWriter.delete( | ||
| referencedLocation, | ||
| mergedPositions, | ||
| specsById.get(firstDV.specId()), | ||
| firstDV.partition()); | ||
| } | ||
|
|
||
| dvFileWriter.close(); | ||
| DeleteWriteResult writeResult = dvFileWriter.result(); | ||
| return writeResult.deleteFiles(); | ||
| } catch (IOException e) { | ||
| throw new UncheckedIOException(e); | ||
| } | ||
| } | ||
| } | ||
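For orientation, `mergeDVsAndWrite` is expected to receive only the data files that actually got more than one DV. Below is a minimal, hypothetical sketch of that grouping step; the class and method names are illustrative and not part of this PR (the real producer builds the map incrementally, as shown in the second file of this diff).

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.DeleteFile;

// Hypothetical helper, not part of this PR: groups added DVs by the data file they
// reference and keeps only entries with more than one DV, which is the input shape
// DVUtil.mergeDVsAndWrite expects (every entry must have size > 1).
class DuplicateDVGrouping {
  static Map<String, List<DeleteFile>> duplicateDVsByReferencedFile(List<DeleteFile> addedDVs) {
    return addedDVs.stream()
        // a DV references exactly one data file by location
        .collect(Collectors.groupingBy(DeleteFile::referencedDataFile))
        .entrySet()
        .stream()
        // only data files with more than one DV need to be merged
        .filter(entry -> entry.getValue().size() > 1)
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
}
```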
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ | |
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.iceberg.encryption.EncryptedOutputFile; | ||
| import org.apache.iceberg.events.CreateSnapshotEvent; | ||
| import org.apache.iceberg.exceptions.ValidationException; | ||
|
|
@@ -47,6 +48,7 @@ | |
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Streams; | ||
| import org.apache.iceberg.util.CharSequenceSet; | ||
| import org.apache.iceberg.util.ContentFileUtil; | ||
| import org.apache.iceberg.util.DataFileSet; | ||
|
|
@@ -55,6 +57,7 @@ | |
| import org.apache.iceberg.util.PartitionSet; | ||
| import org.apache.iceberg.util.SnapshotUtil; | ||
| import org.apache.iceberg.util.Tasks; | ||
| import org.apache.iceberg.util.ThreadPools; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
@@ -86,8 +89,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> { | |
| // update data | ||
| private final Map<Integer, DataFileSet> newDataFilesBySpec = Maps.newHashMap(); | ||
| private Long newDataFilesDataSequenceNumber; | ||
| private final Map<Integer, DeleteFileSet> newDeleteFilesBySpec = Maps.newHashMap(); | ||
| private final Set<String> newDVRefs = Sets.newHashSet(); | ||
| private final List<DeleteFile> positionAndEqualityDeletes = Lists.newArrayList(); | ||
| private final Map<String, List<DeleteFile>> dvsByReferencedFile = Maps.newLinkedHashMap(); | ||
|
Comment on lines +92 to +93 — Contributor (Author):
@rdblue These are two disjoint fields: one is a list of v2 deletes and the other is a multimap for DVs. I personally think our tests should get away from expecting a certain order in manifests and just assert the contents (or at least have validate methods that can be either strict or lenient about ordering). As we get into V4, maybe we'll make implementation choices that order entries in a certain way, but in the current state of things it was kind of a hindrance to making changes here. I didn't make the test change since it's fairly large and could distract from this change, and the LinkedHashMap has negligible overhead, so we can just preserve the existing behavior.
||
| private final List<ManifestFile> appendManifests = Lists.newArrayList(); | ||
| private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList(); | ||
| private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder(); | ||
|
|
@@ -222,7 +225,7 @@ protected boolean addsDataFiles() { | |
| } | ||
|
|
||
| protected boolean addsDeleteFiles() { | ||
| return !newDeleteFilesBySpec.isEmpty(); | ||
| return !positionAndEqualityDeletes.isEmpty() || !dvsByReferencedFile.isEmpty(); | ||
| } | ||
|
|
||
| /** Add a data file to the new snapshot. */ | ||
|
|
@@ -265,15 +268,14 @@ private void addInternal(DeleteFile file) { | |
| "Cannot find partition spec %s for delete file: %s", | ||
| file.specId(), | ||
| file.location()); | ||
|
|
||
| DeleteFileSet deleteFiles = | ||
| newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DeleteFileSet.create()); | ||
| if (deleteFiles.add(file)) { | ||
| addedFilesSummary.addedFile(spec, file); | ||
|
Contributor (Author):
Because we may be merging duplicates, we don't update the summary for delete files until after we dedupe, just before writing the new manifests.
||
| hasNewDeleteFiles = true; | ||
| if (ContentFileUtil.isDV(file)) { | ||
| newDVRefs.add(file.referencedDataFile()); | ||
| } | ||
| hasNewDeleteFiles = true; | ||
|
Contributor (Author):
Since we're not tracking additions in a DeleteFileSet, we treat every addition as a new delete, even potential duplicates (the alternative is a look-back through the list on every addDeleteFile, which I'm very against because it effectively becomes an O(deletes-added^2) operation for a commit). We end up merging/deduping the DVs (and the V2 position and equality deletes) anyway just before producing the new manifests. See my comment below.
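To make that trade-off concrete, here is a minimal sketch of the deferred approach (hypothetical class, not code from this PR): each addition stays O(1), and duplicates are dropped in a single pass with DeleteFileSet right before manifests are written, which matches what the diff below does for V2 deletes.

```java
import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.DeleteFileSet;

// Hypothetical sketch of deferred dedup: additions are O(1) and duplicates are
// removed only once, just before delete manifests are produced.
class DeferredDedupSketch {
  private final List<DeleteFile> pendingDeletes = Lists.newArrayList();

  void addDeleteFile(DeleteFile file) {
    // no look-back over previously added files here
    pendingDeletes.add(file);
  }

  Iterable<DeleteFile> dedupedDeletes() {
    // one pass at manifest-writing time; DeleteFileSet drops duplicate entries
    return DeleteFileSet.of(pendingDeletes);
  }
}
```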
||
| if (ContentFileUtil.isDV(file)) { | ||
| List<DeleteFile> dvsForReferencedFile = | ||
| dvsByReferencedFile.computeIfAbsent( | ||
| file.referencedDataFile(), newFile -> Lists.newArrayList()); | ||
| dvsForReferencedFile.add(file); | ||
| } else { | ||
| positionAndEqualityDeletes.add(file); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -814,7 +816,7 @@ protected void validateAddedDVs( | |
| Expression conflictDetectionFilter, | ||
| Snapshot parent) { | ||
| // skip if there is no current table state or this operation doesn't add new DVs | ||
| if (parent == null || newDVRefs.isEmpty()) { | ||
| if (parent == null || dvsByReferencedFile.isEmpty()) { | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -847,7 +849,7 @@ private void validateAddedDVs( | |
| DeleteFile file = entry.file(); | ||
| if (newSnapshotIds.contains(entry.snapshotId()) && ContentFileUtil.isDV(file)) { | ||
| ValidationException.check( | ||
| !newDVRefs.contains(file.referencedDataFile()), | ||
| !dvsByReferencedFile.containsKey(file.referencedDataFile()), | ||
|
Contributor:
I think this change is correct, but I want to note that in the future we could avoid failing by merging DVs, as long as the operation being committed allows it.
Contributor (Author):
Yeah, I had an old PR out for this: https://github.com/apache/iceberg/pull/11693/files#diff-410ff1b47d9a44a2fd5dbd103cad9463d82c8f4f51aa1be63b8b403123ab6e0e (probably a bad PR title, since if the positions are disjoint they are by definition not conflicting for the operation).
||
| "Found concurrently added DV for %s: %s", | ||
| file.referencedDataFile(), | ||
| ContentFileUtil.dvDesc(file)); | ||
|
|
@@ -1042,7 +1044,7 @@ private List<ManifestFile> newDataFilesAsManifests() { | |
| } | ||
|
|
||
| private Iterable<ManifestFile> prepareDeleteManifests() { | ||
| if (newDeleteFilesBySpec.isEmpty()) { | ||
| if (!addsDeleteFiles()) { | ||
| return ImmutableList.of(); | ||
| } | ||
|
|
||
|
|
@@ -1060,9 +1062,32 @@ private List<ManifestFile> newDeleteFilesAsManifests() { | |
| } | ||
|
|
||
| if (cachedNewDeleteManifests.isEmpty()) { | ||
| Map<String, List<DeleteFile>> duplicateDVs = Maps.newHashMap(); | ||
| List<DeleteFile> validDVs = Lists.newArrayList(); | ||
| for (Map.Entry<String, List<DeleteFile>> entry : dvsByReferencedFile.entrySet()) { | ||
| if (entry.getValue().size() > 1) { | ||
| duplicateDVs.put(entry.getKey(), entry.getValue()); | ||
| } else { | ||
| validDVs.addAll(entry.getValue()); | ||
| } | ||
| } | ||
|
|
||
| List<DeleteFile> mergedDVs = | ||
| duplicateDVs.isEmpty() | ||
| ? ImmutableList.of() | ||
| : DVUtil.mergeDVsAndWrite( | ||
| ops(), duplicateDVs, tableName, ThreadPools.getDeleteWorkerPool()); | ||
| // Prevent committing duplicate V2 deletes by deduping them | ||
| Map<Integer, List<DeleteFile>> newDeleteFilesBySpec = | ||
| Streams.stream( | ||
| Iterables.concat( | ||
| mergedDVs, validDVs, DeleteFileSet.of(positionAndEqualityDeletes))) | ||
|
Contributor (Author):
@rdblue let me know how you feel about this. The summary stats are produced from this "final" deleteFilesBySpec anyway, which should be correct, so I think we're covered in general.
||
| .map(file -> Delegates.pendingDeleteFile(file, file.dataSequenceNumber())) | ||
| .collect(Collectors.groupingBy(ContentFile::specId)); | ||
| newDeleteFilesBySpec.forEach( | ||
| (specId, deleteFiles) -> { | ||
| PartitionSpec spec = ops().current().spec(specId); | ||
| deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file)); | ||
|
Contributor:
I don't like that the […]. The reason I don't like the double storage is that it doesn't handle some strange cases. For instance, what if a […]? I think it would be cleaner to keep a list of v2 deletes and the multimap of DVs and maintain them separately. This method should produce a new list of merged DVs, and then both lists (v2 deletes and merged DVs) should be written to delete manifests by spec. It's easy enough to produce a filtered iterator, so I don't think we are buying much by grouping by spec ID as files are added.
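A minimal sketch of the structure suggested above, with hypothetical names: keep the collections separate while files are added, and only combine and group the deletes by spec ID when the delete manifests are written (the grouping itself mirrors the `Collectors.groupingBy(ContentFile::specId)` call already used in this diff).

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;

// Hypothetical sketch: v2 deletes, merged DVs, and single DVs are kept apart and
// only combined (and grouped by spec) at manifest-writing time.
class DeleteManifestGrouping {
  static Map<Integer, List<DeleteFile>> groupBySpec(
      List<DeleteFile> v2Deletes, List<DeleteFile> mergedDVs, List<DeleteFile> singleDVs) {
    return Stream.of(v2Deletes, mergedDVs, singleDVs)
        .flatMap(List::stream)
        // one delete manifest batch per partition spec
        .collect(Collectors.groupingBy(ContentFile::specId));
  }
}
```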
||
| List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec); | ||
| cachedNewDeleteManifests.addAll(newDeleteManifests); | ||
| }); | ||
|
|
||
@rdblue let me know if you feel strongly about this check. While it is StructLike and doesn't guarantee an equals implementation, the way I look at it is the following: […] Another rationale behind these checks is that if a writer produces duplicate DVs, there's also a chance of some kind of metadata record reuse issue in the writer, and this felt like an easy sanity check.
Alternatively, we could simplify this and remove these validations by assuming that duplicate DVs are OK along every other dimension.
Alternatively, we could just simplify this and remove these validations by assuming that the duplicate DVs are OK by every other dimension.