From 987b454e5ade9a4d445f26beb91d7f77820dd755 Mon Sep 17 00:00:00 2001
From: Joy Haldar
Date: Mon, 26 Jan 2026 19:07:55 +0530
Subject: [PATCH 1/3] Spark: Optimize ExpireSnapshotsSparkAction with manifest-level pruning

Co-authored-by: Joy Haldar
---
 .../spark/actions/BaseSparkAction.java        |   4 +
 .../actions/ExpireSnapshotsSparkAction.java   | 150 ++++++++++++++++--
 .../actions/TestExpireSnapshotsAction.java    |   2 +-
 3 files changed, 138 insertions(+), 18 deletions(-)

diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index f53859ef97d6..89feda665442 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -230,6 +230,10 @@ private Dataset<FileInfo> toFileInfoDS(List<String> paths, String type) {
     return spark.createDataset(fileInfoList, FileInfo.ENCODER);
   }
 
+  protected Dataset<FileInfo> emptyFileInfoDS() {
+    return spark.emptyDataset(FileInfo.ENCODER);
+  }
+
   /**
    * Deletes files and keeps track of how many files were removed for each file type.
    *
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index e49e7326736f..70dc6c5f09e5 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -20,15 +20,25 @@
 
 import static org.apache.iceberg.TableProperties.GC_ENABLED;
 import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
+import static org.apache.spark.sql.functions.col;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import org.apache.iceberg.AllManifestsTable;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ExpireSnapshots.CleanupLevel;
 import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
@@ -36,13 +46,21 @@
 import org.apache.iceberg.actions.ExpireSnapshots;
 import org.apache.iceberg.actions.ImmutableExpireSnapshots;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.source.SerializableTableWithSize;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -174,14 +192,69 @@ public Dataset<FileInfo> expireFiles() {
 
       // fetch valid files after expiration
       TableMetadata updatedMetadata = ops.refresh();
-      Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);
 
-      // fetch files referenced by expired snapshots
+      // find IDs of expired snapshots
       Set<Long> deletedSnapshotIds = findExpiredSnapshotIds(originalMetadata, updatedMetadata);
-      Dataset<FileInfo> deleteCandidateFileDS = fileDS(originalMetadata, deletedSnapshotIds);
+
+      if (deletedSnapshotIds.isEmpty()) {
+        this.expiredFileDS = emptyFileInfoDS();
+        return expiredFileDS;
+      }
+
+      Table originalTable = newStaticTable(originalMetadata, table.io());
+      Table updatedTable = newStaticTable(updatedMetadata, table.io());
+
+      Dataset<Row> expiredManifestDF =
+          loadMetadataTable(originalTable, MetadataTableType.ALL_MANIFESTS)
+              .filter(
+                  col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(deletedSnapshotIds));
+
+      Dataset<Row> liveManifestDF =
+          loadMetadataTable(updatedTable, MetadataTableType.ALL_MANIFESTS);
+
+      Dataset<String> expiredManifestPaths =
+          expiredManifestDF.select(col("path")).distinct().as(Encoders.STRING());
+
+      Dataset<String> liveManifestPaths =
+          liveManifestDF.select(col("path")).distinct().as(Encoders.STRING());
+
+      Dataset<String> orphanedManifestPaths = expiredManifestPaths.except(liveManifestPaths);
+
+      Set<String> orphanedManifestPathSet = Sets.newHashSet(orphanedManifestPaths.collectAsList());
+
+      Dataset<FileInfo> expiredManifestLists = manifestListDS(originalTable, deletedSnapshotIds);
+      Dataset<FileInfo> liveManifestLists = manifestListDS(updatedTable, null);
+      Dataset<FileInfo> orphanedManifestLists = expiredManifestLists.except(liveManifestLists);
+
+      Dataset<FileInfo> expiredStats = statisticsFileDS(originalTable, deletedSnapshotIds);
+      Dataset<FileInfo> liveStats = statisticsFileDS(updatedTable, null);
+      Dataset<FileInfo> orphanedStats = expiredStats.except(liveStats);
+
+      if (orphanedManifestPaths.isEmpty()) {
+        this.expiredFileDS = orphanedManifestLists.union(orphanedStats);
+        return expiredFileDS;
+      }
+
+      Dataset<FileInfo> candidateContentFiles =
+          contentFilesFromManifestDF(
+              originalTable,
+              expiredManifestDF.filter(col("path").isInCollection(orphanedManifestPathSet)));
+
+      Dataset<FileInfo> liveContentFiles = contentFilesFromManifestDF(updatedTable, liveManifestDF);
+
+      Dataset<FileInfo> orphanedContentFiles = candidateContentFiles.except(liveContentFiles);
+
+      List<FileInfo> orphanedManifestFileInfos =
+          orphanedManifestPathSet.stream()
+              .map(path -> new FileInfo(path, MANIFEST))
+              .collect(Collectors.toList());
+      Dataset<FileInfo> orphanedManifestsDS =
+          spark().createDataset(orphanedManifestFileInfos, FileInfo.ENCODER);
 
-      // determine expired files
-      this.expiredFileDS = deleteCandidateFileDS.except(validFileDS);
+      this.expiredFileDS =
+          orphanedContentFiles
+              .union(orphanedManifestsDS)
+              .union(orphanedManifestLists)
+              .union(orphanedStats);
     }
 
     return expiredFileDS;
@@ -233,18 +306,6 @@ private boolean streamResults() {
     return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
   }
 
-  private Dataset<FileInfo> fileDS(TableMetadata metadata) {
-    return fileDS(metadata, null);
-  }
-
-  private Dataset<FileInfo> fileDS(TableMetadata metadata, Set<Long> snapshotIds) {
-    Table staticTable = newStaticTable(metadata, table.io());
-    return contentFileDS(staticTable, snapshotIds)
-        .union(manifestDS(staticTable, snapshotIds))
-        .union(manifestListDS(staticTable, snapshotIds))
-        .union(statisticsFileDS(staticTable, snapshotIds));
-  }
-
   private Set<Long> findExpiredSnapshotIds(
       TableMetadata originalMetadata, TableMetadata updatedMetadata) {
     Set<Long> retainedSnapshots =
@@ -283,4 +344,59 @@ private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
         .deletedStatisticsFilesCount(summary.statisticsFilesCount())
         .build();
   }
+
+  private Dataset<FileInfo> contentFilesFromManifestDF(Table staticTable, Dataset<Row> manifestDF) {
+    Table serializableTable = SerializableTableWithSize.copyOf(staticTable);
+    Broadcast<Table> tableBroadcast = sparkContext().broadcast(serializableTable);
+    int numShufflePartitions = spark().sessionState().conf().numShufflePartitions();
+
+    Dataset<ManifestFileBean> manifestBeanDS =
+        manifestDF
+            .selectExpr(
+                "content",
+                "path",
+                "length",
+                "0 as sequenceNumber",
+                "partition_spec_id as partitionSpecId",
+                "added_snapshot_id as addedSnapshotId",
+                "key_metadata as keyMetadata")
+            .dropDuplicates("path")
+            .repartition(numShufflePartitions)
+            .as(ManifestFileBean.ENCODER);
+
+    return manifestBeanDS.flatMap(new ReadManifestContent(tableBroadcast), FileInfo.ENCODER);
+  }
+
+  private static class ReadManifestContent implements FlatMapFunction<ManifestFileBean, FileInfo> {
+    private final Broadcast<Table> table;
+
+    ReadManifestContent(Broadcast<Table> table) {
+      this.table = table;
+    }
+
+    @Override
+    public Iterator<FileInfo> call(ManifestFileBean manifest) throws IOException {
+      ManifestContent content = manifest.content();
+      FileIO io = table.getValue().io();
+      Map<Integer, PartitionSpec> specs = table.getValue().specs();
+      List<String> proj = ImmutableList.of(DataFile.FILE_PATH.name(), DataFile.CONTENT.name());
+
+      switch (content) {
+        case DATA:
+          return CloseableIterator.transform(
+              ManifestFiles.read(manifest, io, specs).select(proj).iterator(),
+              ReadManifestContent::toFileInfo);
+        case DELETES:
+          return CloseableIterator.transform(
+              ManifestFiles.readDeleteManifest(manifest, io, specs).select(proj).iterator(),
+              ReadManifestContent::toFileInfo);
+        default:
+          throw new IllegalArgumentException("Unsupported manifest content type: " + content);
+      }
+    }
+
+    private static FileInfo toFileInfo(ContentFile<?> file) {
+      return new FileInfo(file.location(), file.content().toString());
+    }
+  }
 }
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index 7e07c66e0650..7be6543f3b07 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -1203,7 +1203,7 @@ public void testUseLocalIterator() {
           assertThat(jobsRunDuringStreamResults)
               .as(
                   "Expected total number of jobs with stream-results should match the expected number")
-              .isEqualTo(4L);
+              .isEqualTo(11L);
         });
   }
 

From 6fcf4c322bafb38bc3f9ad42c866816c4af4572d Mon Sep 17 00:00:00 2001
From: Joy Haldar
Date: Tue, 27 Jan 2026 01:45:50 +0530
Subject: [PATCH 2/3] Optimize ExpireSnapshotsSparkAction by replacing collectAsList with join-based filtering

---
 .../actions/ExpireSnapshotsSparkAction.java   | 23 +++++++++++--------
 .../actions/TestExpireSnapshotsAction.java    |  2 +-
 2 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 70dc6c5f09e5..ab2f1e403fe6 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -57,6 +57,7 @@
 import org.apache.iceberg.spark.source.SerializableTableWithSize;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -219,8 +220,6 @@ public Dataset<FileInfo> expireFiles() {
 
       Dataset<String> orphanedManifestPaths = expiredManifestPaths.except(liveManifestPaths);
 
-      Set<String> orphanedManifestPathSet = Sets.newHashSet(orphanedManifestPaths.collectAsList());
-
       Dataset<FileInfo> expiredManifestLists = manifestListDS(originalTable, deletedSnapshotIds);
       Dataset<FileInfo> liveManifestLists = manifestListDS(updatedTable, null);
       Dataset<FileInfo> orphanedManifestLists = expiredManifestLists.except(liveManifestLists);
@@ -234,21 +233,25 @@ public Dataset<FileInfo> expireFiles() {
         return expiredFileDS;
       }
 
+      Dataset<Row> orphanedManifestDF =
+          expiredManifestDF
+              .join(
+                  orphanedManifestPaths.toDF("orphaned_path"),
+                  expiredManifestDF.col("path").equalTo(col("orphaned_path")),
+                  "inner")
+              .drop("orphaned_path");
+
       Dataset<FileInfo> candidateContentFiles =
-          contentFilesFromManifestDF(
-              originalTable,
-              expiredManifestDF.filter(col("path").isInCollection(orphanedManifestPathSet)));
+          contentFilesFromManifestDF(originalTable, orphanedManifestDF);
 
       Dataset<FileInfo> liveContentFiles = contentFilesFromManifestDF(updatedTable, liveManifestDF);
 
       Dataset<FileInfo> orphanedContentFiles = candidateContentFiles.except(liveContentFiles);
 
-      List<FileInfo> orphanedManifestFileInfos =
-          orphanedManifestPathSet.stream()
-              .map(path -> new FileInfo(path, MANIFEST))
-              .collect(Collectors.toList());
       Dataset<FileInfo> orphanedManifestsDS =
-          spark().createDataset(orphanedManifestFileInfos, FileInfo.ENCODER);
+          orphanedManifestPaths.map(
+              (MapFunction<String, FileInfo>) path -> new FileInfo(path, MANIFEST),
+              FileInfo.ENCODER);
 
       this.expiredFileDS =
           orphanedContentFiles
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index 7be6543f3b07..febe1610792c 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -1203,7 +1203,7 @@ public void testUseLocalIterator() {
           assertThat(jobsRunDuringStreamResults)
               .as(
                   "Expected total number of jobs with stream-results should match the expected number")
-              .isEqualTo(11L);
+              .isEqualTo(12L);
         });
   }
 

From 323b7acc0691e06c45bcdadbf444b71f831f31c4 Mon Sep 17 00:00:00 2001
From: Joy Haldar
Date: Tue, 27 Jan 2026 19:45:44 +0530
Subject: [PATCH 3/3] Optimize ExpireSnapshotsSparkAction by replacing collectAsList with join-based filtering

---
 .../spark/actions/BaseSparkAction.java        |  2 +-
 .../actions/ExpireSnapshotsSparkAction.java   | 46 +------------------
 2 files changed, 2 insertions(+), 46 deletions(-)

diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 89feda665442..983a61e5f09d 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -409,7 +409,7 @@ public long totalFilesCount() {
     }
   }
 
-  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, FileInfo> {
+  protected static class ReadManifest implements FlatMapFunction<ManifestFileBean, FileInfo> {
     private final Broadcast<Table> table;
 
     ReadManifest(Broadcast<Table> table) {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index ab2f1e403fe6..b285d73f2f87 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -22,23 +22,16 @@
 import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 import static org.apache.spark.sql.functions.col;
 
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.iceberg.AllManifestsTable;
-import org.apache.iceberg.ContentFile;
-import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ExpireSnapshots.CleanupLevel;
 import org.apache.iceberg.HasTableOperations;
-import org.apache.iceberg.ManifestContent;
-import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
@@ -46,17 +39,13 @@
 import org.apache.iceberg.actions.ExpireSnapshots;
 import org.apache.iceberg.actions.ImmutableExpireSnapshots;
 import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.source.SerializableTableWithSize;
 import org.apache.iceberg.util.PropertyUtil;
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Dataset;
@@ -367,39 +356,6 @@ private Dataset<FileInfo> contentFilesFromManifestDF(Table staticTable, Dataset<
             .repartition(numShufflePartitions)
             .as(ManifestFileBean.ENCODER);
 
-    return manifestBeanDS.flatMap(new ReadManifestContent(tableBroadcast), FileInfo.ENCODER);
-  }
-
-  private static class ReadManifestContent implements FlatMapFunction<ManifestFileBean, FileInfo> {
-    private final Broadcast<Table> table;
-
-    ReadManifestContent(Broadcast<Table> table) {
-      this.table = table;
-    }
-
-    @Override
-    public Iterator<FileInfo> call(ManifestFileBean manifest) throws IOException {
-      ManifestContent content = manifest.content();
-      FileIO io = table.getValue().io();
-      Map<Integer, PartitionSpec> specs = table.getValue().specs();
-      List<String> proj = ImmutableList.of(DataFile.FILE_PATH.name(), DataFile.CONTENT.name());
-
-      switch (content) {
-        case DATA:
-          return CloseableIterator.transform(
-              ManifestFiles.read(manifest, io, specs).select(proj).iterator(),
-              ReadManifestContent::toFileInfo);
-        case DELETES:
-          return CloseableIterator.transform(
-              ManifestFiles.readDeleteManifest(manifest, io, specs).select(proj).iterator(),
-              ReadManifestContent::toFileInfo);
-        default:
-          throw new IllegalArgumentException("Unsupported manifest content type: " + content);
-      }
-    }
-
-    private static FileInfo toFileInfo(ContentFile<?> file) {
-      return new FileInfo(file.location(), file.content().toString());
-    }
+    return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
   }
 }
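
Note on the pattern used in patches 2 and 3: the sketch below is not part of the series; it is a minimal, self-contained illustration of the join-based filtering that replaces collectAsList()/isInCollection(), with a hypothetical class name and column names. It assumes only a local SparkSession and standard Spark SQL APIs.

// JoinBasedFilteringSketch.java -- hypothetical example, not part of the Iceberg patches above.
import static org.apache.spark.sql.functions.col;

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JoinBasedFilteringSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[*]").appName("join-filter-sketch").getOrCreate();

    // Candidate manifest paths and the subset that is still referenced.
    Dataset<String> candidates =
        spark.createDataset(Arrays.asList("m1.avro", "m2.avro", "m3.avro"), Encoders.STRING());
    Dataset<String> live = spark.createDataset(Arrays.asList("m2.avro"), Encoders.STRING());

    // Orphaned paths stay distributed; nothing is collected to the driver.
    Dataset<String> orphaned = candidates.except(live);

    // Driver-side variant (what the series moves away from): collectAsList() the orphaned
    // paths into a Set and filter with col("path").isInCollection(set), which materializes
    // the whole set in driver memory.

    // Join-based variant (what the series moves toward): restrict the candidate rows with
    // an inner join against the orphaned paths and drop the helper column afterwards.
    Dataset<Row> candidateRows = candidates.toDF("path");
    Dataset<Row> orphanedRows =
        candidateRows
            .join(orphaned.toDF("orphaned_path"), col("path").equalTo(col("orphaned_path")), "inner")
            .drop("orphaned_path");

    orphanedRows.show(false);
    spark.stop();
  }
}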