diff --git a/tests/translators/Dockerfile.pegasus b/tests/translators/Dockerfile.pegasus new file mode 100644 index 00000000..22fd7f8c --- /dev/null +++ b/tests/translators/Dockerfile.pegasus @@ -0,0 +1,146 @@ +# docker build --platform amd64 -t wfcommons-dev-pegasus -f Dockerfile.pegasus . +# docker run -it --rm -v `pwd`:/home/wfcommons wfcommons-dev-pegasus /bin/bash + +FROM amd64/ubuntu:noble + +LABEL org.opencontainers.image.authors="henric@hawaii.edu" + +# update repositories +RUN apt-get update + +# set timezone +RUN echo "America/Los_Angeles" > /etc/timezone && export DEBIAN_FRONTEND=noninteractive && apt-get install -y tzdata + +# install useful stuff +RUN apt-get -y install pkg-config +RUN apt-get -y install git +RUN apt-get -y install wget +RUN apt-get -y install make +RUN apt-get -y install cmake +RUN apt-get -y install cmake-data +RUN apt-get -y install sudo +RUN apt-get -y install vim --fix-missing +RUN apt-get -y install gcc +RUN apt-get -y install gcc-multilib + +# Python stuff +RUN apt-get -y install python3 python3-pip +RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 +RUN python3 -m pip install --break-system-packages pathos pandas filelock +RUN python3 -m pip install --break-system-packages networkx scipy matplotlib +RUN python3 -m pip install --break-system-packages pyyaml jsonschema requests +RUN python3 -m pip install --break-system-packages --upgrade setuptools + +# Stress-ng +RUN apt-get -y install stress-ng + +# HTCondor +RUN apt-get install -y curl +RUN curl -fsSL https://get.htcondor.org | sudo /bin/bash -s -- --no-dry-run --channel stable +RUN rm /etc/condor/config.d/00-minicondor + +# HTCondor: Create master config +RUN echo "use ROLE: CentralManager" >> /etc/condor/config.d/00-minicondor-master && \ + echo "use ROLE: Submit" >> /etc/condor/config.d/00-minicondor-master && \ + echo "BIND_ALL_INTERFACES = True" >> /etc/condor/config.d/00-minicondor-master && \ + echo "CONDOR_HOST = localhost" >> 
/etc/condor/config.d/00-minicondor-master && \ + echo "SEC_DEFAULT_AUTHENTICATION = OPTIONAL" >> /etc/condor/config.d/00-minicondor-master && \ + echo "SEC_DEFAULT_AUTHENTICATION_METHODS = CLAIMTOBE" >> /etc/condor/config.d/00-minicondor-master && \ + echo "SCHEDD_INTERVAL = 5" >> /etc/condor/config.d/00-minicondor-master && \ + echo "NEGOTIATOR_INTERVAL = 2" >> /etc/condor/config.d/00-minicondor-master && \ + echo "NEGOTIATOR_CYCLE_DELAY = 5" >> /etc/condor/config.d/00-minicondor-master && \ + echo "STARTER_UPDATE_INTERVAL = 5" >> /etc/condor/config.d/00-minicondor-master && \ + echo "SHADOW_QUEUE_UPDATE_INTERVAL = 10" >> /etc/condor/config.d/00-minicondor-master && \ + echo "UPDATE_INTERVAL = 5" >> /etc/condor/config.d/00-minicondor-master && \ + echo "RUNBENCHMARKS = 0" >> /etc/condor/config.d/00-minicondor-master && \ + echo "HOSTALLOW_READ = *" >> /etc/condor/config.d/00-minicondor-master && \ + echo "ALLOW_WRITE = *" >> /etc/condor/config.d/00-minicondor-master && \ + echo "ALLOW_READ = *" >> /etc/condor/config.d/00-minicondor-master && \ + echo "ALLOW_NEGOTIATOR = *" >> /etc/condor/config.d/00-minicondor-master && \ + echo "ALLOW_DAEMON = *" >> /etc/condor/config.d/00-minicondor-master && \ + echo "ALLOW_COLLECTOR = *" >> /etc/condor/config.d/00-minicondor-master && \ + echo "DAGMAN_USE_STRICT = 0" >> /etc/condor/config.d/00-minicondor-master && \ + echo "MAX_TRANSFER_OUTPUT_MB = 2048" >> /etc/condor/config.d/00-minicondor-master && \ + echo "JOB_MAX_FILE_TRANSFER_SIZE = 100" >> /etc/condor/config.d/00-minicondor-master && \ + echo "JOB_DEFAULT_REQUESTDISK = 10MB" >> /etc/condor/config.d/00-minicondor-master && \ + echo "NUM_CPUS = 1" >> /etc/condor/config.d/00-minicondor-master + +# HTCondor: Create worker config +RUN echo "use ROLE: Execute" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "BIND_ALL_INTERFACES = True" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "CONDOR_HOST = localhost" >> /etc/condor/config.d/00-minicondor-worker && \ 
+ echo "COLLECTOR_HOST = localhost" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "SEC_DEFAULT_AUTHENTICATION = OPTIONAL" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "SEC_DEFAULT_AUTHENTICATION_METHODS = CLAIMTOBE" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "SCHEDD_INTERVAL = 5" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "NEGOTIATOR_INTERVAL = 2" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "NEGOTIATOR_CYCLE_DELAY = 5" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "STARTER_UPDATE_INTERVAL = 5" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "SHADOW_QUEUE_UPDATE_INTERVAL = 10" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "UPDATE_INTERVAL = 5" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "RUNBENCHMARKS = 0" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "HOSTALLOW_WRITE = *" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "HOSTALLOW_READ = *" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "ALLOW_WRITE = *" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "ALLOW_READ = *" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "ALLOW_NEGOTIATOR = *" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "ALLOW_DAEMON = *" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "ALLOW_COLLECTOR = *" >> /etc/condor/config.d/00-minicondor-worker && \ + echo "NUM_CPUS = 1" >> /etc/condor/config.d/00-minicondor-worker + +# HTCondor: Some setup +RUN mkdir -p /var/lib/condor +RUN mkdir -p /var/lib/condor/log +RUN mkdir -p /var/lib/condor/execute +RUN chown condor:condor -R /var/lib/condor +RUN chmod -R 777 /var/lib/condor/execute + +# Install Pegasus +RUN wget -O - http://download.pegasus.isi.edu/pegasus/gpg.txt | sudo apt-key add - +RUN echo 'deb [arch=amd64] http://download.pegasus.isi.edu/pegasus/ubuntu bionic main' | sudo tee /etc/apt/sources.list.d/pegasus.list +RUN apt-get update +RUN apt-get install -y pegasus +RUN 
python3 -m pip install pegasus-wms.api --break-system-packages + +# Add wfcommons user +RUN useradd -ms /bin/bash wfcommons +RUN adduser wfcommons sudo +RUN echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers +ENV PATH="$PATH:/home/wfcommons/.local/bin/" + +USER wfcommons +WORKDIR /home/wfcommons + +# Create script to start HTCondor +RUN echo "#!/bin/bash" >> ./start_condor.sh && \ + echo "export CONDOR_HOST=localhost" >> ./start_condor.sh && \ + echo "export COLLECTOR_HOST=localhost" >> ./start_condor.sh && \ + echo "CONDOR_CONFIG=/etc/condor/config.d/00-minicondor-master sudo condor_master" >> ./start_condor.sh && \ + echo "sudo condor_collector &" >> ./start_condor.sh && \ + echo "CONDOR_CONFIG=/etc/condor/config.d/00-minicondor-worker sudo condor_startd &" >> ./start_condor.sh && \ + echo "sleep 3" >> ./start_condor.sh && \ + echo "condor_status" >> ./start_condor.sh +RUN chmod +x ./start_condor.sh + +# Create script to run the Workflow +RUN echo "#!/bin/bash" >> ./run_workflow.sh && \ + echo "pegasus-plan --dir work --cleanup none --output-site local --submit \`ls *.yml\`" >> ./run_workflow.sh && \ + echo "sleep 10" >> ./run_workflow.sh && \ + echo "PID_FILE=\`ls work/wfcommons/pegasus/*/run0001/*.pid\`" >> ./run_workflow.sh && \ + echo "echo \"Waiting for workflow execution to complete...\"" >> ./run_workflow.sh && \ + echo "while [[ -f \$PID_FILE ]]" >> ./run_workflow.sh && \ + echo "do" >> ./run_workflow.sh && \ + echo " echo \"sleeping 5...\"" >> ./run_workflow.sh && \ + echo " sleep 5" >> ./run_workflow.sh && \ + echo "done" >> ./run_workflow.sh && \ + echo "# Sleep 10 so that status in database is up to date..." 
>> ./run_workflow.sh && \ + echo "sleep 10" >> ./run_workflow.sh && \ + echo "echo \"Workflow execution complete!\"" >> ./run_workflow.sh +RUN chmod +x ./run_workflow.sh + +ENV CONDOR_HOST localhost +ENV COLLECTOR_HOST localhost + + + + diff --git a/tests/translators/test_translators.py b/tests/translators/test_translators.py index 3092fa47..66948519 100644 --- a/tests/translators/test_translators.py +++ b/tests/translators/test_translators.py @@ -29,6 +29,7 @@ from wfcommons.wfbench import BashTranslator from wfcommons.wfbench import TaskVineTranslator from wfcommons.wfbench import CWLTranslator +from wfcommons.wfbench import PegasusTranslator def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=["sleep", "infinity"]): @@ -97,7 +98,7 @@ def _create_workflow_benchmark(): benchmark_full_path = "/tmp/blast-benchmark-{desired_num_tasks}.json" shutil.rmtree(benchmark_full_path, ignore_errors=True) benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=desired_num_tasks) - benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=10, data=10, percent_cpu=0.6) + benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=1, data=1, percent_cpu=0.6) with open(f"/tmp/blast-benchmark-{desired_num_tasks}.json", "r") as f: generated_json = json.load(f) num_tasks = len(generated_json["workflow"]["specification"]["tasks"]) @@ -120,6 +121,16 @@ def _additional_setup_taskvine(container): cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && vine_worker localhost 9123"], detach=True, stdout=True, stderr=True) +def _additional_setup_pegasus(container): + # Start Condor + exit_code, output = container.exec_run(cmd=["bash", "-c", + "bash /home/wfcommons/start_condor.sh"], + stdout=True, stderr=True) + # Run pegasus script + exit_code, output = container.exec_run(cmd=["bash", "-c", + "python3 ./pegasus_workflow.py"], + stdout=True, stderr=True) + additional_setup_methods = { "dask": noop, @@ -129,6 +140,7 @@ def 
_additional_setup_taskvine(container): "bash": noop, "taskvine": _additional_setup_taskvine, "cwl": noop, + "pegasus": _additional_setup_pegasus, } ############################################################################# @@ -196,7 +208,6 @@ def run_workflow_taskvine(container, num_tasks, str_dirpath): assert (output.decode().count("completed") == num_tasks) def run_workflow_cwl(container, num_tasks, str_dirpath): - # Run the workflow! # Note that the input file is hardcoded and Blast-specific exit_code, output = container.exec_run(cmd="cwltool ./main.cwl --split_fasta_00000001_input ./data/workflow_infile_0001 ", stdout=True, stderr=True) @@ -208,6 +219,16 @@ def run_workflow_cwl(container, num_tasks, str_dirpath): # and there is a 2* because there is a message for the job and for the step) assert (output.decode().count("completed success") == 3 + 2 *num_tasks) +def run_workflow_pegasus(container, num_tasks, str_dirpath): + # Run the workflow! + exit_code, output = container.exec_run(cmd="bash /home/wfcommons/run_workflow.sh", stdout=True, stderr=True) + ignored, status_output = container.exec_run(cmd="pegasus-status -l /tmp/pegasus_translated_workflow/work/wfcommons/pegasus/Blast-Benchmark/run0001", stdout=True, stderr=True) + # Kill the container + container.remove(force=True) + # Check sanity + assert(exit_code == 0) + assert("Workflow execution complete!" 
in output.decode()) + assert(status_output.decode().count("Success") == 2) run_workflow_methods = { "dask": run_workflow_dask, @@ -217,6 +238,7 @@ def run_workflow_cwl(container, num_tasks, str_dirpath): "bash": run_workflow_bash, "taskvine": run_workflow_taskvine, "cwl": run_workflow_cwl, + "pegasus": run_workflow_pegasus, } translator_classes = { @@ -227,6 +249,7 @@ def run_workflow_cwl(container, num_tasks, str_dirpath): "bash": BashTranslator, "taskvine": TaskVineTranslator, "cwl": CWLTranslator, + "pegasus": PegasusTranslator, } @@ -242,6 +265,7 @@ class TestTranslators: "bash", "taskvine", "cwl", + "pegasus", ]) @pytest.mark.unit # @pytest.mark.skip(reason="tmp") @@ -271,4 +295,4 @@ def test_translator(self, backend) -> None: sys.stderr.write("Running workflow...\n") start_time = time.time() run_workflow_methods[backend](container, num_tasks, str_dirpath) - sys.stderr.write("Workflow ran in %.2f seconds\n" % (time.time() - start_time)) \ No newline at end of file + sys.stderr.write("Workflow ran in %.2f seconds\n" % (time.time() - start_time)) diff --git a/wfcommons/wfbench/translator/pegasus.py b/wfcommons/wfbench/translator/pegasus.py index 2fb2aaac..ed9148d9 100644 --- a/wfcommons/wfbench/translator/pegasus.py +++ b/wfcommons/wfbench/translator/pegasus.py @@ -123,10 +123,10 @@ def _add_task(self, task_name: str, parent_task: Optional[str] = None, tasks_pri children = self.task_children[task_name] # Generate input spec - input_spec = "[" + input_spec = "\"[" for f in task.input_files: - input_spec += f"\"{f.file_id}\"," - input_spec = input_spec[:-1] + "]" + input_spec += f"\\\\\"{f.file_id}\\\\\"," + input_spec = input_spec[:-1] + "]\"" # output files output_spec = "\"{"