diff --git a/bin/wfbench b/bin/wfbench
index 6cc2901d..bba0e4ea 100755
--- a/bin/wfbench
+++ b/bin/wfbench
@@ -24,8 +24,10 @@ from io import StringIO
 from filelock import FileLock
 from typing import List, Optional
 
+
 this_dir = pathlib.Path(__file__).resolve().parent
 
+
 def lock_core(path_locked: pathlib.Path,
               path_cores: pathlib.Path) -> int:
     """
@@ -126,7 +128,7 @@ def cpu_mem_benchmark(cpu_threads: Optional[int] = 5,
         cpu_procs.append(cpu_proc)
 
     if mem_threads > 0:
-        mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) 
+        mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid)
         if core:
             os.sched_setaffinity(mem_proc.pid, {core})
         mem_procs.append(mem_proc)
@@ -165,7 +167,7 @@ def io_write_benchmark_user_input_data_size(outputs,
 
 def io_alternate(inputs, outputs, memory_limit=None, rundir=None, event=None):
     """Alternate between reading and writing to a file, ensuring read only happens after write."""
-    
+
     if memory_limit is None:
         memory_limit = 10 * 1024 * 1024  # sys.maxsize
     memory_limit = int(memory_limit)
@@ -186,19 +188,21 @@ def io_alternate(inputs, outputs, memory_limit=None, rundir=None, event=None):
 
 
 def get_available_gpus():
-    proc = subprocess.Popen(["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    proc = subprocess.Popen(["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv"],
+                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     stdout, _ = proc.communicate()
     df = pd.read_csv(StringIO(stdout.decode("utf-8")), sep=" ")
     return df[df["utilization.gpu"] <= 5].index.to_list()
 
 
 def gpu_benchmark(time: int = 100,
-                  work: int = 100,
-                  device: int = 0): #work, device
-
-    gpu_prog = [f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={device} {this_dir.joinpath('./gpu_benchmark')} {work} {time}"]
+                  work: int = 100,
+                  device: int = 0):  # work, device
+
+    gpu_prog = [
+        f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={device} {this_dir.joinpath('./gpu_benchmark')} {work} {time}"]
     print(f"Running GPU Benchmark: {gpu_prog}")
-    subprocess.Popen(gpu_prog, shell=True) 
+    subprocess.Popen(gpu_prog, shell=True)
 
 
 def get_parser() -> argparse.ArgumentParser:
@@ -212,11 +216,31 @@
                         help="Path to cores file.")
     parser.add_argument("--cpu-work", default=None, help="Amount of CPU work.")
     parser.add_argument("--gpu-work", default=None, help="Amount of GPU work.")
-    parser.add_argument("--time", default=None, help="Time limit (in seconds) to complete the task (overrides CPU and GPU works)")
+    parser.add_argument("--time", default=None,
+                        help="Time limit (in seconds) to complete the task (overrides CPU and GPU work)")
     parser.add_argument("--mem", default=None, help="Max amount (in MB) of memory consumption.")
     parser.add_argument("--out", help="output files name.")
+    parser.add_argument("--with-flowcept", action="store_true", default=False, help="Enable Flowcept provenance capture.")
+    parser.add_argument("--workflow_id", default=None, help="ID used to group tasks into a workflow.")
     return parser
-    
+
+
+def begin_flowcept(args, other):
+    print("Running with Flowcept.")
+    from flowcept import Flowcept, FlowceptTask
+    # TODO: parametrize to allow storing individual tasks
+    f = Flowcept(workflow_id=args.workflow_id,
+                 bundle_exec_id=args.workflow_id,
+                 start_persistence=False, save_workflow=False)
+    f.start()
+    t = FlowceptTask(workflow_id=args.workflow_id, used={**args.__dict__, "other_args": other})
+    return f, t
+
+
+def end_flowcept(flowcept, flowcept_task):
+    flowcept_task.end()
+    flowcept.stop()
+
+
 def main():
     """Main program."""
@@ -225,6 +249,9 @@ def main():
     print("[Wfbench] ARGS", args)
     core = None
 
+    if args.with_flowcept:
+        flowcept, flowcept_task = begin_flowcept(args, other)
+
     if args.rundir:
         rundir = pathlib.Path(args.rundir)
     else:
@@ -249,7 +276,7 @@ def main():
 
         # Remove all escape characters before attempting to parse the JSON string
         cleaned_output = re.sub(r'\\+', '', args.out)
-        
+
         # Attempt to parse the cleaned string
         try:
             outputs_dict = json.loads(cleaned_output)
@@ -257,11 +284,11 @@
             print("Failed to decode JSON:", e)
             outputs_dict = {}
 
-        print("OUTPUT",outputs_dict)
+        print("OUTPUT", outputs_dict)
         print("INPUTS", other)
 
         # Create a multiprocessing event that in the first run is set to True
-        write_done_event = multiprocessing.Event() 
+        write_done_event = multiprocessing.Event()
         # Set this to True to allow the first read to happen
         write_done_event.set()
         # Print the value of the event
@@ -273,10 +300,9 @@
         io_proc.start()
         procs.append(io_proc)
 
-
     if args.gpu_work:
         print("[WfBench] Starting GPU Benchmark...")
-        available_gpus = get_available_gpus() #checking for available GPUs
+        available_gpus = get_available_gpus()  # checking for available GPUs
 
         if not available_gpus:
             print("No GPU available")
@@ -289,20 +315,20 @@
                 gpu_benchmark(time=int(args.time), work=int(args.gpu_work), device=device)
             else:
                 gpu_benchmark(work=int(args.gpu_work), device=device)
-
-
+
     if args.cpu_work:
         print("[WfBench] Starting CPU and Memory Benchmarks...")
         if core:
             print(f"[WfBench] {args.name} acquired core {core}")
 
-        mem_threads=int(10 - 10 * args.percent_cpu)
+        mem_threads = int(10 - 10 * args.percent_cpu)
         cpu_procs, mem_procs = cpu_mem_benchmark(cpu_threads=int(10 * args.percent_cpu),
-                                                mem_threads=mem_threads,
-                                                cpu_work=sys.maxsize if args.time else int(args.cpu_work),
-                                                core=core,
-                                                total_mem=args.mem)
-
+                                                 mem_threads=mem_threads,
+                                                 cpu_work=sys.maxsize if args.time else int(
+                                                     args.cpu_work),
+                                                 core=core,
+                                                 total_mem=args.mem)
+
         procs.extend(cpu_procs)
         if args.time:
             time.sleep(int(args.time))
@@ -316,7 +342,6 @@
             io_proc.terminate()
             io_proc.join()
 
-
         for mem_proc in mem_procs:
             try:
                 os.kill(mem_proc.pid, signal.SIGKILL)  # Force kill if SIGTERM fails
@@ -340,7 +365,12 @@
     if core:
         unlock_core(path_locked, path_cores, core)
 
+
+    if args.with_flowcept:
+        end_flowcept(flowcept, flowcept_task)
+
     print("WfBench Benchmark completed!")
 
+
 if __name__ == "__main__":
     main()
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index d0ad7e44..736f21e1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -48,6 +48,7 @@ Tracker = "https://github.com/wfcommons/WfCommons/issues"
 
 [project.optional-dependencies]
 test = ["pytest", "pytest-cov"]
+flowcept = ["flowcept"]
 
 [tool.setuptools.dynamic]
 version = {attr = "wfcommons.version.__version__"}
diff --git a/wfcommons/common/workflow.py b/wfcommons/common/workflow.py
index 8168f456..dde483b3 100644
--- a/wfcommons/common/workflow.py
+++ b/wfcommons/common/workflow.py
@@ -82,6 +82,7 @@ def __init__(self,
         self.tasks: Task = {}
         self.tasks_parents = {}
         self.tasks_children = {}
+        self.workflow_id: Optional[str] = None
         super().__init__(name=name, makespan=self.makespan, executedat=self.executed_at)
 
     def add_task(self, task: Task) -> None:
diff --git a/wfcommons/wfbench/bench.py b/wfcommons/wfbench/bench.py
index 96c3548c..28dda3ce 100644
--- a/wfcommons/wfbench/bench.py
+++ b/wfcommons/wfbench/bench.py
@@ -46,12 +46,13 @@ class WorkflowBenchmark:
 
     def __init__(self, recipe: Type[WfChefWorkflowRecipe], num_tasks: int,
-                 logger: Optional[Logger] = None) -> None:
+                 logger: Optional[Logger] = None, with_flowcept=False) -> None:
         """Create an object that represents a workflow benchmark generator."""
         self.logger: Logger = logging.getLogger(
             __name__) if logger is None else logger
         self.recipe = recipe
         self.num_tasks = num_tasks
+        self.with_flowcept = with_flowcept
         self.workflow: Workflow = None
 
     def create_benchmark_from_input_file(self,
@@ -254,7 +255,8 @@ def create_benchmark(self,
                          mem: Optional[float] = None,
                          lock_files_folder: Optional[pathlib.Path] = None,
                          regenerate: Optional[bool] = True,
-                         rundir: Optional[pathlib.Path] = None) -> pathlib.Path:
+                         rundir: Optional[pathlib.Path] = None,
+                         ) -> pathlib.Path:
         """Create a workflow benchmark.
 
         :param save_dir: Folder to generate the workflow benchmark JSON instance and input data files.
@@ -293,6 +295,9 @@
         json_path = save_dir.joinpath(
             f"{self.workflow.name.lower()}-{self.num_tasks}").with_suffix(".json")
 
+        if self.with_flowcept:
+            self.workflow.workflow_id = str(uuid.uuid4())
+
         cores, lock = self._creating_lock_files(lock_files_folder)
         for task in self.workflow.tasks.values():
             self._set_argument_parameters(
@@ -305,7 +310,7 @@
                 lock_files_folder,
                 cores,
                 lock,
-                rundir
+                rundir,
             )
             task.input_files = []
             task.output_files = []
@@ -384,6 +389,12 @@ def _set_argument_parameters(self,
         if rundir:
             params.extend([f"--rundir {rundir}"])
 
+        if self.with_flowcept:
+            params.extend(["--with-flowcept"])
+
+        if self.workflow.workflow_id:
+            params.extend([f"--workflow_id {self.workflow.workflow_id}"])
+
         task.runtime = 0
         task.program = "wfbench"
diff --git a/wfcommons/wfbench/translator/abstract_translator.py b/wfcommons/wfbench/translator/abstract_translator.py
index 24d5a0d3..440871d9 100644
--- a/wfcommons/wfbench/translator/abstract_translator.py
+++ b/wfcommons/wfbench/translator/abstract_translator.py
@@ -42,7 +42,7 @@ def __init__(self,
             self.workflow = workflow
         else:
             instance = Instance(workflow, logger=logger)
-            self.workflow = instance.workflow
+            self.workflow: Workflow = instance.workflow
 
         self.workflow.write_json()
diff --git a/wfcommons/wfbench/translator/taskvine.py b/wfcommons/wfbench/translator/taskvine.py
index a9e1db47..9a901f5c 100644
--- a/wfcommons/wfbench/translator/taskvine.py
+++ b/wfcommons/wfbench/translator/taskvine.py
@@ -10,9 +10,9 @@
 
 import pathlib
 import shutil
-
+import textwrap
 from logging import Logger
-from typing import Dict, Optional, Union
+from typing import Optional, Union
 
 from .abstract_translator import Translator
 from ...common import Workflow
@@ -20,6 +20,15 @@
 this_dir = pathlib.Path(__file__).resolve().parent
 
 
+def get_flowcept_init(workflow_id, workflow_name):
+    code = textwrap.dedent(f"""
+        from flowcept.flowcept_api.flowcept_controller import Flowcept
+        f = Flowcept(workflow_id="{workflow_id}", workflow_name="{workflow_name}", bundle_exec_id="{workflow_id}")
+        f.start()
+    """)
+    return code
+
+
 class TaskVineTranslator(Translator):
     """
     A WfFormat parser for creating TaskVine workflow applications.
@@ -31,11 +40,13 @@ class TaskVineTranslator(Translator):
     """
 
     def __init__(self,
                  workflow: Union[Workflow, pathlib.Path],
+                 with_flowcept: Optional[bool] = False,
                  logger: Optional[Logger] = None) -> None:
         """Create an object of the translator."""
         super().__init__(workflow, logger)
         self.parsed_tasks = []
         self.task_counter = 1
+        self.with_flowcept = with_flowcept
         self.output_files_map = {}
 
     def translate(self, output_folder: pathlib.Path) -> None:
@@ -57,7 +68,11 @@
         with open(this_dir.joinpath("templates/taskvine_template.py")) as fp:
             run_workflow_code = fp.read()
         run_workflow_code = run_workflow_code.replace("# Generated code goes here", self.script)
-        
+
+        if self.with_flowcept:
+            run_workflow_code = run_workflow_code.replace("# FLOWCEPT_INIT", get_flowcept_init(self.workflow.workflow_id, self.workflow.name))
+            run_workflow_code = run_workflow_code.replace("# FLOWCEPT_END", "f.stop()")
+
         # write benchmark files
         output_folder.mkdir(parents=True)
         with open(output_folder.joinpath("taskvine_workflow.py"), "w") as fp:
@@ -67,6 +82,9 @@ def translate(self, output_folder: pathlib.Path) -> None:
         self._copy_binary_files(output_folder)
         self._generate_input_files(output_folder)
         shutil.copy(this_dir.joinpath("templates/taskvine_poncho.json"), output_folder)
+
+
+
 
     def _add_level_tasks(self, tasks_list: list[str]) -> list[str]:
         """
diff --git a/wfcommons/wfbench/translator/templates/taskvine_template.py b/wfcommons/wfbench/translator/templates/taskvine_template.py
index 98d55c69..98e2c813 100644
--- a/wfcommons/wfbench/translator/templates/taskvine_template.py
+++ b/wfcommons/wfbench/translator/templates/taskvine_template.py
@@ -12,6 +12,7 @@
 
 import ndcctools.taskvine as vine
 
+# FLOWCEPT_INIT
 
 # Create a new manager
 m = vine.Manager(9123)
@@ -47,4 +48,6 @@ def wait_for_tasks_completion():
 cpu_bench = m.declare_file("bin/cpu-benchmark", cache="workflow")
 stress_ng = m.declare_file(shutil.which("stress-ng"), cache="workflow")
 
-# Generated code goes here
\ No newline at end of file
+# Generated code goes here
+
+# FLOWCEPT_END
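
---
Usage sketch, not part of the patch: how the new pieces fit together, from benchmark
generation to a Flowcept-instrumented TaskVine script. BlastRecipe, the task count, the
paths, and the create_benchmark() argument values are illustrative assumptions; any WfChef
recipe works the same way.

    import pathlib

    from wfcommons.wfchef.recipes import BlastRecipe
    from wfcommons.wfbench import WorkflowBenchmark
    from wfcommons.wfbench.translator.taskvine import TaskVineTranslator

    # with_flowcept=True makes create_benchmark() stamp a uuid4 workflow_id on the
    # workflow and append "--with-flowcept --workflow_id <id>" to each task's
    # wfbench invocation, so all tasks report into the same Flowcept bundle.
    benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=45, with_flowcept=True)
    benchmark.create_benchmark(pathlib.Path("/tmp/benchmark"),
                               cpu_work=100, data=10, percent_cpu=0.6)

    # Passing the in-memory workflow (rather than the generated JSON path) keeps the
    # workflow_id set above; the translator then splices the Flowcept controller in at
    # the # FLOWCEPT_INIT / # FLOWCEPT_END markers of the TaskVine template.
    translator = TaskVineTranslator(benchmark.workflow, with_flowcept=True)
    translator.translate(pathlib.Path("/tmp/taskvine_run"))

Running the generated script requires the optional dependency added above, e.g.
pip install 'wfcommons[flowcept]'.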