diff --git a/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java b/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java index df701b95..62ca1737 100644 --- a/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java +++ b/src/main/java/sirius/kernel/async/ParallelTaskExecutor.java @@ -14,8 +14,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; /** * Executes tasks in parallel in the current node using virtual threads with a limit on concurrency. @@ -34,6 +36,7 @@ public class ParallelTaskExecutor { private final Semaphore semaphore; private final AtomicInteger taskCount; private final CallContext currentContext; + private Supplier isActiveSupplier; /** * Creates a new parallel task executor. @@ -46,9 +49,24 @@ public ParallelTaskExecutor(int maxConcurrentTasks) { this.taskQueue = new LinkedBlockingQueue<>(); this.semaphore = new Semaphore(maxConcurrentTasks); this.taskCount = new AtomicInteger(0); + this.isActiveSupplier = () -> TaskContext.get().isActive(); startProcessing(); } + /** + * Permits to override the active checker which is used to determine when to stop processing tasks. + *

+ * By default, the executor checks whether the current {@link TaskContext} is still active which is enough + * is most cases, but more complex scenarios might require a custom check. + * + * @param isActiveSupplier the supplier which determines whether the executor is still active + * @return the executor itself for fluent method calls + */ + public ParallelTaskExecutor withIsActiveSupplier(Supplier isActiveSupplier) { + this.isActiveSupplier = isActiveSupplier; + return this; + } + /** * Submits a task to be executed in parallel. * @@ -68,31 +86,41 @@ public boolean submitTask(Runnable task) { }); } + /** + * Determines whether the executor is still active. + * + * @return {@code true} if the executor is still active, {@code false} otherwise + */ + public boolean isActive() { + return Boolean.TRUE.equals(isActiveSupplier.get()); + } + /** * Waits for all tasks to complete and shuts down the executor. */ public void shutdownWhenDone() { - while (TaskContext.get().isActive()) { + while (isActive()) { if (taskQueue.isEmpty() && taskCount.get() == 0) { - executor.close(); break; } Wait.millis(500); } + executor.close(); } private void startProcessing() { Thread.startVirtualThread(() -> { - while (TaskContext.get().isActive()) { + while (isActive()) { try { Runnable task = taskQueue.take(); semaphore.acquire(); executor.submit(task); - } catch (InterruptedException exception) { + } catch (InterruptedException | RejectedExecutionException _) { Thread.currentThread().interrupt(); break; } } }); + taskQueue.clear(); } }