From 7845d86e4ca2176143b9f3a9890806b755e51dd3 Mon Sep 17 00:00:00 2001 From: Idevaldo De Lira Date: Thu, 4 Dec 2025 15:53:22 +0100 Subject: [PATCH 1/4] Closes the executor in the right place Fixes: OX-12263 --- src/main/java/sirius/kernel/async/ParallelTaskExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java b/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java index df701b95..57f5d047 100644 --- a/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java +++ b/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java @@ -74,11 +74,11 @@ public boolean submitTask(Runnable task) { public void shutdownWhenDone() { while (TaskContext.get().isActive()) { if (taskQueue.isEmpty() && taskCount.get() == 0) { - executor.close(); break; } Wait.millis(500); } + executor.close(); } private void startProcessing() { From a0bb3efd5ac8f28ade6883f94c6308999c9a589c Mon Sep 17 00:00:00 2001 From: Idevaldo De Lira Date: Thu, 4 Dec 2025 15:53:48 +0100 Subject: [PATCH 2/4] Uses unnamed variable where not used Fixes: OX-12263 --- src/main/java/sirius/kernel/async/ParallelTaskExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java b/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java index 57f5d047..a6c3c7f6 100644 --- a/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java +++ b/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java @@ -88,7 +88,7 @@ private void startProcessing() { Runnable task = taskQueue.take(); semaphore.acquire(); executor.submit(task); - } catch (InterruptedException exception) { + } catch (InterruptedException _) { Thread.currentThread().interrupt(); break; } From ce543494ab7f19d5c81bd2f78a134c869008f136 Mon Sep 17 00:00:00 2001 From: Idevaldo De Lira Date: Thu, 4 Dec 2025 17:15:06 +0100 Subject: [PATCH 3/4] Permits to override when the executor must stop By default, we check if the current TaskContext is active, but more complex scenarios might require additional checks. Fixes: OX-12263 --- .../kernel/async/ParallelTaskExecutor.java | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java b/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java index a6c3c7f6..c7aaf231 100644 --- a/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java +++ b/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java @@ -16,6 +16,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; /** * Executes tasks in parallel in the current node using virtual threads with a limit on concurrency. @@ -34,6 +35,7 @@ public class ParallelTaskExecutor { private final Semaphore semaphore; private final AtomicInteger taskCount; private final CallContext currentContext; + private Supplier isActiveSupplier; /** * Creates a new parallel task executor. @@ -46,9 +48,24 @@ public ParallelTaskExecutor(int maxConcurrentTasks) { this.taskQueue = new LinkedBlockingQueue<>(); this.semaphore = new Semaphore(maxConcurrentTasks); this.taskCount = new AtomicInteger(0); + this.isActiveSupplier = () -> TaskContext.get().isActive(); startProcessing(); } + /** + * Permits to override the active checker which is used to determine when to stop processing tasks. + *

+ * By default, the executor checks whether the current {@link TaskContext} is still active which is enough + * is most cases, but more complex scenarios might require a custom check. + * + * @param isActiveSupplier the supplier which determines whether the executor is still active + * @return the executor itself for fluent method calls + */ + public ParallelTaskExecutor withIsActiveSupplier(Supplier isActiveSupplier) { + this.isActiveSupplier = isActiveSupplier; + return this; + } + /** * Submits a task to be executed in parallel. * @@ -68,11 +85,20 @@ public boolean submitTask(Runnable task) { }); } + /** + * Determines whether the executor is still active. + * + * @return {@code true} if the executor is still active, {@code false} otherwise + */ + public boolean isActive() { + return Boolean.TRUE.equals(isActiveSupplier.get()); + } + /** * Waits for all tasks to complete and shuts down the executor. */ public void shutdownWhenDone() { - while (TaskContext.get().isActive()) { + while (isActive()) { if (taskQueue.isEmpty() && taskCount.get() == 0) { break; } @@ -83,7 +109,7 @@ public void shutdownWhenDone() { private void startProcessing() { Thread.startVirtualThread(() -> { - while (TaskContext.get().isActive()) { + while (isActive()) { try { Runnable task = taskQueue.take(); semaphore.acquire(); From d2977288fac28c8249bc680f8bd37600fb45f253 Mon Sep 17 00:00:00 2001 From: Idevaldo De Lira Date: Fri, 5 Dec 2025 08:31:29 +0100 Subject: [PATCH 4/4] Covers for RejectedExecutionException if something is submitted after the executor have been closed. For cleanliness, empty the queue at the end. Fixes: OX-12263 --- src/main/java/sirius/kernel/async/ParallelTaskExecutor.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java b/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java index c7aaf231..62ca1737 100644 --- a/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java +++ b/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java @@ -14,6 +14,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -114,11 +115,12 @@ private void startProcessing() { Runnable task = taskQueue.take(); semaphore.acquire(); executor.submit(task); - } catch (InterruptedException _) { + } catch (InterruptedException | RejectedExecutionException _) { Thread.currentThread().interrupt(); break; } } }); + taskQueue.clear(); } }