Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ def _make_tarfile_of_wfcommons():
tar_stream.seek(0)
return tar_stream


def _install_WfCommons_on_container(container):
# sys.stderr.write("Installing WfCommons on the container...\n")
# Copy the WfCommons code to it (removing stuff that should be removed)
target_path = '/tmp/' # inside container
tar_data = _make_tarfile_of_wfcommons()
container.put_archive(target_path, tar_data)
# Cleanup files from the host
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/build/", stdout=True, stderr=True)
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/*.egg-info/", stdout=True, stderr=True)
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark.o", stdout=True,
Expand All @@ -44,6 +46,8 @@ def _install_WfCommons_on_container(container):
# Install WfCommons on the container (to install wfbench and cpu-benchmark really)
exit_code, output = container.exec_run("sudo python3 -m pip install . --break-system-packages",
workdir="/tmp/WfCommons", stdout=True, stderr=True)
if exit_code != 0:
raise RuntimeError("Failed to install WfCommons on the container");

def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=None):
if command is None:
Expand Down Expand Up @@ -75,10 +79,17 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=

# Copy over the wfbench and cpu-benchmark executables to where they should go on the container
if bin_dir:
sys.stderr.write("Copying wfbench and cpu-benchmark...\n")
exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which wfbench` " + bin_dir],
stdout=True, stderr=True)
if exit_code != 0:
raise RuntimeError("Failed to copy wfbench script to the bin directory")
exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which cpu-benchmark` " + bin_dir],
stdout=True, stderr=True)
if exit_code != 0:
raise RuntimeError("Failed to copy cpu-benchmark executable to the bin directory")
else:
sys.stderr.write("Not Copying wfbench and cpu-benchmark...\n")

return container

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from wfcommons.wfbench import SwiftTTranslator

from wfcommons.wfinstances import PegasusLogsParser
from wfcommons.wfinstances.logs import TaskVineLogsParser


def _create_workflow_benchmark():
# Create a workflow benchmark object to generate specifications based on a recipe (in /tmp/, whatever)
Expand All @@ -56,25 +58,35 @@ def _additional_setup_taskvine(container):
exit_code, output = container.exec_run(cmd=["bash", "-c",
"source ~/conda/etc/profile.d/conda.sh && conda activate && poncho_package_create taskvine_poncho.json taskvine_poncho.tar.gz"],
stdout=True, stderr=True)
if exit_code != 0:
raise Exception("Failed to setup TaskVine: cannot create poncho package")
# Start a vine worker in the background
exit_code, output = container.exec_run(
cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && vine_worker localhost 9123"],
detach=True, stdout=True, stderr=True)
# Note that exit_code will always be None because of detach=True. So hopefully this works.
# TODO?: check that the vine_worker is running (so as to abort early)

def _additional_setup_pegasus(container):
# Start Condor
exit_code, output = container.exec_run(cmd=["bash", "-c",
"bash /home/wfcommons/start_condor.sh"],
stdout=True, stderr=True)
if exit_code != 0:
raise Exception("Failed to setup Pegasus: cannot start HTCondor")
# Run pegasus script
exit_code, output = container.exec_run(cmd=["bash", "-c",
"python3 ./pegasus_workflow.py"],
stdout=True, stderr=True)
if exit_code != 0:
raise Exception("Failed to setup Pegasus: error while running the pegasus_workflow.py script")

def _additional_setup_swiftt(container):
# Start a redis server in the background
exit_code, output = container.exec_run(
cmd=["bash", "-c", "redis-server"], detach=True, stdout=True, stderr=True)
# Note that exit_code will always be None because of detach=True. So hopefully this works.
# TODO?: check that the vine_worker is running....

additional_setup_methods = {
"dask": noop,
Expand Down Expand Up @@ -209,25 +221,22 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath):
"swiftt": SwiftTTranslator,
}

logs_parser_classes = {
"pegasus": PegasusLogsParser,
}


class TestTranslators:

@pytest.mark.parametrize(
"backend",
[
"dask",
"parsl",
"nextflow",
"airflow",
"dask",
"parsl",
"nextflow",
"airflow",
"bash",
"bash",
"taskvine",
"cwl",
"pegasus",
"swiftt",
"taskvine",
"cwl",
"pegasus",
"swiftt",
])
@pytest.mark.unit
# @pytest.mark.skip(reason="tmp")
Expand Down Expand Up @@ -260,10 +269,16 @@ def test_translator(self, backend) -> None:
sys.stderr.write("Workflow ran in %.2f seconds\n" % (time.time() - start_time))

# Run the log parser if any
if backend in logs_parser_classes:
if backend == "pegasus":
parser = PegasusLogsParser(dirpath / "work/wfcommons/pegasus/Blast-Benchmark/run0001/")
# elif backend == "taskvine":
# parser = TaskVineLogsParser(dirpath / "vine-run-info/", filenames_to_ignore=["cpu-benchmark","stress-ng"])
else:
parser = None

if parser:
sys.stderr.write("\nParsing the logs...\n")
parser = logs_parser_classes[backend](submit_dir=dirpath / "work/wfcommons/pegasus/Blast-Benchmark/run0001")
workflow = parser.build_workflow("reconstructed_workflow")
# TODO: test more stuff
workflow.write_json(pathlib.Path("/tmp/reconstructed_workflow.json"))
assert(num_tasks == len(workflow.tasks))
pass
1 change: 1 addition & 0 deletions wfcommons/wfinstances/logs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# (at your option) any later version.

from .makeflow import MakeflowLogsParser
from .taskvine import TaskVineLogsParser
from .nextflow import NextflowLogsParser
from .pegasus import PegasusLogsParser
from .pegasusrec import HierarchicalPegasusLogsParser
Loading