From a2fe668a6e96baf74153ab9a79d2305dc0d9e5ee Mon Sep 17 00:00:00 2001 From: Ben Denham Date: Sun, 1 Jun 2025 09:26:01 +1200 Subject: [PATCH 01/10] Move ProcessExecutor into its own module. --- labtech/runners/_process_executor.py | 222 ++++++++++++++++++++++++++ labtech/runners/process.py | 225 ++------------------------- 2 files changed, 232 insertions(+), 215 deletions(-) create mode 100644 labtech/runners/_process_executor.py diff --git a/labtech/runners/_process_executor.py b/labtech/runners/_process_executor.py new file mode 100644 index 0000000..7e130cd --- /dev/null +++ b/labtech/runners/_process_executor.py @@ -0,0 +1,222 @@ +from __future__ import annotations + +import functools +import multiprocessing +import os +from dataclasses import dataclass, field +from enum import StrEnum, auto +from itertools import count +from queue import Empty +from threading import Thread +from typing import TYPE_CHECKING + +from labtech.exceptions import TaskDiedError + +if TYPE_CHECKING: + from collections.abc import Callable, Sequence + from multiprocessing.context import BaseContext + from queue import Queue + from typing import Any + + +class FutureStateError(Exception): + pass + + +class FutureState(StrEnum): + PENDING = auto() + CANCELLED = auto() + FINISHED = auto() + + +@dataclass +class ExecutorFuture: + """Representation of a result to be returned in the future by a runner. 
+ + An ExecutorFuture's state transitions between states according to + the following finite state machine: + + * An ExecutorFuture starts in a PENDING state + * A PENDING ExecutorFuture can be transitioned to FINISHED by calling + set_result() or set_exception() + * Any ExecutorFuture can be transitioned to CANCELLED by calling + cancel() + * result() can only be called on a FINISHED ExecutorFuture, and it will + either return the result set by set_result() or raise the exception + set by set_exception() + + """ + # Auto-incrementing ID (does not need to be process-safe because + # all futures are generated in the main process): + id: int = field(default_factory=count().__next__, init=False) + _state: FutureState = FutureState.PENDING + _ex: BaseException | None = None + _result: Any | None = None + + def __eq__(self, other: object) -> bool: + if not isinstance(other, self.__class__): + return False + return id(self.id) == id(other.id) + + def __hash__(self) -> int: + return hash(self.id) + + @property + def done(self) -> bool: + return self._state in {FutureState.FINISHED, FutureState.CANCELLED} + + @property + def cancelled(self) -> bool: + return self._state == FutureState.CANCELLED + + def set_result(self, result: Any): + if self.done: + raise FutureStateError(f'Attempted to set a result on a {self._state} future.') + self._result = result + self._state = FutureState.FINISHED + + def set_exception(self, ex: BaseException): + if self.done: + raise FutureStateError(f'Attempted to set an exception on a {self._state} future.') + self._ex = ex + self._state = FutureState.FINISHED + + def cancel(self): + self._state = FutureState.CANCELLED + + def result(self) -> Any: + if self._state != FutureState.FINISHED: + raise FutureStateError(f'Attempted to get result from a {self._state} future.') + if self._ex is not None: + raise self._ex + return self._result + + +def split_done_futures(futures: Sequence[ExecutorFuture]) -> tuple[list[ExecutorFuture], 
list[ExecutorFuture]]: + done_futures = [] + not_done_futures = [] + for future in futures: + if future.done: + done_futures.append(future) + else: + not_done_futures.append(future) + return (done_futures, not_done_futures) + + +def _subprocess_target(*, future_id: int, thunk: Callable[[], Any], result_queue: Queue) -> None: + try: + result = thunk() + except BaseException as ex: + result_queue.put((future_id, ex)) + else: + result_queue.put((future_id, result)) + + +class ProcessExecutor: + + def __init__(self, mp_context: BaseContext, max_workers: int | None): + self.mp_context = mp_context + self.max_workers = (os.cpu_count() or 1) if max_workers is None else max_workers + self._pending_future_to_thunk: dict[ExecutorFuture, Callable[[], Any]] = {} + self._running_id_to_future_and_process: dict[int, tuple[ExecutorFuture, multiprocessing.Process]] = {} + # Use a Manager().Queue() to be able to share with subprocesses + self._result_queue: Queue = multiprocessing.Manager().Queue(-1) + + def _start_processes(self): + """Start processes for the oldest pending futures to bring + running process count up to max_workers.""" + start_count = max(0, self.max_workers - len(self._running_id_to_future_and_process)) + futures_to_start = list(self._pending_future_to_thunk.keys())[:start_count] + for future in futures_to_start: + thunk = self._pending_future_to_thunk[future] + del self._pending_future_to_thunk[future] + process = self.mp_context.Process( + target=_subprocess_target, + kwargs=dict( + future_id=future.id, + thunk=thunk, + result_queue=self._result_queue, + ), + ) + self._running_id_to_future_and_process[future.id] = (future, process) + process.start() + + def submit(self, fn: Callable, /, *args, **kwargs) -> ExecutorFuture: + """Schedule the given fn to be called with the given *args and + **kwargs, and return an ExecutorFuture that will be updated + with the outcome of function call.""" + future = ExecutorFuture() + self._pending_future_to_thunk[future] = 
functools.partial(fn, *args, **kwargs) + self._start_processes() + return future + + def cancel(self) -> None: + """Cancel all pending futures.""" + pending_futures = list(self._pending_future_to_thunk.keys()) + for future in pending_futures: + future.cancel() + del self._pending_future_to_thunk[future] + + def stop(self) -> None: + """Cancel all running futures and immediately terminate their execution.""" + future_process_pairs = list(self._running_id_to_future_and_process.values()) + for future, process in future_process_pairs: + process.terminate() + future.cancel() + del self._running_id_to_future_and_process[future.id] + + def _consume_result_queue(self, *, timeout_seconds: float | None): + # Avoid race condition of a process finishing after we have + # consumed the result_queue by fetching process statuses + # before checking for process completion. + dead_process_futures = [ + future for future, process in self._running_id_to_future_and_process.values() + if not process.is_alive() + ] + + def _consume(): + inner_timeout_seconds = timeout_seconds + while True: + try: + future_id, result_or_ex = self._result_queue.get(True, timeout=inner_timeout_seconds) + except Empty: + break + + # Don't wait for the timeout on subsequent calls to + # self._result_queue.get() + inner_timeout_seconds = 0 + + future, _ = self._running_id_to_future_and_process[future_id] + del self._running_id_to_future_and_process[future_id] + if not future.done: + if isinstance(result_or_ex, BaseException): + future.set_exception(result_or_ex) + else: + future.set_result(result_or_ex) + + # Consume the result queue in a thread so that it is not + # interrupt by KeyboardInterrupt, which can result in us not + # fully processing a completed result. Despite the fact we are + # using subprocesses, starting a thread at this point should + # be safe because we will not start any subprocesses while + # this is running? 
+ consumer_thread = Thread(target=_consume) + consumer_thread.start() + consumer_thread.join() + + # If any processes have died without the future being + # cancelled or finished, then set an exception for it. + for future in dead_process_futures: + if future.done: + continue + future.set_exception(TaskDiedError()) + del self._running_id_to_future_and_process[future.id] + + def wait(self, futures: Sequence[ExecutorFuture], *, timeout_seconds: float | None) -> tuple[list[ExecutorFuture], list[ExecutorFuture]]: + """Wait up to timeout_seconds or until at least one of the + given futures is done, then return a list of futures in a done + state and a list of futures in all other states.""" + self._consume_result_queue(timeout_seconds=timeout_seconds) + # Having consumed completed results, start new processes + self._start_processes() + return split_done_futures(futures) diff --git a/labtech/runners/process.py b/labtech/runners/process.py index 7c461e4..7283942 100644 --- a/labtech/runners/process.py +++ b/labtech/runners/process.py @@ -1,248 +1,43 @@ from __future__ import annotations -import functools import logging import multiprocessing -import os import signal import sys from abc import ABC, abstractmethod -from dataclasses import dataclass, field -from enum import StrEnum, auto -from itertools import count +from dataclasses import dataclass from logging.handlers import QueueHandler from queue import Empty -from threading import Thread from typing import TYPE_CHECKING, cast from uuid import uuid4 import psutil -from labtech.exceptions import RunnerError, TaskDiedError +from labtech.exceptions import RunnerError from labtech.monitor import get_process_info from labtech.tasks import get_direct_dependencies from labtech.types import Runner, RunnerBackend from labtech.utils import LoggerFileProxy, get_supported_start_methods, is_interactive, logger +from ._process_executor import ProcessExecutor from .base import run_or_load_task if TYPE_CHECKING: from collections.abc 
import Callable, Iterator, Sequence from multiprocessing.context import BaseContext, SpawnContext from queue import Queue - from typing import Any from uuid import UUID from labtech.types import LabContext, ResultMeta, ResultsMap, Storage, Task, TaskMonitorInfo, TaskResult + from ._process_executor import ExecutorFuture + if sys.platform != 'win32': from multiprocessing.context import ForkContext else: ForkContext = BaseContext -class FutureStateError(Exception): - pass - - -class FutureState(StrEnum): - PENDING = auto() - CANCELLED = auto() - FINISHED = auto() - - -@dataclass -class Future: - """Representation of a result to be returned in the future by a runner. - - A Future's state transitions between states according to the following - finite state machine: - - * A Future starts in a PENDING state - * A PENDING Future can be transitioned to FINISHED by calling - set_result() or set_exception() - * Any Future can be transitioned to CANCELLED by calling cancel() - * result() can only be called on a FINISHED Future, and it will either - return the result set by set_result() or raise the exception set by - set_exception() - - """ - # Auto-incrementing ID (does not need to be process-safe because - # all futures are generated in the main process): - id: int = field(default_factory=count().__next__, init=False) - _state: FutureState = FutureState.PENDING - _ex: BaseException | None = None - _result: Any | None = None - - def __eq__(self, other: object) -> bool: - if not isinstance(other, self.__class__): - return False - return id(self.id) == id(other.id) - - def __hash__(self) -> int: - return hash(self.id) - - @property - def done(self) -> bool: - return self._state in {FutureState.FINISHED, FutureState.CANCELLED} - - @property - def cancelled(self) -> bool: - return self._state == FutureState.CANCELLED - - def set_result(self, result: Any): - if self.done: - raise FutureStateError(f'Attempted to set a result on a {self._state} future.') - self._result = result - 
self._state = FutureState.FINISHED - - def set_exception(self, ex: BaseException): - if self.done: - raise FutureStateError(f'Attempted to set an exception on a {self._state} future.') - self._ex = ex - self._state = FutureState.FINISHED - - def cancel(self): - self._state = FutureState.CANCELLED - - def result(self) -> Any: - if self._state != FutureState.FINISHED: - raise FutureStateError(f'Attempted to get result from a {self._state} future.') - if self._ex is not None: - raise self._ex - return self._result - - -def split_done_futures(futures: Sequence[Future]) -> tuple[list[Future], list[Future]]: - done_futures = [] - not_done_futures = [] - for future in futures: - if future.done: - done_futures.append(future) - else: - not_done_futures.append(future) - return (done_futures, not_done_futures) - - -def _subprocess_target(*, future_id: int, thunk: Callable[[], Any], result_queue: Queue) -> None: - try: - result = thunk() - except BaseException as ex: - result_queue.put((future_id, ex)) - else: - result_queue.put((future_id, result)) - - -class ProcessExecutor: - - def __init__(self, mp_context: BaseContext, max_workers: int | None): - self.mp_context = mp_context - self.max_workers = (os.cpu_count() or 1) if max_workers is None else max_workers - self._pending_future_to_thunk: dict[Future, Callable[[], Any]] = {} - self._running_id_to_future_and_process: dict[int, tuple[Future, multiprocessing.Process]] = {} - # Use a Manager().Queue() to be able to share with subprocesses - self._result_queue: Queue = multiprocessing.Manager().Queue(-1) - - def _start_processes(self): - """Start processes for the oldest pending futures to bring - running process count up to max_workers.""" - start_count = max(0, self.max_workers - len(self._running_id_to_future_and_process)) - futures_to_start = list(self._pending_future_to_thunk.keys())[:start_count] - for future in futures_to_start: - thunk = self._pending_future_to_thunk[future] - del self._pending_future_to_thunk[future] 
- process = self.mp_context.Process( - target=_subprocess_target, - kwargs=dict( - future_id=future.id, - thunk=thunk, - result_queue=self._result_queue, - ), - ) - self._running_id_to_future_and_process[future.id] = (future, process) - process.start() - - def submit(self, fn: Callable, /, *args, **kwargs) -> Future: - """Schedule the given fn to be called with the given *args and - **kwargs, and return a Future that will be updated with the - outcome of function call.""" - future = Future() - self._pending_future_to_thunk[future] = functools.partial(fn, *args, **kwargs) - self._start_processes() - return future - - def cancel(self) -> None: - """Cancel all pending futures.""" - pending_futures = list(self._pending_future_to_thunk.keys()) - for future in pending_futures: - future.cancel() - del self._pending_future_to_thunk[future] - - def stop(self) -> None: - """Cancel all running futures and immediately terminate their execution.""" - future_process_pairs = list(self._running_id_to_future_and_process.values()) - for future, process in future_process_pairs: - process.terminate() - future.cancel() - del self._running_id_to_future_and_process[future.id] - - def _consume_result_queue(self, *, timeout_seconds: float | None): - # Avoid race condition of a process finishing after we have - # consumed the result_queue by fetching process statuses - # before checking for process completion. 
- dead_process_futures = [ - future for future, process in self._running_id_to_future_and_process.values() - if not process.is_alive() - ] - - def _consume(): - inner_timeout_seconds = timeout_seconds - while True: - try: - future_id, result_or_ex = self._result_queue.get(True, timeout=inner_timeout_seconds) - except Empty: - break - - # Don't wait for the timeout on subsequent calls to - # self._result_queue.get() - inner_timeout_seconds = 0 - - future, _ = self._running_id_to_future_and_process[future_id] - del self._running_id_to_future_and_process[future_id] - if not future.done: - if isinstance(result_or_ex, BaseException): - future.set_exception(result_or_ex) - else: - future.set_result(result_or_ex) - - # Consume the result queue in a thread so that it is not - # interrupt by KeyboardInterrupt, which can result in us not - # fully processing a completed result. Despite the fact we are - # using subprocesses, starting a thread at this point should - # be safe because we will not start any subprocesses while - # this is running? - consumer_thread = Thread(target=_consume) - consumer_thread.start() - consumer_thread.join() - - # If any processes have died without the future being - # cancelled or finished, then set an exception for it. 
- for future in dead_process_futures: - if future.done: - continue - future.set_exception(TaskDiedError()) - del self._running_id_to_future_and_process[future.id] - - def wait(self, futures: Sequence[Future], *, timeout_seconds: float | None) -> tuple[list[Future], list[Future]]: - """Wait up to timeout_seconds or until at least one of the - given futures is done, then return a list of futures in a done - state and a list of futures in all other states.""" - self._consume_result_queue(timeout_seconds=timeout_seconds) - # Having consumed completed results, start new processes - self._start_processes() - return split_done_futures(futures) - - class ProcessEvent: pass @@ -325,7 +120,7 @@ def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | ) self.results_map: dict[Task, TaskResult] = {} - self.future_to_task: dict[Future, Task] = {} + self.future_to_task: dict[ExecutorFuture, Task] = {} def _consume_log_queue(self): # See: https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes @@ -446,9 +241,9 @@ def _get_mp_context(self) -> BaseContext: @abstractmethod def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, - use_cache: bool, process_event_queue: Queue, log_queue: Queue) -> Future: + use_cache: bool, process_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: """Should submit the execution of self._subprocess_func() on the given - task to the given executor and return the resulting Future. + task to the given executor and return the resulting ExecutorFuture. Sub-classes can use the implementation of this method to load or otherwise prepare context or dependency results for the task. 
@@ -467,7 +262,7 @@ def _get_mp_context(self) -> SpawnContext: return multiprocessing.get_context('spawn') def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, - use_cache: bool, process_event_queue: Queue, log_queue: Queue) -> Future: + use_cache: bool, process_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: if is_interactive() and task.__class__.__module__ == '__main__': raise RunnerError( (f'Unable to submit {task.__class__.__qualname__} tasks to ' @@ -567,7 +362,7 @@ def _fork_subprocess_func(*, _subprocess_func: Callable, task: Task, task_name: ) def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, - use_cache: bool, process_event_queue: Queue, log_queue: Queue) -> Future: + use_cache: bool, process_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: return executor.submit( self._fork_subprocess_func, _subprocess_func=self._subprocess_func, From c469b4b59e649be9c2b8d857d5779b64bb7d3b33 Mon Sep 17 00:00:00 2001 From: Ben Denham Date: Sun, 1 Jun 2025 09:27:23 +1200 Subject: [PATCH 02/10] Rename process events to task events --- labtech/runners/process.py | 58 ++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/labtech/runners/process.py b/labtech/runners/process.py index 7283942..53cef11 100644 --- a/labtech/runners/process.py +++ b/labtech/runners/process.py @@ -38,47 +38,43 @@ ForkContext = BaseContext -class ProcessEvent: - pass - - @dataclass(frozen=True) -class ProcessStartEvent(ProcessEvent): +class TaskStartEvent: task_name: str pid: int use_cache: bool @dataclass(frozen=True) -class ProcessEndEvent(ProcessEvent): +class TaskEndEvent: task_name: str class ProcessMonitor: - def __init__(self, *, process_event_queue: Queue): - self.process_event_queue = process_event_queue - self.active_process_events: dict[str, ProcessStartEvent] = {} + def __init__(self, *, task_event_queue: Queue): + self.task_event_queue = task_event_queue + 
self.active_task_events: dict[str, TaskStartEvent] = {} self.active_processes_and_children: dict[str, tuple[psutil.Process, dict[int, psutil.Process]]] = {} def _consume_monitor_queue(self): while True: try: - event = self.process_event_queue.get_nowait() + event = self.task_event_queue.get_nowait() except Empty: break - if isinstance(event, ProcessStartEvent): - self.active_process_events[event.task_name] = event - elif isinstance(event, ProcessEndEvent): - if event.task_name in self.active_process_events: - del self.active_process_events[event.task_name] + if isinstance(event, TaskStartEvent): + self.active_task_events[event.task_name] = event + elif isinstance(event, TaskEndEvent): + if event.task_name in self.active_task_events: + del self.active_task_events[event.task_name] if event.task_name in self.active_processes_and_children: del self.active_processes_and_children[event.task_name] else: - raise RunnerError(f'Unexpected process event: {event}') + raise RunnerError(f'Unexpected task event: {event}') - def _get_process_info(self, start_event: ProcessStartEvent) -> TaskMonitorInfo | None: + def _get_process_info(self, start_event: TaskStartEvent) -> TaskMonitorInfo | None: pid = start_event.pid try: if start_event.task_name not in self.active_processes_and_children: @@ -99,7 +95,7 @@ def _get_process_info(self, start_event: ProcessStartEvent) -> TaskMonitorInfo | def get_process_infos(self) -> list[TaskMonitorInfo]: self._consume_monitor_queue() process_infos: list[TaskMonitorInfo] = [] - for start_event in self.active_process_events.values(): + for start_event in self.active_task_events.values(): process_info = self._get_process_info(start_event) if process_info is not None: process_infos.append(process_info) @@ -111,8 +107,8 @@ class ProcessRunner(Runner, ABC): def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None): mp_context = self._get_mp_context() - self.process_event_queue = mp_context.Manager().Queue(-1) - 
self.process_monitor = ProcessMonitor(process_event_queue = self.process_event_queue) + self.task_event_queue = mp_context.Manager().Queue(-1) + self.process_monitor = ProcessMonitor(task_event_queue = self.task_event_queue) self.log_queue = multiprocessing.Manager().Queue(-1) self.executor = ProcessExecutor( mp_context=mp_context, @@ -135,7 +131,7 @@ def _consume_log_queue(self): @staticmethod def _subprocess_func(*, task: Task, task_name: str, use_cache: bool, results_map: ResultsMap, filtered_context: LabContext, - storage: Storage, process_event_queue: Queue, + storage: Storage, task_event_queue: Queue, log_queue: Queue) -> TaskResult: signal.signal(signal.SIGINT, signal.SIG_IGN) # Subprocesses should log onto the queue in order to printed @@ -150,7 +146,7 @@ def _subprocess_func(*, task: Task, task_name: str, use_cache: bool, try: current_process = multiprocessing.current_process() - process_event_queue.put(ProcessStartEvent( + task_event_queue.put(TaskStartEvent( task_name=task_name, pid=cast('int', current_process.pid), use_cache=use_cache, @@ -171,7 +167,7 @@ def _subprocess_func(*, task: Task, task_name: str, use_cache: bool, finally: current_process.name = orig_process_name finally: - process_event_queue.put(ProcessEndEvent( + task_event_queue.put(TaskEndEvent( task_name=task_name, )) sys.stdout.flush() @@ -185,7 +181,7 @@ def submit_task(self, task: Task, task_name: str, use_cache: bool) -> None: task=task, task_name=task_name, use_cache=use_cache, - process_event_queue=self.process_event_queue, + task_event_queue=self.task_event_queue, log_queue=self.log_queue, ) self.future_to_task[future] = task @@ -241,7 +237,7 @@ def _get_mp_context(self) -> BaseContext: @abstractmethod def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, - use_cache: bool, process_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: + use_cache: bool, task_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: """Should submit the execution of 
self._subprocess_func() on the given task to the given executor and return the resulting ExecutorFuture. @@ -262,7 +258,7 @@ def _get_mp_context(self) -> SpawnContext: return multiprocessing.get_context('spawn') def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, - use_cache: bool, process_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: + use_cache: bool, task_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: if is_interactive() and task.__class__.__module__ == '__main__': raise RunnerError( (f'Unable to submit {task.__class__.__qualname__} tasks to ' @@ -293,7 +289,7 @@ def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, results_map=results_map, filtered_context=filtered_context, storage=self.storage, - process_event_queue=process_event_queue, + task_event_queue=task_event_queue, log_queue=log_queue, ) @@ -347,7 +343,7 @@ def _get_mp_context(self) -> ForkContext: @staticmethod def _fork_subprocess_func(*, _subprocess_func: Callable, task: Task, task_name: str, - use_cache: bool, process_event_queue: Queue, log_queue: Queue, + use_cache: bool, task_event_queue: Queue, log_queue: Queue, uuid: UUID) -> TaskResult: runner_memory = _RUNNER_FORK_MEMORY[uuid] return _subprocess_func( @@ -357,19 +353,19 @@ def _fork_subprocess_func(*, _subprocess_func: Callable, task: Task, task_name: filtered_context=task.filter_context(runner_memory.context), storage=runner_memory.storage, results_map=runner_memory.results_map, - process_event_queue=process_event_queue, + task_event_queue=task_event_queue, log_queue=log_queue, ) def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, - use_cache: bool, process_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: + use_cache: bool, task_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: return executor.submit( self._fork_subprocess_func, _subprocess_func=self._subprocess_func, task=task, task_name=task_name, use_cache=use_cache, - 
process_event_queue=process_event_queue, + task_event_queue=task_event_queue, log_queue=log_queue, uuid=self.uuid, ) From df665075c2193aae032546b64e644f05fc33942a Mon Sep 17 00:00:00 2001 From: Ben Denham Date: Sun, 1 Jun 2025 21:20:27 +1200 Subject: [PATCH 03/10] Refactor process runners to use composition over inheritance and add process-pool runner backends --- labtech/runners/_process_executor.py | 12 +- labtech/runners/process.py | 546 +++++++++++++++++++-------- 2 files changed, 398 insertions(+), 160 deletions(-) diff --git a/labtech/runners/_process_executor.py b/labtech/runners/_process_executor.py index 7e130cd..a2c79bb 100644 --- a/labtech/runners/_process_executor.py +++ b/labtech/runners/_process_executor.py @@ -61,22 +61,20 @@ def __eq__(self, other: object) -> bool: def __hash__(self) -> int: return hash(self.id) - @property def done(self) -> bool: return self._state in {FutureState.FINISHED, FutureState.CANCELLED} - @property def cancelled(self) -> bool: return self._state == FutureState.CANCELLED def set_result(self, result: Any): - if self.done: + if self.done(): raise FutureStateError(f'Attempted to set a result on a {self._state} future.') self._result = result self._state = FutureState.FINISHED def set_exception(self, ex: BaseException): - if self.done: + if self.done(): raise FutureStateError(f'Attempted to set an exception on a {self._state} future.') self._ex = ex self._state = FutureState.FINISHED @@ -96,7 +94,7 @@ def split_done_futures(futures: Sequence[ExecutorFuture]) -> tuple[list[Executor done_futures = [] not_done_futures = [] for future in futures: - if future.done: + if future.done(): done_futures.append(future) else: not_done_futures.append(future) @@ -188,7 +186,7 @@ def _consume(): future, _ = self._running_id_to_future_and_process[future_id] del self._running_id_to_future_and_process[future_id] - if not future.done: + if not future.done(): if isinstance(result_or_ex, BaseException): future.set_exception(result_or_ex) else: @@ 
-207,7 +205,7 @@ def _consume(): # If any processes have died without the future being # cancelled or finished, then set an exception for it. for future in dead_process_futures: - if future.done: + if future.done(): continue future.set_exception(TaskDiedError()) del self._running_id_to_future_and_process[future.id] diff --git a/labtech/runners/process.py b/labtech/runners/process.py index 53cef11..f12c866 100644 --- a/labtech/runners/process.py +++ b/labtech/runners/process.py @@ -5,10 +5,13 @@ import signal import sys from abc import ABC, abstractmethod +from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor +from concurrent.futures import Future as ConcurrentFuture +from concurrent.futures import wait as wait_futures from dataclasses import dataclass from logging.handlers import QueueHandler from queue import Empty -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Generic, TypeVar, cast from uuid import uuid4 import psutil @@ -19,24 +22,17 @@ from labtech.types import Runner, RunnerBackend from labtech.utils import LoggerFileProxy, get_supported_start_methods, is_interactive, logger -from ._process_executor import ProcessExecutor +from ._process_executor import ExecutorFuture, ProcessExecutor from .base import run_or_load_task if TYPE_CHECKING: - from collections.abc import Callable, Iterator, Sequence - from multiprocessing.context import BaseContext, SpawnContext + from collections.abc import Iterator, Sequence + from multiprocessing.context import BaseContext from queue import Queue from uuid import UUID from labtech.types import LabContext, ResultMeta, ResultsMap, Storage, Task, TaskMonitorInfo, TaskResult - from ._process_executor import ExecutorFuture - - if sys.platform != 'win32': - from multiprocessing.context import ForkContext - else: - ForkContext = BaseContext - @dataclass(frozen=True) class TaskStartEvent: @@ -102,21 +98,112 @@ def get_process_infos(self) -> list[TaskMonitorInfo]: return process_infos -class 
ProcessRunner(Runner, ABC): - """Base class for Runner's based on Python multiprocessing.""" +def _task_subprocess_func(*, task: Task, task_name: str, use_cache: bool, + results_map: ResultsMap, + filtered_context: LabContext, storage: Storage, + task_event_queue: Queue, log_queue: Queue) -> TaskResult: + signal.signal(signal.SIGINT, signal.SIG_IGN) + # Subprocesses should log onto the queue in order to printed + # in serial by the main process. + logger.handlers = [] + logger.addHandler(QueueHandler(log_queue)) + orig_stdout = sys.stdout + orig_stderr = sys.stderr + # Ignore type errors for type of value used to override stdout and stderr + sys.stdout = LoggerFileProxy(logger.info, 'Captured STDOUT:\n') # type: ignore[assignment] + sys.stderr = LoggerFileProxy(logger.error, 'Captured STDERR:\n') # type: ignore[assignment] + + try: + current_process = multiprocessing.current_process() + task_event_queue.put(TaskStartEvent( + task_name=task_name, + pid=cast('int', current_process.pid), + use_cache=use_cache, + )) - def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None): - mp_context = self._get_mp_context() - self.task_event_queue = mp_context.Manager().Queue(-1) - self.process_monitor = ProcessMonitor(task_event_queue = self.task_event_queue) - self.log_queue = multiprocessing.Manager().Queue(-1) - self.executor = ProcessExecutor( - mp_context=mp_context, - max_workers=max_workers, - ) + for dependency_task in get_direct_dependencies(task, all_identities=True): + dependency_task._set_results_map(results_map) + orig_process_name = current_process.name + try: + current_process.name = task_name + return run_or_load_task( + task=task, + use_cache=use_cache, + filtered_context=filtered_context, + storage=storage + ) + finally: + current_process.name = orig_process_name + finally: + task_event_queue.put(TaskEndEvent( + task_name=task_name, + )) + sys.stdout.flush() + sys.stderr.flush() + sys.stdout = orig_stdout + sys.stderr = orig_stderr + 
+
+FutureT = TypeVar('FutureT', bound=ExecutorFuture | ConcurrentFuture)
+
+
+class ProcessManager(ABC, Generic[FutureT]):
+
+    def __init__(self) -> None:
         self.results_map: dict[Task, TaskResult] = {}
-        self.future_to_task: dict[ExecutorFuture, Task] = {}
+
+    def set_result(self, task: Task, task_result: TaskResult) -> None:
+        self.results_map[task] = task_result
+
+    def get_result(self, task: Task) -> TaskResult:
+        return self.results_map[task]
+
+    def remove_result(self, task: Task) -> None:
+        if task not in self.results_map:
+            return
+        logger.debug(f"Removing result from in-memory cache for task: '{task}'")
+        del self.results_map[task]
+
+    @abstractmethod
+    def schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool,
+                            task_event_queue: Queue, log_queue: Queue) -> FutureT:
+        """Should submit the execution of _task_subprocess_func() on the given task
+        in a subprocess and return the resulting future.
+
+        The implementation of this method may load or otherwise prepare
+        context or dependency results for the task.
+ + """ + + @abstractmethod + def get_completed_futures(self, futures: list[FutureT], timeout_seconds: float | None) -> list[FutureT]: + """Return a sub-sequence of the given futures that have been completed.""" + + @abstractmethod + def cancel(self) -> None: + """Cancel all scheduled subprocesses that have not yet been started.""" + + @abstractmethod + def stop(self) -> None: + """Stop all currently running subprocesses.""" + + @abstractmethod + def close(self) -> None: + """Stop all currently running subprocesses.""" + + +class ProcessRunner(Runner, Generic[FutureT]): + """Runner based on Python multiprocessing.""" + + def __init__(self, *, process_manager: ProcessManager[FutureT]) -> None: + self.process_manager = process_manager + + self.log_queue = multiprocessing.Manager().Queue(-1) + self.task_event_queue = multiprocessing.Manager().Queue(-1) + self.process_monitor = ProcessMonitor(task_event_queue = self.task_event_queue) + + self.future_to_task: dict[FutureT, Task] = {} def _consume_log_queue(self): # See: https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes @@ -128,56 +215,8 @@ def _consume_log_queue(self): logger = logging.getLogger(record.name) logger.handle(record) - @staticmethod - def _subprocess_func(*, task: Task, task_name: str, use_cache: bool, - results_map: ResultsMap, filtered_context: LabContext, - storage: Storage, task_event_queue: Queue, - log_queue: Queue) -> TaskResult: - signal.signal(signal.SIGINT, signal.SIG_IGN) - # Subprocesses should log onto the queue in order to printed - # in serial by the main process. 
- logger.handlers = [] - logger.addHandler(QueueHandler(log_queue)) - orig_stdout = sys.stdout - orig_stderr = sys.stderr - # Ignore type errors for type of value used to override stdout and stderr - sys.stdout = LoggerFileProxy(logger.info, 'Captured STDOUT:\n') # type: ignore[assignment] - sys.stderr = LoggerFileProxy(logger.error, 'Captured STDERR:\n') # type: ignore[assignment] - - try: - current_process = multiprocessing.current_process() - task_event_queue.put(TaskStartEvent( - task_name=task_name, - pid=cast('int', current_process.pid), - use_cache=use_cache, - )) - - for dependency_task in get_direct_dependencies(task, all_identities=True): - dependency_task._set_results_map(results_map) - - orig_process_name = current_process.name - try: - current_process.name = task_name - return run_or_load_task( - task=task, - use_cache=use_cache, - filtered_context=filtered_context, - storage=storage - ) - finally: - current_process.name = orig_process_name - finally: - task_event_queue.put(TaskEndEvent( - task_name=task_name, - )) - sys.stdout.flush() - sys.stderr.flush() - sys.stdout = orig_stdout - sys.stderr = orig_stderr - def submit_task(self, task: Task, task_name: str, use_cache: bool) -> None: - future = self._submit_task( - executor=self.executor, + future = self.process_manager.schedule_subprocess( task=task, task_name=task_name, use_cache=use_cache, @@ -188,17 +227,20 @@ def submit_task(self, task: Task, task_name: str, use_cache: bool) -> None: def wait(self, *, timeout_seconds: float | None) -> Iterator[tuple[Task, ResultMeta | BaseException]]: self._consume_log_queue() - done, _ = self.executor.wait(list(self.future_to_task.keys()), timeout_seconds=timeout_seconds) + done = self.process_manager.get_completed_futures( + futures=list(self.future_to_task.keys()), + timeout_seconds=timeout_seconds, + ) for future in done: task = self.future_to_task[future] - if future.cancelled: + if future.cancelled(): continue try: task_result = future.result() except 
BaseException as ex: yield (task, ex) else: - self.results_map[task] = task_result + self.process_manager.set_result(task, task_result) yield (task, task_result.meta) self.future_to_task = { future: self.future_to_task[future] @@ -207,67 +249,97 @@ def wait(self, *, timeout_seconds: float | None) -> Iterator[tuple[Task, ResultM } def cancel(self) -> None: - self.executor.cancel() + self.process_manager.cancel() def stop(self) -> None: - self.executor.stop() + self.process_manager.stop() def close(self) -> None: self._consume_log_queue() + self.process_manager.close() def pending_task_count(self) -> int: return len(self.future_to_task) + def get_task_infos(self) -> list[TaskMonitorInfo]: + return self.process_monitor.get_process_infos() + def get_result(self, task: Task) -> TaskResult: - return self.results_map[task] + return self.process_manager.get_result(task) def remove_results(self, tasks: Sequence[Task]) -> None: for task in tasks: - if task not in self.results_map: - return - logger.debug(f"Removing result from in-memory cache for task: '{task}'") - del self.results_map[task] + self.process_manager.remove_result(task) - def get_task_infos(self) -> list[TaskMonitorInfo]: - return self.process_monitor.get_process_infos() - @abstractmethod - def _get_mp_context(self) -> BaseContext: - """Return a multiprocessing context from which to start subprocesses.""" +def _spawn_start_method_check() -> None: + if 'spawn' not in get_supported_start_methods(): + raise RunnerError( + ("The 'spawn' start method for processes is not supported by your operating system. " + "Please specify a system-compatible runner_backend.") + ) - @abstractmethod - def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, - use_cache: bool, task_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: - """Should submit the execution of self._subprocess_func() on the given - task to the given executor and return the resulting ExecutorFuture. 
- Sub-classes can use the implementation of this method to load - or otherwise prepare context or dependency results for the task. +def _spawn_interactive_main_check(cls: type, task: Task) -> None: + if is_interactive() and task.__class__.__module__ == '__main__': + raise RunnerError( + (f'Unable to submit {task.__class__.__qualname__} tasks to ' + f'{cls.__qualname__} because the task type is defined in the ' + '__main__ module from an interactive Python session. ' + 'Please define your task types in a separate `.py` Python ' + 'module file. For details, see: ' + 'https://ben-denham.github.io/labtech/cookbook/#spawn-interactive-main') + ) + - """ +def _fork_start_method_check() -> None: + if 'fork' not in get_supported_start_methods(): + raise RunnerError( + ("The 'fork' start method for processes is not supported by your operating system. " + "Try switching to runner_backend='spawn' or specify another system-compatible runner_backend.") + ) + + +# === Subprocess Pools === +class PoolProcessManager(ProcessManager[ConcurrentFuture], ABC): -class SpawnProcessRunner(ProcessRunner): + def __init__(self, *, mp_context: BaseContext, max_workers: int | None) -> None: + super().__init__() + self.concurrent_executor = ProcessPoolExecutor( + mp_context=mp_context, + max_workers=max_workers, + ) + + def get_completed_futures(self, futures: list[ExecutorFuture], timeout_seconds: float | None) -> list[ExecutorFuture]: + done, _ = wait_futures(futures, timeout=timeout_seconds, return_when=FIRST_COMPLETED) + return done + + def cancel(self) -> None: + self.concurrent_executor.shutdown(wait=True, cancel_futures=True) - def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None): - super().__init__(context=context, storage=storage, max_workers=max_workers) + def stop(self) -> None: + self.concurrent_executor.shutdown(wait=True, cancel_futures=True) + for process in self.concurrent_executor._processes.values(): + process.terminate() + + def close(self) -> 
None: + self.concurrent_executor.shutdown(wait=True) + + +class SpawnPoolProcessManager(PoolProcessManager): + + def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> None: + super().__init__( + mp_context=multiprocessing.get_context('spawn'), + max_workers=max_workers, + ) self.context = context self.storage = storage - def _get_mp_context(self) -> SpawnContext: - return multiprocessing.get_context('spawn') - - def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, - use_cache: bool, task_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: - if is_interactive() and task.__class__.__module__ == '__main__': - raise RunnerError( - (f'Unable to submit {task.__class__.__qualname__} tasks to ' - 'SpawnProcessRunner because the task type is defined in the ' - '__main__ module from an interactive Python session. ' - 'Please define your task types in a separate `.py` Python ' - 'module file. For details, see: ' - 'https://ben-denham.github.io/labtech/cookbook/#spawn-interactive-main') - ) + def schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool, + task_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: + _spawn_interactive_main_check(self.__class__, task) filtered_context: LabContext = {} results_map: dict[Task, TaskResult] = {} @@ -281,8 +353,9 @@ def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, dependency_task: self.results_map[dependency_task] for dependency_task in get_direct_dependencies(task, all_identities=False) } - return executor.submit( - self._subprocess_func, + + return self.concurrent_executor.submit( + _task_subprocess_func, task=task, task_name=task_name, use_cache=use_cache, @@ -294,59 +367,230 @@ def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, ) -class SpawnRunnerBackend(RunnerBackend): +class SpawnPoolRunnerBackend(RunnerBackend): """ - Runner Backend that runs each task in a spawned subprocess. 
+ Runner Backend that runs tasks on a pool of spawned subprocesses. The required context and dependency task results are copied/duplicated into the memory of each subprocess. """ - def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> SpawnProcessRunner: - if 'spawn' not in get_supported_start_methods(): - raise RunnerError( - ("The 'spawn' start method for processes is not supported by your operating system. " - "Please specify a system-compatible runner_backend.") + def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ProcessRunner: + _spawn_start_method_check() + return ProcessRunner( + process_manager=SpawnPoolProcessManager( + context=context, + storage=storage, + max_workers=max_workers, ) + ) + - return SpawnProcessRunner( +@dataclass +class PoolRunnerMemory: + context: LabContext + storage: Storage + + +_RUNNER_FORK_POOL_MEMORY: dict[UUID, PoolRunnerMemory] = {} + + +class ForkPoolProcessManager(PoolProcessManager): + + def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> None: + super().__init__( + mp_context=multiprocessing.get_context('fork'), + max_workers=max_workers, + ) + self.uuid = uuid4() + _RUNNER_FORK_POOL_MEMORY[self.uuid] = PoolRunnerMemory( context=context, storage=storage, + ) + + @staticmethod + def _fork_task_subprocess_func(*, task: Task, task_name: str, use_cache: bool, + results_map: ResultsMap, task_event_queue: Queue, + log_queue: Queue, uuid: UUID): + runner_memory = _RUNNER_FORK_POOL_MEMORY[uuid] + return _task_subprocess_func( + task=task, + task_name=task_name, + use_cache=use_cache, + filtered_context=task.filter_context(runner_memory.context), + storage=runner_memory.storage, + results_map=results_map, + task_event_queue=task_event_queue, + log_queue=log_queue, + ) + + def schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool, + task_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: + 
results_map: dict[Task, TaskResult] = {} + if not use_cache: + # In order to minimise memory use, only transfer results + # to the subprocess if we are going to run the task (and + # not just load its result from cache). + results_map = { + dependency_task: self.results_map[dependency_task] + for dependency_task in get_direct_dependencies(task, all_identities=False) + } + return self.concurrent_executor.submit( + self._fork_task_subprocess_func, + task=task, + task_name=task_name, + use_cache=use_cache, + results_map=results_map, + task_event_queue=task_event_queue, + log_queue=log_queue, + uuid=self.uuid, + ) + + def close(self) -> None: + super().close() + try: + del _RUNNER_FORK_POOL_MEMORY[self.uuid] + except KeyError: + # uuid not may be found if close() is called twice. + pass + + +class ForkPoolRunnerBackend(RunnerBackend): + """ + Runner Backend that runs tasks on a pool of forked subprocesses. + + The context is shared in-memory between each subprocess. Dependency task + results are copied/duplicated into the memory of each subprocess. 
+ + """ + + def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ProcessRunner: + _fork_start_method_check() + return ProcessRunner( + process_manager=ForkPoolProcessManager( + context=context, + storage=storage, + max_workers=max_workers, + ) + ) + + +# === Subprocesses Per-Task === + +class PerTaskProcessManager(ProcessManager[ExecutorFuture]): + + def __init__(self, *, mp_context: BaseContext, max_workers: int | None) -> None: + super().__init__() + self.executor = ProcessExecutor( + mp_context=mp_context, + max_workers=max_workers, + ) + + def get_completed_futures(self, futures: list[ExecutorFuture], timeout_seconds: float | None) -> list[ExecutorFuture]: + done, _ = self.executor.wait(futures, timeout_seconds=timeout_seconds) + return done + + def cancel(self) -> None: + self.executor.cancel() + + def stop(self) -> None: + self.executor.stop() + + def close(self) -> None: + pass + + +class SpawnPerTaskProcessManager(PerTaskProcessManager): + + def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> None: + super().__init__( + mp_context=multiprocessing.get_context('spawn'), max_workers=max_workers, ) + self.context = context + self.storage = storage + + def schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool, + task_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: + _spawn_interactive_main_check(self.__class__, task) + + filtered_context: LabContext = {} + results_map: dict[Task, TaskResult] = {} + if not use_cache: + # In order to minimise memory use, only transfer context + # and results to the subprocess if we are going to run the + # task (and not just load its result from cache) and allow + # the task to filter the context to only what it needs. 
+ filtered_context = task.filter_context(self.context) + results_map = { + dependency_task: self.results_map[dependency_task] + for dependency_task in get_direct_dependencies(task, all_identities=False) + } + + return self.executor.submit( + _task_subprocess_func, + task=task, + task_name=task_name, + use_cache=use_cache, + results_map=results_map, + filtered_context=filtered_context, + storage=self.storage, + task_event_queue=task_event_queue, + log_queue=log_queue, + ) + + +class SpawnPerTaskRunnerBackend(RunnerBackend): + """ + Runner Backend that runs each task in a spawned subprocess. + + The required context and dependency task results are + copied/duplicated into the memory of each subprocess. + + """ + + def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ProcessRunner: + _spawn_start_method_check() + return ProcessRunner( + process_manager=SpawnPerTaskProcessManager( + context=context, + storage=storage, + max_workers=max_workers, + ) + ) @dataclass -class RunnerMemory: +class PerTaskRunnerMemory: context: LabContext storage: Storage results_map: ResultsMap -_RUNNER_FORK_MEMORY: dict[UUID, RunnerMemory] = {} +_RUNNER_FORK_PER_TASK_MEMORY: dict[UUID, PerTaskRunnerMemory] = {} -class ForkProcessRunner(ProcessRunner): +class ForkPerTaskProcessManager(PerTaskProcessManager): - def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None): - super().__init__(context=context, storage=storage, max_workers=max_workers) + def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> None: + super().__init__( + mp_context=multiprocessing.get_context('fork'), + max_workers=max_workers, + ) self.uuid = uuid4() - _RUNNER_FORK_MEMORY[self.uuid] = RunnerMemory( + _RUNNER_FORK_PER_TASK_MEMORY[self.uuid] = PerTaskRunnerMemory( context=context, storage=storage, results_map=self.results_map, ) - def _get_mp_context(self) -> ForkContext: - return multiprocessing.get_context('fork') - 
@staticmethod - def _fork_subprocess_func(*, _subprocess_func: Callable, task: Task, task_name: str, - use_cache: bool, task_event_queue: Queue, log_queue: Queue, - uuid: UUID) -> TaskResult: - runner_memory = _RUNNER_FORK_MEMORY[uuid] - return _subprocess_func( + def _fork_task_subprocess_func(*, task: Task, task_name: str, use_cache: bool, + task_event_queue: Queue, log_queue: Queue, + uuid: UUID): + runner_memory = _RUNNER_FORK_PER_TASK_MEMORY[uuid] + return _task_subprocess_func( task=task, task_name=task_name, use_cache=use_cache, @@ -357,11 +601,10 @@ def _fork_subprocess_func(*, _subprocess_func: Callable, task: Task, task_name: log_queue=log_queue, ) - def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, - use_cache: bool, task_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: - return executor.submit( - self._fork_subprocess_func, - _subprocess_func=self._subprocess_func, + def schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool, + task_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: + return self.executor.submit( + self._fork_task_subprocess_func, task=task, task_name=task_name, use_cache=use_cache, @@ -373,13 +616,13 @@ def _submit_task(self, executor: ProcessExecutor, task: Task, task_name: str, def close(self) -> None: super().close() try: - del _RUNNER_FORK_MEMORY[self.uuid] + del _RUNNER_FORK_PER_TASK_MEMORY[self.uuid] except KeyError: # uuid not may be found if close() is called twice. pass -class ForkRunnerBackend(RunnerBackend): +class ForkPerTaskRunnerBackend(RunnerBackend): """ Runner Backend that runs each task in a forked subprocess. @@ -388,15 +631,12 @@ class ForkRunnerBackend(RunnerBackend): """ - def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ForkProcessRunner: - if 'fork' not in get_supported_start_methods(): - raise RunnerError( - ("The 'fork' start method for processes is not supported by your operating system. 
" - "Try switching to runner_backend='spawn' or specify another system-compatible runner_backend.") + def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ProcessRunner: + _fork_start_method_check() + return ProcessRunner( + process_manager=ForkPerTaskProcessManager( + context=context, + storage=storage, + max_workers=max_workers, ) - - return ForkProcessRunner( - context=context, - storage=storage, - max_workers=max_workers, ) From 8cb4219c982fcffe45831da40f375b944e017327 Mon Sep 17 00:00:00 2001 From: Ben Denham Date: Sun, 1 Jun 2025 21:48:15 +1200 Subject: [PATCH 04/10] Simplify process runners by using inheritance instead of composition --- labtech/runners/process.py | 221 +++++++++++++++---------------------- 1 file changed, 92 insertions(+), 129 deletions(-) diff --git a/labtech/runners/process.py b/labtech/runners/process.py index f12c866..9361399 100644 --- a/labtech/runners/process.py +++ b/labtech/runners/process.py @@ -33,6 +33,9 @@ from labtech.types import LabContext, ResultMeta, ResultsMap, Storage, Task, TaskMonitorInfo, TaskResult +Future = ConcurrentFuture | ExecutorFuture +FutureT = TypeVar('FutureT', bound=Future, covariant=True) + @dataclass(frozen=True) class TaskStartEvent: @@ -145,64 +148,15 @@ def _task_subprocess_func(*, task: Task, task_name: str, use_cache: bool, sys.stderr = orig_stderr -FutureT = TypeVar('FutureT', bound=ExecutorFuture | ConcurrentFuture) - - -class ProcessManager(ABC, Generic[FutureT]): - - def __init__(self) -> None: - self.results_map: dict[Task, TaskResult] = {} - - def set_result(self, task: Task, task_result: TaskResult) -> None: - self.results_map[task] = task_result - - def get_result(self, task: Task) -> TaskResult: - return self.results_map[task] - - def remove_result(self, task: Task) -> None: - if task not in self.results_map: - return - logger.debug(f"Removing result from in-memory cache for task: '{task}'") - del self.results_map[task] - - @abstractmethod - def 
schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool, - task_event_queue: Queue, log_queue: Queue) -> FutureT: - """Should submit the execution of _task_subprocess_func() on the given - in a subprocess and return the resulting future. - - The implementation of this method to load or otherwise prepare - context or dependency results for the task. - - """ - - @abstractmethod - def get_completed_futures(self, futures: list[FutureT], timeout_seconds: float | None) -> list[FutureT]: - """Return a sub-sequence of the given futures that have been completed.""" - - @abstractmethod - def cancel(self) -> None: - """Cancel all scheduled subprocesses that have not yet been started.""" - - @abstractmethod - def stop(self) -> None: - """Stop all currently running subprocesses.""" - - @abstractmethod - def close(self) -> None: - """Stop all currently running subprocesses.""" - - -class ProcessRunner(Runner, Generic[FutureT]): +class ProcessRunner(Runner, Generic[FutureT], ABC): """Runner based on Python multiprocessing.""" - def __init__(self, *, process_manager: ProcessManager[FutureT]) -> None: - self.process_manager = process_manager - + def __init__(self) -> None: self.log_queue = multiprocessing.Manager().Queue(-1) self.task_event_queue = multiprocessing.Manager().Queue(-1) self.process_monitor = ProcessMonitor(task_event_queue = self.task_event_queue) + self.results_map: dict[Task, TaskResult] = {} self.future_to_task: dict[FutureT, Task] = {} def _consume_log_queue(self): @@ -216,18 +170,16 @@ def _consume_log_queue(self): logger.handle(record) def submit_task(self, task: Task, task_name: str, use_cache: bool) -> None: - future = self.process_manager.schedule_subprocess( + future = self._schedule_subprocess( task=task, task_name=task_name, use_cache=use_cache, - task_event_queue=self.task_event_queue, - log_queue=self.log_queue, ) self.future_to_task[future] = task def wait(self, *, timeout_seconds: float | None) -> Iterator[tuple[Task, ResultMeta | 
BaseException]]: self._consume_log_queue() - done = self.process_manager.get_completed_futures( + done = self._get_completed_futures( futures=list(self.future_to_task.keys()), timeout_seconds=timeout_seconds, ) @@ -240,7 +192,7 @@ def wait(self, *, timeout_seconds: float | None) -> Iterator[tuple[Task, ResultM except BaseException as ex: yield (task, ex) else: - self.process_manager.set_result(task, task_result) + self.results_map[task] = task_result yield (task, task_result.meta) self.future_to_task = { future: self.future_to_task[future] @@ -248,15 +200,9 @@ def wait(self, *, timeout_seconds: float | None) -> Iterator[tuple[Task, ResultM if future not in done } - def cancel(self) -> None: - self.process_manager.cancel() - - def stop(self) -> None: - self.process_manager.stop() - def close(self) -> None: self._consume_log_queue() - self.process_manager.close() + self._close_executor() def pending_task_count(self) -> int: return len(self.future_to_task) @@ -265,11 +211,40 @@ def get_task_infos(self) -> list[TaskMonitorInfo]: return self.process_monitor.get_process_infos() def get_result(self, task: Task) -> TaskResult: - return self.process_manager.get_result(task) + return self.results_map[task] def remove_results(self, tasks: Sequence[Task]) -> None: for task in tasks: - self.process_manager.remove_result(task) + if task not in self.results_map: + return + logger.debug(f"Removing result from in-memory cache for task: '{task}'") + del self.results_map[task] + + @abstractmethod + def _schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool) -> FutureT: + """Should submit the execution of _task_subprocess_func() + for the given task in a subprocess and return the resulting Future. + + The implementation of this method may load or otherwise prepare + context or dependency results for the task. 
+ + """ + + @abstractmethod + def _get_completed_futures(self, futures: list[FutureT], timeout_seconds: float | None) -> list[FutureT]: + """Return a sub-sequence of the given futures that have been completed.""" + + @abstractmethod + def cancel(self) -> None: + pass + + @abstractmethod + def stop(self) -> None: + pass + + @abstractmethod + def _close_executor(self) -> None: + """Stop all currently running subprocesses.""" def _spawn_start_method_check() -> None: @@ -302,7 +277,7 @@ def _fork_start_method_check() -> None: # === Subprocess Pools === -class PoolProcessManager(ProcessManager[ConcurrentFuture], ABC): +class PoolProcessRunner(ProcessRunner[ConcurrentFuture], ABC): def __init__(self, *, mp_context: BaseContext, max_workers: int | None) -> None: super().__init__() @@ -311,9 +286,9 @@ def __init__(self, *, mp_context: BaseContext, max_workers: int | None) -> None: max_workers=max_workers, ) - def get_completed_futures(self, futures: list[ExecutorFuture], timeout_seconds: float | None) -> list[ExecutorFuture]: + def _get_completed_futures(self, futures: list[ConcurrentFuture], timeout_seconds: float | None) -> list[ConcurrentFuture]: done, _ = wait_futures(futures, timeout=timeout_seconds, return_when=FIRST_COMPLETED) - return done + return list(done) def cancel(self) -> None: self.concurrent_executor.shutdown(wait=True, cancel_futures=True) @@ -323,11 +298,11 @@ def stop(self) -> None: for process in self.concurrent_executor._processes.values(): process.terminate() - def close(self) -> None: + def _close_executor(self) -> None: self.concurrent_executor.shutdown(wait=True) -class SpawnPoolProcessManager(PoolProcessManager): +class SpawnPoolProcessRunner(PoolProcessRunner): def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> None: super().__init__( @@ -337,8 +312,7 @@ def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | self.context = context self.storage = storage - def 
schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool, - task_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: + def _schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool) -> ConcurrentFuture: _spawn_interactive_main_check(self.__class__, task) filtered_context: LabContext = {} @@ -362,8 +336,8 @@ def schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool, results_map=results_map, filtered_context=filtered_context, storage=self.storage, - task_event_queue=task_event_queue, - log_queue=log_queue, + task_event_queue=self.task_event_queue, + log_queue=self.log_queue, ) @@ -376,14 +350,12 @@ class SpawnPoolRunnerBackend(RunnerBackend): """ - def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ProcessRunner: + def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> SpawnPoolProcessRunner: _spawn_start_method_check() - return ProcessRunner( - process_manager=SpawnPoolProcessManager( - context=context, - storage=storage, - max_workers=max_workers, - ) + return SpawnPoolProcessRunner( + context=context, + storage=storage, + max_workers=max_workers, ) @@ -396,7 +368,7 @@ class PoolRunnerMemory: _RUNNER_FORK_POOL_MEMORY: dict[UUID, PoolRunnerMemory] = {} -class ForkPoolProcessManager(PoolProcessManager): +class ForkPoolProcessRunner(PoolProcessRunner): def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> None: super().__init__( @@ -412,7 +384,7 @@ def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | @staticmethod def _fork_task_subprocess_func(*, task: Task, task_name: str, use_cache: bool, results_map: ResultsMap, task_event_queue: Queue, - log_queue: Queue, uuid: UUID): + log_queue: Queue, uuid: UUID) -> TaskResult: runner_memory = _RUNNER_FORK_POOL_MEMORY[uuid] return _task_subprocess_func( task=task, @@ -425,8 +397,7 @@ def _fork_task_subprocess_func(*, task: Task, 
task_name: str, use_cache: bool, log_queue=log_queue, ) - def schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool, - task_event_queue: Queue, log_queue: Queue) -> ConcurrentFuture: + def _schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool) -> ConcurrentFuture: results_map: dict[Task, TaskResult] = {} if not use_cache: # In order to minimise memory use, only transfer results @@ -442,17 +413,17 @@ def schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool, task_name=task_name, use_cache=use_cache, results_map=results_map, - task_event_queue=task_event_queue, - log_queue=log_queue, + task_event_queue=self.task_event_queue, + log_queue=self.log_queue, uuid=self.uuid, ) - def close(self) -> None: - super().close() + def _close_executor(self) -> None: + super()._close_executor() try: del _RUNNER_FORK_POOL_MEMORY[self.uuid] except KeyError: - # uuid not may be found if close() is called twice. + # uuid may not be found if _close_executor() is called twice. 
pass @@ -465,20 +436,18 @@ class ForkPoolRunnerBackend(RunnerBackend): """ - def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ProcessRunner: + def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ForkPoolProcessRunner: _fork_start_method_check() - return ProcessRunner( - process_manager=ForkPoolProcessManager( - context=context, - storage=storage, - max_workers=max_workers, - ) + return ForkPoolProcessRunner( + context=context, + storage=storage, + max_workers=max_workers, ) # === Subprocesses Per-Task === -class PerTaskProcessManager(ProcessManager[ExecutorFuture]): +class PerTaskProcessRunner(ProcessRunner[ExecutorFuture], ABC): def __init__(self, *, mp_context: BaseContext, max_workers: int | None) -> None: super().__init__() @@ -487,7 +456,7 @@ def __init__(self, *, mp_context: BaseContext, max_workers: int | None) -> None: max_workers=max_workers, ) - def get_completed_futures(self, futures: list[ExecutorFuture], timeout_seconds: float | None) -> list[ExecutorFuture]: + def _get_completed_futures(self, futures: list[ExecutorFuture], timeout_seconds: float | None) -> list[ExecutorFuture]: done, _ = self.executor.wait(futures, timeout_seconds=timeout_seconds) return done @@ -497,11 +466,11 @@ def cancel(self) -> None: def stop(self) -> None: self.executor.stop() - def close(self) -> None: + def _close_executor(self) -> None: pass -class SpawnPerTaskProcessManager(PerTaskProcessManager): +class SpawnPerTaskProcessRunner(PerTaskProcessRunner): def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> None: super().__init__( @@ -511,8 +480,7 @@ def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | self.context = context self.storage = storage - def schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool, - task_event_queue: Queue, log_queue: Queue) -> ExecutorFuture: + def _schedule_subprocess(self, *, task: 
Task, task_name: str, use_cache: bool) -> ExecutorFuture: _spawn_interactive_main_check(self.__class__, task) filtered_context: LabContext = {} @@ -536,8 +504,8 @@ def schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool, results_map=results_map, filtered_context=filtered_context, storage=self.storage, - task_event_queue=task_event_queue, - log_queue=log_queue, + task_event_queue=self.task_event_queue, + log_queue=self.log_queue, ) @@ -550,14 +518,12 @@ class SpawnPerTaskRunnerBackend(RunnerBackend): """ - def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ProcessRunner: + def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> SpawnPerTaskProcessRunner: _spawn_start_method_check() - return ProcessRunner( - process_manager=SpawnPerTaskProcessManager( - context=context, - storage=storage, - max_workers=max_workers, - ) + return SpawnPerTaskProcessRunner( + context=context, + storage=storage, + max_workers=max_workers, ) @@ -571,7 +537,7 @@ class PerTaskRunnerMemory: _RUNNER_FORK_PER_TASK_MEMORY: dict[UUID, PerTaskRunnerMemory] = {} -class ForkPerTaskProcessManager(PerTaskProcessManager): +class ForkPerTaskProcessRunner(PerTaskProcessRunner): def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> None: super().__init__( @@ -588,7 +554,7 @@ def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | @staticmethod def _fork_task_subprocess_func(*, task: Task, task_name: str, use_cache: bool, task_event_queue: Queue, log_queue: Queue, - uuid: UUID): + uuid: UUID) -> TaskResult: runner_memory = _RUNNER_FORK_PER_TASK_MEMORY[uuid] return _task_subprocess_func( task=task, @@ -601,24 +567,23 @@ def _fork_task_subprocess_func(*, task: Task, task_name: str, use_cache: bool, log_queue=log_queue, ) - def schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool, - task_event_queue: Queue, log_queue: Queue) -> 
ExecutorFuture: + def _schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool) -> ExecutorFuture: return self.executor.submit( self._fork_task_subprocess_func, task=task, task_name=task_name, use_cache=use_cache, - task_event_queue=task_event_queue, - log_queue=log_queue, + task_event_queue=self.task_event_queue, + log_queue=self.log_queue, uuid=self.uuid, ) - def close(self) -> None: - super().close() + def _close_executor(self) -> None: + super()._close_executor() try: del _RUNNER_FORK_PER_TASK_MEMORY[self.uuid] except KeyError: - # uuid not may be found if close() is called twice. + # uuid not may be found if _close_executor() is called twice. pass @@ -631,12 +596,10 @@ class ForkPerTaskRunnerBackend(RunnerBackend): """ - def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ProcessRunner: + def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ForkPerTaskProcessRunner: _fork_start_method_check() - return ProcessRunner( - process_manager=ForkPerTaskProcessManager( - context=context, - storage=storage, - max_workers=max_workers, - ) + return ForkPerTaskProcessRunner( + context=context, + storage=storage, + max_workers=max_workers, ) From f02e81e080ca982318d721d249e5d4d4e4c02d2b Mon Sep 17 00:00:00 2001 From: Ben Denham Date: Sun, 1 Jun 2025 21:48:52 +1200 Subject: [PATCH 05/10] Default to pool-based process runners --- docs/runners.md | 9 +++++++-- labtech/lab.py | 18 +++++++++--------- labtech/runners/__init__.py | 13 ++++++++++--- tests/integration/test_e2e.py | 3 ++- 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/docs/runners.md b/docs/runners.md index 68af181..0cdaa71 100644 --- a/docs/runners.md +++ b/docs/runners.md @@ -4,12 +4,17 @@ You can control how tasks are executed in parallel by specifying an instance of one of the following Runner Backend classes for the `runner_backend` argument of your [`Lab`][labtech.Lab]: -::: 
labtech.runners.ForkRunnerBackend +::: labtech.runners.SpawnPoolRunnerBackend options: heading_level: 3 show_source: False -::: labtech.runners.SpawnRunnerBackend +::: labtech.runners.ForkPoolRunnerBackend + options: + heading_level: 3 + show_source: False + +::: labtech.runners.ForkPerTaskRunnerBackend options: heading_level: 3 show_source: False diff --git a/labtech/lab.py b/labtech/lab.py index 22bf8f3..47b58fc 100644 --- a/labtech/lab.py +++ b/labtech/lab.py @@ -12,7 +12,7 @@ from .exceptions import LabError, TaskNotFound from .monitor import TaskMonitor -from .runners import ForkRunnerBackend, SerialRunnerBackend, SpawnRunnerBackend, ThreadRunnerBackend +from .runners import ForkPoolRunnerBackend, SerialRunnerBackend, SpawnPoolRunnerBackend, ThreadRunnerBackend from .storage import LocalStorage, NullStorage from .tasks import get_direct_dependencies from .types import ResultMeta, is_task, is_task_type @@ -344,16 +344,16 @@ def __init__(self, *, optionally be set to one of the following options: * `'fork'`: Uses the - [`ForkRunnerBackend`][labtech.runners.ForkRunnerBackend] - to run each task in a forked subprocess. Memory use + [`ForkPoolRunnerBackend`][labtech.runners.ForkPoolRunnerBackend] + to run tasks on a pool of forked subprocesses. Memory use is reduced by sharing the context and dependency task results between tasks with memory inherited from the parent process. The default on platforms that support forked Python subprocesses when `max_workers > 1`: Linux and other POSIX systems, but not macOS or Windows. * `'spawn'`: Uses the - [`SpawnRunnerBackend`][labtech.runners.SpawnRunnerBackend] - to run each task in a spawned subprocess. The + [`SpawnPoolRunnerBackend`][labtech.runners.SpawnPoolRunnerBackend] + to run tasks on a pool of spawned subprocesses. The context and dependency task results are copied/duplicated into the memory of each subprocess. 
The default on macOS and Windows when @@ -402,18 +402,18 @@ def __init__(self, *, if self.max_workers == 1: runner_backend = ThreadRunnerBackend() elif 'fork' in start_methods: - runner_backend = ForkRunnerBackend() + runner_backend = ForkPoolRunnerBackend() elif 'spawn' in start_methods: - runner_backend = SpawnRunnerBackend() + runner_backend = SpawnPoolRunnerBackend() else: raise LabError(('Default \'fork\' and \'spawn\' multiprocessing runner ' 'backends are not supported on your system.' 'Please specify a system-compatible runner_backend.')) elif isinstance(runner_backend, str): if runner_backend == 'fork': - runner_backend = ForkRunnerBackend() + runner_backend = ForkPoolRunnerBackend() elif runner_backend == 'spawn': - runner_backend = SpawnRunnerBackend() + runner_backend = SpawnPoolRunnerBackend() elif runner_backend == 'serial': runner_backend = SerialRunnerBackend() elif runner_backend == 'thread': diff --git a/labtech/runners/__init__.py b/labtech/runners/__init__.py index 8b7fa77..75fc8c3 100644 --- a/labtech/runners/__init__.py +++ b/labtech/runners/__init__.py @@ -1,10 +1,17 @@ -from .process import ForkRunnerBackend, SpawnRunnerBackend +from .process import ( + ForkPerTaskRunnerBackend, + ForkPoolRunnerBackend, + SpawnPerTaskRunnerBackend, + SpawnPoolRunnerBackend, +) from .serial import SerialRunnerBackend from .thread import ThreadRunnerBackend __all__ = [ - 'ForkRunnerBackend', - 'SpawnRunnerBackend', + 'ForkPoolRunnerBackend', + 'SpawnPoolRunnerBackend', + 'ForkPerTaskRunnerBackend', + 'SpawnPerTaskRunnerBackend', 'SerialRunnerBackend', 'ThreadRunnerBackend', ] diff --git a/tests/integration/test_e2e.py b/tests/integration/test_e2e.py index 24a972a..0af04e7 100644 --- a/tests/integration/test_e2e.py +++ b/tests/integration/test_e2e.py @@ -10,6 +10,7 @@ import labtech from labtech.exceptions import RunnerError +from labtech.runners.process import ForkPerTaskRunnerBackend, SpawnPerTaskRunnerBackend from labtech.runners.ray import RayRunnerBackend 
if TYPE_CHECKING: @@ -181,7 +182,7 @@ def evaluations(context: dict[str, Any]) -> dict[str, Evaluation]: class TestE2E: @pytest.mark.parametrize('max_workers', [1, 4, None]) - @pytest.mark.parametrize('runner_backend', ['serial', 'fork', 'spawn', 'thread']) + @pytest.mark.parametrize('runner_backend', ['serial', 'fork', ForkPerTaskRunnerBackend(), 'spawn', SpawnPerTaskRunnerBackend(), 'thread']) @pytest.mark.parametrize('evaluation_key', active_evaluation_keys) def test_e2e(self, max_workers: int, runner_backend: str, evaluation_key: str, context: dict[str, Any], evaluations: dict[str, Evaluation]) -> None: evaluation = evaluations[evaluation_key] From dd20d7a4796c6daa8092f2a7b08e8ea8c7d8251e Mon Sep 17 00:00:00 2001 From: Ben Denham Date: Sun, 1 Jun 2025 21:56:59 +1200 Subject: [PATCH 06/10] Remove SpawnPerTaskRunnerBackend, and add fork-ondemand shorthand. SpawnPerTaskRunnerBackend doesn't save memory and inefficiently spawns a subprocess per task. --- labtech/lab.py | 28 +++++++++++------ labtech/runners/__init__.py | 2 -- labtech/runners/process.py | 57 ----------------------------------- tests/integration/test_e2e.py | 5 ++- 4 files changed, 21 insertions(+), 71 deletions(-) diff --git a/labtech/lab.py b/labtech/lab.py index 47b58fc..37da232 100644 --- a/labtech/lab.py +++ b/labtech/lab.py @@ -12,7 +12,7 @@ from .exceptions import LabError, TaskNotFound from .monitor import TaskMonitor -from .runners import ForkPoolRunnerBackend, SerialRunnerBackend, SpawnPoolRunnerBackend, ThreadRunnerBackend +from .runners import ForkPerTaskRunnerBackend, ForkPoolRunnerBackend, SerialRunnerBackend, SpawnPoolRunnerBackend, ThreadRunnerBackend from .storage import LocalStorage, NullStorage from .tasks import get_direct_dependencies from .types import ResultMeta, is_task, is_task_type @@ -343,14 +343,6 @@ def __init__(self, *, runner_backend: Controls how tasks are run in parallel. 
It can optionally be set to one of the following options: - * `'fork'`: Uses the - [`ForkPoolRunnerBackend`][labtech.runners.ForkPoolRunnerBackend] - to run tasks on a pool of forked subprocesses. Memory use - is reduced by sharing the context and dependency task - results between tasks with memory inherited from the - parent process. The default on platforms that support - forked Python subprocesses when `max_workers > 1`: Linux - and other POSIX systems, but not macOS or Windows. * `'spawn'`: Uses the [`SpawnPoolRunnerBackend`][labtech.runners.SpawnPoolRunnerBackend] to run tasks on a pool of spawned subprocesses. The @@ -358,6 +350,22 @@ def __init__(self, *, copied/duplicated into the memory of each subprocess. The default on macOS and Windows when `max_workers > 1`. + * `'fork'`: Uses the + [`ForkPoolRunnerBackend`][labtech.runners.ForkPoolRunnerBackend] + to run tasks on a pool of forked subprocesses. + Memory use is reduced by sharing the context between + tasks with memory inherited from the parent process. + The default on platforms that support forked Python + subprocesses when `max_workers > 1`: Linux and other + POSIX systems, but not macOS or Windows. + * `'fork-ondemand'`: Uses the + [`ForkPerTaskRunnerBackend`][labtech.runners.ForkPerTaskRunnerBackend] + to run each task in a forked subprocess. Shares + dependency task results as well as the context in + memory shared between subprocesses, but at the cost + of forking a new subprocess for each task. Best used + when dependency task results are large compared to + the overall number of tasks. * `'thread'`: Uses the [`ThreadRunnerBackend`][labtech.runners.ThreadRunnerBackend] to run each task in a separate Python thread. 
Because @@ -412,6 +420,8 @@ def __init__(self, *, elif isinstance(runner_backend, str): if runner_backend == 'fork': runner_backend = ForkPoolRunnerBackend() + elif runner_backend == 'fork-ondemand': + runner_backend = ForkPerTaskRunnerBackend() elif runner_backend == 'spawn': runner_backend = SpawnPoolRunnerBackend() elif runner_backend == 'serial': diff --git a/labtech/runners/__init__.py b/labtech/runners/__init__.py index 75fc8c3..6efa9bf 100644 --- a/labtech/runners/__init__.py +++ b/labtech/runners/__init__.py @@ -1,7 +1,6 @@ from .process import ( ForkPerTaskRunnerBackend, ForkPoolRunnerBackend, - SpawnPerTaskRunnerBackend, SpawnPoolRunnerBackend, ) from .serial import SerialRunnerBackend @@ -11,7 +10,6 @@ 'ForkPoolRunnerBackend', 'SpawnPoolRunnerBackend', 'ForkPerTaskRunnerBackend', - 'SpawnPerTaskRunnerBackend', 'SerialRunnerBackend', 'ThreadRunnerBackend', ] diff --git a/labtech/runners/process.py b/labtech/runners/process.py index 9361399..94a3356 100644 --- a/labtech/runners/process.py +++ b/labtech/runners/process.py @@ -470,63 +470,6 @@ def _close_executor(self) -> None: pass -class SpawnPerTaskProcessRunner(PerTaskProcessRunner): - - def __init__(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> None: - super().__init__( - mp_context=multiprocessing.get_context('spawn'), - max_workers=max_workers, - ) - self.context = context - self.storage = storage - - def _schedule_subprocess(self, *, task: Task, task_name: str, use_cache: bool) -> ExecutorFuture: - _spawn_interactive_main_check(self.__class__, task) - - filtered_context: LabContext = {} - results_map: dict[Task, TaskResult] = {} - if not use_cache: - # In order to minimise memory use, only transfer context - # and results to the subprocess if we are going to run the - # task (and not just load its result from cache) and allow - # the task to filter the context to only what it needs. 
- filtered_context = task.filter_context(self.context) - results_map = { - dependency_task: self.results_map[dependency_task] - for dependency_task in get_direct_dependencies(task, all_identities=False) - } - - return self.executor.submit( - _task_subprocess_func, - task=task, - task_name=task_name, - use_cache=use_cache, - results_map=results_map, - filtered_context=filtered_context, - storage=self.storage, - task_event_queue=self.task_event_queue, - log_queue=self.log_queue, - ) - - -class SpawnPerTaskRunnerBackend(RunnerBackend): - """ - Runner Backend that runs each task in a spawned subprocess. - - The required context and dependency task results are - copied/duplicated into the memory of each subprocess. - - """ - - def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> SpawnPerTaskProcessRunner: - _spawn_start_method_check() - return SpawnPerTaskProcessRunner( - context=context, - storage=storage, - max_workers=max_workers, - ) - - @dataclass class PerTaskRunnerMemory: context: LabContext diff --git a/tests/integration/test_e2e.py b/tests/integration/test_e2e.py index 0af04e7..518ed60 100644 --- a/tests/integration/test_e2e.py +++ b/tests/integration/test_e2e.py @@ -10,7 +10,6 @@ import labtech from labtech.exceptions import RunnerError -from labtech.runners.process import ForkPerTaskRunnerBackend, SpawnPerTaskRunnerBackend from labtech.runners.ray import RayRunnerBackend if TYPE_CHECKING: @@ -182,13 +181,13 @@ def evaluations(context: dict[str, Any]) -> dict[str, Evaluation]: class TestE2E: @pytest.mark.parametrize('max_workers', [1, 4, None]) - @pytest.mark.parametrize('runner_backend', ['serial', 'fork', ForkPerTaskRunnerBackend(), 'spawn', SpawnPerTaskRunnerBackend(), 'thread']) + @pytest.mark.parametrize('runner_backend', ['serial', 'fork', 'fork-ondemand', 'spawn', 'thread']) @pytest.mark.parametrize('evaluation_key', active_evaluation_keys) def test_e2e(self, max_workers: int, runner_backend: str, evaluation_key: 
str, context: dict[str, Any], evaluations: dict[str, Evaluation]) -> None: evaluation = evaluations[evaluation_key] # macOS and Windows don't support fork, so test graceful failure: - if runner_backend == 'fork' and platform.system() in {'Darwin', 'Windows'}: + if runner_backend in {'fork', 'fork-ondemand'} and platform.system() in {'Darwin', 'Windows'}: lab = labtech.Lab( storage=None, context=context, From 496dd7d846740534fb1c9276aca5c7c2c80dc90f Mon Sep 17 00:00:00 2001 From: Ben Denham Date: Mon, 2 Jun 2025 11:01:42 +1200 Subject: [PATCH 07/10] Rename fork-ondemand to fork-per-task --- labtech/lab.py | 6 +++--- tests/integration/test_e2e.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/labtech/lab.py b/labtech/lab.py index 37da232..b078626 100644 --- a/labtech/lab.py +++ b/labtech/lab.py @@ -358,11 +358,11 @@ def __init__(self, *, The default on platforms that support forked Python subprocesses when `max_workers > 1`: Linux and other POSIX systems, but not macOS or Windows. - * `'fork-ondemand'`: Uses the + * `'fork-per-task'`: Uses the [`ForkPerTaskRunnerBackend`][labtech.runners.ForkPerTaskRunnerBackend] to run each task in a forked subprocess. Shares dependency task results as well as the context in - memory shared between subprocesses, but at the cost + memory shared between subprocesses but at the cost of forking a new subprocess for each task. Best used when dependency task results are large compared to the overall number of tasks. 
@@ -420,7 +420,7 @@ def __init__(self, *, elif isinstance(runner_backend, str): if runner_backend == 'fork': runner_backend = ForkPoolRunnerBackend() - elif runner_backend == 'fork-ondemand': + elif runner_backend == 'fork-per-task': runner_backend = ForkPerTaskRunnerBackend() elif runner_backend == 'spawn': runner_backend = SpawnPoolRunnerBackend() diff --git a/tests/integration/test_e2e.py b/tests/integration/test_e2e.py index 518ed60..613bad7 100644 --- a/tests/integration/test_e2e.py +++ b/tests/integration/test_e2e.py @@ -181,13 +181,13 @@ def evaluations(context: dict[str, Any]) -> dict[str, Evaluation]: class TestE2E: @pytest.mark.parametrize('max_workers', [1, 4, None]) - @pytest.mark.parametrize('runner_backend', ['serial', 'fork', 'fork-ondemand', 'spawn', 'thread']) + @pytest.mark.parametrize('runner_backend', ['serial', 'fork', 'fork-per-task', 'spawn', 'thread']) @pytest.mark.parametrize('evaluation_key', active_evaluation_keys) def test_e2e(self, max_workers: int, runner_backend: str, evaluation_key: str, context: dict[str, Any], evaluations: dict[str, Evaluation]) -> None: evaluation = evaluations[evaluation_key] # macOS and Windows don't support fork, so test graceful failure: - if runner_backend in {'fork', 'fork-ondemand'} and platform.system() in {'Darwin', 'Windows'}: + if runner_backend in {'fork', 'fork-per-task'} and platform.system() in {'Darwin', 'Windows'}: lab = labtech.Lab( storage=None, context=context, From 248c66f8b11e2fa7b4dab6c0c18fc17f02ac58c6 Mon Sep 17 00:00:00 2001 From: Ben Denham Date: Mon, 2 Jun 2025 11:01:52 +1200 Subject: [PATCH 08/10] Improve docs for process runner backends --- labtech/runners/process.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/labtech/runners/process.py b/labtech/runners/process.py index 94a3356..4add445 100644 --- a/labtech/runners/process.py +++ b/labtech/runners/process.py @@ -428,12 +428,16 @@ def _close_executor(self) -> None: class 
ForkPoolRunnerBackend(RunnerBackend): - """ - Runner Backend that runs tasks on a pool of forked subprocesses. + """Runner Backend that runs tasks on a pool of forked subprocesses. The context is shared in-memory between each subprocess. Dependency task results are copied/duplicated into the memory of each subprocess. + Because process forking is more efficient than spawning, and + because of the added benefit of not duplicating the context for + each task, this runner backend is recommended for any system that + supports process forking. + """ def build_runner(self, *, context: LabContext, storage: Storage, max_workers: int | None) -> ForkPoolProcessRunner: @@ -531,11 +535,17 @@ def _close_executor(self) -> None: class ForkPerTaskRunnerBackend(RunnerBackend): - """ - Runner Backend that runs each task in a forked subprocess. + """Runner Backend that runs each task in a separate forked + subprocess. The context and dependency task results are shared in-memory - between each subprocess. + between each subprocess but at the cost of forking a new + subprocess for each task. + + This runner backend is best used when dependency task results are + large (so time will be saved through memory sharing) compared to + the overall number of tasks (for large numbers of tasks, forking a + separate process for each may be a substantial overhead). 
""" From f8ecea2d21a3fe98ce38ff6e48b74de3811c81ff Mon Sep 17 00:00:00 2001 From: Ben Denham Date: Mon, 2 Jun 2025 12:30:59 +1200 Subject: [PATCH 09/10] Update example notebook contents --- examples/cookbook.ipynb | 128 +++++++++++++++++++++------------------- examples/tutorial.ipynb | 86 +++++++++++++-------------- 2 files changed, 110 insertions(+), 104 deletions(-) diff --git a/examples/cookbook.ipynb b/examples/cookbook.ipynb index cccb062..f59faa8 100644 --- a/examples/cookbook.ipynb +++ b/examples/cookbook.ipynb @@ -11,7 +11,7 @@ "You can also run this cookbook as an [interactive\n", "notebook](https://mybinder.org/v2/gh/ben-denham/labtech/main?filepath=examples/cookbook.ipynb)." ], - "id": "364635a1-54b3-4bba-b443-2ad252916653" + "id": "fb1b0e78-7c15-4c31-8ed0-aee1fccf2aa2" }, { "cell_type": "code", @@ -21,7 +21,7 @@ "source": [ "%pip install labtech fsspec mlflow pandas scikit-learn setuptools" ], - "id": "5d00f3c1-3365-4599-8655-6bcd77ee7f27" + "id": "b65cb2a4-4725-472e-9ba4-7d26469eb73b" }, { "cell_type": "code", @@ -31,7 +31,7 @@ "source": [ "!mkdir storage" ], - "id": "fd946354-cf8f-4039-b903-522d4c796a4b" + "id": "045df8d2-4a2e-4882-addb-425c0d6b010c" }, { "cell_type": "code", @@ -50,7 +50,7 @@ "digits_X, digits_y = datasets.load_digits(return_X_y=True)\n", "digits_X = StandardScaler().fit_transform(digits_X)" ], - "id": "a2a4329d-0788-4188-b39f-3e01d36727b3" + "id": "c8421b93-ba49-4018-aa0b-e60c8696291a" }, { "cell_type": "markdown", @@ -64,7 +64,7 @@ "is sent to `STDOUT` (e.g. calls to `print()`) or `STDERR` (e.g. 
uncaught\n", "exceptions) will also be captured and logged:" ], - "id": "8304d056-7765-439b-bca7-68704d528a5f" + "id": "acc6dce7-c01c-4272-8683-1ad4126aac3e" }, { "cell_type": "code", @@ -91,7 +91,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks(experiments)" ], - "id": "cbe87a89-a25e-49e4-8dcd-83bc108198bd" + "id": "1139f529-6be4-46d6-8d24-6403b24482a2" }, { "cell_type": "markdown", @@ -130,7 +130,7 @@ "learning model (like `LRClassifierTask` below), and then make a task of\n", "that type a parameter for your primary experiment task:" ], - "id": "d4983b56-b663-4ac6-ae08-03a615f60936" + "id": "2e1b31df-8536-48ea-a30a-bf2ed4c455d8" }, { "cell_type": "code", @@ -171,7 +171,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks([experiment])" ], - "id": "1ae81ffa-08e9-4c35-a8d6-d1364775ff80" + "id": "7da1cd65-d7b2-45bb-b7d5-3f58b59aa192" }, { "cell_type": "markdown", @@ -182,7 +182,7 @@ "[Protocol](https://docs.python.org/3/library/typing.html#typing.Protocol)\n", "that defines their common result type:" ], - "id": "475d760e-3c26-4002-a3e0-442fcd909d82" + "id": "8e1dcd85-6d94-4d98-a169-0dbd92b6facb" }, { "cell_type": "code", @@ -242,7 +242,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks(experiments)" ], - "id": "023ad2b6-5ef6-4020-b248-6a6c6727f29b" + "id": "b660b25d-d759-4f65-91fd-40062b67d4b3" }, { "cell_type": "markdown", @@ -262,7 +262,7 @@ "> `Enum` must support equality between identical (but distinct) object\n", "> instances." 
], - "id": "ebe47933-08da-4389-9311-84c4e8f961cd" + "id": "cb773f2f-0f02-4254-b0bc-39867d0cbb24" }, { "cell_type": "code", @@ -310,7 +310,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks(experiments)" ], - "id": "4e346a5a-5d55-4652-8145-4b4f0ade903f" + "id": "fa3d794a-fc3c-4413-a397-5563fe92fec0" }, { "cell_type": "markdown", @@ -333,7 +333,7 @@ "The following example demonstrates specifying a `dataset_key` parameter\n", "to a task that is used to look up a dataset from the lab context:" ], - "id": "36c5721d-82c8-41a2-a84a-d4f700602dbd" + "id": "65b8bb54-9ed6-425f-b38b-fc986911e325" }, { "cell_type": "code", @@ -370,7 +370,7 @@ ")\n", "results = lab.run_tasks(experiments)" ], - "id": "aef913b8-bda7-4581-8968-917d8dff2872" + "id": "215bf95d-dd89-406d-baa9-d9f1c9f4272f" }, { "cell_type": "markdown", @@ -388,7 +388,7 @@ "cross-validation within the task using a number of workers specified in\n", "the lab context as `within_task_workers`:" ], - "id": "6a4317f8-c586-44e9-94b2-266f4cfa2e3a" + "id": "6e6a4356-3081-451e-8dc0-59ca74c73283" }, { "cell_type": "code", @@ -429,7 +429,7 @@ ")\n", "results = lab.run_tasks(experiments)" ], - "id": "09babc37-e996-4eea-80b3-779304159437" + "id": "0e14d9b3-ca31-4999-957b-d655f2eff507" }, { "cell_type": "markdown", @@ -456,7 +456,7 @@ "raised during the execution of a task will be logged, but the execution\n", "of other tasks will continue:" ], - "id": "e44e3be7-7c52-4fb5-b6f3-a58666c91a50" + "id": "b6944b5d-517d-4046-b924-d5ab1fcf9d6c" }, { "cell_type": "code", @@ -469,7 +469,7 @@ " continue_on_failure=True,\n", ")" ], - "id": "b7e09493-a45c-4898-889d-298c640a5234" + "id": "db0bb66e-6c66-4b50-b3ef-8babd5d3b798" }, { "cell_type": "markdown", @@ -488,7 +488,7 @@ "sub-class for that extension so that you can continue using caches for\n", "the base class:" ], - "id": "704ea6e1-ff9a-4a27-baeb-056dc97a119a" + "id": "76e200ae-f22d-463d-bf4b-39b550be3504" }, { "cell_type": "code", @@ -512,7 +512,7 @@ " base_result = 
super().run()\n", " return base_result * self.multiplier" ], - "id": "a66736a2-0583-4ab3-be9e-4175fa5aedef" + "id": "c57b89b6-db19-4b32-89c9-5176dd638669" }, { "cell_type": "markdown", @@ -524,7 +524,7 @@ "all cached task instances for a list of task types. You can then “run”\n", "the tasks to load their cached results:" ], - "id": "ebddf631-7c9d-40c7-a48a-345638749367" + "id": "0a54dfe7-7977-40ef-8f2a-d471bdc0412f" }, { "cell_type": "code", @@ -535,7 +535,7 @@ "cached_cvexperiment_tasks = lab.cached_tasks([CVExperiment])\n", "results = lab.run_tasks(cached_cvexperiment_tasks)" ], - "id": "97da569d-3d96-41df-bd53-8b2dea62beb8" + "id": "973cd6c4-8fe5-4e8b-8694-34d0aa019446" }, { "cell_type": "markdown", @@ -546,7 +546,7 @@ "You can clear the cache for a list of tasks using the `uncache_tasks()`\n", "method of a `Lab` instance:" ], - "id": "566dd0b8-1f88-498f-bc0c-6e549f976724" + "id": "3a834ec6-3f5a-4661-899f-528aa2d77524" }, { "cell_type": "code", @@ -556,7 +556,7 @@ "source": [ "lab.uncache_tasks(cached_cvexperiment_tasks)" ], - "id": "bbd00906-d7a7-4329-821f-c360bae7dba5" + "id": "f2188c69-e1c3-4dee-a1a1-f4b38ad32bc8" }, { "cell_type": "markdown", @@ -565,7 +565,7 @@ "You can also ignore all previously cached results when running a list of\n", "tasks by passing the `bust_cache` option to `run_tasks()`:" ], - "id": "d553cb51-9b41-4282-b3a3-f3b308fa2bf3" + "id": "5cfee59f-cf7a-457a-bdd1-32fea5ee81b9" }, { "cell_type": "code", @@ -575,7 +575,7 @@ "source": [ "lab.run_tasks(cached_cvexperiment_tasks, bust_cache=True)" ], - "id": "7b53d28b-42d0-4468-b0aa-118bb69798a7" + "id": "bad4fc13-ad3b-4864-8eb7-9759e3e61693" }, { "cell_type": "markdown", @@ -587,7 +587,7 @@ "(i.e. most changes to the `run()` method or the code it depends on) you\n", "should add or updated the `code_version` in `@task`. 
For example:" ], - "id": "09f964d7-3377-40d9-a722-c5964a51d7f4" + "id": "008c4ff3-5e49-469a-9e44-aa830d13a98e" }, { "cell_type": "code", @@ -599,7 +599,7 @@ "class Experiment:\n", " ..." ], - "id": "315bb5b2-6c0e-4d61-b5a5-b804e349cd95" + "id": "09194c41-7c71-4cbf-9345-b050e1628ef7" }, { "cell_type": "markdown", @@ -614,7 +614,7 @@ "results where the `code_version` does not match the\n", "`current_code_version`:" ], - "id": "dbb8113e-d1e1-4233-a806-a5a086c8d23b" + "id": "43f9dceb-bb61-466c-bb03-8b9a462e8f2a" }, { "cell_type": "code", @@ -632,7 +632,7 @@ "]\n", "lab.uncache_tasks(stale_cached_tasks)" ], - "id": "4f097252-6644-43dc-bbb6-332c20b1ca11" + "id": "2355ad82-fce5-4f15-b016-3816faf7a6f8" }, { "cell_type": "markdown", @@ -656,7 +656,7 @@ "consider using a\n", "[`TypeDict`](https://docs.python.org/3/library/typing.html#typing.TypedDict):" ], - "id": "23739b98-86a3-44e0-9d19-cd97d5549ab8" + "id": "83760fcc-042d-40ed-9cd4-f51e9f80b685" }, { "cell_type": "code", @@ -683,7 +683,7 @@ " model_weights=np.array([self.seed, self.seed ** 2]),\n", " )" ], - "id": "9823a688-dffe-4993-8121-0a2207bbf374" + "id": "478fff35-aaf9-4e2f-b83c-26f2d1dfaad0" }, { "cell_type": "markdown", @@ -701,7 +701,7 @@ "The following example demonstrates defining and using a custom cache\n", "type to store Pandas DataFrames as parquet files:" ], - "id": "dca9209d-15fe-434a-9612-009449b4eeb1" + "id": "e847ff3c-258d-4b3a-994c-b5dbc41d8267" }, { "cell_type": "code", @@ -743,7 +743,7 @@ "lab = labtech.Lab(storage='storage/parquet_example')\n", "lab.run_tasks([TabularTask()])" ], - "id": "71d1a759-e719-4342-903e-e0806d4e50d6" + "id": "d9519dd7-1305-429f-a9fd-1264cd86e1cc" }, { "cell_type": "markdown", @@ -765,7 +765,7 @@ "storage providers like [Azure Blob\n", "Storage](https://github.com/fsspec/adlfs)." 
], - "id": "1a91866a-a191-4115-bd2f-c524c6a13444" + "id": "e2744532-5a16-4190-8a72-5951699e6326" }, { "cell_type": "code", @@ -775,7 +775,7 @@ "source": [ "%pip install s3fs" ], - "id": "8913a01c-01d9-43e4-86c4-ca568e639c80" + "id": "cc94a96e-55e9-4524-8cc2-5e8e7a3d1089" }, { "cell_type": "code", @@ -819,7 +819,7 @@ ")\n", "results = lab.run_tasks(experiments)" ], - "id": "47d2a888-c751-47dd-b2c8-863d01a4f635" + "id": "257e379a-40e9-422b-9951-971567430df1" }, { "cell_type": "markdown", @@ -850,7 +850,7 @@ "`AggregationTask` to aggregate the results from many individual tasks to\n", "create an aggregated cache that can be loaded more efficiently:" ], - "id": "031466a8-d4db-4179-ab94-9c8b92421965" + "id": "d30f8c48-8b2c-489a-bffb-e252a342ecb4" }, { "cell_type": "code", @@ -891,7 +891,7 @@ "lab = labtech.Lab(storage='storage/aggregation_lab')\n", "result = lab.run_task(aggregation_task)" ], - "id": "0c85100b-1c2e-41b6-a2e0-97c1865d0b33" + "id": "7c81fd6c-5755-4702-a3c1-17d0564858f9" }, { "cell_type": "markdown", @@ -919,7 +919,7 @@ "it was originally executed and how long it took to execute from the\n", "task’s `.result_meta` attribute:" ], - "id": "d7006021-4f19-40da-8084-a39a51083a99" + "id": "37b1b669-e6d0-4bb2-81f1-1be00ceb3047" }, { "cell_type": "code", @@ -930,7 +930,7 @@ "print(f'The task was executed at: {aggregation_task.result_meta.start}')\n", "print(f'The task execution took: {aggregation_task.result_meta.duration}')" ], - "id": "264e1538-7e09-4206-9724-6dde000f67af" + "id": "9160cd5a-ca4e-43ab-ada0-bcfcd708a4b4" }, { "cell_type": "markdown", @@ -950,7 +950,7 @@ "Another approach is to include all of the intermediate tasks for which\n", "you wish to access the results for in the call to `run_tasks()`:" ], - "id": "d24b1665-5e74-43da-9aac-61d28c6d6d6a" + "id": "31edb972-a422-4aa2-b532-bef239cd0504" }, { "cell_type": "code", @@ -978,7 +978,7 @@ " for experiment in experiments\n", "])" ], - "id": "48a86572-37e4-4bf5-a35f-ae2008af8b31" + "id": 
"a68e5143-9d15-4692-b751-10125599af6e" }, { "cell_type": "markdown", @@ -994,7 +994,7 @@ "This is modeled in labtech by defining a task type for each step, and\n", "having each step depend on the result from the previous step:" ], - "id": "1ec8595a-148e-48db-acc2-e84090200d2c" + "id": "74f2c64c-6ead-4253-a063-5c8b15a87437" }, { "cell_type": "code", @@ -1044,7 +1044,7 @@ "result = lab.run_task(task_c)\n", "print(result)" ], - "id": "592413ea-cf8f-4cd2-8cb8-89e77e1e5629" + "id": "b2bf5fe1-c87c-4fde-a8c2-7bfe25d161e9" }, { "cell_type": "markdown", @@ -1056,7 +1056,7 @@ "[Mermaid diagram](https://mermaid.js.org/syntax/classDiagram.html) of\n", "task types for a given list of tasks:" ], - "id": "82bf7154-b81d-4cb4-8792-af0f1b0e180b" + "id": "8059c0fd-2d64-4b60-8e63-f723545eca1b" }, { "cell_type": "code", @@ -1071,7 +1071,7 @@ " direction='RL',\n", ")" ], - "id": "002c3981-5afc-4641-a0a9-2cc1490463ea" + "id": "d1baadba-329d-42bf-98df-56eea1c3d6a6" }, { "cell_type": "markdown", @@ -1095,7 +1095,7 @@ "additional tracking calls (such as `mlflow.log_metric()` or\n", "`mlflow.log_model()`) in the body of your task’s `run()` method:" ], - "id": "1dff695d-201b-425e-9dee-5714b01aad81" + "id": "9307ee42-9141-4c3b-aaf5-55ab62d432da" }, { "cell_type": "code", @@ -1142,7 +1142,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks(runs)" ], - "id": "ebf1abbd-92c3-4308-b4be-4b92e1645960" + "id": "bcc62e6c-3040-4259-a69f-5179ced0df9a" }, { "cell_type": "markdown", @@ -1191,7 +1191,7 @@ "non-definition code for a Python script in a `main()` function, and then\n", "guard the call to `main()` with `__name__ == '__main__'`:" ], - "id": "99ffcf6d-0ef9-4d30-b2f2-90daec2e1578" + "id": "28bc0b4d-9c72-4ccb-8c60-7d5bf71fa4c7" }, { "cell_type": "code", @@ -1222,7 +1222,7 @@ "if __name__ == '__main__':\n", " main()" ], - "id": "2978a1cc-a541-4df8-adf4-4deaca7ee629" + "id": "17513a58-5bdd-482d-8004-f114d1ebd588" }, { "cell_type": "markdown", @@ -1231,25 +1231,31 @@ "For details, see 
[Safe importing of main\n", "module](https://docs.python.org/3/library/multiprocessing.html#multiprocessing-safe-main-import).\n", "\n", - "### Why do I see the following error: `AttributeError: Can't get attribute 'YOUR_TASK_CLASS' on `?\n", + "### Why do I see the following error: `RunnerError: Unable to submit YourTaskType tasks to SpawnProcessRunner because the task type is defined in the __main__ module from an interactive Python session`?\n", "\n", - "You will see this error (as part of a very long stack trace) when\n", - "running a Lab with `runner_backend='spawn'` (the default on macOS and\n", - "Windows) from an interactive Python shell.\n", + "You may see this error when running a Lab with `runner_backend='spawn'`\n", + "(the default on macOS and Windows) from an interactive Python shell\n", + "(e.g. a Jupyter notebook session or a Python script).\n", "\n", - "The solution to this error is to define all of your labtech `Task` types\n", - "in a separate `.py` Python module file which you can import into your\n", - "interactive shell session (e.g. `from my_module import MyTask`).\n", + "The solution to this error is to define all of the classes you are using\n", + "from your labtech context and tasks (including task types) in a separate\n", + "`.py` Python module file which you can import into your interactive\n", + "shell session (e.g. `from my_module import MyClass`).\n", "\n", "The reason for this error is that “spawned” task subprocesses will not\n", "receive a copy the current state of your `__main__` module (which\n", "contains the variables you declare interactively in the Python shell,\n", - "including task definitions). This error does not occur with\n", + "including class definitions). This error does not occur with\n", "`runner_backend='fork'` (the default on Linux) because forked\n", "subprocesses *do* receive the current state of all modules (including\n", - "`__main__`) from the parent process." 
+ "`__main__`) from the parent process.\n", + "\n", + "### Why do I see the following error: `AttributeError: Can't get attribute 'YOUR_CLASS' on `?\n", + "\n", + "[See the answer to the question directly\n", + "above.](#spawn-interactive-main)" ], - "id": "63ad8a10-fe70-461d-8087-56a3e300531c" + "id": "7f9a928e-163e-4f4b-a269-dd22c13801da" } ], "nbformat": 4, diff --git a/examples/tutorial.ipynb b/examples/tutorial.ipynb index a125298..9495193 100644 --- a/examples/tutorial.ipynb +++ b/examples/tutorial.ipynb @@ -15,7 +15,7 @@ "Before we begin, let’s install `labtech` along with some other\n", "dependencies we will use in this tutorial:" ], - "id": "2d2a9b34-b2b2-4048-abaa-f6d0f45e9288" + "id": "dbc7b3ab-ba2b-4315-99a6-fb5674004b80" }, { "cell_type": "code", @@ -25,7 +25,7 @@ "source": [ "%pip install labtech mlflow scikit-learn" ], - "id": "711274c8-add0-4e88-9bce-3db22e7f1174" + "id": "b8195763-04fa-42b3-a9d8-f02c8ea93f4c" }, { "cell_type": "markdown", @@ -34,7 +34,7 @@ "Let’s also clear any caches that were created by previous runs of this\n", "tutorial:" ], - "id": "d11d3ff3-5fe0-420f-81b5-6339ae76e028" + "id": "5e56dc98-d48e-45c4-95a6-0f3e6a9b823d" }, { "cell_type": "code", @@ -45,7 +45,7 @@ "!rm -rf storage/tutorial/\n", "!mkdir -p storage/tutorial/" ], - "id": "58b8ed8d-9ce1-4c54-b6e0-c05789dc762c" + "id": "1fe3b275-69c1-4dcd-822a-04aff6f75d1b" }, { "cell_type": "markdown", @@ -56,7 +56,7 @@ "To get started, we’ll take the following simple machine learning\n", "experiment code and convert it to be run with labtech." 
], - "id": "7fd5137c-c633-4fd3-8319-bed9a9a3620d" + "id": "5d8dba7c-0bd6-4b5c-9ee7-88c19efbc2e2" }, { "cell_type": "code", @@ -81,7 +81,7 @@ "\n", "print(f'{log_loss(digits_y, prob_y) = :.3}')" ], - "id": "6560b421-93c5-499b-99b9-3c6fe02e9533" + "id": "2773b39d-142b-4553-a556-0179475f16ea" }, { "cell_type": "markdown", @@ -119,7 +119,7 @@ "method that performs the experiment and returns its result (the\n", "predicted probabilities):" ], - "id": "378bf464-0012-403f-9b62-445b1d67cdcb" + "id": "6f17b621-5db6-45ee-bbf9-b53c681bf3aa" }, { "cell_type": "code", @@ -144,7 +144,7 @@ " prob_y = clf.predict_proba(digits_X)\n", " return prob_y" ], - "id": "8a375c74-8dd1-4838-94ff-eeb5a3c1df63" + "id": "7adfeec1-d287-4132-a15c-b250c04d6aec" }, { "cell_type": "markdown", @@ -155,7 +155,7 @@ "`storage/tutorial/classification_lab_1` and to display notebook-friendly\n", "progress bars:" ], - "id": "b79cb5be-18e9-49cf-b10a-ebd2d9307ea9" + "id": "d8ca5ab4-225c-4e6b-9f3e-bce5c0577c96" }, { "cell_type": "code", @@ -165,7 +165,7 @@ "source": [ "lab = labtech.Lab(storage='storage/tutorial/classification_lab_1')" ], - "id": "b3ba5acf-f1b0-43e5-b513-5a0844c39bf4" + "id": "dbb95e33-1872-44fc-b0ac-7e9b112cf5de" }, { "cell_type": "markdown", @@ -176,7 +176,7 @@ "probabilities returned by the task’s `run()` method, so we can calculate\n", "the loss from them as before:" ], - "id": "28e799ad-b37f-485a-9205-53ac39f14525" + "id": "96edd5ce-911b-4409-95bf-cff81b9a5c20" }, { "cell_type": "code", @@ -188,7 +188,7 @@ "prob_y = lab.run_task(classifier_experiment)\n", "print(f'{log_loss(digits_y, prob_y) = :.3}')" ], - "id": "2404f4bb-1448-44b7-925f-a290c99dba4c" + "id": "9ff8b9b1-d5da-47c8-b646-4eef012b3c33" }, { "cell_type": "markdown", @@ -199,7 +199,7 @@ "calls to run the same experiment (even after restarting Python) will\n", "load the result from the cache:" ], - "id": "8f357f93-98ab-4110-bff3-41f8aa20cc9c" + "id": "c8f597d1-6bbd-4437-9c3e-3c3201b863d8" }, { "cell_type": "code", @@ -210,7 +210,7 
@@ "prob_y = lab.run_task(classifier_experiment)\n", "print(f'{log_loss(digits_y, prob_y) = :.3}')" ], - "id": "15975fa5-7b33-4359-be2c-35e6b6702479" + "id": "cea1e579-a42d-4a12-a350-7402794b7d98" }, { "cell_type": "markdown", @@ -227,7 +227,7 @@ "(or we could pass a list of tasks to `lab.run_tasks()`, as we will see\n", "in the next section of this tutorial)." ], - "id": "4bd3abc6-8c1a-4fd9-9d49-16336c552565" + "id": "1822bc40-56a1-4087-bcf8-f84cb10ee12b" }, { "cell_type": "code", @@ -239,7 +239,7 @@ " ClassifierExperiment,\n", "])" ], - "id": "d047c769-0310-4578-b7b6-5c92ba4c6718" + "id": "a6e02945-f9b1-4c5a-86c3-f1cd91ede65f" }, { "cell_type": "markdown", @@ -257,7 +257,7 @@ "You may like to save storage space by clearing up old cached results\n", "with `lab.uncache_tasks()`:" ], - "id": "e2a2ef05-6298-4df7-b951-f7dde9a3f599" + "id": "582ac56e-b453-4c59-8b9a-456ece3520c7" }, { "cell_type": "code", @@ -269,7 +269,7 @@ " classifier_experiment,\n", "])" ], - "id": "ef7da846-0ed5-4db7-8b6b-1451f66bb657" + "id": "5b0568ec-164a-4d1d-aba9-7e13cae98cf0" }, { "cell_type": "markdown", @@ -286,7 +286,7 @@ "the same way as\n", "[dataclass](https://docs.python.org/3/library/dataclasses.html) fields:" ], - "id": "b8d8545a-751d-4302-b8c9-f20cbfba3de2" + "id": "e2174d79-df76-4005-8507-f23b9d813c21" }, { "cell_type": "code", @@ -312,7 +312,7 @@ " prob_y = clf.predict_proba(digits_X)\n", " return prob_y" ], - "id": "eca46278-fa75-4574-9a10-30499c3b00f1" + "id": "0a43d7ac-a617-4102-b6d8-2423daacf586" }, { "cell_type": "markdown", @@ -321,7 +321,7 @@ "Now we’ll use a list comprehension to construct a list of\n", "`ClassifierExperiment` tasks with different `n_estimators` values:" ], - "id": "67ceaa46-a471-43ef-8cf9-a205a0fe4a14" + "id": "e8efbf82-7f83-4f08-b905-069d130d1594" }, { "cell_type": "code", @@ -336,7 +336,7 @@ " for n_estimators in range(1, 11)\n", "]" ], - "id": "ee86c550-e398-4d86-a051-a901852e2938" + "id": "fd429f38-11b8-4024-a9cb-b4263dbf45c6" }, { "cell_type": 
"markdown", @@ -350,7 +350,7 @@ "caches for the new definition separate by constructing a new lab that\n", "uses a different storage directory:" ], - "id": "6ae023f0-8598-461f-a2f0-d05e4d332865" + "id": "71d49540-ef23-4c51-bdee-2cb26a4fccb7" }, { "cell_type": "code", @@ -361,7 +361,7 @@ "lab = labtech.Lab(storage='storage/tutorial/classification_lab_2')\n", "results = lab.run_tasks(classifier_experiments)" ], - "id": "cbdeb082-8d17-4022-ae14-876bca5fc75a" + "id": "1fd32624-624e-441e-8f2c-85b1b23d2450" }, { "cell_type": "markdown", @@ -371,7 +371,7 @@ "result it returned, which we can loop over to print loss metrics for\n", "each experiment:" ], - "id": "e2416e76-1c18-49ea-8686-95aec7254916" + "id": "150ee979-de21-408d-9149-99b610be2f6f" }, { "cell_type": "code", @@ -382,7 +382,7 @@ "for experiment, prob_y in results.items():\n", " print(f'{experiment}: {log_loss(digits_y, prob_y) = :.3}')" ], - "id": "9ab73459-3b30-4c4c-a128-13c52488f37f" + "id": "88578678-f9ca-4162-b8a2-b56836ecf7be" }, { "cell_type": "markdown", @@ -408,7 +408,7 @@ "allowing us to access the result from the `.result` attribute of the\n", "task parameter (i.e. 
`self.classifier_experiment.result`):" ], - "id": "2abae8a6-2b3e-45b9-b91f-9b11ca358cb4" + "id": "b9c04fd0-2833-4ac8-9fa3-6f7d7235d0f3" }, { "cell_type": "code", @@ -431,7 +431,7 @@ " min_max_prob_y[np.arange(len(prob_y)), prob_y.argmax(axis=1)] = 1\n", " return min_max_prob_y" ], - "id": "c886b332-faa8-4fa8-9047-1a31fb6be2f4" + "id": "1cf5eebe-8ed5-4b75-ae89-6e7fdac254d3" }, { "cell_type": "markdown", @@ -444,7 +444,7 @@ "`MinMaxProbabilityExperiment` is run, re-using results depended on by\n", "multiple tasks and loading previously cached results wherever possible:" ], - "id": "1bf7347f-ea9b-4023-9f00-af93670d353f" + "id": "4b939912-c8eb-4159-9e49-aace448559b6" }, { "cell_type": "code", @@ -463,7 +463,7 @@ "for experiment, prob_y in results.items():\n", " print(f'{experiment}: {log_loss(digits_y, prob_y) = :.3}')" ], - "id": "f51b8877-dfb8-4f4f-a58c-390c6d139858" + "id": "35216676-61f9-4283-a5d9-959a36992424" }, { "cell_type": "markdown", @@ -506,7 +506,7 @@ " `ClassifierExperiment` tasks, the `run()` method first creates\n", " its own copy of the classifier with `clone()`." 
], - "id": "ca1788e2-1eb6-4bb0-b764-de76009ecbc6" + "id": "90657fa9-9671-48f5-b294-909fddcef14a" }, { "cell_type": "code", @@ -560,7 +560,7 @@ " prob_y = clf.predict_proba(digits_X)\n", " return prob_y" ], - "id": "9f3934b7-738c-4e16-917d-ec5d446e3136" + "id": "46ff6847-4fd8-4816-8935-e2c566afeb26" }, { "cell_type": "markdown", @@ -571,7 +571,7 @@ "for each of these `RFClassifierTask` tasks as well as an\n", "`LRClassifierTask` task:" ], - "id": "6c49872c-a7bb-40df-a7fd-41a721ff3a1c" + "id": "d6cef3b7-0d86-48be-8d30-4e539c0d444e" }, { "cell_type": "code", @@ -601,7 +601,7 @@ "for experiment, prob_y in results.items():\n", " print(f'{experiment}: {log_loss(digits_y, prob_y) = :.3}')" ], - "id": "f59b8d72-aaa9-4f09-8fd3-20c06344ce49" + "id": "879cd7df-28f9-4006-92ac-0688b98d43ad" }, { "cell_type": "markdown", @@ -615,7 +615,7 @@ "our experiments) outside of any task, allowing us to inspect these\n", "datasets before and after the tasks have been run:" ], - "id": "58a0099a-18aa-46b0-a12d-45b2aa027535" + "id": "8ee4b630-ff85-41ef-8bd8-dba599f8c97b" }, { "cell_type": "code", @@ -631,7 +631,7 @@ " 'iris': {'X': iris_X, 'y': iris_y},\n", "}" ], - "id": "71970b53-23a0-40c6-affb-f45476ec9092" + "id": "5dd087ae-7f74-4c3b-81ce-d8a25ff0057b" }, { "cell_type": "markdown", @@ -648,7 +648,7 @@ "3. Alter the task generation and evaluation code to handle multiple\n", " datasets." ], - "id": "83eb7eda-06d5-4a6d-8c44-59d9041c6005" + "id": "059dbe50-6ab1-482c-8d23-2d0ca21ed2ae" }, { "cell_type": "code", @@ -694,7 +694,7 @@ " dataset_y = DATASETS[experiment.dataset_key][\"y\"]\n", " print(f'{experiment}: {log_loss(dataset_y, prob_y) = :.3}')" ], - "id": "717aa712-f655-437f-96fd-6523dab6dd53" + "id": "b713f8f5-4717-481b-b4ce-ec87e37b21d4" }, { "cell_type": "markdown", @@ -740,7 +740,7 @@ " `mlflow.set_experiment('example_labtech_experiment')` before the\n", " tasks are run." 
], - "id": "cc698fd0-98f7-4bfd-962b-3daed974515f" + "id": "a7732a92-ad04-4f1f-af86-f4828ea19468" }, { "cell_type": "code", @@ -912,7 +912,7 @@ "for experiment, result in evaluation_result.items():\n", " print(f'{experiment}: log_loss = {result[\"log_loss\"]:.3}')" ], - "id": "99aa111c-1a86-467b-b9ab-2d5dd81ec678" + "id": "b80d78d9-2cf8-47c3-b80c-993af5d39d4f" }, { "cell_type": "markdown", @@ -923,7 +923,7 @@ "Finally, we can use Labtech to generate a diagram of a list of tasks\n", "that shows all of the task types, parameters, and dependencies:" ], - "id": "14838576-79ea-41f6-b179-974d89cb65d2" + "id": "5ca210ed-0242-4fbe-822e-276831374546" }, { "cell_type": "code", @@ -937,7 +937,7 @@ " evaluation_task,\n", "], direction='BT')" ], - "id": "de344d04-26e3-4e8c-bc0e-2a6807f4e23d" + "id": "67646f0b-819f-4bb4-b127-cf4b7d042483" }, { "cell_type": "markdown", @@ -999,7 +999,7 @@ "- [More\n", " examples](https://github.com/ben-denham/labtech/tree/main/examples)" ], - "id": "59d2db01-5ae0-4626-b0f6-7a327d957992" + "id": "52db5238-0aca-4253-b277-30528b20c06b" } ], "nbformat": 4, From cc4fc972d9648f1001593a33ed61f04fe2fd3361 Mon Sep 17 00:00:00 2001 From: Ben Denham Date: Mon, 2 Jun 2025 12:36:16 +1200 Subject: [PATCH 10/10] Update docs about memory usage --- docs/cookbook.md | 17 ++++-- examples/cookbook.ipynb | 121 +++++++++++++++++++++------------------- examples/tutorial.ipynb | 86 ++++++++++++++-------------- 3 files changed, 119 insertions(+), 105 deletions(-) diff --git a/docs/cookbook.md b/docs/cookbook.md index c8e8816..e8f1bcf 100644 --- a/docs/cookbook.md +++ b/docs/cookbook.md @@ -672,8 +672,15 @@ define a [`filter_context()`][labtech.types.Task.filter_context] in order to only pass necessary parts of the context to each task. If you are running a Lab with with `runner_backend='fork'` (the -default on Linux), then you can rely on Labtech to share results and -context between task processes using shared memory. 
+default on Linux), then you can rely on Labtech to share the context +between task processes using shared memory. Furthermore, +`runner_backend='fork-per-task'` will also share task results between +processes using shared memory, but at the cost of forking a new +subprocess for each task - `runner_backend='fork-per-task'` is best +used when dependency task results are large (so time will be saved +through memory sharing) compared to the overall number of tasks (for +large numbers of tasks, forking a separate process for each may be a +substantial overhead). ### How can I see when a task was run and how long it took to execute? @@ -688,9 +695,9 @@ print(f'The task execution took: {aggregation_task.result_meta.duration}') ### How can I access the results of intermediate/dependency tasks? -To conserve memory, labtech's default behaviour is to unload the -results of intermediate/dependency tasks once their directly dependent -tasks have finished executing. +To conserve memory, labtech unloads the results of +intermediate/dependency tasks once their directly dependent tasks have +finished executing. A simple approach to access the results of an intermediate task may simply be to include it's results as part of the result of the task diff --git a/examples/cookbook.ipynb b/examples/cookbook.ipynb index f59faa8..c4ad341 100644 --- a/examples/cookbook.ipynb +++ b/examples/cookbook.ipynb @@ -11,7 +11,7 @@ "You can also run this cookbook as an [interactive\n", "notebook](https://mybinder.org/v2/gh/ben-denham/labtech/main?filepath=examples/cookbook.ipynb)." 
], - "id": "fb1b0e78-7c15-4c31-8ed0-aee1fccf2aa2" + "id": "018a57d7-11f4-490b-b726-e6c183758c1b" }, { "cell_type": "code", @@ -21,7 +21,7 @@ "source": [ "%pip install labtech fsspec mlflow pandas scikit-learn setuptools" ], - "id": "b65cb2a4-4725-472e-9ba4-7d26469eb73b" + "id": "03016d94-70c8-4dde-890e-20d7fd2aea33" }, { "cell_type": "code", @@ -31,7 +31,7 @@ "source": [ "!mkdir storage" ], - "id": "045df8d2-4a2e-4882-addb-425c0d6b010c" + "id": "b63bc804-ff16-4f9a-aa57-adfa0477f713" }, { "cell_type": "code", @@ -50,7 +50,7 @@ "digits_X, digits_y = datasets.load_digits(return_X_y=True)\n", "digits_X = StandardScaler().fit_transform(digits_X)" ], - "id": "c8421b93-ba49-4018-aa0b-e60c8696291a" + "id": "63c832d8-2ba5-4cbd-8cd8-6cdc8dd5ae41" }, { "cell_type": "markdown", @@ -64,7 +64,7 @@ "is sent to `STDOUT` (e.g. calls to `print()`) or `STDERR` (e.g. uncaught\n", "exceptions) will also be captured and logged:" ], - "id": "acc6dce7-c01c-4272-8683-1ad4126aac3e" + "id": "9690c1c2-ba16-4ea7-aa57-36796b01dec2" }, { "cell_type": "code", @@ -91,7 +91,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks(experiments)" ], - "id": "1139f529-6be4-46d6-8d24-6403b24482a2" + "id": "7503db7d-0335-4502-836a-2150e48b1927" }, { "cell_type": "markdown", @@ -130,7 +130,7 @@ "learning model (like `LRClassifierTask` below), and then make a task of\n", "that type a parameter for your primary experiment task:" ], - "id": "2e1b31df-8536-48ea-a30a-bf2ed4c455d8" + "id": "2df30640-45bf-4d43-9528-33e2ca826db8" }, { "cell_type": "code", @@ -171,7 +171,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks([experiment])" ], - "id": "7da1cd65-d7b2-45bb-b7d5-3f58b59aa192" + "id": "37132698-d3f7-49b4-b17b-77727d4ae493" }, { "cell_type": "markdown", @@ -182,7 +182,7 @@ "[Protocol](https://docs.python.org/3/library/typing.html#typing.Protocol)\n", "that defines their common result type:" ], - "id": "8e1dcd85-6d94-4d98-a169-0dbd92b6facb" + "id": 
"18373985-4a11-4a4a-9102-f38894034e07" }, { "cell_type": "code", @@ -242,7 +242,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks(experiments)" ], - "id": "b660b25d-d759-4f65-91fd-40062b67d4b3" + "id": "5e68d591-4290-4688-9b09-30ed6f26341e" }, { "cell_type": "markdown", @@ -262,7 +262,7 @@ "> `Enum` must support equality between identical (but distinct) object\n", "> instances." ], - "id": "cb773f2f-0f02-4254-b0bc-39867d0cbb24" + "id": "439c38de-bf0f-4e55-9002-f94645754724" }, { "cell_type": "code", @@ -310,7 +310,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks(experiments)" ], - "id": "fa3d794a-fc3c-4413-a397-5563fe92fec0" + "id": "99a44858-f1fd-4afe-8a7b-78249dc99ccd" }, { "cell_type": "markdown", @@ -333,7 +333,7 @@ "The following example demonstrates specifying a `dataset_key` parameter\n", "to a task that is used to look up a dataset from the lab context:" ], - "id": "65b8bb54-9ed6-425f-b38b-fc986911e325" + "id": "49b5a378-0716-486b-98c0-7f83100c4d74" }, { "cell_type": "code", @@ -370,7 +370,7 @@ ")\n", "results = lab.run_tasks(experiments)" ], - "id": "215bf95d-dd89-406d-baa9-d9f1c9f4272f" + "id": "0f312541-c489-40d9-90fc-6125b3b47591" }, { "cell_type": "markdown", @@ -388,7 +388,7 @@ "cross-validation within the task using a number of workers specified in\n", "the lab context as `within_task_workers`:" ], - "id": "6e6a4356-3081-451e-8dc0-59ca74c73283" + "id": "a1e3b0d3-183a-4dc6-a78a-47385fd17180" }, { "cell_type": "code", @@ -429,7 +429,7 @@ ")\n", "results = lab.run_tasks(experiments)" ], - "id": "0e14d9b3-ca31-4999-957b-d655f2eff507" + "id": "0f769852-2804-4c92-8160-1dc9f8ce11ce" }, { "cell_type": "markdown", @@ -456,7 +456,7 @@ "raised during the execution of a task will be logged, but the execution\n", "of other tasks will continue:" ], - "id": "b6944b5d-517d-4046-b924-d5ab1fcf9d6c" + "id": "094a2009-e173-4e0e-b3c9-7baf023641e3" }, { "cell_type": "code", @@ -469,7 +469,7 @@ " continue_on_failure=True,\n", ")" ], - 
"id": "db0bb66e-6c66-4b50-b3ef-8babd5d3b798" + "id": "a62c7ae5-91e4-46b4-a35d-56ab1db69d29" }, { "cell_type": "markdown", @@ -488,7 +488,7 @@ "sub-class for that extension so that you can continue using caches for\n", "the base class:" ], - "id": "76e200ae-f22d-463d-bf4b-39b550be3504" + "id": "f3f9b209-6fbd-4e5c-afcd-612ea0653d93" }, { "cell_type": "code", @@ -512,7 +512,7 @@ " base_result = super().run()\n", " return base_result * self.multiplier" ], - "id": "c57b89b6-db19-4b32-89c9-5176dd638669" + "id": "0c8dbd57-b80c-4346-bda7-537877e92c71" }, { "cell_type": "markdown", @@ -524,7 +524,7 @@ "all cached task instances for a list of task types. You can then “run”\n", "the tasks to load their cached results:" ], - "id": "0a54dfe7-7977-40ef-8f2a-d471bdc0412f" + "id": "94c8f311-8ded-46ac-a693-d696b9c9fb25" }, { "cell_type": "code", @@ -535,7 +535,7 @@ "cached_cvexperiment_tasks = lab.cached_tasks([CVExperiment])\n", "results = lab.run_tasks(cached_cvexperiment_tasks)" ], - "id": "973cd6c4-8fe5-4e8b-8694-34d0aa019446" + "id": "90c62001-0620-4f58-95c9-b95f31cc38bd" }, { "cell_type": "markdown", @@ -546,7 +546,7 @@ "You can clear the cache for a list of tasks using the `uncache_tasks()`\n", "method of a `Lab` instance:" ], - "id": "3a834ec6-3f5a-4661-899f-528aa2d77524" + "id": "96458c81-c7dc-4bba-ab39-849571597207" }, { "cell_type": "code", @@ -556,7 +556,7 @@ "source": [ "lab.uncache_tasks(cached_cvexperiment_tasks)" ], - "id": "f2188c69-e1c3-4dee-a1a1-f4b38ad32bc8" + "id": "1eaa11f0-de72-42f7-a157-221256acfaf8" }, { "cell_type": "markdown", @@ -565,7 +565,7 @@ "You can also ignore all previously cached results when running a list of\n", "tasks by passing the `bust_cache` option to `run_tasks()`:" ], - "id": "5cfee59f-cf7a-457a-bdd1-32fea5ee81b9" + "id": "f7ad7ac6-e3ef-4850-acb5-8a8ec24ec6e4" }, { "cell_type": "code", @@ -575,7 +575,7 @@ "source": [ "lab.run_tasks(cached_cvexperiment_tasks, bust_cache=True)" ], - "id": "bad4fc13-ad3b-4864-8eb7-9759e3e61693" + "id": 
"d9bc2ae9-6d7b-4424-83a1-b70c1b744b6d" }, { "cell_type": "markdown", @@ -587,7 +587,7 @@ "(i.e. most changes to the `run()` method or the code it depends on) you\n", "should add or updated the `code_version` in `@task`. For example:" ], - "id": "008c4ff3-5e49-469a-9e44-aa830d13a98e" + "id": "15457df7-41dd-4a07-a7d9-bb794f323b67" }, { "cell_type": "code", @@ -599,7 +599,7 @@ "class Experiment:\n", " ..." ], - "id": "09194c41-7c71-4cbf-9345-b050e1628ef7" + "id": "77d7e97f-d244-45ff-8f35-5fb6d8a34013" }, { "cell_type": "markdown", @@ -614,7 +614,7 @@ "results where the `code_version` does not match the\n", "`current_code_version`:" ], - "id": "43f9dceb-bb61-466c-bb03-8b9a462e8f2a" + "id": "558730bd-60d8-4016-b81c-278f3270ebcd" }, { "cell_type": "code", @@ -632,7 +632,7 @@ "]\n", "lab.uncache_tasks(stale_cached_tasks)" ], - "id": "2355ad82-fce5-4f15-b016-3816faf7a6f8" + "id": "1bee9543-e6c8-4d12-aaba-f0af650a1134" }, { "cell_type": "markdown", @@ -656,7 +656,7 @@ "consider using a\n", "[`TypeDict`](https://docs.python.org/3/library/typing.html#typing.TypedDict):" ], - "id": "83760fcc-042d-40ed-9cd4-f51e9f80b685" + "id": "af017217-bd24-47a4-b577-a338aecacaf3" }, { "cell_type": "code", @@ -683,7 +683,7 @@ " model_weights=np.array([self.seed, self.seed ** 2]),\n", " )" ], - "id": "478fff35-aaf9-4e2f-b83c-26f2d1dfaad0" + "id": "72cb728b-20d3-4e47-b1fa-a00f7fd6f7c5" }, { "cell_type": "markdown", @@ -701,7 +701,7 @@ "The following example demonstrates defining and using a custom cache\n", "type to store Pandas DataFrames as parquet files:" ], - "id": "e847ff3c-258d-4b3a-994c-b5dbc41d8267" + "id": "66b47a9a-3deb-495c-84d1-7e7dac8aab7d" }, { "cell_type": "code", @@ -743,7 +743,7 @@ "lab = labtech.Lab(storage='storage/parquet_example')\n", "lab.run_tasks([TabularTask()])" ], - "id": "d9519dd7-1305-429f-a9fd-1264cd86e1cc" + "id": "e5ec6c85-34ba-4df5-8936-191c96688b0a" }, { "cell_type": "markdown", @@ -765,7 +765,7 @@ "storage providers like [Azure Blob\n", 
"Storage](https://github.com/fsspec/adlfs)." ], - "id": "e2744532-5a16-4190-8a72-5951699e6326" + "id": "5c19dc03-4ded-4569-904a-54e58d796ef7" }, { "cell_type": "code", @@ -775,7 +775,7 @@ "source": [ "%pip install s3fs" ], - "id": "cc94a96e-55e9-4524-8cc2-5e8e7a3d1089" + "id": "3e71090e-6555-4fa8-a2e9-edca1ce6e193" }, { "cell_type": "code", @@ -819,7 +819,7 @@ ")\n", "results = lab.run_tasks(experiments)" ], - "id": "257e379a-40e9-422b-9951-971567430df1" + "id": "fabb8e24-cd23-4073-a340-7421d6e3517d" }, { "cell_type": "markdown", @@ -850,7 +850,7 @@ "`AggregationTask` to aggregate the results from many individual tasks to\n", "create an aggregated cache that can be loaded more efficiently:" ], - "id": "d30f8c48-8b2c-489a-bffb-e252a342ecb4" + "id": "dd941f48-da9b-4299-88d9-2871c8425c78" }, { "cell_type": "code", @@ -891,7 +891,7 @@ "lab = labtech.Lab(storage='storage/aggregation_lab')\n", "result = lab.run_task(aggregation_task)" ], - "id": "7c81fd6c-5755-4702-a3c1-17d0564858f9" + "id": "72ba856c-2b12-491c-bd02-220fd5da8012" }, { "cell_type": "markdown", @@ -910,8 +910,15 @@ "only pass necessary parts of the context to each task.\n", "\n", "If you are running a Lab with with `runner_backend='fork'` (the default\n", - "on Linux), then you can rely on Labtech to share results and context\n", - "between task processes using shared memory.\n", + "on Linux), then you can rely on Labtech to share the context between\n", + "task processes using shared memory. 
Furthermore,\n", + "`runner_backend='fork-per-task'` will also share task results between\n", + "processes using shared memory, but at the cost of forking a new\n", + "subprocess for each task - `runner_backend='fork-per-task'` is best used\n", + "when dependency task results are large (so time will be saved through\n", + "memory sharing) compared to the overall number of tasks (for large\n", + "numbers of tasks, forking a separate process for each may be a\n", + "substantial overhead).\n", "\n", "### How can I see when a task was run and how long it took to execute?\n", "\n", @@ -919,7 +926,7 @@ "it was originally executed and how long it took to execute from the\n", "task’s `.result_meta` attribute:" ], - "id": "37b1b669-e6d0-4bb2-81f1-1be00ceb3047" + "id": "3a70d99f-5623-4a28-8c5d-7638b433266c" }, { "cell_type": "code", @@ -930,7 +937,7 @@ "print(f'The task was executed at: {aggregation_task.result_meta.start}')\n", "print(f'The task execution took: {aggregation_task.result_meta.duration}')" ], - "id": "9160cd5a-ca4e-43ab-ada0-bcfcd708a4b4" + "id": "57ab3724-e692-4dd3-a6a5-79c8d5764864" }, { "cell_type": "markdown", @@ -938,9 +945,9 @@ "source": [ "### How can I access the results of intermediate/dependency tasks?\n", "\n", - "To conserve memory, labtech’s default behaviour is to unload the results\n", - "of intermediate/dependency tasks once their directly dependent tasks\n", - "have finished executing.\n", + "To conserve memory, labtech unloads the results of\n", + "intermediate/dependency tasks once their directly dependent tasks have\n", + "finished executing.\n", "\n", "A simple approach to access the results of an intermediate task may\n", "simply be to include it’s results as part of the result of the task that\n", @@ -950,7 +957,7 @@ "Another approach is to include all of the intermediate tasks for which\n", "you wish to access the results for in the call to `run_tasks()`:" ], - "id": "31edb972-a422-4aa2-b532-bef239cd0504" + "id": 
"1645b811-e319-43b6-bd7d-b0cf94fb2ed5" }, { "cell_type": "code", @@ -978,7 +985,7 @@ " for experiment in experiments\n", "])" ], - "id": "a68e5143-9d15-4692-b751-10125599af6e" + "id": "a5a73bed-a845-415b-bbc4-ec2e2a0cf2c4" }, { "cell_type": "markdown", @@ -994,7 +1001,7 @@ "This is modeled in labtech by defining a task type for each step, and\n", "having each step depend on the result from the previous step:" ], - "id": "74f2c64c-6ead-4253-a063-5c8b15a87437" + "id": "707ce19f-7485-4af4-89c8-5573d61e3f9d" }, { "cell_type": "code", @@ -1044,7 +1051,7 @@ "result = lab.run_task(task_c)\n", "print(result)" ], - "id": "b2bf5fe1-c87c-4fde-a8c2-7bfe25d161e9" + "id": "f2b1a32d-3202-436f-b2f3-cd7cf963ee74" }, { "cell_type": "markdown", @@ -1056,7 +1063,7 @@ "[Mermaid diagram](https://mermaid.js.org/syntax/classDiagram.html) of\n", "task types for a given list of tasks:" ], - "id": "8059c0fd-2d64-4b60-8e63-f723545eca1b" + "id": "00f1c855-33a3-446c-abec-33ad9114bc87" }, { "cell_type": "code", @@ -1071,7 +1078,7 @@ " direction='RL',\n", ")" ], - "id": "d1baadba-329d-42bf-98df-56eea1c3d6a6" + "id": "15d93d7b-2a49-455a-a8c0-e5f78c5c1bab" }, { "cell_type": "markdown", @@ -1095,7 +1102,7 @@ "additional tracking calls (such as `mlflow.log_metric()` or\n", "`mlflow.log_model()`) in the body of your task’s `run()` method:" ], - "id": "9307ee42-9141-4c3b-aaf5-55ab62d432da" + "id": "3309dd84-6557-496b-adac-9a8072e754e6" }, { "cell_type": "code", @@ -1142,7 +1149,7 @@ "lab = labtech.Lab(storage=None)\n", "results = lab.run_tasks(runs)" ], - "id": "bcc62e6c-3040-4259-a69f-5179ced0df9a" + "id": "6ea43d30-4ec6-4487-baec-57c89f552923" }, { "cell_type": "markdown", @@ -1191,7 +1198,7 @@ "non-definition code for a Python script in a `main()` function, and then\n", "guard the call to `main()` with `__name__ == '__main__'`:" ], - "id": "28bc0b4d-9c72-4ccb-8c60-7d5bf71fa4c7" + "id": "e955e54a-e479-4467-9d5a-14975d39ff24" }, { "cell_type": "code", @@ -1222,7 +1229,7 @@ "if __name__ == 
'__main__':\n", " main()" ], - "id": "17513a58-5bdd-482d-8004-f114d1ebd588" + "id": "5be23e12-734c-4e38-bcfa-86f1e0a9624a" }, { "cell_type": "markdown", @@ -1255,7 +1262,7 @@ "[See the answer to the question directly\n", "above.](#spawn-interactive-main)" ], - "id": "7f9a928e-163e-4f4b-a269-dd22c13801da" + "id": "7f15e42b-1c86-4c68-87b6-c162559c5347" } ], "nbformat": 4, diff --git a/examples/tutorial.ipynb b/examples/tutorial.ipynb index 9495193..9c3edd3 100644 --- a/examples/tutorial.ipynb +++ b/examples/tutorial.ipynb @@ -15,7 +15,7 @@ "Before we begin, let’s install `labtech` along with some other\n", "dependencies we will use in this tutorial:" ], - "id": "dbc7b3ab-ba2b-4315-99a6-fb5674004b80" + "id": "c02059a1-5983-4a6b-a1c8-cf30fa5dc97a" }, { "cell_type": "code", @@ -25,7 +25,7 @@ "source": [ "%pip install labtech mlflow scikit-learn" ], - "id": "b8195763-04fa-42b3-a9d8-f02c8ea93f4c" + "id": "55a83c61-7be7-412e-9752-119bc993d356" }, { "cell_type": "markdown", @@ -34,7 +34,7 @@ "Let’s also clear any caches that were created by previous runs of this\n", "tutorial:" ], - "id": "5e56dc98-d48e-45c4-95a6-0f3e6a9b823d" + "id": "2cc1deac-2f64-4611-9525-5b27e852c8dd" }, { "cell_type": "code", @@ -45,7 +45,7 @@ "!rm -rf storage/tutorial/\n", "!mkdir -p storage/tutorial/" ], - "id": "1fe3b275-69c1-4dcd-822a-04aff6f75d1b" + "id": "80d4c40c-a541-474c-9104-eb3a8f29b29c" }, { "cell_type": "markdown", @@ -56,7 +56,7 @@ "To get started, we’ll take the following simple machine learning\n", "experiment code and convert it to be run with labtech." 
], - "id": "5d8dba7c-0bd6-4b5c-9ee7-88c19efbc2e2" + "id": "3409c920-4c4d-4af5-802e-29ecbbd86886" }, { "cell_type": "code", @@ -81,7 +81,7 @@ "\n", "print(f'{log_loss(digits_y, prob_y) = :.3}')" ], - "id": "2773b39d-142b-4553-a556-0179475f16ea" + "id": "472d811c-e437-474e-b50a-dfac4ad4f684" }, { "cell_type": "markdown", @@ -119,7 +119,7 @@ "method that performs the experiment and returns its result (the\n", "predicted probabilities):" ], - "id": "6f17b621-5db6-45ee-bbf9-b53c681bf3aa" + "id": "1fea2cc0-c772-4ffb-9244-5fcbbbe7d609" }, { "cell_type": "code", @@ -144,7 +144,7 @@ " prob_y = clf.predict_proba(digits_X)\n", " return prob_y" ], - "id": "7adfeec1-d287-4132-a15c-b250c04d6aec" + "id": "6a17aed9-a8fe-4202-b470-e8e3cb6a989f" }, { "cell_type": "markdown", @@ -155,7 +155,7 @@ "`storage/tutorial/classification_lab_1` and to display notebook-friendly\n", "progress bars:" ], - "id": "d8ca5ab4-225c-4e6b-9f3e-bce5c0577c96" + "id": "f7970afd-7549-473d-a933-3c8fb7af293e" }, { "cell_type": "code", @@ -165,7 +165,7 @@ "source": [ "lab = labtech.Lab(storage='storage/tutorial/classification_lab_1')" ], - "id": "dbb95e33-1872-44fc-b0ac-7e9b112cf5de" + "id": "76ffe679-ad49-4656-abdf-d1a75e7e1a6e" }, { "cell_type": "markdown", @@ -176,7 +176,7 @@ "probabilities returned by the task’s `run()` method, so we can calculate\n", "the loss from them as before:" ], - "id": "96edd5ce-911b-4409-95bf-cff81b9a5c20" + "id": "7b400a63-52bd-42e4-bcd9-91e401ead91a" }, { "cell_type": "code", @@ -188,7 +188,7 @@ "prob_y = lab.run_task(classifier_experiment)\n", "print(f'{log_loss(digits_y, prob_y) = :.3}')" ], - "id": "9ff8b9b1-d5da-47c8-b646-4eef012b3c33" + "id": "02ab72c7-e8fe-441a-afdf-4b37abd3be71" }, { "cell_type": "markdown", @@ -199,7 +199,7 @@ "calls to run the same experiment (even after restarting Python) will\n", "load the result from the cache:" ], - "id": "c8f597d1-6bbd-4437-9c3e-3c3201b863d8" + "id": "da5bbe14-be2d-4c3a-b348-3361998c48e2" }, { "cell_type": "code", @@ -210,7 +210,7 
@@ "prob_y = lab.run_task(classifier_experiment)\n", "print(f'{log_loss(digits_y, prob_y) = :.3}')" ], - "id": "cea1e579-a42d-4a12-a350-7402794b7d98" + "id": "feb26567-f71b-42e9-8228-598673b4c0e7" }, { "cell_type": "markdown", @@ -227,7 +227,7 @@ "(or we could pass a list of tasks to `lab.run_tasks()`, as we will see\n", "in the next section of this tutorial)." ], - "id": "1822bc40-56a1-4087-bcf8-f84cb10ee12b" + "id": "f2df1d77-ac18-434e-8009-0bd97d23591a" }, { "cell_type": "code", @@ -239,7 +239,7 @@ " ClassifierExperiment,\n", "])" ], - "id": "a6e02945-f9b1-4c5a-86c3-f1cd91ede65f" + "id": "2da17b35-add0-4d3d-be2b-f0163d318d3a" }, { "cell_type": "markdown", @@ -257,7 +257,7 @@ "You may like to save storage space by clearing up old cached results\n", "with `lab.uncache_tasks()`:" ], - "id": "582ac56e-b453-4c59-8b9a-456ece3520c7" + "id": "af684fc9-ccce-42de-94d4-7f3761cf8bff" }, { "cell_type": "code", @@ -269,7 +269,7 @@ " classifier_experiment,\n", "])" ], - "id": "5b0568ec-164a-4d1d-aba9-7e13cae98cf0" + "id": "b451fc76-ecb3-4531-9069-d3cbc38b3ea5" }, { "cell_type": "markdown", @@ -286,7 +286,7 @@ "the same way as\n", "[dataclass](https://docs.python.org/3/library/dataclasses.html) fields:" ], - "id": "e2174d79-df76-4005-8507-f23b9d813c21" + "id": "20e53d43-a028-4dba-8527-530b6209cbf9" }, { "cell_type": "code", @@ -312,7 +312,7 @@ " prob_y = clf.predict_proba(digits_X)\n", " return prob_y" ], - "id": "0a43d7ac-a617-4102-b6d8-2423daacf586" + "id": "a650f30a-4747-44cb-86d9-b42a4e70bad7" }, { "cell_type": "markdown", @@ -321,7 +321,7 @@ "Now we’ll use a list comprehension to construct a list of\n", "`ClassifierExperiment` tasks with different `n_estimators` values:" ], - "id": "e8efbf82-7f83-4f08-b905-069d130d1594" + "id": "3c670ff2-5805-4cd1-b0fe-b6cf0c8c79ff" }, { "cell_type": "code", @@ -336,7 +336,7 @@ " for n_estimators in range(1, 11)\n", "]" ], - "id": "fd429f38-11b8-4024-a9cb-b4263dbf45c6" + "id": "e1721efc-15d1-440d-8cfd-fe7594d66b69" }, { "cell_type": 
"markdown", @@ -350,7 +350,7 @@ "caches for the new definition separate by constructing a new lab that\n", "uses a different storage directory:" ], - "id": "71d49540-ef23-4c51-bdee-2cb26a4fccb7" + "id": "8171c8fa-41e0-49b1-ab20-807dd40b618d" }, { "cell_type": "code", @@ -361,7 +361,7 @@ "lab = labtech.Lab(storage='storage/tutorial/classification_lab_2')\n", "results = lab.run_tasks(classifier_experiments)" ], - "id": "1fd32624-624e-441e-8f2c-85b1b23d2450" + "id": "da9e0411-4bc4-428d-8b62-4c1cc0e2a1d4" }, { "cell_type": "markdown", @@ -371,7 +371,7 @@ "result it returned, which we can loop over to print loss metrics for\n", "each experiment:" ], - "id": "150ee979-de21-408d-9149-99b610be2f6f" + "id": "9b13c6ca-d6e5-4c31-b808-032fdc93797a" }, { "cell_type": "code", @@ -382,7 +382,7 @@ "for experiment, prob_y in results.items():\n", " print(f'{experiment}: {log_loss(digits_y, prob_y) = :.3}')" ], - "id": "88578678-f9ca-4162-b8a2-b56836ecf7be" + "id": "6da4888c-f5c3-4ff6-b844-116476de93ab" }, { "cell_type": "markdown", @@ -408,7 +408,7 @@ "allowing us to access the result from the `.result` attribute of the\n", "task parameter (i.e. 
`self.classifier_experiment.result`):" ], - "id": "b9c04fd0-2833-4ac8-9fa3-6f7d7235d0f3" + "id": "99375805-d882-4fd3-8a29-3c017c0354cc" }, { "cell_type": "code", @@ -431,7 +431,7 @@ " min_max_prob_y[np.arange(len(prob_y)), prob_y.argmax(axis=1)] = 1\n", " return min_max_prob_y" ], - "id": "1cf5eebe-8ed5-4b75-ae89-6e7fdac254d3" + "id": "d0658ca3-b867-4f97-a2b1-3bbf4f7327a6" }, { "cell_type": "markdown", @@ -444,7 +444,7 @@ "`MinMaxProbabilityExperiment` is run, re-using results depended on by\n", "multiple tasks and loading previously cached results wherever possible:" ], - "id": "4b939912-c8eb-4159-9e49-aace448559b6" + "id": "65fbbd0f-b452-46b1-99ed-7e474633ba29" }, { "cell_type": "code", @@ -463,7 +463,7 @@ "for experiment, prob_y in results.items():\n", " print(f'{experiment}: {log_loss(digits_y, prob_y) = :.3}')" ], - "id": "35216676-61f9-4283-a5d9-959a36992424" + "id": "9bfd9f10-82bb-4b4a-ab7b-7d10d3482d87" }, { "cell_type": "markdown", @@ -506,7 +506,7 @@ " `ClassifierExperiment` tasks, the `run()` method first creates\n", " its own copy of the classifier with `clone()`." 
], - "id": "90657fa9-9671-48f5-b294-909fddcef14a" + "id": "640591ed-88d2-4604-a636-d15238c3178b" }, { "cell_type": "code", @@ -560,7 +560,7 @@ " prob_y = clf.predict_proba(digits_X)\n", " return prob_y" ], - "id": "46ff6847-4fd8-4816-8935-e2c566afeb26" + "id": "e1044da1-abb7-444f-b0ed-7e25b5686ac5" }, { "cell_type": "markdown", @@ -571,7 +571,7 @@ "for each of these `RFClassifierTask` tasks as well as an\n", "`LRClassifierTask` task:" ], - "id": "d6cef3b7-0d86-48be-8d30-4e539c0d444e" + "id": "06834351-1466-40ea-9d8e-c9bc9c8b0131" }, { "cell_type": "code", @@ -601,7 +601,7 @@ "for experiment, prob_y in results.items():\n", " print(f'{experiment}: {log_loss(digits_y, prob_y) = :.3}')" ], - "id": "879cd7df-28f9-4006-92ac-0688b98d43ad" + "id": "b918c6e3-d7da-421a-88ae-e585ccfe4e06" }, { "cell_type": "markdown", @@ -615,7 +615,7 @@ "our experiments) outside of any task, allowing us to inspect these\n", "datasets before and after the tasks have been run:" ], - "id": "8ee4b630-ff85-41ef-8bd8-dba599f8c97b" + "id": "eafb68f6-c38b-4d6f-b21d-5b7de5e9f59e" }, { "cell_type": "code", @@ -631,7 +631,7 @@ " 'iris': {'X': iris_X, 'y': iris_y},\n", "}" ], - "id": "5dd087ae-7f74-4c3b-81ce-d8a25ff0057b" + "id": "b8b8baf8-3895-4550-bbc1-cc7233335dae" }, { "cell_type": "markdown", @@ -648,7 +648,7 @@ "3. Alter the task generation and evaluation code to handle multiple\n", " datasets." ], - "id": "059dbe50-6ab1-482c-8d23-2d0ca21ed2ae" + "id": "b108e730-4cc9-433b-a30e-38140bff7093" }, { "cell_type": "code", @@ -694,7 +694,7 @@ " dataset_y = DATASETS[experiment.dataset_key][\"y\"]\n", " print(f'{experiment}: {log_loss(dataset_y, prob_y) = :.3}')" ], - "id": "b713f8f5-4717-481b-b4ce-ec87e37b21d4" + "id": "99a0e7d5-5dec-44d9-bd91-2f3b654e4ce3" }, { "cell_type": "markdown", @@ -740,7 +740,7 @@ " `mlflow.set_experiment('example_labtech_experiment')` before the\n", " tasks are run." 
], - "id": "a7732a92-ad04-4f1f-af86-f4828ea19468" + "id": "aabb242f-232b-42c5-9c9b-a4a9774cb76d" }, { "cell_type": "code", @@ -912,7 +912,7 @@ "for experiment, result in evaluation_result.items():\n", " print(f'{experiment}: log_loss = {result[\"log_loss\"]:.3}')" ], - "id": "b80d78d9-2cf8-47c3-b80c-993af5d39d4f" + "id": "fa57fec5-dde8-4ce9-8a14-43a559c2c23a" }, { "cell_type": "markdown", @@ -923,7 +923,7 @@ "Finally, we can use Labtech to generate a diagram of a list of tasks\n", "that shows all of the task types, parameters, and dependencies:" ], - "id": "5ca210ed-0242-4fbe-822e-276831374546" + "id": "1189903a-b819-4e30-8a0f-f8dd9c4f901c" }, { "cell_type": "code", @@ -937,7 +937,7 @@ " evaluation_task,\n", "], direction='BT')" ], - "id": "67646f0b-819f-4bb4-b127-cf4b7d042483" + "id": "7a6fd33d-011b-4c22-9377-1142f847d422" }, { "cell_type": "markdown", @@ -999,7 +999,7 @@ "- [More\n", " examples](https://github.com/ben-denham/labtech/tree/main/examples)" ], - "id": "52db5238-0aca-4253-b277-30528b20c06b" + "id": "f97c3e93-0930-4939-b2fd-b66a63146860" } ], "nbformat": 4,