From bb5cc3656de8a1cda826be032d7681696b682baf Mon Sep 17 00:00:00 2001 From: Dag Bertelsen Date: Fri, 7 Mar 2025 14:10:29 +0100 Subject: [PATCH] Added watchDog to safeguard against db deadlock in the database --- .../ScheduledTaskRegistryImpl.java | 126 +++++++++++++----- .../scheduledtask/ScheduledTaskRunner.java | 42 +++--- 2 files changed, 120 insertions(+), 48 deletions(-) diff --git a/scheduledtask-core/src/main/java/com/storebrand/scheduledtask/ScheduledTaskRegistryImpl.java b/scheduledtask-core/src/main/java/com/storebrand/scheduledtask/ScheduledTaskRegistryImpl.java index 3338e62..951d836 100644 --- a/scheduledtask-core/src/main/java/com/storebrand/scheduledtask/ScheduledTaskRegistryImpl.java +++ b/scheduledtask-core/src/main/java/com/storebrand/scheduledtask/ScheduledTaskRegistryImpl.java @@ -22,7 +22,6 @@ import java.time.Clock; import java.time.Instant; import java.time.LocalDateTime; -import java.time.ZoneId; import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; @@ -41,7 +40,6 @@ import com.storebrand.scheduledtask.ScheduledTaskConfig.StaticRetentionPolicy; import com.storebrand.scheduledtask.db.MasterLockRepository; import com.storebrand.scheduledtask.db.ScheduledTaskRepository; -import com.storebrand.scheduledtask.SpringCronUtils.CronExpression; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -143,7 +141,16 @@ public Optional getMasterLock() { @Override public boolean hasMasterLock() { - return _masterLockKeeper.isMaster(); + return _masterLockKeeper._isMaster; + } + + /** + * This will trigger an interrupt signal on all the running threads. 
+ */ + void interruptRunningSchedules() { + _schedules.entrySet().stream().forEach(entry -> { + entry.getValue().interruptThread(); + }); } /** @@ -170,26 +177,28 @@ boolean isTestMode() { // ===== MasterLock and Schedule =================================================================================== /** - * Master lock is responsible for acquiring and keeping a lock, the node that has this lock will be responsible - * for running ALL Schedules. - * When a lock is first acquired this node has this for 5 minutes and in order to keep it this node has to the - * method {@link MasterLockRepository#keepLock(String, String)} within that timespan in order to keep it for a new - * 5 minutes. If this node does not manage to keep the lock within the 5 min timespan it means no nodes are master - * for the duration 5 - 10 min. - * After the lock has not been updated for 10 minutes it can be re-acquired again. + * Master lock is responsible for acquiring and keeping a lock, the node that has this lock will be responsible for + * running ALL Schedules. When a lock is first acquired this node has this for 5 minutes and in order to keep it + * this node has to the method {@link MasterLockRepository#keepLock(String, String)} within that timespan in order + * to keep it for a new 5 minutes. If this node does not manage to keep the lock within the 5 min timespan it means + * no nodes are master for the duration 5 - 10 min. After the lock has not been updated for 10 minutes it can be + * re-acquired again. */ static class MasterLockKeeper { private static final long MASTER_LOCK_SLEEP_LOOP_IN_MILLISECONDS = 2 * 60 * 1000; // 2 minutes + private static final long WATCHDOG_SLEEP_LOOP_IN_MILLISECONDS = 15 * 1000; // 15 seconds private static final long MASTER_LOCK_MAX_TIME_SINCE_LAST_UPDATE_MINUTES = 5; - private Thread _runner; + private Thread _masterLockRunner; + private Thread _watchdogRunner; private final Clock _clock; /* .. state vars .. 
*/ private volatile boolean _isMaster = false; private volatile Instant _lastUpdated = Instant.EPOCH; private volatile boolean _runFlag = true; private volatile boolean _isInitialRun = true; - private final Object _syncObject = new Object(); + private final Object _syncObjectMasterLock = new Object(); + private final Object _syncObjectWatchdog = new Object(); private final MasterLockRepository _masterLockRepository; private final ScheduledTaskRegistryImpl _storebrandScheduleService; private final AtomicInteger _notMasterCounter = new AtomicInteger(); @@ -201,14 +210,55 @@ static class MasterLockKeeper { _storebrandScheduleService = storebrandScheduleService; _clock = clock; log.info("Starting MasterLock thread"); - _runner = new Thread(MasterLockKeeper.this::runner, "MasterLock thread"); + _masterLockRunner = new Thread(MasterLockKeeper.this::runner, "MasterLock thread"); + _watchdogRunner = new Thread(MasterLockKeeper.this::watchDogThread, "MasterLock health check thread"); // Make sure that the master lock is created: _masterLockRepository.tryCreateLock(MASTER_LOCK_NAME, Host.getLocalHostName()); } + // :: This will periodically check whether the master lock has been updated recently and, if not, clear the + // _isMaster flag. Because the main thread queries a database, it can be stuck while the database is not + // accessible, so the keepLock can slide past the valid time. This thread ensures that the _isMaster flag + // does not stay set after that time has passed. 
+ private void watchDogThread() { + // :: We will loop until the run flag is set to false + while (_runFlag) { + try { + // :: Sleep a bit before we check whether the master lock has gone stale + synchronized (_syncObjectWatchdog) { + log.debug("Thread MasterLock watchDog '" + MASTER_LOCK_NAME + + " with nodeName '" + Host.getLocalHostName() + "' " + + "is going to sleep for '" + WATCHDOG_SLEEP_LOOP_IN_MILLISECONDS + "' ms."); + _syncObjectWatchdog.wait(WATCHDOG_SLEEP_LOOP_IN_MILLISECONDS); + } + // Check that the _lastUpdated is not over the limit of 5 minutes. If it is, we are not master anymore. + // ?: Are we currently master and have not been updated within the last 5 minutes? + if (_isMaster && !isActiveWithin()) { + // -> Yes, we are master and have not been updated within the last 5 minutes so we should update + // the _isMaster flag to false. + _isMaster = false; + log.warn("Thread MasterLock watchDog '" + MASTER_LOCK_NAME + + " with nodeName '" + Host.getLocalHostName() + "' " + + "is marked as not master but the last update where above the limit, removing master " + + "flag."); + _storebrandScheduleService.interruptRunningSchedules(); + } + } + catch (InterruptedException e) { + log.info("MasterLock on node '" + Host.getLocalHostName() + + " health check sleep where interrupted. Will loop and check run flag."); + } + } + log.info("Thread MasterLock '" + MASTER_LOCK_NAME + "' health check" + + " with nodeName '" + Host.getLocalHostName() + "' " + + "asked to exit, shutting down!"); + _watchdogRunner = null; + } + void runner() { - SLEEP_LOOP: while (_runFlag) { + SLEEP_LOOP: + while (_runFlag) { try { // ?: is this an initial run? if (_isInitialRun) { @@ -218,30 +268,28 @@ void runner() { // nodes know who is the master node. 
if (_masterLockRepository.tryAcquireLock(MASTER_LOCK_NAME, Host.getLocalHostName())) { // -> Yes, we managed to acquire the lock - _isMaster = true; - _lastUpdated = Instant.now(_clock); + setMasterFlagAndUpdatedTime(); log.info("Thread MasterLock '" + MASTER_LOCK_NAME + "', " + " with nodeName '" + Host.getLocalHostName() + "' " + "managed to acquire the lock during the initial run"); } - // Regardless if we managed to acquire the lock we have passed the initial run. + // Regardless if we managed to acquire the lock, we have passed the initial run. _isInitialRun = false; } // :: Sleep a bit before we attempts to keep/acquire the lock - synchronized (_syncObject) { + synchronized (_syncObjectMasterLock) { log.debug("Thread MasterLock '" + MASTER_LOCK_NAME + "' sleeping, " + " with nodeName '" + Host.getLocalHostName() + "' " + "is going to sleep for '" + MASTER_LOCK_SLEEP_LOOP_IN_MILLISECONDS + "' ms."); - _syncObject.wait(MASTER_LOCK_SLEEP_LOOP_IN_MILLISECONDS); + _syncObjectMasterLock.wait(MASTER_LOCK_SLEEP_LOOP_IN_MILLISECONDS); } // :: Try to keep the lock, this will only succeed if the lock is already acquired for this node // withing the last 5 minutes. if (_masterLockRepository.keepLock(MASTER_LOCK_NAME, Host.getLocalHostName())) { // -> Yes, we managed to keep the master lock, go to sleep - _isMaster = true; - _lastUpdated = Instant.now(_clock); + setMasterFlagAndUpdatedTime(); log.info("Thread MasterLock '" + MASTER_LOCK_NAME + "', " + " with nodeName '" + Host.getLocalHostName() + "' " + "managed to keep the lock."); @@ -253,8 +301,7 @@ void runner() { // ?: Did we manage to acquire the lock. 
if (_masterLockRepository.tryAcquireLock(MASTER_LOCK_NAME, Host.getLocalHostName())) { // -> Yes, we managed to acquire the lock - _isMaster = true; - _lastUpdated = Instant.now(_clock); + setMasterFlagAndUpdatedTime(); _notMasterCounter.set(0); log.info("Thread MasterLock '" + MASTER_LOCK_NAME + "', " + " with nodeName '" + Host.getLocalHostName() + "' " @@ -288,12 +335,22 @@ void runner() { } } // Exiting loop, so clear the runner thread and log that we are now shutting down. - _runner = null; + _masterLockRunner = null; log.info("Thread MasterLock '" + MASTER_LOCK_NAME + "', " + " with nodeName '" + Host.getLocalHostName() + "' " + "asked to exit, shutting down!"); } + /** + * Sets the master flag to true and updates the last updated time to now. + */ + private void setMasterFlagAndUpdatedTime() { + // Setting the last updated time first, so we are sure that the time is set before the master flag is set. + // This is because the watchDog can check the last updated time and forcefully set the master flag back to false. + _lastUpdated = Instant.now(_clock); + _isMaster = true; + } + /** * The system should self-heal in case the master lock is gone from the database. This should normally never * happen, but if someone deletes the row we need to recreate it. If the row is gone no nodes will be able to @@ -345,11 +402,11 @@ private void ensureMasterLockExists() { } /** - * We are only Master if both {@link #_isMaster} is true, and we were last updated less than + * Checks if we were last updated less than * {@link #MASTER_LOCK_MAX_TIME_SINCE_LAST_UPDATE_MINUTES} minutes ago. 
*/ - public boolean isMaster() { - return _isMaster && _lastUpdated.isAfter( + public boolean isActiveWithin() { + return _lastUpdated.isAfter( Instant.now(_clock).minus(MASTER_LOCK_MAX_TIME_SINCE_LAST_UPDATE_MINUTES, ChronoUnit.MINUTES)); } @@ -360,16 +417,18 @@ Optional getMasterLock() { void start() { _runFlag = true; _isInitialRun = true; - _runner.start(); + _masterLockRunner.start(); + _watchdogRunner.start(); } void stop() { _runFlag = false; - synchronized (_syncObject) { - _syncObject.notifyAll(); + synchronized (_syncObjectMasterLock) { + _syncObjectMasterLock.notifyAll(); } try { - _runner.join(1000); + _masterLockRunner.join(1000); + _watchdogRunner.join(1000); } catch (InterruptedException e) { // Ignore interrupt here, we do best effort to wait for the runner thread to stop. @@ -574,8 +633,9 @@ public ScheduledTaskBuilder keepMaxNoopRuns(int maxNoopRuns) { } @Override - @SuppressFBWarnings(value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE", justification = "cronExpressionParsed.next " - + "always uses Temporal LocalDateTime and is always known.") + @SuppressFBWarnings(value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE", + justification = "cronExpressionParsed.next " + + "always uses Temporal LocalDateTime and is always known.") public ScheduledTask start() { // In the first insert we can calculate the next run directly since there is no override from db yet ScheduledTask scheduledTask = _schedules.compute(_scheduleName, (key, value) -> { diff --git a/scheduledtask-core/src/main/java/com/storebrand/scheduledtask/ScheduledTaskRunner.java b/scheduledtask-core/src/main/java/com/storebrand/scheduledtask/ScheduledTaskRunner.java index bec7c26..71fa97e 100644 --- a/scheduledtask-core/src/main/java/com/storebrand/scheduledtask/ScheduledTaskRunner.java +++ b/scheduledtask-core/src/main/java/com/storebrand/scheduledtask/ScheduledTaskRunner.java @@ -94,7 +94,7 @@ class ScheduledTaskRunner implements ScheduledTask { // If the scheduled task registry is in a special 
test mode then we don't start the thread runner, but instead // enable a special mode where we only run tasks by calling runNow(). _testMode = (_scheduledTaskRegistry instanceof ScheduledTaskRegistryImpl) && - ((ScheduledTaskRegistryImpl)_scheduledTaskRegistry).isTestMode(); + ((ScheduledTaskRegistryImpl) _scheduledTaskRegistry).isTestMode(); // ?: Are we in test mode? if (!_testMode) { @@ -150,7 +150,8 @@ else if (Instant.now(_clock).isBefore(_nextRun)) { + "' ms and wait for the next schedule run '" + _nextRun + "'."; // To avoid spamming the log too hard we only log to info every x sleep cycles. if (elidedSleepLogLines++ >= ELIDED_LOG_LINES_ON_MASTER_SLEEP) { - log.info(message + " NOTE: Elided this log line " + elidedSleepLogLines + " times."); + log.info( + message + " NOTE: Elided this log line " + elidedSleepLogLines + " times."); elidedSleepLogLines = 0; } else { @@ -413,8 +414,8 @@ public String getDefaultCronExpression() { } /** - * Set a new cronExpression to be used by the schedule. If this is set to null it will fall back to use the {@link - * #_defaultCronExpression} again. + * Set a new cronExpression to be used by the schedule. If this is set to null it will fall back to use the + * {@link #_defaultCronExpression} again. */ @Override public void setOverrideExpression(String newCronExpression) { @@ -525,8 +526,8 @@ public boolean hasPassedExpectedRunTime(Instant lastRunStarted) { // E-> We have started, so we should check if we have passed expected run time return Instant.now(_clock).isAfter( nextScheduledRun(getActiveCronExpressionInternal(), lastRunStarted) - // Two minute grace time - .plus(2, ChronoUnit.MINUTES)); + // Two minute grace time + .plus(2, ChronoUnit.MINUTES)); } /** @@ -622,6 +623,16 @@ void notifyThread() { } } + /** + * Interrupt running threads without writing to the ctx since we do not want to be affected by a potential deadlock. 
+ */ + void interruptThread() { + if (_runner != null) { + log.info("Thread got intterupted, this could be due to shutdown or lost masterLock '" + getName() + "'"); + _runner.interrupt(); + } + } + /** * Make this tread completely stop */ @@ -629,10 +640,10 @@ void killSchedule() { // :? Are we currently running and do we have the masterLock? if (_isRunning && _scheduledTaskRegistry.hasMasterLock()) { // -> Yes, we are currently running and we have the masterLock, try to log that we are stopping - getLastScheduleRun().ifPresent(ctx -> ctx.failed("Aborted due to shutdown!")); + getLastScheduleRun().ifPresent(ctx -> ctx.log("Interrupting due to shutdown!")); } _runFlag = false; - + interruptThread(); // Then wake the tread, so it is aware that we have updated the runFlag: notifyThread(); } @@ -678,9 +689,8 @@ public void runNow() { // ===== Internal helper classes =================================================================================== /** - * Implementation of the {@link ScheduleRunContext}, it is indirectly used when a schedule is running and - * this run is appending {@link #log(String)}, {@link #dispatched(String)}, {@link #done(String)} - * or {@link #failed(String)} + * Implementation of the {@link ScheduleRunContext}, it is indirectly used when a schedule is running and this run + * is appending {@link #log(String)}, {@link #dispatched(String)}, {@link #done(String)} or {@link #failed(String)} */ private static class ScheduledTaskRunnerContext implements ScheduleRunContext { private final ScheduledTaskRunner _storebrandSchedule; @@ -700,7 +710,8 @@ private ScheduledTaskRunnerContext(long runId, ScheduledTaskRunner storebrandSch _clock = clock; _scheduledTaskRepository = scheduledTaskRepository; _hostname = hostname; - _scheduledRunDto = ScheduledRunDto.newWithStateStarted(_runId, storebrandSchedule.getName(), hostname, runStart); + _scheduledRunDto = + ScheduledRunDto.newWithStateStarted(_runId, storebrandSchedule.getName(), hostname, runStart); } 
/** @@ -858,7 +869,8 @@ public ScheduleStatus noop(String msg) { } /** - * Copied from Mats3, inspired from Stackoverflow - Denys Séguret. + * Copied from Mats3, inspired from Stackoverflow - Denys + * Séguret. * * @return a String showing where this code was invoked from, like "Test.java.123;com.example.Test;methodName()" */ @@ -882,8 +894,8 @@ private static String getInvocationPoint() { } /** - * NO-OP class only used to make sure the users are calling the {@link ScheduleRunContext#done(String)}, {@link - * ScheduleRunContext#failed(String)} or {@link ScheduleRunContext#dispatched(String)} + * NO-OP class only used to make sure the users are calling the {@link ScheduleRunContext#done(String)}, + * {@link ScheduleRunContext#failed(String)} or {@link ScheduleRunContext#dispatched(String)} */ static class ScheduleStatusValidResponse implements ScheduleStatus {