From edba30eab8a75da88307220add971e5d3a3ec00e Mon Sep 17 00:00:00 2001 From: algol Date: Fri, 16 Jan 2026 16:03:31 +0000 Subject: [PATCH 01/14] moves block to cpu from block writing --- httomo/data/dataset_store.py | 1 - httomo/runner/task_runner.py | 7 ++++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/httomo/data/dataset_store.py b/httomo/data/dataset_store.py index 35c735646..e379c729a 100644 --- a/httomo/data/dataset_store.py +++ b/httomo/data/dataset_store.py @@ -117,7 +117,6 @@ def aux_data(self) -> AuxiliaryData: def write_block(self, block: DataSetBlock): if self._readonly: raise ValueError("Cannot write after creating a reader") - block.to_cpu() # TODO: possibly needs moving outside the block writing start = max(block.chunk_index_unpadded) if self._data is None: # if non-slice dims in block are different, update the shapes here diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index fbb65eb6b..2f4a6f18f 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -1,4 +1,3 @@ -from itertools import islice import logging import time from typing import Any, Dict, Literal, Optional, List, Tuple, Union @@ -38,6 +37,7 @@ log_rank, ) import numpy as np +from httomo.runner.method_wrapper import GpuTimeInfo class TaskRunner: @@ -238,9 +238,14 @@ def _setup_source_sink(self, section: Section, idx: int): def _execute_section_block( self, section: Section, block: DataSetBlock ) -> DataSetBlock: + self._gpu_time_info = GpuTimeInfo() for method in section: self.set_side_inputs(method) block = self._execute_method(method, block) + if method.cupyrun: + with catchtime() as t: + block.to_cpu() + self._gpu_time_info.device2host = t.elapsed return block def _log_pipeline(self, msg: Any, level: int = logging.INFO): From 3ac56323c626cf798d320392a6e3531ff3e0b697 Mon Sep 17 00:00:00 2001 From: algol Date: Fri, 16 Jan 2026 16:51:25 +0000 Subject: [PATCH 02/14] some logic refinement to deal with calculate_stats --- httomo/runner/task_runner.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index 2f4a6f18f..d614a29ca 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -239,10 +239,18 @@ def _execute_section_block( self, section: Section, block: DataSetBlock ) -> DataSetBlock: self._gpu_time_info = GpuTimeInfo() + if_previous_block_is_on_gpu = False for method in section: + if_current_block_is_on_gpu = False self.set_side_inputs(method) block = self._execute_method(method, block) - if method.cupyrun: + if method.implementation == "gpu_cupy": + if_current_block_is_on_gpu = True + if method.method_name == "calculate_stats" and if_previous_block_is_on_gpu: + if_current_block_is_on_gpu = True + if_previous_block_is_on_gpu = if_current_block_is_on_gpu + + if if_current_block_is_on_gpu: with catchtime() as t: block.to_cpu() self._gpu_time_info.device2host = t.elapsed From fffdda922063a0806c107df8c5bc302b1ef935bc Mon Sep 17 00:00:00 2001 From: algol Date: Fri, 16 Jan 2026 17:06:17 +0000 Subject: [PATCH 03/14] fix --- httomo/runner/task_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index d614a29ca..ed055070e 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -253,7 +253,7 @@ def _execute_section_block( if if_current_block_is_on_gpu: with catchtime() as t: block.to_cpu() - self._gpu_time_info.device2host = t.elapsed + self._gpu_time_info.device2host += t.elapsed return block def _log_pipeline(self, msg: Any, level: int = logging.INFO): From 2cb161680fcfc7c34625960faa0085f6e45618e3 Mon Sep 17 00:00:00 2001 From: algol Date: Fri, 16 Jan 2026 17:49:32 +0000 Subject: [PATCH 04/14] moving block to cpu --- httomo/runner/task_runner.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index ed055070e..071c42f45 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -238,22 +238,27 @@ def _setup_source_sink(self, section: Section, idx: int): def _execute_section_block( self, section: Section, block: DataSetBlock ) -> DataSetBlock: - self._gpu_time_info = GpuTimeInfo() if_previous_block_is_on_gpu = False + convert_gpu_block_to_cpu = False + last_section_method_name = section[len(section) - 1].method_name + for method in section: if_current_block_is_on_gpu = False - self.set_side_inputs(method) - block = self._execute_method(method, block) if method.implementation == "gpu_cupy": if_current_block_is_on_gpu = True if method.method_name == "calculate_stats" and if_previous_block_is_on_gpu: if_current_block_is_on_gpu = True - if_previous_block_is_on_gpu = if_current_block_is_on_gpu - if if_current_block_is_on_gpu: - with catchtime() as t: - block.to_cpu() - self._gpu_time_info.device2host += t.elapsed + if ( + last_section_method_name == method.method_name + and if_current_block_is_on_gpu + ): + convert_gpu_block_to_cpu = True + + self.set_side_inputs(method) + block = self._execute_method(method, block, convert_gpu_block_to_cpu) + + if_previous_block_is_on_gpu = if_current_block_is_on_gpu return block def _log_pipeline(self, msg: Any, level: int = logging.INFO): @@ -283,13 +288,20 @@ def _load_datasets(self): ) def _execute_method( - self, method: MethodWrapper, block: DataSetBlock + self, method: MethodWrapper, block: DataSetBlock, convert_gpu_block_to_cpu: bool ) -> DataSetBlock: start = time.perf_counter_ns() block = method.execute(block) end = time.perf_counter_ns() + if block.is_last_in_chunk: self.append_side_outputs(method.get_side_output()) + + if convert_gpu_block_to_cpu: + with catchtime() as t: + block.to_cpu() + method.gpu_time.device2host += t.elapsed + if self.monitor is not None: self.monitor.report_method_block( method.method_name, From 5db461177bc95e9facb3949a12f7bccc62a28f45 Mon Sep 17 00:00:00 2001 From: algol Date: Fri, 16 Jan 2026 18:29:42 +0000 Subject: [PATCH 05/14] fix2 --- httomo/runner/task_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index 071c42f45..eb50239e8 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -300,7 +300,7 @@ def _execute_method( if convert_gpu_block_to_cpu: with catchtime() as t: block.to_cpu() - method.gpu_time.device2host += t.elapsed + method.gpu_time.device2host = t.elapsed if self.monitor is not None: self.monitor.report_method_block( From 294f9f8f54b84a194fb3908b53b49aab71231161 Mon Sep 17 00:00:00 2001 From: algol Date: Mon, 19 Jan 2026 10:16:22 +0000 Subject: [PATCH 06/14] corr1 --- httomo/runner/task_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index eb50239e8..071c42f45 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -300,7 +300,7 @@ def _execute_method( if convert_gpu_block_to_cpu: with catchtime() as t: block.to_cpu() - method.gpu_time.device2host = t.elapsed + method.gpu_time.device2host += t.elapsed if self.monitor is not None: self.monitor.report_method_block( From 05ef4dc6e7eeffe7aea1e1d2f620e52b7e5db77c Mon Sep 17 00:00:00 2001 From: algol Date: Mon, 19 Jan 2026 11:39:47 +0000 Subject: [PATCH 07/14] corr2 --- httomo/runner/task_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index 071c42f45..39a9be285 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -292,7 +292,6 @@ def _execute_method( ) -> DataSetBlock: start = time.perf_counter_ns() block = method.execute(block) - end = time.perf_counter_ns() if block.is_last_in_chunk: self.append_side_outputs(method.get_side_output()) @@ -302,6 +301,7 @@ def _execute_method( block.to_cpu() method.gpu_time.device2host += t.elapsed + end = time.perf_counter_ns() if self.monitor is not None: self.monitor.report_method_block( method.method_name, From 11def435820641e843f53861e18067a9d69e0a57 Mon Sep 17 00:00:00 2001 From: algol Date: Mon, 19 Jan 2026 12:32:30 +0000 Subject: [PATCH 08/14] fixing a bug due to data_checker name repetitions --- httomo/runner/task_runner.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index 39a9be285..2b7814173 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -240,19 +240,15 @@ def _execute_section_block( ) -> DataSetBlock: if_previous_block_is_on_gpu = False convert_gpu_block_to_cpu = False - last_section_method_name = section[len(section) - 1].method_name - for method in section: + for ind, method in enumerate(section): if_current_block_is_on_gpu = False if method.implementation == "gpu_cupy": if_current_block_is_on_gpu = True if method.method_name == "calculate_stats" and if_previous_block_is_on_gpu: if_current_block_is_on_gpu = True - if ( - last_section_method_name == method.method_name - and if_current_block_is_on_gpu - ): + if ind == len(section) - 1 and if_current_block_is_on_gpu: convert_gpu_block_to_cpu = True self.set_side_inputs(method) From 0dc5e13d606f1f2c3fb9af879e3103cf8849726e Mon Sep 17 00:00:00 2001 From: algol Date: Mon, 19 Jan 2026 22:33:13 +0000 Subject: [PATCH 09/14] fixing 2 tests --- tests/runner/test_task_runner.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/runner/test_task_runner.py b/tests/runner/test_task_runner.py index 398163487..ee8f69658 100644 --- a/tests/runner/test_task_runner.py +++ b/tests/runner/test_task_runner.py @@ -271,12 +271,12 @@ def test_calls_append_side_outputs_after_last_block( spy = mocker.patch.object(t, "append_side_outputs") t._prepare() t._execute_method( - method, block1 + method, block1, convert_gpu_block_to_cpu=False, ) # the first block shouldn't trigger a side output append call assert spy.call_count == 0 t._execute_method( - method, block2 + method, block2, convert_gpu_block_to_cpu=True, ) # the last block should trigger side output append call getmock.assert_called_once() spy.assert_called_once_with(side_outputs) @@ -319,7 +319,7 @@ def test_execute_method_updates_monitor( t = TaskRunner(p, reslice_dir=tmp_path, comm=MPI.COMM_WORLD, monitor=mon) t._prepare() mocker.patch.object(method1, "execute", return_value=dummy_block) - t._execute_method(method1, dummy_block) + t._execute_method(method1, dummy_block, convert_gpu_block_to_cpu=False) mon.report_method_block.assert_called_once_with( method1.method_name, From 9cc3790191a247344318646a2fbf101cbee8849e Mon Sep 17 00:00:00 2001 From: dkazanc Date: Tue, 20 Jan 2026 10:29:49 +0000 Subject: [PATCH 10/14] remove gputimeinfo import --- httomo/runner/task_runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index 2b7814173..26252eece 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -37,7 +37,6 @@ log_rank, ) import numpy as np -from httomo.runner.method_wrapper import GpuTimeInfo class TaskRunner: From 4a80b1beed5bfc4dba0371d7d9a2c12b2231976d Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Tue, 20 Jan 2026 14:25:38 +0000 Subject: [PATCH 11/14] Include new param in expected function calls in test --- tests/runner/test_task_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/runner/test_task_runner.py b/tests/runner/test_task_runner.py index ee8f69658..7d418fde3 100644 --- a/tests/runner/test_task_runner.py +++ b/tests/runner/test_task_runner.py @@ -393,7 +393,7 @@ def test_execute_section_for_block( exec_method = mocker.patch.object(t, "_execute_method", return_value=dummy_block) t._execute_section_block(s[0], dummy_block) - calls = [call(method1, ANY), call(method2, ANY)] + calls = [call(method1, ANY, False), call(method2, ANY, False)] exec_method.assert_has_calls(calls) From c820a63bd2f7a30ea9664a9e73b1df61b43ba1b0 Mon Sep 17 00:00:00 2001 From: dkazanc Date: Tue, 20 Jan 2026 15:36:33 +0000 Subject: [PATCH 12/14] moving monitor related code to execute section block --- httomo/runner/task_runner.py | 53 ++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index ef6b400f9..83181287c 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -251,7 +251,31 @@ def _execute_section_block( convert_gpu_block_to_cpu = True self.set_side_inputs(method) - block = self._execute_method(method, block, convert_gpu_block_to_cpu) + + start = time.perf_counter_ns() + block = self._execute_method(method, block) + + if convert_gpu_block_to_cpu: + with catchtime() as t: + block.to_cpu() + method.gpu_time.device2host += t.elapsed + + end = time.perf_counter_ns() + + if self.monitor is not None: + self.monitor.report_method_block( + method.method_name, + method.module_path, + method.task_id, + _get_slicing_dim(method.pattern) - 1, + block.shape, + block.chunk_index, + block.global_index, + (end - start) * 1e-9, + method.gpu_time.kernel, + method.gpu_time.host2device, + method.gpu_time.device2host, + ) if_previous_block_is_on_gpu = if_current_block_is_on_gpu return block @@ -283,34 +307,11 @@ def _load_datasets(self): ) def _execute_method( - self, method: MethodWrapper, block: DataSetBlock, convert_gpu_block_to_cpu: bool - ) -> DataSetBlock: - start = time.perf_counter_ns() + self, method: MethodWrapper, block: DataSetBlock) -> DataSetBlock: block = method.execute(block) if block.is_last_in_chunk: self.append_side_outputs(method.get_side_output()) - - if convert_gpu_block_to_cpu: - with catchtime() as t: - block.to_cpu() - method.gpu_time.device2host += t.elapsed - - end = time.perf_counter_ns() - if self.monitor is not None: - self.monitor.report_method_block( - method.method_name, - method.module_path, - method.task_id, - _get_slicing_dim(method.pattern) - 1, - block.shape, - block.chunk_index, - block.global_index, - (end - start) * 1e-9, - method.gpu_time.kernel, - method.gpu_time.host2device, - method.gpu_time.device2host, - ) return block def append_side_outputs(self, side_outputs: Dict[str, Any]): @@ -415,7 +416,7 @@ def determine_max_slices(self, section: Section, slicing_dim: int): continue output_dims = m.calculate_output_dims(non_slice_dims_shape) - slices_estimated, available_memory = m.calculate_max_slices( + (slices_estimated, available_memory) = m.calculate_max_slices( SOURCE_DTYPE, # self.source.dtype, non_slice_dims_shape, available_memory, From 1bfe54bb2767f9435ec415779903ec77ffda7f61 Mon Sep 17 00:00:00 2001 From: algol Date: Thu, 22 Jan 2026 10:34:02 +0000 Subject: [PATCH 13/14] tests fixed --- httomo/runner/task_runner.py | 5 +++-- tests/runner/test_task_runner.py | 13 ++++++++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index 83181287c..7efe2aa09 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -259,7 +259,7 @@ def _execute_section_block( with catchtime() as t: block.to_cpu() method.gpu_time.device2host += t.elapsed - + end = time.perf_counter_ns() if self.monitor is not None: @@ -307,7 +307,8 @@ def _load_datasets(self): ) def _execute_method( - self, method: MethodWrapper, block: DataSetBlock) -> DataSetBlock: + self, method: MethodWrapper, block: DataSetBlock + ) -> DataSetBlock: block = method.execute(block) if block.is_last_in_chunk: diff --git a/tests/runner/test_task_runner.py b/tests/runner/test_task_runner.py index 7d418fde3..56a01e8c6 100644 --- a/tests/runner/test_task_runner.py +++ b/tests/runner/test_task_runner.py @@ -271,12 +271,14 @@ def test_calls_append_side_outputs_after_last_block( spy = mocker.patch.object(t, "append_side_outputs") t._prepare() t._execute_method( - method, block1, convert_gpu_block_to_cpu=False, + method, + block1, ) # the first block shouldn't trigger a side output append call assert spy.call_count == 0 t._execute_method( - method, block2, convert_gpu_block_to_cpu=True, + method, + block2, ) # the last block should trigger side output append call getmock.assert_called_once() spy.assert_called_once_with(side_outputs) @@ -306,7 +308,7 @@ def test_update_side_inputs_updates_downstream_methods( setitem3.assert_has_calls(method3_calls) -def test_execute_method_updates_monitor( +def test_execute_section_block_updates_monitor( mocker: MockerFixture, tmp_path: PathLike, dummy_block: DataSetBlock ): loader = make_test_loader(mocker) @@ -316,10 +318,11 @@ def test_execute_method_updates_monitor( ) mon = mocker.create_autospec(MonitoringInterface, instance=True) p = Pipeline(loader=loader, methods=[method1]) + s = sectionize(p) t = TaskRunner(p, reslice_dir=tmp_path, comm=MPI.COMM_WORLD, monitor=mon) t._prepare() mocker.patch.object(method1, "execute", return_value=dummy_block) - t._execute_method(method1, dummy_block, convert_gpu_block_to_cpu=False) + t._execute_section_block(s[0], dummy_block) mon.report_method_block.assert_called_once_with( method1.method_name, @@ -393,7 +396,7 @@ def test_execute_section_for_block( exec_method = mocker.patch.object(t, "_execute_method", return_value=dummy_block) t._execute_section_block(s[0], dummy_block) - calls = [call(method1, ANY, False), call(method2, ANY, False)] + calls = [call(method1, ANY), call(method2, ANY)] exec_method.assert_has_calls(calls) From aea57afe07ac7eb98dc7e8342a8870efb534d8cf Mon Sep 17 00:00:00 2001 From: algol Date: Thu, 22 Jan 2026 10:37:04 +0000 Subject: [PATCH 14/14] linting --- httomo/runner/task_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index 7efe2aa09..19ad107e1 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -417,7 +417,7 @@ def determine_max_slices(self, section: Section, slicing_dim: int): continue output_dims = m.calculate_output_dims(non_slice_dims_shape) - (slices_estimated, available_memory) = m.calculate_max_slices( + slices_estimated, available_memory = m.calculate_max_slices( SOURCE_DTYPE, # self.source.dtype, non_slice_dims_shape, available_memory,