@@ -142,6 +142,11 @@ default DeleteOrphanFiles equalAuthorities(Map<String, String> newEqualAuthorities
interface Result {
/** Returns locations of orphan files. */
Iterable<String> orphanFileLocations();

/** Returns the total number of orphan files. */
default long orphanFilesCount() {
return 0;
}
}

/**
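As an aside: a minimal, hedged sketch of how a caller could consume the new metric once this lands. The SparkActions entry point, the table handle, and the olderThan threshold mirror the tests below and are assumed context, not part of this diff.

    // Run the action, then read the count directly instead of sizing
    // the orphanFileLocations() iterable (which may be only a sample
    // when results are streamed).
    DeleteOrphanFiles.Result result =
        SparkActions.get()
            .deleteOrphanFiles(table)
            .olderThan(System.currentTimeMillis())
            .execute();
    long deleted = result.orphanFilesCount();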
@@ -29,5 +29,11 @@
interface BaseDeleteOrphanFiles extends DeleteOrphanFiles {

@Value.Immutable
interface Result extends DeleteOrphanFiles.Result {}
interface Result extends DeleteOrphanFiles.Result {
@Override
@Value.Default
default long orphanFilesCount() {
return 0;
}
}
}
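The @Value.Default override matters for compatibility: with Immutables, a builder that never sets the count still builds, and the value falls back to the declared default of 0. A sketch under that assumption, using the same generated builder the Spark action below uses:

    // Older call sites that only set locations keep working; the count
    // falls back to the @Value.Default of 0 rather than failing to build.
    DeleteOrphanFiles.Result legacy =
        ImmutableDeleteOrphanFiles.Result.builder()
            .orphanFileLocations(Lists.newArrayList())
            .build();
    assert legacy.orphanFilesCount() == 0L;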
@@ -300,7 +300,10 @@ private DeleteOrphanFiles.Result deleteFiles(Dataset<String> orphanFileDS) {

LOG.info("Deleted {} orphan files", filesCount);

return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFileList).build();
return ImmutableDeleteOrphanFiles.Result.builder()
.orphanFileLocations(orphanFileList)
.orphanFilesCount(filesCount)
.build();
}

private void collectPathsForOutput(
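Note the asymmetry this hunk introduces: orphanFilesCount is populated from filesCount, the total number of files deleted, while orphanFileLocations may hold only a sample of those paths when results are streamed. The testStreamResultsDeletion changes near the end of the test diff depend on exactly that split: five sampled locations, but a count of ten.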
@@ -184,6 +184,9 @@ public void testDryRun() throws IOException {
assertThat(result1.orphanFileLocations())
.as("Default olderThan interval should be safe")
.isEmpty();
assertThat(result1.orphanFilesCount())
.as("Should not find any orphan file using default olderThan interval")
.isEqualTo(0L);

DeleteOrphanFiles.Result result2 =
actions
@@ -195,6 +198,9 @@ public void testDryRun() throws IOException {
assertThat(result2.orphanFileLocations())
.as("Action should find 1 file")
.isEqualTo(invalidFiles);
assertThat(result2.orphanFilesCount())
.as("Action should find 1 file")
.isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should be present")
.isTrue();
@@ -210,6 +216,9 @@ public void testDryRun() throws IOException {
assertThat(result3.orphanFileLocations())
.as("Streaming dry run should find 1 file")
.isEqualTo(invalidFiles);
assertThat(result3.orphanFilesCount())
.as("Streaming dry run should find 1 file")
.isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should be present after streaming dry run")
.isTrue();
@@ -223,6 +232,9 @@ public void testDryRun() throws IOException {
assertThat(result4.orphanFileLocations())
.as("Action should delete 1 file")
.isEqualTo(invalidFiles);
assertThat(result4.orphanFilesCount())
.as("Action should delete 1 file")
.isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should not be present")
.isFalse();
@@ -286,6 +298,7 @@ public void testAllValidFilesAreKept() throws IOException {
.execute();

assertThat(result.orphanFileLocations()).as("Should delete 4 files").hasSize(4);
assertThat(result.orphanFilesCount()).as("Should delete 4 files").isEqualTo(4L);

Path dataPath = new Path(tableLocation + "/data");
FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
@@ -366,6 +379,7 @@ public void orphanedFileRemovedWithParallelTasks() {
.containsExactlyInAnyOrder(
"remove-orphan-0", "remove-orphan-1", "remove-orphan-2", "remove-orphan-3");
assertThat(deletedFiles).hasSize(4);
assertThat(result.orphanFilesCount()).as("Should delete 4 files").isEqualTo(4L);
}

@TestTemplate
@@ -410,6 +424,7 @@ public void testWapFilesAreKept() {
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();

assertThat(result.orphanFileLocations()).as("Should not delete any files").isEmpty();
assertThat(result.orphanFilesCount()).as("Should not delete any files").isEqualTo(0L);
}

@TestTemplate
@@ -440,6 +455,7 @@ public void testMetadataFolderIsIntact() {
.execute();

assertThat(result.orphanFileLocations()).as("Should delete 1 file").hasSize(1);
assertThat(result.orphanFilesCount()).as("Should delete 1 file").isEqualTo(1L);

Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
List<ThreeColumnRecord> actualRecords =
@@ -478,6 +494,7 @@ public void testOlderThanTimestamp() {
.execute();

assertThat(result.orphanFileLocations()).as("Should delete only 2 files").hasSize(2);
assertThat(result.orphanFilesCount()).as("Should delete only 2 files").isEqualTo(2L);
}

@TestTemplate
@@ -509,6 +526,7 @@ public void testRemoveUnreachableMetadataVersionFiles() {

assertThat(result.orphanFileLocations())
.containsExactly(tableLocation + "metadata/v1.metadata.json");
assertThat(result.orphanFilesCount()).as("Should delete 1 file").isEqualTo(1L);

List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records);
@@ -545,6 +563,7 @@ public void testManyTopLevelPartitions() {
.execute();

assertThat(result.orphanFileLocations()).as("Should not delete any files").isEmpty();
assertThat(result.orphanFilesCount()).as("Should not delete any files").isEqualTo(0L);

Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
assertThat(resultDF.count()).as("Rows count must match").isEqualTo(records.size());
@@ -575,6 +594,7 @@ public void testManyLeafPartitions() {
.execute();

assertThat(result.orphanFileLocations()).as("Should not delete any files").isEmpty();
assertThat(result.orphanFilesCount()).as("Should not delete any files").isEqualTo(0L);

Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
assertThat(resultDF.count()).as("Row count must match").isEqualTo(records.size());
@@ -615,6 +635,7 @@ public void testHiddenPartitionPaths() {
.execute();

assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
assertThat(result.orphanFilesCount()).as("Should delete 2 files").isEqualTo(2L);
}

@TestTemplate
@@ -655,6 +676,7 @@ public void testHiddenPartitionPathsWithPartitionEvolution() {
.execute();

assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
assertThat(result.orphanFilesCount()).as("Should delete 2 files").isEqualTo(2L);
}

@TestTemplate
@@ -694,6 +716,7 @@ public void testHiddenPathsStartingWithPartitionNamesAreIgnored() throws IOException
.execute();

assertThat(result.orphanFileLocations()).as("Should delete 0 files").isEmpty();
assertThat(result.orphanFilesCount()).as("Should delete 0 files").isEqualTo(0L);
assertThat(fs.exists(pathToFileInHiddenFolder)).isTrue();
}

@@ -766,6 +789,9 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException {
assertThat(result.orphanFileLocations())
.as("Action should find 1 file")
.isEqualTo(invalidFiles);
assertThat(result.orphanFilesCount())
.as("Action should find 1 file")
.isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should be present")
.isTrue();
@@ -803,6 +829,7 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException
.execute();

assertThat(result.orphanFileLocations()).as("Should delete only 1 file").hasSize(1);
assertThat(result.orphanFilesCount()).as("Should delete only 1 file").isEqualTo(1L);

Dataset<Row> resultDF = spark.read().format("iceberg").load(table.location());
List<ThreeColumnRecord> actualRecords =
@@ -838,6 +865,7 @@ public void testHiveCatalogTable() throws IOException {
assertThat(result.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + "/data/trashfile");
assertThat(result.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
}

@TestTemplate
@@ -932,6 +960,9 @@ public void testCompareToFileList() throws IOException {
assertThat(result1.orphanFileLocations())
.as("Default olderThan interval should be safe")
.isEmpty();
assertThat(result1.orphanFilesCount())
.as("Should not find any orphan file using default olderThan interval")
.isEqualTo(0L);

DeleteOrphanFiles.Result result2 =
actions
@@ -943,6 +974,9 @@ public void testCompareToFileList() throws IOException {
assertThat(result2.orphanFileLocations())
.as("Action should find 1 file")
.isEqualTo(invalidFilePaths);
assertThat(result2.orphanFilesCount())
.as("Action should find 1 file")
.isEqualTo((long) invalidFilePaths.size());
assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
.as("Invalid file should be present")
.isTrue();
@@ -956,6 +990,9 @@ public void testCompareToFileList() throws IOException {
assertThat(result3.orphanFileLocations())
.as("Action should delete 1 file")
.isEqualTo(invalidFilePaths);
assertThat(result3.orphanFilesCount())
.as("Action should delete 1 file")
.isEqualTo((long) invalidFilePaths.size());
assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
.as("Invalid file should not be present")
.isFalse();
@@ -985,6 +1022,7 @@ public void testCompareToFileList() throws IOException {
.deleteWith(s -> {})
.execute();
assertThat(result4.orphanFileLocations()).as("Action should find nothing").isEmpty();
assertThat(result4.orphanFilesCount()).as("Action should find nothing").isEqualTo(0L);
}

protected long waitUntilAfter(long timestampMillis) {
@@ -1064,6 +1102,7 @@ public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
.execute();
Iterable<String> orphanFileLocations = result.orphanFileLocations();
assertThat(orphanFileLocations).hasSize(1).containsExactly(statsLocation.toURI().toString());
assertThat(result.orphanFilesCount()).as("Should delete 1 file").isEqualTo(1L);
assertThat(statsLocation).as("stats file should be deleted").doesNotExist();
}

@@ -1282,6 +1321,9 @@ public void testStreamResultsDeletion() throws IOException {
.as("Non-streaming dry-run should return all 10 orphan files")
.hasSize(10)
.containsExactlyInAnyOrderElementsOf(invalidFiles);
assertThat(nonStreamingResult.orphanFilesCount())
.as("Non-streaming dry-run should return all 10 orphan files")
.isEqualTo((long) invalidFiles.size());

DeleteOrphanFiles.Result streamingResult =
SparkActions.get()
@@ -1295,6 +1337,9 @@
assertThat(streamingResult.orphanFileLocations())
.as("Streaming with sample size 5 should return only 5 orphan files")
.hasSize(5);
assertThat(streamingResult.orphanFilesCount())
.as("Deleted 10 files")
.isEqualTo((long) invalidFiles.size());

for (String invalidFile : invalidFiles) {
assertThat(fs.exists(new Path(invalidFile))).as("Orphan file should be deleted").isFalse();
@@ -60,6 +60,7 @@ public void testSparkCatalogTable() throws Exception {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
}

@TestTemplate
@@ -89,6 +90,7 @@ public void testSparkCatalogNamedHadoopTable() throws Exception {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
}

@TestTemplate
@@ -119,6 +121,7 @@ public void testSparkCatalogNamedHiveTable() throws Exception {
assertThat(StreamSupport.stream(results.orphanFileLocations().spliterator(), false))
.as("trash file should be removed")
.anyMatch(file -> file.contains("file:" + location + trashFile));
assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
}

@TestTemplate
@@ -151,6 +154,7 @@ public void testSparkSessionCatalogHadoopTable() throws Exception {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
}

@TestTemplate
@@ -183,6 +187,7 @@ public void testSparkSessionCatalogHiveTable() throws Exception {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
}

@AfterEach