From 4902b2b4f93677fc02cc55bb0c8f78a38c2406ff Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Wed, 18 May 2022 12:36:45 +0530 Subject: [PATCH 1/2] add support for direct submission of tasks --- .../com/nirmata/workflow/WorkflowManager.java | 8 +++ .../workflow/details/WorkflowManagerImpl.java | 6 ++ .../details/WorkflowManagerKafkaImpl.java | 72 ++++++++++++++++--- 3 files changed, 76 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/nirmata/workflow/WorkflowManager.java b/src/main/java/com/nirmata/workflow/WorkflowManager.java index 2dde3e56..019f543e 100644 --- a/src/main/java/com/nirmata/workflow/WorkflowManager.java +++ b/src/main/java/com/nirmata/workflow/WorkflowManager.java @@ -98,6 +98,14 @@ public interface WorkflowManager extends Closeable */ RunId submitSubTask(RunId runId, RunId parentRunId, Task task); + /** + * Submit a task directly to the executor. Only the top parent is executed, not its children. + * The executor, in turn, will not attempt to send task results to the scheduler. + * + * @param task task to execute + */ + void submitRootTaskDirect(Task task); + /** * Update task progress info. This method is meant to be used inside of {@link TaskExecutor} * for a running task to update its execution progress(0-100). 
diff --git a/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java b/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java index a6ce0c64..99587868 100644 --- a/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java +++ b/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java @@ -601,4 +601,10 @@ private List makeTaskConsumers(QueueFactory queueFactory, List> taskQueues = new HashMap>(); + private Map> startedTasksCache = new HashMap>(); + private static final TaskType nullTaskType = new TaskType("", "", false); + private static final String DIRECT_SUBMIT_RUN_ID_PREFIX = "DIRECT_SUBMIT"; private enum State { LATENT, @@ -463,16 +469,19 @@ private void executeTask(TaskExecutor taskExecutor, ExecutableTask executableTas throw new RuntimeException(String.format("null returned from task executor for run: %s, task %s", executableTask.getRunId(), executableTask.getTaskId())); } - byte[] bytes = serializer.serialize(new WorkflowMessage(executableTask.getTaskId(), result)); - try { - // Send task result to scheduler to further advance the workflow - sendWorkflowToKafka(executableTask.getRunId(), bytes); - storageMgr.saveTaskResult(executableTask.getRunId(), executableTask.getTaskId(), - serializer.serialize(result)); - - } catch (Exception e) { - log.error("Could not set completed data for executable task: {}", executableTask, e); - throw e; + // No need to send the task result if the run ID denotes a direct submission, since the scheduler does not manage it. 
+ if (!executableTask.getRunId().getId().startsWith(DIRECT_SUBMIT_RUN_ID_PREFIX)) { + byte[] bytes = serializer.serialize(new WorkflowMessage(executableTask.getTaskId(), result)); + try { + // Send task result to scheduler to further advance the workflow + sendWorkflowToKafka(executableTask.getRunId(), bytes); + storageMgr.saveTaskResult(executableTask.getRunId(), executableTask.getTaskId(), + serializer.serialize(result)); + + } catch (Exception e) { + log.error("Could not set completed data for executable task: {}", executableTask, e); + throw e; + } } } @@ -498,4 +507,47 @@ private List makeTaskConsumers(QueueFactory queueFactory, List producer = taskQueues.get(task.getTaskType()); + if (producer == null) { + this.getKafkaConf().createTaskTopicIfNeeded(task.getTaskType()); + producer = new KafkaProducer( + this.getKafkaConf().getProducerProps()); + taskQueues.put(task.getTaskType(), producer); + } + + producer.send(new ProducerRecord( + this.getKafkaConf().getTaskExecTopic(task.getTaskType()), runnableTaskBytes), + new Callback() { + @Override + public void onCompletion(RecordMetadata m, Exception e) { + if (e != null) { + log.error("Error creating record for Run {} to task type {}", runId, task.getTaskType(), + e); + } else { + log.debug("RunId {} produced record to topic {}, partition [{}] @ offset {}", runId, + m.topic(), m.partition(), m.offset()); + } + } + }); + startedTasksCache.get(runId.getId()).add(task.getTaskId().getId()); + log.debug("Sent task to queue: {}", task); + + } catch (Exception e) { + String message = "Could not start task " + task; + log.error(message, e); + throw new RuntimeException(e); + } + } } From 0dbb681b3652d7ba81d3f77d0de0c14877c7c85a Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Wed, 18 May 2022 12:37:07 +0530 Subject: [PATCH 2/2] remove unused import statement --- .../com/nirmata/workflow/details/WorkflowManagerKafkaImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git 
a/src/main/java/com/nirmata/workflow/details/WorkflowManagerKafkaImpl.java b/src/main/java/com/nirmata/workflow/details/WorkflowManagerKafkaImpl.java index 64da692b..1b06a26d 100644 --- a/src/main/java/com/nirmata/workflow/details/WorkflowManagerKafkaImpl.java +++ b/src/main/java/com/nirmata/workflow/details/WorkflowManagerKafkaImpl.java @@ -20,7 +20,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.mongodb.MongoInterruptedException; import com.nirmata.workflow.WorkflowManager; import com.nirmata.workflow.admin.RunInfo; import com.nirmata.workflow.admin.TaskDetails;