From cd9a28595014e5c538e767db5032b8586b81f0cb Mon Sep 17 00:00:00 2001 From: Francesco Beneventi Date: Fri, 23 Jan 2026 00:23:24 +0100 Subject: [PATCH 1/2] fix: ensure proper process joining in Executor class --- README.rst | 2 +- examon/utils/executor.py | 2 + tests/test_executor.py | 140 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 143 insertions(+), 1 deletion(-) create mode 100644 tests/test_executor.py diff --git a/README.rst b/README.rst index e7f79c0..c607df0 100644 --- a/README.rst +++ b/README.rst @@ -1,3 +1,3 @@ Examon common utilities package =============================== -v0.2.6 +v0.2.7 diff --git a/examon/utils/executor.py b/examon/utils/executor.py index 8d75d80..b8ee82c 100644 --- a/examon/utils/executor.py +++ b/examon/utils/executor.py @@ -107,6 +107,8 @@ def monitor_process_children(): if hasattr(d['d'], 'pid'): kill_proc_tree(d['d'].pid) + d['d'].join() + if len(d['worker']) > 1: d_ = Process(target=d['worker'][0], args=d['worker'][1:]) else: diff --git a/tests/test_executor.py b/tests/test_executor.py new file mode 100644 index 0000000..54fdd72 --- /dev/null +++ b/tests/test_executor.py @@ -0,0 +1,140 @@ +import pytest +import time +import multiprocessing +from unittest.mock import patch, MagicMock +from examon.utils.executor import Executor + + +# Simple worker functions for testing +def simple_worker(): + """A worker that completes quickly""" + return "done" + + +def worker_with_args(x, y): + """A worker that takes arguments""" + return x + y + + +def long_running_worker(): + """Simulates a daemon that runs for a while""" + time.sleep(0.5) + + +def dying_worker(): + """A worker that dies immediately""" + raise Exception("I'm dying!") + + +class TestExecutorInit: + """Tests for Executor initialization""" + + def test_default_initialization(self): + executor = Executor() + assert executor.executor == 'ProcessPool' + assert executor.keepalivesec == 60 + assert executor.workers == [] + + def test_custom_initialization(self): + executor = Executor(executor='Daemon', keepalivesec=30) + assert executor.executor == 'Daemon' + assert executor.keepalivesec == 30 + + +class TestExecutorAddWorker: + """Tests for adding workers""" + + def test_add_worker_no_args(self): + executor = Executor() + executor.add_worker(simple_worker) + assert len(executor.workers) == 1 + assert executor.workers[0] == (simple_worker,) + + def test_add_worker_with_args(self): + executor = Executor() + executor.add_worker(worker_with_args, 1, 2) + assert len(executor.workers) == 1 + assert executor.workers[0] == (worker_with_args, 1, 2) + + def test_add_multiple_workers(self): + executor = Executor() + executor.add_worker(simple_worker) + executor.add_worker(worker_with_args, 10, 20) + assert len(executor.workers) == 2 + + +class TestExecutorProcessPool: + """Tests for ProcessPool executor mode""" + + def test_exec_par_single_worker(self): + executor = Executor(executor='ProcessPool') + executor.add_worker(worker_with_args, 2, 3) + results = executor.exec_par() + assert results == [5] + + def test_exec_par_multiple_workers(self): + executor = Executor(executor='ProcessPool') + executor.add_worker(worker_with_args, 2, 3) + executor.add_worker(worker_with_args, 10, 20) + results = executor.exec_par() + assert sorted(results) == [5, 30] + + def test_exec_par_no_workers(self): + executor = Executor(executor='ProcessPool') + results = executor.exec_par() + assert results == [] + + +class TestExecutorDaemon: + """Tests for Daemon executor mode""" + + def test_daemon_with_keepalive_zero(self): + """When keepalivesec=0, skips monitoring loop and joins workers""" + executor = Executor(executor='Daemon', keepalivesec=0) + executor.add_worker(simple_worker) + + # sys.exit(0) is called, so we catch it + with pytest.raises(SystemExit) as exc_info: + executor.exec_par() + assert exc_info.value.code == 0 + + def test_daemon_keyboard_interrupt(self): + """Test graceful shutdown on KeyboardInterrupt""" + executor = Executor(executor='Daemon', keepalivesec=1) + executor.add_worker(long_running_worker) + + # Mock time.sleep to raise KeyboardInterrupt after first call + call_count = 0 + original_sleep = time.sleep + + def mock_sleep(seconds): + nonlocal call_count + call_count += 1 + if call_count > 1: + raise KeyboardInterrupt() + original_sleep(0.01) # Very short actual sleep + + with patch('time.sleep', side_effect=mock_sleep): + with pytest.raises(SystemExit) as exc_info: + executor.exec_par() + assert exc_info.value.code == 0 + + def test_daemon_workers_started_as_daemon(self): + """Verify workers are started as daemon processes""" + executor = Executor(executor='Daemon', keepalivesec=0) + executor.add_worker(simple_worker) + + started_processes = [] + original_process = multiprocessing.Process + + class MockProcess(original_process): + def start(self): + started_processes.append(self) + super().start() + + with patch('examon.utils.executor.Process', MockProcess): + with pytest.raises(SystemExit): + executor.exec_par() + + assert len(started_processes) == 1 + assert started_processes[0].daemon is True From 8c215c8de1a0d06452c6f60bc63b2a90e515a2ca Mon Sep 17 00:00:00 2001 From: Francesco Beneventi Date: Fri, 23 Jan 2026 09:16:42 +0100 Subject: [PATCH 2/2] chore: bump version to v0.2.7 in setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 586e9d0..bd024c4 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup setup(name='examon-common', - version='v0.2.6', + version='v0.2.7', description='Examon common utilities', url='http://github.com/fbeneventi/examon-common', author='Francesco Beneventi',