From 35ab44dcd4f2f33dae990d491a2b0e5473b702d0 Mon Sep 17 00:00:00 2001 From: Andrei Tomashpolskiy Date: Wed, 25 Jan 2017 13:28:11 +0300 Subject: [PATCH 1/4] Provide info on currently running jobs #21 --- .../io/bootique/job/runnable/JobFuture.java | 50 ++++++++++++++++++- .../job/scheduler/DefaultScheduler.java | 38 +++++++++----- 2 files changed, 75 insertions(+), 13 deletions(-) diff --git a/src/main/java/io/bootique/job/runnable/JobFuture.java b/src/main/java/io/bootique/job/runnable/JobFuture.java index b226933d..5deed8fc 100644 --- a/src/main/java/io/bootique/job/runnable/JobFuture.java +++ b/src/main/java/io/bootique/job/runnable/JobFuture.java @@ -1,5 +1,6 @@ package io.bootique.job.runnable; +import java.util.Objects; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; @@ -9,10 +10,24 @@ public class JobFuture implements ScheduledFuture { + /** + * @since 0.13 + */ + public static Builder forJob(String job) { + return new Builder(job); + } + + private String job; + private RunnableJob runnable; private ScheduledFuture delegate; private Supplier resultSupplier; - public JobFuture(ScheduledFuture delegate, Supplier resultSupplier) { + public JobFuture(String job, + RunnableJob runnable, + ScheduledFuture delegate, + Supplier resultSupplier) { + this.job = job; + this.runnable = runnable; this.delegate = delegate; this.resultSupplier = resultSupplier; } @@ -65,4 +80,37 @@ public JobResult get(long timeout, TimeUnit unit) { return resultSupplier.get(); } + public static class Builder { + + private String job; + private RunnableJob runnable; + private ScheduledFuture future; + private Supplier resultSupplier; + + public Builder(String job) { + this.job = Objects.requireNonNull(job); + } + + public Builder runnable(RunnableJob runnable) { + this.runnable = runnable; + return this; + } + + public Builder future(ScheduledFuture future) { + this.future = future; + return this; + } + + public Builder resultSupplier(Supplier resultSupplier) { + this.resultSupplier = resultSupplier; + return this; + } + + public JobFuture build() { + Objects.requireNonNull(runnable); + Objects.requireNonNull(future); + Objects.requireNonNull(resultSupplier); + return new JobFuture(job, runnable, future, resultSupplier); + } + } } diff --git a/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java b/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java index aa1d05ef..6b126d18 100644 --- a/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java +++ b/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java @@ -2,11 +2,11 @@ import io.bootique.job.Job; import io.bootique.job.JobMetadata; +import io.bootique.job.JobRegistry; import io.bootique.job.runnable.JobFuture; import io.bootique.job.runnable.JobResult; import io.bootique.job.runnable.RunnableJob; import io.bootique.job.runnable.RunnableJobFactory; -import io.bootique.job.JobRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.TaskScheduler; @@ -19,6 +19,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ScheduledFuture; +import java.util.function.BiFunction; import java.util.stream.Collectors; public class DefaultScheduler implements Scheduler { @@ -85,7 +86,7 @@ public JobFuture runOnce(String jobName, Map parameters) { Job job = jobOptional.get(); return runOnce(job, parameters); } else { - return invalidJobNameResult(jobName); + return invalidJobNameResult(jobName, parameters); } } @@ -94,9 +95,12 @@ private Optional findJobByName(String jobName) { return (job == null) ? Optional.empty() : Optional.of(job); } - private JobFuture invalidJobNameResult(String jobName) { - return new JobFuture(new ExpiredFuture(), - () -> JobResult.failure(JobMetadata.build(jobName), "Invalid job name: " + jobName)); + private JobFuture invalidJobNameResult(String jobName, Map parameters) { + return JobFuture.forJob(jobName) + .future(new ExpiredFuture()) + .runnable(() -> JobResult.unknown(JobMetadata.build(jobName))) + .resultSupplier(() -> JobResult.failure(JobMetadata.build(jobName), "Invalid job name: " + jobName)) + .build(); } @Override @@ -106,16 +110,26 @@ public JobFuture runOnce(Job job) { @Override public JobFuture runOnce(Job job, Map parameters) { - RunnableJob rj = runnableJobFactory.runnable(job, parameters); - JobResult[] result = new JobResult[1]; - ScheduledFuture jobFuture = taskScheduler.schedule(() -> result[0] = rj.run(), new Date()); - return new JobFuture(jobFuture, () -> result[0] != null ? result[0] : JobResult.unknown(job.getMetadata())); + return submit(job, parameters, + (rj, result) -> taskScheduler.schedule(() -> result[0] = rj.run(), new Date())); } - public ScheduledFuture schedule(Job job, Map parameters, Trigger trigger) { + private ScheduledFuture schedule(Job job, Map parameters, Trigger trigger) { + return submit(job, parameters, + (rj, result) -> taskScheduler.schedule(() -> result[0] = rj.run(), trigger)); + } + + private JobFuture submit(Job job, Map parameters, + BiFunction> executor) { + RunnableJob rj = runnableJobFactory.runnable(job, parameters); JobResult[] result = new JobResult[1]; - ScheduledFuture jobFuture = taskScheduler.schedule(() -> result[0] = rj.run(), trigger); - return new JobFuture(jobFuture, () -> result[0] != null ? result[0] : JobResult.unknown(job.getMetadata())); + ScheduledFuture jobFuture = executor.apply(rj, result); + + return JobFuture.forJob(job.getMetadata().getName()) + .future(jobFuture) + .runnable(rj) + .resultSupplier(() -> result[0] != null ? result[0] : JobResult.unknown(job.getMetadata())) + .build(); } } From c96e599c46eda11fbd2b401c654c7acd79f9471d Mon Sep 17 00:00:00 2001 From: Andrei Tomashpolskiy Date: Wed, 25 Jan 2017 15:15:43 +0300 Subject: [PATCH 2/4] Provide info on currently running jobs #21 --- .../java/io/bootique/job/JobListener.java | 11 ++ .../runnable/SimpleRunnableJobFactory.java | 81 +++++++++- .../io/bootique/job/runtime/JobModule.java | 16 +- .../job/scheduler/SchedulerFactory.java | 7 +- .../job/SimpleRunnableJobFactoryTest.java | 152 ++++++++++++++++++ 5 files changed, 259 insertions(+), 8 deletions(-) create mode 100644 src/main/java/io/bootique/job/JobListener.java create mode 100644 src/test/java/io/bootique/job/SimpleRunnableJobFactoryTest.java diff --git a/src/main/java/io/bootique/job/JobListener.java b/src/main/java/io/bootique/job/JobListener.java new file mode 100644 index 00000000..ff8e35b3 --- /dev/null +++ b/src/main/java/io/bootique/job/JobListener.java @@ -0,0 +1,11 @@ +package io.bootique.job; + +import io.bootique.job.runnable.JobResult; + +import java.util.Map; +import java.util.function.Consumer; + +public interface JobListener { + + void onJobStarted(String jobName, Map parameters, Consumer> callback); +} diff --git a/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java b/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java index 8026ebef..c9e1a757 100644 --- a/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java +++ b/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java @@ -1,13 +1,86 @@ package io.bootique.job.runnable; import io.bootique.job.Job; +import io.bootique.job.JobListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; public class SimpleRunnableJobFactory implements RunnableJobFactory { - @Override - public RunnableJob runnable(Job job, Map parameters) { - return () -> job.run(parameters); - } + private static final Logger LOGGER = LoggerFactory.getLogger(SimpleRunnableJobFactory.class); + + private Set listeners; + + public SimpleRunnableJobFactory() { + this.listeners = Collections.emptySet(); + } + + public SimpleRunnableJobFactory(Set listeners) { + this.listeners = listeners; + } + + @Override + public RunnableJob runnable(Job delegate, Map parameters) { + if (listeners.isEmpty()) { + return () -> delegate.run(parameters); + } + + String jobName = delegate.getMetadata().getName(); + return () -> { + Callback callback = new Callback(jobName); + listeners.forEach(listener -> { + try { + listener.onJobStarted(jobName, parameters, callback); + } catch (Exception e) { + LOGGER.error("Error invoking job listener for job: " + jobName, e); + } + }); + JobResult result; + try { + result = delegate.run(parameters); + } catch (Exception e) { + callback.invoke(JobResult.failure(delegate.getMetadata(), e)); + throw e; + } + callback.invoke(result); + return result; + }; + } + + private static class Callback implements Consumer> { + + private static final Logger LOGGER = LoggerFactory.getLogger(Callback.class); + + private String jobName; + private List> callbacks; + + public Callback(String jobName) { + this.jobName = jobName; + } + + @Override + public void accept(Consumer callback) { + if (callbacks == null) { + callbacks = new ArrayList<>(); + } + callbacks.add(callback); + } + + public void invoke(JobResult result) { + callbacks.forEach(cb -> { + try { + cb.accept(result); + } catch (Exception e) { + LOGGER.error("Error invoking completion callback for job: " + jobName, e); + } + }); + } + } } diff --git a/src/main/java/io/bootique/job/runtime/JobModule.java b/src/main/java/io/bootique/job/runtime/JobModule.java index 13ddfe50..281d31aa 100644 --- a/src/main/java/io/bootique/job/runtime/JobModule.java +++ b/src/main/java/io/bootique/job/runtime/JobModule.java @@ -10,6 +10,7 @@ import io.bootique.command.Command; import io.bootique.config.ConfigurationFactory; import io.bootique.job.Job; +import io.bootique.job.JobListener; import io.bootique.job.command.ExecCommand; import io.bootique.job.command.ListCommand; import io.bootique.job.command.ScheduleCommand; @@ -45,6 +46,14 @@ public static Multibinder contributeJobs(Binder binder) { return Multibinder.newSetBinder(binder, Job.class); } + /** + * @param binder DI binder passed to the Module that invokes this method. + * @return a {@link Multibinder} for contributed job lifecycle listeners + */ + public static Multibinder contributeListeners(Binder binder) { + return Multibinder.newSetBinder(binder, JobListener.class); + } + public JobModule() { } @@ -77,6 +86,8 @@ public void configure(Binder binder) { Multibinder jobBinder = JobModule.contributeJobs(binder); jobTypes.forEach(jt -> jobBinder.addBinding().to(jt).in(Singleton.class)); + JobModule.contributeListeners(binder); + MapBinder lockHandlers = MapBinder.newMapBinder(binder, LockType.class, LockHandler.class); lockHandlers.addBinding(LockType.local).to(LocalLockHandler.class); @@ -84,10 +95,11 @@ public void configure(Binder binder) { } @Provides - protected Scheduler createScheduler(Map jobRunners, + protected Scheduler createScheduler(Set jobListeners, + Map jobRunners, JobRegistry jobRegistry, ConfigurationFactory configFactory) { - return configFactory.config(SchedulerFactory.class, configPrefix).createScheduler(jobRunners, jobRegistry); + return configFactory.config(SchedulerFactory.class, configPrefix).createScheduler(jobListeners, jobRunners, jobRegistry); } @Provides diff --git a/src/main/java/io/bootique/job/scheduler/SchedulerFactory.java b/src/main/java/io/bootique/job/scheduler/SchedulerFactory.java index 54a300a1..bd852a37 100644 --- a/src/main/java/io/bootique/job/scheduler/SchedulerFactory.java +++ b/src/main/java/io/bootique/job/scheduler/SchedulerFactory.java @@ -2,6 +2,7 @@ import io.bootique.annotation.BQConfig; import io.bootique.annotation.BQConfigProperty; +import io.bootique.job.JobListener; import io.bootique.job.lock.LockHandler; import io.bootique.job.lock.LockType; import io.bootique.job.runnable.ErrorHandlingRunnableJobFactory; @@ -15,6 +16,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Map; +import java.util.Set; /** * A configuration object that is used to setup jobs runtime. @@ -31,7 +33,8 @@ public SchedulerFactory() { this.threadPoolSize = 4; } - public Scheduler createScheduler(Map lockHandlers, + public Scheduler createScheduler(Set jobListeners, + Map lockHandlers, JobRegistry jobRegistry) { TaskScheduler taskScheduler = createTaskScheduler(); @@ -43,7 +46,7 @@ public Scheduler createScheduler(Map lockHandlers, throw new IllegalStateException("No LockHandler for lock type: " + lockType); } - RunnableJobFactory rf1 = new SimpleRunnableJobFactory(); + RunnableJobFactory rf1 = new SimpleRunnableJobFactory(jobListeners); RunnableJobFactory rf2 = new LockAwareRunnableJobFactory(rf1, lockHandler); RunnableJobFactory rf3 = new ErrorHandlingRunnableJobFactory(rf2); diff --git a/src/test/java/io/bootique/job/SimpleRunnableJobFactoryTest.java b/src/test/java/io/bootique/job/SimpleRunnableJobFactoryTest.java new file mode 100644 index 00000000..d3748a8b --- /dev/null +++ b/src/test/java/io/bootique/job/SimpleRunnableJobFactoryTest.java @@ -0,0 +1,152 @@ +package io.bootique.job; + +import io.bootique.job.runnable.JobOutcome; +import io.bootique.job.runnable.JobResult; +import io.bootique.job.runnable.RunnableJob; +import io.bootique.job.runnable.RunnableJobFactory; +import io.bootique.job.runnable.SimpleRunnableJobFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; + +import static org.junit.Assert.assertEquals; + +public class SimpleRunnableJobFactoryTest { + + private RunnableJobFactory rjf; + private JobStats jobStats; + + private ExecutorService executor; + + @Before + public void before() { + this.jobStats = new JobStats(); + this.rjf = new SimpleRunnableJobFactory(Collections.singleton(jobStats)); + this.executor = Executors.newFixedThreadPool(50); + } + + @After + public void after() { + this.executor.shutdownNow(); + } + + @Test + public void testListeners() throws InterruptedException { + int jobCount = 1000; + CountDownLatch latch = new CountDownLatch(jobCount); + + List jobs = new ArrayList<>(); + for (int i = 0; i < jobCount; i++) { + Job job = (i % 2 == 0) ? TestJob.withOutcome("goodjob", JobOutcome.SUCCESS) : TestJob.failing("badjob"); + RunnableJob rj = rjf.runnable(job, Collections.emptyMap()); + jobs.add(() -> { + try { + rj.run(); + } catch (Throwable e) { + // ignore + } finally { + latch.countDown(); + } + }); + } + Collections.shuffle(jobs); + + jobs.forEach(executor::execute); + latch.await(); + + jobStats.assertHasResults("goodjob", Collections.singletonMap(JobOutcome.SUCCESS, 500)); + jobStats.assertHasResults("badjob", Collections.singletonMap(JobOutcome.FAILURE, 500)); + } + + private static class TestJob extends BaseJob { + + public static TestJob withOutcome(String name, JobOutcome outcome) { + return new TestJob(name, outcome); + } + + public static TestJob failing(String name) { + return new TestJob(name); + } + + private JobOutcome outcome; + private boolean shouldFail; + + private TestJob(String name, JobOutcome outcome) { + super(JobMetadata.build(name)); + this.outcome = outcome; + } + + private TestJob(String name) { + super(JobMetadata.build(name)); + this.shouldFail = true; + } + + @Override + public JobResult run(Map parameters) { + try { + Thread.sleep(ThreadLocalRandom.current().nextLong(50)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (shouldFail) { + throw new RuntimeException(); + } else { + return new JobResult(getMetadata(), outcome, null, null); + } + } + } + + private static class JobStats implements JobListener { + + private Map> allResults; + + public JobStats() { + this.allResults = new ConcurrentHashMap<>(); + } + + @Override + public void onJobStarted(String jobName, Map parameters, Consumer> callback) { + callback.accept(result -> getResults(jobName).add(result)); + } + + public Set getResults(String jobName) { + Set results = allResults.get(jobName); + if (results == null) { + results = ConcurrentHashMap.newKeySet(); + Set existing = allResults.putIfAbsent(jobName, results); + if (existing != null) { + results = existing; + } + } + return results; + } + + public void assertHasResults(String jobName, Map outcomes) { + Set results = allResults.get(jobName); + if (results == null) { + throw new IllegalArgumentException("Unknown job: " + jobName); + } + + Map realOutcomes = results.stream() + .collect(HashMap::new, (m, r) -> { + Integer count = m.get(r.getOutcome()); + m.put(r.getOutcome(), (count == null ? 1 : ++count)); + }, Map::putAll); + + assertEquals(realOutcomes, outcomes); + } + } +} From dd7629eaf295de5e76d093ecc86070859d495192 Mon Sep 17 00:00:00 2001 From: Andrei Tomashpolskiy Date: Wed, 25 Jan 2017 16:24:04 +0300 Subject: [PATCH 3/4] Provide info on currently running jobs #21 --- .../runnable/SimpleRunnableJobFactory.java | 75 +--------------- .../io/bootique/job/runtime/JobModule.java | 8 +- .../job/scheduler/SchedulerFactory.java | 6 +- .../job/scheduler/execution/Callback.java | 69 +++++++++++++++ .../execution/DefaultJobRegistry.java | 12 ++- .../job/scheduler/execution/JobGroup.java | 87 +++++++++++++------ .../job/scheduler/execution/SingleJob.java | 8 +- .../execution/CallbackTest.java} | 11 ++- 8 files changed, 159 insertions(+), 117 deletions(-) create mode 100644 src/main/java/io/bootique/job/scheduler/execution/Callback.java rename src/test/java/io/bootique/job/{SimpleRunnableJobFactoryTest.java => scheduler/execution/CallbackTest.java} (93%) diff --git a/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java b/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java index c9e1a757..0bb87643 100644 --- a/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java +++ b/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java @@ -1,86 +1,13 @@ package io.bootique.job.runnable; import io.bootique.job.Job; -import io.bootique.job.JobListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.function.Consumer; public class SimpleRunnableJobFactory implements RunnableJobFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(SimpleRunnableJobFactory.class); - - private Set listeners; - - public SimpleRunnableJobFactory() { - this.listeners = Collections.emptySet(); - } - - public SimpleRunnableJobFactory(Set listeners) { - this.listeners = listeners; - } - @Override public RunnableJob runnable(Job delegate, Map parameters) { - if (listeners.isEmpty()) { - return () -> delegate.run(parameters); - } - - String jobName = delegate.getMetadata().getName(); - return () -> { - Callback callback = new Callback(jobName); - listeners.forEach(listener -> { - try { - listener.onJobStarted(jobName, parameters, callback); - } catch (Exception e) { - LOGGER.error("Error invoking job listener for job: " + jobName, e); - } - }); - JobResult result; - try { - result = delegate.run(parameters); - } catch (Exception e) { - callback.invoke(JobResult.failure(delegate.getMetadata(), e)); - throw e; - } - callback.invoke(result); - return result; - }; - } - - private static class Callback implements Consumer> { - - private static final Logger LOGGER = LoggerFactory.getLogger(Callback.class); - - private String jobName; - private List> callbacks; - - public Callback(String jobName) { - this.jobName = jobName; - } - - @Override - public void accept(Consumer callback) { - if (callbacks == null) { - callbacks = new ArrayList<>(); - } - callbacks.add(callback); - } - - public void invoke(JobResult result) { - callbacks.forEach(cb -> { - try { - cb.accept(result); - } catch (Exception e) { - LOGGER.error("Error invoking completion callback for job: " + jobName, e); - } - }); - } + return () -> delegate.run(parameters); } } diff --git a/src/main/java/io/bootique/job/runtime/JobModule.java b/src/main/java/io/bootique/job/runtime/JobModule.java index 281d31aa..ad7bdfe7 100644 --- a/src/main/java/io/bootique/job/runtime/JobModule.java +++ b/src/main/java/io/bootique/job/runtime/JobModule.java @@ -95,19 +95,19 @@ public void configure(Binder binder) { } @Provides - protected Scheduler createScheduler(Set jobListeners, - Map jobRunners, + protected Scheduler createScheduler(Map jobRunners, JobRegistry jobRegistry, ConfigurationFactory configFactory) { - return configFactory.config(SchedulerFactory.class, configPrefix).createScheduler(jobListeners, jobRunners, jobRegistry); + return configFactory.config(SchedulerFactory.class, configPrefix).createScheduler(jobRunners, jobRegistry); } @Provides @Singleton protected JobRegistry createJobRegistry(Set jobs, + Set jobListeners, Scheduler scheduler, ConfigurationFactory configFactory) { - return new DefaultJobRegistry(jobs, collectJobDefinitions(jobs, configFactory), scheduler); + return new DefaultJobRegistry(jobs, collectJobDefinitions(jobs, configFactory), scheduler, jobListeners); } private Map collectJobDefinitions(Set jobs, ConfigurationFactory configFactory) { diff --git a/src/main/java/io/bootique/job/scheduler/SchedulerFactory.java b/src/main/java/io/bootique/job/scheduler/SchedulerFactory.java index bd852a37..d90a7ae8 100644 --- a/src/main/java/io/bootique/job/scheduler/SchedulerFactory.java +++ b/src/main/java/io/bootique/job/scheduler/SchedulerFactory.java @@ -2,7 +2,6 @@ import io.bootique.annotation.BQConfig; import io.bootique.annotation.BQConfigProperty; -import io.bootique.job.JobListener; import io.bootique.job.lock.LockHandler; import io.bootique.job.lock.LockType; import io.bootique.job.runnable.ErrorHandlingRunnableJobFactory; @@ -33,8 +32,7 @@ public SchedulerFactory() { this.threadPoolSize = 4; } - public Scheduler createScheduler(Set jobListeners, - Map lockHandlers, + public Scheduler createScheduler(Map lockHandlers, JobRegistry jobRegistry) { TaskScheduler taskScheduler = createTaskScheduler(); @@ -46,7 +44,7 @@ public Scheduler createScheduler(Set jobListeners, throw new IllegalStateException("No LockHandler for lock type: " + lockType); } - RunnableJobFactory rf1 = new SimpleRunnableJobFactory(jobListeners); + RunnableJobFactory rf1 = new SimpleRunnableJobFactory(); RunnableJobFactory rf2 = new LockAwareRunnableJobFactory(rf1, lockHandler); RunnableJobFactory rf3 = new ErrorHandlingRunnableJobFactory(rf2); diff --git a/src/main/java/io/bootique/job/scheduler/execution/Callback.java b/src/main/java/io/bootique/job/scheduler/execution/Callback.java new file mode 100644 index 00000000..958e055b --- /dev/null +++ b/src/main/java/io/bootique/job/scheduler/execution/Callback.java @@ -0,0 +1,69 @@ +package io.bootique.job.scheduler.execution; + +import io.bootique.job.Job; +import io.bootique.job.JobListener; +import io.bootique.job.runnable.JobResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +class Callback implements Consumer> { + + private static final Logger LOGGER = LoggerFactory.getLogger(Callback.class); + + static JobResult runAndNotify(Job job, Map parameters, Set listeners) { + if (listeners.isEmpty()) { + return job.run(parameters); + } + + String jobName = job.getMetadata().getName(); + + Callback callback = new Callback(jobName); + listeners.forEach(listener -> { + try { + listener.onJobStarted(jobName, parameters, callback); + } catch (Exception e) { + LOGGER.error("Error invoking job listener for job: " + jobName, e); + } + }); + JobResult result; + try { + result = job.run(parameters); + } catch (Exception e) { + callback.invoke(JobResult.failure(job.getMetadata(), e)); + throw e; + } + callback.invoke(result); + return result; + } + + private String jobName; + private List> callbacks; + + public Callback(String jobName) { + this.jobName = jobName; + } + + @Override + public void accept(Consumer callback) { + if (callbacks == null) { + callbacks = new ArrayList<>(); + } + callbacks.add(callback); + } + + public void invoke(JobResult result) { + callbacks.forEach(cb -> { + try { + cb.accept(result); + } catch (Exception e) { + LOGGER.error("Error invoking completion callback for job: " + jobName, e); + } + }); + } +} diff --git a/src/main/java/io/bootique/job/scheduler/execution/DefaultJobRegistry.java b/src/main/java/io/bootique/job/scheduler/execution/DefaultJobRegistry.java index 8bae7fff..055405b2 100644 --- a/src/main/java/io/bootique/job/scheduler/execution/DefaultJobRegistry.java +++ b/src/main/java/io/bootique/job/scheduler/execution/DefaultJobRegistry.java @@ -1,6 +1,7 @@ package io.bootique.job.scheduler.execution; import io.bootique.job.Job; +import io.bootique.job.JobListener; import io.bootique.job.JobMetadata; import io.bootique.job.JobRegistry; import io.bootique.job.config.JobDefinition; @@ -44,13 +45,18 @@ public class DefaultJobRegistry implements JobRegistry { private ConcurrentMap executions; private Scheduler scheduler; + private Set listeners; - public DefaultJobRegistry(Collection jobs, Map jobDefinitions, Scheduler scheduler) { + public DefaultJobRegistry(Collection jobs, + Map jobDefinitions, + Scheduler scheduler, + Set listeners) { this.availableJobs = Collections.unmodifiableSet(collectJobNames(jobs, jobDefinitions)); this.jobs = mapJobs(jobs); this.jobDefinitions = jobDefinitions; this.executions = new ConcurrentHashMap<>((int)(jobDefinitions.size() / 0.75d) + 1); this.scheduler = scheduler; + this.listeners = listeners; } private Set collectJobNames(Collection jobs, Map jobDefinitions) { @@ -85,9 +91,9 @@ public JobResult run(Map parameters) { return job.run(parameters); } }; - execution = new SingleJob(delegate, graph.topSort().get(0).iterator().next()); + execution = new SingleJob(delegate, graph.topSort().get(0).iterator().next(), listeners); } else { - execution = new JobGroup(jobName, executionJobs, graph, scheduler); + execution = new JobGroup(jobName, executionJobs, graph, scheduler, listeners); } Job existing = executions.putIfAbsent(jobName, execution); diff --git a/src/main/java/io/bootique/job/scheduler/execution/JobGroup.java b/src/main/java/io/bootique/job/scheduler/execution/JobGroup.java index 4a5a518d..b2a3fc14 100644 --- a/src/main/java/io/bootique/job/scheduler/execution/JobGroup.java +++ b/src/main/java/io/bootique/job/scheduler/execution/JobGroup.java @@ -1,6 +1,7 @@ package io.bootique.job.scheduler.execution; import io.bootique.job.Job; +import io.bootique.job.JobListener; import io.bootique.job.JobMetadata; import io.bootique.job.runnable.JobFuture; import io.bootique.job.runnable.JobOutcome; @@ -16,57 +17,91 @@ import java.util.Map; import java.util.Set; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Collectors; class JobGroup implements Job { private static final Logger LOGGER = LoggerFactory.getLogger(JobGroup.class); + private volatile Job delegate; + private Supplier delegateSupplier; + private final Object lock; + private String name; - private Map jobs; + private Collection jobs; private DependencyGraph graph; private Scheduler scheduler; + private Set listeners; - public JobGroup(String name, Collection jobs, DependencyGraph graph, Scheduler scheduler) { + public JobGroup(String name, Collection jobs, DependencyGraph graph, Scheduler scheduler, Set listeners) { this.name = name; - this.jobs = mapJobs(jobs); + this.jobs = jobs; + this.delegateSupplier = this::buildDelegate; + this.lock = new Object(); + this.graph = graph; this.scheduler = scheduler; + this.listeners = listeners; + } + + private Job buildDelegate() { + Map jobMap = mapJobs(jobs); + JobMetadata.Builder builder = JobMetadata.builder(name); + for (Job job : jobMap.values()) { + job.getMetadata().getParameters().forEach(builder::param); + } + JobMetadata metadata = builder.build(); + + return new Job() { + @Override + public JobMetadata getMetadata() { + return metadata; + } + + @Override + public JobResult run(Map parameters) { + traverseExecution(jobExecutions -> { + Set results = execute(jobExecutions, jobMap); + results.forEach(result -> { + if (result.getOutcome() != JobOutcome.SUCCESS) { + String message = "Failed to execute job: " + result.getMetadata().getName(); + if (result.getMessage() != null) { + message += ". Reason: " + result.getMessage(); + } + throw new RuntimeException(message, result.getThrowable()); + } + }); + }); + return JobResult.success(getMetadata()); + } + }; } private Map mapJobs(Collection jobs) { return jobs.stream().collect(Collectors.toMap(job -> job.getMetadata().getName(), job -> job)); } + private Job getDelegate() { + if (delegate == null) { + synchronized (lock) { + if (delegate == null) { + delegate = delegateSupplier.get(); + } + } + } + return delegate; + } + @Override public JobMetadata getMetadata() { - JobMetadata.Builder builder = JobMetadata.builder(name); - for (Job job : jobs.values()) { - job.getMetadata().getParameters().forEach(builder::param); - } - return builder.build(); + return getDelegate().getMetadata(); } @Override public JobResult run(Map params) { // TODO: merge execution params into individual jobs' params - try { - traverseExecution(jobExecutions -> { - Set results = execute(jobExecutions); - results.forEach(result -> { - if (result.getOutcome() != JobOutcome.SUCCESS) { - String message = "Failed to execute job: " + result.getMetadata().getName(); - if (result.getMessage() != null) { - message += ". Reason: " + result.getMessage(); - } - throw new RuntimeException(message, result.getThrowable()); - } - }); - }); - return JobResult.success(getMetadata()); - } catch (Exception e) { - return JobResult.failure(getMetadata(), e); - } + return Callback.runAndNotify(getDelegate(), params, listeners); } private void traverseExecution(Consumer> visitor) { @@ -75,7 +110,7 @@ private void traverseExecution(Consumer> visitor) { executions.forEach(visitor::accept); } - private Set execute(Set jobExecutions) { + private Set execute(Set jobExecutions, Map jobs) { if (jobExecutions.isEmpty()) { JobResult.failure(getMetadata(), "No jobs"); } diff --git a/src/main/java/io/bootique/job/scheduler/execution/SingleJob.java b/src/main/java/io/bootique/job/scheduler/execution/SingleJob.java index df9d456d..13aca66f 100644 --- a/src/main/java/io/bootique/job/scheduler/execution/SingleJob.java +++ b/src/main/java/io/bootique/job/scheduler/execution/SingleJob.java @@ -1,6 +1,7 @@ package io.bootique.job.scheduler.execution; import io.bootique.job.Job; +import io.bootique.job.JobListener; import io.bootique.job.JobMetadata; import io.bootique.job.runnable.JobResult; import org.slf4j.Logger; @@ -8,6 +9,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; class SingleJob implements Job { @@ -15,10 +17,12 @@ class SingleJob implements Job { private Job delegate; private JobExecution execution; + private Set listeners; - SingleJob(Job delegate, JobExecution execution) { + SingleJob(Job delegate, JobExecution execution, Set listeners) { this.delegate = delegate; this.execution = execution; + this.listeners = listeners; } @Override @@ -31,7 +35,7 @@ public JobResult run(Map parameters) { Map mergedParams = mergeParams(parameters, execution.getParams()); LOGGER.info(String.format("job '%s' started with params %s", getMetadata().getName(), mergedParams)); try { - return delegate.run(mergedParams); + return Callback.runAndNotify(delegate, mergedParams, listeners); } finally { LOGGER.info(String.format("job '%s' finished", getMetadata().getName())); } diff --git a/src/test/java/io/bootique/job/SimpleRunnableJobFactoryTest.java b/src/test/java/io/bootique/job/scheduler/execution/CallbackTest.java similarity index 93% rename from src/test/java/io/bootique/job/SimpleRunnableJobFactoryTest.java rename to src/test/java/io/bootique/job/scheduler/execution/CallbackTest.java index d3748a8b..45587e19 100644 --- a/src/test/java/io/bootique/job/SimpleRunnableJobFactoryTest.java +++ b/src/test/java/io/bootique/job/scheduler/execution/CallbackTest.java @@ -1,10 +1,13 @@ -package io.bootique.job; +package io.bootique.job.scheduler.execution; +import io.bootique.job.BaseJob; +import io.bootique.job.Job; +import io.bootique.job.JobListener; +import io.bootique.job.JobMetadata; import io.bootique.job.runnable.JobOutcome; import io.bootique.job.runnable.JobResult; import io.bootique.job.runnable.RunnableJob; import io.bootique.job.runnable.RunnableJobFactory; -import io.bootique.job.runnable.SimpleRunnableJobFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -24,7 +27,7 @@ import static org.junit.Assert.assertEquals; -public class SimpleRunnableJobFactoryTest { +public class CallbackTest { private RunnableJobFactory rjf; private JobStats jobStats; @@ -34,7 +37,7 @@ public class SimpleRunnableJobFactoryTest { @Before public void before() { this.jobStats = new JobStats(); - this.rjf = new SimpleRunnableJobFactory(Collections.singleton(jobStats)); + this.rjf = (job, parameters) -> () -> Callback.runAndNotify(job, parameters, Collections.singleton(jobStats)); this.executor = Executors.newFixedThreadPool(50); } From 6866b82c4d6efa973e1a20081948c0be2e3f833a Mon Sep 17 00:00:00 2001 From: Andrei Tomashpolskiy Date: Thu, 26 Jan 2017 11:22:53 +0300 Subject: [PATCH 4/4] Provide info on currently running jobs #21 --- .../java/io/bootique/job/scheduler/execution/Callback.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/io/bootique/job/scheduler/execution/Callback.java b/src/main/java/io/bootique/job/scheduler/execution/Callback.java index 958e055b..661973a2 100644 --- a/src/main/java/io/bootique/job/scheduler/execution/Callback.java +++ b/src/main/java/io/bootique/job/scheduler/execution/Callback.java @@ -38,6 +38,9 @@ static JobResult runAndNotify(Job job, Map parameters, Set