diff --git a/tests/wfbench/test_wfbench.py b/tests/wfbench/test_wfbench.py index 81bc664a..6b88b577 100644 --- a/tests/wfbench/test_wfbench.py +++ b/tests/wfbench/test_wfbench.py @@ -142,10 +142,6 @@ def test_create_from_recipe(self) -> None: # Create the data_specification options fixed_total_footprint_in_mb = 5 - # TODO: This seems really broken right now - # per_type_footprint = {} - # for task_type in ["blastall", "split_fasta", None]: - # per_type_footprint[task_type] = "1" # string??? for data_spec in [fixed_total_footprint_in_mb]: benchmark.create_benchmark(_create_fresh_local_dir(f"/tmp/benchmark"), cpu_work=1, data=data_spec, percent_cpu=0.6) diff --git a/wfcommons/wfbench/bench.py b/wfcommons/wfbench/bench.py index b102784e..2f013546 100644 --- a/wfcommons/wfbench/bench.py +++ b/wfcommons/wfbench/bench.py @@ -253,7 +253,7 @@ def create_benchmark(self, cpu_work: Union[int, Dict[str, int]] = None, gpu_work: Union[int, Dict[str, int]] = None, time: Optional[int] = None, - data: Optional[Union[int, Dict[str, str]]] = None, + data: Optional[int] = 0, mem: Optional[float] = None, lock_files_folder: Optional[pathlib.Path] = None, regenerate: Optional[bool] = True, @@ -271,7 +271,7 @@ def create_benchmark(self, :type gpu_work: Union[int, Dict[str, int]] :param time: Time limit for running each task (in seconds). :type time: Optional[int] - :param data: Dictionary of input size files per workflow task type or total workflow data footprint (in MB). + :param data: Total workflow data footprint (in MB). :type data: Optional[Union[int, Dict[str, str]]] :param mem: Maximum amount of memory consumption per task (in MB). 
:type mem: Optional[float] @@ -317,7 +317,7 @@ def create_benchmark(self, task.input_files = [] task.output_files = [] - self._create_data_footprint(data, save_dir) + self._create_data_footprint(data) # TODO: add a flag to allow the file names to be changed workflow_input_files: List[File] = self._rename_files_to_wfbench_format() @@ -439,48 +439,27 @@ def _generate_task_gpu_params(self, task: Task, gpu_work: Union[int, Dict[str, i return [f"--gpu-work {_gpu_work}"] - def _create_data_footprint(self, data: Optional[Union[int, Dict[str, str]]], save_dir: pathlib.Path) -> None: + + def _create_data_footprint(self, data: int) -> None: """ - task's data footprint provided as individual data input size (JSON file) + task's data footprint provided as an int """ - if isinstance(data, dict): - outputs = self._output_files(data) - for task in self.workflow.tasks.values(): - outputs_file_size = {} - for child, data_size in outputs[task.task_id].items(): - outputs_file_size[f"{task.task_id}_{child}_output.txt"] = data_size - - task.args.extend([f"--output-files {outputs_file_size}"]) + num_sys_files, num_total_files = self._calculate_input_files() + self.logger.debug( + f"Number of input files to be created by the system: {num_sys_files}") + self.logger.debug( + f"Total number of files used by the workflow: {num_total_files}") + file_size = round(data * 1000000 / num_total_files) # MB to B + self.logger.debug( + f"Every input/output file is of size: {file_size}") - self._add_output_files(outputs) - self._add_input_files(outputs, data) - self.logger.debug("Generating system files.") - # self._generate_data_for_root_nodes(save_dir, data) + for task in self.workflow.tasks.values(): + output = {f"{task.task_id}_output.txt": file_size} + task.args.extend([f"--output-files {output}"]) - # data footprint provided as an integer - elif isinstance(data, int): - num_sys_files, num_total_files = self._calculate_input_files() - self.logger.debug( - f"Number of input files to be created by 
the system: {num_sys_files}") - self.logger.debug( - f"Total number of files used by the workflow: {num_total_files}") - file_size = round(data * 1000000 / num_total_files) # MB to B - self.logger.debug( - f"Every input/output file is of size: {file_size}") + self._add_output_files(file_size) + self._add_input_files(file_size) - for task in self.workflow.tasks.values(): - output = {f"{task.task_id}_output.txt": file_size} - task.args.extend([f"--output-files {output}"]) - outputs = {} - if self.workflow.tasks_children[task.task_id]: - outputs.setdefault(task.task_id, {}) - for child in self.workflow.tasks_children[task.task_id]: - outputs[task.task_id][child] = file_size - - self._add_output_files(file_size) - self._add_input_files(outputs, file_size) - self.logger.debug("Generating system files.") - # self._generate_data_for_root_nodes(save_dir, file_size) def _output_files(self, data: Dict[str, str]) -> Dict[str, Dict[str, int]]: """ @@ -525,78 +504,56 @@ def _calculate_input_files(self): return tasks_need_input, total_num_files - def _add_output_files(self, output_files: Union[int, Dict[str, Dict[str, int]]]) -> None: + def _add_output_files(self, output_file_size: int) -> None: """ Add output files when input data was offered by the user. 
- :param output_files: - :type wf: Union[int, Dict[str, Dict[str, int]]] + :param output_file_size: file size in bytes + :type output_file_size: int """ for task in self.workflow.tasks.values(): - if isinstance(output_files, Dict): - for child, file_size in output_files[task.task_id].items(): - task.output_files.append( - File(f"{task.task_id}_{child}_output.txt", file_size)) - elif isinstance(output_files, int): - task.output_files.append( - File(f"{task.task_id}_output.txt", output_files)) - - def _add_input_files(self, output_files: Dict[str, Dict[str, str]], data: Union[int, Dict[str, str]]) -> None: + task.output_files.append( + File(f"{task.task_id}_output.txt", output_file_size)) + + def _add_input_files(self, input_file_size: int) -> None: """ Add input files when input data was offered by the user. - :param output_files: - :type wf: Dict[str, Dict[str, str]] - :param data: - :type data: Union[int, Dict[str, str]] + :param input_file_size: a file size in bytes + :type input_file_size: int """ - input_files = {} - for parent, children in output_files.items(): - for child, file_size in children.items(): - input_files.setdefault(child, {}) - input_files[child][parent] = file_size - for task in self.workflow.tasks.values(): inputs = [] if not self.workflow.tasks_parents[task.task_id]: task.input_files.append( - File(f"{task.task_id}_input.txt", - data[task.category] if isinstance( - data, Dict) else data)) + File(f"{task.task_id}_input.txt", input_file_size)) inputs.append(f'{task.task_id}_input.txt') else: - if isinstance(data, Dict): - for parent, file_size in input_files[task.task_id].items(): - task.input_files.append( - File(f"{parent}_{task.task_id}_output.txt", file_size)) - inputs.append(f"{parent}_{task.task_id}_output.txt") - - elif isinstance(data, int): - for parent in self.workflow.tasks_parents[task.task_id]: - task.input_files.append( - File(f"{parent}_output.txt", data)) - inputs.append(f"{parent}_output.txt") + for parent in 
self.workflow.tasks_parents[task.task_id]: + task.input_files.append( + File(f"{parent}_output.txt", input_file_size)) + inputs.append(f"{parent}_output.txt") task.args.append(f"--input-files {inputs}") - def _generate_data_for_root_nodes(self, save_dir: pathlib.Path, data: Union[int, Dict[str, str]]) -> None: - """ - Generate workflow's input data for root nodes based on user's input. - - :param save_dir: - :type save_dir: pathlib.Path - :param data: - :type data: Dict[str, str] - """ - for task in self.workflow.tasks.values(): - if not self.workflow.tasks_parents[task.task_id]: - file_size = data[task.category] if isinstance( - data, Dict) else data - file = save_dir.joinpath(f"{task.task_id}_input.txt") - if not file.is_file(): - with open(file, 'wb') as fp: - fp.write(os.urandom(int(file_size))) - self.logger.debug(f"Created file: {str(file)}") + # def _generate_data_for_root_nodes(self, save_dir: pathlib.Path, data: Union[int, Dict[str, str]]) -> None: + # """ + # Generate workflow's input data for root nodes based on user's input. + # + # :param save_dir: + # :type save_dir: pathlib.Path + # :param data: + # :type data: Dict[str, str] + # """ + # for task in self.workflow.tasks.values(): + # if not self.workflow.tasks_parents[task.task_id]: + # file_size = data[task.category] if isinstance( + # data, Dict) else data + # file = save_dir.joinpath(f"{task.task_id}_input.txt") + # if not file.is_file(): + # with open(file, 'wb') as fp: + # fp.write(os.urandom(int(file_size))) + # self.logger.debug(f"Created file: {str(file)}") def generate_input_file(self, path: pathlib.Path) -> None: """