diff --git a/beeflow/common/gdb/neo4j_cypher.py b/beeflow/common/gdb/neo4j_cypher.py index 973aca6bf..8886acc35 100644 --- a/beeflow/common/gdb/neo4j_cypher.py +++ b/beeflow/common/gdb/neo4j_cypher.py @@ -239,12 +239,9 @@ def add_dependencies(tx, task, old_task=None, restarted_task=False): tx.run(restarted_query, old_task_id=old_task.id, new_task_id=task.id) tx.run(dependency_query, task_id=task.id) else: - begins_query = ("MATCH (s:Task {id: $task_id})<-[:INPUT_OF]-(i:Input) " - "WITH s, collect(i.source) AS sources " - "MATCH (w:Workflow {id: s.workflow_id})<-[:INPUT_OF]-(i:Input) " - "WITH s, w, sources, collect(i.id) AS inputs " - "WHERE any(input IN sources WHERE input IN inputs) " - "MERGE (s)-[:BEGINS]->(w)") + task_of_query = ("MATCH (s:Task {id: $task_id}) " + "MATCH (w:Workflow {id: s.workflow_id}) " + "MERGE (s)-[:TASK_OF]->(w)") dependency_query = ("MATCH (s:Task {id: $task_id})<-[:INPUT_OF]-(i:Input) " "WITH s, collect(i.source) as sources " "MATCH (t:Task)<-[:OUTPUT_OF]-(o:Output) " @@ -260,7 +257,7 @@ def add_dependencies(tx, task, old_task=None, restarted_task=False): "AND s.workflow_id = t.workflow_id " "MERGE (t)-[:DEPENDS_ON]->(s)") - tx.run(begins_query, task_id=task.id, wf_id=task.workflow_id) + tx.run(task_of_query, task_id=task.id, wf_id=task.workflow_id) tx.run(dependency_query, task_id=task.id) tx.run(dependent_query, task_id=task.id) @@ -612,12 +609,12 @@ def set_init_task_inputs(tx, wf_id): :param wf_id: the workflow id :type wf_id: str """ - task_inputs_query = ("MATCH (i:Input)-[:INPUT_OF]->(:Task)-[:BEGINS]->(:Workflow {id: $wf_id})" - "<-[:INPUT_OF]-(wi:Input) " + task_inputs_query = ("MATCH (i:Input)-[:INPUT_OF]->(:Task)-[:TASK_OF]->" + "(:Workflow {id: $wf_id})<-[:INPUT_OF]-(wi:Input) " "WHERE i.source = wi.id AND wi.value IS NOT NULL " "SET i.value = wi.value") # Set any values to defaults if necessary - defaults_query = ("MATCH (i:Input)-[:INPUT_OF]->(t:Task)-[:BEGINS]->(:Workflow {id: $wf_id})" + defaults_query = ("MATCH (i:Input)-[:INPUT_OF]->(t:Task)-[:TASK_OF]->(:Workflow {id: $wf_id})" "<-[:INPUT_OF]-(wi:Input) " "WHERE i.source = wi.id " "AND i.value IS NULL AND i.default IS NOT NULL " @@ -678,12 +675,13 @@ def set_paused_tasks_to_running(tx): def set_runnable_tasks_to_ready(tx, wf_id): """Set task states to 'READY' if all required inputs have values.""" set_runnable_ready_query = ("MATCH (m:Metadata)-[:DESCRIBES]->" - "(t:Task {workflow_id: $wf_id})<-[:INPUT_OF]-(i:Input) " - "WITH m, t, collect(i) AS ilist " + "(t:Task {workflow_id: $wf_id}) " "WHERE m.state = 'WAITING' " - "AND all(i IN ilist WHERE i.value IS NOT NULL) " + "AND NOT EXISTS { " + "MATCH (t)-[:DEPENDS_ON]->(dep:Task)<-[:DESCRIBES]-(dm:Metadata) " + "WHERE dm.state <> 'COMPLETED' " + "} " "SET m.state = 'READY'") - tx.run(set_runnable_ready_query, wf_id=wf_id) diff --git a/beeflow/common/parser/parser.py b/beeflow/common/parser/parser.py index 77018a99c..f8dcbfffb 100644 --- a/beeflow/common/parser/parser.py +++ b/beeflow/common/parser/parser.py @@ -201,12 +201,12 @@ def parse_job(self, job): else: raise CwlParseError("Unsupported input job file extension (only .yml " "and .json supported)") - - for k, v in self.params.items(): - if not isinstance(k, str): - raise CwlParseError(f"Invalid input job key: {str(k)}") - if not isinstance(v, (str, int, float)): - raise CwlParseError(f"Invalid input job parameter type: {type(v)}") + if self.params: + for k, v in self.params.items(): + if not isinstance(k, str): + raise CwlParseError(f"Invalid input job key: {str(k)}") + if not isinstance(v, (str, int, float)): + raise CwlParseError(f"Invalid input job parameter type: {type(v)}") @staticmethod def parse_step_inputs(cwl_in, step_inputs):