diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java
index 4e8f80fa833f..ab12a3b7c1e3 100644
--- a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java
@@ -142,6 +142,11 @@ default DeleteOrphanFiles equalAuthorities(Map<String, String> newEqualAuthoriti
   interface Result {
     /** Returns locations of orphan files. */
     Iterable<String> orphanFileLocations();
+
+    /** Returns the total number of orphan files. */
+    default long orphanFilesCount() {
+      return 0;
+    }
   }
 
   /**
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseDeleteOrphanFiles.java b/core/src/main/java/org/apache/iceberg/actions/BaseDeleteOrphanFiles.java
index 182c8b191e87..fc87dad49de0 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseDeleteOrphanFiles.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseDeleteOrphanFiles.java
@@ -29,5 +29,11 @@ interface BaseDeleteOrphanFiles extends DeleteOrphanFiles {
 
   @Value.Immutable
-  interface Result extends DeleteOrphanFiles.Result {}
+  interface Result extends DeleteOrphanFiles.Result {
+    @Override
+    @Value.Default
+    default long orphanFilesCount() {
+      return 0;
+    }
+  }
 }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 78662159b0bb..92bfc880ad7f 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -300,7 +300,10 @@ private DeleteOrphanFiles.Result deleteFiles(Dataset<String> orphanFileDS) {
     LOG.info("Deleted {} orphan files", filesCount);
 
-    return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFileList).build();
+    return ImmutableDeleteOrphanFiles.Result.builder()
+        .orphanFileLocations(orphanFileList)
+        .orphanFilesCount(filesCount)
+        .build();
   }
 
   private void collectPathsForOutput(
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 40505b856737..0d2a5c0a4daf 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -184,6 +184,9 @@ public void testDryRun() throws IOException {
     assertThat(result1.orphanFileLocations())
         .as("Default olderThan interval should be safe")
         .isEmpty();
+    assertThat(result1.orphanFilesCount())
+        .as("Should not find any orphan file using default olderThan interval")
+        .isEqualTo(0L);
 
     DeleteOrphanFiles.Result result2 =
         actions
@@ -195,6 +198,9 @@ public void testDryRun() throws IOException {
     assertThat(result2.orphanFileLocations())
         .as("Action should find 1 file")
         .isEqualTo(invalidFiles);
+    assertThat(result2.orphanFilesCount())
+        .as("Action should find 1 file")
+        .isEqualTo((long) invalidFiles.size());
     assertThat(fs.exists(new Path(invalidFiles.get(0))))
         .as("Invalid file should be present")
         .isTrue();
@@ -210,6 +216,9 @@ public void testDryRun() throws IOException {
     assertThat(result3.orphanFileLocations())
         .as("Streaming dry run should find 1 file")
         .isEqualTo(invalidFiles);
+    assertThat(result3.orphanFilesCount())
+        .as("Streaming dry run should find 1 file")
+        .isEqualTo((long) invalidFiles.size());
     assertThat(fs.exists(new Path(invalidFiles.get(0))))
         .as("Invalid file should be present after streaming dry run")
         .isTrue();
@@ -223,6 +232,9 @@ public void testDryRun() throws IOException {
     assertThat(result4.orphanFileLocations())
         .as("Action should delete 1 file")
         .isEqualTo(invalidFiles);
+    assertThat(result4.orphanFilesCount())
+        .as("Action should delete 1 file")
+        .isEqualTo((long) invalidFiles.size());
     assertThat(fs.exists(new Path(invalidFiles.get(0))))
         .as("Invalid file should not be present")
         .isFalse();
@@ -286,6 +298,7 @@ public void testAllValidFilesAreKept() throws IOException {
             .execute();
 
     assertThat(result.orphanFileLocations()).as("Should delete 4 files").hasSize(4);
+    assertThat(result.orphanFilesCount()).as("Should delete 4 files").isEqualTo(4L);
 
     Path dataPath = new Path(tableLocation + "/data");
     FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
@@ -366,6 +379,7 @@ public void orphanedFileRemovedWithParallelTasks() {
         .containsExactlyInAnyOrder(
             "remove-orphan-0", "remove-orphan-1", "remove-orphan-2", "remove-orphan-3");
     assertThat(deletedFiles).hasSize(4);
+    assertThat(result.orphanFilesCount()).as("Should delete 4 files").isEqualTo(4L);
   }
 
   @TestTemplate
@@ -410,6 +424,7 @@ public void testWapFilesAreKept() {
         actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
     assertThat(result.orphanFileLocations()).as("Should not delete any files").isEmpty();
+    assertThat(result.orphanFilesCount()).as("Should not delete any files").isEqualTo(0L);
   }
 
   @TestTemplate
@@ -440,6 +455,7 @@ public void testMetadataFolderIsIntact() {
             .execute();
 
     assertThat(result.orphanFileLocations()).as("Should delete 1 file").hasSize(1);
+    assertThat(result.orphanFilesCount()).as("Should delete 1 file").isEqualTo(1L);
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
     List<ThreeColumnRecord> actualRecords =
@@ -478,6 +494,7 @@ public void testOlderThanTimestamp() {
             .execute();
 
     assertThat(result.orphanFileLocations()).as("Should delete only 2 files").hasSize(2);
+    assertThat(result.orphanFilesCount()).as("Should delete only 2 files").isEqualTo(2L);
   }
 
   @TestTemplate
@@ -509,6 +526,7 @@ public void testRemoveUnreachableMetadataVersionFiles() {
     assertThat(result.orphanFileLocations())
         .containsExactly(tableLocation + "metadata/v1.metadata.json");
+    assertThat(result.orphanFilesCount()).as("Should delete 1 file").isEqualTo(1L);
 
     List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
     expectedRecords.addAll(records);
@@ -545,6 +563,7 @@ public void testManyTopLevelPartitions() {
             .execute();
 
     assertThat(result.orphanFileLocations()).as("Should not delete any files").isEmpty();
+    assertThat(result.orphanFilesCount()).as("Should not delete any files").isEqualTo(0L);
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
     assertThat(resultDF.count()).as("Rows count must match").isEqualTo(records.size());
@@ -575,6 +594,7 @@ public void testManyLeafPartitions() {
             .execute();
 
     assertThat(result.orphanFileLocations()).as("Should not delete any files").isEmpty();
+    assertThat(result.orphanFilesCount()).as("Should not delete any files").isEqualTo(0L);
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
     assertThat(resultDF.count()).as("Row count must match").isEqualTo(records.size());
@@ -615,6 +635,7 @@ public void testHiddenPartitionPaths() {
             .execute();
 
     assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
+    assertThat(result.orphanFilesCount()).as("Should delete 2 files").isEqualTo(2L);
   }
 
   @TestTemplate
@@ -655,6 +676,7 @@ public void testHiddenPartitionPathsWithPartitionEvolution() {
             .execute();
 
     assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
+    assertThat(result.orphanFilesCount()).as("Should delete 2 files").isEqualTo(2L);
   }
 
   @TestTemplate
@@ -694,6 +716,7 @@ public void testHiddenPathsStartingWithPartitionNamesAreIgnored() throws IOExcep
             .execute();
 
     assertThat(result.orphanFileLocations()).as("Should delete 0 files").isEmpty();
+    assertThat(result.orphanFilesCount()).as("Should delete 0 files").isEqualTo(0L);
 
     assertThat(fs.exists(pathToFileInHiddenFolder)).isTrue();
   }
@@ -766,6 +789,9 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException {
     assertThat(result.orphanFileLocations())
         .as("Action should find 1 file")
         .isEqualTo(invalidFiles);
+    assertThat(result.orphanFilesCount())
+        .as("Action should find 1 file")
+        .isEqualTo((long) invalidFiles.size());
     assertThat(fs.exists(new Path(invalidFiles.get(0))))
         .as("Invalid file should be present")
         .isTrue();
@@ -803,6 +829,7 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException
             .execute();
 
     assertThat(result.orphanFileLocations()).as("Should delete only 1 file").hasSize(1);
+    assertThat(result.orphanFilesCount()).as("Should delete only 1 file").isEqualTo(1L);
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(table.location());
     List<ThreeColumnRecord> actualRecords =
@@ -838,6 +865,7 @@ public void testHiveCatalogTable() throws IOException {
     assertThat(result.orphanFileLocations())
         .as("trash file should be removed")
         .contains("file:" + location + "/data/trashfile");
+    assertThat(result.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
   }
 
   @TestTemplate
@@ -932,6 +960,9 @@ public void testCompareToFileList() throws IOException {
     assertThat(result1.orphanFileLocations())
         .as("Default olderThan interval should be safe")
         .isEmpty();
+    assertThat(result1.orphanFilesCount())
+        .as("Should not find any orphan file using default olderThan interval")
+        .isEqualTo(0L);
 
     DeleteOrphanFiles.Result result2 =
         actions
@@ -943,6 +974,9 @@ public void testCompareToFileList() throws IOException {
     assertThat(result2.orphanFileLocations())
         .as("Action should find 1 file")
         .isEqualTo(invalidFilePaths);
+    assertThat(result2.orphanFilesCount())
+        .as("Action should find 1 file")
+        .isEqualTo((long) invalidFilePaths.size());
     assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
         .as("Invalid file should be present")
         .isTrue();
@@ -956,6 +990,9 @@ public void testCompareToFileList() throws IOException {
     assertThat(result3.orphanFileLocations())
         .as("Action should delete 1 file")
         .isEqualTo(invalidFilePaths);
+    assertThat(result3.orphanFilesCount())
+        .as("Action should delete 1 file")
+        .isEqualTo((long) invalidFilePaths.size());
     assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
         .as("Invalid file should not be present")
         .isFalse();
@@ -985,6 +1022,7 @@ public void testCompareToFileList() throws IOException {
             .deleteWith(s -> {})
             .execute();
     assertThat(result4.orphanFileLocations()).as("Action should find nothing").isEmpty();
+    assertThat(result4.orphanFilesCount()).as("Action should find nothing").isEqualTo(0L);
   }
 
   protected long waitUntilAfter(long timestampMillis) {
@@ -1064,6 +1102,7 @@ public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
             .execute();
 
     Iterable<String> orphanFileLocations = result.orphanFileLocations();
     assertThat(orphanFileLocations).hasSize(1).containsExactly(statsLocation.toURI().toString());
+    assertThat(result.orphanFilesCount()).as("Should delete 1 file").isEqualTo(1L);
     assertThat(statsLocation).as("stats file should be deleted").doesNotExist();
   }
@@ -1282,6 +1321,9 @@ public void testStreamResultsDeletion() throws IOException {
         .as("Non-streaming dry-run should return all 10 orphan files")
         .hasSize(10)
         .containsExactlyInAnyOrderElementsOf(invalidFiles);
+    assertThat(nonStreamingResult.orphanFilesCount())
+        .as("Non-streaming dry-run should return all 10 orphan files")
+        .isEqualTo((long) invalidFiles.size());
 
     DeleteOrphanFiles.Result streamingResult =
         SparkActions.get()
@@ -1295,6 +1337,9 @@ public void testStreamResultsDeletion() throws IOException {
     assertThat(streamingResult.orphanFileLocations())
         .as("Streaming with sample size 5 should return only 5 orphan files")
         .hasSize(5);
+    assertThat(streamingResult.orphanFilesCount())
+        .as("Streaming deletion should still count all 10 deleted files")
+        .isEqualTo((long) invalidFiles.size());
 
     for (String invalidFile : invalidFiles) {
       assertThat(fs.exists(new Path(invalidFile))).as("Orphan file should be deleted").isFalse();
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
index 5f98287951f1..88ac800b158f 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
@@ -60,6 +60,7 @@ public void testSparkCatalogTable() throws Exception {
     assertThat(results.orphanFileLocations())
         .as("trash file should be removed")
         .contains("file:" + location + trashFile);
+    assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
   }
 
   @TestTemplate
@@ -89,6 +90,7 @@ public void testSparkCatalogNamedHadoopTable() throws Exception {
     assertThat(results.orphanFileLocations())
         .as("trash file should be removed")
         .contains("file:" + location + trashFile);
+    assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
   }
 
   @TestTemplate
@@ -119,6 +121,7 @@ public void testSparkCatalogNamedHiveTable() throws Exception {
     assertThat(StreamSupport.stream(results.orphanFileLocations().spliterator(), false))
         .as("trash file should be removed")
         .anyMatch(file -> file.contains("file:" + location + trashFile));
+    assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
   }
 
   @TestTemplate
@@ -151,6 +154,7 @@ public void testSparkSessionCatalogHadoopTable() throws Exception {
     assertThat(results.orphanFileLocations())
         .as("trash file should be removed")
         .contains("file:" + location + trashFile);
+    assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
  }
 
   @TestTemplate
@@ -183,6 +187,7 @@ public void testSparkSessionCatalogHiveTable() throws Exception {
     assertThat(results.orphanFileLocations())
         .as("trash file should be removed")
         .contains("file:" + location + trashFile);
+    assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
   }
 
   @AfterEach
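
Usage sketch (not part of the patch): a minimal caller-side example of the new accessor, assuming an org.apache.iceberg.Table handle obtained elsewhere; the class and method names below are illustrative, everything else is the API exercised by the tests above.

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.actions.SparkActions;

public class OrphanFileCleanupExample {
  // Removes orphan files older than the given timestamp and reports how many
  // were deleted, reading the counter added by this patch instead of
  // materializing and sizing the Iterable of locations.
  static long deleteOrphans(Table table, long olderThanMillis) {
    DeleteOrphanFiles.Result result =
        SparkActions.get().deleteOrphanFiles(table).olderThan(olderThanMillis).execute();
    return result.orphanFilesCount();
  }
}

The count stays accurate even when streamed results truncate orphanFileLocations() to the configured sample size (see testStreamResultsDeletion: 5 locations returned, count of 10), and the default of 0 on the API interface keeps existing third-party Result implementations source-compatible.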