diff --git a/tests/test_helpers.py b/tests/test_helpers.py
index 18d0983f..64816bde 100644
--- a/tests/test_helpers.py
+++ b/tests/test_helpers.py
@@ -28,12 +28,14 @@ def _make_tarfile_of_wfcommons():
     tar_stream.seek(0)
     return tar_stream
 
+
 def _install_WfCommons_on_container(container):
     # sys.stderr.write("Installing WfCommons on the container...\n")
     # Copy the WfCommons code to it (removing stuff that should be removed)
     target_path = '/tmp/'  # inside container
     tar_data = _make_tarfile_of_wfcommons()
     container.put_archive(target_path, tar_data)
+    # Cleanup files from the host
     exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/build/", stdout=True, stderr=True)
     exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/*.egg-info/", stdout=True, stderr=True)
     exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark.o", stdout=True,
@@ -44,6 +46,8 @@ def _install_WfCommons_on_container(container):
     # Install WfCommons on the container (to install wfbench and cpu-benchmark really)
     exit_code, output = container.exec_run("sudo python3 -m pip install . --break-system-packages",
                                            workdir="/tmp/WfCommons", stdout=True, stderr=True)
+    if exit_code != 0:
+        raise RuntimeError("Failed to install WfCommons on the container")
 
 def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=None):
     if command is None:
@@ -75,10 +79,17 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=
 
     # Copy over the wfbench and cpu-benchmark executables to where they should go on the container
     if bin_dir:
+        sys.stderr.write("Copying wfbench and cpu-benchmark...\n")
         exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which wfbench` " + bin_dir],
                                                stdout=True, stderr=True)
+        if exit_code != 0:
+            raise RuntimeError("Failed to copy wfbench script to the bin directory")
         exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which cpu-benchmark` " + bin_dir],
                                                stdout=True, stderr=True)
+        if exit_code != 0:
+            raise RuntimeError("Failed to copy cpu-benchmark executable to the bin directory")
+    else:
+        sys.stderr.write("Not copying wfbench and cpu-benchmark...\n")
 
     return container
 
diff --git a/tests/translators/Dockerfile.airflow b/tests/translators_loggers/Dockerfile.airflow
similarity index 100%
rename from tests/translators/Dockerfile.airflow
rename to tests/translators_loggers/Dockerfile.airflow
diff --git a/tests/translators/Dockerfile.bash b/tests/translators_loggers/Dockerfile.bash
similarity index 100%
rename from tests/translators/Dockerfile.bash
rename to tests/translators_loggers/Dockerfile.bash
diff --git a/tests/translators/Dockerfile.cwl b/tests/translators_loggers/Dockerfile.cwl
similarity index 100%
rename from tests/translators/Dockerfile.cwl
rename to tests/translators_loggers/Dockerfile.cwl
diff --git a/tests/translators/Dockerfile.dask b/tests/translators_loggers/Dockerfile.dask
similarity index 100%
rename from tests/translators/Dockerfile.dask
rename to tests/translators_loggers/Dockerfile.dask
diff --git a/tests/translators/Dockerfile.nextflow b/tests/translators_loggers/Dockerfile.nextflow
similarity index 100%
rename from tests/translators/Dockerfile.nextflow
rename to tests/translators_loggers/Dockerfile.nextflow
diff --git a/tests/translators/Dockerfile.parsl b/tests/translators_loggers/Dockerfile.parsl
similarity index 100%
rename from tests/translators/Dockerfile.parsl
rename to tests/translators_loggers/Dockerfile.parsl
diff --git a/tests/translators/Dockerfile.pegasus b/tests/translators_loggers/Dockerfile.pegasus
similarity index 100%
rename from tests/translators/Dockerfile.pegasus
rename to tests/translators_loggers/Dockerfile.pegasus
diff --git a/tests/translators/Dockerfile.pycompss b/tests/translators_loggers/Dockerfile.pycompss
similarity index 100%
rename from tests/translators/Dockerfile.pycompss
rename to tests/translators_loggers/Dockerfile.pycompss
diff --git a/tests/translators/Dockerfile.swiftt b/tests/translators_loggers/Dockerfile.swiftt
similarity index 100%
rename from tests/translators/Dockerfile.swiftt
rename to tests/translators_loggers/Dockerfile.swiftt
diff --git a/tests/translators/Dockerfile.taskvine b/tests/translators_loggers/Dockerfile.taskvine
similarity index 100%
rename from tests/translators/Dockerfile.taskvine
rename to tests/translators_loggers/Dockerfile.taskvine
diff --git a/tests/translators/README.airflow b/tests/translators_loggers/README.airflow
similarity index 100%
rename from tests/translators/README.airflow
rename to tests/translators_loggers/README.airflow
diff --git a/tests/translators/build_docker_docker_images.sh b/tests/translators_loggers/build_docker_docker_images.sh
similarity index 100%
rename from tests/translators/build_docker_docker_images.sh
rename to tests/translators_loggers/build_docker_docker_images.sh
diff --git a/tests/translators/test_translators.py b/tests/translators_loggers/test_translators_loggers.py
similarity index 88%
rename from tests/translators/test_translators.py
rename to tests/translators_loggers/test_translators_loggers.py
index 3138e1de..ed8b81a3 100644
--- a/tests/translators/test_translators.py
+++ b/tests/translators_loggers/test_translators_loggers.py
@@ -31,6 +31,8 @@
 from wfcommons.wfbench import SwiftTTranslator
 from wfcommons.wfinstances import PegasusLogsParser
+from wfcommons.wfinstances.logs import TaskVineLogsParser
+
 
 
 def _create_workflow_benchmark():
     # Create a workflow benchmark object to generate specifications based on a recipe (in /tmp/, whatever)
@@ -56,25 +58,35 @@ def _additional_setup_taskvine(container):
     exit_code, output = container.exec_run(cmd=["bash", "-c",
                                                 "source ~/conda/etc/profile.d/conda.sh && conda activate && poncho_package_create taskvine_poncho.json taskvine_poncho.tar.gz"],
                                            stdout=True, stderr=True)
+    if exit_code != 0:
+        raise Exception("Failed to setup TaskVine: cannot create poncho package")
 
     # Start a vine worker in the background
     exit_code, output = container.exec_run(
         cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && vine_worker localhost 9123"],
         detach=True, stdout=True, stderr=True)
+    # Note that exit_code will always be None because of detach=True, so errors cannot be detected here.
+    # TODO?: check that the vine_worker is running (so as to abort early)
 
 def _additional_setup_pegasus(container):
     # Start Condor
     exit_code, output = container.exec_run(cmd=["bash", "-c", "bash /home/wfcommons/start_condor.sh"], stdout=True, stderr=True)
+    if exit_code != 0:
+        raise Exception("Failed to setup Pegasus: cannot start HTCondor")
 
     # Run pegasus script
     exit_code, output = container.exec_run(cmd=["bash", "-c", "python3 ./pegasus_workflow.py"],
                                            stdout=True, stderr=True)
+    if exit_code != 0:
+        raise Exception("Failed to setup Pegasus: error while running the pegasus_workflow.py script")
 
 def _additional_setup_swiftt(container):
     # Start a redis server in the background
     exit_code, output = container.exec_run(
         cmd=["bash", "-c", "redis-server"],
         detach=True, stdout=True, stderr=True)
+    # Note that exit_code will always be None because of detach=True, so errors cannot be detected here.
+    # TODO?: check that the redis server is running (so as to abort early)
 
 additional_setup_methods = {
     "dask": noop,
@@ -209,25 +221,22 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath):
     "swiftt": SwiftTTranslator,
 }
 
-logs_parser_classes = {
-    "pegasus": PegasusLogsParser,
-}
-
 
 class TestTranslators:
 
     @pytest.mark.parametrize(
         "backend", [
-            "dask",
-            "parsl",
-            "nextflow",
-            "airflow",
+            "dask",
+            "parsl",
+            "nextflow",
+            "airflow",
+            "bash",
             "bash",
-            "taskvine",
-            "cwl",
-            "pegasus",
-            "swiftt",
+            "taskvine",
+            "cwl",
+            "pegasus",
+            "swiftt",
         ])
     @pytest.mark.unit
     # @pytest.mark.skip(reason="tmp")
@@ -260,10 +269,16 @@ def test_translator(self, backend) -> None:
         sys.stderr.write("Workflow ran in %.2f seconds\n" % (time.time() - start_time))
 
         # Run the log parser if any
-        if backend in logs_parser_classes:
+        if backend == "pegasus":
+            parser = PegasusLogsParser(dirpath / "work/wfcommons/pegasus/Blast-Benchmark/run0001/")
+        # elif backend == "taskvine":
+        #     parser = TaskVineLogsParser(dirpath / "vine-run-info/", filenames_to_ignore=["cpu-benchmark", "stress-ng"])
+        else:
+            parser = None
+
+        if parser:
             sys.stderr.write("\nParsing the logs...\n")
-            parser = logs_parser_classes[backend](submit_dir=dirpath / "work/wfcommons/pegasus/Blast-Benchmark/run0001")
             workflow = parser.build_workflow("reconstructed_workflow")
             # TODO: test more stuff
+            workflow.write_json(pathlib.Path("/tmp/reconstructed_workflow.json"))
             assert(num_tasks == len(workflow.tasks))
-            pass
diff --git a/wfcommons/wfinstances/logs/__init__.py b/wfcommons/wfinstances/logs/__init__.py
index c46b9680..5966892a 100644
--- a/wfcommons/wfinstances/logs/__init__.py
+++ b/wfcommons/wfinstances/logs/__init__.py
@@ -9,6 +9,7 @@
 # (at your option) any later version.
 
 from .makeflow import MakeflowLogsParser
+from .taskvine import TaskVineLogsParser
 from .nextflow import NextflowLogsParser
 from .pegasus import PegasusLogsParser
 from .pegasusrec import HierarchicalPegasusLogsParser
diff --git a/wfcommons/wfinstances/logs/taskvine.py b/wfcommons/wfinstances/logs/taskvine.py
new file mode 100644
index 00000000..b729ed83
--- /dev/null
+++ b/wfcommons/wfinstances/logs/taskvine.py
@@ -0,0 +1,299 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2021 The WfCommons Team.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+import json
+import itertools
+import math
+import pathlib
+import sys
+
+from datetime import datetime, timezone
+from logging import Logger
+from typing import List, Optional
+
+from .abstract_logs_parser import LogsParser
+from ...common.file import File
+from ...common.machine import Machine
+from ...common.task import Task, TaskType
+from ...common.workflow import Workflow
+
+
+class TaskVineLogsParser(LogsParser):
+    """
+    Parse TaskVine logs to generate a workflow instance. This parser has some limitations in that
+    it may miss task input/output items because they are not files or URLs. More importantly,
+    because in TaskVine different tasks can use the same file names as input/output while those file names
+    may actually correspond to different data sources, the parser will assume that such tasks have
+    the exact same input/output files. There is likely a way to address this, but it hasn't been done yet.
+    For instance, the Gutenberg TaskVine example isn't parsed correctly by this parser due to the above limitation.
+
+    :param vine_run_info_dir: TaskVine's vine-run-info directory.
+    :type vine_run_info_dir: pathlib.Path
+    :param filenames_to_ignore: TaskVine considers that executables and package files (e.g., poncho package.tgz)
+                                are inputs to tasks. This argument is the list of names of files that should be
+                                ignored in the reconstructed instance, which typically should not include such
+                                files as task inputs.
+    :type filenames_to_ignore: Optional[List[str]]
+    :param description: Workflow instance description.
+    :type description: Optional[str]
+    :param logger: The logger where to log information/warning or errors (optional).
+    :type logger: Optional[Logger]
+    """
+    def __init__(self,
+                 vine_run_info_dir: pathlib.Path,
+                 filenames_to_ignore: Optional[List[str]] = None,
+                 description: Optional[str] = None,
+                 logger: Optional[Logger] = None) -> None:
+        """Create an object of the TaskVine log parser."""
+        super().__init__('TaskVine', 'https://ccl.cse.nd.edu/software/taskvine/', description, logger)
+
+        # Sanity check
+        if not vine_run_info_dir.is_dir():
+            raise OSError(f'The provided path does not exist or is not a folder: {vine_run_info_dir}')
+
+        debug_file: pathlib.Path = vine_run_info_dir / "most-recent/vine-logs/debug"
+        if not debug_file.is_file():
+            raise OSError(f'Cannot find file: {debug_file}')
+        taskgraph_file: pathlib.Path = vine_run_info_dir / "most-recent/vine-logs/taskgraph"
+        if not taskgraph_file.is_file():
+            raise OSError(f'Cannot find file: {taskgraph_file}')
+        transactions_file: pathlib.Path = vine_run_info_dir / "most-recent/vine-logs/transactions"
+        if not transactions_file.is_file():
+            raise OSError(f'Cannot find file: {transactions_file}')
+
+        self.debug_file: pathlib.Path = debug_file
+        self.taskgraph_file: pathlib.Path = taskgraph_file
+        self.transactions_file: pathlib.Path = transactions_file
+
+        self.filenames_to_ignore: set[str] = set(filenames_to_ignore) if filenames_to_ignore else set()
+
+        self.files_map = {}
+        self.task_command_lines = {}
+        self.task_runtimes = {}
+        self.task_input_files = {}
+        self.task_output_files = {}
+
+    def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
+        """
+        Create a workflow instance based on the workflow execution logs.
+
+        :param workflow_name: The workflow name.
+        :type workflow_name: Optional[str]
+
+        :return: A workflow instance object.
+        :rtype: Workflow
+        """
+        self.workflow_name = workflow_name
+
+        # create base workflow instance object
+        self.workflow = Workflow(name=self.workflow_name,
+                                 description=self.description,
+                                 runtime_system_name=self.wms_name,
+                                 runtime_system_url=self.wms_url)
+
+        # Construct the task command-line array
+        self._construct_task_command_lines()
+        # sys.stderr.write(str(self.task_command_lines) + "\n")
+
+        # Construct file map
+        self._construct_file_map()
+        # sys.stderr.write("FILEMAP: " + str(self.files_map) + "\n")
+
+        # Construct the task runtimes
+        self._construct_task_runtimes()
+        # sys.stderr.write("TASK RUN TIMES: " + str(self.task_runtimes) + "\n")
+
+        # Construct the input and output files for each task
+        self._construct_task_input_output_files()
+        # print("TASK INPUT FILES: " + str(self.task_input_files))
+        # print("TASK OUTPUT FILES: " + str(self.task_output_files))
+
+        # Construct the workflow
+        self._construct_workflow()
+
+        return self.workflow
+
+
+    def _construct_task_command_lines(self) -> None:
+        with open(self.debug_file) as f:
+            for line in f:
+                if "state change: READY (1) to RUNNING (2)" in line:
+                    [task_index] = line[line.find("Task ") + len("Task "):].split()[0:1]
+                    command_line = previous_line[previous_line.find("busy on '") + len("busy on '"):-2]
+                    self.task_command_lines[int(task_index)] = command_line
+                    executable = command_line.split()[0]
+                    self.filenames_to_ignore.add(executable)
+                previous_line = line
+
+
+    def _construct_file_map(self) -> None:
+
+        filename_to_key_map = {}
+        # One pass through the debug file to create the initial file key -> filename mapping
+        with open(self.debug_file) as f:
+            for line in f:
+                if "__vine_env_task" in line:  # Ignore that weird task/file
+                    continue
+                # 2025/09/09 21:12:48.02 vine_manager[239]vine: tx to dab178765b01 (127.0.0.1:34382): infile file-rnd-fmtpwpiobiumeze blastall_00000016_outfile_0016 0
+                if "infile " in line:
+                    [file_key, filename] = line[line.find("infile ") + len("infile "):].split()[0:2]
+                # 2025/09/09 21:12:42.12 vine_manager[239]vine: tx to dab178765b01 (127.0.0.1:34382): outfile file-rnd-jpnzrjrjnxxqhej blastall_00000017_outfile_0017 0
+                elif "outfile " in line:
+                    [file_key, filename] = line[line.find("outfile ") + len("outfile "):].split()[0:2]
+                else:
+                    continue
+                if filename in self.filenames_to_ignore:
+                    continue
+                self.files_map[file_key] = {"filename": filename}
+                filename_to_key_map[filename] = file_key
+
+        # Another pass through the debug file to get the actual file paths
+        with open(self.debug_file) as f:
+            for line in f:
+                # 2025/09/09 21:12:48.01 vine_manager[239]vine: dab178765b01 (127.0.0.1:34382) needs file data/blastall_00000003_outfile_0003 as blastall_00000003_outfile_0003
+                if "needs file " in line:
+                    [full_path, ignore, filename] = line[line.find("needs file ") + len("needs file "):].split()[0:3]
+                    file_key = filename_to_key_map.get(filename)
+                # 2025/09/09 21:12:47.92 vine_manager[239]vine: dab178765b01 (127.0.0.1:34382) sending back file-rnd-jajwzwsrtyzbkfs to data/blastall_00000020_outfile_0020
+                elif "sending back " in line:
+                    [file_key, ignore, full_path] = line[line.find("sending back ") + len("sending back "):].split()[0:3]
+                    filename = self.files_map.get(file_key, {}).get("filename")
+                else:
+                    continue
+                # Skip files that are ignored or that were never seen as infile/outfile in the first pass
+                if filename in self.filenames_to_ignore or file_key not in self.files_map:
+                    continue
+                self.files_map[file_key]["path"] = full_path
+
+        # Pass through the transactions file to get the file sizes
+        with open(self.transactions_file) as f:
+            for line in f:
+                # 1757452362084671 239 WORKER worker-50dc215f08057f4005f3b65dee08592f TRANSFER OUTPUT file-rnd-wzkjcrgiivvzbci 227273 1327 1757452362083301
+                # 1757452358704968 239 WORKER worker-50dc215f08057f4005f3b65dee08592f TRANSFER INPUT file-meta-9b84b334875319e856f72be634aae964 17648 1129 1757452358703835
+                if "TRANSFER INPUT " in line:
+                    [file_key, file_size] = line[line.find("TRANSFER INPUT ") + len("TRANSFER INPUT "):].split()[0:2]
+                elif "TRANSFER OUTPUT " in line:
+                    [file_key, file_size] = line[line.find("TRANSFER OUTPUT ") + len("TRANSFER OUTPUT "):].split()[0:2]
+                elif "CACHE_UPDATE " in line:
+                    [file_key, file_size] = line[line.find("CACHE_UPDATE ") + len("CACHE_UPDATE "):].split()[0:2]
+                else:
+                    continue
+                if file_key in self.files_map:
+                    self.files_map[file_key]["size"] = int(file_size)
+
+        # print(str(self.files_map))
+
+
+    def _construct_task_runtimes(self) -> None:
+        task_start_times = {}
+        task_end_times = {}
+
+        with open(self.transactions_file) as f:
+            for line in f:
+                if line[0] == "#":
+                    continue
+                if "RUNNING" in line:
+                    [start_date, ignore, ignore, task_index] = line.split()[0:4]
+                    task_start_times[int(task_index)] = int(start_date)
+                elif "DONE" in line:
+                    [end_date, ignore, ignore, task_index] = line.split()[0:4]
+                    task_end_times[int(task_index)] = int(end_date)
+
+        for task_index in task_start_times:
+            self.task_runtimes[task_index] = (
+                    float(task_end_times[task_index] - task_start_times[task_index]) / 1_000_000.0)
+
+    def _construct_task_input_output_files(self) -> None:
+
+        # Initialize all entries
+        for task_id in self.task_runtimes.keys():
+            self.task_input_files[task_id] = []
+            self.task_output_files[task_id] = []
+
+        with open(self.taskgraph_file) as f:
+            for line in f:
+                if "->" not in line:
+                    continue
+                if "file-task" in line:  # Ignoring what I think are taskvine internal/specific things
+                    continue
+                line = line[:-1]
+                # print(f"LINE: {line}")
+                [source, ignore, destination] = line.split()
+                # Remove quotes
+                source = source[1:-1]
+                destination = destination[1:-2]
+                # Remove weird file- prefix
+                if source.startswith("file-"):
+                    source = source[len("file-"):]
+                if destination.startswith("file-"):
+                    destination = destination[len("file-"):]
+
+                if "task" in source and "file" not in source:
+                    task_id = int(source.split("-")[1])
+                    if task_id not in self.task_runtimes:
+                        continue
+                    file_key = destination
+                    if file_key not in self.files_map:
+                        continue
+                    output_file = self.files_map[file_key]["filename"]
+                    self.task_output_files[task_id].append(output_file)
+                elif "task" in destination and "file" not in destination:
+                    task_id = int(destination.split("-")[1])
+                    if task_id not in self.task_runtimes:
+                        continue
+                    file_key = source
+                    if file_key not in self.files_map:
+                        continue
+                    input_file = self.files_map[file_key]["filename"]
+                    self.task_input_files[task_id].append(input_file)
+                else:
+                    raise ValueError("Error in the taskgraph file")
+
+
+    def _construct_workflow(self) -> None:
+        # Create files and put them in a map
+        file_object_map = {}
+        for file_key in self.files_map:
+            filename = self.files_map[file_key]["filename"]
+            file_size = self.files_map[file_key]["size"]
+            # file_path = self.files_map[file_key]["path"]
+            file_object_map[filename] = File(file_id=filename,
+                                             size=file_size,
+                                             logger=self.logger)
+
+        # Create all tasks
+        task_map = {}
+        for task_id in self.task_runtimes:
+            task_name = "Task %d" % task_id
+            task = Task(name=task_name,
+                        task_id=task_name,
+                        task_type=TaskType.COMPUTE,
+                        runtime=self.task_runtimes[task_id],
+                        program=self.task_command_lines[task_id].split()[0],
+                        args=self.task_command_lines[task_id].split()[1:],
+                        cores=1,
+                        input_files=[file_object_map[filename] for filename in self.task_input_files[task_id]],
+                        output_files=[file_object_map[filename] for filename in self.task_output_files[task_id]],
+                        logger=self.logger)
+            task_map[task_id] = task
+            self.workflow.add_task(task)
+            # sys.stderr.write(f"Added task {task_name}: {len(self.workflow.tasks)}\n")
+
+
+        # Adding all edges, which is pretty inefficiently done for now, by looking at all pairs of tasks!
+        for task1_id in self.task_runtimes:
+            for task2_id in self.task_runtimes:
+                if task1_id == task2_id:
+                    continue
+                task1_output_files = self.task_output_files[task1_id]
+                task2_input_files = self.task_input_files[task2_id]
+                has_intersection = bool(set(task1_output_files) & set(task2_input_files))
+                if has_intersection:
+                    self.workflow.add_dependency(task_map[task1_id].name, task_map[task2_id].name)
+                    # sys.stderr.write(f"Added dependency {task_map[task1_id].name} -> {task_map[task2_id].name}\n")
\ No newline at end of file
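
A minimal usage sketch for the new TaskVineLogsParser, mirroring the commented-out TaskVine branch in the test above. The vine-run-info path is a placeholder and the filenames_to_ignore values are just the ones used by the test:

    from pathlib import Path
    from wfcommons.wfinstances.logs import TaskVineLogsParser

    # Point the parser at a TaskVine vine-run-info directory, ignoring benchmark executables
    parser = TaskVineLogsParser(vine_run_info_dir=Path("/path/to/vine-run-info"),
                                filenames_to_ignore=["cpu-benchmark", "stress-ng"])
    workflow = parser.build_workflow("reconstructed_workflow")
    workflow.write_json(Path("/tmp/reconstructed_workflow.json"))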