
Commit eba93be

updating Swift/T translator

1 parent 47ae77f commit eba93be

2 files changed: +43 -43 lines changed

docs/source/generating_workflow_benchmarks.rst

Lines changed: 2 additions & 2 deletions

@@ -170,11 +170,11 @@ provide an example on how to generate workflow benchmark for running with Swift/T
     benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=500)

     # generate a specification based on performance characteristics
-    benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6)
+    benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=1.0)

     # generate a Swift/T workflow
     translator = SwiftTTranslator(benchmark.workflow)
-    translator.translate(output_file_name=pathlib.Path("/tmp/benchmark-workflow.swift"))
+    translator.translate(output_folder=pathlib.Path("./swift-t-wf/"))

 TaskVine
 ++++++++
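
For reference, the Swift/T example in the documentation above reads roughly as follows once the two changed lines are applied. This is a sketch assembled from the diff context; the import lines are assumed (they are not part of this diff) and may differ from the actual documentation page:

    import pathlib

    from wfcommons import BlastRecipe
    from wfcommons.wfbench import WorkflowBenchmark, SwiftTTranslator

    # create a benchmark with 500 tasks from the BLAST recipe
    benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=500)

    # generate a specification based on performance characteristics
    benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=1.0)

    # generate a Swift/T workflow into a self-contained folder
    translator = SwiftTTranslator(benchmark.workflow)
    translator.translate(output_folder=pathlib.Path("./swift-t-wf/"))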

wfcommons/wfbench/translator/swift_t.py

Lines changed: 41 additions & 41 deletions
@@ -14,7 +14,7 @@
 from typing import Optional, Union

 from .abstract_translator import Translator
-from ...common import FileLink, Workflow
+from ...common import Workflow


 class SwiftTTranslator(Translator):
@@ -23,8 +23,6 @@ class SwiftTTranslator(Translator):

     :param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance.
     :type workflow: Union[Workflow, pathlib.Path]
-    :param work_dir: Path to the workflow working directory.
-    :type work_dir: pathlib.Path
     :param stress_path: Path to the stress-ng command.
     :type stress_path: pathlib.Path
     :param logger: The logger where to log information/warning or errors (optional).
@@ -33,13 +31,11 @@ class SwiftTTranslator(Translator):

     def __init__(self,
                  workflow: Union[Workflow, pathlib.Path],
-                 work_dir: pathlib.Path,
                  stress_path: pathlib.Path = pathlib.Path("stress-ng"),
                  logger: Optional[Logger] = None) -> None:
         """Create an object of the translator."""
         super().__init__(workflow, logger)

-        self.work_dir = work_dir
         self.stress_path = stress_path
         self.categories_list = []
         self.categories_input = {}
@@ -58,22 +54,21 @@ def __init__(self,
             self.apps.append(task.name)

             out_count = 0
-            for file in task.files:
-                if file.link == FileLink.OUTPUT:
-                    self.files_map[file.file_id] = f"{task.name}__out"
-                    out_count += 1
+            for file in task.output_files:
+                self.files_map[file.file_id] = f"{task.name}__out"
+                out_count += 1

             if out_count > 1:
                 self.logger.error(
                     "Swift/T does not allow an application to have multiple outputs.")
                 exit(1)

-    def translate(self, output_file_path: pathlib.Path) -> None:
+    def translate(self, output_folder: pathlib.Path) -> None:
         """
         Translate a workflow benchmark description (WfFormat) into a Swift/T workflow application.

-        :param output_file_path: The path of the output file (e.g., workflow.swift).
-        :type output_file_path: pathlib.Path
+        :param output_folder: The path to the folder in which the workflow benchmark will be generated.
+        :type output_folder: pathlib.Path
         """
         self.logger.info("Translating workflow into Swift/T")
         self.script += "string command = \n" \
@@ -84,7 +79,7 @@ def translate(self, output_file_path: pathlib.Path) -> None:
                        "import subprocess\n" \
                        "import time\n" \
                        "\n" \
-                       f"this_dir = pathlib.Path(\"{self.work_dir}\").absolute()\n" \
+                       f"this_dir = pathlib.Path(\".\").absolute()\n" \
                        "\n" \
                        "task_name = \"%s\"\n" \
                        "files_list = \"%s\"\n" \
@@ -94,20 +89,20 @@ def translate(self, output_file_path: pathlib.Path) -> None:
                        "\n" \
                        "print(f\"[WfBench] [{task_name}] Starting IO Read Benchmark...\", flush=True)\n" \
                        "if \"__\" not in files_list:\n" \
-                       "    with open(this_dir.joinpath(files_list), \"rb\") as fp:\n" \
+                       "    with open(this_dir.joinpath(f\"./data/{files_list}\"), \"rb\") as fp:\n" \
                        "        start = time.perf_counter()\n" \
                        "        print(f\"[WfBench] Reading '{files_list}'\", flush=True)\n" \
                        "        fp.readlines()\n" \
                        "        end = time.perf_counter()\n" \
-                       "        print(f\"[WfBench] [{task_name}] Metrics (read) [time,size]: {end - start},{this_dir.joinpath(files_list).stat().st_size}\", flush=True)\n" \
+                       "        print(f\"[WfBench] [{task_name}] Metrics (read) [time,size]: {end - start},{this_dir.joinpath(f\"./data/{files_list}\").stat().st_size}\", flush=True)\n" \
                        "else:\n" \
                        "    files = files_list.split(\", \")\n" \
                        "    for file in files:\n" \
                        "        counter = 0\n" \
                        "        fd = file.split(\"__\")\n" \
                        "        start = time.perf_counter()\n" \
                        "        file_size = 0\n" \
-                       "        for f in this_dir.glob(f\"{fd[0]}_*_output.txt\"):\n" \
+                       "        for f in this_dir.glob(f\"./data/{fd[0]}_*_output.txt\"):\n" \
                        "            if counter >= int(fd[1]):\n" \
                        "                break\n" \
                        "            file_size += os.stat(f).st_size\n" \
@@ -121,7 +116,7 @@ def translate(self, output_file_path: pathlib.Path) -> None:
                        "\n" \
                        "if gpu_work > 0:\n" \
                        "    print(f\"[WfBench] [{task_name}] Starting GPU Benchmark...\", flush=True)\n" \
-                       "    gpu_prog = [f\"CUDA_DEVICE_ORDER=PCI_BUS_ID {this_dir.joinpath('gpu-benchmark')} {gpu_work}\"]\n" \
+                       "    gpu_prog = [f\"CUDA_DEVICE_ORDER=PCI_BUS_ID {this_dir.joinpath('./bin/gpu-benchmark')} {gpu_work}\"]\n" \
                        "    start = time.perf_counter()\n" \
                        "    gpu_proc = subprocess.Popen(gpu_prog, shell=True)\n" \
                        "    gpu_proc.wait()\n" \
@@ -138,7 +133,7 @@ def translate(self, output_file_path: pathlib.Path) -> None:
                        "\n" \
                        "    cpu_procs = []\n" \
                        "    cpu_prog = [\n" \
-                       "        f\"{this_dir.joinpath('cpu-benchmark')}\", f\"{cpu_work_per_thread}\"]\n" \
+                       "        f\"{this_dir.joinpath('./bin/cpu-benchmark')}\", f\"{cpu_work_per_thread}\"]\n" \
                        f"    mem_prog = [\"{self.stress_path}\", \"--vm\", f\"{{mem_threads}}\",\n" \
                        "        \"--vm-bytes\", f\"{total_mem_bytes}%%\", \"--vm-keep\"]\n" \
                        "\n" \
@@ -159,7 +154,7 @@ def translate(self, output_file_path: pathlib.Path) -> None:
                        "\n" \
                        "print(f\"[WfBench] [{task_name}] Writing output file\", flush=True)\n" \
                        "start = time.perf_counter()\n" \
-                       "with open(this_dir.joinpath(\"%s\"), \"wb\") as fp:\n" \
+                       "with open(this_dir.joinpath(\"./data/%s\"), \"wb\") as fp:\n" \
                        "    file_size = int(%i)\n" \
                        "    fp.write(os.urandom(file_size))\n" \
                        "end = time.perf_counter()\n" \
@@ -176,13 +171,12 @@ def translate(self, output_file_path: pathlib.Path) -> None:

         for task_name in self.root_task_names:
             task = self.tasks[task_name]
-            for file in task.files:
-                if file.link == FileLink.INPUT:
-                    if task.name not in self.categories_input.keys():
-                        self.categories_input[task.name] = in_count
-                        self.script += f"root_in_files[{in_count}] = \"{file.file_id}\";\n"
-                        in_count += 1
-                    self.files_map[file.file_id] = f"ins[{in_count}]"
+            for file in task.input_files:
+                if task.name not in self.categories_input.keys():
+                    self.categories_input[task.name] = in_count
+                    self.script += f"root_in_files[{in_count}] = \"{file.file_id}\";\n"
+                    in_count += 1
+                self.files_map[file.file_id] = f"ins[{in_count}]"

         self.script += "\n"

@@ -195,8 +189,14 @@ def translate(self, output_file_path: pathlib.Path) -> None:
         for category in self.categories_list:
             self._add_tasks(category)

-        # write script to file
-        self._write_output_file(self.script, output_file_path)
+        # write benchmark files
+        output_folder.mkdir(parents=True)
+        with open(output_folder.joinpath("workflow.swift"), "w") as fp:
+            fp.write(self.script)
+
+        # additional files
+        self._copy_binary_files(output_folder)
+        self._generate_input_files(output_folder)

     def _find_categories_list(self, task_name: str, parent_task: Optional[str] = None) -> None:
         """"
@@ -248,19 +248,19 @@ def _add_tasks(self, category: str) -> None:
             input_files = []
             prefix = ""

-            for file in task.files:
-                if file.link == FileLink.OUTPUT:
-                    out_file = file.file_id
-                    file_size = file.size
-                elif file.link == FileLink.INPUT:
-                    cat_prefix = self.files_map[file.file_id].split("__out")[0]
-                    if file.file_id not in parsed_input_files:
-                        input_files_cat.setdefault(cat_prefix, 0)
-                        input_files_cat[cat_prefix] += 1
-                        parsed_input_files.append(file.file_id)
-                    input_files.append(self.files_map[file.file_id])
-                    if not prefix:
-                        prefix = cat_prefix
+            for file in task.output_files:
+                out_file = file.file_id
+                file_size = file.size
+
+            for file in task.input_files:
+                cat_prefix = self.files_map[file.file_id].split("__out")[0]
+                if file.file_id not in parsed_input_files:
+                    input_files_cat.setdefault(cat_prefix, 0)
+                    input_files_cat[cat_prefix] += 1
+                    parsed_input_files.append(file.file_id)
+                input_files.append(self.files_map[file.file_id])
+                if not prefix:
+                    prefix = cat_prefix

             # arguments
             if num_tasks == 0:
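
With these changes, translate() no longer emits a single .swift file: it creates the output folder, writes workflow.swift into it, copies the benchmark binaries (_copy_binary_files), and generates the workflow input files (_generate_input_files). Because the generated task script now resolves this_dir from "." and references ./bin/ and ./data/, the workflow is expected to be launched from inside that folder. Assuming the helpers place binaries under bin/ and inputs under data/ (consistent with the paths referenced above, but not shown in this diff), the resulting layout would look roughly like:

    swift-t-wf/
        workflow.swift      # generated Swift/T workflow
        bin/
            cpu-benchmark   # referenced as ./bin/cpu-benchmark
            gpu-benchmark   # referenced as ./bin/gpu-benchmark
        data/               # generated input files; task outputs are written here as well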
