Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/main/java/com/nirmata/workflow/WorkflowManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.curator.framework.CuratorFramework;
import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/**
* Main API - create via {@link WorkflowManagerBuilder}
Expand Down Expand Up @@ -140,4 +141,13 @@ public interface WorkflowManager extends Closeable
* @return new WorkflowListenerManager
*/
WorkflowListenerManager newWorkflowListenerManager();

/**
* Close the workflow manager gracefully by allowing the in-progress tasks to continue till the timeout specified.
* This method is different from close method(Closeable) as close method will stop the in-progress tasks as well.
*
* @param timeOut is the maximum time allocated for the in-progress tasks to complete the execution.
* @param unit Timeunit for the timeOut quantity specified
*/
void closeGracefully(long timeOut, TimeUnit unit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -128,6 +129,15 @@ public WorkflowListenerManager newWorkflowListenerManager()
return new WorkflowListenerManagerImpl(this);
}

@Override
public void closeGracefully(long timeOut, TimeUnit unit) {
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
CloseableUtils.closeQuietly(schedulerSelector);
consumers.forEach(consumer -> consumer.closeGraceFully(timeOut, unit));
}
}

@Override
public Map<TaskId, TaskDetails> getTaskDetails(RunId runId)
{
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/nirmata/workflow/queue/QueueConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.nirmata.workflow.admin.WorkflowManagerState;
import java.io.Closeable;
import java.util.concurrent.TimeUnit;

public interface QueueConsumer extends Closeable
{
Expand All @@ -30,4 +31,11 @@ public interface QueueConsumer extends Closeable

@VisibleForTesting
void debugValidateClosed();

/**
* Allowing the executorService(SimpleQueue) to finish the previously submitted tasks before shutting down
* @param timeOut
* @param unit Timeunit for the timeOut quantity specified
*/
void closeGraceFully(long timeOut, TimeUnit unit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.nirmata.workflow.admin.WorkflowManagerState;
import com.nirmata.workflow.models.ExecutableTask;
import com.nirmata.workflow.models.TaskMode;
Expand Down Expand Up @@ -200,6 +201,21 @@ public void debugValidateClosed()
Preconditions.checkState(executorService.isTerminated());
}

@Override
public void closeGraceFully(long timeOut, TimeUnit unit) {
if ( started.compareAndSet(true, false) )
{
log.info("Blocks until all tasks have completed execution or the timeout occurs");
try {
MoreExecutors.shutdownAndAwaitTermination(executorService, timeOut, unit);
log.info("Simple Queue executor service shutdown completed");
} catch (RuntimeException e) {
log.error("Error while processing of in-progress tasks, possibly due to timeout while waiting", e);
Thread.currentThread().interrupt();
}
}
}

public void start()
{
if ( started.compareAndSet(false, true) )
Expand Down
60 changes: 60 additions & 0 deletions src/test/java/com/nirmata/workflow/TestNormal.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.nirmata.workflow.admin.RunInfo;
import com.nirmata.workflow.admin.StandardAutoCleaner;
import com.nirmata.workflow.admin.TaskInfo;
import com.nirmata.workflow.admin.WorkflowManagerState;
import com.nirmata.workflow.details.WorkflowManagerImpl;
import com.nirmata.workflow.executor.TaskExecution;
import com.nirmata.workflow.executor.TaskExecutionStatus;
Expand All @@ -35,6 +36,7 @@
import com.nirmata.workflow.models.TaskId;
import com.nirmata.workflow.models.TaskType;
import com.nirmata.workflow.serialization.JsonSerializerMapper;
import org.apache.commons.lang.time.StopWatch;
import org.apache.curator.utils.CloseableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -514,11 +516,69 @@ public void testMultiTypes() throws Exception
closeWorkflow(workflowManager);
}
}
@Test
public void testWorkflowManagerGracefulClose() throws Exception
{
boolean shutdownDone = false;
BlockingQueue<TaskId> tasks = Queues.newLinkedBlockingQueue();
TaskExecutor taskExecutor = (w, t) -> () -> {
tasks.add(t.getTaskId());
StopWatch stopwatch = new StopWatch();
stopwatch.start();
try
{
Thread.sleep(5000);
tasks.poll(timing.milliseconds(), TimeUnit.MILLISECONDS);
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
throw new RuntimeException();
}
return new TaskExecutionResult(TaskExecutionStatus.SUCCESS, "Completed");
};
TaskType taskType = new TaskType("test", "1", true);
WorkflowManager workflowManager = WorkflowManagerBuilder.builder()
.addingTaskExecutor(taskExecutor, 1, taskType)
.withCurator(curator, "test", "1")
.build();
StopWatch stopwatch = new StopWatch();
try
{
workflowManager.start();

TaskId taskId = new TaskId();
RunId runId = workflowManager.submitTask(new Task(taskId, taskType));

stopwatch.start();
timing.sleepABit();

List<TaskInfo> tasksPresent = workflowManager.getAdmin().getTaskInfo(runId);
Assert.assertTrue(tasksPresent.size() == 1); // checking via admin API whether task is present before the graceful shutdown request.
Assert.assertTrue(tasks.size() == 1); // checking whether task is yet to be completed before the graceful shutdown request.
closeWorkflowGracefully(workflowManager); // requesting a graceful shutdown while the task is in progress.
shutdownDone = true;
Assert.assertTrue(stopwatch.getTime() >= 5000); // checking if task took more than 5000 ms after the submission.
Assert.assertTrue(tasks.size() == 0); // checking if task is completed during graceful shutdown
Assert.assertTrue(workflowManager.getAdmin().getWorkflowManagerState().getExecutorsState().get(0)
== WorkflowManagerState.State.CLOSED); // checking if workflow manager is in closed state at the end.
}
finally
{
stopwatch.stop();
if(!shutdownDone) closeWorkflow(workflowManager);//shutdown here if graceful shutdown was interrupted unexpectedly.
}
}

private void closeWorkflow(WorkflowManager workflowManager) throws InterruptedException
{
CloseableUtils.closeQuietly(workflowManager);
timing.sleepABit();
((WorkflowManagerImpl)workflowManager).debugValidateClosed();
}

private void closeWorkflowGracefully(WorkflowManager workflowManager) throws InterruptedException
{
workflowManager.closeGracefully(20, TimeUnit.SECONDS);
}
}