Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 54 additions & 24 deletions bin/wfbench
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ from io import StringIO
from filelock import FileLock
from typing import List, Optional


this_dir = pathlib.Path(__file__).resolve().parent


def lock_core(path_locked: pathlib.Path,
path_cores: pathlib.Path) -> int:
"""
Expand Down Expand Up @@ -126,7 +128,7 @@ def cpu_mem_benchmark(cpu_threads: Optional[int] = 5,
cpu_procs.append(cpu_proc)

if mem_threads > 0:
mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid)
mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid)
if core:
os.sched_setaffinity(mem_proc.pid, {core})
mem_procs.append(mem_proc)
Expand Down Expand Up @@ -165,7 +167,7 @@ def io_write_benchmark_user_input_data_size(outputs,

def io_alternate(inputs, outputs, memory_limit=None, rundir=None, event=None):
"""Alternate between reading and writing to a file, ensuring read only happens after write."""

if memory_limit is None:
memory_limit = 10 * 1024 * 1024 # sys.maxsize
memory_limit = int(memory_limit)
Expand All @@ -186,19 +188,21 @@ def io_alternate(inputs, outputs, memory_limit=None, rundir=None, event=None):


def get_available_gpus():
    """Return the indices of GPUs that ``nvidia-smi`` reports as (nearly) idle.

    ``nvidia-smi --query-gpu=utilization.gpu --format=csv`` is run once and its
    output parsed as space-separated CSV. A GPU is considered available when
    its ``utilization.gpu`` value is at most 5. The returned indices are the
    row positions in that output.

    :return: List of available GPU indices; empty when ``nvidia-smi`` cannot
             be executed, exits with an error, or prints nothing.
    :rtype: list
    """
    try:
        proc = subprocess.run(
            ["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv"],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError:
        # nvidia-smi is not installed or not executable: treat as "no GPU"
        # so the caller's existing empty-list check handles it.
        return []

    if proc.returncode != 0 or not proc.stdout.strip():
        # A failed query prints a diagnostic instead of CSV; parsing it would
        # raise, so report no GPUs instead.
        return []

    df = pd.read_csv(StringIO(proc.stdout.decode("utf-8")), sep=" ")
    return df[df["utilization.gpu"] <= 5].index.to_list()


def gpu_benchmark(time: int = 100,
                  work: int = 100,
                  device: int = 0):
    """Launch the bundled ``gpu_benchmark`` executable on a single GPU.

    The executable located next to this script is started through the shell
    with ``CUDA_DEVICE_ORDER=PCI_BUS_ID`` and ``CUDA_VISIBLE_DEVICES`` set to
    ``device`` for the child process. ``work`` and ``time`` are forwarded, in
    that order, as its positional arguments. The child is started once and is
    not waited on.

    :param time: Value forwarded as the second positional argument.
    :type time: int
    :param work: Value forwarded as the first positional argument.
    :type work: int
    :param device: GPU index exported through ``CUDA_VISIBLE_DEVICES``.
    :type device: int
    """
    import shlex  # local import: only this function needs it

    # Quote the path so an installation directory containing spaces or shell
    # metacharacters is still handed to the shell as a single word. For
    # ordinary paths the quoted form is identical to the plain one.
    gpu_binary = shlex.quote(str(this_dir.joinpath('./gpu_benchmark')))
    gpu_prog = [
        f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={device} {gpu_binary} {work} {time}"]
    print(f"Running GPU Benchmark: {gpu_prog}")
    subprocess.Popen(gpu_prog, shell=True)


def get_parser() -> argparse.ArgumentParser:
Expand All @@ -212,11 +216,31 @@ def get_parser() -> argparse.ArgumentParser:
help="Path to cores file.")
parser.add_argument("--cpu-work", default=None, help="Amount of CPU work.")
parser.add_argument("--gpu-work", default=None, help="Amount of GPU work.")
parser.add_argument("--time", default=None, help="Time limit (in seconds) to complete the task (overrides CPU and GPU works)")
parser.add_argument("--time", default=None,
help="Time limit (in seconds) to complete the task (overrides CPU and GPU works)")
parser.add_argument("--mem", default=None, help="Max amount (in MB) of memory consumption.")
parser.add_argument("--out", help="output files name.")
parser.add_argument("--with-flowcept", action="store_true", default=False, help="If you want to use Flowcept.")
parser.add_argument("--workflow_id", default=None, help="Id to group tasks in a workflow.")
return parser



def begin_flowcept(args, other):
    """Start Flowcept capture for this run and open a task record.

    Flowcept is imported here rather than at module level, so it is only
    required when this function is actually called.

    :param args: Parsed command-line namespace; ``args.workflow_id`` is used
                 as both the workflow id and the bundle execution id.
    :param other: Remaining arguments, recorded under ``"other_args"``.

    :return: The started ``Flowcept`` controller and the ``FlowceptTask``.
    :rtype: tuple
    """
    print("Running with Flowcept.")
    from flowcept import Flowcept, FlowceptTask

    workflow_id = args.workflow_id

    # Everything the task was invoked with, plus the leftover arguments.
    used_values = dict(vars(args))
    used_values["other_args"] = other

    # TODO: parametrize to allow storing individual tasks
    controller = Flowcept(workflow_id=workflow_id,
                          bundle_exec_id=workflow_id,
                          start_persistence=False,
                          save_workflow=False)
    controller.start()
    task = FlowceptTask(workflow_id=workflow_id, used=used_values)
    return controller, task


def end_flowcept(flowcept, flowcept_task):
    """Close the task record and stop the Flowcept controller.

    The controller is stopped even when ending the task raises, so a failure
    in ``flowcept_task.end()`` does not leave the controller running. Any such
    exception still propagates to the caller.

    :param flowcept: Controller object exposing ``stop()``.
    :param flowcept_task: Task object exposing ``end()``.
    """
    try:
        flowcept_task.end()
    finally:
        flowcept.stop()


def main():
"""Main program."""
Expand All @@ -225,6 +249,9 @@ def main():
print("[Wfbench] ARGS", args)
core = None

if args.with_flowcept:
flowcept, flowcept_task = begin_flowcept(args, other)

if args.rundir:
rundir = pathlib.Path(args.rundir)
else:
Expand All @@ -249,19 +276,19 @@ def main():

# Remove all escape characters before attempting to parse the JSON string
cleaned_output = re.sub(r'\\+', '', args.out)

# Attempt to parse the cleaned string
try:
outputs_dict = json.loads(cleaned_output)
except json.JSONDecodeError as e:
print("Failed to decode JSON:", e)
outputs_dict = {}

print("OUTPUT",outputs_dict)
print("OUTPUT", outputs_dict)
print("INPUTS", other)

# Create a multiprocessing event that in the first run is set to True
write_done_event = multiprocessing.Event()
write_done_event = multiprocessing.Event()
# Set this to True to allow the first read to happen
write_done_event.set()
# Print the value of the event
Expand All @@ -273,10 +300,9 @@ def main():
io_proc.start()
procs.append(io_proc)


if args.gpu_work:
print("[WfBench] Starting GPU Benchmark...")
available_gpus = get_available_gpus() #checking for available GPUs
available_gpus = get_available_gpus() # checking for available GPUs

if not available_gpus:
print("No GPU available")
Expand All @@ -289,20 +315,20 @@ def main():
gpu_benchmark(time=int(args.time), work=int(args.gpu_work), device=device)
else:
gpu_benchmark(work=int(args.gpu_work), device=device)



if args.cpu_work:
print("[WfBench] Starting CPU and Memory Benchmarks...")
if core:
print(f"[WfBench] {args.name} acquired core {core}")

mem_threads=int(10 - 10 * args.percent_cpu)
mem_threads = int(10 - 10 * args.percent_cpu)
cpu_procs, mem_procs = cpu_mem_benchmark(cpu_threads=int(10 * args.percent_cpu),
mem_threads=mem_threads,
cpu_work=sys.maxsize if args.time else int(args.cpu_work),
core=core,
total_mem=args.mem)

mem_threads=mem_threads,
cpu_work=sys.maxsize if args.time else int(
args.cpu_work),
core=core,
total_mem=args.mem)

procs.extend(cpu_procs)
if args.time:
time.sleep(int(args.time))
Expand All @@ -316,7 +342,6 @@ def main():
io_proc.terminate()
io_proc.join()


for mem_proc in mem_procs:
try:
os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails
Expand All @@ -340,7 +365,12 @@ def main():

if core:
unlock_core(path_locked, path_cores, core)

if args.with_flowcept:
end_flowcept(flowcept, flowcept_task)

print("WfBench Benchmark completed!")


if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Tracker = "https://github.com/wfcommons/WfCommons/issues"

[project.optional-dependencies]
test = ["pytest", "pytest-cov"]
flowcept = ["flowcept"]

[tool.setuptools.dynamic]
version = {attr = "wfcommons.version.__version__"}
Expand Down
1 change: 1 addition & 0 deletions wfcommons/common/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def __init__(self,
self.tasks: Task = {}
self.tasks_parents = {}
self.tasks_children = {}
self.workflow_id: str = None
super().__init__(name=name, makespan=self.makespan, executedat=self.executed_at)

def add_task(self, task: Task) -> None:
Expand Down
17 changes: 14 additions & 3 deletions wfcommons/wfbench/bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ class WorkflowBenchmark:
def __init__(self,
             recipe: Type[WfChefWorkflowRecipe],
             num_tasks: int,
             logger: Optional[Logger] = None,
             with_flowcept: bool = False) -> None:
    """Create an object that represents a workflow benchmark generator.

    :param recipe: Recipe class stored for generating the workflow.
    :type recipe: Type[WfChefWorkflowRecipe]
    :param num_tasks: Number of tasks stored for the generated workflow.
    :type num_tasks: int
    :param logger: Logger to use; the module logger is used when omitted.
    :type logger: Optional[Logger]
    :param with_flowcept: Flag stored on the instance to enable the
                          Flowcept-related benchmark options.
    :type with_flowcept: bool
    """
    self.logger: Logger = logging.getLogger(
        __name__) if logger is None else logger
    self.recipe = recipe
    self.num_tasks = num_tasks
    self.with_flowcept = with_flowcept
    # Not set here; remains None until a workflow is assigned elsewhere.
    self.workflow: Optional[Workflow] = None

def create_benchmark_from_input_file(self,
Expand Down Expand Up @@ -254,7 +255,8 @@ def create_benchmark(self,
mem: Optional[float] = None,
lock_files_folder: Optional[pathlib.Path] = None,
regenerate: Optional[bool] = True,
rundir: Optional[pathlib.Path] = None) -> pathlib.Path:
rundir: Optional[pathlib.Path] = None,
) -> pathlib.Path:
"""Create a workflow benchmark.

