Skip to content

Commit 59c5254

Browse files
committed
added cwl translator
1 parent a315541 commit 59c5254

File tree

5 files changed

+310
-2
lines changed

5 files changed

+310
-2
lines changed

wfcommons/wfbench/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@
99
# (at your option) any later version.
1010

1111
from .bench import WorkflowBenchmark
12-
from .translator import DaskTranslator, NextflowTranslator, ParslTranslator, PegasusTranslator, SwiftTTranslator, TaskVineTranslator
12+
from .translator import DaskTranslator, NextflowTranslator, ParslTranslator, PegasusTranslator, SwiftTTranslator, TaskVineTranslator, CWLTranslator

wfcommons/wfbench/translator/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@
1313
from .parsl import ParslTranslator
1414
from .pegasus import PegasusTranslator
1515
from .swift_t import SwiftTTranslator
16-
from .taskvine import TaskVineTranslator
16+
from .taskvine import TaskVineTranslator
17+
from .cwl import CWLTranslator
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright (c) 2024 The WfCommons Team.
5+
#
6+
# This program is free software: you can redistribute it and/or modify
7+
# it under the terms of the GNU General Public License as published by
8+
# the Free Software Foundation, either version 3 of the License, or
9+
# (at your option) any later version.
10+
11+
import shutil
12+
import logging
13+
import pathlib
14+
from typing import Union, Optional
15+
from collections import defaultdict, deque
16+
17+
from .abstract_translator import Translator
18+
from ...common import Workflow
19+
20+
this_dir = pathlib.Path(__file__).resolve().parent
21+
22+
class CWLTranslator(Translator):
23+
"""
24+
A WfFormat parser for creating CWL workflow benchmarks.
25+
26+
:param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance.
27+
:type workflow: Union[Workflow, pathlib.Path],
28+
:param logger: The logger where to log information/warning or errors (optional).
29+
:type logger: Logger
30+
"""
31+
def __init__(self,
32+
workflow: Union[Workflow, pathlib.Path],
33+
logger: Optional[logging.Logger] = None) -> None:
34+
super().__init__(workflow, logger)
35+
self.cwl_script = ["cwlVersion: v1.2",
36+
"class: Workflow",
37+
"requirements:",
38+
" MultipleInputFeatureRequirement: {}",
39+
" StepInputExpressionRequirement: {}",
40+
" InlineJavascriptRequirement: {}"]
41+
self.yml_script = []
42+
self.parsed_tasks = []
43+
self.task_level_map = defaultdict(lambda: [])
44+
45+
queue = deque(self.root_task_names)
46+
visited = set()
47+
top_sort = []
48+
49+
while queue:
50+
task_name = queue.popleft()
51+
52+
if task_name not in visited:
53+
top_sort.append(task_name)
54+
visited.add(task_name)
55+
56+
for child in self.task_children[task_name]:
57+
queue.append(child)
58+
59+
assert (len(top_sort) == len(self.tasks))
60+
61+
levels = {task_name: 0 for task_name in top_sort}
62+
63+
for task_name in top_sort:
64+
for child in self.task_children[task_name]:
65+
levels[child] = max(levels[child], levels[task_name] + 1)
66+
67+
for task_name, level in levels.items():
68+
self.task_level_map[level].append(task_name)
69+
70+
def translate(self, output_folder: pathlib.Path) -> None:
71+
# Create output folder
72+
output_folder.mkdir(parents=True)
73+
74+
# Parsing the inputs and outputs of the workflow
75+
self._parse_inputs_outputs()
76+
77+
# Parsing the steos
78+
self._parse_steps()
79+
80+
# additional files
81+
self._copy_binary_files(output_folder)
82+
self._generate_input_files(output_folder)
83+
84+
# Writing the CWL files to the output folder
85+
self._write_cwl_files(output_folder)
86+
87+
return 0
88+
89+
def _parse_steps(self) -> None:
90+
steps_folder_source = []
91+
self.cwl_script.append("\nsteps:")
92+
# Parsing each steps by Workflow levels
93+
for level in sorted(self.task_level_map.keys()):
94+
# Parsing each task within a Workflow level
95+
for task_name in self.task_level_map[level]:
96+
97+
# Getting the task object
98+
task = self.tasks[task_name]
99+
100+
# Parsing the arguments of the task
101+
args_array = []
102+
benchmark_name = False
103+
104+
for item in task.args:
105+
# Split elements that contain both an option and a value
106+
if item.startswith("--"):
107+
parts = item.split(" ", 1)
108+
args_array.append(parts[0])
109+
if len(parts) > 1:
110+
args_array.append(parts[1])
111+
elif not benchmark_name:
112+
args_array.append(item)
113+
benchmark_name = True
114+
115+
output_files = [
116+
f.file_id for f in task.output_files]
117+
118+
# Adding the step to the cwl script
119+
120+
self.cwl_script.append(f" {task.task_id}:")
121+
# TODO: change so that it doesn't only run wfbench programs
122+
if not task.program.startswith("wfbench"):
123+
raise ValueError("Only wfbench programs are supported")
124+
self.cwl_script.append(" run: clt/wfbench.cwl")
125+
126+
self.cwl_script.append(" in:")
127+
if level == 0:
128+
self.cwl_script.append(
129+
f" input_files: {task.task_id}_input")
130+
else:
131+
self.cwl_script.append(
132+
" input_files:")
133+
self.cwl_script.append(
134+
" linkMerge: merge_flattened")
135+
self.cwl_script.append(
136+
" source:")
137+
for parent in self.task_parents[task_name]:
138+
self.cwl_script.append(
139+
f" - {parent}/output_files")
140+
self.cwl_script.append(
141+
f" input_params: {{ default: {args_array} }}")
142+
self.cwl_script.append(" step_name:")
143+
self.cwl_script.append(f" valueFrom: {task.task_id}")
144+
self.cwl_script.append(f" output_filenames: {{ default: {output_files} }}")
145+
self.cwl_script.append(
146+
" out: [out, err, output_files]\n")
147+
148+
# Adding a step to create a directory with the output files
149+
self.cwl_script.append(f" {task.task_id}_folder:")
150+
self.cwl_script.append(" run: clt/folder.cwl")
151+
self.cwl_script.append(" in:")
152+
self.cwl_script.append(" - id: item")
153+
self.cwl_script.append(" linkMerge: merge_flattened")
154+
self.cwl_script.append(" source:")
155+
self.cwl_script.append(f" - {task.task_id}/out")
156+
self.cwl_script.append(f" - {task.task_id}/err")
157+
self.cwl_script.append(f" - {task.task_id}/output_files")
158+
self.cwl_script.append(" - id: name")
159+
self.cwl_script.append(f" valueFrom: \"{level}_{task.task_id}\"")
160+
self.cwl_script.append(" out: [out]\n")
161+
162+
# adding the folder id to grand list of step folders
163+
steps_folder_source.append(f"{task.task_id}_folder")
164+
165+
self.cwl_script.append(" final_folder:")
166+
self.cwl_script.append(" run: clt/folder.cwl")
167+
self.cwl_script.append(" in:")
168+
self.cwl_script.append(" - id: item")
169+
self.cwl_script.append(" linkMerge: merge_flattened")
170+
self.cwl_script.append(" source:")
171+
for folder in steps_folder_source:
172+
self.cwl_script.append(f" - {folder}/out")
173+
self.cwl_script.append(" - id: name")
174+
self.cwl_script.append(" valueFrom: \"final_output\"")
175+
self.cwl_script.append(" out: [out]")
176+
177+
def _parse_inputs_outputs(self) -> None:
178+
# Parsing the inputs of all root tasks
179+
self.cwl_script.append("\ninputs:")
180+
for task_name in self.root_task_names:
181+
task = self.tasks[task_name]
182+
cwl_written = False
183+
yml_written = False
184+
for f in task.input_files:
185+
if not cwl_written:
186+
self.cwl_script.append(f" {task.task_id}_input:")
187+
self.cwl_script.append(" type: File[]")
188+
cwl_written = True
189+
if not yml_written:
190+
self.yml_script.append(f"{task.task_id}_input:")
191+
yml_written = True
192+
193+
self.yml_script.append(" - class: File")
194+
self.yml_script.append(f" path: data/{f.file_id}")
195+
196+
# Appending the output to the cwl script
197+
self.cwl_script.append("\noutputs:")
198+
self.cwl_script.append(" final_output_folder:")
199+
self.cwl_script.append(" type: Directory")
200+
self.cwl_script.append(" outputSource: final_folder/out")
201+
202+
def _write_cwl_files(self, output_folder: pathlib.Path) -> None:
203+
cwl_folder = output_folder
204+
205+
clt_folder = cwl_folder.joinpath("clt")
206+
clt_folder.mkdir(exist_ok=True)
207+
shutil.copy(pathlib.Path.cwd().joinpath("templates/wfbench.cwl"), clt_folder)
208+
shutil.copy(pathlib.Path.cwd().joinpath("templates/folder.cwl"), clt_folder)
209+
210+
with open(cwl_folder.joinpath("main.cwl"), "w", encoding="utf-8") as f:
211+
f.write("\n".join(self.cwl_script))
212+
213+
with (open(cwl_folder.joinpath("config.yml"), "w", encoding="utf-8")) as f:
214+
f.write("\n".join(self.yml_script))
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# A collection of RNA-Seq data analysis tools wrapped in CWL scripts
2+
# Copyright (C) 2019 Alessandro Pio Greco, Patrick Hedley-Miller, Filipe Jesus, Zeyu Yang
3+
#
4+
# This program is free software: you can redistribute it and/or modify
5+
# it under the terms of the GNU General Public License as published by
6+
# the Free Software Foundation, either version 3 of the License, or
7+
# (at your option) any later version.
8+
#
9+
# This program is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
# GNU General Public License for more details.
13+
#
14+
# You should have received a copy of the GNU General Public License
15+
# along with this program. If not, see <https://www.gnu.org/licenses/>.
16+
17+
#!/usr/bin/env cwl-runner
18+
19+
cwlVersion: v1.2
20+
class: ExpressionTool
21+
requirements:
22+
InlineJavascriptRequirement: {}
23+
24+
inputs:
25+
item:
26+
type:
27+
- File
28+
- Directory
29+
- type: array
30+
items:
31+
- File
32+
- Directory
33+
name: string
34+
35+
outputs:
36+
out: Directory
37+
38+
expression: "${
39+
if (inputs.item.class == 'Directory'){
40+
return {
41+
'out': {
42+
'class': 'Directory',
43+
'basename': inputs.name,
44+
'listing': [inputs.item]
45+
}
46+
}
47+
};
48+
if (inputs.item.class == 'File'){
49+
var arr = [inputs.item];
50+
}
51+
else {
52+
var arr = inputs.item;
53+
}
54+
return {
55+
'out': {
56+
'class': 'Directory',
57+
'basename': inputs.name,
58+
'listing': arr
59+
}
60+
}
61+
}"
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
cwlVersion: v1.2
2+
class: CommandLineTool
3+
requirements:
4+
InlineJavascriptRequirement: {}
5+
baseCommand: /Users/wongy/Documents/GitHub/WfFormat-To-StreamFlow-Translator/script/output/bin/wfbench.py
6+
arguments:
7+
- valueFrom: $(inputs.input_params)
8+
stdout: $(inputs.step_name + ".out")
9+
stderr: $(inputs.step_name + ".err")
10+
inputs:
11+
step_name:
12+
type: string
13+
input_params:
14+
type: string[]?
15+
input_files:
16+
type: File[]?
17+
inputBinding:
18+
position: 0
19+
output_filenames:
20+
type: string[]?
21+
outputs:
22+
out:
23+
type: stdout
24+
err:
25+
type: stderr
26+
output_files:
27+
type:
28+
type: array
29+
items:
30+
- File
31+
outputBinding:
32+
glob: $(inputs.output_filenames)

0 commit comments

Comments
 (0)