Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion httomo/data/dataset_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def aux_data(self) -> AuxiliaryData:
def write_block(self, block: DataSetBlock):
if self._readonly:
raise ValueError("Cannot write after creating a reader")
block.to_cpu() # TODO: possibly needs moving outside the block writing
start = max(block.chunk_index_unpadded)
if self._data is None:
# if non-slice dims in block are different, update the shapes here
Expand Down
58 changes: 40 additions & 18 deletions httomo/runner/task_runner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from itertools import islice
import logging
import time
from typing import Any, Dict, Literal, Optional, List, Tuple, Union
Expand Down Expand Up @@ -238,9 +237,47 @@ def _setup_source_sink(self, section: Section, idx: int):
def _execute_section_block(
self, section: Section, block: DataSetBlock
) -> DataSetBlock:
for method in section:
if_previous_block_is_on_gpu = False
convert_gpu_block_to_cpu = False

for ind, method in enumerate(section):
if_current_block_is_on_gpu = False
if method.implementation == "gpu_cupy":
if_current_block_is_on_gpu = True
if method.method_name == "calculate_stats" and if_previous_block_is_on_gpu:
if_current_block_is_on_gpu = True

if ind == len(section) - 1 and if_current_block_is_on_gpu:
convert_gpu_block_to_cpu = True

self.set_side_inputs(method)

start = time.perf_counter_ns()
block = self._execute_method(method, block)

if convert_gpu_block_to_cpu:
with catchtime() as t:
block.to_cpu()
method.gpu_time.device2host += t.elapsed

end = time.perf_counter_ns()

if self.monitor is not None:
self.monitor.report_method_block(
method.method_name,
method.module_path,
method.task_id,
_get_slicing_dim(method.pattern) - 1,
block.shape,
block.chunk_index,
block.global_index,
(end - start) * 1e-9,
method.gpu_time.kernel,
method.gpu_time.host2device,
method.gpu_time.device2host,
)

if_previous_block_is_on_gpu = if_current_block_is_on_gpu
return block

def _log_pipeline(self, msg: Any, level: int = logging.INFO):
Expand Down Expand Up @@ -272,25 +309,10 @@ def _load_datasets(self):
def _execute_method(
self, method: MethodWrapper, block: DataSetBlock
) -> DataSetBlock:
start = time.perf_counter_ns()
block = method.execute(block)
end = time.perf_counter_ns()

if block.is_last_in_chunk:
self.append_side_outputs(method.get_side_output())
if self.monitor is not None:
self.monitor.report_method_block(
method.method_name,
method.module_path,
method.task_id,
_get_slicing_dim(method.pattern) - 1,
block.shape,
block.chunk_index,
block.global_index,
(end - start) * 1e-9,
method.gpu_time.kernel,
method.gpu_time.host2device,
method.gpu_time.device2host,
)
return block

def append_side_outputs(self, side_outputs: Dict[str, Any]):
Expand Down
11 changes: 7 additions & 4 deletions tests/runner/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,14 @@ def test_calls_append_side_outputs_after_last_block(
spy = mocker.patch.object(t, "append_side_outputs")
t._prepare()
t._execute_method(
method, block1
method,
block1,
) # the first block shouldn't trigger a side output append call
assert spy.call_count == 0

t._execute_method(
method, block2
method,
block2,
) # the last block should trigger side output append call
getmock.assert_called_once()
spy.assert_called_once_with(side_outputs)
Expand Down Expand Up @@ -306,7 +308,7 @@ def test_update_side_inputs_updates_downstream_methods(
setitem3.assert_has_calls(method3_calls)


def test_execute_method_updates_monitor(
def test_execute_section_block_updates_monitor(
mocker: MockerFixture, tmp_path: PathLike, dummy_block: DataSetBlock
):
loader = make_test_loader(mocker)
Expand All @@ -316,10 +318,11 @@ def test_execute_method_updates_monitor(
)
mon = mocker.create_autospec(MonitoringInterface, instance=True)
p = Pipeline(loader=loader, methods=[method1])
s = sectionize(p)
t = TaskRunner(p, reslice_dir=tmp_path, comm=MPI.COMM_WORLD, monitor=mon)
t._prepare()
mocker.patch.object(method1, "execute", return_value=dummy_block)
t._execute_method(method1, dummy_block)
t._execute_section_block(s[0], dummy_block)

mon.report_method_block.assert_called_once_with(
method1.method_name,
Expand Down