2 changes: 1 addition & 1 deletion source/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>nl.aerius</groupId>
<artifactId>aerius-root-pom</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>

@@ -17,29 +17,73 @@
package nl.aerius.taskmanager;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler;
import nl.aerius.taskmanager.adaptor.WorkerSizeObserver;
import nl.aerius.taskmanager.domain.QueueWatchDogListener;

/**
* WatchDog to detect dead messages. Dead messages are messages that were once put on the queue but have since disappeared, for example
* because the queue was purged after a restart. In such a case the scheduler keeps the tasks locked, and since a message for the task will
* never arrive, it stays locked indefinitely. This watch dog tries to detect such tasks and release them at some point.
*/
public class QueueWatchDog {
class QueueWatchDog implements WorkerSizeObserver, WorkerProducerHandler {

private static final Logger LOG = LoggerFactory.getLogger(QueueWatchDog.class);

/**
* If the problem persists for more than 10 minutes, the signal to reset is given.
*/
private static final long RESET_TIME_MINUTES = 10;

private final String workerQueueName;
private final List<QueueWatchDogListener> listeners = new ArrayList<>();
private final Set<String> runningTasks = new HashSet<>();

private LocalDateTime firstProblem;

public QueueWatchDog(final String workerQueueName) {
this.workerQueueName = workerQueueName;
}

public void addQueueWatchDogListener(final QueueWatchDogListener listener) {
listeners.add(listener);
}

@Override
public void onWorkDispatched(final String messageId, final Map<String, Object> messageMetaData) {
runningTasks.add(messageId);
}

@Override
public void onWorkerFinished(final String messageId, final Map<String, Object> messageMetaData) {
runningTasks.remove(messageId);
}

@Override
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) {
if (isItDead(!runningTasks.isEmpty(), numberOfMessages)) {
LOG.info("It looks like some tasks are zombies on {} worker queue, so all tasks currently in state running are released.", workerQueueName);
listeners.forEach(QueueWatchDogListener::reset);
}
}

/**
* Checks if the condition is met to do a reset: if workers have been running for more than {@link #RESET_TIME_MINUTES} minutes
* while no messages were on the queue, it's time to free all tasks.
* @param runningWorkers true if any workers are currently running tasks
* @param numberOfMessages number of messages on queue
* @return true if it's time to free all tasks
*/
public boolean isItDead(final boolean runningWorkers, final int numberOfMessages) {
private boolean isItDead(final boolean runningWorkers, final int numberOfMessages) {
boolean doReset = false;
if (runningWorkers && numberOfMessages == 0) {
if (firstProblem == null) {
@@ -59,4 +103,5 @@ public boolean isItDead(final boolean runningWorkers, final int numberOfMessages
protected LocalDateTime now() {
return LocalDateTime.now();
}

}
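A rough sketch, not part of this change set, of how the reworked watch dog is expected to behave, assuming the queue size observer drives onNumberOfWorkersUpdate periodically and the ten-minute window described above; the queue name, task id and listener below are illustrative only:

  // Hypothetical wiring, for illustration.
  final QueueWatchDog watchDog = new QueueWatchDog("calculator");
  watchDog.addQueueWatchDogListener(() -> System.out.println("releasing zombie tasks"));

  // A task is dispatched, but its finished callback never arrives,
  // e.g. because the worker queue was purged after a restart.
  watchDog.onWorkDispatched("task-1", Map.of());

  // The observer keeps reporting an empty queue while a task is still marked as running;
  // once that situation has lasted longer than RESET_TIME_MINUTES, every registered
  // listener gets reset().
  watchDog.onNumberOfWorkersUpdate(4, 0); // first problem detected, timestamp recorded
  // ...more than ten minutes later...
  watchDog.onNumberOfWorkersUpdate(4, 0); // listeners are reset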
@@ -120,13 +120,23 @@ private class TaskScheduleBucket

public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedException {
this.workerQueueName = queueConfig.queueName();
final QueueWatchDog watchDog = new QueueWatchDog(workerQueueName);
taskScheduler = schedulerFactory.createScheduler(queueConfig);
workerProducer = factory.createWorkerProducer(queueConfig);
final WorkerPool workerPool = new WorkerPool(workerQueueName, workerProducer, taskScheduler);
final PerformanceMetricsReporter reporter = new PerformanceMetricsReporter(scheduledExecutorService, queueConfig.queueName(),
OpenTelemetryMetrics.METER, workerPool);
workerProducer.addWorkerFinishedHandler(reporter);

watchDog.addQueueWatchDogListener(workerPool);
watchDog.addQueueWatchDogListener(taskScheduler);
watchDog.addQueueWatchDogListener(reporter);

workerProducer.addWorkerProducerHandler(reporter);
workerProducer.addWorkerProducerHandler(watchDog);

workerSizeObserverProxy.addObserver(workerQueueName, workerPool);
workerSizeObserverProxy.addObserver(workerQueueName, watchDog);

if (taskScheduler instanceof final WorkerSizeObserver wzo) {
workerSizeObserverProxy.addObserver(workerQueueName, wzo);
}
@@ -26,9 +26,10 @@
import org.slf4j.LoggerFactory;

import nl.aerius.taskmanager.adaptor.WorkerProducer;
import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerFinishedHandler;
import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics;
import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler;
import nl.aerius.taskmanager.adaptor.WorkerSizeObserver;
import nl.aerius.taskmanager.domain.QueueWatchDogListener;
import nl.aerius.taskmanager.domain.Task;
import nl.aerius.taskmanager.domain.TaskRecord;
import nl.aerius.taskmanager.domain.WorkerUpdateHandler;
@@ -40,13 +41,12 @@
* <p>Reserved workers are workers that are waiting for a task to become available on the queue.
* <p>Running workers are workers that are busy running a task and are waiting for the task to finish.
*/
class WorkerPool implements WorkerSizeObserver, WorkerFinishedHandler, WorkerMetrics {
class WorkerPool implements WorkerSizeObserver, WorkerProducerHandler, WorkerMetrics, QueueWatchDogListener {

private static final Logger LOG = LoggerFactory.getLogger(WorkerPool.class);

private final Semaphore freeWorkers = new Semaphore(0);
private final Map<String, TaskRecord> runningWorkers = new ConcurrentHashMap<>();
private final QueueWatchDog watchDog = new QueueWatchDog();
private int totalConfiguredWorkers;
private final String workerQueueName;
private final WorkerProducer wp;
@@ -56,7 +56,7 @@ public WorkerPool(final String workerQueueName, final WorkerProducer wp, final W
this.workerQueueName = workerQueueName;
this.wp = wp;
this.workerUpdateHandler = workerUpdateHandler;
wp.addWorkerFinishedHandler(this);
wp.addWorkerProducerHandler(this);
}

/**
@@ -104,22 +104,12 @@ public void onWorkerFinished(final String messageId, final Map<String, Object> m
releaseWorker(messageId);
}

@Override
public void reset() {
synchronized (this) {
for (final Entry<String, TaskRecord> taskEntry : runningWorkers.entrySet()) {
releaseWorker(taskEntry.getKey(), taskEntry.getValue());
}
updateNumberOfWorkers(0);
}
}

/**
* Adds the worker to the pool of available workers and calls onWorkerReady.
*
* @param taskId Id of the task to release
*/
public void releaseWorker(final String taskId) {
void releaseWorker(final String taskId) {
releaseWorker(taskId, runningWorkers.get(taskId));
}

@@ -129,7 +119,7 @@ public void releaseWorker(final String taskId) {
* @param taskId Id of the task that was reported done and can be released
* @param taskRecord the task is expected to be on.
*/
public void releaseWorker(final String taskId, final TaskRecord taskRecord) {
void releaseWorker(final String taskId, final TaskRecord taskRecord) {
if (taskRecord != null) {
synchronized (this) {
if (runningWorkers.containsKey(taskId)) {
@@ -164,8 +154,6 @@ public void reserveWorker() {
}
}



/**
* Sets the number of workers which are actually available. This number should
* be determined on the number of workers that are actually in operation.
@@ -182,7 +170,6 @@ public void reserveWorker() {
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) {
synchronized (this) {
updateNumberOfWorkers(numberOfWorkers);
checkDeadTasks(numberOfMessages);
}
}

Expand All @@ -203,10 +190,12 @@ private void updateNumberOfWorkers(final int numberOfWorkers) {
}
}

private void checkDeadTasks(final int numberOfMessages) {
if (watchDog.isItDead(!runningWorkers.isEmpty(), numberOfMessages)) {
LOG.info("It looks like some tasks are zombies on {} worker queue, so all tasks currently in state running are released.", workerQueueName);
reset();
@Override
public void reset() {
synchronized (this) {
for (final Entry<String, TaskRecord> taskEntry : runningWorkers.entrySet()) {
releaseWorker(taskEntry.getKey(), taskEntry.getValue());
}
}
}
}
@@ -28,9 +28,9 @@ public interface WorkerProducer {

/**
* Adds the handler that is called when work is dispatched to or finished by a worker.
* @param workerFinishedHandler handler.
* @param workerProducerHandler handler.
*/
void addWorkerFinishedHandler(WorkerFinishedHandler workerFinishedHandler);
void addWorkerProducerHandler(WorkerProducerHandler workerProducerHandler);

/**
* Starts the worker producer.
@@ -39,7 +39,7 @@

/**
* Dispatch a message to the worker.
* @param message message to ispatch
* @param message message to dispatch
* @throws IOException connection errors
*/
void dispatchMessage(final Message message) throws IOException;
@@ -50,9 +50,9 @@
void shutdown();

/**
* Interface for handling finished tasks from the communication layer send by the workers.
* Interface called when a task is dispatched to or finished by a worker.
*/
interface WorkerFinishedHandler {
interface WorkerProducerHandler {
/**
* Called when work is dispatched to the worker.
*
@@ -70,13 +70,6 @@ default void onWorkDispatched(final String messageId, final Map<String, Object>
* @param messageMetaData message meta data
*/
void onWorkerFinished(String messageId, Map<String, Object> messageMetaData);

/**
* Instruct the handler to reset; that means all tasks that are waiting to be finished will never be marked as finished and therefor should
* be cleaned up.
*/
default void reset() {
}
}

/**
@@ -0,0 +1,24 @@
/*
* Copyright the State of the Netherlands
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package nl.aerius.taskmanager.domain;

/**
* Interface for classes that need to be reset when the watch dog is triggered.
*/
public interface QueueWatchDogListener {
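  /**
   * Called by the watch dog when tasks appear to be dead; implementations should release anything still marked as running.
   */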
void reset();
}
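A minimal, purely illustrative sketch of a listener implementation and its registration; the class name and counter are made up, and registration mirrors what TaskScheduleBucket does above:

  import java.util.concurrent.atomic.AtomicInteger;

  /** Hypothetical listener that only counts how often the watch dog fired. */
  public class ResetCountingListener implements QueueWatchDogListener {

    private final AtomicInteger resets = new AtomicInteger();

    @Override
    public void reset() {
      resets.incrementAndGet();
    }

    public int resetCount() {
      return resets.get();
    }
  }

  // Registration, next to the worker pool, scheduler and metrics reporter:
  // watchDog.addQueueWatchDogListener(new ResetCountingListener());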
@@ -62,6 +62,14 @@ public synchronized void register(final int deltaActiveWorkers, final int totalW
runningWorkers += deltaActiveWorkers;
}

/**
* Resets the metric state. Sets running workers to 0, and resets the average load time by calling process.
*/
public synchronized void reset() {
runningWorkers = 0;
process();
}

/**
* Calculates the average duration over the time frame since this method was last called. Internals are reset in this method to start a new measurement point.
*
@@ -31,9 +31,10 @@
import io.opentelemetry.api.metrics.DoubleGauge;
import io.opentelemetry.api.metrics.Meter;

import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerFinishedHandler;
import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics;
import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler;
import nl.aerius.taskmanager.client.TaskMetrics;
import nl.aerius.taskmanager.domain.QueueWatchDogListener;
import nl.aerius.taskmanager.metrics.DurationMetric.DurationMetricValue;

/**
@@ -51,7 +52,7 @@
*
* - Average load (in percentage) of all workers (of a certain type) together.
*/
public class PerformanceMetricsReporter implements WorkerFinishedHandler {
public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueWatchDogListener {

private static final Logger LOG = LoggerFactory.getLogger(PerformanceMetricsReporter.class);

@@ -149,6 +150,15 @@ public synchronized void onWorkerFinished(final String messageId, final Map<Stri
}
}

@Override
public void reset() {
dispatchedTasks.clear();
dispatchedQueueMetrics.entrySet().forEach(e -> e.getValue().process());
dispatchedWorkerMetrics.process();
// Work metrics don't need to be reset because they describe work that is already done.
loadMetrics.reset();
}

private DurationMetric createQueueDurationMetric(final TaskMetrics taskMetrics) {
return new DurationMetric(OpenTelemetryMetrics.queueAttributes(queueGroupName, taskMetrics.queueName()));
}
@@ -172,13 +182,13 @@ private static void metrics(final String prefixText, final Map<String, DurationM
}
}

private static void metrics(final String prefixText, final DoubleGauge gauge, final DoubleGauge waitGauge, final String name,
private static void metrics(final String prefixText, final DoubleGauge gauge, final DoubleGauge averageGauge, final String name,
final DurationMetric metrics) {
final DurationMetricValue metric = metrics.process();
final int count = metric.count();

gauge.set(count, metrics.getAttributes());
waitGauge.set(metric.avgDuration(), metrics.getAttributes());
averageGauge.set(metric.avgDuration(), metrics.getAttributes());
if (count > 0) {
LOG.debug("{} for {}: {} ms/task (#tasks: {})", prefixText, name, metric.avgDuration(), count);
}
@@ -127,9 +127,7 @@ private void updateMetrics() {
}

private void publish(final String queueName, final int size, final int utilisation) throws IOException, TimeoutException {
final Channel channel = factory.getConnection().createChannel();

try {
try (final Channel channel = factory.getConnection().createChannel()) {
channel.exchangeDeclare(AERIUS_EVENT_EXCHANGE, EXCHANGE_TYPE);

final Map<String, Object> headers = new HashMap<>();
Expand All @@ -139,8 +137,6 @@ private void publish(final String queueName, final int size, final int utilisati
final BasicProperties props = new BasicProperties().builder().headers(headers).build();
channel.basicPublish(AERIUS_EVENT_EXCHANGE, "", props, null);
debugLogState(queueName, size, utilisation);
} finally {
channel.close();
}
}
