From 31c8c5dbf49cb356fcf374753bc01a1281c3715f Mon Sep 17 00:00:00 2001 From: msur Date: Thu, 28 Sep 2023 10:46:49 +0530 Subject: [PATCH 1/4] Closing SimpleQueue and WorkflowManager gracefully --- .../java/com/nirmata/workflow/WorkflowManager.java | 9 +++++++++ .../workflow/details/WorkflowManagerImpl.java | 9 +++++++++ .../com/nirmata/workflow/queue/QueueConsumer.java | 6 ++++++ .../workflow/queue/zookeeper/SimpleQueue.java | 14 ++++++++++++++ 4 files changed, 38 insertions(+) diff --git a/src/main/java/com/nirmata/workflow/WorkflowManager.java b/src/main/java/com/nirmata/workflow/WorkflowManager.java index 2dde3e56..c5a4ee14 100644 --- a/src/main/java/com/nirmata/workflow/WorkflowManager.java +++ b/src/main/java/com/nirmata/workflow/WorkflowManager.java @@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework; import java.io.Closeable; import java.util.Optional; +import java.util.concurrent.TimeUnit; /** * Main API - create via {@link WorkflowManagerBuilder} @@ -140,4 +141,12 @@ public interface WorkflowManager extends Closeable * @return new WorkflowListenerManager */ WorkflowListenerManager newWorkflowListenerManager(); + + /** + * Close the workflow manager gracefully by allowing the in-progress tasks to continue till the timeout specified. + * This method is different from close method(Closeable) as close method will stop the in-progress tasks as well. + * + * @param timeOut is the maximum time(in seconds) allocated for the in-progress tasks to complete the execution. + */ + void closeGracefully(long timeOut); } diff --git a/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java b/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java index a6ce0c64..a99637cc 100644 --- a/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java +++ b/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java @@ -128,6 +128,15 @@ public WorkflowListenerManager newWorkflowListenerManager() return new WorkflowListenerManagerImpl(this); } + @Override + public void closeGracefully(long timeOut) { + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + CloseableUtils.closeQuietly(schedulerSelector); + consumers.forEach(consumer -> consumer.closeGraceFully(timeOut)); + } + } + @Override public Map getTaskDetails(RunId runId) { diff --git a/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java b/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java index 807f8376..a92551e1 100644 --- a/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java +++ b/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java @@ -30,4 +30,10 @@ public interface QueueConsumer extends Closeable @VisibleForTesting void debugValidateClosed(); + + /** + * Allowing the executorService(SimpleQueue) to finish the previously submitted tasks before shutting down + * @param timeOut (in seconds) + */ + void closeGraceFully(long timeOut); } diff --git a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java index 8bc500e4..400cab44 100644 --- a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java +++ b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java @@ -200,6 +200,20 @@ public void debugValidateClosed() Preconditions.checkState(executorService.isTerminated()); } + @Override + public void closeGraceFully(long timeOut) { + if ( started.compareAndSet(true, false) ) + { + executorService.shutdown(); + try { + executorService.awaitTermination(timeOut, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("Error while processing of in-progress tasks, possibly due to timeout while waiting", e); + Thread.currentThread().interrupt(); + } + } + } + public void start() { if ( started.compareAndSet(false, true) ) From 5c81a9f910ca9dc723eab56f6fa750554fd871af Mon Sep 17 00:00:00 2001 From: msur Date: Thu, 5 Oct 2023 09:32:05 +0530 Subject: [PATCH 2/4] Closing SimpleQueue and WorkflowManager gracefully --- .../workflow/queue/zookeeper/SimpleQueue.java | 2 + .../java/com/nirmata/workflow/TestNormal.java | 60 +++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java index 400cab44..07313e7e 100644 --- a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java +++ b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java @@ -206,7 +206,9 @@ public void closeGraceFully(long timeOut) { { executorService.shutdown(); try { + log.info("Blocks until all tasks have completed execution or the timeout occurs"); executorService.awaitTermination(timeOut, TimeUnit.SECONDS); + log.info("Simple Queue executor service shutdown completed"); } catch (InterruptedException e) { log.error("Error while processing of in-progress tasks, possibly due to timeout while waiting", e); Thread.currentThread().interrupt(); diff --git a/src/test/java/com/nirmata/workflow/TestNormal.java b/src/test/java/com/nirmata/workflow/TestNormal.java index 71ed133c..7c3ef8c9 100644 --- a/src/test/java/com/nirmata/workflow/TestNormal.java +++ b/src/test/java/com/nirmata/workflow/TestNormal.java @@ -24,6 +24,7 @@ import com.nirmata.workflow.admin.RunInfo; import com.nirmata.workflow.admin.StandardAutoCleaner; import com.nirmata.workflow.admin.TaskInfo; +import com.nirmata.workflow.admin.WorkflowManagerState; import com.nirmata.workflow.details.WorkflowManagerImpl; import com.nirmata.workflow.executor.TaskExecution; import com.nirmata.workflow.executor.TaskExecutionStatus; @@ -35,6 +36,7 @@ import com.nirmata.workflow.models.TaskId; import com.nirmata.workflow.models.TaskType; import com.nirmata.workflow.serialization.JsonSerializerMapper; +import org.apache.commons.lang.time.StopWatch; import org.apache.curator.utils.CloseableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -514,6 +516,59 @@ public void testMultiTypes() throws Exception closeWorkflow(workflowManager); } } + @Test + public void testWorkflowManagerGracefulClose() throws Exception + { + boolean shutdownDone = false; + BlockingQueue tasks = Queues.newLinkedBlockingQueue(); + TaskExecutor taskExecutor = (w, t) -> () -> { + tasks.add(t.getTaskId()); + StopWatch stopwatch = new StopWatch(); + stopwatch.start(); + try + { + Thread.sleep(5000); + tasks.poll(timing.milliseconds(), TimeUnit.MILLISECONDS); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + throw new RuntimeException(); + } + return new TaskExecutionResult(TaskExecutionStatus.SUCCESS, "Completed"); + }; + TaskType taskType = new TaskType("test", "1", true); + WorkflowManager workflowManager = WorkflowManagerBuilder.builder() + .addingTaskExecutor(taskExecutor, 1, taskType) + .withCurator(curator, "test", "1") + .build(); + StopWatch stopwatch = new StopWatch(); + try + { + workflowManager.start(); + + TaskId taskId = new TaskId(); + RunId runId = workflowManager.submitTask(new Task(taskId, taskType)); + + stopwatch.start(); + timing.sleepABit(); + + List tasksPresent = workflowManager.getAdmin().getTaskInfo(runId); + Assert.assertTrue(tasksPresent.size() == 1); // checking via admin API whether task is present before the graceful shutdown request. + Assert.assertTrue(tasks.size() == 1); // checking whether task is yet to be completed before the graceful shutdown request. + closeWorkflowGracefully(workflowManager); // requesting a graceful shutdown while the task is in progress. + shutdownDone = true; + Assert.assertTrue(stopwatch.getTime() >= 5000); // checking if task took more than 5000 ms after the submission. + Assert.assertTrue(tasks.size() == 0); // checking if task is completed during graceful shutdown + Assert.assertTrue(workflowManager.getAdmin().getWorkflowManagerState().getExecutorsState().get(0) + == WorkflowManagerState.State.CLOSED); // checking if workflow manager is in closed state at the end. + } + finally + { + stopwatch.stop(); + if(!shutdownDone) closeWorkflow(workflowManager);//shutdown here if graceful shutdown was interrupted unexpectedly. + } + } private void closeWorkflow(WorkflowManager workflowManager) throws InterruptedException { @@ -521,4 +576,9 @@ private void closeWorkflow(WorkflowManager workflowManager) throws InterruptedEx timing.sleepABit(); ((WorkflowManagerImpl)workflowManager).debugValidateClosed(); } + + private void closeWorkflowGracefully(WorkflowManager workflowManager) throws InterruptedException + { + workflowManager.closeGracefully(20); + } } From 78aa3ac068e723221c3f4a1903ae5f57ebb1f052 Mon Sep 17 00:00:00 2001 From: msur Date: Thu, 5 Oct 2023 09:38:51 +0530 Subject: [PATCH 3/4] Closing SimpleQueue and WorkflowManager gracefully --- src/main/java/com/nirmata/workflow/WorkflowManager.java | 5 +++-- .../com/nirmata/workflow/details/WorkflowManagerImpl.java | 5 +++-- src/main/java/com/nirmata/workflow/queue/QueueConsumer.java | 6 ++++-- .../com/nirmata/workflow/queue/zookeeper/SimpleQueue.java | 4 ++-- src/test/java/com/nirmata/workflow/TestNormal.java | 2 +- 5 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/nirmata/workflow/WorkflowManager.java b/src/main/java/com/nirmata/workflow/WorkflowManager.java index c5a4ee14..1dd83ed6 100644 --- a/src/main/java/com/nirmata/workflow/WorkflowManager.java +++ b/src/main/java/com/nirmata/workflow/WorkflowManager.java @@ -146,7 +146,8 @@ public interface WorkflowManager extends Closeable * Close the workflow manager gracefully by allowing the in-progress tasks to continue till the timeout specified. * This method is different from close method(Closeable) as close method will stop the in-progress tasks as well. * - * @param timeOut is the maximum time(in seconds) allocated for the in-progress tasks to complete the execution. + * @param timeOut is the maximum time allocated for the in-progress tasks to complete the execution. + * @param unit Timeunit for the timeOut quantity specified */ - void closeGracefully(long timeOut); + void closeGracefully(long timeOut, TimeUnit unit); } diff --git a/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java b/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java index a99637cc..4d1f2b4d 100644 --- a/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java +++ b/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java @@ -58,6 +58,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -129,11 +130,11 @@ public WorkflowListenerManager newWorkflowListenerManager() } @Override - public void closeGracefully(long timeOut) { + public void closeGracefully(long timeOut, TimeUnit unit) { if ( state.compareAndSet(State.STARTED, State.CLOSED) ) { CloseableUtils.closeQuietly(schedulerSelector); - consumers.forEach(consumer -> consumer.closeGraceFully(timeOut)); + consumers.forEach(consumer -> consumer.closeGraceFully(timeOut, unit)); } } diff --git a/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java b/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java index a92551e1..2bfd9a38 100644 --- a/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java +++ b/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.nirmata.workflow.admin.WorkflowManagerState; import java.io.Closeable; +import java.util.concurrent.TimeUnit; public interface QueueConsumer extends Closeable { @@ -33,7 +34,8 @@ public interface QueueConsumer extends Closeable /** * Allowing the executorService(SimpleQueue) to finish the previously submitted tasks before shutting down - * @param timeOut (in seconds) + * @param timeOut + * @param unit Timeunit for the timeOut quantity specified */ - void closeGraceFully(long timeOut); + void closeGraceFully(long timeOut, TimeUnit unit); } diff --git a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java index 07313e7e..1c0dac61 100644 --- a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java +++ b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java @@ -201,13 +201,13 @@ public void debugValidateClosed() } @Override - public void closeGraceFully(long timeOut) { + public void closeGraceFully(long timeOut, TimeUnit unit) { if ( started.compareAndSet(true, false) ) { executorService.shutdown(); try { log.info("Blocks until all tasks have completed execution or the timeout occurs"); - executorService.awaitTermination(timeOut, TimeUnit.SECONDS); + executorService.awaitTermination(timeOut, unit); log.info("Simple Queue executor service shutdown completed"); } catch (InterruptedException e) { log.error("Error while processing of in-progress tasks, possibly due to timeout while waiting", e); diff --git a/src/test/java/com/nirmata/workflow/TestNormal.java b/src/test/java/com/nirmata/workflow/TestNormal.java index 7c3ef8c9..5a2290c5 100644 --- a/src/test/java/com/nirmata/workflow/TestNormal.java +++ b/src/test/java/com/nirmata/workflow/TestNormal.java @@ -579,6 +579,6 @@ private void closeWorkflow(WorkflowManager workflowManager) throws InterruptedEx private void closeWorkflowGracefully(WorkflowManager workflowManager) throws InterruptedException { - workflowManager.closeGracefully(20); + workflowManager.closeGracefully(20, TimeUnit.SECONDS); } } From 34d5fc71736867ac0030194a92368c94897275f1 Mon Sep 17 00:00:00 2001 From: msur Date: Sat, 27 Sep 2025 22:57:14 +0530 Subject: [PATCH 4/4] Closing SimpleQueue and WorkflowManager gracefully-Using Guava Executor instead --- .../com/nirmata/workflow/queue/zookeeper/SimpleQueue.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java index 1c0dac61..3588d55e 100644 --- a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java +++ b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; import com.nirmata.workflow.admin.WorkflowManagerState; import com.nirmata.workflow.models.ExecutableTask; import com.nirmata.workflow.models.TaskMode; @@ -204,12 +205,11 @@ public void debugValidateClosed() public void closeGraceFully(long timeOut, TimeUnit unit) { if ( started.compareAndSet(true, false) ) { - executorService.shutdown(); + log.info("Blocks until all tasks have completed execution or the timeout occurs"); try { - log.info("Blocks until all tasks have completed execution or the timeout occurs"); - executorService.awaitTermination(timeOut, unit); + MoreExecutors.shutdownAndAwaitTermination(executorService, timeOut, unit); log.info("Simple Queue executor service shutdown completed"); - } catch (InterruptedException e) { + } catch (RuntimeException e) { log.error("Error while processing of in-progress tasks, possibly due to timeout while waiting", e); Thread.currentThread().interrupt(); }