Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final String tableName;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final SnapshotSummary.Builder manifestsSummary = SnapshotSummary.builder();
private final Map<Integer, DataFileSet> newDataFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
Expand Down Expand Up @@ -166,6 +167,8 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
manifests.addAll(snapshot.allManifests(ops().io()));
}

summaryBuilder.merge(buildManifestCountSummary(manifestsSummary, manifests, 0));

return manifests;
}

Expand Down
22 changes: 20 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public String partition() {

// cache filtered manifests to avoid extra work when commits fail.
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
// count of manifests that were rewritten with different manifest entry status during filtering
private int replacedManifestsCount = 0;

// tracking where files were deleted to validate retries quickly
private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
Expand Down Expand Up @@ -313,6 +315,18 @@ private Set<F> deletedFiles(ManifestFile[] manifests) {
return deletedFiles;
}

/**
* Returns the count of manifests that were replaced (rewritten) during filtering.
*
* <p>A manifest is considered replaced when a new manifest was created to replace the original
* one (i.e., the original manifest != filtered manifest).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note: because of the normal append path in the merging snapshot producer, this can also be original manifest != appended manifest.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah nvm, didn't see we were in manifest filter manager

*
* @return the count of replaced manifests
*/
int replacedManifestsCount() {
// The counter is bumped when filterManifest rewrites a manifest that contains deleted files,
// decremented when cleanUncommitted deletes an uncommitted rewritten copy, and reset to 0 when
// the filtered cache is invalidated.
// NOTE(review): this is a plain int while the neighbouring filteredManifests cache is a
// concurrent map; if manifests are filtered on multiple threads the updates may race -- confirm
// whether an atomic counter is needed.
return replacedManifestsCount;
}

/**
* Deletes filtered manifests that were created by this class, but are not in the committed
* manifest set.
Expand All @@ -329,9 +343,10 @@ void cleanUncommitted(Set<ManifestFile> committed) {
ManifestFile manifest = entry.getKey();
ManifestFile filtered = entry.getValue();
if (!committed.contains(filtered)) {
// only delete if the filtered copy was created
// only delete if the filtered copy was created (manifest was replaced)
if (!manifest.equals(filtered)) {
deleteFile(filtered.path());
replacedManifestsCount--;
}

// remove the entry from the cache
Expand All @@ -342,6 +357,7 @@ void cleanUncommitted(Set<ManifestFile> committed) {

// Discards the cached filter results by cleaning up against SnapshotProducer.EMPTY_SET
// (presumably an empty committed set, so no cached manifest counts as committed -- confirm),
// then clears the replaced-manifest counter.
private void invalidateFilteredCache() {
cleanUncommitted(SnapshotProducer.EMPTY_SET);
// explicit reset: cleanUncommitted already decrements once per rewritten copy it deletes, so
// this assignment guarantees the counter starts from zero regardless of what was cached
replacedManifestsCount = 0;
}

/**
Expand All @@ -367,7 +383,9 @@ private ManifestFile filterManifest(
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
if (manifestHasDeletedFiles(evaluator, manifest, reader)) {
return filterManifestWithDeletedFiles(evaluator, manifest, reader);
ManifestFile filtered = filterManifestWithDeletedFiles(evaluator, manifest, reader);
replacedManifestsCount++;
return filtered;
} else {
filteredManifests.put(manifest, manifest);
return manifest;
Expand Down
23 changes: 20 additions & 3 deletions core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ abstract class ManifestMergeManager<F extends ContentFile<F>> {

// cache merge results to reuse when retrying
private final Map<List<ManifestFile>, ManifestFile> mergedManifests = Maps.newConcurrentMap();
// count of manifests that were replaced (merged) during bin-packing
private int replacedManifestsCount = 0;

private final Supplier<ExecutorService> workerPoolSupplier;

Expand Down Expand Up @@ -86,6 +88,18 @@ Iterable<ManifestFile> mergeManifests(Iterable<ManifestFile> manifests) {
return merged;
}

/**
 * Returns the count of manifests that were replaced (merged) during bin-packing.
 *
 * <p>When multiple manifests are merged into a single manifest, each of the original manifests is
 * considered replaced. The counter grows by the bin size whenever a merged manifest is created
 * and cached, and shrinks by the bin size when an uncommitted merged manifest is removed by
 * {@code cleanUncommitted}.
 *
 * <p>NOTE(review): the counter is a plain {@code int} while the merge cache next to it is a
 * concurrent map; if merged manifests are created on worker-pool threads the updates may race --
 * confirm whether an atomic counter is needed.
 *
 * @return the count of replaced manifests
 */
int replacedManifestsCount() {
return replacedManifestsCount;
}

void cleanUncommitted(Set<ManifestFile> committed) {
// iterate over a copy of entries to avoid concurrent modification
List<Map.Entry<List<ManifestFile>, ManifestFile>> entries =
Expand All @@ -96,8 +110,10 @@ void cleanUncommitted(Set<ManifestFile> committed) {
ManifestFile merged = entry.getValue();
if (!committed.contains(merged)) {
deleteFile(merged.path());
// remove the deleted file from the cache
mergedManifests.remove(entry.getKey());
// remove the deleted file from the cache and update replaced count
List<ManifestFile> bin = entry.getKey();
mergedManifests.remove(bin);
replacedManifestsCount -= bin.size();
}
}
}
Expand Down Expand Up @@ -200,8 +216,9 @@ private ManifestFile createManifest(int specId, List<ManifestFile> bin) {

ManifestFile manifest = writer.toManifestFile();

// update the cache
// update the cache and track replaced manifests
mergedManifests.put(bin, manifest);
replacedManifestsCount += bin.size();

return manifest;
}
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder();
private final SnapshotSummary.Builder manifestsSummary = SnapshotSummary.builder();
private Expression deleteExpression = Expressions.alwaysFalse();

// cache new data manifests after writing
Expand Down Expand Up @@ -962,6 +963,21 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
Iterables.addAll(manifests, mergeManager.mergeManifests(unmergedManifests));
Iterables.addAll(manifests, deleteMergeManager.mergeManifests(unmergedDeleteManifests));

// update created/kept/replaced manifest count
// replaced manifests come from:
// 1. filterManager - manifests rewritten to remove deleted files
// 2. deleteFilterManager - delete manifests rewritten to remove deleted files
// 3. mergeManager - data manifests merged via bin-packing
// 4. deleteMergeManager - delete manifests merged via bin-packing
// Note: rewrittenAppendManifests are NEW manifests (copies), not replaced ones
int replacedManifestsCount =
filterManager.replacedManifestsCount()
+ deleteFilterManager.replacedManifestsCount()
+ mergeManager.replacedManifestsCount()
+ deleteMergeManager.replacedManifestsCount();
summaryBuilder.merge(
buildManifestCountSummary(manifestsSummary, manifests, replacedManifestsCount));

return manifests;
}

