From 9f01b7b1baead8473e6c072c4e9f81293e99d80c Mon Sep 17 00:00:00 2001 From: hboyina Date: Mon, 26 Jan 2026 22:24:59 +0530 Subject: [PATCH 1/3] Core: Fix compute_table_stats failures with concurrent writes --- .../org/apache/iceberg/SetStatistics.java | 47 ++++- .../iceberg/TestSetStatisticsConcurrency.java | 196 ++++++++++++++++++ 2 files changed, 240 insertions(+), 3 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/TestSetStatisticsConcurrency.java diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java index ceb3fe91ba88..dfb941c3bc99 100644 --- a/core/src/main/java/org/apache/iceberg/SetStatistics.java +++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java @@ -18,12 +18,26 @@ */ package org.apache.iceberg; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; + import java.util.List; import java.util.Map; import java.util.Optional; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SetStatistics implements UpdateStatistics { + private static final Logger LOG = LoggerFactory.getLogger(SetStatistics.class); private final TableOperations ops; private final Map> statisticsToSet = Maps.newHashMap(); @@ -50,9 +64,36 @@ public List apply() { @Override public void commit() { - TableMetadata base = ops.current(); - TableMetadata newMetadata = internalApply(base); - ops.commit(base, newMetadata); + // Get current metadata for retry configuration + TableMetadata currentMetadata = ops.current(); + + // Retry loop with exponential backoff + Tasks.foreach(ops) + .retry(currentMetadata.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) + .exponentialBackoff( + currentMetadata.propertyAsInt( + COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + currentMetadata.propertyAsInt( + COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + currentMetadata.propertyAsInt( + COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), + 2.0) + .onlyRetryOn(CommitFailedException.class) + .run( + taskOps -> { + // Refresh metadata on each retry attempt + TableMetadata base = taskOps.refresh(); + TableMetadata newMetadata = internalApply(base); + + // Skip if no changes + if (base == newMetadata) { + LOG.info("No statistics changes to commit, skipping"); + return; + } + + // Attempt commit + taskOps.commit(base, newMetadata); + }); } private TableMetadata internalApply(TableMetadata base) { diff --git a/core/src/test/java/org/apache/iceberg/TestSetStatisticsConcurrency.java b/core/src/test/java/org/apache/iceberg/TestSetStatisticsConcurrency.java new file mode 100644 index 000000000000..382ceec902e2 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestSetStatisticsConcurrency.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Tests for SetStatistics concurrency handling. + * + *

Tests that SetStatistics properly handles concurrent modifications using retry mechanism. + */ +@ExtendWith(ParameterizedTestExtension.class) +public class TestSetStatisticsConcurrency extends TestBase { + + /** + * Test that SetStatistics succeeds when there are concurrent modifications. + * + *

This reproduces the production issue where compute_table_stats fails with + * CommitFailedException when Lambda functions write concurrently. + */ + @TestTemplate + public void testSetStatisticsWithConcurrentModification() { + // Step 1: Create initial snapshot + table.newFastAppend().appendFile(FILE_A).commit(); + assertThat(version()).isEqualTo(1); + + TableMetadata base = readMetadata(); + long snapshotId = base.currentSnapshot().snapshotId(); + + // Step 2: Create statistics file for this snapshot + GenericStatisticsFile statisticsFile = + new GenericStatisticsFile( + snapshotId, + "/some/statistics/file.puffin", + 100, + 42, + ImmutableList.of( + new GenericBlobMetadata( + "ndv", + snapshotId, + base.lastSequenceNumber(), + ImmutableList.of(1), + ImmutableMap.of("ndv", "12345")))); + + // Step 3: Start statistics update + UpdateStatistics updateStats = table.updateStatistics().setStatistics(statisticsFile); + + // Step 4: Simulate concurrent write (this changes table metadata) + // This is what happens in production when Lambda functions write concurrently + table.newFastAppend().appendFile(FILE_B).commit(); + assertThat(version()).isEqualTo(2); // Table version changed + + // Step 5: Commit statistics - should succeed with retry mechanism + updateStats.commit(); + + // Step 6: Verify statistics were committed + TableMetadata metadata = readMetadata(); + assertThat(metadata.statisticsFiles()).containsExactly(statisticsFile); + assertThat(metadata.currentSnapshot().snapshotId()) + .isNotEqualTo(snapshotId); // Current snapshot changed + } + + /** Test that SetStatistics respects retry configuration and fails after retries exhausted. */ + @TestTemplate + public void testSetStatisticsRespectsRetryConfiguration() { + // Step 1: Set low retry count + table + .updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "2") // Only 2 retries + .commit(); + + // Step 2: Create snapshot and statistics + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = readMetadata().currentSnapshot().snapshotId(); + + GenericStatisticsFile statisticsFile = + new GenericStatisticsFile( + snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of()); + + // Step 3: Inject more failures than retry limit + TestTables.TestTableOperations ops = table.ops(); + ops.failCommits(3); // More failures than retries (2) + + // Step 4: Should fail after exhausting configured retries + UpdateStatistics updateStats = table.updateStatistics().setStatistics(statisticsFile); + assertThatThrownBy(updateStats::commit) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Injected failure"); + } + + /** + * Test that SetStatistics succeeds with retry when failures are within retry limit. + * + *

This test explicitly injects failures to verify the retry mechanism works correctly. + */ + @TestTemplate + public void testSetStatisticsSucceedsWithinRetryLimit() { + // Step 1: Create snapshot + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = readMetadata().currentSnapshot().snapshotId(); + + // Step 2: Create statistics + GenericStatisticsFile statisticsFile = + new GenericStatisticsFile( + snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of()); + + // Step 3: Inject failures WITHIN default retry limit + // Default COMMIT_NUM_RETRIES is 4, so inject 2 failures (less than limit) + TestTables.TestTableOperations ops = table.ops(); + ops.failCommits(2); + + // Step 4: Commit should succeed after retries (2 failures, then success) + UpdateStatistics updateStats = table.updateStatistics().setStatistics(statisticsFile); + updateStats.commit(); // Should NOT throw + + // Step 5: Verify statistics committed successfully + TableMetadata metadata = readMetadata(); + assertThat(metadata.statisticsFiles()).containsExactly(statisticsFile); + } + + /** + * Test that reproduces high-concurrency production scenario (AWS Glue + Lambda). + * + *

Verifies that statistics commits succeed even with multiple concurrent writes. + */ + @TestTemplate + public void testProductionScenarioHighConcurrency() { + // Step 1: Table starts with some data + table.newFastAppend().appendFile(FILE_A).commit(); + long statisticsSnapshotId = readMetadata().currentSnapshot().snapshotId(); + + // Step 2: Create statistics file (mimics compute_table_stats output) + GenericStatisticsFile statisticsFile = + new GenericStatisticsFile( + statisticsSnapshotId, + String.format("/metadata/stats/%d-uuid.stats", statisticsSnapshotId), + 30720, + 156, + ImmutableList.of( + new GenericBlobMetadata( + "apache-datasketches-theta-v1", + statisticsSnapshotId, + readMetadata().lastSequenceNumber(), + ImmutableList.of(1, 2, 3), + ImmutableMap.of("sketch-type", "theta")))); + + // Step 3: Prepare statistics update + UpdateStatistics updateStats = table.updateStatistics().setStatistics(statisticsFile); + + // Step 4: Simulate high-frequency Lambda writes + table.newFastAppend().appendFile(FILE_B).commit(); // Snapshot 2 + table.newFastAppend().appendFile(FILE_C).commit(); // Snapshot 3 + table.newFastAppend().appendFile(FILE_D).commit(); // Snapshot 4 + + // Step 5: Commit statistics - should succeed despite concurrent modifications + updateStats.commit(); + + // Step 6: Verify final state + TableMetadata metadata = readMetadata(); + + // Statistics reference exists + assertThat(metadata.statisticsFiles()).containsExactly(statisticsFile); + + // Statistics point to original snapshot + assertThat(statisticsFile.snapshotId()).isEqualTo(statisticsSnapshotId); + + // All concurrent snapshots are preserved + assertThat(metadata.snapshots()).hasSize(4); + + // Current snapshot is the latest (from last write) + assertThat(metadata.currentSnapshot().snapshotId()).isNotEqualTo(statisticsSnapshotId); + } +} From 5a897ddc1f0dcec7ec9cb356696ccc0b5adcb099 Mon Sep 17 00:00:00 2001 From: hboyina Date: Tue, 27 Jan 2026 17:44:45 +0530 Subject: [PATCH 2/3] fixed test failures and moved tests to TestSetStatistics --- .../org/apache/iceberg/SetStatistics.java | 44 ++-- .../org/apache/iceberg/TestSetStatistics.java | 58 ++++++ .../iceberg/TestSetStatisticsConcurrency.java | 196 ------------------ 3 files changed, 72 insertions(+), 226 deletions(-) delete mode 100644 core/src/test/java/org/apache/iceberg/TestSetStatisticsConcurrency.java diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java index dfb941c3bc99..c165666e59d7 100644 --- a/core/src/main/java/org/apache/iceberg/SetStatistics.java +++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java @@ -33,16 +33,16 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.Tasks; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class SetStatistics implements UpdateStatistics { - private static final Logger LOG = LoggerFactory.getLogger(SetStatistics.class); + private final TableOperations ops; private final Map> statisticsToSet = Maps.newHashMap(); + private TableMetadata base; public SetStatistics(TableOperations ops) { this.ops = ops; + this.base = ops.current(); } @Override @@ -59,44 +59,28 @@ public UpdateStatistics removeStatistics(long snapshotId) { @Override public List apply() { - return internalApply(ops.current()).statisticsFiles(); + return internalApply().statisticsFiles(); } @Override public void commit() { - // Get current metadata for retry configuration - TableMetadata currentMetadata = ops.current(); - - // Retry loop with exponential backoff Tasks.foreach(ops) - .retry(currentMetadata.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) + .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) .exponentialBackoff( - currentMetadata.propertyAsInt( - COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - currentMetadata.propertyAsInt( - COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - currentMetadata.propertyAsInt( - COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0) + base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), + 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) .run( - taskOps -> { - // Refresh metadata on each retry attempt - TableMetadata base = taskOps.refresh(); - TableMetadata newMetadata = internalApply(base); - - // Skip if no changes - if (base == newMetadata) { - LOG.info("No statistics changes to commit, skipping"); - return; - } - - // Attempt commit - taskOps.commit(base, newMetadata); + item -> { + TableMetadata updated = internalApply(); + ops.commit(base, updated); }); } - private TableMetadata internalApply(TableMetadata base) { + private TableMetadata internalApply() { + this.base = ops.refresh(); TableMetadata.Builder builder = TableMetadata.buildFrom(base); statisticsToSet.forEach( (snapshotId, statistics) -> { diff --git a/core/src/test/java/org/apache/iceberg/TestSetStatistics.java b/core/src/test/java/org/apache/iceberg/TestSetStatistics.java index e51614f45b2a..25f471af0088 100644 --- a/core/src/test/java/org/apache/iceberg/TestSetStatistics.java +++ b/core/src/test/java/org/apache/iceberg/TestSetStatistics.java @@ -19,7 +19,9 @@ package org.apache.iceberg; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.TestTemplate; @@ -106,4 +108,60 @@ public void testRemoveStatistics() { assertThat(version()).isEqualTo(3); assertThat(metadata.statisticsFiles()).isEmpty(); } + + @TestTemplate + public void testSetStatisticsRetryWithConcurrentModification() { + table.newFastAppend().appendFile(FILE_A).commit(); + TableMetadata base = readMetadata(); + long snapshotId = base.currentSnapshot().snapshotId(); + + GenericStatisticsFile statisticsFile = + new GenericStatisticsFile( + snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of()); + + UpdateStatistics updateStats = table.updateStatistics().setStatistics(statisticsFile); + + table.newFastAppend().appendFile(FILE_B).commit(); + + updateStats.commit(); + + assertThat(readMetadata().statisticsFiles()).containsExactly(statisticsFile); + } + + @TestTemplate + public void testSetStatisticsRetryExhaustion() { + table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "2").commit(); + + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = readMetadata().currentSnapshot().snapshotId(); + + GenericStatisticsFile statisticsFile = + new GenericStatisticsFile( + snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of()); + + TestTables.TestTableOperations ops = table.ops(); + ops.failCommits(3); + + UpdateStatistics updateStats = table.updateStatistics().setStatistics(statisticsFile); + assertThatThrownBy(updateStats::commit) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Injected failure"); + } + + @TestTemplate + public void testSetStatisticsRetrySuccess() { + table.newFastAppend().appendFile(FILE_A).commit(); + long snapshotId = readMetadata().currentSnapshot().snapshotId(); + + GenericStatisticsFile statisticsFile = + new GenericStatisticsFile( + snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of()); + + TestTables.TestTableOperations ops = table.ops(); + ops.failCommits(2); + + table.updateStatistics().setStatistics(statisticsFile).commit(); + + assertThat(readMetadata().statisticsFiles()).containsExactly(statisticsFile); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestSetStatisticsConcurrency.java b/core/src/test/java/org/apache/iceberg/TestSetStatisticsConcurrency.java deleted file mode 100644 index 382ceec902e2..000000000000 --- a/core/src/test/java/org/apache/iceberg/TestSetStatisticsConcurrency.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -import org.apache.iceberg.exceptions.CommitFailedException; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; - -/** - * Tests for SetStatistics concurrency handling. - * - *

Tests that SetStatistics properly handles concurrent modifications using retry mechanism. - */ -@ExtendWith(ParameterizedTestExtension.class) -public class TestSetStatisticsConcurrency extends TestBase { - - /** - * Test that SetStatistics succeeds when there are concurrent modifications. - * - *

This reproduces the production issue where compute_table_stats fails with - * CommitFailedException when Lambda functions write concurrently. - */ - @TestTemplate - public void testSetStatisticsWithConcurrentModification() { - // Step 1: Create initial snapshot - table.newFastAppend().appendFile(FILE_A).commit(); - assertThat(version()).isEqualTo(1); - - TableMetadata base = readMetadata(); - long snapshotId = base.currentSnapshot().snapshotId(); - - // Step 2: Create statistics file for this snapshot - GenericStatisticsFile statisticsFile = - new GenericStatisticsFile( - snapshotId, - "/some/statistics/file.puffin", - 100, - 42, - ImmutableList.of( - new GenericBlobMetadata( - "ndv", - snapshotId, - base.lastSequenceNumber(), - ImmutableList.of(1), - ImmutableMap.of("ndv", "12345")))); - - // Step 3: Start statistics update - UpdateStatistics updateStats = table.updateStatistics().setStatistics(statisticsFile); - - // Step 4: Simulate concurrent write (this changes table metadata) - // This is what happens in production when Lambda functions write concurrently - table.newFastAppend().appendFile(FILE_B).commit(); - assertThat(version()).isEqualTo(2); // Table version changed - - // Step 5: Commit statistics - should succeed with retry mechanism - updateStats.commit(); - - // Step 6: Verify statistics were committed - TableMetadata metadata = readMetadata(); - assertThat(metadata.statisticsFiles()).containsExactly(statisticsFile); - assertThat(metadata.currentSnapshot().snapshotId()) - .isNotEqualTo(snapshotId); // Current snapshot changed - } - - /** Test that SetStatistics respects retry configuration and fails after retries exhausted. */ - @TestTemplate - public void testSetStatisticsRespectsRetryConfiguration() { - // Step 1: Set low retry count - table - .updateProperties() - .set(TableProperties.COMMIT_NUM_RETRIES, "2") // Only 2 retries - .commit(); - - // Step 2: Create snapshot and statistics - table.newFastAppend().appendFile(FILE_A).commit(); - long snapshotId = readMetadata().currentSnapshot().snapshotId(); - - GenericStatisticsFile statisticsFile = - new GenericStatisticsFile( - snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of()); - - // Step 3: Inject more failures than retry limit - TestTables.TestTableOperations ops = table.ops(); - ops.failCommits(3); // More failures than retries (2) - - // Step 4: Should fail after exhausting configured retries - UpdateStatistics updateStats = table.updateStatistics().setStatistics(statisticsFile); - assertThatThrownBy(updateStats::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); - } - - /** - * Test that SetStatistics succeeds with retry when failures are within retry limit. - * - *

This test explicitly injects failures to verify the retry mechanism works correctly. - */ - @TestTemplate - public void testSetStatisticsSucceedsWithinRetryLimit() { - // Step 1: Create snapshot - table.newFastAppend().appendFile(FILE_A).commit(); - long snapshotId = readMetadata().currentSnapshot().snapshotId(); - - // Step 2: Create statistics - GenericStatisticsFile statisticsFile = - new GenericStatisticsFile( - snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of()); - - // Step 3: Inject failures WITHIN default retry limit - // Default COMMIT_NUM_RETRIES is 4, so inject 2 failures (less than limit) - TestTables.TestTableOperations ops = table.ops(); - ops.failCommits(2); - - // Step 4: Commit should succeed after retries (2 failures, then success) - UpdateStatistics updateStats = table.updateStatistics().setStatistics(statisticsFile); - updateStats.commit(); // Should NOT throw - - // Step 5: Verify statistics committed successfully - TableMetadata metadata = readMetadata(); - assertThat(metadata.statisticsFiles()).containsExactly(statisticsFile); - } - - /** - * Test that reproduces high-concurrency production scenario (AWS Glue + Lambda). - * - *

Verifies that statistics commits succeed even with multiple concurrent writes. - */ - @TestTemplate - public void testProductionScenarioHighConcurrency() { - // Step 1: Table starts with some data - table.newFastAppend().appendFile(FILE_A).commit(); - long statisticsSnapshotId = readMetadata().currentSnapshot().snapshotId(); - - // Step 2: Create statistics file (mimics compute_table_stats output) - GenericStatisticsFile statisticsFile = - new GenericStatisticsFile( - statisticsSnapshotId, - String.format("/metadata/stats/%d-uuid.stats", statisticsSnapshotId), - 30720, - 156, - ImmutableList.of( - new GenericBlobMetadata( - "apache-datasketches-theta-v1", - statisticsSnapshotId, - readMetadata().lastSequenceNumber(), - ImmutableList.of(1, 2, 3), - ImmutableMap.of("sketch-type", "theta")))); - - // Step 3: Prepare statistics update - UpdateStatistics updateStats = table.updateStatistics().setStatistics(statisticsFile); - - // Step 4: Simulate high-frequency Lambda writes - table.newFastAppend().appendFile(FILE_B).commit(); // Snapshot 2 - table.newFastAppend().appendFile(FILE_C).commit(); // Snapshot 3 - table.newFastAppend().appendFile(FILE_D).commit(); // Snapshot 4 - - // Step 5: Commit statistics - should succeed despite concurrent modifications - updateStats.commit(); - - // Step 6: Verify final state - TableMetadata metadata = readMetadata(); - - // Statistics reference exists - assertThat(metadata.statisticsFiles()).containsExactly(statisticsFile); - - // Statistics point to original snapshot - assertThat(statisticsFile.snapshotId()).isEqualTo(statisticsSnapshotId); - - // All concurrent snapshots are preserved - assertThat(metadata.snapshots()).hasSize(4); - - // Current snapshot is the latest (from last write) - assertThat(metadata.currentSnapshot().snapshotId()).isNotEqualTo(statisticsSnapshotId); - } -} From 30c5e44b84568f907fcc4f119c808ba185165ad7 Mon Sep 17 00:00:00 2001 From: hboyina Date: Wed, 28 Jan 2026 11:00:52 +0530 Subject: [PATCH 3/3] updated the test variables with configured value --- core/src/test/java/org/apache/iceberg/TestSetStatistics.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/iceberg/TestSetStatistics.java b/core/src/test/java/org/apache/iceberg/TestSetStatistics.java index 25f471af0088..aa07c950ac80 100644 --- a/core/src/test/java/org/apache/iceberg/TestSetStatistics.java +++ b/core/src/test/java/org/apache/iceberg/TestSetStatistics.java @@ -140,7 +140,9 @@ public void testSetStatisticsRetryExhaustion() { snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of()); TestTables.TestTableOperations ops = table.ops(); - ops.failCommits(3); + int configuredRetries = + Integer.parseInt(table.properties().getOrDefault(TableProperties.COMMIT_NUM_RETRIES, "4")); + ops.failCommits(configuredRetries + 1); UpdateStatistics updateStats = table.updateStatistics().setStatistics(statisticsFile); assertThatThrownBy(updateStats::commit)