|
14 | 14 | import sys |
15 | 15 | import json |
16 | 16 | import time |
| 17 | +import networkx |
17 | 18 |
|
18 | 19 | from tests.test_helpers import _create_fresh_local_dir |
19 | 20 | from tests.test_helpers import _remove_local_dir_if_it_exists |
20 | 21 | from tests.test_helpers import _start_docker_container |
| 22 | +from tests.test_helpers import _compare_workflows |
| 23 | + |
21 | 24 | from wfcommons import BlastRecipe |
| 25 | +from wfcommons.common import Workflow, Task |
22 | 26 | from wfcommons.wfbench import WorkflowBenchmark |
23 | 27 | from wfcommons.wfbench import DaskTranslator |
24 | 28 | from wfcommons.wfbench import ParslTranslator |
|
34 | 38 | from wfcommons.wfinstances.logs import TaskVineLogsParser |
35 | 39 |
|
36 | 40 |
|
37 | | -def _create_workflow_benchmark(): |
| 41 | +def _create_workflow_benchmark() -> (WorkflowBenchmark, int): |
38 | 42 | # Create a workflow benchmark object to generate specifications based on a recipe (in /tmp/, whatever) |
39 | 43 | desired_num_tasks = 45 |
40 | 44 | benchmark_full_path = "/tmp/blast-benchmark-{desired_num_tasks}.json" |
@@ -85,8 +89,13 @@ def _additional_setup_swiftt(container): |
85 | 89 | # Start a redis server in the background |
86 | 90 | exit_code, output = container.exec_run( |
87 | 91 | cmd=["bash", "-c", "redis-server"], detach=True, stdout=True, stderr=True) |
88 | | - # Note that exit_code will always be None because of detach=True. So hopefully this works. |
89 | | - # TODO?: check that the vine_worker is running.... |
| 92 | + # Note that exit_code will always be None because of detach=True. |
| 93 | + |
| 94 | + # Check that the redis-server is up |
| 95 | + exit_code, output = container.exec_run( |
| 96 | + cmd=["bash", "-c", "redis-cli ping"], stdout=True, stderr=True) |
| 97 | + if output.decode().strip() != 'PONG': |
| 98 | + raise Exception("Failed to start redis-server...") |
90 | 99 |
|
91 | 100 | additional_setup_methods = { |
92 | 101 | "dask": noop, |
@@ -242,6 +251,7 @@ def test_translator(self, backend) -> None: |
242 | 251 | # Create workflow benchmark |
243 | 252 | benchmark, num_tasks = _create_workflow_benchmark() |
244 | 253 |
|
| 254 | + |
245 | 255 | # Create a local translation directory |
246 | 256 | str_dirpath = "/tmp/" + backend + "_translated_workflow/" |
247 | 257 | dirpath = pathlib.Path(str_dirpath) |
@@ -270,13 +280,16 @@ def test_translator(self, backend) -> None: |
270 | 280 | if backend == "pegasus": |
271 | 281 | parser = PegasusLogsParser(dirpath / "work/wfcommons/pegasus/Blast-Benchmark/run0001/") |
272 | 282 | elif backend == "taskvine": |
273 | | - parser = TaskVineLogsParser(dirpath / "vine-run-info/", filenames_to_ignore=["cpu-benchmark","stress-ng"]) |
| 283 | + parser = TaskVineLogsParser(dirpath / "vine-run-info/", filenames_to_ignore=["cpu-benchmark","stress-ng", "wfbench"]) |
274 | 284 | else: |
275 | 285 | parser = None |
276 | 286 |
|
277 | 287 | if parser: |
278 | 288 | sys.stderr.write("\nParsing the logs...\n") |
279 | | - workflow = parser.build_workflow("reconstructed_workflow") |
280 | | - # TODO: test more stuff |
281 | | - workflow.write_json(pathlib.Path("/tmp/reconstructed_workflow.json")) |
282 | | - assert(num_tasks == len(workflow.tasks)) |
| 289 | + reconstructed_workflow : Workflow = parser.build_workflow("reconstructed_workflow") |
| 290 | + reconstructed_workflow.write_json(pathlib.Path("/tmp/reconstructed_workflow.json")) |
| 291 | + |
| 292 | + original_workflow : Workflow = benchmark.workflow |
| 293 | + |
| 294 | + _compare_workflows(original_workflow, reconstructed_workflow) |
| 295 | + |
0 commit comments