Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions tests/wfbench/test_wfbench.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,6 @@ def test_create_from_recipe(self) -> None:

# Create the data_specification options
fixed_total_footprint_in_mb = 5
# TODO: This seems really broken right now
# per_type_footprint = {}
# for task_type in ["blastall", "split_fasta", None]:
# per_type_footprint[task_type] = "1" # string???

for data_spec in [fixed_total_footprint_in_mb]:
benchmark.create_benchmark(_create_fresh_local_dir(f"/tmp/benchmark"), cpu_work=1, data=data_spec, percent_cpu=0.6)
Expand Down
145 changes: 51 additions & 94 deletions wfcommons/wfbench/bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def create_benchmark(self,
cpu_work: Union[int, Dict[str, int]] = None,
gpu_work: Union[int, Dict[str, int]] = None,
time: Optional[int] = None,
data: Optional[Union[int, Dict[str, str]]] = None,
data: Optional[int] = 0,
mem: Optional[float] = None,
lock_files_folder: Optional[pathlib.Path] = None,
regenerate: Optional[bool] = True,
Expand All @@ -271,7 +271,7 @@ def create_benchmark(self,
:type gpu_work: Union[int, Dict[str, int]]
:param time: Time limit for running each task (in seconds).
:type time: Optional[int]
:param data: Dictionary of input size files per workflow task type or total workflow data footprint (in MB).
:param data: Total workflow data footprint (in MB).
:type data: Optional[Union[int, Dict[str, str]]]
:param mem: Maximum amount of memory consumption per task (in MB).
:type mem: Optional[float]
Expand Down Expand Up @@ -317,7 +317,7 @@ def create_benchmark(self,
task.input_files = []
task.output_files = []

self._create_data_footprint(data, save_dir)
self._create_data_footprint(data)

# TODO: add a flag to allow the file names to be changed
workflow_input_files: List[File] = self._rename_files_to_wfbench_format()
Expand Down Expand Up @@ -439,48 +439,27 @@ def _generate_task_gpu_params(self, task: Task, gpu_work: Union[int, Dict[str, i

return [f"--gpu-work {_gpu_work}"]

def _create_data_footprint(self, data: Optional[Union[int, Dict[str, str]]], save_dir: pathlib.Path) -> None:

def _create_data_footprint(self, data: int) -> None:
"""
task's data footprint provided as individual data input size (JSON file)
task's data footprint provided as an int
"""
if isinstance(data, dict):
outputs = self._output_files(data)
for task in self.workflow.tasks.values():
outputs_file_size = {}
for child, data_size in outputs[task.task_id].items():
outputs_file_size[f"{task.task_id}_{child}_output.txt"] = data_size

task.args.extend([f"--output-files {outputs_file_size}"])
num_sys_files, num_total_files = self._calculate_input_files()
self.logger.debug(
f"Number of input files to be created by the system: {num_sys_files}")
self.logger.debug(
f"Total number of files used by the workflow: {num_total_files}")
file_size = round(data * 1000000 / num_total_files) # MB to B
self.logger.debug(
f"Every input/output file is of size: {file_size}")

self._add_output_files(outputs)
self._add_input_files(outputs, data)
self.logger.debug("Generating system files.")
# self._generate_data_for_root_nodes(save_dir, data)
for task in self.workflow.tasks.values():
output = {f"{task.task_id}_output.txt": file_size}
task.args.extend([f"--output-files {output}"])

# data footprint provided as an integer
elif isinstance(data, int):
num_sys_files, num_total_files = self._calculate_input_files()
self.logger.debug(
f"Number of input files to be created by the system: {num_sys_files}")
self.logger.debug(
f"Total number of files used by the workflow: {num_total_files}")
file_size = round(data * 1000000 / num_total_files) # MB to B
self.logger.debug(
f"Every input/output file is of size: {file_size}")
self._add_output_files(file_size)
self._add_input_files(file_size)

for task in self.workflow.tasks.values():
output = {f"{task.task_id}_output.txt": file_size}
task.args.extend([f"--output-files {output}"])
outputs = {}
if self.workflow.tasks_children[task.task_id]:
outputs.setdefault(task.task_id, {})
for child in self.workflow.tasks_children[task.task_id]:
outputs[task.task_id][child] = file_size

self._add_output_files(file_size)
self._add_input_files(outputs, file_size)
self.logger.debug("Generating system files.")
# self._generate_data_for_root_nodes(save_dir, file_size)

def _output_files(self, data: Dict[str, str]) -> Dict[str, Dict[str, int]]:
"""
Expand Down Expand Up @@ -525,78 +504,56 @@ def _calculate_input_files(self):

return tasks_need_input, total_num_files

def _add_output_files(self, output_files: Union[int, Dict[str, Dict[str, int]]]) -> None:
def _add_output_files(self, output_file_size: int) -> None:
"""
Add output files when input data was offered by the user.

:param output_files:
:type wf: Union[int, Dict[str, Dict[str, int]]]
:param output_file_size: file size in MB
:type output_file_size: int
"""
for task in self.workflow.tasks.values():
if isinstance(output_files, Dict):
for child, file_size in output_files[task.task_id].items():
task.output_files.append(
File(f"{task.task_id}_{child}_output.txt", file_size))
elif isinstance(output_files, int):
task.output_files.append(
File(f"{task.task_id}_output.txt", output_files))

def _add_input_files(self, output_files: Dict[str, Dict[str, str]], data: Union[int, Dict[str, str]]) -> None:
task.output_files.append(
File(f"{task.task_id}_output.txt", output_file_size))

def _add_input_files(self, input_file_size: int) -> None:
"""
Add input files when input data was offered by the user.

:param output_files:
:type wf: Dict[str, Dict[str, str]]
:param data:
:type data: Union[int, Dict[str, str]]
:param input_file_size: a file size in MB
:type input_file_size: int
"""
input_files = {}
for parent, children in output_files.items():
for child, file_size in children.items():
input_files.setdefault(child, {})
input_files[child][parent] = file_size

for task in self.workflow.tasks.values():
inputs = []
if not self.workflow.tasks_parents[task.task_id]:
task.input_files.append(
File(f"{task.task_id}_input.txt",
data[task.category] if isinstance(
data, Dict) else data))
File(f"{task.task_id}_input.txt", input_file_size))
inputs.append(f'{task.task_id}_input.txt')
else:
if isinstance(data, Dict):
for parent, file_size in input_files[task.task_id].items():
task.input_files.append(
File(f"{parent}_{task.task_id}_output.txt", file_size))
inputs.append(f"{parent}_{task.task_id}_output.txt")

elif isinstance(data, int):
for parent in self.workflow.tasks_parents[task.task_id]:
task.input_files.append(
File(f"{parent}_output.txt", data))
inputs.append(f"{parent}_output.txt")
for parent in self.workflow.tasks_parents[task.task_id]:
task.input_files.append(
File(f"{parent}_output.txt", input_file_size))
inputs.append(f"{parent}_output.txt")

task.args.append(f"--input-files {inputs}")

def _generate_data_for_root_nodes(self, save_dir: pathlib.Path, data: Union[int, Dict[str, str]]) -> None:
"""
Generate workflow's input data for root nodes based on user's input.

:param save_dir:
:type save_dir: pathlib.Path
:param data:
:type data: Dict[str, str]
"""
for task in self.workflow.tasks.values():
if not self.workflow.tasks_parents[task.task_id]:
file_size = data[task.category] if isinstance(
data, Dict) else data
file = save_dir.joinpath(f"{task.task_id}_input.txt")
if not file.is_file():
with open(file, 'wb') as fp:
fp.write(os.urandom(int(file_size)))
self.logger.debug(f"Created file: {str(file)}")
# def _generate_data_for_root_nodes(self, save_dir: pathlib.Path, data: Union[int, Dict[str, str]]) -> None:
# """
# Generate workflow's input data for root nodes based on user's input.
#
# :param save_dir:
# :type save_dir: pathlib.Path
# :param data:
# :type data: Dict[str, str]
# """
# for task in self.workflow.tasks.values():
# if not self.workflow.tasks_parents[task.task_id]:
# file_size = data[task.category] if isinstance(
# data, Dict) else data
# file = save_dir.joinpath(f"{task.task_id}_input.txt")
# if not file.is_file():
# with open(file, 'wb') as fp:
# fp.write(os.urandom(int(file_size)))
# self.logger.debug(f"Created file: {str(file)}")

def generate_input_file(self, path: pathlib.Path) -> None:
"""
Expand Down