Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand Down Expand Up @@ -73,6 +76,9 @@ public class ExecutionService {
private static final int POLL_COUNT_ONE = 1;
private static final int POLLING_TIMEOUT_IN_MS = 100;

private Integer globalLock = Integer.valueOf(1);
private ConcurrentHashMap<String, Lock> taskLocksMap = new ConcurrentHashMap();

public ExecutionService(
WorkflowExecutor workflowExecutor,
ExecutionDAOFacade executionDAOFacade,
Expand Down Expand Up @@ -258,8 +264,42 @@ public void updateTask(Task task) {
updateTask(new TaskResult(task));
}

/*
Introduced locking to avoid parallel task updates for the same task. We faced an issue in conductor when two
threads are updating the same task status as COMPLETED and IN_PROGRESS. Both these updates went successful
where IN_PROGRESS after COMPLETED should fail. As IN_PROGRESS went successful, task is TIMED_OUT after sometime
as there are no further updates from fusion. Workflow TIMED_OUT notification sent to fusion which makes the workflow as

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let the comment be generic and not add any fusion details here

TIMED_OUT even though its successfully completed.
*/
public void updateTask(TaskResult taskResult) {
workflowExecutor.updateTask(taskResult);
if (taskResult.getTaskId() == null) {
LOGGER.error(
"Invalid task id in TaskResult. TaskStatus {} workflow id {}",
taskResult.getStatus(),
taskResult.getWorkflowInstanceId());
return;
}
Lock taskLock = null;
synchronized (globalLock) {
taskLock = taskLocksMap.get(taskResult.getTaskId());
if (taskLock == null) {
taskLock = new ReentrantLock();
taskLocksMap.put(taskResult.getTaskId(), taskLock);
}
}
taskLock.lock();
LOGGER.debug("Lock Successfully obtained for {}", taskResult.getTaskId());
try {
workflowExecutor.updateTask(taskResult);
} finally {
if (taskLock != null) {
taskLock.unlock();
LOGGER.debug("Releasing lock for {}", taskResult.getTaskId());
synchronized (globalLock) {
taskLocksMap.remove(taskResult.getTaskId());
}
}
}
}

public List<Task> getTasks(String taskType, String startKey, int count) {
Expand Down