146 changes: 146 additions & 0 deletions tests/translators/Dockerfile.pegasus
@@ -0,0 +1,146 @@
# docker build --platform amd64 -t wfcommons-dev-pegasus -f Dockerfile.pegasus .
# docker run -it --rm -v `pwd`:/home/wfcommons wfcommons-dev-pegasus /bin/bash

FROM amd64/ubuntu:noble

LABEL org.opencontainers.image.authors="henric@hawaii.edu"

# update repositories
RUN apt-get update

# set timezone
RUN echo "America/Los_Angeles" > /etc/timezone && export DEBIAN_FRONTEND=noninteractive && apt-get install -y tzdata

# Install build tools and common utilities
RUN apt-get -y install pkg-config
RUN apt-get -y install git
RUN apt-get -y install wget
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install cmake-data
RUN apt-get -y install sudo
RUN apt-get -y install vim --fix-missing
RUN apt-get -y install gcc
RUN apt-get -y install gcc-multilib

# Python toolchain and required packages
RUN apt-get -y install python3 python3-pip
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1
RUN python3 -m pip install --break-system-packages pathos pandas filelock
RUN python3 -m pip install --break-system-packages networkx scipy matplotlib
RUN python3 -m pip install --break-system-packages pyyaml jsonschema requests
RUN python3 -m pip install --break-system-packages --upgrade setuptools

# Stress-ng
RUN apt-get -y install stress-ng

# HTCondor
RUN apt-get install -y curl
RUN curl -fsSL https://get.htcondor.org | sudo /bin/bash -s -- --no-dry-run --channel stable
RUN rm /etc/condor/config.d/00-minicondor

# HTCondor: Create master config
RUN echo "use ROLE: CentralManager" >> /etc/condor/config.d/00-minicondor-master && \
echo "use ROLE: Submit" >> /etc/condor/config.d/00-minicondor-master && \
echo "BIND_ALL_INTERFACES = True" >> /etc/condor/config.d/00-minicondor-master && \
echo "CONDOR_HOST = localhost" >> /etc/condor/config.d/00-minicondor-master && \
echo "SEC_DEFAULT_AUTHENTICATION = OPTIONAL" >> /etc/condor/config.d/00-minicondor-master && \
echo "SEC_DEFAULT_AUTHENTICATION_METHODS = CLAIMTOBE" >> /etc/condor/config.d/00-minicondor-master && \
echo "SCHEDD_INTERVAL = 5" >> /etc/condor/config.d/00-minicondor-master && \
echo "NEGOTIATOR_INTERVAL = 2" >> /etc/condor/config.d/00-minicondor-master && \
echo "NEGOTIATOR_CYCLE_DELAY = 5" >> /etc/condor/config.d/00-minicondor-master && \
echo "STARTER_UPDATE_INTERVAL = 5" >> /etc/condor/config.d/00-minicondor-master && \
echo "SHADOW_QUEUE_UPDATE_INTERVAL = 10" >> /etc/condor/config.d/00-minicondor-master && \
echo "UPDATE_INTERVAL = 5" >> /etc/condor/config.d/00-minicondor-master && \
echo "RUNBENCHMARKS = 0" >> /etc/condor/config.d/00-minicondor-master && \
echo "HOSTALLOW_READ = *" >> /etc/condor/config.d/00-minicondor-master && \
echo "ALLOW_WRITE = *" >> /etc/condor/config.d/00-minicondor-master && \
echo "ALLOW_READ = *" >> /etc/condor/config.d/00-minicondor-master && \
echo "ALLOW_NEGOTIATOR = *" >> /etc/condor/config.d/00-minicondor-master && \
echo "ALLOW_DAEMON = *" >> /etc/condor/config.d/00-minicondor-master && \
echo "ALLOW_COLLECTOR = *" >> /etc/condor/config.d/00-minicondor-master && \
echo "DAGMAN_USE_STRICT = 0" >> /etc/condor/config.d/00-minicondor-master && \
echo "MAX_TRANSFER_OUTPUT_MB = 2048" >> /etc/condor/config.d/00-minicondor-master && \
echo "JOB_MAX_FILE_TRANSFER_SIZE = 100" >> /etc/condor/config.d/00-minicondor-master && \
echo "JOB_DEFAULT_REQUESTDISK = 10MB" >> /etc/condor/config.d/00-minicondor-master && \
echo "NUM_CPUS = 1" >> /etc/condor/config.d/00-minicondor-master

# HTCondor: Create worker config
RUN echo "use ROLE: Execute" >> /etc/condor/config.d/00-minicondor-worker && \
echo "BIND_ALL_INTERFACES = True" >> /etc/condor/config.d/00-minicondor-worker && \
echo "CONDOR_HOST = localhost" >> /etc/condor/config.d/00-minicondor-worker && \
echo "COLLECTOR_HOST = localhost" >> /etc/condor/config.d/00-minicondor-worker && \
echo "SEC_DEFAULT_AUTHENTICATION = OPTIONAL" >> /etc/condor/config.d/00-minicondor-worker && \
echo "SEC_DEFAULT_AUTHENTICATION_METHODS = CLAIMTOBE" >> /etc/condor/config.d/00-minicondor-worker && \
echo "SCHEDD_INTERVAL = 5" >> /etc/condor/config.d/00-minicondor-worker && \
echo "NEGOTIATOR_INTERVAL = 2" >> /etc/condor/config.d/00-minicondor-worker && \
echo "NEGOTIATOR_CYCLE_DELAY = 5" >> /etc/condor/config.d/00-minicondor-worker && \
echo "STARTER_UPDATE_INTERVAL = 5" >> /etc/condor/config.d/00-minicondor-worker && \
echo "SHADOW_QUEUE_UPDATE_INTERVAL = 10" >> /etc/condor/config.d/00-minicondor-worker && \
echo "UPDATE_INTERVAL = 5" >> /etc/condor/config.d/00-minicondor-worker && \
echo "RUNBENCHMARKS = 0" >> /etc/condor/config.d/00-minicondor-worker && \
echo "HOSTALLOW_WRITE = *" >> /etc/condor/config.d/00-minicondor-worker && \
echo "HOSTALLOW_READ = *" >> /etc/condor/config.d/00-minicondor-worker && \
echo "ALLOW_WRITE = *" >> /etc/condor/config.d/00-minicondor-worker && \
echo "ALLOW_READ = *" >> /etc/condor/config.d/00-minicondor-worker && \
echo "ALLOW_NEGOTIATOR = *" >> /etc/condor/config.d/00-minicondor-worker && \
echo "ALLOW_DAEMON = *" >> /etc/condor/config.d/00-minicondor-worker && \
echo "ALLOW_COLLECTOR = *" >> /etc/condor/config.d/00-minicondor-worker && \
echo "NUM_CPUS = 1" >> /etc/condor/config.d/00-minicondor-worker

# HTCondor: create runtime directories and set permissions
RUN mkdir -p /var/lib/condor
RUN mkdir -p /var/lib/condor/log
RUN mkdir -p /var/lib/condor/execute
RUN chown condor:condor -R /var/lib/condor
RUN chmod -R 777 /var/lib/condor/execute

# Install Pegasus
RUN wget -O - http://download.pegasus.isi.edu/pegasus/gpg.txt | sudo apt-key add -
RUN echo 'deb [arch=amd64] http://download.pegasus.isi.edu/pegasus/ubuntu bionic main' | sudo tee /etc/apt/sources.list.d/pegasus.list
RUN apt-get update
RUN apt-get install -y pegasus
RUN python3 -m pip install pegasus-wms.api --break-system-packages

# Add wfcommons user
RUN useradd -ms /bin/bash wfcommons
RUN adduser wfcommons sudo
RUN echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
ENV PATH="$PATH:/home/wfcommons/.local/bin/"

USER wfcommons
WORKDIR /home/wfcommons

# Create script to start HTCondor
RUN echo "#!/bin/bash" >> ./start_condor.sh && \
echo "export CONDOR_HOST=localhost" >> ./start_condor.sh && \
echo "export COLLECTOR_HOST=localhost" >> ./start_condor.sh && \
echo "CONDOR_CONFIG=/etc/condor/config.d/00-minicondor-master sudo condor_master" >> ./start_condor.sh && \
echo "sudo condor_collector &" >> ./start_condor.sh && \
echo "CONDOR_CONFIG=/etc/condor/config.d/00-minicondor-worker sudo condor_startd &" >> ./start_condor.sh && \
echo "sleep 3" >> ./start_condor.sh && \
echo "condor_status" >> ./start_condor.sh
RUN chmod +x ./start_condor.sh

# Create script to run the workflow
RUN echo "#!/bin/bash" >> ./run_workflow.sh && \
echo "pegasus-plan --dir work --cleanup none --output-site local --submit \`ls *.yml\`" >> ./run_workflow.sh && \
echo "sleep 10" >> ./run_workflow.sh && \
echo "PID_FILE=\`ls work/wfcommons/pegasus/*/run0001/*.pid\`" >> ./run_workflow.sh && \
echo "echo \"Waiting for workflow execution to complete...\"" >> ./run_workflow.sh && \
echo "while [[ -f \$PID_FILE ]]" >> ./run_workflow.sh && \
echo "do" >> ./run_workflow.sh && \
echo " echo \"sleeping 5...\"" >> ./run_workflow.sh && \
echo " sleep 5" >> ./run_workflow.sh && \
echo "done" >> ./run_workflow.sh && \
echo "# Sleep 10 so that status in database is up to date..." >> ./run_workflow.sh && \
echo "sleep 10" >> ./run_workflow.sh && \
echo "echo \"Workflow execution complete!\"" >> ./run_workflow.sh
RUN chmod +x ./run_workflow.sh

ENV CONDOR_HOST=localhost
ENV COLLECTOR_HOST=localhost




30 changes: 27 additions & 3 deletions tests/translators/test_translators.py
@@ -29,6 +29,7 @@
from wfcommons.wfbench import BashTranslator
from wfcommons.wfbench import TaskVineTranslator
from wfcommons.wfbench import CWLTranslator
from wfcommons.wfbench import PegasusTranslator


def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=["sleep", "infinity"]):
@@ -97,7 +98,7 @@ def _create_workflow_benchmark():
benchmark_full_path = "/tmp/blast-benchmark-{desired_num_tasks}.json"
shutil.rmtree(benchmark_full_path, ignore_errors=True)
benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=desired_num_tasks)
benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=10, data=10, percent_cpu=0.6)
benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=1, data=1, percent_cpu=0.6)
with open(f"/tmp/blast-benchmark-{desired_num_tasks}.json", "r") as f:
generated_json = json.load(f)
num_tasks = len(generated_json["workflow"]["specification"]["tasks"])
@@ -120,6 +121,16 @@ def _additional_setup_taskvine(container):
cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && vine_worker localhost 9123"],
detach=True, stdout=True, stderr=True)

def _additional_setup_pegasus(container):
# Start Condor
exit_code, output = container.exec_run(cmd=["bash", "-c",
"bash /home/wfcommons/start_condor.sh"],
stdout=True, stderr=True)
# Run pegasus script
exit_code, output = container.exec_run(cmd=["bash", "-c",
"python3 ./pegasus_workflow.py"],
stdout=True, stderr=True)


additional_setup_methods = {
"dask": noop,
@@ -129,6 +140,7 @@ def _additional_setup_taskvine(container):
"bash": noop,
"taskvine": _additional_setup_taskvine,
"cwl": noop,
"pegasus": _additional_setup_pegasus,
}

#############################################################################
@@ -196,7 +208,6 @@ def run_workflow_taskvine(container, num_tasks, str_dirpath):
assert (output.decode().count("completed") == num_tasks)

def run_workflow_cwl(container, num_tasks, str_dirpath):

# Run the workflow!
# Note that the input file is hardcoded and Blast-specific
exit_code, output = container.exec_run(cmd="cwltool ./main.cwl --split_fasta_00000001_input ./data/workflow_infile_0001 ", stdout=True, stderr=True)
@@ -208,6 +219,16 @@ def run_workflow_cwl(container, num_tasks, str_dirpath):
# and there is a 2* because there is a message for the job and for the step)
assert (output.decode().count("completed success") == 3 + 2 *num_tasks)

def run_workflow_pegasus(container, num_tasks, str_dirpath):
# Run the workflow!
exit_code, output = container.exec_run(cmd="bash /home/wfcommons/run_workflow.sh", stdout=True, stderr=True)
ignored, status_output = container.exec_run(cmd="pegasus-status -l /tmp/pegasus_translated_workflow/work/wfcommons/pegasus/Blast-Benchmark/run0001", stdout=True, stderr=True)
# Kill the container
container.remove(force=True)
# Check sanity
assert(exit_code == 0)
assert("Workflow execution complete!" in output.decode())
assert(status_output.decode().count("Success") == 2)

run_workflow_methods = {
"dask": run_workflow_dask,
@@ -217,6 +238,7 @@ def run_workflow_cwl(container, num_tasks, str_dirpath):
"bash": run_workflow_bash,
"taskvine": run_workflow_taskvine,
"cwl": run_workflow_cwl,
"pegasus": run_workflow_pegasus,
}

translator_classes = {
@@ -227,6 +249,7 @@ def run_workflow_cwl(container, num_tasks, str_dirpath):
"bash": BashTranslator,
"taskvine": TaskVineTranslator,
"cwl": CWLTranslator,
"pegasus": PegasusTranslator,
}
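
For context, the registered translators are presumably driven along these lines elsewhere in the test harness. This is a minimal sketch only: the benchmark calls mirror _create_workflow_benchmark() above, while the PegasusTranslator constructor argument, the translate() call, and the task count are assumptions based on the usual WfCommons translator interface, not taken from this diff.

# Hypothetical sketch of how the "pegasus" entry above would be exercised.
# The translator calls and num_tasks value are assumptions, not from this diff.
import pathlib

from wfcommons import BlastRecipe
from wfcommons.wfbench import WorkflowBenchmark, PegasusTranslator

benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=45)
benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=1, data=1, percent_cpu=0.6)

translator = PegasusTranslator(benchmark.workflow)  # assumed constructor argument
translator.translate(pathlib.Path("/tmp/pegasus_translated_workflow"))  # assumed signature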


@@ -242,6 +265,7 @@ class TestTranslators:
"bash",
"taskvine",
"cwl",
"pegasus",
])
@pytest.mark.unit
# @pytest.mark.skip(reason="tmp")
@@ -271,4 +295,4 @@ def test_translator(self, backend) -> None:
sys.stderr.write("Running workflow...\n")
start_time = time.time()
run_workflow_methods[backend](container, num_tasks, str_dirpath)
sys.stderr.write("Workflow ran in %.2f seconds\n" % (time.time() - start_time))
sys.stderr.write("Workflow ran in %.2f seconds\n" % (time.time() - start_time))
6 changes: 3 additions & 3 deletions wfcommons/wfbench/translator/pegasus.py
@@ -123,10 +123,10 @@ def _add_task(self, task_name: str, parent_task: Optional[str] = None, tasks_pri
children = self.task_children[task_name]

# Generate input spec
input_spec = "["
input_spec = "\"["
for f in task.input_files:
input_spec += f"\"{f.file_id}\","
input_spec = input_spec[:-1] + "]"
input_spec += f"\\\\\"{f.file_id}\\\\\","
input_spec = input_spec[:-1] + "]\""

# output files
output_spec = "\"{"
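
To make the quoting in the new input_spec lines easier to read, here is a small standalone sketch of the string they produce. Only the string construction is copied from the diff; the file IDs are made up, and the stated purpose of the extra escaping is an assumption.

# Standalone reproduction of the input_spec construction above, with hypothetical file IDs.
input_files = ["blast_00000001_input.txt", "blast_00000002_input.txt"]  # made-up names

input_spec = "\"["
for file_id in input_files:
    input_spec += f"\\\\\"{file_id}\\\\\","
input_spec = input_spec[:-1] + "]\""

print(input_spec)
# Prints: "[\\"blast_00000001_input.txt\\",\\"blast_00000002_input.txt\\"]"
# The list is wrapped in literal double quotes and each inner quote is emitted
# as \\", presumably so the value survives the extra layer of argument quoting
# applied when Pegasus/HTCondor builds the task command line.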