diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java
index a2049d4ccb..561c34eafd 100644
--- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java
+++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java
@@ -21,6 +21,9 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -73,6 +76,20 @@ public class ExecutionService {
     private static final int POLL_COUNT_ONE = 1;
     private static final int POLLING_TIMEOUT_IN_MS = 100;
 
+    /**
+     * Per-task update locks keyed by task id. An entry exists only while at least one thread
+     * holds or is waiting for that task's lock.
+     */
+    private final ConcurrentHashMap<String, TaskLock> taskLocksMap = new ConcurrentHashMap<>();
+
+    /** Lock for a single task plus the number of threads currently holding or awaiting it. */
+    private static final class TaskLock {
+        private final Lock lock = new ReentrantLock();
+
+        /** Read and written only inside atomic {@code taskLocksMap} remapping functions. */
+        private int users;
+    }
+
     public ExecutionService(
             WorkflowExecutor workflowExecutor,
             ExecutionDAOFacade executionDAOFacade,
@@ -258,8 +275,53 @@ public void updateTask(Task task) {
         updateTask(new TaskResult(task));
     }
 
+    /**
+     * Applies a task result, allowing only one update per task id at a time in this instance.
+     *
+     * <p>Locking was introduced because two threads once updated the same task to COMPLETED and
+     * IN_PROGRESS concurrently and both updates succeeded, although IN_PROGRESS after COMPLETED
+     * should fail. With IN_PROGRESS applied and no further updates arriving from fusion, the
+     * task later became TIMED_OUT, and the workflow TIMED_OUT notification sent to fusion marked
+     * the workflow as TIMED_OUT even though it had completed successfully.
+     *
+     * <p>Each caller is counted inside the map's atomic {@code compute}, and the map entry is
+     * removed only when that count drops back to zero, so all concurrent callers for one task
+     * id contend on the same lock instance.
+     *
+     * <p>A result without a task id is logged and ignored.
+     *
+     * @param taskResult the task result to apply
+     */
     public void updateTask(TaskResult taskResult) {
-        workflowExecutor.updateTask(taskResult);
+        final String taskId = taskResult.getTaskId();
+        if (taskId == null) {
+            LOGGER.error(
+                    "Invalid task id in TaskResult. TaskStatus {} workflow id {}",
+                    taskResult.getStatus(),
+                    taskResult.getWorkflowInstanceId());
+            return;
+        }
+        // Look up (or create) the lock and register this thread as a user in one atomic step,
+        // so the entry cannot be removed between fetching the lock and acquiring it.
+        final TaskLock taskLock =
+                taskLocksMap.compute(
+                        taskId,
+                        (id, existing) -> {
+                            TaskLock entry = existing != null ? existing : new TaskLock();
+                            entry.users++;
+                            return entry;
+                        });
+        taskLock.lock.lock();
+        try {
+            LOGGER.debug("Lock Successfully obtained for {}", taskId);
+            workflowExecutor.updateTask(taskResult);
+        } finally {
+            taskLock.lock.unlock();
+            // Drop the entry only once no other thread holds or is waiting for this lock.
+            taskLocksMap.computeIfPresent(
+                    taskId, (id, entry) -> --entry.users == 0 ? null : entry);
+            LOGGER.debug("Releasing lock for {}", taskId);
+        }
     }
 
     public List getTasks(String taskType, String startKey, int count) {