@@ -230,6 +230,10 @@ private Dataset<FileInfo> toFileInfoDS(List<String> paths, String type) {
return spark.createDataset(fileInfoList, FileInfo.ENCODER);
}

protected Dataset<FileInfo> emptyFileInfoDS() {
return spark.emptyDataset(FileInfo.ENCODER);
}

/**
* Deletes files and keeps track of how many files were removed for each file type.
*
@@ -405,7 +409,7 @@ public long totalFilesCount() {
}
}

private static class ReadManifest implements FlatMapFunction<ManifestFileBean, FileInfo> {
protected static class ReadManifest implements FlatMapFunction<ManifestFileBean, FileInfo> {
private final Broadcast<Table> table;

ReadManifest(Broadcast<Table> table) {
@@ -20,15 +20,18 @@

import static org.apache.iceberg.TableProperties.GC_ENABLED;
import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
import static org.apache.spark.sql.functions.col;

import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.ExpireSnapshots.CleanupLevel;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -41,8 +44,13 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -174,14 +182,71 @@ public Dataset<FileInfo> expireFiles() {

// fetch valid files after expiration
TableMetadata updatedMetadata = ops.refresh();
Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);

// fetch files referenced by expired snapshots
// find IDs of expired snapshots
Set<Long> deletedSnapshotIds = findExpiredSnapshotIds(originalMetadata, updatedMetadata);
Dataset<FileInfo> deleteCandidateFileDS = fileDS(originalMetadata, deletedSnapshotIds);
if (deletedSnapshotIds.isEmpty()) {
this.expiredFileDS = emptyFileInfoDS();
return expiredFileDS;
}

Table originalTable = newStaticTable(originalMetadata, table.io());
Table updatedTable = newStaticTable(updatedMetadata, table.io());

Dataset<Row> expiredManifestDF =
loadMetadataTable(originalTable, MetadataTableType.ALL_MANIFESTS)
.filter(
col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(deletedSnapshotIds));

Dataset<Row> liveManifestDF =
loadMetadataTable(updatedTable, MetadataTableType.ALL_MANIFESTS);

Dataset<String> expiredManifestPaths =
expiredManifestDF.select(col("path")).distinct().as(Encoders.STRING());

Dataset<String> liveManifestPaths =
liveManifestDF.select(col("path")).distinct().as(Encoders.STRING());

Dataset<String> orphanedManifestPaths = expiredManifestPaths.except(liveManifestPaths);

Dataset<FileInfo> expiredManifestLists = manifestListDS(originalTable, deletedSnapshotIds);
Dataset<FileInfo> liveManifestLists = manifestListDS(updatedTable, null);
Dataset<FileInfo> orphanedManifestLists = expiredManifestLists.except(liveManifestLists);

Dataset<FileInfo> expiredStats = statisticsFileDS(originalTable, deletedSnapshotIds);
Dataset<FileInfo> liveStats = statisticsFileDS(updatedTable, null);
Dataset<FileInfo> orphanedStats = expiredStats.except(liveStats);

if (orphanedManifestPaths.isEmpty()) {
Copilot AI Jan 27, 2026

Using isEmpty() on a Dataset triggers a Spark action that collects data to the driver. Consider using first() wrapped in a try-catch or take(1).length == 0 to avoid potentially expensive operations when checking if a dataset is empty.

Suggested change
if (orphanedManifestPaths.isEmpty()) {
boolean hasOrphanedManifestPaths = orphanedManifestPaths.limit(1).toLocalIterator().hasNext();
if (!hasOrphanedManifestPaths) {

@joyhaldar (Contributor, Author) Jan 28, 2026

Thanks for the review. Dataset.isEmpty() uses limit(1) and executeTake(1); it only fetches a single row to check emptiness, not the full dataset.

Source: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala#L557-L560
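
For reference, a minimal standalone sketch (hypothetical class name, plain local SparkSession, not part of this PR) showing that both Dataset.isEmpty() and the suggested limit(1)-based check pull at most one row to the driver:

```java
import java.util.Collections;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class EmptyCheckSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[*]").appName("empty-check-sketch").getOrCreate();

    // Stand-in for orphanedManifestPaths: an empty Dataset<String>.
    Dataset<String> paths =
        spark.createDataset(Collections.<String>emptyList(), Encoders.STRING());

    // Dataset.isEmpty() plans a limit(1) and takes at most one row,
    // so it does not collect the whole dataset to the driver.
    boolean emptyViaIsEmpty = paths.isEmpty();

    // The alternative from the review suggestion: also bounded by limit(1).
    boolean hasRows = paths.limit(1).toLocalIterator().hasNext();

    System.out.println("isEmpty=" + emptyViaIsEmpty + ", hasRows=" + hasRows);
    spark.stop();
  }
}
```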

this.expiredFileDS = orphanedManifestLists.union(orphanedStats);
return expiredFileDS;
}

// determine expired files
this.expiredFileDS = deleteCandidateFileDS.except(validFileDS);
Dataset<Row> orphanedManifestDF =
expiredManifestDF
.join(
orphanedManifestPaths.toDF("orphaned_path"),
expiredManifestDF.col("path").equalTo(col("orphaned_path")),
"inner")
.drop("orphaned_path");

Dataset<FileInfo> candidateContentFiles =
contentFilesFromManifestDF(originalTable, orphanedManifestDF);

Dataset<FileInfo> liveContentFiles = contentFilesFromManifestDF(updatedTable, liveManifestDF);

Dataset<FileInfo> orphanedContentFiles = candidateContentFiles.except(liveContentFiles);

Dataset<FileInfo> orphanedManifestsDS =
orphanedManifestPaths.map(
(MapFunction<String, FileInfo>) path -> new FileInfo(path, MANIFEST),
FileInfo.ENCODER);

this.expiredFileDS =
orphanedContentFiles
.union(orphanedManifestsDS)
.union(orphanedManifestLists)
.union(orphanedStats);
}

return expiredFileDS;
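
Editorial note: the core of the new expireFiles() path is set difference on Spark Datasets, that is, paths referenced by expired snapshots minus paths still referenced by live snapshots. A minimal sketch of that except() pattern, using hypothetical manifest path strings rather than Iceberg metadata tables:

```java
import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class OrphanDiffSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[*]").appName("orphan-diff-sketch").getOrCreate();

    // Hypothetical manifest paths referenced by expired and live snapshots.
    Dataset<String> expired =
        spark.createDataset(Arrays.asList("m1.avro", "m2.avro", "m3.avro"), Encoders.STRING());
    Dataset<String> live =
        spark.createDataset(Arrays.asList("m2.avro", "m3.avro"), Encoders.STRING());

    // except() keeps rows of `expired` that do not appear in `live`,
    // i.e. manifests no longer reachable after expiration.
    Dataset<String> orphaned = expired.except(live);
    orphaned.show(); // m1.avro

    spark.stop();
  }
}
```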
@@ -233,18 +298,6 @@ private boolean streamResults() {
return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
}

private Dataset<FileInfo> fileDS(TableMetadata metadata) {
return fileDS(metadata, null);
}

private Dataset<FileInfo> fileDS(TableMetadata metadata, Set<Long> snapshotIds) {
Table staticTable = newStaticTable(metadata, table.io());
return contentFileDS(staticTable, snapshotIds)
.union(manifestDS(staticTable, snapshotIds))
.union(manifestListDS(staticTable, snapshotIds))
.union(statisticsFileDS(staticTable, snapshotIds));
}

private Set<Long> findExpiredSnapshotIds(
TableMetadata originalMetadata, TableMetadata updatedMetadata) {
Set<Long> retainedSnapshots =
@@ -283,4 +336,26 @@ private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
.deletedStatisticsFilesCount(summary.statisticsFilesCount())
.build();
}

private Dataset<FileInfo> contentFilesFromManifestDF(Table staticTable, Dataset<Row> manifestDF) {
Table serializableTable = SerializableTableWithSize.copyOf(staticTable);
Broadcast<Table> tableBroadcast = sparkContext().broadcast(serializableTable);
int numShufflePartitions = spark().sessionState().conf().numShufflePartitions();

Dataset<ManifestFileBean> manifestBeanDS =
manifestDF
.selectExpr(
"content",
"path",
"length",
"0 as sequenceNumber",
"partition_spec_id as partitionSpecId",
"added_snapshot_id as addedSnapshotId",
"key_metadata as keyMetadata")
.dropDuplicates("path")
.repartition(numShufflePartitions)
.as(ManifestFileBean.ENCODER);

return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
}
}
@@ -1200,10 +1200,12 @@ public void testUseLocalIterator() {

checkExpirationResults(1L, 0L, 0L, 1L, 2L, results);

// Job count reflects distributed operations for manifest path filtering,
// early exit checks, and join-based filtering
assertThat(jobsRunDuringStreamResults)
.as(
"Expected total number of jobs with stream-results should match the expected number")
.isEqualTo(4L);
.isEqualTo(12L);
Copilot AI Jan 27, 2026

The expected job count increased from 4 to 12 due to the new distributed operations. Consider adding a comment explaining why this specific count is expected, or adding a test case that validates the optimization logic (e.g., verifying early exits when no orphaned manifests exist).

@joyhaldar (Contributor, Author)

Added a comment explaining the job count.
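
For background, a minimal sketch (hypothetical class, plain local SparkSession, not this project's test harness) of how a counter like jobsRunDuringStreamResults can be gathered with a SparkListener:

```java
import java.util.concurrent.atomic.AtomicLong;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.sql.SparkSession;

public class JobCountSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[*]").appName("job-count-sketch").getOrCreate();

    AtomicLong jobCount = new AtomicLong();
    spark
        .sparkContext()
        .addSparkListener(
            new SparkListener() {
              @Override
              public void onJobStart(SparkListenerJobStart jobStart) {
                jobCount.incrementAndGet();
              }
            });

    // Each action below submits at least one Spark job that the listener observes.
    spark.range(100).count();
    spark.range(100).isEmpty();

    // Listener events are delivered asynchronously; a real test would wait
    // for the listener bus to drain before asserting on the count.
    System.out.println("jobs observed: " + jobCount.get());
    spark.stop();
  }
}
```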

});
}
