Skip to content

Commit 02f7bea

Browse files
committed
Removed category-based data size specification for creating benchmark
1 parent c6c838a commit 02f7bea

File tree

2 files changed

+38
-90
lines changed

2 files changed

+38
-90
lines changed

tests/wfbench/test_wfbench.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,6 @@ def test_create_from_recipe(self) -> None:
142142

143143
# Create the data_specification options
144144
fixed_total_footprint_in_mb = 5
145-
# TODO: This seems really broken right now
146-
# per_type_footprint = {}
147-
# for task_type in ["blastall", "split_fasta", None]:
148-
# per_type_footprint[task_type] = "1" # string???
149145

150146
for data_spec in [fixed_total_footprint_in_mb]:
151147
benchmark.create_benchmark(_create_fresh_local_dir(f"/tmp/benchmark"), cpu_work=1, data=data_spec, percent_cpu=0.6)

wfcommons/wfbench/bench.py

Lines changed: 38 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ def create_benchmark(self,
253253
cpu_work: Union[int, Dict[str, int]] = None,
254254
gpu_work: Union[int, Dict[str, int]] = None,
255255
time: Optional[int] = None,
256-
data: Optional[Union[int, Dict[str, str]]] = None,
256+
data: Optional[int] = 0,
257257
mem: Optional[float] = None,
258258
lock_files_folder: Optional[pathlib.Path] = None,
259259
regenerate: Optional[bool] = True,
@@ -271,7 +271,7 @@ def create_benchmark(self,
271271
:type gpu_work: Union[int, Dict[str, int]]
272272
:param time: Time limit for running each task (in seconds).
273273
:type time: Optional[int]
274-
:param data: Dictionary of input size files per workflow task type or total workflow data footprint (in MB).
274+
:param data: Total workflow data footprint (in MB).
275275
:type data: Optional[int]
276276
:param mem: Maximum amount of memory consumption per task (in MB).
277277
:type mem: Optional[float]
@@ -439,34 +439,11 @@ def _generate_task_gpu_params(self, task: Task, gpu_work: Union[int, Dict[str, i
439439

440440
return [f"--gpu-work {_gpu_work}"]
441441

442-
def _create_data_footprint(self, data: Optional[Union[int, Dict[str, str]]]) -> None:
442+
443+
def _create_data_footprint(self, data: int) -> None:
443444
"""
444-
task's data footprint provided as an int or individual file sizes
445+
task's data footprint provided as an int
445446
"""
446-
if isinstance(data, dict):
447-
self._create_data_footprint_dict(data)
448-
elif isinstance(data, int):
449-
self._create_data_footprint_int(data)
450-
else:
451-
raise NotImplementedError("Internal error: invalid type for argument to _create_data_footprint() method")
452-
453-
def _create_data_footprint_dict(self, data: Dict[str, str]) -> None:
454-
raise NotImplementedError("Creating data footprint based on individual file sizes is still work in progress")
455-
# TODO: This needs to be fixed one day...the notion of task types is weird anyway
456-
# outputs = self._output_files(data)
457-
# for task in self.workflow.tasks.values():
458-
# outputs_file_size = {}
459-
# for child, data_size in outputs[task.task_id].items():
460-
# outputs_file_size[f"{task.task_id}_{child}_output.txt"] = data_size
461-
#
462-
# task.args.extend([f"--output-files {outputs_file_size}"])
463-
# self._add_output_files(outputs)
464-
# # TODO: THIS IS CLEARLY A BUG!!!
465-
# self._add_input_files(outputs, data)
466-
# self.logger.debug("Generating system files.")
467-
# # self._generate_data_for_root_nodes(save_dir, data)
468-
469-
def _create_data_footprint_int(self, data: int) -> None:
470447
num_sys_files, num_total_files = self._calculate_input_files()
471448
self.logger.debug(
472449
f"Number of input files to be created by the system: {num_sys_files}")
@@ -481,9 +458,8 @@ def _create_data_footprint_int(self, data: int) -> None:
481458
task.args.extend([f"--output-files {output}"])
482459

483460
self._add_output_files(file_size)
484-
self._add_input_files({}, file_size)
461+
self._add_input_files(file_size)
485462

486-
# self._generate_data_for_root_nodes(save_dir, file_size)
487463

488464
def _output_files(self, data: Dict[str, str]) -> Dict[str, Dict[str, int]]:
489465
"""
@@ -528,80 +504,56 @@ def _calculate_input_files(self):
528504

529505
return tasks_need_input, total_num_files
530506

531-
def _add_output_files(self, output_files: Union[int, Dict[str, Dict[str, int]]]) -> None:
507+
def _add_output_files(self, output_file_size: int) -> None:
532508
"""
533509
Add output files when input data was offered by the user.
534510
535-
:param output_files:
536-
:type wf: Union[int, Dict[str, Dict[str, int]]]
511+
:param output_file_size: file size in MB
512+
:type output_file_size: int
537513
"""
538514
for task in self.workflow.tasks.values():
539-
if isinstance(output_files, Dict):
540-
for child, file_size in output_files[task.task_id].items():
541-
task.output_files.append(
542-
File(f"{task.task_id}_{child}_output.txt", file_size))
543-
elif isinstance(output_files, int):
544-
task.output_files.append(
545-
File(f"{task.task_id}_output.txt", output_files))
546-
547-
def _add_input_files(self, output_files: Dict[str, Dict[str, str]], data: Union[int, Dict[str, str]]) -> None:
515+
task.output_files.append(
516+
File(f"{task.task_id}_output.txt", output_file_size))
517+
518+
def _add_input_files(self, input_file_size: int) -> None:
548519
"""
549520
Add input files when input data was offered by the user.
550521
551-
:param output_files: specification of individual output files, or {} if a single size is specified
552-
:type wf: Dict[str, Dict[str, str]]
553-
:param data: Either a single size or a specification on specific files
554-
:type data: Union[int, Dict[str, str]]
522+
:param input_file_size: a file size in MB
523+
:type input_file_size: int
555524
"""
556525
for task in self.workflow.tasks.values():
557526
inputs = []
558527
if not self.workflow.tasks_parents[task.task_id]:
559528
task.input_files.append(
560-
File(f"{task.task_id}_input.txt",
561-
data[task.category] if isinstance(
562-
data, Dict) else data))
529+
File(f"{task.task_id}_input.txt", input_file_size))
563530
inputs.append(f'{task.task_id}_input.txt')
564531
else:
565-
if isinstance(data, Dict):
566-
NotImplementedError("Creating data footprint based on individual file sizes is still work in progress")
567-
# TODO: See previous bug in _create_data_footprint_dict()
568-
# input_files = {}
569-
# for parent, children in output_files.items():
570-
# for child, file_size in children.items():
571-
# input_files.setdefault(child, {})
572-
# input_files[child][parent] = file_size
573-
#
574-
# for parent, file_size in input_files[task.task_id].items():
575-
# task.input_files.append(
576-
# File(f"{parent}_{task.task_id}_output.txt", file_size))
577-
# inputs.append(f"{parent}_{task.task_id}_output.txt")
578-
579-
elif isinstance(data, int):
580-
for parent in self.workflow.tasks_parents[task.task_id]:
581-
task.input_files.append(
582-
File(f"{parent}_output.txt", data))
583-
inputs.append(f"{parent}_output.txt")
532+
for parent in self.workflow.tasks_parents[task.task_id]:
533+
task.input_files.append(
534+
File(f"{parent}_output.txt", input_file_size))
535+
inputs.append(f"{parent}_output.txt")
584536

585537
task.args.append(f"--input-files {inputs}")
586538

587-
def _generate_data_for_root_nodes(self, save_dir: pathlib.Path, data: Union[int, Dict[str, str]]) -> None:
588-
"""
589-
Generate workflow's input data for root nodes based on user's input.
590-
591-
:param save_dir:
592-
:type save_dir: pathlib.Path
593-
:param data:
594-
:type data: Dict[str, str]
595-
"""
596-
for task in self.workflow.tasks.values():
597-
if not self.workflow.tasks_parents[task.task_id]:
598-
file_size = data[task.category] if isinstance(
599-
data, Dict) else data
600-
file = save_dir.joinpath(f"{task.task_id}_input.txt")
601-
if not file.is_file():
602-
with open(file, 'wb') as fp:
603-
fp.write(os.urandom(int(file_size)))
604-
self.logger.debug(f"Created file: {str(file)}")
539+
# def _generate_data_for_root_nodes(self, save_dir: pathlib.Path, data: Union[int, Dict[str, str]]) -> None:
540+
# """
541+
# Generate workflow's input data for root nodes based on user's input.
542+
#
543+
# :param save_dir:
544+
# :type save_dir: pathlib.Path
545+
# :param data:
546+
# :type data: Dict[str, str]
547+
# """
548+
# for task in self.workflow.tasks.values():
549+
# if not self.workflow.tasks_parents[task.task_id]:
550+
# file_size = data[task.category] if isinstance(
551+
# data, Dict) else data
552+
# file = save_dir.joinpath(f"{task.task_id}_input.txt")
553+
# if not file.is_file():
554+
# with open(file, 'wb') as fp:
555+
# fp.write(os.urandom(int(file_size)))
556+
# self.logger.debug(f"Created file: {str(file)}")
605557

606558
def generate_input_file(self, path: pathlib.Path) -> None:
607559
"""

0 commit comments

Comments (0)