diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8382f6d..b1d1ced 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,6 +36,10 @@ jobs: run: pip install -e ".[dev]" - name: Run tests + env: + # is it really necessary? + SUBPROCESS_SAME_ENV_TEST: true + BATCH_JOB_TEST: true run: python -m pytest -v --cov src - name: Upload coverage reports to Codecov diff --git a/MANIFEST.in b/MANIFEST.in index 1c83c30..cd1f2ed 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,5 @@ +include LICENSE +include tests/resources/dummy +include tests/resources/empty_runner.py recursive-include tests *.py recursive-include tests/resources/ *.yaml -include tests/resources/dummy -include LICENSE diff --git a/src/dakara_base/safe_workers.py b/src/dakara_base/safe_workers.py index 46dcd6c..11a57e8 100644 --- a/src/dakara_base/safe_workers.py +++ b/src/dakara_base/safe_workers.py @@ -31,6 +31,8 @@ import logging +import platform +import signal import sys from functools import wraps from queue import Empty, Queue @@ -429,6 +431,9 @@ class WorkerSafeThread(BaseWorker): initialization assigns its own thread to the instance and makes it target a dummy method. + If a method `run_main` is defined, it will be called in the main thread. It + takes the stop event end the errors queue as arguments. + Attributes: stop (threading.Event): Stop event that notify to stop the entire program when set. @@ -494,22 +499,26 @@ class Runner: """Runner class. The runner creates the stop event and errors queue. It is designed to - execute the thread of a `WorkerSafeThread` instance until an error occurs - or an user interruption pops out (Ctrl+C). + execute the thread of a `WorkerSafeThread` instance until the end of the + program. This execution is done by the `run_safe` method, that will use a + blocking function. By default, the blocking function waits for an error to + occur or a user interruption to pop out (Ctrl+C). It can be replaced if a + `run_main` function is passed to `run_safe`, or if the `run_main` method of + the used `WorkerSafeThread` class is defined. The blocking function must + accept a stop event and an error queue, and be blocking as long as the stop + event is not set. The initialization creates the stop event and the errors queue and calls the custom init method. Attributes: - POLLING_INTERVAL (float): For Windows only, interval between two - attempts to wait for the stop event. stop (threading.Event): Stop event that notify to stop the execution of the thread. errors (queue.Queue): Error queue to communicate the exception of the thread. """ - POLLING_INTERVAL = 0.5 + ERROR_TIMEOUT = 5 def __init__(self, *args, **kwargs): # create stop event @@ -525,65 +534,115 @@ def init_runner(self, *args, **kwargs): """Custom initialization stub.""" pass - def run_safe(self, WorkerClass, *args, **kwargs): + def run_safe(self, worker_class, args=None, kwargs=None, run_main=None): """Execute a WorkerSafeThread instance thread. The thread is executed and the method waits for the stop event to be - set or a user interruption to be triggered (Ctrl+C). + set or a user interruption to be triggered (Ctrl+C). An alternative + waiting function can be provided by `run_main`, or by the method + `run_main`. The choice of the function to run is: + + 1. Provided `run_main` function; + 2. Worker class `run_main` method; or + 3. Default module `wait` function. Args: - WorkerClass (WorkerSafeThread): Worker class with safe thread. + worker_class (WorkerSafeThread): Worker class with safe thread. Note you have to pass a custom class based on `WorkerSafeThread`. - Other arguments are passed to the thread of WorkerClass. + args (list): Positional arguments passed to the worker class constructor. + kwargs (dict): Named arguments passed to the worker class constructor. + run_main (function): Function to execute in the main thread. It + must accept the stop event and the errors queue, and must be + blocking as long as the stop event is not set. """ + if args is None: + args = () + + if kwargs is None: + kwargs = {} + + # register user interruption + def on_sigint(signal_number, frame): + logger.debug("Receiving signal %i to close", signal_number) + self.errors.put(None) + self.stop.set() + + signal.signal(signal.SIGINT, on_sigint) + + # create worker thread + with worker_class(self.stop, self.errors, *args, **kwargs) as worker: + logger.debug("Create worker thread") + worker.thread.start() + + # select blocking function + if run_main is not None: + # if a function is provided, run it + # it must ruturn when the stop event is set + block = run_main + + elif hasattr(worker, "run_main"): + # if the worker has a function to run, run it + # it must ruturn when the stop event is set + block = getattr(worker, "run_main") + + else: + block = wait + + # wait + block(self.stop, self.errors) + + # get the error from the error queue + # a delay of 5 seconds is accorded for the error to be retrieved try: - # create worker thread - with WorkerClass(self.stop, self.errors, *args, **kwargs) as worker: - - logger.debug("Create worker thread") - worker.thread.start() - - # wait for stop event - logger.debug("Waiting for stop event") - - # We have to use a different code for Windows because the - # Ctrl+C event will not be handled during `self.stop.wait()`. - # This method is blocking for Windows, not for Linux, which is - # due to the way Ctrl+C is differently handled by the two OSs. - # For Windows, a quick and dirty solution consists in polling - # the `self.stop.wait()` with a timeout argument, so the call - # is non-permanently blocking. - # More resources on this: - # https://mail.python.org/pipermail/python-dev/2017-August/148800.html - # https://stackoverflow.com/a/51954792/4584444 - if sys.platform.startswith("win"): - while not self.stop.is_set(): - self.stop.wait(self.POLLING_INTERVAL) - - else: - self.stop.wait() - - # stop on Ctrl+C - except KeyboardInterrupt: + reason = self.errors.get(timeout=self.ERROR_TIMEOUT) + + # if there is no error in the error queue, raise a general error + # this case is very unlikely to happen and is not tested + except Empty as empty_error: + raise EmptyErrorsQueueError("Unknown error happened") from empty_error + + # if there is no error, this is a normal interruption + if reason is None: logger.debug("User stop caught") - self.stop.set() + return + + _, error, traceback = reason + logger.debug("Internal error caught") + error.with_traceback(traceback) + raise error + + +def wait(stop, errors, interval=0.5): + """Wait for stop event to be set. + + We have to use a specific code for Windows because the Ctrl+C event will + not be handled during `self.stop.wait()`. This method is blocking for + Windows, not for Linux, which is due to the way Ctrl+C is differently + handled by the two OSs. For Windows, a quick and dirty solution consists + in polling the `self.stop.wait()` with a timeout argument, so the call is + non-permanently blocking. + + See also: + https://mail.python.org/pipermail/python-dev/2017-August/148800.html + https://stackoverflow.com/a/51954792/4584444 + + Args: + stop (threading.Event): Stop event. + errors (queue.Queue): Errors queue. + interval (float): For Windows only, interval between two attempts to + wait for the stop event. + """ + logger.debug("Waiting for stop event") - # stop on error - else: - logger.debug("Internal error caught") + if platform.system() == "Windows": + while not stop.is_set(): + stop.wait(interval) - # get the error from the error queue and re-raise it - # a delay of 5 seconds is accorded for the error to be retrieved - try: - _, error, traceback = self.errors.get(5) - error.with_traceback(traceback) - raise error + return - # if there is no error in the error queue, raise a general error - # this case is very unlikely to happen and is not tested - except Empty as empty_error: - raise NoErrorCaughtError("Unknown error happened") from empty_error + # otherwise just wait for the program to stop + stop.wait() class UnredefinedTimerError(DakaraError): @@ -602,7 +661,7 @@ class UnredefinedThreadError(DakaraError): """ -class NoErrorCaughtError(RuntimeError): +class EmptyErrorsQueueError(RuntimeError): """No error caught error. Error raised if the safe workers mechanism stops for an error, but there is diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/test_safe_workers.py b/tests/integration/test_safe_workers.py new file mode 100644 index 0000000..7065e99 --- /dev/null +++ b/tests/integration/test_safe_workers.py @@ -0,0 +1,79 @@ +import os +import platform +import signal +import subprocess +from importlib.resources import path +from time import sleep +from unittest import TestCase, skipIf, skipUnless + +from path import Path, TempDir + +import dakara_base # noqa F401 + + +class RunnerIntegrationTestCase(TestCase): + IS_SUBPROCESS_SAME_ENV = ( + platform.system() != "Windows" or "SUBPROCESS_SAME_ENV_TEST" in os.environ + ) + IS_BATCH_JOB = platform.system() == "Windows" and "BATCH_JOB_TEST" in os.environ + + @staticmethod + def wait_output(process, line, interval=0.1): + """Wait a process to output a line. + + Beware this method consumes the lines from the process, so they will + not be included in `process.communicate()`. + + Args: + process (subprocess.Popen): Process to evaluate. + line (str): Line of text to obtain to process output. + interval (float): Interval in seconds between two evaluations. + + Returns: + list of str: Lines outputed by the process before the expected one + showed up. + """ + lines = [] + while process.poll() is None: + sleep(interval) + out = process.stdout.readline() + if not out: + continue + + lines.append(out.strip()) + + if line in lines: + return lines + + @skipUnless( + IS_SUBPROCESS_SAME_ENV, + "Can only be tested if subprocess environment is same as current environment", + ) + @skipIf(IS_BATCH_JOB, "Can only be tested if script is not launched in batch job") + def test_run_safe_signal(self): + """Test to send an interruption signal to a runner.""" + with TempDir() as tempdir: + with path("tests.resources", "empty_runner.py") as resource_path: + file_path = Path(resource_path).copy(tempdir) + + process = subprocess.Popen( + ["python", "-u", str(file_path)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + start_lines = self.wait_output(process, "starting worker") + + system = platform.system() + if system == "Windows": + process.send_signal(signal.CTRL_C_EVENT) + + else: + process.send_signal(signal.SIGINT) + + out, _ = process.communicate() + end_lines = out.splitlines() + self.assertListEqual(start_lines, ["starting runner", "starting worker"]) + self.assertListEqual(end_lines, ["ending worker", "ending runner"]) + + self.assertEqual(process.returncode, 0) diff --git a/tests/resources/empty_runner.py b/tests/resources/empty_runner.py new file mode 100644 index 0000000..2469719 --- /dev/null +++ b/tests/resources/empty_runner.py @@ -0,0 +1,23 @@ +from dakara_base.safe_workers import Runner, WorkerSafeThread + + +class MyWorker(WorkerSafeThread): + def init_worker(self): + self.thread = self.create_thread(target=self.run_thread) + + def run_thread(self): + print("starting worker") + self.stop.wait() + print("ending worker") + + +class MyRunner(Runner): + def run(self): + print("starting runner") + self.run_safe(MyWorker) + print("ending runner") + + +if __name__ == "__main__": + runner = MyRunner() + runner.run() diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_config.py b/tests/unit/test_config.py similarity index 100% rename from tests/test_config.py rename to tests/unit/test_config.py diff --git a/tests/test_directory.py b/tests/unit/test_directory.py similarity index 100% rename from tests/test_directory.py rename to tests/unit/test_directory.py diff --git a/tests/test_exceptions.py b/tests/unit/test_exceptions.py similarity index 100% rename from tests/test_exceptions.py rename to tests/unit/test_exceptions.py diff --git a/tests/test_http_client.py b/tests/unit/test_http_client.py similarity index 100% rename from tests/test_http_client.py rename to tests/unit/test_http_client.py diff --git a/tests/test_progress_bar.py b/tests/unit/test_progress_bar.py similarity index 100% rename from tests/test_progress_bar.py rename to tests/unit/test_progress_bar.py diff --git a/tests/test_safe_workers.py b/tests/unit/test_safe_workers.py similarity index 86% rename from tests/test_safe_workers.py rename to tests/unit/test_safe_workers.py index 71e8150..153761c 100644 --- a/tests/test_safe_workers.py +++ b/tests/unit/test_safe_workers.py @@ -1,13 +1,13 @@ from contextlib import contextmanager from queue import Queue -from threading import Event, Timer +from threading import Event, Thread, Timer from time import sleep from unittest import TestCase -from unittest.mock import MagicMock from dakara_base.safe_workers import ( BaseSafeThread, BaseWorker, + EmptyErrorsQueueError, Runner, SafeThread, SafeTimer, @@ -17,6 +17,7 @@ WorkerSafeThread, WorkerSafeTimer, safe, + wait, ) @@ -539,43 +540,123 @@ def setUp(self): # create class to test self.runner = Runner() - def test_run_safe_interrupt(self): - """Test a run with an interruption by KeyboardInterrupt exception. + def test_run_safe_error(self): + """Test a run with an error. + + The run should raise a MyError, end with a set stop event and an + empty error queue. + """ + # pre assertions + self.assertFalse(self.runner.stop.is_set()) + self.assertTrue(self.runner.errors.empty()) + + # call the method + with self.assertRaises(MyError): + self.runner.run_safe(self.WorkerError) + + # post assertions + self.assertTrue(self.runner.stop.is_set()) + self.assertTrue(self.runner.errors.empty()) + + def test_run_safe_function_quit(self): + """Test a run with a function to execute on main thread. - The run should end with a set stop event and an empty errors queue. + The function simply exits. """ # pre assertions self.assertFalse(self.runner.stop.is_set()) self.assertTrue(self.runner.errors.empty()) - # modify stop event wait method - self.runner.stop.wait = MagicMock() - self.runner.stop.wait.side_effect = KeyboardInterrupt + def function(stop, errors): + errors.put(None) + stop.set() # call the method - self.runner.run_safe(self.WorkerNormal) + self.runner.run_safe(self.WorkerNormal, run_main=function) # post assertions self.assertTrue(self.runner.stop.is_set()) self.assertTrue(self.runner.errors.empty()) - # assert stop event wait method was called - self.runner.stop.wait.assert_called_once() + def test_run_safe_function_no_error(self): + """Test a run that sets stop event but does not give an error.""" + # pre assertions + self.assertFalse(self.runner.stop.is_set()) + self.assertTrue(self.runner.errors.empty()) + + def function(stop, errors): + stop.set() - def test_run_safe_error(self): - """Test a run with an error. + # call the method + with self.assertRaises(EmptyErrorsQueueError): + self.runner.ERROR_TIMEOUT = 0 + self.runner.run_safe(self.WorkerNormal, run_main=function) - The run should raise a MyError, end with a set stop event and an - empty error queue. + # post assertions + self.assertTrue(self.runner.stop.is_set()) + self.assertTrue(self.runner.errors.empty()) + + def test_run_safe_function_error(self): + """Test a run with a function to execute on main thread. + + The function raises an exception. """ # pre assertions self.assertFalse(self.runner.stop.is_set()) self.assertTrue(self.runner.errors.empty()) + def function(stop, errors): + raise MyError("error") + # call the method with self.assertRaises(MyError): - self.runner.run_safe(self.WorkerError) + self.runner.run_safe(self.WorkerNormal, run_main=function) + + # post assertions + self.assertTrue(self.runner.stop.is_set()) + self.assertTrue(self.runner.errors.empty()) + + def test_run_safe_worker_function_error(self): + """Test a run with a worker function to execute on main thread. + + The function raises an exception. + """ + # pre assertions + self.assertFalse(self.runner.stop.is_set()) + self.assertTrue(self.runner.errors.empty()) + + class WorkerWithFunction(self.WorkerNormal): + def run_main(self, stop, errors): + raise MyError("error") + + # call the method + with self.assertRaises(MyError): + self.runner.run_safe(WorkerWithFunction) # post assertions self.assertTrue(self.runner.stop.is_set()) self.assertTrue(self.runner.errors.empty()) + + +class WaitTestCase(TestCase): + DELAY = 0.5 + + def test_wait(self): + """Test to wait for an event.""" + stop = Event() + ended = Event() + + def work(): + wait(stop, None) + ended.set() + + thread = Thread(target=work) + thread.start() + + sleep(self.DELAY) + stop.set() + + thread.join() + + self.assertTrue(stop.is_set()) + self.assertTrue(ended.is_set()) diff --git a/tests/test_utils.py b/tests/unit/test_utils.py similarity index 100% rename from tests/test_utils.py rename to tests/unit/test_utils.py diff --git a/tests/test_websocket_client.py b/tests/unit/test_websocket_client.py similarity index 100% rename from tests/test_websocket_client.py rename to tests/unit/test_websocket_client.py