diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java index ceb3fe91ba88..c165666e59d7 100644 --- a/core/src/main/java/org/apache/iceberg/SetStatistics.java +++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java @@ -18,17 +18,31 @@ */ package org.apache.iceberg; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; + import java.util.List; import java.util.Map; import java.util.Optional; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; public class SetStatistics implements UpdateStatistics { + private final TableOperations ops; private final Map> statisticsToSet = Maps.newHashMap(); + private TableMetadata base; public SetStatistics(TableOperations ops) { this.ops = ops; + this.base = ops.current(); } @Override @@ -45,17 +59,28 @@ public UpdateStatistics removeStatistics(long snapshotId) { @Override public List apply() { - return internalApply(ops.current()).statisticsFiles(); + return internalApply().statisticsFiles(); } @Override public void commit() { - TableMetadata base = ops.current(); - TableMetadata newMetadata = internalApply(base); - ops.commit(base, newMetadata); + Tasks.foreach(ops) + .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) + .exponentialBackoff( + base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), + 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run( + item -> { + TableMetadata updated = internalApply(); + ops.commit(base, updated); + }); } - private TableMetadata internalApply(TableMetadata base) { + private TableMetadata internalApply() { + this.base = ops.refresh(); TableMetadata.Builder builder = TableMetadata.buildFrom(base); statisticsToSet.forEach( (snapshotId, statistics) -> { diff --git a/core/src/test/java/org/apache/iceberg/TestSetStatistics.java b/core/src/test/java/org/apache/iceberg/TestSetStatistics.java index e51614f45b2a..aa07c950ac80 100644 --- a/core/src/test/java/org/apache/iceberg/TestSetStatistics.java +++ b/core/src/test/java/org/apache/iceberg/TestSetStatistics.java @@ -19,7 +19,9 @@ package org.apache.iceberg; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.TestTemplate; @@ -106,4 +108,62 @@ public void testRemoveStatistics() { assertThat(version()).isEqualTo(3); assertThat(metadata.statisticsFiles()).isEmpty(); } + + @TestTemplate + public void testSetStatisticsRetryWithConcurrentModification() { + table.newFastAppend().appendFile(FILE_A).commit(); + TableMetadata base = readMetadata(); + long snapshotId = base.currentSnapshot().snapshotId(); + + GenericStatisticsFile statisticsFile = + new GenericStatisticsFile( + snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of()); + + UpdateStatistics updateStats = table.updateStatistics().setStatistics(statisticsFile); + + table.newFastAppend().appendFile(FILE_B).commit(); + + updateStats.commit(); + + assertThat(readMetadata().statisticsFiles()).containsExactly(statisticsFile); + } + + @TestTemplate + public void testSetStatisticsRetryExhaustion() { + table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "2").commit(); + + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = readMetadata().currentSnapshot().snapshotId(); + + GenericStatisticsFile statisticsFile = + new GenericStatisticsFile( + snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of()); + + TestTables.TestTableOperations ops = table.ops(); + int configuredRetries = + Integer.parseInt(table.properties().getOrDefault(TableProperties.COMMIT_NUM_RETRIES, "4")); + ops.failCommits(configuredRetries + 1); + + UpdateStatistics updateStats = table.updateStatistics().setStatistics(statisticsFile); + assertThatThrownBy(updateStats::commit) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Injected failure"); + } + + @TestTemplate + public void testSetStatisticsRetrySuccess() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = readMetadata().currentSnapshot().snapshotId(); + + GenericStatisticsFile statisticsFile = + new GenericStatisticsFile( + snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of()); + + TestTables.TestTableOperations ops = table.ops(); + ops.failCommits(2); + + table.updateStatistics().setStatistics(statisticsFile).commit(); + + assertThat(readMetadata().statisticsFiles()).containsExactly(statisticsFile); + } }