Expand Down
29 changes: 29 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,35 @@ protected boolean cleanupAfterCommit() {
return true;
}

/**
 * Records the created, kept and replaced manifest counts on the given summary builder.
 *
 * <p>A manifest whose snapshot ID equals this producer's snapshot ID is counted as created; every
 * other manifest in the list is counted as kept. The replaced count is not derived here; it is
 * written exactly as supplied by the caller.
 *
 * @param summaryBuilder the summary builder to update
 * @param manifests the list of manifests in the new snapshot
 * @param replacedManifestsCount the count of manifests that were replaced (rewritten)
 * @return the same {@code summaryBuilder} instance, with the three manifest counts set
 */
protected SnapshotSummary.Builder buildManifestCountSummary(
    SnapshotSummary.Builder summaryBuilder,
    List<ManifestFile> manifests,
    int replacedManifestsCount) {
  // tally manifests written by this snapshot; everything else in the list was carried over
  int createdByThisSnapshot = 0;
  for (ManifestFile candidate : manifests) {
    if (candidate.snapshotId() == snapshotId()) {
      createdByThisSnapshot += 1;
    }
  }

  int carriedOver = manifests.size() - createdByThisSnapshot;

  summaryBuilder.set(
      SnapshotSummary.CREATED_MANIFESTS_COUNT, String.valueOf(createdByThisSnapshot));
  summaryBuilder.set(SnapshotSummary.KEPT_MANIFESTS_COUNT, String.valueOf(carriedOver));
  summaryBuilder.set(
      SnapshotSummary.REPLACED_MANIFESTS_COUNT, String.valueOf(replacedManifestsCount));

  return summaryBuilder;
}

