Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -57,6 +58,11 @@ class ScheduledTaskRunner implements ScheduledTask {
private static final int ELIDED_LOG_LINES_ON_MASTER_SLEEP = 8;
private static String DEFAULT_LOGGER_NAME = ScheduledTask.class.getName();

private static final String MDC_TRACE_ID_KEY = "traceId";
private static final String MDC_TASK_RUN_ID_KEY = "scheduledTaskRunId";
private static final String MDC_TASK_NAME_KEY = "scheduledTaskName";
private static final String MDC_TASK_NAME_CLEAN_KEY = "scheduledTaskNameClean";

private final ScheduledTaskConfig _config;
private final ScheduledTaskRegistry _scheduledTaskRegistry;
private final ScheduledTaskRepository _scheduledTaskRepository;
Expand Down Expand Up @@ -328,47 +334,63 @@ private void runTask() {
// Update the next run time in the database
_scheduledTaskRepository.updateNextRun(getName());
Instant nextRun = nextScheduledRun(getActiveCronExpressionInternal(), Instant.now(_clock));
log.info("Thread for Task '" + getName()
+ "' is beginning to do the run according "
+ "to the set schedule '" + getActiveCronExpressionInternal().toString()
+ "'. Setting next run to '" + nextRun + "'.");
_currentRunStarted = Instant.now(_clock);
long id = _scheduledTaskRepository.addScheduleRun(getName(), Host.getLocalHostName(),
_currentRunStarted, "Schedule run starting.");
ScheduleRunContext ctx = new ScheduledTaskRunnerContext(id, ScheduledTaskRunner.this,
Host.getLocalHostName(), _scheduledTaskRepository, _clock, _currentRunStarted);


// ?: Is this schedule manually triggered? Ie set to run once.
if (ScheduledTaskRunner.this._runOnce) {
// -> Yes, this where set to run once and is manually triggered. so add a log line.
// note, this is named runNow in the gui but runOnce in the db so we use the term used
// in gui for the logging.
ctx.log("Manually started");
}

// :: Try to run the code that the user wants, log if it fails.
try {
ScheduleStatus status = _runnable.run(ctx);
if (!(status instanceof ScheduleStatusValidResponse)) {
throw new IllegalArgumentException("The ScheduleRunContext returned a invalid ScheduleStatus,"
+ " make sure you are using done(), failed() or dispatched()");
// :: Set MDC values for log lines.
MDC.put(MDC_TRACE_ID_KEY, "ScheduledTask[" + getNameCleanedAndStripped() + "]rid[" + id + "]"
+ generateRandomString());
MDC.put(MDC_TASK_RUN_ID_KEY, String.valueOf(id));
MDC.put(MDC_TASK_NAME_KEY, getName());
MDC.put(MDC_TASK_NAME_CLEAN_KEY, getNameCleanedAndStripped());

// Proceed with the run
log.info("Thread for Task '" + getName()
+ "' is beginning to do the run according "
+ "to the set schedule '" + getActiveCronExpressionInternal().toString()
+ "'. Setting next run to '" + nextRun + "'.");
// ?: Is this schedule manually triggered? Ie set to run once.
if (ScheduledTaskRunner.this._runOnce) {
// -> Yes, this where set to run once and is manually triggered. so add a log line.
// note, this is named runNow in the gui but runOnce in the db so we use the term used
// in gui for the logging.
ctx.log("Manually started");
}

// :: Try to run the code that the user wants, log if it fails.
try {
ScheduleStatus status = _runnable.run(ctx);
if (!(status instanceof ScheduleStatusValidResponse)) {
throw new IllegalArgumentException("The ScheduleRunContext returned a invalid ScheduleStatus,"
+ " make sure you are using done(), failed() or dispatched()");
}
}
catch (Throwable t) {
// Oh no, the schedule we set to run failed, we should log this and continue
// on the normal schedule loop
String message = "Schedule '" + getName() + " run failed due to an error.' ";
log.info(message);
ctx.failed(message, t);
}
_lastRunCompleted = Instant.now(_clock);

_isRunning = false;
log.info("Thread for Task '" + getName() + "' "
+ " runId '" + ctx.getRunId() + "' "
+ "used '" + Duration.between(_currentRunStarted, _lastRunCompleted).toMillis() + "' "
+ "ms to run.");
}
catch (Throwable t) {
// Oh no, the schedule we set to run failed, we should log this and continue
// on the normal schedule loop
String message = "Schedule '" + getName() + " run failed due to an error.' ";
log.info(message);
ctx.failed(message, t);
finally {
// :: Clear MDC values
MDC.remove(MDC_TRACE_ID_KEY);
MDC.remove(MDC_TASK_RUN_ID_KEY);
MDC.remove(MDC_TASK_NAME_KEY);
MDC.remove(MDC_TASK_NAME_CLEAN_KEY);
}
_lastRunCompleted = Instant.now(_clock);

_isRunning = false;
log.info("Thread for Task '" + getName() + "' "
+ " runId '" + ctx.getRunId() + "' "
+ "used '" + Duration.between(_currentRunStarted, _lastRunCompleted).toMillis() + "' "
+ "ms to run.");
}

/**
Expand Down Expand Up @@ -456,6 +478,11 @@ public String getName() {
return _config.getName();
}

public String getNameCleanedAndStripped() {
return _config.getName().replace(" ", "_")
.replace(":", "_").strip();
}

@Override
public Criticality getCriticality() {
return _config.getCriticality();
Expand Down Expand Up @@ -905,4 +932,18 @@ static String throwableToStackTraceString(Throwable throwable) {
throwable.printStackTrace(pw);
return sw.toString();
}

private static final String ALPHANUMERIC_CHARACTERS =
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
/**
* Generate a random string, so our traceId will be unique (enough) for each run of the health check.
*/
private String generateRandomString() {
char[] randomChars = new char[7];
for (int i = 0; i < randomChars.length; i++) {
randomChars[i] = ALPHANUMERIC_CHARACTERS
.charAt(ThreadLocalRandom.current().nextInt(ALPHANUMERIC_CHARACTERS.length()));
}
return new String(randomChars);
}
}
Loading