:param save_dir: Folder to generate the workflow benchmark JSON instance and input data files.
Expand Down Expand Up @@ -293,6 +295,9 @@ def create_benchmark(self,
json_path = save_dir.joinpath(
f"{self.workflow.name.lower()}-{self.num_tasks}").with_suffix(".json")

if self.with_flowcept:
self.workflow.workflow_id = str(uuid.uuid4())

cores, lock = self._creating_lock_files(lock_files_folder)
for task in self.workflow.tasks.values():
self._set_argument_parameters(
Expand All @@ -305,7 +310,7 @@ def create_benchmark(self,
lock_files_folder,
cores,
lock,
rundir
rundir,
)
task.input_files = []
task.output_files = []
Expand Down Expand Up @@ -384,6 +389,12 @@ def _set_argument_parameters(self,
if rundir:
params.extend([f"--rundir {rundir}"])

if self.with_flowcept:
params.extend(["--with-flowcept"])

if self.workflow.workflow_id:
params.extend([f"--workflow_id {self.workflow.workflow_id}"])

task.runtime = 0

task.program = "wfbench"
Expand Down
2 changes: 1 addition & 1 deletion wfcommons/wfbench/translator/abstract_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self,
self.workflow = workflow
else:
instance = Instance(workflow, logger=logger)
self.workflow = instance.workflow
self.workflow: Workflow = instance.workflow

self.workflow.write_json()

Expand Down
24 changes: 21 additions & 3 deletions wfcommons/wfbench/translator/taskvine.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,25 @@

import pathlib
import shutil

import textwrap
from logging import Logger
from typing import Dict, Optional, Union
from typing import Optional, Union

from .abstract_translator import Translator
from ...common import Workflow

this_dir = pathlib.Path(__file__).resolve().parent


def get_flowcept_init(workflow_id, workflow_name):
    """Build the Python snippet that starts Flowcept in a generated workflow.

    The snippet imports ``Flowcept``, instantiates it as ``f`` with the given
    workflow id (also used as the bundle execution id) and workflow name, and
    calls ``f.start()``.

    Both values are written as escaped double-quoted string literals, so a
    name containing quotes, backslashes or newlines still yields valid code
    that evaluates back to the original text. Plain values produce exactly
    the same output as simple quoting.

    :param workflow_id: Identifier of the workflow; rendered via ``str()``.
    :param workflow_name: Name of the workflow; rendered via ``str()``.

    :return: The generated source code, starting and ending with a newline.
    :rtype: str
    """
    import json  # local import: only this helper needs it

    # json.dumps yields a double-quoted literal whose escapes are also valid
    # Python. ensure_ascii=False keeps non-ASCII characters verbatim.
    id_literal = json.dumps(str(workflow_id), ensure_ascii=False)
    name_literal = json.dumps(str(workflow_name), ensure_ascii=False)

    return textwrap.dedent(f"""
        from flowcept.flowcept_api.flowcept_controller import Flowcept
        f = Flowcept(workflow_id={id_literal}, workflow_name={name_literal}, bundle_exec_id={id_literal})
        f.start()
    """)


class TaskVineTranslator(Translator):
"""
A WfFormat parser for creating TaskVine workflow applications.
Expand All @@ -31,11 +40,13 @@ class TaskVineTranslator(Translator):
"""
def __init__(self,
             workflow: Union[Workflow, pathlib.Path],
             with_flowcept: Optional[bool] = False,
             logger: Optional[Logger] = None) -> None:
    """Set up the TaskVine translator and its bookkeeping state.

    :param workflow: Workflow to translate, passed on to the base translator.
    :type workflow: Union[Workflow, pathlib.Path]
    :param with_flowcept: Flag stored on the instance to enable Flowcept
                          support in the translator.
    :type with_flowcept: Optional[bool]
    :param logger: Logger handed to the base translator.
    :type logger: Optional[Logger]
    """
    super().__init__(workflow, logger)

    # Translation options.
    self.with_flowcept = with_flowcept

    # Per-translation bookkeeping, all starting from a clean state.
    self.task_counter = 1
    self.parsed_tasks = []
    self.output_files_map = {}

def translate(self, output_folder: pathlib.Path) -> None:
Expand All @@ -57,7 +68,11 @@ def translate(self, output_folder: pathlib.Path) -> None:
with open(this_dir.joinpath("templates/taskvine_template.py")) as fp:
run_workflow_code = fp.read()
run_workflow_code = run_workflow_code.replace("# Generated code goes here", self.script)


if self.with_flowcept:
run_workflow_code = run_workflow_code.replace("# FLOWCEPT_INIT", get_flowcept_init(self.workflow.workflow_id, self.workflow.name))
run_workflow_code = run_workflow_code.replace("# FLOWCEPT_END", "f.stop()")

# write benchmark files
output_folder.mkdir(parents=True)
with open(output_folder.joinpath("taskvine_workflow.py"), "w") as fp:
Expand All @@ -67,6 +82,9 @@ def translate(self, output_folder: pathlib.Path) -> None:
self._copy_binary_files(output_folder)
self._generate_input_files(output_folder)
shutil.copy(this_dir.joinpath("templates/taskvine_poncho.json"), output_folder)




def _add_level_tasks(self, tasks_list: list[str]) -> list[str]:
"""
Expand Down
5 changes: 4 additions & 1 deletion wfcommons/wfbench/translator/templates/taskvine_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import ndcctools.taskvine as vine

# FLOWCEPT_INIT

# Create a new manager
m = vine.Manager(9123)
Expand Down Expand Up @@ -47,4 +48,6 @@ def wait_for_tasks_completion():
cpu_bench = m.declare_file("bin/cpu-benchmark", cache="workflow")
stress_ng = m.declare_file(shutil.which("stress-ng"), cache="workflow")

# Generated code goes here
# Generated code goes here

# FLOWCEPT_END
Loading