/**
 * Writes data manifests for the given files by delegating to the three-argument overload.
 *
 * <p>{@code null} is passed as the middle argument; per the inline note this means the data
 * sequence number is inherited rather than set explicitly.
 *
 * @param files the data files to write into manifests
 * @param spec the partition spec passed through to the overload
 * @return the manifests returned by the three-argument overload
 */
protected List<ManifestFile> writeDataManifests(Collection<DataFile> files, PartitionSpec spec) {
return writeDataManifests(files, null /* inherit data seq */, spec);
}
Expand Down
64 changes: 64 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestCommitReporting.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,68 @@ public void addAndDeleteManifests() {
assertThat(metrics.manifestsReplaced().value()).isEqualTo(2L);
assertThat(metrics.manifestEntriesProcessed().value()).isEqualTo(2L);
}

// Walks a fresh table through fast append, merge append, row delta and delete commits and checks
// the created/kept manifest counters reported for each commit.
@TestTemplate
public void snapshotProducerManifestMetrics() {
  Table metricsTable =
      TestTables.create(
          tableDir,
          "snapshot-producer-manifest-metrics",
          SCHEMA,
          SPEC,
          SortOrder.unsorted(),
          formatVersion,
          reporter);

  // FastAppend #1: one new manifest, nothing carried over
  metricsTable.newFastAppend().appendFile(FILE_A).commit();

  CommitReport firstAppend = reporter.lastCommitReport();
  assertThat(firstAppend).isNotNull();
  assertThat(firstAppend.operation()).isEqualTo("append");
  assertThat(firstAppend.snapshotId()).isEqualTo(1L);

  CommitMetricsResult firstAppendMetrics = firstAppend.commitMetrics();
  assertThat(firstAppendMetrics.manifestsCreated().value()).isEqualTo(1L);
  assertThat(firstAppendMetrics.manifestsKept().value()).isEqualTo(0L);

  // FastAppend #2: one new manifest, the previous one is kept
  metricsTable.newFastAppend().appendFile(FILE_B).commit();

  CommitMetricsResult secondAppendMetrics = reporter.lastCommitReport().commitMetrics();
  assertThat(secondAppendMetrics.manifestsCreated().value()).isEqualTo(1L);
  assertThat(secondAppendMetrics.manifestsKept().value()).isEqualTo(1L);

  // MergeAppend: one new manifest, both earlier manifests are kept
  metricsTable.newAppend().appendFile(FILE_C).commit();

  CommitReport mergeAppend = reporter.lastCommitReport();
  assertThat(mergeAppend.operation()).isEqualTo("append");
  CommitMetricsResult mergeAppendMetrics = mergeAppend.commitMetrics();
  assertThat(mergeAppendMetrics.manifestsCreated().value()).isEqualTo(1L);
  assertThat(mergeAppendMetrics.manifestsKept().value()).isEqualTo(2L);

  // RowDelta with only a delete file: one delete manifest created, three data manifests kept
  metricsTable.newRowDelta().addDeletes(fileADeletes()).commit();

  CommitReport deleteOnlyDelta = reporter.lastCommitReport();
  assertThat(deleteOnlyDelta.operation()).isEqualTo("delete");
  CommitMetricsResult deleteOnlyDeltaMetrics = deleteOnlyDelta.commitMetrics();
  assertThat(deleteOnlyDeltaMetrics.manifestsCreated().value()).isEqualTo(1L);
  assertThat(deleteOnlyDeltaMetrics.manifestsKept().value()).isEqualTo(3L);

  // RowDelta with rows and deletes: two manifests created (data + delete), four kept
  metricsTable.newRowDelta().addRows(FILE_D).addDeletes(fileBDeletes()).commit();

  CommitReport mixedDelta = reporter.lastCommitReport();
  assertThat(mixedDelta.operation()).isEqualTo("overwrite");
  CommitMetricsResult mixedDeltaMetrics = mixedDelta.commitMetrics();
  assertThat(mixedDeltaMetrics.manifestsCreated().value()).isEqualTo(2L);
  assertThat(mixedDeltaMetrics.manifestsKept().value()).isEqualTo(4L);

  // Delete: one manifest created by the rewrite, five kept
  metricsTable.newDelete().deleteFile(FILE_C).commit();

  CommitReport fileDelete = reporter.lastCommitReport();
  assertThat(fileDelete.operation()).isEqualTo("delete");
  CommitMetricsResult fileDeleteMetrics = fileDelete.commitMetrics();
  assertThat(fileDeleteMetrics.manifestsCreated().value()).isEqualTo(1L);
  assertThat(fileDeleteMetrics.manifestsKept().value()).isEqualTo(5L);
}
}
56 changes: 56 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,62 @@ public void testDeleteFilesOnIndependentBranches() {
statuses(Status.EXISTING, Status.DELETED, Status.DELETED));
}

