From c0783c6591b0adcba21e90c9c3e4fa7728f0cf1d Mon Sep 17 00:00:00 2001 From: Kabir Vats Date: Wed, 2 Jul 2025 14:59:38 -0600 Subject: [PATCH 1/6] dont error if workflow inputs are empty --- beeflow/common/parser/parser.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/beeflow/common/parser/parser.py b/beeflow/common/parser/parser.py index 77018a99c..f8dcbfffb 100644 --- a/beeflow/common/parser/parser.py +++ b/beeflow/common/parser/parser.py @@ -201,12 +201,12 @@ def parse_job(self, job): else: raise CwlParseError("Unsupported input job file extension (only .yml " "and .json supported)") - - for k, v in self.params.items(): - if not isinstance(k, str): - raise CwlParseError(f"Invalid input job key: {str(k)}") - if not isinstance(v, (str, int, float)): - raise CwlParseError(f"Invalid input job parameter type: {type(v)}") + if self.params: + for k, v in self.params.items(): + if not isinstance(k, str): + raise CwlParseError(f"Invalid input job key: {str(k)}") + if not isinstance(v, (str, int, float)): + raise CwlParseError(f"Invalid input job parameter type: {type(v)}") @staticmethod def parse_step_inputs(cwl_in, step_inputs): From 63517052677e8f4de46f2c58746ce3af8a85e134 Mon Sep 17 00:00:00 2001 From: Kabir Vats Date: Wed, 2 Jul 2025 15:55:29 -0600 Subject: [PATCH 2/6] update gdb task dependencies to not require input matching --- beeflow/common/gdb/neo4j_cypher.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/beeflow/common/gdb/neo4j_cypher.py b/beeflow/common/gdb/neo4j_cypher.py index 973aca6bf..79710fed7 100644 --- a/beeflow/common/gdb/neo4j_cypher.py +++ b/beeflow/common/gdb/neo4j_cypher.py @@ -239,12 +239,9 @@ def add_dependencies(tx, task, old_task=None, restarted_task=False): tx.run(restarted_query, old_task_id=old_task.id, new_task_id=task.id) tx.run(dependency_query, task_id=task.id) else: - begins_query = ("MATCH (s:Task {id: $task_id})<-[:INPUT_OF]-(i:Input) " - "WITH s, collect(i.source) AS sources " - "MATCH (w:Workflow {id: s.workflow_id})<-[:INPUT_OF]-(i:Input) " - "WITH s, w, sources, collect(i.id) AS inputs " - "WHERE any(input IN sources WHERE input IN inputs) " - "MERGE (s)-[:BEGINS]->(w)") + begins_query = ("MATCH (s:Task {id: $task_id}) " + "MATCH (w:Workflow (id: s.workflow_id}) " + "MERGE (s)-[:TASK_OF]->(w)") dependency_query = ("MATCH (s:Task {id: $task_id})<-[:INPUT_OF]-(i:Input) " "WITH s, collect(i.source) as sources " "MATCH (t:Task)<-[:OUTPUT_OF]-(o:Output) " @@ -612,12 +609,12 @@ def set_init_task_inputs(tx, wf_id): :param wf_id: the workflow id :type wf_id: str """ - task_inputs_query = ("MATCH (i:Input)-[:INPUT_OF]->(:Task)-[:BEGINS]->(:Workflow {id: $wf_id})" + task_inputs_query = ("MATCH (i:Input)-[:INPUT_OF]->(:Task)-[:TASK_OF]->(:Workflow {id: $wf_id})" "<-[:INPUT_OF]-(wi:Input) " "WHERE i.source = wi.id AND wi.value IS NOT NULL " "SET i.value = wi.value") # Set any values to defaults if necessary - defaults_query = ("MATCH (i:Input)-[:INPUT_OF]->(t:Task)-[:BEGINS]->(:Workflow {id: $wf_id})" + defaults_query = ("MATCH (i:Input)-[:INPUT_OF]->(t:Task)-[:TASK_OF]->(:Workflow {id: $wf_id})" "<-[:INPUT_OF]-(wi:Input) " "WHERE i.source = wi.id " "AND i.value IS NULL AND i.default IS NOT NULL " From 53d4fb611cb174880bd6b3857c475ae9441f7e5e Mon Sep 17 00:00:00 2001 From: Kabir Vats Date: Wed, 2 Jul 2025 16:13:11 -0600 Subject: [PATCH 3/6] correct paren --- beeflow/common/gdb/neo4j_cypher.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/beeflow/common/gdb/neo4j_cypher.py b/beeflow/common/gdb/neo4j_cypher.py index 79710fed7..6ee02d83d 100644 --- a/beeflow/common/gdb/neo4j_cypher.py +++ b/beeflow/common/gdb/neo4j_cypher.py @@ -239,8 +239,8 @@ def add_dependencies(tx, task, old_task=None, restarted_task=False): tx.run(restarted_query, old_task_id=old_task.id, new_task_id=task.id) tx.run(dependency_query, task_id=task.id) else: - begins_query = ("MATCH (s:Task {id: $task_id}) " - "MATCH (w:Workflow (id: s.workflow_id}) " + task_of_query = ("MATCH (s:Task {id: $task_id}) " + "MATCH (w:Workflow {id: s.workflow_id}) " "MERGE (s)-[:TASK_OF]->(w)") dependency_query = ("MATCH (s:Task {id: $task_id})<-[:INPUT_OF]-(i:Input) " "WITH s, collect(i.source) as sources " @@ -257,7 +257,7 @@ def add_dependencies(tx, task, old_task=None, restarted_task=False): "AND s.workflow_id = t.workflow_id " "MERGE (t)-[:DEPENDS_ON]->(s)") - tx.run(begins_query, task_id=task.id, wf_id=task.workflow_id) + tx.run(task_of_query, task_id=task.id, wf_id=task.workflow_id) tx.run(dependency_query, task_id=task.id) tx.run(dependent_query, task_id=task.id) From 053f504de1686d663ca01f92a4b6428a71d8bb23 Mon Sep 17 00:00:00 2001 From: Kabir Vats Date: Wed, 2 Jul 2025 16:56:10 -0600 Subject: [PATCH 4/6] adjust the ready query (failing) --- beeflow/common/gdb/neo4j_cypher.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/beeflow/common/gdb/neo4j_cypher.py b/beeflow/common/gdb/neo4j_cypher.py index 6ee02d83d..5c1fe3ad8 100644 --- a/beeflow/common/gdb/neo4j_cypher.py +++ b/beeflow/common/gdb/neo4j_cypher.py @@ -675,12 +675,13 @@ def set_paused_tasks_to_running(tx): def set_runnable_tasks_to_ready(tx, wf_id): """Set task states to 'READY' if all required inputs have values.""" set_runnable_ready_query = ("MATCH (m:Metadata)-[:DESCRIBES]->" - "(t:Task {workflow_id: $wf_id})<-[:INPUT_OF]-(i:Input) " - "WITH m, t, collect(i) AS ilist " + "(t:Task {workflow_id: $wf_id}) " "WHERE m.state = 'WAITING' " - "AND all(i IN ilist WHERE i.value IS NOT NULL) " + "AND NOT EXISTS { " + "MATCH (t)-[:DEPENDS_ON]->(dep:Task)<-[:DESCRIBES]-(depmeta:METADATA) " + "WHERE depmeta.state <> 'COMPLETED' " + "} " "SET m.state = 'READY'") - tx.run(set_runnable_ready_query, wf_id=wf_id) From cbe64b4548364dc1d3302e9dbb17302025070bc2 Mon Sep 17 00:00:00 2001 From: Kabir Vats Date: Wed, 2 Jul 2025 17:06:52 -0600 Subject: [PATCH 5/6] ready now resolves --- beeflow/common/gdb/neo4j_cypher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beeflow/common/gdb/neo4j_cypher.py b/beeflow/common/gdb/neo4j_cypher.py index 5c1fe3ad8..a6b27a4a7 100644 --- a/beeflow/common/gdb/neo4j_cypher.py +++ b/beeflow/common/gdb/neo4j_cypher.py @@ -678,7 +678,7 @@ def set_runnable_tasks_to_ready(tx, wf_id): "(t:Task {workflow_id: $wf_id}) " "WHERE m.state = 'WAITING' " "AND NOT EXISTS { " - "MATCH (t)-[:DEPENDS_ON]->(dep:Task)<-[:DESCRIBES]-(depmeta:METADATA) " + "MATCH (t)-[:DEPENDS_ON]->(dep:Task)<-[:DESCRIBES]-(depmeta:Metadata) " "WHERE depmeta.state <> 'COMPLETED' " "} " "SET m.state = 'READY'") From 8fe2d8ea25d5e34a7c6e75f46b5f1a28dffc4dec Mon Sep 17 00:00:00 2001 From: kabir-vats <141685484+kabir-vats@users.noreply.github.com> Date: Wed, 2 Jul 2025 17:42:32 -0600 Subject: [PATCH 6/6] Lint line lengths --- beeflow/common/gdb/neo4j_cypher.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/beeflow/common/gdb/neo4j_cypher.py b/beeflow/common/gdb/neo4j_cypher.py index a6b27a4a7..8886acc35 100644 --- a/beeflow/common/gdb/neo4j_cypher.py +++ b/beeflow/common/gdb/neo4j_cypher.py @@ -609,8 +609,8 @@ def set_init_task_inputs(tx, wf_id): :param wf_id: the workflow id :type wf_id: str """ - task_inputs_query = ("MATCH (i:Input)-[:INPUT_OF]->(:Task)-[:TASK_OF]->(:Workflow {id: $wf_id})" - "<-[:INPUT_OF]-(wi:Input) " + task_inputs_query = ("MATCH (i:Input)-[:INPUT_OF]->(:Task)-[:TASK_OF]->" + "(:Workflow {id: $wf_id})<-[:INPUT_OF]-(wi:Input) " "WHERE i.source = wi.id AND wi.value IS NOT NULL " "SET i.value = wi.value") # Set any values to defaults if necessary @@ -678,8 +678,8 @@ def set_runnable_tasks_to_ready(tx, wf_id): "(t:Task {workflow_id: $wf_id}) " "WHERE m.state = 'WAITING' " "AND NOT EXISTS { " - "MATCH (t)-[:DEPENDS_ON]->(dep:Task)<-[:DESCRIBES]-(depmeta:Metadata) " - "WHERE depmeta.state <> 'COMPLETED' " + "MATCH (t)-[:DEPENDS_ON]->(dep:Task)<-[:DESCRIBES]-(dm:Metadata) " + "WHERE dm.state <> 'COMPLETED' " "} " "SET m.state = 'READY'") tx.run(set_runnable_ready_query, wf_id=wf_id)