messageMetaData) {
+ runningTasks.remove(messageId);
+ }
+
+ @Override
+ public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) {
+ if (isItDead(!runningTasks.isEmpty(), numberOfMessages)) {
+ LOG.info("It looks like some tasks are zombies on {} worker queue, so all tasks currently in state running are released.", workerQueueName);
+ listeners.forEach(QueueWatchDogListener::reset);
+ }
+ }
+
/**
* Check if the condition is met to do a reset. This is if for more than {@link #RESET_TIME_MINUTES} workers are running,
* but no messages were on the queue it's time to free all tasks.
@@ -39,7 +83,7 @@ public class QueueWatchDog {
* @param numberOfMessages number of messages on queue
* @return true if it's time to free all tasks
*/
- public boolean isItDead(final boolean runningWorkers, final int numberOfMessages) {
+ private boolean isItDead(final boolean runningWorkers, final int numberOfMessages) {
boolean doReset = false;
if (runningWorkers && numberOfMessages == 0) {
if (firstProblem == null) {
@@ -59,4 +103,5 @@ public boolean isItDead(final boolean runningWorkers, final int numberOfMessages
protected LocalDateTime now() {
return LocalDateTime.now();
}
+
}
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java
index b954bd7..c92b1c7 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java
@@ -120,13 +120,23 @@ private class TaskScheduleBucket {
public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedException {
this.workerQueueName = queueConfig.queueName();
+ final QueueWatchDog watchDog = new QueueWatchDog(workerQueueName);
taskScheduler = schedulerFactory.createScheduler(queueConfig);
workerProducer = factory.createWorkerProducer(queueConfig);
final WorkerPool workerPool = new WorkerPool(workerQueueName, workerProducer, taskScheduler);
final PerformanceMetricsReporter reporter = new PerformanceMetricsReporter(scheduledExecutorService, queueConfig.queueName(),
OpenTelemetryMetrics.METER, workerPool);
- workerProducer.addWorkerFinishedHandler(reporter);
+
+ watchDog.addQueueWatchDogListener(workerPool);
+ watchDog.addQueueWatchDogListener(taskScheduler);
+ watchDog.addQueueWatchDogListener(reporter);
+
+ workerProducer.addWorkerProducerHandler(reporter);
+ workerProducer.addWorkerProducerHandler(watchDog);
+
workerSizeObserverProxy.addObserver(workerQueueName, workerPool);
+ workerSizeObserverProxy.addObserver(workerQueueName, watchDog);
+
if (taskScheduler instanceof final WorkerSizeObserver wzo) {
workerSizeObserverProxy.addObserver(workerQueueName, wzo);
}
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java
index 739d19d..d489735 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java
@@ -26,9 +26,10 @@
import org.slf4j.LoggerFactory;
import nl.aerius.taskmanager.adaptor.WorkerProducer;
-import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerFinishedHandler;
import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics;
+import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler;
import nl.aerius.taskmanager.adaptor.WorkerSizeObserver;
+import nl.aerius.taskmanager.domain.QueueWatchDogListener;
import nl.aerius.taskmanager.domain.Task;
import nl.aerius.taskmanager.domain.TaskRecord;
import nl.aerius.taskmanager.domain.WorkerUpdateHandler;
@@ -40,13 +41,12 @@
* Reserved workers are workers that are waiting for a task to become available on the queue.
*
Running workers are workers for that are busy running the task and are waiting for the task to finish.
*/
-class WorkerPool implements WorkerSizeObserver, WorkerFinishedHandler, WorkerMetrics {
+class WorkerPool implements WorkerSizeObserver, WorkerProducerHandler, WorkerMetrics, QueueWatchDogListener {
private static final Logger LOG = LoggerFactory.getLogger(WorkerPool.class);
private final Semaphore freeWorkers = new Semaphore(0);
private final Map runningWorkers = new ConcurrentHashMap<>();
- private final QueueWatchDog watchDog = new QueueWatchDog();
private int totalConfiguredWorkers;
private final String workerQueueName;
private final WorkerProducer wp;
@@ -56,7 +56,7 @@ public WorkerPool(final String workerQueueName, final WorkerProducer wp, final W
this.workerQueueName = workerQueueName;
this.wp = wp;
this.workerUpdateHandler = workerUpdateHandler;
- wp.addWorkerFinishedHandler(this);
+ wp.addWorkerProducerHandler(this);
}
/**
@@ -104,22 +104,12 @@ public void onWorkerFinished(final String messageId, final Map m
releaseWorker(messageId);
}
- @Override
- public void reset() {
- synchronized (this) {
- for (final Entry taskEntry : runningWorkers.entrySet()) {
- releaseWorker(taskEntry.getKey(), taskEntry.getValue());
- }
- updateNumberOfWorkers(0);
- }
- }
-
/**
* Adds the worker to the pool of available workers and calls onWorkerReady.
*
* @param taskId Id of the task to release
*/
- public void releaseWorker(final String taskId) {
+ void releaseWorker(final String taskId) {
releaseWorker(taskId, runningWorkers.get(taskId));
}
@@ -129,7 +119,7 @@ public void releaseWorker(final String taskId) {
* @param taskId Id of the task that was reported done and can be released
* @param taskRecord the task is expected to be on.
*/
- public void releaseWorker(final String taskId, final TaskRecord taskRecord) {
+ void releaseWorker(final String taskId, final TaskRecord taskRecord) {
if (taskRecord != null) {
synchronized (this) {
if (runningWorkers.containsKey(taskId)) {
@@ -164,8 +154,6 @@ public void reserveWorker() {
}
}
-
-
/**
* Sets the number of workers which are actually available. This number should
* be determined on the number of workers that are actually in operation.
@@ -182,7 +170,6 @@ public void reserveWorker() {
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) {
synchronized (this) {
updateNumberOfWorkers(numberOfWorkers);
- checkDeadTasks(numberOfMessages);
}
}
@@ -203,10 +190,12 @@ private void updateNumberOfWorkers(final int numberOfWorkers) {
}
}
- private void checkDeadTasks(final int numberOfMessages) {
- if (watchDog.isItDead(!runningWorkers.isEmpty(), numberOfMessages)) {
- LOG.info("It looks like some tasks are zombies on {} worker queue, so all tasks currently in state running are released.", workerQueueName);
- reset();
+ @Override
+ public void reset() {
+ synchronized (this) {
+ for (final Entry taskEntry : runningWorkers.entrySet()) {
+ releaseWorker(taskEntry.getKey(), taskEntry.getValue());
+ }
}
}
}
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java
index 2083eec..34c67ab 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java
@@ -28,9 +28,9 @@ public interface WorkerProducer {
/**
* Sets the handler to call when a task is finished by a worker.
- * @param workerFinishedHandler handler.
+ * @param workerProducerHandler handler.
*/
- void addWorkerFinishedHandler(WorkerFinishedHandler workerFinishedHandler);
+ void addWorkerProducerHandler(WorkerProducerHandler workerProducerHandler);
/**
* Starts the worker producer.
@@ -39,7 +39,7 @@ public interface WorkerProducer {
/**
* Dispatch a message to the worker.
- * @param message message to ispatch
+ * @param message message to dispatch
* @throws IOException connection errors
*/
void dispatchMessage(final Message message) throws IOException;
@@ -50,9 +50,9 @@ public interface WorkerProducer {
void shutdown();
/**
- * Interface for handling finished tasks from the communication layer send by the workers.
+ * Interface called when the tasks is finished by the worker.
*/
- interface WorkerFinishedHandler {
+ interface WorkerProducerHandler {
/**
* Called when work dispatched to the worker.
*
@@ -70,13 +70,6 @@ default void onWorkDispatched(final String messageId, final Map
* @param messageMetaData message meta data
*/
void onWorkerFinished(String messageId, Map messageMetaData);
-
- /**
- * Instruct the handler to reset; that means all tasks that are waiting to be finished will never be marked as finished and therefor should
- * be cleaned up.
- */
- default void reset() {
- }
}
/**
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDogListener.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDogListener.java
new file mode 100644
index 0000000..1514cf0
--- /dev/null
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDogListener.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright the State of the Netherlands
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see http://www.gnu.org/licenses/.
+ */
+package nl.aerius.taskmanager.domain;
+
+/**
+ * Interface for classes that need to be reset when the watch dog is triggered.
+ */
+public interface QueueWatchDogListener {
+ void reset();
+}
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java
index 76c817a..2a2a837 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java
@@ -62,6 +62,14 @@ public synchronized void register(final int deltaActiveWorkers, final int totalW
runningWorkers += deltaActiveWorkers;
}
+ /**
+ * Resets the metric state. Sets running workers to 0, and resets the average load time by calling process.
+ */
+ public synchronized void reset() {
+ runningWorkers = 0;
+ process();
+ }
+
/**
* Calculates average duration over the last time frame since this method was called. Internals are reset in this method to a new measure point.
*
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java
index b644d53..e13338d 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java
@@ -31,9 +31,10 @@
import io.opentelemetry.api.metrics.DoubleGauge;
import io.opentelemetry.api.metrics.Meter;
-import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerFinishedHandler;
import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics;
+import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler;
import nl.aerius.taskmanager.client.TaskMetrics;
+import nl.aerius.taskmanager.domain.QueueWatchDogListener;
import nl.aerius.taskmanager.metrics.DurationMetric.DurationMetricValue;
/**
@@ -51,7 +52,7 @@
*
* - Average load (in percentage) of all workers (of a certain type) together.
*/
-public class PerformanceMetricsReporter implements WorkerFinishedHandler {
+public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueWatchDogListener {
private static final Logger LOG = LoggerFactory.getLogger(PerformanceMetricsReporter.class);
@@ -149,6 +150,15 @@ public synchronized void onWorkerFinished(final String messageId, final Map e.getValue().process());
+ dispatchedWorkerMetrics.process();
+ // work metrics not needed to be reset because they are about work already done.
+ loadMetrics.reset();
+ }
+
private DurationMetric createQueueDurationMetric(final TaskMetrics taskMetrics) {
return new DurationMetric(OpenTelemetryMetrics.queueAttributes(queueGroupName, taskMetrics.queueName()));
}
@@ -172,13 +182,13 @@ private static void metrics(final String prefixText, final Map 0) {
LOG.debug("{} for {}: {} ms/task (#tasks: {})", prefixText, name, metric.avgDuration(), count);
}
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java
index cbba798..e177c41 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java
@@ -127,9 +127,7 @@ private void updateMetrics() {
}
private void publish(final String queueName, final int size, final int utilisation) throws IOException, TimeoutException {
- final Channel channel = factory.getConnection().createChannel();
-
- try {
+ try (final Channel channel = factory.getConnection().createChannel()) {
channel.exchangeDeclare(AERIUS_EVENT_EXCHANGE, EXCHANGE_TYPE);
final Map headers = new HashMap<>();
@@ -139,8 +137,6 @@ private void publish(final String queueName, final int size, final int utilisati
final BasicProperties props = new BasicProperties().builder().headers(headers).build();
channel.basicPublish(AERIUS_EVENT_EXCHANGE, "", props, null);
debugLogState(queueName, size, utilisation);
- } finally {
- channel.close();
}
}
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java
index 33a21ce..8cb1b2a 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java
@@ -57,7 +57,7 @@ class RabbitMQWorkerProducer implements WorkerProducer {
private final String workerQueueName;
private final boolean durable;
private final RabbitMQQueueType queueType;
- private final List workerFinishedHandlers = new ArrayList<>();
+ private final List workerProducerHandlers = new ArrayList<>();
private Channel channel;
private boolean isShutdown;
@@ -72,8 +72,8 @@ public RabbitMQWorkerProducer(final BrokerConnectionFactory factory, final Queue
}
@Override
- public void addWorkerFinishedHandler(final WorkerFinishedHandler workerFinishedHandler) {
- this.workerFinishedHandlers.add(workerFinishedHandler);
+ public void addWorkerProducerHandler(final WorkerProducerHandler workerProducerHandler) {
+ this.workerProducerHandlers.add(workerProducerHandler);
}
@Override
@@ -99,7 +99,15 @@ public void dispatchMessage(final Message message) throws IOException {
forwardBuilder.headers(headers);
final BasicProperties forwardProperties = forwardBuilder.deliveryMode(2).build();
channel.basicPublish("", workerQueueName, forwardProperties, rabbitMQMessage.getBody());
- workerFinishedHandlers.forEach(h -> h.onWorkDispatched(message.getMessageId(), headers));
+ workerProducerHandlers.forEach(h -> handleWorkDispatched(message, headers, h));
+ }
+
+ private static void handleWorkDispatched(final Message message, final Map headers, final WorkerProducerHandler handler) {
+ try {
+ handler.onWorkDispatched(message.getMessageId(), headers);
+ } catch (final RuntimeException e) {
+ LOG.error("Runtime exception during onWorkDispatched of {}", handler.getClass(), e);
+ }
}
private synchronized void ensureChanne() throws IOException {
@@ -182,13 +190,13 @@ private boolean startReplyConsumer(final Connection connection) throws IOExcepti
replyChannel.basicConsume(workerReplyQueue, true, workerReplyQueue, new DefaultConsumer(replyChannel) {
@Override
public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) {
- workerFinishedHandlers.forEach(h -> handleWorkFinished(h, properties));
+ workerProducerHandlers.forEach(h -> handleWorkFinished(h, properties));
}
});
return true;
}
- private static void handleWorkFinished(final WorkerFinishedHandler handler, final BasicProperties properties) {
+ private static void handleWorkFinished(final WorkerProducerHandler handler, final BasicProperties properties) {
try {
handler.onWorkerFinished(properties.getMessageId(), properties.getHeaders());
} catch (final RuntimeException e) {
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java
index 520317a..0aa5a16 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java
@@ -171,15 +171,15 @@ private void updateWorkerQueueState(final String queueName) {
}
private static class WorkerSizeObserverComposite implements WorkerSizeObserver {
- private final List list = new ArrayList<>();
+ private final List observers = new ArrayList<>();
public void add(final WorkerSizeObserver observer) {
- list.add(observer);
+ observers.add(observer);
}
@Override
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) {
- for (final WorkerSizeObserver observer : list) {
+ for (final WorkerSizeObserver observer : observers) {
try {
observer.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages);
} catch (final RuntimeException e) {
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/TaskScheduler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/TaskScheduler.java
index 0fd70b6..73ace23 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/TaskScheduler.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/TaskScheduler.java
@@ -17,6 +17,7 @@
package nl.aerius.taskmanager.scheduler;
import nl.aerius.taskmanager.domain.QueueConfig;
+import nl.aerius.taskmanager.domain.QueueWatchDogListener;
import nl.aerius.taskmanager.domain.Task;
import nl.aerius.taskmanager.domain.TaskQueue;
import nl.aerius.taskmanager.domain.TaskSchedule;
@@ -26,7 +27,7 @@
* Interface for the scheduling algorithm. The implementation should maintain an internal list of all tasks added and return the task to be processed
* in {@link #getNextTask()} based on whatever priority algorithm the scheduler implements.
*/
-public interface TaskScheduler extends WorkerUpdateHandler {
+public interface TaskScheduler extends WorkerUpdateHandler, QueueWatchDogListener {
/**
* Adds a Task to the scheduler to being processed. The scheduler will return this task in {@link #getNextTask()}
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java
index a2ab392..435cb7b 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java
@@ -70,7 +70,11 @@ public void incrementOnWorker(final TaskRecord taskRecord) {
tasksOnWorkersPerQueue.computeIfAbsent(key(taskRecord), k -> new AtomicInteger()).incrementAndGet();
}
- public int onWorkerTotal(final String queueName) {
+ public int onWorkerTotal() {
+ return tasksOnWorkersPerQueue.entrySet().stream().mapToInt(e -> e.getValue().get()).sum();
+ }
+
+ public int onWorkerByQueue(final String queueName) {
return tasksOnWorkersPerQueue.entrySet().stream()
.filter(e -> keyMapper.queueName(e.getKey()).equals(queueName))
.mapToInt(e -> e.getValue().get())
diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java
index 152e8d3..2ba755a 100644
--- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java
@@ -27,8 +27,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import nl.aerius.taskmanager.QueueWatchDog;
-import nl.aerius.taskmanager.adaptor.WorkerSizeObserver;
import nl.aerius.taskmanager.domain.PriorityTaskQueue;
import nl.aerius.taskmanager.domain.Task;
import nl.aerius.taskmanager.domain.TaskRecord;
@@ -41,12 +39,11 @@
* like with other priorities.
*
*/
-class PriorityTaskScheduler implements TaskScheduler, Comparator, WorkerSizeObserver {
+class PriorityTaskScheduler implements TaskScheduler, Comparator {
private static final Logger LOG = LoggerFactory.getLogger(PriorityTaskScheduler.class);
private static final int NEXT_TASK_MAX_WAIT_TIME_SECONDS = 120;
- private final QueueWatchDog watchDog = new QueueWatchDog();
private final PriorityTaskSchedulerMetrics metrics = new PriorityTaskSchedulerMetrics();
private final Queue queue;
private final PriorityQueueMap> priorityQueueMap;
@@ -54,7 +51,6 @@ class PriorityTaskScheduler implements TaskScheduler, Compara
private final Condition nextTaskCondition = lock.newCondition();
private final String workerQueueName;
private int numberOfWorkers;
- private int tasksOnWorkers;
/**
* Constructs scheduler for given configuration.
@@ -119,7 +115,6 @@ public Task getNextTask() throws InterruptedException {
}
private void obtainTask() {
- tasksOnWorkers++;
final Task task = queue.poll();
priorityQueueMap.incrementOnWorker(task.getTaskRecord());
@@ -146,14 +141,14 @@ private boolean isTaskNext(final TaskRecord taskRecord) {
if (!taskNext) {
LOG.trace("Task for queue '{}.{}' not scheduled: queueConfiguration:{}, numberOfWorkers:{}, tasksOnWorkers:{}, tasksForQueue:{}",
- workerQueueName, taskRecord, priorityQueueMap.get(taskRecord), numberOfWorkers, tasksOnWorkers,
+ workerQueueName, taskRecord, priorityQueueMap.get(taskRecord), numberOfWorkers, priorityQueueMap.onWorkerTotal(),
priorityQueueMap.onWorker(taskRecord));
}
return taskNext;
}
private int getFreeWorkers() {
- return numberOfWorkers - tasksOnWorkers;
+ return numberOfWorkers - priorityQueueMap.onWorkerTotal();
}
private boolean hasCapacityRemaining(final TaskRecord taskRecord) {
@@ -166,7 +161,6 @@ public void onTaskFinished(final TaskRecord taskRecord) {
lock.lock();
try {
priorityQueueMap.decrementOnWorker(taskRecord);
- tasksOnWorkers--;
signalNextTask();
// clean up queue if it has been removed.
if (!priorityQueueMap.containsKey(taskRecord.queueName())) {
@@ -198,7 +192,7 @@ public void updateQueue(final PriorityTaskQueue queue) {
try {
final String queueName = queue.getQueueName();
if (!priorityQueueMap.containsKey(queueName)) {
- metrics.addMetric(() -> priorityQueueMap.onWorkerTotal(queueName), workerQueueName, queueName);
+ metrics.addMetric(() -> priorityQueueMap.onWorkerByQueue(queueName), workerQueueName, queueName);
}
final PriorityTaskQueue old = priorityQueueMap.put(queueName, queue);
@@ -293,22 +287,14 @@ private void signalNextTask() {
}
}
- @Override
- public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) {
- if (watchDog.isItDead(tasksOnWorkers > 0, numberOfMessages)) {
- LOG.info("It looks like some tasks are zombies on {} worker queue in priority scheduler, so all tasks currently in state running are released.", workerQueueName);
- reset();
- }
- }
-
/**
* Resets the internal state. Called in case a difference was detected that internally it still has messages as being on the queue,
* while the queue is empty.
*/
- void reset() {
+ @Override
+ public void reset() {
lock.lock();
try {
- tasksOnWorkers = 0;
priorityQueueMap.reset();
signalNextTask();
} finally {
diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTaskScheduler.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTaskScheduler.java
index da85353..4ffff18 100644
--- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTaskScheduler.java
+++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTaskScheduler.java
@@ -83,4 +83,9 @@ public PriorityTaskSchedulerFileHandler getHandler() {
}
}
+ @Override
+ public void reset() {
+ // No-op
+ }
+
}
diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockWorkerProducer.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockWorkerProducer.java
index e90ef76..43b6846 100644
--- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockWorkerProducer.java
+++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockWorkerProducer.java
@@ -31,7 +31,7 @@ public class MockWorkerProducer implements WorkerProducer {
private boolean shutdownExceptionOnForward = false;
@Override
- public void addWorkerFinishedHandler(final WorkerFinishedHandler workerFinishedHandler) {
+ public void addWorkerProducerHandler(final WorkerProducerHandler workerFinishedHandler) {
// no-op
}
diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java
index e57682e..1974a67 100644
--- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java
+++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java
@@ -16,33 +16,60 @@
*/
package nl.aerius.taskmanager;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.time.LocalDateTime;
+import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import nl.aerius.taskmanager.domain.QueueWatchDogListener;
/**
* Test class for {@link QueueWatchDog}.
*/
class QueueWatchDogTest {
- @Test
- void testIsItDead() {
+ private static final String QUEUE_NAME = "queue1";
+
+ @ParameterizedTest
+ @MethodSource("isDeadTests")
+ void testIsItDead(final int runningWorkers, final int finishedWorkers, final int numberOfMessages, final int expected, final String message) {
final AtomicReference now = new AtomicReference<>(LocalDateTime.now());
- final QueueWatchDog qwd = new QueueWatchDog() {
+ final QueueWatchDog qwd = new QueueWatchDog(QUEUE_NAME) {
@Override
protected LocalDateTime now() {
return now.get();
}
};
- assertFalse(qwd.isItDead(false, 0), "No running workers, with no messages, no problem");
- assertFalse(qwd.isItDead(false, 10), "No running workers, no problem");
- assertFalse(qwd.isItDead(true, 10), "Running workers, with messages, no problem");
- assertFalse(qwd.isItDead(true, 0), "Running workers, with no messages, possible problem, we just wait");
- now.set(now.get().plusMinutes(20)); //fast forward 20 minutes.
- assertTrue(qwd.isItDead(true, 0), "Running workers, with no messages, after specified time; yes reset");
+ final QueueWatchDogListener listener = mock(QueueWatchDogListener.class);
+
+ qwd.addQueueWatchDogListener(listener);
+ IntStream.range(0, runningWorkers).forEach(i -> qwd.onWorkDispatched(String.valueOf(i), null));
+ IntStream.range(0, finishedWorkers).forEach(i -> qwd.onWorkerFinished(String.valueOf(i), null));
+
+ qwd.onNumberOfWorkersUpdate(0, numberOfMessages);
+ // reset should never trigger the first time the problem was reported.
+ verify(listener, never()).reset();
+
+ // Fast forward 20 minutes to trigger reset if there is a problem.
+ now.set(now.get().plusMinutes(20));
+ qwd.onNumberOfWorkersUpdate(0, numberOfMessages);
+ verify(listener, times(expected)).reset();
+ }
+
+ private static List isDeadTests() {
+ return List.of(
+ Arguments.of(5, 5, 0, 0, "No running workers, with no messages, no problem"),
+ Arguments.of(5, 5, 10, 0, "No running workers, 10 messages, no problem"),
+ Arguments.of(5, 1, 10, 0, "Running workers, with messages, no problem"),
+ Arguments.of(5, 1, 0, 1, "Running workers, with no messages, after specified time; yes reset"));
}
}
diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java
index 3cdf948..562cdbf 100644
--- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java
+++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java
@@ -28,7 +28,6 @@
import java.util.concurrent.ExecutorService;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -41,7 +40,6 @@
import nl.aerius.taskmanager.domain.TaskConsumer;
import nl.aerius.taskmanager.domain.WorkerUpdateHandler;
import nl.aerius.taskmanager.exception.NoFreeWorkersException;
-import nl.aerius.taskmanager.exception.TaskAlreadySentException;
import nl.aerius.taskmanager.mq.RabbitMQMessage;
/**
@@ -83,8 +81,7 @@ void testWorkerPoolSizing() throws IOException {
assertEquals(10, numberOfWorkers, "Check if workerPool change handler called.");
workerPool.reserveWorker();
assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker");
- final Task task = createTask();
- workerPool.sendTaskToWorker(task);
+ final Task task = createAndSendTaskToWorker();
assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker");
workerPool.releaseWorker(task.getId());
assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after releasing 1 worker");
@@ -92,19 +89,16 @@ void testWorkerPoolSizing() throws IOException {
@Test
void testNoFreeWorkers() {
- assertThrows(NoFreeWorkersException.class, () -> workerPool.sendTaskToWorker(createTask()),
+ assertThrows(NoFreeWorkersException.class, () -> createAndSendTaskToWorker(),
"Expected NoFreeWorkersException when trying to send a task while there are no free workers.");
}
@Test
void testWorkerPoolScaleDown() throws IOException {
workerPool.onNumberOfWorkersUpdate(5, 0);
- final Task task1 = createTask();
- workerPool.sendTaskToWorker(task1);
- final Task task2 = createTask();
- workerPool.sendTaskToWorker(task2);
- final Task task3 = createTask();
- workerPool.sendTaskToWorker(task3);
+ final Task task1 = createAndSendTaskToWorker();
+ final Task task2 = createAndSendTaskToWorker();
+ final Task task3 = createAndSendTaskToWorker();
assertEquals(5, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after 2 workers running");
workerPool.onNumberOfWorkersUpdate(1, 0);
assertEquals(3, workerPool.getWorkerSize(),
@@ -118,11 +112,11 @@ void testWorkerPoolScaleDown() throws IOException {
assertEquals(1, workerPool.getWorkerSize(), "Check if workerPool size should remain the same");
}
+
@Test
void testReleaseTaskTwice() throws IOException {
workerPool.onNumberOfWorkersUpdate(2, 0);
- final Task task1 = createTask();
- workerPool.sendTaskToWorker(task1);
+ final Task task1 = createAndSendTaskToWorker();
final String id = task1.getId();
workerPool.releaseWorker(id);
final int currentWorkerSize1 = workerPool.getReportedWorkerSize();
@@ -132,26 +126,27 @@ void testReleaseTaskTwice() throws IOException {
assertEquals(2, workerPool.getReportedWorkerSize(), "Check if task worker size not decreased to much");
}
- @Disabled("Exception is not thrown anymore, so test ignored for now")
- @Test
- void testSendSameTaskTwice() throws IOException {
- workerPool.onNumberOfWorkersUpdate(3, 0);
- final Task task1 = createTask();
- workerPool.sendTaskToWorker(task1);
-
- assertThrows(TaskAlreadySentException.class, () -> workerPool.sendTaskToWorker(task1),
- "Expected TaskAlreadySentException when a message is send a second time.");
- }
-
@Test
void testMessageDeliverd() throws IOException {
workerPool.onNumberOfWorkersUpdate(1, 0);
- final Task task1 = createTask();
- workerPool.sendTaskToWorker(task1);
+ createAndSendTaskToWorker();
assertNotSame(0, message.getDeliveryTag(), "Check if message is delivered");
}
- private Task createTask() {
- return new MockTask(taskConsumer);
+ @Test
+ void testReset() throws IOException {
+ workerPool.onNumberOfWorkersUpdate(5, 0);
+ createAndSendTaskToWorker();
+ createAndSendTaskToWorker();
+ assertEquals(2, workerPool.getRunningWorkerSize(), "Should report 2 workers running.");
+ workerPool.reset();
+ assertEquals(0, workerPool.getRunningWorkerSize(), "Should report no workers running after internal state reset.");
+ }
+
+ private Task createAndSendTaskToWorker() throws IOException {
+ final Task task = new MockTask(taskConsumer);
+
+ workerPool.sendTaskToWorker(task);
+ return task;
}
}
diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java
index d1a0bdf..16df98e 100644
--- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java
+++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java
@@ -25,7 +25,6 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -33,6 +32,7 @@
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -87,35 +87,34 @@ void beforeEach() {
@Test
void testOnWorkDispatched() {
- doReturn(10).when(workMetrics).getReportedWorkerSize();
reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L));
reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L));
+ lenient().doReturn(10).when(workMetrics).getReportedWorkerSize();
methodCaptor.getValue().run();
- verify(mockedGauges.get("aer.taskmanager.dispatched")).set(eq(2.0), any());
- verify(mockedGauges.get("aer.taskmanager.dispatched.wait")).set(durationCaptor.capture(), any());
- verify(mockedGauges.get("aer.taskmanager.dispatched.queue")).set(eq(2.0), any());
- verify(mockedGauges.get("aer.taskmanager.dispatched.queue.wait")).set(durationCaptor.capture(), any());
- durationCaptor.getAllValues()
- .forEach(v -> assertTrue(v > 99.0, "Duration should report at least 100.0 as it is the offset of the start time, but was " + v));
+ assertGaugeCalls("dispatched", "wait", 2.0, v -> v > 99.0);
}
@Test
void testOnWorkerFinished() {
- lenient().doReturn(10).when(workMetrics).getReportedWorkerSize();
reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L));
reporter.onWorkerFinished("1", createMap(QUEUE_1, 100L));
reporter.onWorkerFinished("2", createMap(QUEUE_2, 200L));
+ lenient().doReturn(10).when(workMetrics).getReportedWorkerSize();
methodCaptor.getValue().run();
- verify(mockedGauges.get("aer.taskmanager.work")).set(eq(2.0), any());
- verify(mockedGauges.get("aer.taskmanager.work.duration")).set(durationCaptor.capture(), any());
- verify(mockedGauges.get("aer.taskmanager.work.queue")).set(eq(2.0), any());
- verify(mockedGauges.get("aer.taskmanager.work.queue.duration")).set(durationCaptor.capture(), any());
- durationCaptor.getAllValues()
- .forEach(v -> assertTrue(v > 99.0, "Duration should report at least 100.0 as it is the offset of the start time, but was " + v));
+ assertGaugeCalls("work", "duration", 2.0, v -> v > 99.0);
// getReportedWorkerSize should only be called on tasks also dispatched, so only for task "1"
verify(workMetrics, times(2)).getReportedWorkerSize();
}
+ private void assertGaugeCalls(final String label, final String type, final double expected, final Predicate duration) {
+ verify(mockedGauges.get("aer.taskmanager." + label)).set(eq(expected), any());
+ verify(mockedGauges.get("aer.taskmanager.%s.%s".formatted(label, type))).set(durationCaptor.capture(), any());
+ verify(mockedGauges.get("aer.taskmanager.%s.queue".formatted(label))).set(eq(expected), any());
+ verify(mockedGauges.get("aer.taskmanager.%s.queue.%s".formatted(label, type))).set(durationCaptor.capture(), any());
+ durationCaptor.getAllValues()
+ .forEach(v -> assertTrue(duration.test(v), "Duration should report at least 100.0 as it is the offset of the start time, but was " + v));
+ }
+
@Test
void testWorkLoad() throws InterruptedException {
doReturn(4).when(workMetrics).getReportedWorkerSize();
@@ -128,6 +127,22 @@ void testWorkLoad() throws InterruptedException {
assertEquals(50.0, durationCaptor.getAllValues().get(1), "Expected workload of 50%");
}
+ @Test
+ void testReset() throws InterruptedException {
+ doReturn(4).when(workMetrics).getReportedWorkerSize();
+ reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L));
+ reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L));
+ Thread.sleep(2); // Add a bit of delay to get some time frame between updates..
+ reporter.reset();
+ methodCaptor.getValue().run();
+ // Verify dispatched metrics have been reset.
+ assertGaugeCalls("dispatched", "wait", 0.0, v -> v == 0.0);
+
+ // Verify load metric have been reset.
+ verify(mockedGauges.get("aer.taskmanager.work.load"), times(1)).set(durationCaptor.capture(), any());
+ assertEquals(0.0, durationCaptor.getAllValues().get(0), 1E-5, "Expected to have no workload anymore");
+ }
+
private Map createMap(final String queueName, final long duration) {
return new TaskMetrics().duration(duration).queueName(queueName).start(System.currentTimeMillis() - 100).build();
}
diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java
index a89ffea..227f6d0 100644
--- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java
+++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java
@@ -63,7 +63,7 @@ class PriorityTaskSchedulerTest {
private static final String QUEUE2 = "queue2";
private static final String QUEUE3 = "queue3";
private static final double TEST_CAPACITY = 0.7;
- private static final PriorityTaskSchedulerFactory factory = new PriorityTaskSchedulerFactory();
+ private static final PriorityTaskSchedulerFactory FACTORY = new PriorityTaskSchedulerFactory();
private TaskConsumer taskConsumer1;
private TaskConsumer taskConsumer2;
@@ -86,7 +86,7 @@ void setUp() throws IOException {
configuration.getQueues().add(tc1);
configuration.getQueues().add(tc2);
configuration.getQueues().add(tc3);
- scheduler = (PriorityTaskScheduler) factory.createScheduler(new QueueConfig(QUEUE1, false, true, null));
+ scheduler = (PriorityTaskScheduler) FACTORY.createScheduler(new QueueConfig(QUEUE1, false, true, null));
configuration.getQueues().forEach(scheduler::updateQueue);
task1 = createTask(taskConsumer1, "1");
task2a = createTask(taskConsumer2, "2a");