diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index f53859ef97d6..983a61e5f09d 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -230,6 +230,10 @@ private Dataset toFileInfoDS(List paths, String type) { return spark.createDataset(fileInfoList, FileInfo.ENCODER); } + protected Dataset emptyFileInfoDS() { + return spark.emptyDataset(FileInfo.ENCODER); + } + /** * Deletes files and keeps track of how many files were removed for each file type. * @@ -405,7 +409,7 @@ public long totalFilesCount() { } } - private static class ReadManifest implements FlatMapFunction { + protected static class ReadManifest implements FlatMapFunction { private final Broadcast table; ReadManifest(Broadcast
table) { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java index e49e7326736f..b285d73f2f87 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java @@ -20,6 +20,7 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED; import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT; +import static org.apache.spark.sql.functions.col; import java.util.Iterator; import java.util.List; @@ -27,8 +28,10 @@ import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import java.util.stream.Collectors; +import org.apache.iceberg.AllManifestsTable; import org.apache.iceberg.ExpireSnapshots.CleanupLevel; import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; @@ -41,8 +44,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.JobGroupInfo; +import org.apache.iceberg.spark.source.SerializableTableWithSize; import org.apache.iceberg.util.PropertyUtil; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,14 +182,71 @@ public Dataset expireFiles() { // fetch valid files after expiration TableMetadata updatedMetadata = ops.refresh(); - Dataset validFileDS = fileDS(updatedMetadata); - // fetch files referenced by expired snapshots + // find IDs of expired snapshots Set deletedSnapshotIds = findExpiredSnapshotIds(originalMetadata, updatedMetadata); - Dataset deleteCandidateFileDS = fileDS(originalMetadata, deletedSnapshotIds); + if (deletedSnapshotIds.isEmpty()) { + this.expiredFileDS = emptyFileInfoDS(); + return expiredFileDS; + } + + Table originalTable = newStaticTable(originalMetadata, table.io()); + Table updatedTable = newStaticTable(updatedMetadata, table.io()); + + Dataset expiredManifestDF = + loadMetadataTable(originalTable, MetadataTableType.ALL_MANIFESTS) + .filter( + col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(deletedSnapshotIds)); + + Dataset liveManifestDF = + loadMetadataTable(updatedTable, MetadataTableType.ALL_MANIFESTS); + + Dataset expiredManifestPaths = + expiredManifestDF.select(col("path")).distinct().as(Encoders.STRING()); + + Dataset liveManifestPaths = + liveManifestDF.select(col("path")).distinct().as(Encoders.STRING()); + + Dataset orphanedManifestPaths = expiredManifestPaths.except(liveManifestPaths); + + Dataset expiredManifestLists = manifestListDS(originalTable, deletedSnapshotIds); + Dataset liveManifestLists = manifestListDS(updatedTable, null); + Dataset orphanedManifestLists = expiredManifestLists.except(liveManifestLists); + + Dataset expiredStats = statisticsFileDS(originalTable, deletedSnapshotIds); + Dataset liveStats = statisticsFileDS(updatedTable, null); + Dataset orphanedStats = expiredStats.except(liveStats); + + if (orphanedManifestPaths.isEmpty()) { + this.expiredFileDS = orphanedManifestLists.union(orphanedStats); + return expiredFileDS; + } - // determine expired files - this.expiredFileDS = deleteCandidateFileDS.except(validFileDS); + Dataset orphanedManifestDF = + expiredManifestDF + .join( + orphanedManifestPaths.toDF("orphaned_path"), + expiredManifestDF.col("path").equalTo(col("orphaned_path")), + "inner") + .drop("orphaned_path"); + + Dataset candidateContentFiles = + contentFilesFromManifestDF(originalTable, orphanedManifestDF); + + Dataset liveContentFiles = contentFilesFromManifestDF(updatedTable, liveManifestDF); + + Dataset orphanedContentFiles = candidateContentFiles.except(liveContentFiles); + + Dataset orphanedManifestsDS = + orphanedManifestPaths.map( + (MapFunction) path -> new FileInfo(path, MANIFEST), + FileInfo.ENCODER); + + this.expiredFileDS = + orphanedContentFiles + .union(orphanedManifestsDS) + .union(orphanedManifestLists) + .union(orphanedStats); } return expiredFileDS; @@ -233,18 +298,6 @@ private boolean streamResults() { return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT); } - private Dataset fileDS(TableMetadata metadata) { - return fileDS(metadata, null); - } - - private Dataset fileDS(TableMetadata metadata, Set snapshotIds) { - Table staticTable = newStaticTable(metadata, table.io()); - return contentFileDS(staticTable, snapshotIds) - .union(manifestDS(staticTable, snapshotIds)) - .union(manifestListDS(staticTable, snapshotIds)) - .union(statisticsFileDS(staticTable, snapshotIds)); - } - private Set findExpiredSnapshotIds( TableMetadata originalMetadata, TableMetadata updatedMetadata) { Set retainedSnapshots = @@ -283,4 +336,26 @@ private ExpireSnapshots.Result deleteFiles(Iterator files) { .deletedStatisticsFilesCount(summary.statisticsFilesCount()) .build(); } + + private Dataset contentFilesFromManifestDF(Table staticTable, Dataset manifestDF) { + Table serializableTable = SerializableTableWithSize.copyOf(staticTable); + Broadcast
tableBroadcast = sparkContext().broadcast(serializableTable); + int numShufflePartitions = spark().sessionState().conf().numShufflePartitions(); + + Dataset manifestBeanDS = + manifestDF + .selectExpr( + "content", + "path", + "length", + "0 as sequenceNumber", + "partition_spec_id as partitionSpecId", + "added_snapshot_id as addedSnapshotId", + "key_metadata as keyMetadata") + .dropDuplicates("path") + .repartition(numShufflePartitions) + .as(ManifestFileBean.ENCODER); + + return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER); + } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java index 7e07c66e0650..febe1610792c 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java @@ -1203,7 +1203,7 @@ public void testUseLocalIterator() { assertThat(jobsRunDuringStreamResults) .as( "Expected total number of jobs with stream-results should match the expected number") - .isEqualTo(4L); + .isEqualTo(12L); }); }