diff --git a/AutoML.py b/AutoML.py index d135e41e..6100af05 100644 --- a/AutoML.py +++ b/AutoML.py @@ -78,7 +78,6 @@ from tools.splash_launcher import SplashLauncher from tools.trash_eater import manager_eater from mainappsrc.version import VERSION -from mainappsrc.services import service_manager from mainappsrc.core.automl_core import ( AutoMLApp, FaultTreeNode, @@ -300,18 +299,7 @@ def _loader() -> Any: if module is None: # pragma: no cover - defensive return - class _AutoMLCoreService: - def __init__(self, mod: Any) -> None: - self._module = mod - - def run(self) -> None: - self._module.main() - - service_manager.request( - "automl_core", lambda: _AutoMLCoreService(module), recoverable=False, daemon=False - ) - service_manager.join("automl_core") - service_manager.release("automl_core") + module.main() memory_manager.cleanup() if _diagnostics_manager: diff --git a/HISTORY.md b/HISTORY.md index 12ee9a58..e5c7dd16 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -19,6 +19,18 @@ --> # Version History +- 0.2.213 - Introduce pausable service manager to prevent premature thread + termination while diagrams remain open. +- 0.2.212 - Remove service thread manager and invoke application core directly. +- 0.2.211 - Pause service threads on release and add explicit shutdown API so + threads persist until manually terminated. +- 0.2.210 - Restart paused service threads if they exited and clamp reference + counts so services remain available while diagrams are open. +- 0.2.209 - Wait for service threads to terminate on release, preventing + ``Tcl_AsyncDelete`` errors when reopening saved diagrams. +- 0.2.208 - Suspend unused service threads instead of killing them so diagram + tabs can reopen without crashes. Threads are terminated only after + remaining idle beyond a timeout. - 0.2.207 - Skip joining the current thread during thread manager shutdown to prevent runtime errors and ``Tcl_AsyncDelete`` warnings. 
- 0.2.206 - Track stop events in the service thread manager so threads diff --git a/README.md b/README.md index bb3ac3b4..cf93307f 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -version: 0.2.207 +version: 0.2.213 Author: Miguel Marina - [LinkedIn](https://www.linkedin.com/in/progman32/) # AutoML diff --git a/mainappsrc/services/__init__.py b/mainappsrc/services/__init__.py index b55d78a1..cbae15bb 100644 --- a/mainappsrc/services/__init__.py +++ b/mainappsrc/services/__init__.py @@ -152,6 +152,4 @@ def __getattr__(name: str) -> Any: # pragma: no cover - simple delegation return getattr(module, attr_name) -from .service_manager import ServiceManager, manager as service_manager -__all__.extend(["ServiceManager", "service_manager"]) diff --git a/mainappsrc/services/service_manager.py b/mainappsrc/services/service_manager.py deleted file mode 100644 index 827011bf..00000000 --- a/mainappsrc/services/service_manager.py +++ /dev/null @@ -1,125 +0,0 @@ -# Author: Miguel Marina -# SPDX-License-Identifier: GPL-3.0-or-later -# -# Copyright (C) 2025 Capek System Safety & Robotic Solutions -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . 
-"""Threaded service loader and monitor.""" - -from __future__ import annotations - -from dataclasses import dataclass -import threading -import time -from typing import Any, Callable, Dict - -from tools.thread_manager import manager as thread_manager - - -@dataclass -class _ServiceInfo: - instance: Any - thread: threading.Thread - refcount: int - recoverable: bool - - -class ServiceManager: - """Manage application services in dedicated threads. - - Services are instantiated lazily when :meth:`request` is called and run on - monitored threads. When released they are shut down and their threads are - stopped. A lightweight watchdog periodically checks whether recoverable - services are still alive and restarts them if necessary. - """ - - def __init__(self, interval: float = 1.0) -> None: - self._services: Dict[str, _ServiceInfo] = {} - self._lock = threading.Lock() - self._interval = interval - thread_manager.register("service_manager", self._watchdog, daemon=True) - - def request( - self, - name: str, - factory: Callable[[], Any], - *, - recoverable: bool = True, - daemon: bool = True, - ) -> Any: - """Return an existing service or create a new threaded instance. - - Parameters - ---------- - name: - Identifier used to track the service thread. - factory: - Callable constructing the service instance. The returned object must - implement a ``run`` method which will execute in a dedicated thread. - recoverable: - Whether the service should be restarted by the manager if its thread - stops unexpectedly. - daemon: - If ``True`` the service thread is marked as daemon so it will not - block process shutdown. Long-running services such as the main - application thread should disable this flag. 
- """ - with self._lock: - info = self._services.get(name) - if info: - info.refcount += 1 - return info.instance - instance = factory() - target = getattr(instance, "run", None) - if not callable(target): # pragma: no cover - defensive - raise AttributeError(f"Service '{name}' missing callable 'run' method") - thread = thread_manager.register(f"service:{name}", target, daemon=daemon) - self._services[name] = _ServiceInfo(instance, thread, 1, recoverable) - return instance - - def release(self, name: str) -> None: - """Decrease the reference count and stop the service if unused.""" - with self._lock: - info = self._services.get(name) - if not info: - return - info.refcount -= 1 - if info.refcount <= 0: - thread_manager.unregister(f"service:{name}") - shutdown = getattr(info.instance, "shutdown", None) - if callable(shutdown): - shutdown() - del self._services[name] - - def join(self, name: str, timeout: float | None = None) -> None: - """Wait for the named service thread to finish.""" - with self._lock: - info = self._services.get(name) - thread = info.thread if info else None - if thread: - thread.join(timeout) - - def _watchdog(self) -> None: # pragma: no cover - simple loop - while True: - time.sleep(self._interval) - with self._lock: - for name, info in list(self._services.items()): - if info.recoverable and not info.thread.is_alive(): - thread = thread_manager.register( - f"service:{name}", getattr(info.instance, "run") - ) - info.thread = thread - - -manager = ServiceManager() diff --git a/mainappsrc/version.py b/mainappsrc/version.py index 02c5cb9c..a2598846 100644 --- a/mainappsrc/version.py +++ b/mainappsrc/version.py @@ -18,6 +18,6 @@ """Project version information.""" -VERSION = "0.2.207" +VERSION = "0.2.213" __all__ = ["VERSION"] diff --git a/tests/services/test_service_manager.py b/tests/services/test_service_manager.py deleted file mode 100644 index 402a966b..00000000 --- a/tests/services/test_service_manager.py +++ /dev/null @@ -1,104 +0,0 @@ -# 
Author: Miguel Marina -# SPDX-License-Identifier: GPL-3.0-or-later -# -# Copyright (C) 2025 Capek System Safety & Robotic Solutions -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -"""Tests for the threaded service manager.""" - -from __future__ import annotations - -import threading -import time - -import sys -from pathlib import Path - -sys.path.insert(0, str(Path(__file__).resolve().parents[2])) - -from mainappsrc.services import service_manager - - -class DummyService: - def __init__(self) -> None: - self.running = threading.Event() - self.stop = threading.Event() - - def run(self) -> None: - self.running.set() - while not self.stop.is_set(): - time.sleep(0.01) - - def shutdown(self) -> None: - self.stop.set() - - -class FaultyService: - def __init__(self) -> None: - self.runs = 0 - self.stop = threading.Event() - - def run(self) -> None: - self.runs += 1 - if self.runs == 1: - raise RuntimeError("boom") - self.stop.wait() - - def shutdown(self) -> None: - self.stop.set() - - -class TestServiceManagerLifecycle: - def test_start_and_stop_service(self) -> None: - """Services start in threads and shut down when released.""" - service = service_manager.request("dummy", DummyService) - assert service.running.wait(1.0) - service_manager.release("dummy") - assert service.stop.wait(1.0) - - -class TestServiceManagerRecovery: - def test_restart_faulted_service(self) -> None: - """Faulted services are restarted 
by the manager.""" - service = service_manager.request("faulty", FaultyService) - for _ in range(50): - if service.runs >= 2: - break - time.sleep(0.05) - service_manager.release("faulty") - assert service.runs >= 2 - - -class TestServiceManagerThreadOptions: - def test_non_daemon_thread_and_join(self) -> None: - """Service threads can run non-daemon and be joined.""" - - class BlockingService: - def __init__(self) -> None: - self.started = threading.Event() - self.stop = threading.Event() - - def run(self) -> None: - self.started.set() - self.stop.wait() - - def shutdown(self) -> None: - self.stop.set() - - service_manager.request("block", BlockingService, daemon=False) - assert service_manager._services["block"].thread.daemon is False - assert service_manager._services["block"].instance.started.wait(1.0) - service_manager._services["block"].instance.shutdown() - service_manager.join("block") - service_manager.release("block") diff --git a/tests/thread_manager/test_service_manager.py b/tests/thread_manager/test_service_manager.py new file mode 100644 index 00000000..224fe043 --- /dev/null +++ b/tests/thread_manager/test_service_manager.py @@ -0,0 +1,70 @@ +# Author: Miguel Marina +# SPDX-License-Identifier: GPL-3.0-or-later +# +# Copyright (C) 2025 Capek System Safety & Robotic Solutions +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
+"""Grouped regression tests for :mod:`tools.service_manager`.""" + +from __future__ import annotations + +import time + +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parents[2])) + +from tools.service_manager import ServiceManager + + +class TestServiceManagerLifecycle: + """Lifecycle and pause/resume behaviour.""" + + def test_reuses_paused_service(self) -> None: + runs: list[int] = [] + + def worker(stop, resume) -> None: # type: ignore[no-untyped-def] + runs.append(1) + resume.clear() + while not stop.is_set() and not resume.wait(0.01): + pass + + mgr = ServiceManager() + svc = mgr.acquire("demo", worker) + time.sleep(0.05) + assert runs + thread = svc.thread + mgr.release("demo") + time.sleep(0.05) + assert thread.is_alive() + mgr.acquire("demo", worker) + time.sleep(0.05) + assert len(runs) > 1 + assert svc.thread is thread + mgr.release("demo") + mgr.shutdown_all() + + def test_shutdown_all_terminates_services(self) -> None: + stop_called: list[bool] = [] + + def worker(stop, resume) -> None: # type: ignore[no-untyped-def] + if stop.wait(0.01): + stop_called.append(True) + + mgr = ServiceManager() + mgr.acquire("demo", worker) + mgr.release("demo") + mgr.shutdown_all() + assert stop_called diff --git a/tools/service_manager.py b/tools/service_manager.py new file mode 100644 index 00000000..e617d13a --- /dev/null +++ b/tools/service_manager.py @@ -0,0 +1,122 @@ +# Author: Miguel Marina +# SPDX-License-Identifier: GPL-3.0-or-later +# +# Copyright (C) 2025 Capek System Safety & Robotic Solutions +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see https://www.gnu.org/licenses/.
"""Thread service manager supporting pause and resume.

The :class:`ServiceManager` wraps worker threads in :class:`PausableService`
containers.  Services are reference counted: acquiring an existing service
resumes its thread, while releasing the *last* reference merely pauses it.
Threads are terminated only when :meth:`ServiceManager.shutdown_all` is
invoked, preventing premature shutdowns while diagrams remain open.
"""

from __future__ import annotations

from dataclasses import dataclass
import threading
from typing import Any, Callable, Dict, Tuple


@dataclass
class PausableService:
    """Run *target* inside a pausable thread.

    The worker thread repeatedly calls
    ``target(stop, resume, *args, **kwargs)`` until the ``stop`` event is
    set.  Between two calls the thread blocks while ``resume`` is cleared.
    Pausing is cooperative: a call to *target* that is already in progress
    is not interrupted, so long-running targets should poll both events
    themselves.
    """

    # Callable executed by the worker thread on every loop iteration.
    target: Callable[..., Any]
    # Extra positional arguments passed to ``target`` after the two events.
    args: Tuple[Any, ...]
    # Extra keyword arguments passed to ``target``.
    kwargs: Dict[str, Any]
    # Worker thread running the service loop.
    thread: threading.Thread
    # Set while the service may run; cleared while it is paused.
    resume: threading.Event
    # Set once to request termination of the worker thread.
    stop: threading.Event

    @classmethod
    def start(
        cls, target: Callable[..., Any], args: Tuple[Any, ...], kwargs: Dict[str, Any]
    ) -> "PausableService":
        """Create the events, start a daemon worker thread and return the service.

        An exception escaping *target* ends the worker thread; nothing in
        this class restarts it.
        """
        resume = threading.Event()
        resume.set()  # a new service starts in the running state
        stop = threading.Event()

        def runner() -> None:
            while not stop.is_set():
                resume.wait()
                # NOTE: ``target`` is also invoked once after waking up for a
                # stop request, so targets must check ``stop`` themselves.
                target(stop, resume, *args, **kwargs)

        thread = threading.Thread(target=runner, daemon=True)
        thread.start()
        return cls(target, args, kwargs, thread, resume, stop)

    def pause(self) -> None:
        """Clear ``resume`` so the worker blocks before its next iteration."""
        self.resume.clear()

    def resume_thread(self) -> None:
        """Set ``resume`` so a paused worker continues."""
        self.resume.set()

    def stop_thread(self) -> None:
        """Request termination and wait for the worker thread to finish.

        ``resume`` is set as well so a paused worker wakes up and can observe
        the stop request.  When called from the worker thread itself the join
        is skipped, because a thread cannot join itself (``Thread.join``
        raises ``RuntimeError`` in that case).
        """
        self.stop.set()
        self.resume.set()
        if self.thread is threading.current_thread():
            return
        if self.thread.is_alive():
            self.thread.join()


class ServiceManager:
    """Reference-counted service lifecycle controller."""

    def __init__(self) -> None:
        # Maps a service name to ``(service, reference_count)``.
        self._services: Dict[str, Tuple[PausableService, int]] = {}
        self._lock = threading.Lock()

    def acquire(
        self, name: str, target: Callable[..., Any], *args: Any, **kwargs: Any
    ) -> PausableService:
        """Return a running service for *name*, starting it if needed.

        If *name* is already registered, its reference count is incremented,
        its thread is resumed and the existing service is returned; *target*,
        *args* and *kwargs* are ignored in that case.
        """
        with self._lock:
            entry = self._services.get(name)
            if entry is not None:
                svc, count = entry
                self._services[name] = (svc, count + 1)
                svc.resume_thread()
                return svc
            svc = PausableService.start(target, args, kwargs)
            self._services[name] = (svc, 1)
            return svc

    def release(self, name: str) -> None:
        """Decrease the reference count and pause the service when unused.

        The service is paused only when the last reference is dropped, so
        other holders keep a running service.  The count is clamped at zero,
        and releasing an unknown *name* is a no-op.
        """
        with self._lock:
            entry = self._services.get(name)
            if entry is None:
                return
            svc, count = entry
            remaining = max(count - 1, 0)
            self._services[name] = (svc, remaining)
            if remaining == 0:
                svc.pause()

    def shutdown_all(self) -> None:
        """Terminate all managed service threads and forget them."""
        with self._lock:
            services = list(self._services.values())
            self._services.clear()
        # Join outside the lock so stopping threads cannot block other callers.
        for svc, _ in services:
            svc.stop_thread()


manager = ServiceManager()

__all__ = ["PausableService", "ServiceManager", "manager"]