From bce79f92040861106969cdba986b54ba9e960f4f Mon Sep 17 00:00:00 2001 From: xianjingfeng Date: Mon, 12 Jan 2026 18:31:03 +0800 Subject: [PATCH 1/2] Wrap calcTopNShuffleDataSize in error handling --- .../server/TopNShuffleDataSizeOfAppCalcTask.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java index 1b22c64fcc..e6c22924dc 100644 --- a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java +++ b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java @@ -54,6 +54,14 @@ public TopNShuffleDataSizeOfAppCalcTask(ShuffleTaskManager taskManager, ShuffleS this.scheduler = Executors.newScheduledThreadPool(1); } + private void calcTopNShuffleDataSizeIgnoreErrors() { + try { + calcTopNShuffleDataSize(); + } catch (Throwable e) { + LOG.warn("Failed to calculate top N shuffle data size", e); + } + } + private void calcTopNShuffleDataSize() { List> topNTaskInfo = calcTopNTotalDataSizeTaskInfo(); gaugeTotalDataSize.clear(); @@ -127,8 +135,8 @@ public List> calcTopNOnHadoopDataSizeTaskInfo public void start() { LOG.info("TopNShuffleDataSizeOfAppCalcTask start schedule."); - this.scheduler.scheduleAtFixedRate( - this::calcTopNShuffleDataSize, + this.scheduler.scheduleWithFixedDelay( + this::calcTopNShuffleDataSizeIgnoreErrors, 0, topNShuffleDataTaskRefreshInterval, TimeUnit.MILLISECONDS); From a1a815e03bf12e52984358ef4b4dc26a6359d822 Mon Sep 17 00:00:00 2001 From: xianjingfeng Date: Tue, 13 Jan 2026 10:32:36 +0800 Subject: [PATCH 2/2] use ExceptionHandler --- .../uniffle/common/util/ThreadUtils.java | 27 +++++++++++++++++-- .../TopNShuffleDataSizeOfAppCalcTask.java | 17 +++++------- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java b/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java index d84e6dc232..9f3c390987 100644 --- a/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java +++ b/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java @@ -54,6 +54,10 @@ public static ThreadFactory getNettyThreadFactory(String threadPoolPrefix) { return new DefaultThreadFactory(threadPoolPrefix, true); } + public static ThreadFactory getExceptionCatchingThreadFactory(String factoryName) { + return new ExceptionCatchingThreadFactory(getThreadFactory(factoryName)); + } + /** * Encapsulation of the ScheduledExecutorService * @@ -62,8 +66,12 @@ public static ThreadFactory getNettyThreadFactory(String threadPoolPrefix) { */ public static ScheduledExecutorService getDaemonSingleThreadScheduledExecutor( String factoryName) { - ScheduledThreadPoolExecutor executor = - new ScheduledThreadPoolExecutor(1, getThreadFactory(factoryName)); + return getDaemonSingleThreadScheduledExecutor(getThreadFactory(factoryName)); + } + + public static ScheduledExecutorService getDaemonSingleThreadScheduledExecutor( + ThreadFactory threadFactory) { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory); executor.setRemoveOnCancelPolicy(true); return executor; } @@ -201,4 +209,19 @@ public static synchronized void printThreadInfo(StringBuilder builder, String ti builder.append(info + "\n"); } } + + private static class ExceptionCatchingThreadFactory implements ThreadFactory { + private final ThreadFactory delegate; + + ExceptionCatchingThreadFactory(ThreadFactory delegate) { + this.delegate = delegate; + } + + public Thread newThread(final Runnable runnable) { + Thread t = delegate.newThread(runnable); + t.setUncaughtExceptionHandler( + (t1, e) -> LOGGER.error("Thread {} threw an Exception.", t1, e)); + return t; + } + } } diff --git a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java index e6c22924dc..97ffb3cf41 100644 --- a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java +++ b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java @@ -19,7 +19,6 @@ import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -28,6 +27,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.common.util.ThreadUtils; + public class TopNShuffleDataSizeOfAppCalcTask { private static final Logger LOG = LoggerFactory.getLogger(TopNShuffleDataSizeOfAppCalcTask.class); @@ -51,15 +52,9 @@ public TopNShuffleDataSizeOfAppCalcTask(ShuffleTaskManager taskManager, ShuffleS this.gaugeInMemoryDataSize = ShuffleServerMetrics.gaugeInMemoryDataSizeUsage; this.gaugeOnLocalFileDataSize = ShuffleServerMetrics.gaugeOnDiskDataSizeUsage; this.gaugeOnHadoopDataSize = ShuffleServerMetrics.gaugeOnHadoopDataSizeUsage; - this.scheduler = Executors.newScheduledThreadPool(1); - } - - private void calcTopNShuffleDataSizeIgnoreErrors() { - try { - calcTopNShuffleDataSize(); - } catch (Throwable e) { - LOG.warn("Failed to calculate top N shuffle data size", e); - } + this.scheduler = + ThreadUtils.getDaemonSingleThreadScheduledExecutor( + ThreadUtils.getExceptionCatchingThreadFactory("topN-shuffleDataSize-calc")); } private void calcTopNShuffleDataSize() { @@ -136,7 +131,7 @@ public List> calcTopNOnHadoopDataSizeTaskInfo public void start() { LOG.info("TopNShuffleDataSizeOfAppCalcTask start schedule."); this.scheduler.scheduleWithFixedDelay( - this::calcTopNShuffleDataSizeIgnoreErrors, + this::calcTopNShuffleDataSize, 0, topNShuffleDataTaskRefreshInterval, TimeUnit.MILLISECONDS);