Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions src/main/java/sirius/kernel/async/ParallelTaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
* Executes tasks in parallel in the current node using virtual threads with a limit on concurrency.
Expand All @@ -34,6 +36,7 @@ public class ParallelTaskExecutor {
private final Semaphore semaphore;
private final AtomicInteger taskCount;
private final CallContext currentContext;
private Supplier<Boolean> isActiveSupplier;

/**
* Creates a new parallel task executor.
Expand All @@ -46,9 +49,24 @@ public ParallelTaskExecutor(int maxConcurrentTasks) {
this.taskQueue = new LinkedBlockingQueue<>();
this.semaphore = new Semaphore(maxConcurrentTasks);
this.taskCount = new AtomicInteger(0);
this.isActiveSupplier = () -> TaskContext.get().isActive();
startProcessing();
}

/**
* Permits to override the active checker which is used to determine when to stop processing tasks.
* <p>
* By default, the executor checks whether the current {@link TaskContext} is still active which is enough
* is most cases, but more complex scenarios might require a custom check.
*
* @param isActiveSupplier the supplier which determines whether the executor is still active
* @return the executor itself for fluent method calls
*/
public ParallelTaskExecutor withIsActiveSupplier(Supplier<Boolean> isActiveSupplier) {
this.isActiveSupplier = isActiveSupplier;
return this;
}

/**
* Submits a task to be executed in parallel.
*
Expand All @@ -68,31 +86,41 @@ public boolean submitTask(Runnable task) {
});
}

/**
* Determines whether the executor is still active.
*
* @return {@code true} if the executor is still active, {@code false} otherwise
*/
public boolean isActive() {
return Boolean.TRUE.equals(isActiveSupplier.get());
}

/**
* Waits for all tasks to complete and shuts down the executor.
*/
public void shutdownWhenDone() {
while (TaskContext.get().isActive()) {
while (isActive()) {
if (taskQueue.isEmpty() && taskCount.get() == 0) {
executor.close();
break;
}
Wait.millis(500);
}
executor.close();
}

private void startProcessing() {
Thread.startVirtualThread(() -> {
while (TaskContext.get().isActive()) {
while (isActive()) {
try {
Runnable task = taskQueue.take();
semaphore.acquire();
executor.submit(task);
} catch (InterruptedException exception) {
} catch (InterruptedException | RejectedExecutionException _) {
Thread.currentThread().interrupt();
break;
}
}
});
taskQueue.clear();
}
}