From 22408e9b3986cbe7ac291f43a450c0fd0576de1c Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 15 Aug 2025 15:59:45 -1000 Subject: [PATCH 01/10] Fixed bugs in dot-file handling in the Workflow class Added testing --- .github/workflows/build.yml | 2 ++ tests/unit/common/test_workflow.py | 30 +++++++++++++++++++ wfcommons/common/workflow.py | 9 +++++- wfcommons/wfchef/recipes/blast/recipe.py | 2 +- wfcommons/wfchef/recipes/bwa/recipe.py | 2 +- wfcommons/wfchef/recipes/cycles/recipe.py | 2 +- .../wfchef/recipes/epigenomics/recipe.py | 2 +- wfcommons/wfchef/recipes/genome/recipe.py | 2 +- wfcommons/wfchef/recipes/montage/recipe.py | 2 +- wfcommons/wfchef/recipes/rnaseq/recipe.py | 2 +- wfcommons/wfchef/recipes/soykb/recipe.py | 2 +- wfcommons/wfchef/recipes/srasearch/recipe.py | 2 +- wfcommons/wfchef/skeletons/recipe.py | 2 +- wfcommons/wfchef/wfchef_abstract_recipe.py | 2 +- 14 files changed, 51 insertions(+), 12 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2c8c4781..383933ba 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -27,7 +27,9 @@ jobs: run: | sudo apt-get update sudo apt-get install stress-ng + sudo apt-get install graphviz libgraphviz-dev pip install docker + pip install graphviz - name: Check package install run: | diff --git a/tests/unit/common/test_workflow.py b/tests/unit/common/test_workflow.py index 257acd86..5749db32 100644 --- a/tests/unit/common/test_workflow.py +++ b/tests/unit/common/test_workflow.py @@ -140,7 +140,37 @@ def test_workflow_json_generation(self): # Compare the two jsons! 
assert(original_json == written_json) + @pytest.mark.unit + def test_workflow_dot_file(self): + + # Put a JSON file in /tmp + url = "https://raw.githubusercontent.com/wfcommons/WfInstances/refs/heads/main/makeflow/blast/blast-chameleon-small-001.json" + response = requests.get(url) + local_file_name = url.split("/")[-1] + with open("/tmp/" + local_file_name, 'wb') as f: + f.write(response.content) + + # Create an instance from the JSON File and write it back to a JSON + instance = Instance(pathlib.Path("/tmp") / local_file_name) + # Capture some metrics + num_tasks = len(instance.workflow.tasks) + num_dependencies = len(instance.workflow.edges) + + # Create a dot file + dot_path = pathlib.Path("/tmp/written_workflow.dot") + instance.workflow.write_dot(dot_path) + assert dot_path.exists() + with open(str(dot_path), "r", encoding="utf-8") as f: + content = f.read() + assert(num_tasks == content.count("label") - 1) # Extra "label" in file for \N + assert(num_dependencies == content.count("->")) # One "->" in the file per dependency edge + + # Read it back + instance.workflow.read_dot(dot_path) + assert(num_tasks == len(instance.workflow.tasks)) + assert(num_tasks == len(instance.workflow.nodes)) + assert(num_dependencies == len(instance.workflow.edges)) diff --git a/wfcommons/common/workflow.py b/wfcommons/common/workflow.py index c0d3a39e..3a0af603 100644 --- a/wfcommons/common/workflow.py +++ b/wfcommons/common/workflow.py @@ -236,7 +236,14 @@ def read_dot(self, dot_file_path: Optional[pathlib.Path] = None) -> None: raise ModuleNotFoundError( f"\'pydot\' package not found: call to {type(self).__name__}.read_dot() failed.") - graph = nx.drawing.nx_pydot.read_dot(dot_file_path) + # graph = nx.drawing.nx_pydot.read_dot(str(dot_file_path)) + graph = nx.nx_agraph.read_dot(str(dot_file_path)) + + # clear everything + self.tasks.clear() + self.tasks_parents.clear() + self.tasks_children.clear() + self.clear() tasks_map = {} for node in graph.nodes(data=True): diff --git
a/wfcommons/wfchef/recipes/blast/recipe.py b/wfcommons/wfchef/recipes/blast/recipe.py index 6fd4f34b..db90c3f0 100644 --- a/wfcommons/wfchef/recipes/blast/recipe.py +++ b/wfcommons/wfchef/recipes/blast/recipe.py @@ -39,7 +39,7 @@ class BlastRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = None, + exclude_graphs: Set[str]|None = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, diff --git a/wfcommons/wfchef/recipes/bwa/recipe.py b/wfcommons/wfchef/recipes/bwa/recipe.py index 5ba0c47c..26c86dc7 100644 --- a/wfcommons/wfchef/recipes/bwa/recipe.py +++ b/wfcommons/wfchef/recipes/bwa/recipe.py @@ -39,7 +39,7 @@ class BwaRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = None, + exclude_graphs: Set[str]|None = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, diff --git a/wfcommons/wfchef/recipes/cycles/recipe.py b/wfcommons/wfchef/recipes/cycles/recipe.py index 018cd501..04cb3b89 100644 --- a/wfcommons/wfchef/recipes/cycles/recipe.py +++ b/wfcommons/wfchef/recipes/cycles/recipe.py @@ -39,7 +39,7 @@ class CyclesRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = None, + exclude_graphs: Set[str]|None = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, diff --git a/wfcommons/wfchef/recipes/epigenomics/recipe.py b/wfcommons/wfchef/recipes/epigenomics/recipe.py index c5e06656..44ec965d 100644 --- a/wfcommons/wfchef/recipes/epigenomics/recipe.py +++ b/wfcommons/wfchef/recipes/epigenomics/recipe.py @@ -39,7 +39,7 @@ class 
EpigenomicsRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = None, + exclude_graphs: Set[str]|None = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, diff --git a/wfcommons/wfchef/recipes/genome/recipe.py b/wfcommons/wfchef/recipes/genome/recipe.py index 4dd106f1..223a69ce 100644 --- a/wfcommons/wfchef/recipes/genome/recipe.py +++ b/wfcommons/wfchef/recipes/genome/recipe.py @@ -39,7 +39,7 @@ class GenomeRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = None, + exclude_graphs: Set[str]|None = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, diff --git a/wfcommons/wfchef/recipes/montage/recipe.py b/wfcommons/wfchef/recipes/montage/recipe.py index c32d91b1..41394eec 100644 --- a/wfcommons/wfchef/recipes/montage/recipe.py +++ b/wfcommons/wfchef/recipes/montage/recipe.py @@ -39,7 +39,7 @@ class MontageRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = None, + exclude_graphs: Set[str]|None = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, diff --git a/wfcommons/wfchef/recipes/rnaseq/recipe.py b/wfcommons/wfchef/recipes/rnaseq/recipe.py index 3fab75ec..abda061e 100644 --- a/wfcommons/wfchef/recipes/rnaseq/recipe.py +++ b/wfcommons/wfchef/recipes/rnaseq/recipe.py @@ -39,7 +39,7 @@ class RnaseqRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = None, + exclude_graphs: Set[str]|None = None, runtime_factor: Optional[float] = 1.0, 
input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, diff --git a/wfcommons/wfchef/recipes/soykb/recipe.py b/wfcommons/wfchef/recipes/soykb/recipe.py index eedd5f4e..0c19587a 100644 --- a/wfcommons/wfchef/recipes/soykb/recipe.py +++ b/wfcommons/wfchef/recipes/soykb/recipe.py @@ -39,7 +39,7 @@ class SoykbRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = None, + exclude_graphs: Set[str]|None = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, diff --git a/wfcommons/wfchef/recipes/srasearch/recipe.py b/wfcommons/wfchef/recipes/srasearch/recipe.py index c1ec4b10..a51c181d 100644 --- a/wfcommons/wfchef/recipes/srasearch/recipe.py +++ b/wfcommons/wfchef/recipes/srasearch/recipe.py @@ -39,7 +39,7 @@ class SrasearchRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = None, + exclude_graphs: Set[str]|None = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, diff --git a/wfcommons/wfchef/skeletons/recipe.py b/wfcommons/wfchef/skeletons/recipe.py index e3789550..43cdf0ec 100644 --- a/wfcommons/wfchef/skeletons/recipe.py +++ b/wfcommons/wfchef/skeletons/recipe.py @@ -39,7 +39,7 @@ class SkeletonRecipe(WfChefWorkflowRecipe): def __init__(self, data_footprint: Optional[int] = 0, num_tasks: Optional[int] = 3, - exclude_graphs: Set[str] = None, + exclude_graphs: Set[str]|None = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, diff --git a/wfcommons/wfchef/wfchef_abstract_recipe.py b/wfcommons/wfchef/wfchef_abstract_recipe.py index 3927b749..c1fcb3e0 100644 --- 
a/wfcommons/wfchef/wfchef_abstract_recipe.py +++ b/wfcommons/wfchef/wfchef_abstract_recipe.py @@ -58,7 +58,7 @@ class WfChefWorkflowRecipe(WorkflowRecipe): def __init__(self, name: str, data_footprint: Optional[int], num_tasks: Optional[int], - exclude_graphs: Set[str] = None, + exclude_graphs: Set[str]|None = None, runtime_factor: Optional[float] = 1.0, input_file_size_factor: Optional[float] = 1.0, output_file_size_factor: Optional[float] = 1.0, From 1733ada99ecc7ecfe9d2111abe6196a277fb424b Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 15 Aug 2025 16:17:30 -1000 Subject: [PATCH 02/10] workflow bug-- --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 383933ba..051d9207 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -29,7 +29,7 @@ jobs: sudo apt-get install stress-ng sudo apt-get install graphviz libgraphviz-dev pip install docker - pip install graphviz + pip install pygraphviz - name: Check package install run: | From babeba0cc604d42ac3cfa28eae6adadf71e25947 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 15 Aug 2025 16:30:51 -1000 Subject: [PATCH 03/10] workflow fix --- .github/workflows/build.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 051d9207..9f76b21b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -30,6 +30,7 @@ jobs: sudo apt-get install graphviz libgraphviz-dev pip install docker pip install pygraphviz + pip install pydot - name: Check package install run: | From 6a3a79981c91014e8a27ebff256722c4a845ced1 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 20 Aug 2025 16:44:39 -1000 Subject: [PATCH 04/10] Added (not validate) swift/t Dockerfile --- tests/translators/Dockerfile.swiftt | 60 +++++++++++++++++ tests/translators/test_translators.py | 92 +++++++-------------------- 2 files changed, 
83 insertions(+), 69 deletions(-) create mode 100644 tests/translators/Dockerfile.swiftt diff --git a/tests/translators/Dockerfile.swiftt b/tests/translators/Dockerfile.swiftt new file mode 100644 index 00000000..d840f726 --- /dev/null +++ b/tests/translators/Dockerfile.swiftt @@ -0,0 +1,60 @@ +# docker build --platform amd64 -t wfcommons-dev-swiftt -f Dockerfile.swiftt . +# docker run -it --rm -v `pwd`:/home/wfcommons wfcommons-dev-swiftt /bin/bash + +FROM amd64/ubuntu:noble + +LABEL org.containers.image.authors="henric@hawaii.edu" + +# update repositories +RUN apt-get update + +# set timezone +RUN echo "America/Los_Angeles" > /etc/timezone && export DEBIAN_FRONTEND=noninteractive && apt-get install -y tzdata + +# install useful stuff +RUN apt-get -y install pkg-config +RUN apt-get -y install git +RUN apt-get -y install wget +RUN apt-get -y install make +RUN apt-get -y install cmake +RUN apt-get -y install cmake-data +RUN apt-get -y install sudo +RUN apt-get -y install vim --fix-missing +RUN apt-get -y install gcc +RUN apt-get -y install gcc-multilib + +# Python stuff +RUN apt-get -y install python3 python3-pip +RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 +RUN python3 -m pip install --break-system-packages pathos pandas filelock +RUN python3 -m pip install --break-system-packages networkx scipy matplotlib +RUN python3 -m pip install --break-system-packages pyyaml jsonschema requests +RUN python3 -m pip install --break-system-packages --upgrade setuptools + +# Stress-ng +RUN apt-get -y install stress-ng + +# Install Swift-t +RUN wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh +RUN sh ./Miniconda3-latest-Linux-x86_64.sh -b -p /opt/conda +ENV PATH="/opt/conda/bin:$PATH" +RUN which conda +ENV CONDA_ALWAYS_YES=true +RUN conda tos accept --override-channels --channel https://repo.anaconda.com/pkgs/main && \ + conda tos accept --override-channels --channel https://repo.anaconda.com/pkgs/r && \ + conda create -n 
swiftt-env python=3.11 -y +RUN conda run -n swiftt-env python --version +RUN conda run -n swiftt-env pip install flowcept +RUN conda run -n swiftt-env conda install -y -c conda-forge gcc zsh zlib pathos +RUN conda run -n swiftt-env conda install -y -c swift-t swift-t + + +# Add wfcommons user +RUN useradd -ms /bin/bash wfcommons +RUN adduser wfcommons sudo +RUN echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers +ENV PATH="$PATH:/opt/conda/bin/:/home/wfcommons/.local/bin/" + +USER wfcommons +WORKDIR /home/wfcommons + diff --git a/tests/translators/test_translators.py b/tests/translators/test_translators.py index 895d51d1..ee4ef649 100644 --- a/tests/translators/test_translators.py +++ b/tests/translators/test_translators.py @@ -28,70 +28,10 @@ from wfcommons.wfbench import TaskVineTranslator from wfcommons.wfbench import CWLTranslator from wfcommons.wfbench import PegasusTranslator +from wfcommons.wfbench import SwiftTTranslator from wfcommons.wfinstances import PegasusLogsParser -# def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=None): -# if command is None: -# command = ["sleep", "infinity"] -# # Pulling the Docker image -# client = docker.from_env() -# image_name = f"wfcommons/wfcommons-testing-{backend}" -# -# try: -# image = client.images.get(image_name) -# sys.stderr.write(f"Image '{image_name}' is available locally\n") -# except ImageNotFound: -# sys.stderr.write(f"Pulling image '{image_name}'...\n") -# client.images.pull(image_name) -# -# # Launch the docker container to actually run the translated workflow -# sys.stderr.write("Starting Docker container...\n") -# container = client.containers.run( -# image=image_name, -# command=command, -# volumes={mounted_dir: {'bind': mounted_dir, 'mode': 'rw'}}, -# working_dir=working_dir, -# tty=True, -# detach=True -# ) -# -# # Installing WfCommons on container -# _install_WfCommons_on_container(container) -# -# # Copy over the wfbench and cpu-benchmark executables to where they should 
go on the container -# if bin_dir: -# exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which wfbench` " + bin_dir], -# stdout=True, stderr=True) -# exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which cpu-benchmark` " + bin_dir], -# stdout=True, stderr=True) -# -# return container - -# def _make_tarfile_of_wfcommons(): -# source_dir = os.getcwd() # This assumes the testing is run from the root -# tar_stream = io.BytesIO() -# with tarfile.open(fileobj=tar_stream, mode='w') as tar: -# tar.add(source_dir, arcname=os.path.basename(source_dir)) -# tar_stream.seek(0) -# return tar_stream -# -# def _install_WfCommons_on_container(container): -# # sys.stderr.write("Installing WfCommons on the container...\n") -# # Copy the WfCommons code to it (removing stuff that should be removed) -# target_path = '/tmp/' # inside container -# tar_data = _make_tarfile_of_wfcommons() -# container.put_archive(target_path, tar_data) -# exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/build/", stdout=True, stderr=True) -# exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/*.egg-info/", stdout=True, stderr=True) -# exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark.o", stdout=True, -# stderr=True) -# exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark", stdout=True, -# stderr=True) -# -# # Install WfCommons on the container (to install wfbench and cpu-benchmark really) -# exit_code, output = container.exec_run("sudo python3 -m pip install . 
--break-system-packages", -# workdir="/tmp/WfCommons", stdout=True, stderr=True) def _create_workflow_benchmark(): # Create a workflow benchmark object to generate specifications based on a recipe (in /tmp/, whatever) @@ -142,6 +82,7 @@ def _additional_setup_pegasus(container): "taskvine": _additional_setup_taskvine, "cwl": noop, "pegasus": _additional_setup_pegasus, + "swiftt": noop, } ############################################################################# @@ -229,6 +170,16 @@ def run_workflow_pegasus(container, num_tasks, str_dirpath): assert(exit_code == 0) assert("success" in output.decode()) +def run_workflow_swiftt(container, num_tasks, str_dirpath): + # Run the workflow! + # exit_code, output = container.exec_run(cmd="bash /home/wfcommons/run_workflow.sh", stdout=True, stderr=True) + # # Kill the container + # container.remove(force=True) + # # Check sanity + # assert(exit_code == 0) + # assert("success" in output.decode()) + pass + run_workflow_methods = { "dask": run_workflow_dask, "parsl": run_workflow_parsl, @@ -238,6 +189,7 @@ def run_workflow_pegasus(container, num_tasks, str_dirpath): "taskvine": run_workflow_taskvine, "cwl": run_workflow_cwl, "pegasus": run_workflow_pegasus, + "swiftt": run_workflow_swiftt, } translator_classes = { @@ -249,6 +201,7 @@ def run_workflow_pegasus(container, num_tasks, str_dirpath): "taskvine": TaskVineTranslator, "cwl": CWLTranslator, "pegasus": PegasusTranslator, + "swiftt": SwiftTTranslator, } logs_parser_classes = { @@ -261,14 +214,15 @@ class TestTranslators: @pytest.mark.parametrize( "backend", [ - "dask", - "parsl", - "nextflow", - "airflow", - "bash", - "taskvine", - "cwl", - "pegasus", + # "dask", + # "parsl", + # "nextflow", + # "airflow", + # "bash", + # "taskvine", + # "cwl", + # "pegasus", + "swiftt", ]) @pytest.mark.unit # @pytest.mark.skip(reason="tmp") From eea6909b53c180e580e0a80f15626d795c012212 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 20 Aug 2025 16:44:58 -1000 Subject: [PATCH 
05/10] Added totally non-working PyCompss dockerfile --- tests/translators/Dockerfile.pycompss | 54 +++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 tests/translators/Dockerfile.pycompss diff --git a/tests/translators/Dockerfile.pycompss b/tests/translators/Dockerfile.pycompss new file mode 100644 index 00000000..c6b480a7 --- /dev/null +++ b/tests/translators/Dockerfile.pycompss @@ -0,0 +1,54 @@ +# docker build --platform amd64 -t wfcommons-dev-dask -f Dockerfile.dask . +# docker run -it --rm -v `pwd`:/home/wfcommons wfcommons-dev-dask /bin/bash + +FROM compss/pycompss + +LABEL org.containers.image.authors="henric@hawaii.edu" + +# update repositories +RUN apt-get update + +# set timezone +RUN echo "America/Los_Angeles" > /etc/timezone && export DEBIAN_FRONTEND=noninteractive && apt-get install -y tzdata + +# install useful stuff +RUN apt-get -y install pkg-config +RUN apt-get -y install git +RUN apt-get -y install wget +RUN apt-get -y install make +RUN apt-get -y install cmake +RUN apt-get -y install cmake-data +RUN apt-get -y install sudo +RUN apt-get -y install vim --fix-missing +RUN apt-get -y install gcc +RUN apt-get -y install gcc-multilib + +# Python stuff +RUN apt-get -y install python3 python3-pip +RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 +RUN python3 -m pip install pathos pandas filelock +RUN python3 -m pip install networkx scipy matplotlib +RUN python3 -m pip install pyyaml jsonschema requests +#RUN python3 -m pip install --upgrade setuptools +#RUN python3 -m pip install --upgrade wheels + +# Stress-ng +RUN apt-get -y install stress-ng + +# Install PyCompss and dependencies +RUN apt-get install -y libtool autotools-dev automake autoconf +RUN apt-get install -y gfortran +RUN apt-get install -y libboost-dev +RUN apt-get install -y libxml2-dev +RUN python3 -m pip install pycompss +RUN python3 -m pip install pycompss-cli + +# Add wfcommons user +RUN useradd -ms /bin/bash wfcommons +RUN adduser 
wfcommons sudo +RUN echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers +ENV PATH="$PATH:/home/wfcommons/.local/bin/" + +USER wfcommons +WORKDIR /home/wfcommons + From 385620c9370e68934b4d89a03c8d750e4faa2f9e Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 20 Aug 2025 17:14:41 -1000 Subject: [PATCH 06/10] Added working file for swift/t translator Disabling swift/t testing as workflow execution fails --- tests/translators/Dockerfile.swiftt | 19 +++++++++++++------ tests/translators/test_translators.py | 18 +++++++++--------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/tests/translators/Dockerfile.swiftt b/tests/translators/Dockerfile.swiftt index d840f726..90488f7c 100644 --- a/tests/translators/Dockerfile.swiftt +++ b/tests/translators/Dockerfile.swiftt @@ -34,6 +34,16 @@ RUN python3 -m pip install --break-system-packages --upgrade setuptools # Stress-ng RUN apt-get -y install stress-ng +# REDIS +RUN apt-get -y install redis + + +# Add wfcommons user +RUN useradd -ms /bin/bash wfcommons +RUN adduser wfcommons sudo +RUN echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers +ENV PATH="$PATH:/opt/conda/bin/:/home/wfcommons/.local/bin/" + # Install Swift-t RUN wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh RUN sh ./Miniconda3-latest-Linux-x86_64.sh -b -p /opt/conda @@ -47,14 +57,11 @@ RUN conda run -n swiftt-env python --version RUN conda run -n swiftt-env pip install flowcept RUN conda run -n swiftt-env conda install -y -c conda-forge gcc zsh zlib pathos RUN conda run -n swiftt-env conda install -y -c swift-t swift-t +ENV CONDA_PREFIX=/opt/conda/envs/swiftt-env +ENV CONDA_EXE=/opt/conda/bin/conda +ENV PATH="/opt/conda/envs/swiftt-env/bin:$PATH" -# Add wfcommons user -RUN useradd -ms /bin/bash wfcommons -RUN adduser wfcommons sudo -RUN echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers -ENV PATH="$PATH:/opt/conda/bin/:/home/wfcommons/.local/bin/" - USER wfcommons WORKDIR /home/wfcommons diff --git 
a/tests/translators/test_translators.py b/tests/translators/test_translators.py index ee4ef649..13442cd9 100644 --- a/tests/translators/test_translators.py +++ b/tests/translators/test_translators.py @@ -214,15 +214,15 @@ class TestTranslators: @pytest.mark.parametrize( "backend", [ - # "dask", - # "parsl", - # "nextflow", - # "airflow", - # "bash", - # "taskvine", - # "cwl", - # "pegasus", - "swiftt", + "dask", + "parsl", + "nextflow", + "airflow", + "bash", + "taskvine", + "cwl", + "pegasus", + # "swiftt", ]) @pytest.mark.unit # @pytest.mark.skip(reason="tmp") From 3f50cb64612bcf5ea31932dcc56205f4a4697b95 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 5 Sep 2025 13:13:33 -1000 Subject: [PATCH 07/10] Fixed Python "bugs" (version-caused, likely) in the Swift-t translator Added testing for Swift-t Some code cleanup --- tests/translators/Dockerfile.swiftt | 1 + .../translators/build_docker_docker_images.sh | 2 +- tests/translators/test_translators.py | 25 +++++++----- .../templates/swift_t/workflow.swift | 38 +++++++++---------- 4 files changed, 36 insertions(+), 30 deletions(-) diff --git a/tests/translators/Dockerfile.swiftt b/tests/translators/Dockerfile.swiftt index 90488f7c..75452c65 100644 --- a/tests/translators/Dockerfile.swiftt +++ b/tests/translators/Dockerfile.swiftt @@ -55,6 +55,7 @@ RUN conda tos accept --override-channels --channel https://repo.anaconda.com/pkg conda create -n swiftt-env python=3.11 -y RUN conda run -n swiftt-env python --version RUN conda run -n swiftt-env pip install flowcept +RUN conda run -n swiftt-env pip install py-cpuinfo psutil redis RUN conda run -n swiftt-env conda install -y -c conda-forge gcc zsh zlib pathos RUN conda run -n swiftt-env conda install -y -c swift-t swift-t ENV CONDA_PREFIX=/opt/conda/envs/swiftt-env diff --git a/tests/translators/build_docker_docker_images.sh b/tests/translators/build_docker_docker_images.sh index 5bf29fe6..07e7fa46 100755 --- a/tests/translators/build_docker_docker_images.sh +++ 
b/tests/translators/build_docker_docker_images.sh @@ -2,7 +2,7 @@ set -e -for backend in "dask" "parsl" "nextflow" "airflow" "bash" "taskvine" "cwl" "pegasus"; do +for backend in "dask" "parsl" "nextflow" "airflow" "bash" "taskvine" "cwl" "pegasus" "swiftt"; do echo "Building $backend Docker image..." docker build --platform linux/amd64 -t wfcommons/wfcommons-testing-$backend -f Dockerfile.$backend . done diff --git a/tests/translators/test_translators.py b/tests/translators/test_translators.py index 13442cd9..3138e1de 100644 --- a/tests/translators/test_translators.py +++ b/tests/translators/test_translators.py @@ -32,14 +32,13 @@ from wfcommons.wfinstances import PegasusLogsParser - def _create_workflow_benchmark(): # Create a workflow benchmark object to generate specifications based on a recipe (in /tmp/, whatever) desired_num_tasks = 45 benchmark_full_path = "/tmp/blast-benchmark-{desired_num_tasks}.json" shutil.rmtree(benchmark_full_path, ignore_errors=True) benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=desired_num_tasks) - benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=1, data=1, percent_cpu=0.6) + benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=10, data=10, percent_cpu=0.6) with open(f"/tmp/blast-benchmark-{desired_num_tasks}.json", "r") as f: generated_json = json.load(f) num_tasks = len(generated_json["workflow"]["specification"]["tasks"]) @@ -72,6 +71,10 @@ def _additional_setup_pegasus(container): "python3 ./pegasus_workflow.py"], stdout=True, stderr=True) +def _additional_setup_swiftt(container): + # Start a redis server in the background + exit_code, output = container.exec_run( + cmd=["bash", "-c", "redis-server"], detach=True, stdout=True, stderr=True) additional_setup_methods = { "dask": noop, @@ -82,7 +85,7 @@ def _additional_setup_pegasus(container): "taskvine": _additional_setup_taskvine, "cwl": noop, "pegasus": _additional_setup_pegasus, - "swiftt": noop, + "swiftt": _additional_setup_swiftt, } 
############################################################################# @@ -126,6 +129,7 @@ def run_workflow_airflow(container, num_tasks, str_dirpath): stderr=True) # Kill the container container.remove(force=True) + # Check sanity assert (exit_code == 0) assert (output.decode().count("completed") == num_tasks * 2) @@ -172,12 +176,13 @@ def run_workflow_pegasus(container, num_tasks, str_dirpath): def run_workflow_swiftt(container, num_tasks, str_dirpath): # Run the workflow! - # exit_code, output = container.exec_run(cmd="bash /home/wfcommons/run_workflow.sh", stdout=True, stderr=True) - # # Kill the container - # container.remove(force=True) - # # Check sanity - # assert(exit_code == 0) - # assert("success" in output.decode()) + exit_code, output = container.exec_run(cmd="swift-t workflow.swift", stdout=True, stderr=True) + # Kill the container + container.remove(force=True) + # sys.stderr.write(output.decode()) + # Check sanity + assert(exit_code == 0) + assert (output.decode().count("completed!") == num_tasks) pass run_workflow_methods = { @@ -222,7 +227,7 @@ class TestTranslators: "taskvine", "cwl", "pegasus", - # "swiftt", + "swiftt", ]) @pytest.mark.unit # @pytest.mark.skip(reason="tmp") diff --git a/wfcommons/wfbench/translator/templates/swift_t/workflow.swift b/wfcommons/wfbench/translator/templates/swift_t/workflow.swift index 01a01dbe..6097cbc3 100644 --- a/wfcommons/wfbench/translator/templates/swift_t/workflow.swift +++ b/wfcommons/wfbench/translator/templates/swift_t/workflow.swift @@ -29,7 +29,7 @@ workflow_id = "%s" workflow_name = "%s" out_files = [%s] -logging.info("Flowcept Starting") +__import__("logging").info("Flowcept Starting") flowcept_agent = Flowcept(workflow_id=workflow_id, workflow_name=workflow_name, bundle_exec_id=workflow_id, start_persistence=False, save_workflow=True) try: @@ -56,7 +56,7 @@ except Exception: import traceback traceback.print_exc() -logging.info("Flowcept Completed") +__import__("logging").info("Flowcept 
Completed") """; string command = @@ -70,7 +70,7 @@ import subprocess import time from pathos.helpers import mp as multiprocessing -logging.basicConfig( +__import__("logging").basicConfig( level=logging.INFO, format="[WfBench][%%(asctime)s][%%(levelname)s] %%(message)s", datefmt="%%H:%%M:%%S", @@ -90,7 +90,7 @@ workflow_id = "%s" task_id = f"{workflow_id}_{task_name}" if 'workflow_id': - logging.info("Running with Flowcept.") + __import__("logging").info("Running with Flowcept.") from flowcept import Flowcept, FlowceptTask fc = Flowcept(workflow_id=workflow_id, bundle_exec_id=workflow_id, @@ -104,13 +104,13 @@ if 'workflow_id': 'gpu-work': gpu_work }) -logging.info(f"Starting {task_name} Benchmark on {socket.gethostname()}") +__import__("logging").info(f"Starting {task_name} Benchmark on {socket.gethostname()}") procs = [] cpu_queue = multiprocessing.Queue() -logging.debug(f"Working directory: {os.getcwd()}") +__import__("logging").debug(f"Working directory: {os.getcwd()}") -logging.debug("Starting IO benchmark...") +__import__("logging").debug("Starting IO benchmark...") io_proc = None termination_event = multiprocessing.Event() @@ -153,14 +153,14 @@ io_proc = multiprocessing.Process( name: max(0, int(size * (cpu_percent / 100) - bytes_written[name])) for name, size in outputs.items() }, - logging.debug("Starting IO Read Benchmark..."), + __import__("logging").debug("Starting IO Read Benchmark..."), in_file := list(bytes_to_read.keys())[0], in_size := list(bytes_to_read.values())[0], open(in_file, "rb").read(int(in_size)), - logging.debug("Completed IO Read Benchmark!"), - out_file := list(output_data.keys())[0], - out_size := list(output_data.values())[0], - logging.debug(f"Writing output file '{out_file}'"), + __import__("logging").debug("Completed IO Read Benchmark!"), + out_file := list(outputs.keys())[0], + out_size := list(outputs.values())[0], + __import__("logging").debug(f"Writing output file '{out_file}'"), open(out_file, 
"ab").write(__import__("os").urandom(int(out_size))), bytes_read.update({ name: bytes_read[name] + bytes_to_read[name] @@ -171,8 +171,8 @@ io_proc = multiprocessing.Process( for name in bytes_to_write }), - logging.debug(f"Bytes Read: {bytes_read}"), - logging.debug(f"Bytes Written: {bytes_written}"), + __import__("logging").debug(f"Bytes Read: {bytes_read}"), + __import__("logging").debug(f"Bytes Written: {bytes_written}"), io_completed := cpu_percent, ) if cpu_percent is not None else time.sleep(0.1), not (should_exit or io_completed >= 100) @@ -180,14 +180,14 @@ for _ in range(1000000) if not (io_completed >= 100 or termination_event.is_set()) ], - logging.info("IO benchmark completed") + __import__("logging").info("IO benchmark completed") ) ) io_proc.start() procs.append(io_proc) if cpu_work > 0: - logging.info(f"Starting CPU and Memory Benchmarks for {task_name}...") + __import__("logging").info(f"Starting CPU and Memory Benchmarks for {task_name}...") mem_threads = 10 - cpu_threads cpu_work_per_thread = int(cpu_work / cpu_threads) @@ -226,12 +226,12 @@ if cpu_work > 0: try: os.kill(mem_proc.pid, signal.SIGKILL) except subprocess.TimeoutExpired: - logging.debug("Memory process did not terminate; force-killing.") + __import__("logging").debug("Memory process did not terminate; force-killing.") subprocess.Popen(["pkill", "-f", "stress-ng"]).wait() - logging.debug("Completed CPU and Memory Benchmarks!") + __import__("logging").info("Completed CPU and Memory Benchmarks!") -logging.info(f"Benchmark {task_name} completed!") +__import__("logging").info(f"Benchmark {task_name} completed!") if 'workflow_id': fc_task.end() From 5a9dd7717a742c779bebbd4d6965568d6ff0e16b Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 5 Sep 2025 14:08:39 -1000 Subject: [PATCH 08/10] Disabling tests temporarily --- tests/translators/test_translators.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git
a/tests/translators/test_translators.py b/tests/translators/test_translators.py index 3138e1de..1f8bd7f5 100644 --- a/tests/translators/test_translators.py +++ b/tests/translators/test_translators.py @@ -219,14 +219,14 @@ class TestTranslators: @pytest.mark.parametrize( "backend", [ - "dask", - "parsl", - "nextflow", - "airflow", - "bash", - "taskvine", - "cwl", - "pegasus", + # "dask", + # "parsl", + # "nextflow", + # "airflow", + # "bash", + # "taskvine", + # "cwl", + # "pegasus", "swiftt", ]) @pytest.mark.unit From 2221737a92e151221a54f64f224835ed86a5b01e Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 5 Sep 2025 14:09:07 -1000 Subject: [PATCH 09/10] Added more debug output --- tests/translators/test_translators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/translators/test_translators.py b/tests/translators/test_translators.py index 1f8bd7f5..60107903 100644 --- a/tests/translators/test_translators.py +++ b/tests/translators/test_translators.py @@ -179,7 +179,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath): exit_code, output = container.exec_run(cmd="swift-t workflow.swift", stdout=True, stderr=True) # Kill the container container.remove(force=True) - # sys.stderr.write(output.decode()) + sys.stderr.write(output.decode()) # Check sanity assert(exit_code == 0) assert (output.decode().count("completed!") == num_tasks) From 381771aee96ed4060ef9341e672bf42195d84c84 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Fri, 5 Sep 2025 14:28:54 -1000 Subject: [PATCH 10/10] Re-established testing --- tests/translators/test_translators.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/translators/test_translators.py b/tests/translators/test_translators.py index 60107903..3138e1de 100644 --- a/tests/translators/test_translators.py +++ b/tests/translators/test_translators.py @@ -179,7 +179,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath): exit_code, output = 
container.exec_run(cmd="swift-t workflow.swift", stdout=True, stderr=True) # Kill the container container.remove(force=True) - sys.stderr.write(output.decode()) + # sys.stderr.write(output.decode()) # Check sanity assert(exit_code == 0) assert (output.decode().count("completed!") == num_tasks) @@ -219,14 +219,14 @@ class TestTranslators: @pytest.mark.parametrize( "backend", [ - # "dask", - # "parsl", - # "nextflow", - # "airflow", - # "bash", - # "taskvine", - # "cwl", - # "pegasus", + "dask", + "parsl", + "nextflow", + "airflow", + "bash", + "taskvine", + "cwl", + "pegasus", "swiftt", ]) @pytest.mark.unit