diff --git a/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java b/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java index d84e6dc232..9f3c390987 100644 --- a/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java +++ b/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java @@ -54,6 +54,10 @@ public static ThreadFactory getNettyThreadFactory(String threadPoolPrefix) { return new DefaultThreadFactory(threadPoolPrefix, true); } + public static ThreadFactory getExceptionCatchingThreadFactory(String factoryName) { + return new ExceptionCatchingThreadFactory(getThreadFactory(factoryName)); + } + /** * Encapsulation of the ScheduledExecutorService * @@ -62,8 +66,12 @@ public static ThreadFactory getNettyThreadFactory(String threadPoolPrefix) { */ public static ScheduledExecutorService getDaemonSingleThreadScheduledExecutor( String factoryName) { - ScheduledThreadPoolExecutor executor = - new ScheduledThreadPoolExecutor(1, getThreadFactory(factoryName)); + return getDaemonSingleThreadScheduledExecutor(getThreadFactory(factoryName)); + } + + public static ScheduledExecutorService getDaemonSingleThreadScheduledExecutor( + ThreadFactory threadFactory) { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory); executor.setRemoveOnCancelPolicy(true); return executor; } @@ -201,4 +209,19 @@ public static synchronized void printThreadInfo(StringBuilder builder, String ti builder.append(info + "\n"); } } + + private static class ExceptionCatchingThreadFactory implements ThreadFactory { + private final ThreadFactory delegate; + + ExceptionCatchingThreadFactory(ThreadFactory delegate) { + this.delegate = delegate; + } + + public Thread newThread(final Runnable runnable) { + Thread t = delegate.newThread(runnable); + t.setUncaughtExceptionHandler( + (t1, e) -> LOGGER.error("Thread {} threw an Exception.", t1, e)); + return t; + } + } } diff --git a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java index 1b22c64fcc..97ffb3cf41 100644 --- a/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java +++ b/server/src/main/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTask.java @@ -19,7 +19,6 @@ import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -28,6 +27,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.common.util.ThreadUtils; + public class TopNShuffleDataSizeOfAppCalcTask { private static final Logger LOG = LoggerFactory.getLogger(TopNShuffleDataSizeOfAppCalcTask.class); @@ -51,7 +52,9 @@ public TopNShuffleDataSizeOfAppCalcTask(ShuffleTaskManager taskManager, ShuffleS this.gaugeInMemoryDataSize = ShuffleServerMetrics.gaugeInMemoryDataSizeUsage; this.gaugeOnLocalFileDataSize = ShuffleServerMetrics.gaugeOnDiskDataSizeUsage; this.gaugeOnHadoopDataSize = ShuffleServerMetrics.gaugeOnHadoopDataSizeUsage; - this.scheduler = Executors.newScheduledThreadPool(1); + this.scheduler = + ThreadUtils.getDaemonSingleThreadScheduledExecutor( + ThreadUtils.getExceptionCatchingThreadFactory("topN-shuffleDataSize-calc")); } private void calcTopNShuffleDataSize() { @@ -127,7 +130,7 @@ public List> calcTopNOnHadoopDataSizeTaskInfo public void start() { LOG.info("TopNShuffleDataSizeOfAppCalcTask start schedule."); - this.scheduler.scheduleAtFixedRate( + this.scheduler.scheduleWithFixedDelay( this::calcTopNShuffleDataSize, 0, topNShuffleDataTaskRefreshInterval,