diff --git a/src/main/java/com/nirmata/workflow/WorkflowManager.java b/src/main/java/com/nirmata/workflow/WorkflowManager.java index 2dde3e56..1dd83ed6 100644 --- a/src/main/java/com/nirmata/workflow/WorkflowManager.java +++ b/src/main/java/com/nirmata/workflow/WorkflowManager.java @@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework; import java.io.Closeable; import java.util.Optional; +import java.util.concurrent.TimeUnit; /** * Main API - create via {@link WorkflowManagerBuilder} @@ -140,4 +141,13 @@ public interface WorkflowManager extends Closeable * @return new WorkflowListenerManager */ WorkflowListenerManager newWorkflowListenerManager(); + + /** + * Close the workflow manager gracefully by allowing the in-progress tasks to continue till the timeout specified. + * This method is different from close method(Closeable) as close method will stop the in-progress tasks as well. + * + * @param timeOut is the maximum time allocated for the in-progress tasks to complete the execution. + * @param unit Timeunit for the timeOut quantity specified + */ + void closeGracefully(long timeOut, TimeUnit unit); } diff --git a/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java b/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java index a6ce0c64..4d1f2b4d 100644 --- a/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java +++ b/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java @@ -58,6 +58,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -128,6 +129,15 @@ public WorkflowListenerManager newWorkflowListenerManager() return new WorkflowListenerManagerImpl(this); } + @Override + public void closeGracefully(long timeOut, TimeUnit unit) { + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + CloseableUtils.closeQuietly(schedulerSelector); + consumers.forEach(consumer -> consumer.closeGraceFully(timeOut, unit)); + } + } + @Override public Map getTaskDetails(RunId runId) { diff --git a/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java b/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java index 807f8376..2bfd9a38 100644 --- a/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java +++ b/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.nirmata.workflow.admin.WorkflowManagerState; import java.io.Closeable; +import java.util.concurrent.TimeUnit; public interface QueueConsumer extends Closeable { @@ -30,4 +31,11 @@ public interface QueueConsumer extends Closeable @VisibleForTesting void debugValidateClosed(); + + /** + * Allowing the executorService(SimpleQueue) to finish the previously submitted tasks before shutting down + * @param timeOut + * @param unit Timeunit for the timeOut quantity specified + */ + void closeGraceFully(long timeOut, TimeUnit unit); } diff --git a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java index 8bc500e4..3588d55e 100644 --- a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java +++ b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; import com.nirmata.workflow.admin.WorkflowManagerState; import com.nirmata.workflow.models.ExecutableTask; import com.nirmata.workflow.models.TaskMode; @@ -200,6 +201,21 @@ public void debugValidateClosed() Preconditions.checkState(executorService.isTerminated()); } + @Override + public void closeGraceFully(long timeOut, TimeUnit unit) { + if ( started.compareAndSet(true, false) ) + { + log.info("Blocks until all tasks have completed execution or the timeout occurs"); + try { + MoreExecutors.shutdownAndAwaitTermination(executorService, timeOut, unit); + log.info("Simple Queue executor service shutdown completed"); + } catch (RuntimeException e) { + log.error("Error while processing of in-progress tasks, possibly due to timeout while waiting", e); + Thread.currentThread().interrupt(); + } + } + } + public void start() { if ( started.compareAndSet(false, true) ) diff --git a/src/test/java/com/nirmata/workflow/TestNormal.java b/src/test/java/com/nirmata/workflow/TestNormal.java index 71ed133c..5a2290c5 100644 --- a/src/test/java/com/nirmata/workflow/TestNormal.java +++ b/src/test/java/com/nirmata/workflow/TestNormal.java @@ -24,6 +24,7 @@ import com.nirmata.workflow.admin.RunInfo; import com.nirmata.workflow.admin.StandardAutoCleaner; import com.nirmata.workflow.admin.TaskInfo; +import com.nirmata.workflow.admin.WorkflowManagerState; import com.nirmata.workflow.details.WorkflowManagerImpl; import com.nirmata.workflow.executor.TaskExecution; import com.nirmata.workflow.executor.TaskExecutionStatus; @@ -35,6 +36,7 @@ import com.nirmata.workflow.models.TaskId; import com.nirmata.workflow.models.TaskType; import com.nirmata.workflow.serialization.JsonSerializerMapper; +import org.apache.commons.lang.time.StopWatch; import org.apache.curator.utils.CloseableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -514,6 +516,59 @@ public void testMultiTypes() throws Exception closeWorkflow(workflowManager); } } + @Test + public void testWorkflowManagerGracefulClose() throws Exception + { + boolean shutdownDone = false; + BlockingQueue tasks = Queues.newLinkedBlockingQueue(); + TaskExecutor taskExecutor = (w, t) -> () -> { + tasks.add(t.getTaskId()); + StopWatch stopwatch = new StopWatch(); + stopwatch.start(); + try + { + Thread.sleep(5000); + tasks.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + throw new RuntimeException(); + } + return new TaskExecutionResult(TaskExecutionStatus.SUCCESS, "Completed"); + }; + TaskType taskType = new TaskType("test", "1", true); + WorkflowManager workflowManager = WorkflowManagerBuilder.builder() + .addingTaskExecutor(taskExecutor, 1, taskType) + .withCurator(curator, "test", "1") + .build(); + StopWatch stopwatch = new StopWatch(); + try + { + workflowManager.start(); + + TaskId taskId = new TaskId(); + RunId runId = workflowManager.submitTask(new Task(taskId, taskType)); + + stopwatch.start(); + timing.sleepABit(); + + List tasksPresent = workflowManager.getAdmin().getTaskInfo(runId); + Assert.assertTrue(tasksPresent.size() == 1); // checking via admin API whether task is present before the graceful shutdown request. + Assert.assertTrue(tasks.size() == 1); // checking whether task is yet to be completed before the graceful shutdown request. + closeWorkflowGracefully(workflowManager); // requesting a graceful shutdown while the task is in progress. + shutdownDone = true; + Assert.assertTrue(stopwatch.getTime() >= 5000); // checking if task took more than 5000 ms after the submission. + Assert.assertTrue(tasks.size() == 0); // checking if task is completed during graceful shutdown + Assert.assertTrue(workflowManager.getAdmin().getWorkflowManagerState().getExecutorsState().get(0) + == WorkflowManagerState.State.CLOSED); // checking if workflow manager is in closed state at the end. + } + finally + { + stopwatch.stop(); + if(!shutdownDone) closeWorkflow(workflowManager);//shutdown here if graceful shutdown was interrupted unexpectedly. + } + } private void closeWorkflow(WorkflowManager workflowManager) throws InterruptedException { @@ -521,4 +576,9 @@ private void closeWorkflow(WorkflowManager workflowManager) throws InterruptedEx timing.sleepABit(); ((WorkflowManagerImpl)workflowManager).debugValidateClosed(); } + + private void closeWorkflowGracefully(WorkflowManager workflowManager) throws InterruptedException + { + workflowManager.closeGracefully(20, TimeUnit.SECONDS); + } }