diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 92a3da0812..363d32a493 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -21,6 +21,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -64,6 +66,9 @@ public class ShuffleFlushManager {
   // appId -> shuffleId -> committed shuffle blockIds
   private Map<String, Map<Integer, Roaring64NavigableMap>> committedBlockIds =
       JavaUtils.newConcurrentMap();
+  // appId -> shuffleIds for which at least one flush event failed to be written
+  // NOTE(review): entries are never removed; drop them when the app is cleaned up.
+  private final Map<String, Set<Integer>> shuffleIdsWithWriteError = JavaUtils.newConcurrentMap();
   private final int retryMax;
 
   private final StorageManager storageManager;
@@ -168,12 +173,23 @@ public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception {
               storageDataReplica,
               user,
               maxConcurrencyPerPartitionToWrite);
-      ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
-      long startTime = System.currentTimeMillis();
-      boolean writeSuccess = storageManager.write(storage, handler, event);
-      if (!writeSuccess) {
-        throw new EventRetryException();
+
+      // Seed with the current time so any elapsed time stays sane if creating the handler throws.
+      long startTime = System.currentTimeMillis();
+      boolean writeSuccess = false;
+      try {
+        ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
+        startTime = System.currentTimeMillis();
+        writeSuccess = storageManager.write(storage, handler, event);
+      } catch (Exception e) {
+        LOG.error("storageManager write error.", e);
+      }
+      if (!writeSuccess) {
+        // Remember the failure per app and shuffle so that getCommittedBlockIds rejects this
+        // shuffle without affecting other apps that happen to use the same shuffle id.
+        recordWriteError(event.getAppId(), event.getShuffleId());
       }
+
       long endTime = System.currentTimeMillis();
 
       // update some metrics for shuffle task
@@ -243,7 +259,18 @@ private void updateCommittedBlockIds(
     }
   }
 
-  public Roaring64NavigableMap getCommittedBlockIds(String appId, Integer shuffleId) {
+  /**
+   * Looks up the committed block ids recorded for the given app and shuffle.
+   *
+   * @throws EventDiscardException if a flush event of this app's shuffle failed to be written
+   */
+  public Roaring64NavigableMap getCommittedBlockIds(String appId, Integer shuffleId)
+      throws EventDiscardException {
+    Set<Integer> failedShuffleIds = shuffleIdsWithWriteError.get(appId);
+    if (failedShuffleIds != null && failedShuffleIds.contains(shuffleId)) {
+      throw new EventDiscardException();
+    }
+
     Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds = committedBlockIds.get(appId);
     if (shuffleIdToBlockIds == null) {
       LOG.warn("Unexpected value when getCommittedBlockIds for appId[" + appId + "]");
@@ -299,4 +326,16 @@ public ShuffleDataDistributionType getDataDistributionType(String appId) {
   public FlushEventHandler getEventHandler() {
     return eventHandler;
   }
+
+  /** Records that a flush event of the given app's shuffle could not be written to storage. */
+  private void recordWriteError(String appId, int shuffleId) {
+    shuffleIdsWithWriteError
+        .computeIfAbsent(appId, key -> ConcurrentHashMap.newKeySet())
+        .add(shuffleId);
+  }
+
+  @VisibleForTesting
+  public void setShuffleIdsWithWriteError(String appId, int shuffleId) {
+    recordWriteError(appId, shuffleId);
+  }
 }
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 75c49cd4dc..80d1571a52 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -36,6 +36,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.uniffle.server.flush.EventDiscardException;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -75,6 +76,7 @@
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -1048,6 +1050,38 @@ public void checkAndClearLeakShuffleDataTest(@TempDir File tempDir) throws Excep
     assertTrue(hiddenFile.exists());
   }
 
+  @Test
+  public void testCommitShuffleFailOnWriteFail() throws Exception {
+    String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+    ShuffleServerConf conf = new ShuffleServerConf(confFile);
+    final String remoteStorage = HDFS_URI + "rss/test";
+    final String appId = "testAppId";
+    final int shuffleId = 1;
+    conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
+    conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+    conf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 3000L);
+    conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+    shuffleServer = new ShuffleServer(conf);
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+    shuffleTaskManager.registerShuffle(
+        appId,
+        shuffleId,
+        Lists.newArrayList(new PartitionRange(1, 1)),
+        new RemoteStorageInfo(remoteStorage),
+        StringUtils.EMPTY);
+
+    // Simulate a failed flush for this shuffle; committing it must then be rejected.
+    ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
+    shuffleFlushManager.setShuffleIdsWithWriteError(appId, shuffleId);
+
+    assertThrows(
+        EventDiscardException.class, () -> shuffleTaskManager.commitShuffle(appId, shuffleId));
+  }
+
   private Set<String> getAppIdsOnDisk(LocalStorageManager localStorageManager) {
     Set<String> appIdsOnDisk = new HashSet<>();