Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
Expand All @@ -41,7 +40,6 @@
import com.storebrand.scheduledtask.ScheduledTaskConfig.StaticRetentionPolicy;
import com.storebrand.scheduledtask.db.MasterLockRepository;
import com.storebrand.scheduledtask.db.ScheduledTaskRepository;
import com.storebrand.scheduledtask.SpringCronUtils.CronExpression;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

Expand Down Expand Up @@ -143,7 +141,16 @@ public Optional<MasterLock> getMasterLock() {

@Override
public boolean hasMasterLock() {
return _masterLockKeeper.isMaster();
return _masterLockKeeper._isMaster;
}

/**
* This will trigger an interrupt signal on all the running threads.
*/
void interruptRunningSchedules() {
_schedules.entrySet().stream().forEach(entry -> {
entry.getValue().interruptThread();
});
}

/**
Expand All @@ -170,26 +177,28 @@ boolean isTestMode() {
// ===== MasterLock and Schedule ===================================================================================

/**
* Master lock is responsible for acquiring and keeping a lock, the node that has this lock will be responsible
* for running ALL Schedules.
* When a lock is first acquired this node has this for 5 minutes and in order to keep it this node has to the
* method {@link MasterLockRepository#keepLock(String, String)} within that timespan in order to keep it for a new
* 5 minutes. If this node does not manage to keep the lock within the 5 min timespan it means no nodes are master
* for the duration 5 - 10 min.
* After the lock has not been updated for 10 minutes it can be re-acquired again.
* Master lock is responsible for acquiring and keeping a lock, the node that has this lock will be responsible for
* running ALL Schedules. When a lock is first acquired this node has this for 5 minutes and in order to keep it
* this node has to the method {@link MasterLockRepository#keepLock(String, String)} within that timespan in order
* to keep it for a new 5 minutes. If this node does not manage to keep the lock within the 5 min timespan it means
* no nodes are master for the duration 5 - 10 min. After the lock has not been updated for 10 minutes it can be
* re-acquired again.
*/
static class MasterLockKeeper {
private static final long MASTER_LOCK_SLEEP_LOOP_IN_MILLISECONDS = 2 * 60 * 1000; // 2 minutes
private static final long WATCHDOG_SLEEP_LOOP_IN_MILLISECONDS = 15 * 1000; // 15 seconds
private static final long MASTER_LOCK_MAX_TIME_SINCE_LAST_UPDATE_MINUTES = 5;

private Thread _runner;
private Thread _masterLockRunner;
private Thread _watchdogRunner;
private final Clock _clock;
/* .. state vars .. */
private volatile boolean _isMaster = false;
private volatile Instant _lastUpdated = Instant.EPOCH;
private volatile boolean _runFlag = true;
private volatile boolean _isInitialRun = true;
private final Object _syncObject = new Object();
private final Object _syncObjectMasterLock = new Object();
private final Object _syncObjectWatchdog = new Object();
private final MasterLockRepository _masterLockRepository;
private final ScheduledTaskRegistryImpl _storebrandScheduleService;
private final AtomicInteger _notMasterCounter = new AtomicInteger();
Expand All @@ -201,14 +210,55 @@ static class MasterLockKeeper {
_storebrandScheduleService = storebrandScheduleService;
_clock = clock;
log.info("Starting MasterLock thread");
_runner = new Thread(MasterLockKeeper.this::runner, "MasterLock thread");
_masterLockRunner = new Thread(MasterLockKeeper.this::runner, "MasterLock thread");
_watchdogRunner = new Thread(MasterLockKeeper.this::watchDogThread, "MasterLock health check thread");

// Make sure that the master lock is created:
_masterLockRepository.tryCreateLock(MASTER_LOCK_NAME, Host.getLocalHostName());
}

// :: This will periodically check if the master lock is updated and if not try will try to update the _isMaster
// flag. Due to the main thread queries a database we can get a situation where this is in a state where
// the database is not accessible so the keepLock can slide over the valid time. This thread will ensure that
// the master lock is kept up to date.
private void watchDogThread() {
// :: We will loop until the run flag is set to false
while (_runFlag) {
try {
// :: Sleep a bit before we attempts to keep/acquire the lock
synchronized (_syncObjectWatchdog) {
log.debug("Thread MasterLock watchDog '" + MASTER_LOCK_NAME
+ " with nodeName '" + Host.getLocalHostName() + "' "
+ "is going to sleep for '" + WATCHDOG_SLEEP_LOOP_IN_MILLISECONDS + "' ms.");
_syncObjectWatchdog.wait(WATCHDOG_SLEEP_LOOP_IN_MILLISECONDS);
}
// Check that the _lastUpdated is not over the limit of 5 minutes. If it is we are not master anymore.
// ?: Are we currently master and have not been updated within the last 5 minutes?
if (_isMaster && !isActiveWithin()) {
// -> Yes, we are master and have not been updated within the last 5 minutes so we should update
// the _isMaster flag to false.
_isMaster = false;
log.warn("Thread MasterLock watchDog '" + MASTER_LOCK_NAME
+ " with nodeName '" + Host.getLocalHostName() + "' "
+ "is marked as not master but the last update where above the limit, removing master "
+ "flag.");
_storebrandScheduleService.interruptRunningSchedules();
}
}
catch (InterruptedException e) {
log.info("MasterLock on node '" + Host.getLocalHostName()
+ " health check sleep where interrupted. Will loop and check run flag.");
}
}
log.info("Thread MasterLock '" + MASTER_LOCK_NAME + "' health check"
+ " with nodeName '" + Host.getLocalHostName() + "' "
+ "asked to exit, shutting down!");
_watchdogRunner = null;
}

void runner() {
SLEEP_LOOP: while (_runFlag) {
SLEEP_LOOP:
while (_runFlag) {
try {
// ?: is this an initial run?
if (_isInitialRun) {
Expand All @@ -218,30 +268,28 @@ void runner() {
// nodes know who is the master node.
if (_masterLockRepository.tryAcquireLock(MASTER_LOCK_NAME, Host.getLocalHostName())) {
// -> Yes, we managed to acquire the lock
_isMaster = true;
_lastUpdated = Instant.now(_clock);
setMasterFlagAndUpdatedTime();
log.info("Thread MasterLock '" + MASTER_LOCK_NAME + "', "
+ " with nodeName '" + Host.getLocalHostName() + "' "
+ "managed to acquire the lock during the initial run");
}
// Regardless if we managed to acquire the lock we have passed the initial run.
// Regardless if we managed to acquire the lock, we have passed the initial run.
_isInitialRun = false;
}

// :: Sleep a bit before we attempts to keep/acquire the lock
synchronized (_syncObject) {
synchronized (_syncObjectMasterLock) {
log.debug("Thread MasterLock '" + MASTER_LOCK_NAME + "' sleeping, "
+ " with nodeName '" + Host.getLocalHostName() + "' "
+ "is going to sleep for '" + MASTER_LOCK_SLEEP_LOOP_IN_MILLISECONDS + "' ms.");
_syncObject.wait(MASTER_LOCK_SLEEP_LOOP_IN_MILLISECONDS);
_syncObjectMasterLock.wait(MASTER_LOCK_SLEEP_LOOP_IN_MILLISECONDS);
}

// :: Try to keep the lock, this will only succeed if the lock is already acquired for this node
// withing the last 5 minutes.
if (_masterLockRepository.keepLock(MASTER_LOCK_NAME, Host.getLocalHostName())) {
// -> Yes, we managed to keep the master lock, go to sleep
_isMaster = true;
_lastUpdated = Instant.now(_clock);
setMasterFlagAndUpdatedTime();
log.info("Thread MasterLock '" + MASTER_LOCK_NAME + "', "
+ " with nodeName '" + Host.getLocalHostName() + "' "
+ "managed to keep the lock.");
Expand All @@ -253,8 +301,7 @@ void runner() {
// ?: Did we manage to acquire the lock.
if (_masterLockRepository.tryAcquireLock(MASTER_LOCK_NAME, Host.getLocalHostName())) {
// -> Yes, we managed to acquire the lock
_isMaster = true;
_lastUpdated = Instant.now(_clock);
setMasterFlagAndUpdatedTime();
_notMasterCounter.set(0);
log.info("Thread MasterLock '" + MASTER_LOCK_NAME + "', "
+ " with nodeName '" + Host.getLocalHostName() + "' "
Expand Down Expand Up @@ -288,12 +335,22 @@ void runner() {
}
}
// Exiting loop, so clear the runner thread and log that we are now shutting down.
_runner = null;
_masterLockRunner = null;
log.info("Thread MasterLock '" + MASTER_LOCK_NAME + "', "
+ " with nodeName '" + Host.getLocalHostName() + "' "
+ "asked to exit, shutting down!");
}

/**
* Sets the master flag to true and updates the last updated time to now.
*/
private void setMasterFlagAndUpdatedTime() {
// Setting the last updated time first, so we are sure that the time is set before the master flag is set.
// This due the watchDog can check the last updated time and forcefully set the master flag back to false.
_lastUpdated = Instant.now(_clock);
_isMaster = true;
}

/**
* The system should self-heal in case the master lock is gone from the database. This should normally never
* happen, but if someone deletes the row we need to recreate it. If the row is gone no nodes will be able to
Expand Down Expand Up @@ -345,11 +402,11 @@ private void ensureMasterLockExists() {
}

/**
* We are only Master if both {@link #_isMaster} is true, and we were last updated less than
* Checks if we were last updated less than
* {@link #MASTER_LOCK_MAX_TIME_SINCE_LAST_UPDATE_MINUTES} minutes ago.
*/
public boolean isMaster() {
return _isMaster && _lastUpdated.isAfter(
public boolean isActiveWithin() {
return _lastUpdated.isAfter(
Instant.now(_clock).minus(MASTER_LOCK_MAX_TIME_SINCE_LAST_UPDATE_MINUTES, ChronoUnit.MINUTES));
}

Expand All @@ -360,16 +417,18 @@ Optional<MasterLock> getMasterLock() {
void start() {
_runFlag = true;
_isInitialRun = true;
_runner.start();
_masterLockRunner.start();
_watchdogRunner.start();
}

void stop() {
_runFlag = false;
synchronized (_syncObject) {
_syncObject.notifyAll();
synchronized (_syncObjectMasterLock) {
_syncObjectMasterLock.notifyAll();
}
try {
_runner.join(1000);
_masterLockRunner.join(1000);
_watchdogRunner.join(1000);
}
catch (InterruptedException e) {
// Ignore interrupt here, we do best effort to wait for the runner thread to stop.
Expand Down Expand Up @@ -574,8 +633,9 @@ public ScheduledTaskBuilder keepMaxNoopRuns(int maxNoopRuns) {
}

@Override
@SuppressFBWarnings(value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE", justification = "cronExpressionParsed.next "
+ "always uses Temporal LocalDateTime and is always known.")
@SuppressFBWarnings(value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
justification = "cronExpressionParsed.next "
+ "always uses Temporal LocalDateTime and is always known.")
public ScheduledTask start() {
// In the first insert we can calculate the next run directly since there is no override from db yet
ScheduledTask scheduledTask = _schedules.compute(_scheduleName, (key, value) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class ScheduledTaskRunner implements ScheduledTask {
// If the scheduled task registry is in a special test mode then we don't start the thread runner, but instead
// enable a special mode where we only run tasks by calling runNow().
_testMode = (_scheduledTaskRegistry instanceof ScheduledTaskRegistryImpl) &&
((ScheduledTaskRegistryImpl)_scheduledTaskRegistry).isTestMode();
((ScheduledTaskRegistryImpl) _scheduledTaskRegistry).isTestMode();

// ?: Are we in test mode?
if (!_testMode) {
Expand Down Expand Up @@ -150,7 +150,8 @@ else if (Instant.now(_clock).isBefore(_nextRun)) {
+ "' ms and wait for the next schedule run '" + _nextRun + "'.";
// To avoid spamming the log too hard we only log to info every x sleep cycles.
if (elidedSleepLogLines++ >= ELIDED_LOG_LINES_ON_MASTER_SLEEP) {
log.info(message + " NOTE: Elided this log line " + elidedSleepLogLines + " times.");
log.info(
message + " NOTE: Elided this log line " + elidedSleepLogLines + " times.");
elidedSleepLogLines = 0;
}
else {
Expand Down Expand Up @@ -413,8 +414,8 @@ public String getDefaultCronExpression() {
}

/**
* Set a new cronExpression to be used by the schedule. If this is set to null it will fall back to use the {@link
* #_defaultCronExpression} again.
* Set a new cronExpression to be used by the schedule. If this is set to null it will fall back to use the
* {@link #_defaultCronExpression} again.
*/
@Override
public void setOverrideExpression(String newCronExpression) {
Expand Down Expand Up @@ -525,8 +526,8 @@ public boolean hasPassedExpectedRunTime(Instant lastRunStarted) {
// E-> We have started, so we should check if we have passed expected run time
return Instant.now(_clock).isAfter(
nextScheduledRun(getActiveCronExpressionInternal(), lastRunStarted)
// Two minute grace time
.plus(2, ChronoUnit.MINUTES));
// Two minute grace time
.plus(2, ChronoUnit.MINUTES));
}

/**
Expand Down Expand Up @@ -622,17 +623,27 @@ void notifyThread() {
}
}

/**
* Interrupt running threads without writing to the ctx since we do not want to be affected by a potential deadlock.
*/
void interruptThread() {
if (_runner != null) {
log.info("Thread got intterupted, this could be due to shutdown or lost masterLock '" + getName() + "'");
_runner.interrupt();
}
}

/**
* Make this tread completely stop
*/
void killSchedule() {
// :? Are we currently running and do we have the masterLock?
if (_isRunning && _scheduledTaskRegistry.hasMasterLock()) {
// -> Yes, we are currently running and we have the masterLock, try to log that we are stopping
getLastScheduleRun().ifPresent(ctx -> ctx.failed("Aborted due to shutdown!"));
getLastScheduleRun().ifPresent(ctx -> ctx.log("Interrupting due to shutdown!"));
}
_runFlag = false;

interruptThread();
// Then wake the tread, so it is aware that we have updated the runFlag:
notifyThread();
}
Expand Down Expand Up @@ -678,9 +689,8 @@ public void runNow() {
// ===== Internal helper classes ===================================================================================

/**
* Implementation of the {@link ScheduleRunContext}, it is indirectly used when a schedule is running and
* this run is appending {@link #log(String)}, {@link #dispatched(String)}, {@link #done(String)}
* or {@link #failed(String)}
* Implementation of the {@link ScheduleRunContext}, it is indirectly used when a schedule is running and this run
* is appending {@link #log(String)}, {@link #dispatched(String)}, {@link #done(String)} or {@link #failed(String)}
*/
private static class ScheduledTaskRunnerContext implements ScheduleRunContext {
private final ScheduledTaskRunner _storebrandSchedule;
Expand All @@ -700,7 +710,8 @@ private ScheduledTaskRunnerContext(long runId, ScheduledTaskRunner storebrandSch
_clock = clock;
_scheduledTaskRepository = scheduledTaskRepository;
_hostname = hostname;
_scheduledRunDto = ScheduledRunDto.newWithStateStarted(_runId, storebrandSchedule.getName(), hostname, runStart);
_scheduledRunDto =
ScheduledRunDto.newWithStateStarted(_runId, storebrandSchedule.getName(), hostname, runStart);
}

/**
Expand Down Expand Up @@ -858,7 +869,8 @@ public ScheduleStatus noop(String msg) {
}

/**
* Copied from Mats3, inspired from <a href="https://stackoverflow.com/a/11306854">Stackoverflow - Denys Séguret</a>.
* Copied from Mats3, inspired from <a href="https://stackoverflow.com/a/11306854">Stackoverflow - Denys
* Séguret</a>.
*
* @return a String showing where this code was invoked from, like "Test.java.123;com.example.Test;methodName()"
*/
Expand All @@ -882,8 +894,8 @@ private static String getInvocationPoint() {
}

/**
* NO-OP class only used to make sure the users are calling the {@link ScheduleRunContext#done(String)}, {@link
* ScheduleRunContext#failed(String)} or {@link ScheduleRunContext#dispatched(String)}
* NO-OP class only used to make sure the users are calling the {@link ScheduleRunContext#done(String)},
* {@link ScheduleRunContext#failed(String)} or {@link ScheduleRunContext#dispatched(String)}
*/
static class ScheduleStatusValidResponse implements ScheduleStatus {

Expand Down
Loading