Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;

import com.storebrand.scheduledtask.ScheduledTaskRegistry.LogEntry;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.RunOnce;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.Schedule;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.ScheduleRunContext;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.ScheduleRunnable;
Expand Down Expand Up @@ -72,10 +73,22 @@ public interface ScheduledTask {
* master lock it will delay for a short amount of time depending on the implementation. The current default
* implementation will sleep for up to two minutes between checking for new tasks.
* <p>
* This will prepend a line to the logs informing this schedule run was manually started.
* This will prepend a line to the logs informing this schedule run was {@link RunOnce#PROGRAMMATIC} started.
*/
void runNow();


/**
* Sets a schedule to run immediately. Note it will first mark this schedule to run by setting a flag in the db,
* then wake up the scheduler thread so it will be triggered, assuming this is called on the node that has the
* master lock it will trigger nearly instantly. However, if this where triggered by a node that does not have the
* master lock it will delay for a short amount of time depending on the implementation. The current default
* implementation will sleep for up to two minutes between checking for new tasks.
* <p>
* This will prepend a line to the logs informing this schedule run was {@link RunOnce} started.
*/
void runNow(RunOnce runOnce);

/**
* Check if the schedule task thread is alive. This should in theory always be true, but if the thread has been
* stopped by some external means it will return false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,29 @@ enum State {
DONE
}

/**
* Represents the source on what triggered the task to run outside the normal schedule.
*/
enum RunOnce {
/**
* The task where triggered by the monitor
*/
MONITOR,
/**
* The task was triggered programmatically by calling {@link ScheduledTask#runNow(RunOnce)}
*/
PROGRAMMATIC;

public static RunOnce fromString(String text) {
for (RunOnce s : RunOnce.values()) {
if (s.name().equalsIgnoreCase(text)) {
return s;
}
}
return null;
}
}

/**
* Interface that all tasks are required to implement. Contains a run method that should perform the actual task.
*/
Expand Down Expand Up @@ -269,6 +292,11 @@ interface Schedule {
*/
boolean isRunOnce();

/**
* Check what triggered the runOnce flagg
*/
Optional<RunOnce> getRunOnce();

/**
* If set informs that this schedule has a new cron expression that differs from the one defined in the code.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

import static java.util.stream.Collectors.toMap;

import java.sql.Timestamp;
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -449,12 +449,12 @@ void stop() {
public static class ScheduleImpl implements Schedule {
private final String scheduleName;
private final boolean active;
private final boolean runOnce;
private final RunOnce runOnce;
private final String overriddenCronExpression;
private final Instant nextRun;
private final Instant lastUpdated;

public ScheduleImpl(String scheduleName, boolean active, boolean runOnce, String cronExpression,
public ScheduleImpl(String scheduleName, boolean active, RunOnce runOnce, String cronExpression,
Instant nextRun, Instant lastUpdated) {
this.scheduleName = scheduleName;
this.active = active;
Expand All @@ -476,9 +476,13 @@ public boolean isActive() {

@Override
public boolean isRunOnce() {
return runOnce;
return getRunOnce().isPresent();
}

@Override
public Optional<RunOnce> getRunOnce() {
return Optional.ofNullable(runOnce);
}

@Override
public Optional<String> getOverriddenCronExpression() {
Expand Down Expand Up @@ -506,12 +510,12 @@ public static class LogEntryImpl implements LogEntry {
private final String _stackTrace;
private final LocalDateTime _logTime;

public LogEntryImpl(long logId, long runId, String message, String stackTrace, Timestamp logTime) {
public LogEntryImpl(long logId, long runId, String message, String stackTrace, Instant logTime) {
_logId = logId;
_runId = runId;
_message = message;
_stackTrace = stackTrace;
_logTime = logTime.toLocalDateTime();
_logTime = logTime.atZone(ZoneId.systemDefault()).toLocalDateTime();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.slf4j.MDC;

import com.storebrand.scheduledtask.ScheduledTaskRegistry.LogEntry;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.RunOnce;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.Schedule;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.ScheduleRunContext;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.ScheduleRunnable;
Expand Down Expand Up @@ -80,7 +81,7 @@ class ScheduledTaskRunner implements ScheduledTask {
private volatile boolean _active = true;
private volatile boolean _runFlag = true;
private volatile boolean _isRunning = false;
private volatile boolean _runOnce = false;
private volatile Optional<RunOnce> _runOnce;
private volatile boolean _justStarted = true;

ScheduledTaskRunner(ScheduledTaskConfig config, ScheduleRunnable runnable,
Expand Down Expand Up @@ -134,11 +135,12 @@ void runner() {
updateStateFromSchedule();
_justStarted = false;
}
if (_runOnce) {
if (_runOnce.isPresent()) {
// We should run once, so we should not sleep but instead run the schedule now.
log.info("Thread for Task '" + getName()
+ "' with nodeName '" + Host.getLocalHostName() + "' "
+ " is master and set to run once (NOW) and then continue as set in "
+ " is master and set to run once (NOW) by '" + _runOnce.get()
+ "' and then continue as set in "
+ "schedule '" + getActiveCronExpressionInternal().toString() + "'.");
}
// E-> Have we passed the next run timestamp?
Expand Down Expand Up @@ -223,13 +225,14 @@ else if (Instant.now(_clock).isBefore(_nextRun)) {
updateStateFromSchedule();

// ?: Check if we should run now once regardless of when the schedule should actually run
if (_runOnce) {
if (_runOnce.isPresent()) {
// -> Yes, we should only run once and then continue on the normal schedule plans.
_scheduledTaskRepository.setRunOnce(getName(), false);
log.info("Thread for Task '" + getName()
+ "' with nodeName '" + Host.getLocalHostName() + "' "
+ " is set to run once (NOW) and then continue as set in "
+ " is set to run once (NOW) by '" + _runOnce.get()
+ "' and then continue as set in "
+ "schedule '" + getActiveCronExpressionInternal().toString() + "'.");
_scheduledTaskRepository.setRunOnce(getName(), null);
break SLEEP_LOOP;
}

Expand Down Expand Up @@ -324,7 +327,7 @@ private Optional<Schedule> updateStateFromSchedule() {

_active = scheduleFromDb.get().isActive();
_nextRun = scheduleFromDb.get().getNextRun();
_runOnce = scheduleFromDb.get().isRunOnce();
_runOnce = scheduleFromDb.get().getRunOnce();
return scheduleFromDb;
}

Expand Down Expand Up @@ -354,12 +357,12 @@ private void runTask() {
+ "' is beginning to do the run according "
+ "to the set schedule '" + getActiveCronExpressionInternal().toString()
+ "'. Setting next run to '" + nextRun + "'.");
// ?: Is this schedule manually triggered? Ie set to run once.
if (ScheduledTaskRunner.this._runOnce) {
// -> Yes, this where set to run once and is manually triggered. so add a log line.
// ?: Is this schedule manually or programmatically triggered? Ie set to run once.
if (ScheduledTaskRunner.this._runOnce.isPresent()) {
// -> Yes, this where set to run once and is manually/programmatically triggered. so add a log line.
// note, this is named runNow in the gui but runOnce in the db so we use the term used
// in gui for the logging.
ctx.log("Manually started");
ctx.log(_runOnce.get().name() + " started");
}

// :: Try to run the code that the user wants, log if it fails.
Expand Down Expand Up @@ -693,9 +696,16 @@ Instant nextScheduledRun(CronExpression cronExpression, Instant instant) {

/**
* Manual trigger the schedule to run now.
*
* Replaced with {@link #runNow(RunOnce)}
*/
@Override
public void runNow() {
runNow(RunOnce.PROGRAMMATIC);
}

@Override
public void runNow(RunOnce runOnce) {
// ?: Are we in special test mode?
if (_testMode) {
// -> Yes, then we just run the task, and return.
Expand All @@ -706,13 +716,12 @@ public void runNow() {
}

// Write to db we should run this schedule now regardless of the cronExpression
_scheduledTaskRepository.setRunOnce(getName(), true);
_scheduledTaskRepository.setRunOnce(getName(), runOnce);
// Then wake the tread so it is aware that we should run now once
synchronized (_syncObject) {
_syncObject.notifyAll();
}
}

// ===== Internal helper classes ===================================================================================

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public boolean tryAcquireLock(String lockName, String nodeName) {
return false;
}
Instant now = _clock.instant();
// We should only allow to acquire the lock if the last_updated_time is older than 10 minutes.
// We should only allow acquiring the lock if the last_updated_time_utc is older than 10 minutes.
// Then it means it is up for grabs.
Instant lockShouldBeOlderThan = now.minus(10, ChronoUnit.MINUTES);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.storebrand.scheduledtask.ScheduledTask.RetentionPolicy;
import com.storebrand.scheduledtask.ScheduledTaskConfig;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.LogEntry;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.RunOnce;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.Schedule;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.State;
import com.storebrand.scheduledtask.SpringCronUtils.CronExpression;
Expand Down Expand Up @@ -82,7 +83,7 @@ public int createSchedule(ScheduledTaskConfig config) {
Instant nextRunInstant = nextRunTime.atZone(ZoneId.systemDefault()).toInstant();

_scheduledTaskDefinitions.put(config.getName(), config);
InMemorySchedule schedule = new InMemorySchedule(config.getName(), null, true, false,
InMemorySchedule schedule = new InMemorySchedule(config.getName(), null, true, null,
nextRunInstant, _clock.instant());
_schedules.put(config.getName(), schedule);
return 1;
Expand Down Expand Up @@ -133,7 +134,7 @@ public int setActive(String scheduleName, boolean active) {
}

@Override
public int setRunOnce(String scheduleName, boolean runOnce) {
public int setRunOnce(String scheduleName, RunOnce runOnce) {
InMemorySchedule schedule = _schedules.get(scheduleName);
if (schedule == null) {
return 0;
Expand Down Expand Up @@ -333,11 +334,11 @@ private static final class InMemorySchedule implements Schedule {
private final String _name;
private final String _cronExpression;
private volatile boolean _active;
private volatile boolean _runOnce;
private volatile RunOnce _runOnce;
private volatile Instant _nextRun;
private volatile Instant _lastUpdated;

private InMemorySchedule(String name, String cronExpression, boolean active, boolean runOnce,
private InMemorySchedule(String name, String cronExpression, boolean active, RunOnce runOnce,
Instant nextRun, Instant lastUpdated) {
_name = name;
_cronExpression = cronExpression;
Expand All @@ -359,7 +360,12 @@ public boolean isActive() {

@Override
public boolean isRunOnce() {
return _runOnce;
return getRunOnce().isPresent();
}

@Override
public Optional<RunOnce> getRunOnce() {
return Optional.ofNullable(_runOnce);
}

@Override
Expand All @@ -382,7 +388,7 @@ InMemorySchedule active(boolean active) {
_lastUpdated);
}

InMemorySchedule runOnce(boolean runOnce) {
InMemorySchedule runOnce(RunOnce runOnce) {
return new InMemorySchedule(_name, _cronExpression, _active, runOnce, _nextRun,
_lastUpdated);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public interface MasterLockRepository {
boolean releaseLock(String lockName, String nodeName);

/**
* Used for the running host to keep the lock for 5 more minutes. If the <b>lock_last_updated_time</b> is updated
* Used for the running host to keep the lock for 5 more minutes. If the <b>lock_last_updated_time_utc</b> is updated
* that means this host still has this master lock for another 5 minutes. After 5 minutes it means no-one has it
* until 10 minutes has passed. At that time it is up for grabs again.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import java.util.Map;
import java.util.Optional;

import com.storebrand.scheduledtask.ScheduledTask.RetentionPolicy;
import com.storebrand.scheduledtask.ScheduledTaskConfig;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.LogEntry;
import com.storebrand.scheduledtask.ScheduledTask.RetentionPolicy;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.RunOnce;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.Schedule;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.State;
import com.storebrand.scheduledtask.ScheduledTaskRegistryImpl;
Expand Down Expand Up @@ -80,7 +81,7 @@ public interface ScheduledTaskRepository {
*/
int setActive(String scheduleName, boolean active);

int setRunOnce(String scheduleName, boolean runOnce);
int setRunOnce(String scheduleName, RunOnce runOnce);

/**
* Get all schedules in the database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.storebrand.scheduledtask.ScheduledTaskConfig;
import com.storebrand.scheduledtask.ScheduledTaskRegistry;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.LogEntry;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.RunOnce;
import com.storebrand.scheduledtask.ScheduledTaskRegistry.Schedule;
import com.storebrand.scheduledtask.db.InMemoryMasterLockRepositoryTest.ClockMock;
import com.storebrand.scheduledtask.db.ScheduledTaskRepository.ScheduledRunDto;
Expand Down Expand Up @@ -279,7 +280,7 @@ public void setRunOnce() {
LocalDateTime updateTime = LocalDateTime.of(2021, 3, 3, 12, 12);
_clock.setFixedClock(updateTime);
Optional<Schedule> beforeSettingRunOnce = schedulerRep.getSchedule("test-schedule");
schedulerRep.setRunOnce("test-schedule", true);
schedulerRep.setRunOnce("test-schedule", RunOnce.PROGRAMMATIC);

// :: Assert
assertTrue(beforeSettingRunOnce.isPresent());
Expand All @@ -294,6 +295,8 @@ public void setRunOnce() {
// No cron expression should be set since we are not overriding the cron expression
assertNull(afterSetRunOnce.get().getOverriddenCronExpression().orElse(null));
assertEquals(insertTimeInstant, afterSetRunOnce.get().getLastUpdated());
assertFalse(beforeSettingRunOnce.get().getRunOnce().isPresent());
assertEquals(RunOnce.PROGRAMMATIC, afterSetRunOnce.get().getRunOnce().orElse(null));
}

@Test
Expand Down
1 change: 1 addition & 0 deletions scheduledtask-db-sql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {

// Placing logback-classic on testCompile path as we need some form of logging implementation.
//testImplementation "org.slf4j:slf4j-api:$slf4jVersion"
testImplementation "org.slf4j:slf4j-api:$slf4jVersion"
testRuntimeOnly "ch.qos.logback:logback-classic:$logbackClassicVersion"
}

Expand Down
Loading