Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/main/java/io/bootique/job/JobListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.bootique.job;

import io.bootique.job.runnable.JobResult;

import java.util.Map;
import java.util.function.Consumer;

public interface JobListener {

void onJobStarted(String jobName, Map<String, Object> parameters, Consumer<Consumer<JobResult>> callback);
}
50 changes: 49 additions & 1 deletion src/main/java/io/bootique/job/runnable/JobFuture.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.bootique.job.runnable;

import java.util.Objects;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
Expand All @@ -9,10 +10,24 @@

public class JobFuture implements ScheduledFuture<JobResult> {

/**
* @since 0.13
*/
public static Builder forJob(String job) {
return new Builder(job);
}

private String job;
private RunnableJob runnable;
private ScheduledFuture<?> delegate;
private Supplier<JobResult> resultSupplier;

public JobFuture(ScheduledFuture<?> delegate, Supplier<JobResult> resultSupplier) {
public JobFuture(String job,
RunnableJob runnable,
ScheduledFuture<?> delegate,
Supplier<JobResult> resultSupplier) {
this.job = job;
this.runnable = runnable;
this.delegate = delegate;
this.resultSupplier = resultSupplier;
}
Expand Down Expand Up @@ -65,4 +80,37 @@ public JobResult get(long timeout, TimeUnit unit) {
return resultSupplier.get();
}

public static class Builder {

private String job;
private RunnableJob runnable;
private ScheduledFuture<?> future;
private Supplier<JobResult> resultSupplier;

public Builder(String job) {
this.job = Objects.requireNonNull(job);
}

public Builder runnable(RunnableJob runnable) {
this.runnable = runnable;
return this;
}

public Builder future(ScheduledFuture<?> future) {
this.future = future;
return this;
}

public Builder resultSupplier(Supplier<JobResult> resultSupplier) {
this.resultSupplier = resultSupplier;
return this;
}

public JobFuture build() {
Objects.requireNonNull(runnable);
Objects.requireNonNull(future);
Objects.requireNonNull(resultSupplier);
return new JobFuture(job, runnable, future, resultSupplier);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

public class SimpleRunnableJobFactory implements RunnableJobFactory {

@Override
public RunnableJob runnable(Job job, Map<String, Object> parameters) {
return () -> job.run(parameters);
}
@Override
public RunnableJob runnable(Job delegate, Map<String, Object> parameters) {
return () -> delegate.run(parameters);
}
}
14 changes: 13 additions & 1 deletion src/main/java/io/bootique/job/runtime/JobModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.bootique.command.Command;
import io.bootique.config.ConfigurationFactory;
import io.bootique.job.Job;
import io.bootique.job.JobListener;
import io.bootique.job.command.ExecCommand;
import io.bootique.job.command.ListCommand;
import io.bootique.job.command.ScheduleCommand;
Expand Down Expand Up @@ -45,6 +46,14 @@ public static Multibinder<Job> contributeJobs(Binder binder) {
return Multibinder.newSetBinder(binder, Job.class);
}

/**
* @param binder DI binder passed to the Module that invokes this method.
* @return a {@link Multibinder} for contributed job lifecycle listeners
*/
public static Multibinder<JobListener> contributeListeners(Binder binder) {
return Multibinder.newSetBinder(binder, JobListener.class);
}

public JobModule() {
}

Expand Down Expand Up @@ -77,6 +86,8 @@ public void configure(Binder binder) {
Multibinder<Job> jobBinder = JobModule.contributeJobs(binder);
jobTypes.forEach(jt -> jobBinder.addBinding().to(jt).in(Singleton.class));

JobModule.contributeListeners(binder);

MapBinder<LockType, LockHandler> lockHandlers = MapBinder.newMapBinder(binder, LockType.class,
LockHandler.class);
lockHandlers.addBinding(LockType.local).to(LocalLockHandler.class);
Expand All @@ -93,9 +104,10 @@ protected Scheduler createScheduler(Map<LockType, LockHandler> jobRunners,
@Provides
@Singleton
protected JobRegistry createJobRegistry(Set<Job> jobs,
Set<JobListener> jobListeners,
Scheduler scheduler,
ConfigurationFactory configFactory) {
return new DefaultJobRegistry(jobs, collectJobDefinitions(jobs, configFactory), scheduler);
return new DefaultJobRegistry(jobs, collectJobDefinitions(jobs, configFactory), scheduler, jobListeners);
}

private Map<String, JobDefinition> collectJobDefinitions(Set<Job> jobs, ConfigurationFactory configFactory) {
Expand Down
38 changes: 26 additions & 12 deletions src/main/java/io/bootique/job/scheduler/DefaultScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import io.bootique.job.Job;
import io.bootique.job.JobMetadata;
import io.bootique.job.JobRegistry;
import io.bootique.job.runnable.JobFuture;
import io.bootique.job.runnable.JobResult;
import io.bootique.job.runnable.RunnableJob;
import io.bootique.job.runnable.RunnableJobFactory;
import io.bootique.job.JobRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -19,6 +19,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class DefaultScheduler implements Scheduler {
Expand Down Expand Up @@ -85,7 +86,7 @@ public JobFuture runOnce(String jobName, Map<String, Object> parameters) {
Job job = jobOptional.get();
return runOnce(job, parameters);
} else {
return invalidJobNameResult(jobName);
return invalidJobNameResult(jobName, parameters);
}
}

Expand All @@ -94,9 +95,12 @@ private Optional<Job> findJobByName(String jobName) {
return (job == null) ? Optional.empty() : Optional.of(job);
}

private JobFuture invalidJobNameResult(String jobName) {
return new JobFuture(new ExpiredFuture(),
() -> JobResult.failure(JobMetadata.build(jobName), "Invalid job name: " + jobName));
private JobFuture invalidJobNameResult(String jobName, Map<String, Object> parameters) {
return JobFuture.forJob(jobName)
.future(new ExpiredFuture())
.runnable(() -> JobResult.unknown(JobMetadata.build(jobName)))
.resultSupplier(() -> JobResult.failure(JobMetadata.build(jobName), "Invalid job name: " + jobName))
.build();
}

@Override
Expand All @@ -106,16 +110,26 @@ public JobFuture runOnce(Job job) {

@Override
public JobFuture runOnce(Job job, Map<String, Object> parameters) {
RunnableJob rj = runnableJobFactory.runnable(job, parameters);
JobResult[] result = new JobResult[1];
ScheduledFuture<?> jobFuture = taskScheduler.schedule(() -> result[0] = rj.run(), new Date());
return new JobFuture(jobFuture, () -> result[0] != null ? result[0] : JobResult.unknown(job.getMetadata()));
return submit(job, parameters,
(rj, result) -> taskScheduler.schedule(() -> result[0] = rj.run(), new Date()));
}

public ScheduledFuture<?> schedule(Job job, Map<String, Object> parameters, Trigger trigger) {
private ScheduledFuture<?> schedule(Job job, Map<String, Object> parameters, Trigger trigger) {
return submit(job, parameters,
(rj, result) -> taskScheduler.schedule(() -> result[0] = rj.run(), trigger));
}

private JobFuture submit(Job job, Map<String, Object> parameters,
BiFunction<RunnableJob, JobResult[], ScheduledFuture<?>> executor) {

RunnableJob rj = runnableJobFactory.runnable(job, parameters);
JobResult[] result = new JobResult[1];
ScheduledFuture<?> jobFuture = taskScheduler.schedule(() -> result[0] = rj.run(), trigger);
return new JobFuture(jobFuture, () -> result[0] != null ? result[0] : JobResult.unknown(job.getMetadata()));
ScheduledFuture<?> jobFuture = executor.apply(rj, result);

return JobFuture.forJob(job.getMetadata().getName())
.future(jobFuture)
.runnable(rj)
.resultSupplier(() -> result[0] != null ? result[0] : JobResult.unknown(job.getMetadata()))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;

/**
* A configuration object that is used to setup jobs runtime.
Expand Down
72 changes: 72 additions & 0 deletions src/main/java/io/bootique/job/scheduler/execution/Callback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.bootique.job.scheduler.execution;

import io.bootique.job.Job;
import io.bootique.job.JobListener;
import io.bootique.job.runnable.JobResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

class Callback implements Consumer<Consumer<JobResult>> {

private static final Logger LOGGER = LoggerFactory.getLogger(Callback.class);

static JobResult runAndNotify(Job job, Map<String, Object> parameters, Set<JobListener> listeners) {
if (listeners.isEmpty()) {
return job.run(parameters);
}

String jobName = job.getMetadata().getName();

Callback callback = new Callback(jobName);
listeners.forEach(listener -> {
try {
listener.onJobStarted(jobName, parameters, callback);
} catch (Exception e) {
LOGGER.error("Error invoking job listener for job: " + jobName, e);
}
});
JobResult result;
try {
result = job.run(parameters);
} catch (Exception e) {
callback.invoke(JobResult.failure(job.getMetadata(), e));
throw e;
}
if (result == null) {
result = JobResult.unknown(job.getMetadata());
}
callback.invoke(result);
return result;
}

private String jobName;
private List<Consumer<JobResult>> callbacks;

public Callback(String jobName) {
this.jobName = jobName;
}

@Override
public void accept(Consumer<JobResult> callback) {
if (callbacks == null) {
callbacks = new ArrayList<>();
}
callbacks.add(callback);
}

public void invoke(JobResult result) {
callbacks.forEach(cb -> {
try {
cb.accept(result);
} catch (Exception e) {
LOGGER.error("Error invoking completion callback for job: " + jobName, e);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.bootique.job.scheduler.execution;

import io.bootique.job.Job;
import io.bootique.job.JobListener;
import io.bootique.job.JobMetadata;
import io.bootique.job.JobRegistry;
import io.bootique.job.config.JobDefinition;
Expand Down Expand Up @@ -44,13 +45,18 @@ public class DefaultJobRegistry implements JobRegistry {
private ConcurrentMap<String, Job> executions;

private Scheduler scheduler;
private Set<JobListener> listeners;

public DefaultJobRegistry(Collection<Job> jobs, Map<String, JobDefinition> jobDefinitions, Scheduler scheduler) {
public DefaultJobRegistry(Collection<Job> jobs,
Map<String, JobDefinition> jobDefinitions,
Scheduler scheduler,
Set<JobListener> listeners) {
this.availableJobs = Collections.unmodifiableSet(collectJobNames(jobs, jobDefinitions));
this.jobs = mapJobs(jobs);
this.jobDefinitions = jobDefinitions;
this.executions = new ConcurrentHashMap<>((int)(jobDefinitions.size() / 0.75d) + 1);
this.scheduler = scheduler;
this.listeners = listeners;
}

private Set<String> collectJobNames(Collection<Job> jobs, Map<String, JobDefinition> jobDefinitions) {
Expand Down Expand Up @@ -85,9 +91,9 @@ public JobResult run(Map<String, Object> parameters) {
return job.run(parameters);
}
};
execution = new SingleJob(delegate, graph.topSort().get(0).iterator().next());
execution = new SingleJob(delegate, graph.topSort().get(0).iterator().next(), listeners);
} else {
execution = new JobGroup(jobName, executionJobs, graph, scheduler);
execution = new JobGroup(jobName, executionJobs, graph, scheduler, listeners);
}

Job existing = executions.putIfAbsent(jobName, execution);
Expand Down
Loading