diff --git a/httomo/data/dataset_store.py b/httomo/data/dataset_store.py index 45b22c38f..dd6249c03 100644 --- a/httomo/data/dataset_store.py +++ b/httomo/data/dataset_store.py @@ -117,7 +117,6 @@ def aux_data(self) -> AuxiliaryData: def write_block(self, block: DataSetBlock): if self._readonly: raise ValueError("Cannot write after creating a reader") - block.to_cpu() # TODO: possibly needs moving outside the block writing start = max(block.chunk_index_unpadded) if self._data is None: # if non-slice dims in block are different, update the shapes here diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index b6384536d..19ad107e1 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -1,4 +1,3 @@ -from itertools import islice import logging import time from typing import Any, Dict, Literal, Optional, List, Tuple, Union @@ -238,9 +237,47 @@ def _setup_source_sink(self, section: Section, idx: int): def _execute_section_block( self, section: Section, block: DataSetBlock ) -> DataSetBlock: - for method in section: + if_previous_block_is_on_gpu = False + convert_gpu_block_to_cpu = False + + for ind, method in enumerate(section): + if_current_block_is_on_gpu = False + if method.implementation == "gpu_cupy": + if_current_block_is_on_gpu = True + if method.method_name == "calculate_stats" and if_previous_block_is_on_gpu: + if_current_block_is_on_gpu = True + + if ind == len(section) - 1 and if_current_block_is_on_gpu: + convert_gpu_block_to_cpu = True + self.set_side_inputs(method) + + start = time.perf_counter_ns() block = self._execute_method(method, block) + + if convert_gpu_block_to_cpu: + with catchtime() as t: + block.to_cpu() + method.gpu_time.device2host += t.elapsed + + end = time.perf_counter_ns() + + if self.monitor is not None: + self.monitor.report_method_block( + method.method_name, + method.module_path, + method.task_id, + _get_slicing_dim(method.pattern) - 1, + block.shape, + block.chunk_index, + block.global_index, + (end - start) * 1e-9, + method.gpu_time.kernel, + method.gpu_time.host2device, + method.gpu_time.device2host, + ) + + if_previous_block_is_on_gpu = if_current_block_is_on_gpu return block def _log_pipeline(self, msg: Any, level: int = logging.INFO): @@ -272,25 +309,10 @@ def _load_datasets(self): def _execute_method( self, method: MethodWrapper, block: DataSetBlock ) -> DataSetBlock: - start = time.perf_counter_ns() block = method.execute(block) - end = time.perf_counter_ns() + if block.is_last_in_chunk: self.append_side_outputs(method.get_side_output()) - if self.monitor is not None: - self.monitor.report_method_block( - method.method_name, - method.module_path, - method.task_id, - _get_slicing_dim(method.pattern) - 1, - block.shape, - block.chunk_index, - block.global_index, - (end - start) * 1e-9, - method.gpu_time.kernel, - method.gpu_time.host2device, - method.gpu_time.device2host, - ) return block def append_side_outputs(self, side_outputs: Dict[str, Any]): diff --git a/tests/runner/test_task_runner.py b/tests/runner/test_task_runner.py index 398163487..56a01e8c6 100644 --- a/tests/runner/test_task_runner.py +++ b/tests/runner/test_task_runner.py @@ -271,12 +271,14 @@ def test_calls_append_side_outputs_after_last_block( spy = mocker.patch.object(t, "append_side_outputs") t._prepare() t._execute_method( - method, block1 + method, + block1, ) # the first block shouldn't trigger a side output append call assert spy.call_count == 0 t._execute_method( - method, block2 + method, + block2, ) # the last block should trigger side output append call getmock.assert_called_once() spy.assert_called_once_with(side_outputs) @@ -306,7 +308,7 @@ def test_update_side_inputs_updates_downstream_methods( setitem3.assert_has_calls(method3_calls) -def test_execute_method_updates_monitor( +def test_execute_section_block_updates_monitor( mocker: MockerFixture, tmp_path: PathLike, dummy_block: DataSetBlock ): loader = make_test_loader(mocker) @@ -316,10 +318,11 @@ def test_execute_method_updates_monitor( ) mon = mocker.create_autospec(MonitoringInterface, instance=True) p = Pipeline(loader=loader, methods=[method1]) + s = sectionize(p) t = TaskRunner(p, reslice_dir=tmp_path, comm=MPI.COMM_WORLD, monitor=mon) t._prepare() mocker.patch.object(method1, "execute", return_value=dummy_block) - t._execute_method(method1, dummy_block) + t._execute_section_block(s[0], dummy_block) mon.report_method_block.assert_called_once_with( method1.method_name,