// Checks the created/replaced/kept manifest counts written to the snapshot summary across an
// append, a delete by file reference, a delete that matches nothing, and a delete by path.
@TestTemplate
public void testDeleteFilesWithManifestSnapshotSummary() {
  // start with one manifest holding FILE_A and FILE_B
  Snapshot appended =
      commit(table, table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B), branch);

  assertThat(appended.allManifests(FILE_IO)).hasSize(1);
  assertThat(appended.summary())
      .containsEntry(SnapshotSummary.CREATED_MANIFESTS_COUNT, "1")
      .containsEntry(SnapshotSummary.REPLACED_MANIFESTS_COUNT, "0")
      .containsEntry(SnapshotSummary.KEPT_MANIFESTS_COUNT, "0");

  validateManifestEntries(
      appended.allManifests(FILE_IO).get(0),
      ids(appended.snapshotId(), appended.snapshotId()),
      files(FILE_A, FILE_B),
      statuses(Status.ADDED, Status.ADDED));

  // removing FILE_A by reference rewrites the only manifest: 1 created, 1 replaced, 0 kept
  Snapshot removedFileA = commit(table, table.newDelete().deleteFile(FILE_A), branch);
  assertThat(removedFileA.allManifests(FILE_IO)).hasSize(1);
  assertThat(removedFileA.summary())
      .containsEntry(SnapshotSummary.CREATED_MANIFESTS_COUNT, "1")
      .containsEntry(SnapshotSummary.REPLACED_MANIFESTS_COUNT, "1")
      .containsEntry(SnapshotSummary.KEPT_MANIFESTS_COUNT, "0");
  validateManifestEntries(
      removedFileA.allManifests(FILE_IO).get(0),
      ids(removedFileA.snapshotId(), appended.snapshotId()),
      files(FILE_A, FILE_B),
      statuses(Status.DELETED, Status.EXISTING));

  // a row-filter delete that matches nothing leaves the manifest alone: 0 created, 1 kept
  Snapshot matchedNothing =
      commit(table, table.newDelete().deleteFromRowFilter(Expressions.alwaysFalse()), branch);
  assertThat(matchedNothing.allManifests(FILE_IO)).hasSize(1);
  assertThat(matchedNothing.summary())
      .containsEntry(SnapshotSummary.CREATED_MANIFESTS_COUNT, "0")
      .containsEntry(SnapshotSummary.REPLACED_MANIFESTS_COUNT, "0")
      .containsEntry(SnapshotSummary.KEPT_MANIFESTS_COUNT, "1");

  // removing FILE_B by path rewrites the manifest again: 1 created, 1 replaced, 0 kept
  Snapshot removedFileB = commit(table, table.newDelete().deleteFile(FILE_B.location()), branch);

  assertThat(removedFileB.allManifests(FILE_IO)).hasSize(1);
  assertThat(removedFileB.summary())
      .containsEntry(SnapshotSummary.CREATED_MANIFESTS_COUNT, "1")
      .containsEntry(SnapshotSummary.REPLACED_MANIFESTS_COUNT, "1")
      .containsEntry(SnapshotSummary.KEPT_MANIFESTS_COUNT, "0");
  validateManifestEntries(
      removedFileB.allManifests(FILE_IO).get(0),
      ids(removedFileB.snapshotId()),
      files(FILE_B),
      statuses(Status.DELETED));
}

@TestTemplate
public void testDeleteWithCollision() {
Schema schema = new Schema(Types.NestedField.required(0, "x", Types.StringType.get()));
Expand Down
Loading