Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions beeflow/common/gdb/neo4j_cypher.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,9 @@ def add_dependencies(tx, task, old_task=None, restarted_task=False):
tx.run(restarted_query, old_task_id=old_task.id, new_task_id=task.id)
tx.run(dependency_query, task_id=task.id)
else:
begins_query = ("MATCH (s:Task {id: $task_id})<-[:INPUT_OF]-(i:Input) "
"WITH s, collect(i.source) AS sources "
"MATCH (w:Workflow {id: s.workflow_id})<-[:INPUT_OF]-(i:Input) "
"WITH s, w, sources, collect(i.id) AS inputs "
"WHERE any(input IN sources WHERE input IN inputs) "
"MERGE (s)-[:BEGINS]->(w)")
task_of_query = ("MATCH (s:Task {id: $task_id}) "
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey Kabir can you give a bit more info on what this change does?

"MATCH (w:Workflow {id: s.workflow_id}) "
"MERGE (s)-[:TASK_OF]->(w)")
dependency_query = ("MATCH (s:Task {id: $task_id})<-[:INPUT_OF]-(i:Input) "
"WITH s, collect(i.source) as sources "
"MATCH (t:Task)<-[:OUTPUT_OF]-(o:Output) "
Expand All @@ -260,7 +257,7 @@ def add_dependencies(tx, task, old_task=None, restarted_task=False):
"AND s.workflow_id = t.workflow_id "
"MERGE (t)-[:DEPENDS_ON]->(s)")

tx.run(begins_query, task_id=task.id, wf_id=task.workflow_id)
tx.run(task_of_query, task_id=task.id, wf_id=task.workflow_id)
tx.run(dependency_query, task_id=task.id)
tx.run(dependent_query, task_id=task.id)

Expand Down Expand Up @@ -612,12 +609,12 @@ def set_init_task_inputs(tx, wf_id):
:param wf_id: the workflow id
:type wf_id: str
"""
task_inputs_query = ("MATCH (i:Input)-[:INPUT_OF]->(:Task)-[:BEGINS]->(:Workflow {id: $wf_id})"
"<-[:INPUT_OF]-(wi:Input) "
task_inputs_query = ("MATCH (i:Input)-[:INPUT_OF]->(:Task)-[:TASK_OF]->"
"(:Workflow {id: $wf_id})<-[:INPUT_OF]-(wi:Input) "
"WHERE i.source = wi.id AND wi.value IS NOT NULL "
"SET i.value = wi.value")
# Set any values to defaults if necessary
defaults_query = ("MATCH (i:Input)-[:INPUT_OF]->(t:Task)-[:BEGINS]->(:Workflow {id: $wf_id})"
defaults_query = ("MATCH (i:Input)-[:INPUT_OF]->(t:Task)-[:TASK_OF]->(:Workflow {id: $wf_id})"
"<-[:INPUT_OF]-(wi:Input) "
"WHERE i.source = wi.id "
"AND i.value IS NULL AND i.default IS NOT NULL "
Expand Down Expand Up @@ -678,12 +675,13 @@ def set_paused_tasks_to_running(tx):
def set_runnable_tasks_to_ready(tx, wf_id):
"""Set task states to 'READY' if all required inputs have values."""
set_runnable_ready_query = ("MATCH (m:Metadata)-[:DESCRIBES]->"
"(t:Task {workflow_id: $wf_id})<-[:INPUT_OF]-(i:Input) "
"WITH m, t, collect(i) AS ilist "
"(t:Task {workflow_id: $wf_id}) "
"WHERE m.state = 'WAITING' "
"AND all(i IN ilist WHERE i.value IS NOT NULL) "
"AND NOT EXISTS { "
"MATCH (t)-[:DEPENDS_ON]->(dep:Task)<-[:DESCRIBES]-(dm:Metadata) "
"WHERE dm.state <> 'COMPLETED' "
"} "
"SET m.state = 'READY'")

tx.run(set_runnable_ready_query, wf_id=wf_id)


Expand Down
12 changes: 6 additions & 6 deletions beeflow/common/parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ def parse_job(self, job):
else:
raise CwlParseError("Unsupported input job file extension (only .yml "
"and .json supported)")

for k, v in self.params.items():
if not isinstance(k, str):
raise CwlParseError(f"Invalid input job key: {str(k)}")
if not isinstance(v, (str, int, float)):
raise CwlParseError(f"Invalid input job parameter type: {type(v)}")
if self.params:
for k, v in self.params.items():
if not isinstance(k, str):
raise CwlParseError(f"Invalid input job key: {str(k)}")
if not isinstance(v, (str, int, float)):
raise CwlParseError(f"Invalid input job parameter type: {type(v)}")

@staticmethod
def parse_step_inputs(cwl_in, step_inputs):
Expand Down