From bacd0755e819f74f230cb58a2d257eb0cad21dab Mon Sep 17 00:00:00 2001 From: Neraste Date: Fri, 6 May 2022 00:03:31 +0900 Subject: [PATCH 01/13] Allow a Runner to run a blocking function in main thread --- src/dakara_base/safe_workers.py | 55 ++++++++++++++++++++++++--------- tests/test_safe_workers.py | 41 ++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 15 deletions(-) diff --git a/src/dakara_base/safe_workers.py b/src/dakara_base/safe_workers.py index 46dcd6c..0ee3d30 100644 --- a/src/dakara_base/safe_workers.py +++ b/src/dakara_base/safe_workers.py @@ -31,6 +31,7 @@ import logging +import platform import sys from functools import wraps from queue import Empty, Queue @@ -429,6 +430,9 @@ class WorkerSafeThread(BaseWorker): initialization assigns its own thread to the instance and makes it target a dummy method. + If a method `run_function` is defined, it will be called in the main + thread. It must take no argument. + Attributes: stop (threading.Event): Stop event that notify to stop the entire program when set. @@ -525,21 +529,31 @@ def init_runner(self, *args, **kwargs): """Custom initialization stub.""" pass - def run_safe(self, WorkerClass, *args, **kwargs): + def run_safe(self, worker_class, args=None, kwargs=None, function=None): """Execute a WorkerSafeThread instance thread. The thread is executed and the method waits for the stop event to be set or a user interruption to be triggered (Ctrl+C). Args: - WorkerClass (WorkerSafeThread): Worker class with safe thread. + worker_class (WorkerSafeThread): Worker class with safe thread. Note you have to pass a custom class based on `WorkerSafeThread`. - Other arguments are passed to the thread of WorkerClass. + args (list): Positional arguments passed to the worker class constructor. + kwargs (dict): Named arguments passed to the worker class constructor. + function (function): Function to execute in the main thread. It + must accept the stop event and be blocking as long as the event + is not set. """ + if args is None: + args = () + + if kwargs is None: + kwargs = {} + try: # create worker thread - with WorkerClass(self.stop, self.errors, *args, **kwargs) as worker: + with worker_class(self.stop, self.errors, *args, **kwargs) as worker: logger.debug("Create worker thread") worker.thread.start() @@ -547,21 +561,32 @@ def run_safe(self, WorkerClass, *args, **kwargs): # wait for stop event logger.debug("Waiting for stop event") - # We have to use a different code for Windows because the - # Ctrl+C event will not be handled during `self.stop.wait()`. - # This method is blocking for Windows, not for Linux, which is - # due to the way Ctrl+C is differently handled by the two OSs. - # For Windows, a quick and dirty solution consists in polling - # the `self.stop.wait()` with a timeout argument, so the call - # is non-permanently blocking. - # More resources on this: - # https://mail.python.org/pipermail/python-dev/2017-August/148800.html - # https://stackoverflow.com/a/51954792/4584444 - if sys.platform.startswith("win"): + if function is not None: + # if a function is provided, run it + # it must ruturn when the stop event is set + function(self.stop) + + elif hasattr(worker, "run_function"): + # if the worker has a function to run, run it + # it must ruturn when the stop event is set + worker.run_function() + + elif platform.system() == "Windows": + # We have to use a specific code for Windows because the + # Ctrl+C event will not be handled during `self.stop.wait()`. + # This method is blocking for Windows, not for Linux, which is + # due to the way Ctrl+C is differently handled by the two OSs. + # For Windows, a quick and dirty solution consists in polling + # the `self.stop.wait()` with a timeout argument, so the call + # is non-permanently blocking. + # More resources on this: + # https://mail.python.org/pipermail/python-dev/2017-August/148800.html + # https://stackoverflow.com/a/51954792/4584444 while not self.stop.is_set(): self.stop.wait(self.POLLING_INTERVAL) else: + # by default, just wait for the program to stop self.stop.wait() # stop on Ctrl+C diff --git a/tests/test_safe_workers.py b/tests/test_safe_workers.py index 71e8150..76cee26 100644 --- a/tests/test_safe_workers.py +++ b/tests/test_safe_workers.py @@ -579,3 +579,44 @@ def test_run_safe_error(self): # post assertions self.assertTrue(self.runner.stop.is_set()) self.assertTrue(self.runner.errors.empty()) + + def test_run_safe_function(self): + """Test a run with a function to execute on main thread. + + The function raises an exception. + """ + # pre assertions + self.assertFalse(self.runner.stop.is_set()) + self.assertTrue(self.runner.errors.empty()) + + def function(stop): + raise MyError("error") + + # call the method + with self.assertRaises(MyError): + self.runner.run_safe(self.WorkerNormal, function=function) + + # post assertions + self.assertTrue(self.runner.stop.is_set()) + self.assertTrue(self.runner.errors.empty()) + + def test_run_safe_worker_function(self): + """Test a run with a worker function to execute on main thread. + + The function raises an exception. + """ + # pre assertions + self.assertFalse(self.runner.stop.is_set()) + self.assertTrue(self.runner.errors.empty()) + + class WorkerWithFunction(self.WorkerNormal): + def run_function(self): + raise MyError("error") + + # call the method + with self.assertRaises(MyError): + self.runner.run_safe(WorkerWithFunction) + + # post assertions + self.assertTrue(self.runner.stop.is_set()) + self.assertTrue(self.runner.errors.empty()) From 2ba0716735736e523a46b1a888b4b047f065666e Mon Sep 17 00:00:00 2001 From: Neraste Date: Sun, 8 May 2022 14:37:03 +0900 Subject: [PATCH 02/13] Allow to run function in main thread --- MANIFEST.in | 5 +- src/dakara_base/safe_workers.py | 133 ++++++++++++++----------- tests/resources/empty_runner.py | 23 +++++ tests/test_safe_workers.py | 71 ++++++++----- tests/test_safe_workers_integration.py | 60 +++++++++++ 5 files changed, 209 insertions(+), 83 deletions(-) create mode 100644 tests/resources/empty_runner.py create mode 100644 tests/test_safe_workers_integration.py diff --git a/MANIFEST.in b/MANIFEST.in index 1c83c30..cd1f2ed 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,5 @@ +include LICENSE +include tests/resources/dummy +include tests/resources/empty_runner.py recursive-include tests *.py recursive-include tests/resources/ *.yaml -include tests/resources/dummy -include LICENSE diff --git a/src/dakara_base/safe_workers.py b/src/dakara_base/safe_workers.py index 0ee3d30..f9bd572 100644 --- a/src/dakara_base/safe_workers.py +++ b/src/dakara_base/safe_workers.py @@ -32,6 +32,7 @@ import logging import platform +import signal import sys from functools import wraps from queue import Empty, Queue @@ -430,8 +431,8 @@ class WorkerSafeThread(BaseWorker): initialization assigns its own thread to the instance and makes it target a dummy method. - If a method `run_function` is defined, it will be called in the main - thread. It must take no argument. + If a method `run_main` is defined, it will be called in the main thread. It + takes the stop event end the errors queue as arguments. Attributes: stop (threading.Event): Stop event that notify to stop the entire @@ -514,6 +515,7 @@ class Runner: """ POLLING_INTERVAL = 0.5 + ERROR_TIMEOUT = 5 def __init__(self, *args, **kwargs): # create stop event @@ -529,7 +531,7 @@ def init_runner(self, *args, **kwargs): """Custom initialization stub.""" pass - def run_safe(self, worker_class, args=None, kwargs=None, function=None): + def run_safe(self, worker_class, args=None, kwargs=None, run_main=None): """Execute a WorkerSafeThread instance thread. The thread is executed and the method waits for the stop event to be @@ -541,7 +543,7 @@ def run_safe(self, worker_class, args=None, kwargs=None, function=None): `WorkerSafeThread`. args (list): Positional arguments passed to the worker class constructor. kwargs (dict): Named arguments passed to the worker class constructor. - function (function): Function to execute in the main thread. It + run_main (function): Function to execute in the main thread. It must accept the stop event and be blocking as long as the event is not set. """ @@ -551,64 +553,79 @@ def run_safe(self, worker_class, args=None, kwargs=None, function=None): if kwargs is None: kwargs = {} + # register user interruption + def on_sigint(signal_number, frame): + logger.debug(f"Receiving signal {signal_number} to close") + self.errors.put(None) + self.stop.set() + + signal.signal(signal.SIGINT, on_sigint) + + # create worker thread + with worker_class(self.stop, self.errors, *args, **kwargs) as worker: + logger.debug("Create worker thread") + worker.thread.start() + + if run_main is not None: + # if a function is provided, run it + # it must ruturn when the stop event is set + run_main(self.stop, self.errors) + + elif hasattr(worker, "run_main"): + # if the worker has a function to run, run it + # it must ruturn when the stop event is set + worker.run_main() + + else: + wait(self.stop) + + # get the error from the error queue + # a delay of 5 seconds is accorded for the error to be retrieved try: - # create worker thread - with worker_class(self.stop, self.errors, *args, **kwargs) as worker: - - logger.debug("Create worker thread") - worker.thread.start() - - # wait for stop event - logger.debug("Waiting for stop event") - - if function is not None: - # if a function is provided, run it - # it must ruturn when the stop event is set - function(self.stop) - - elif hasattr(worker, "run_function"): - # if the worker has a function to run, run it - # it must ruturn when the stop event is set - worker.run_function() - - elif platform.system() == "Windows": - # We have to use a specific code for Windows because the - # Ctrl+C event will not be handled during `self.stop.wait()`. - # This method is blocking for Windows, not for Linux, which is - # due to the way Ctrl+C is differently handled by the two OSs. - # For Windows, a quick and dirty solution consists in polling - # the `self.stop.wait()` with a timeout argument, so the call - # is non-permanently blocking. - # More resources on this: - # https://mail.python.org/pipermail/python-dev/2017-August/148800.html - # https://stackoverflow.com/a/51954792/4584444 - while not self.stop.is_set(): - self.stop.wait(self.POLLING_INTERVAL) - - else: - # by default, just wait for the program to stop - self.stop.wait() - - # stop on Ctrl+C - except KeyboardInterrupt: + reason = self.errors.get(timeout=self.ERROR_TIMEOUT) + + # if there is no error in the error queue, raise a general error + # this case is very unlikely to happen and is not tested + except Empty as empty_error: + raise EmptyErrorsQueueError("Unknown error happened") from empty_error + + # if there is no error, this is a normal interruption + if reason is None: logger.debug("User stop caught") - self.stop.set() + return + + _, error, traceback = reason + logger.debug("Internal error caught") + error.with_traceback(traceback) + raise error + + +def wait(stop): + """Wait for stop event to be set. + + Args: + stop (threading.Event): Stop event. + """ + logger.debug("Waiting for stop event") - # stop on error - else: - logger.debug("Internal error caught") + if platform.system() == "Windows": + # We have to use a specific code for Windows because the + # Ctrl+C event will not be handled during `self.stop.wait()`. + # This method is blocking for Windows, not for Linux, which is + # due to the way Ctrl+C is differently handled by the two OSs. + # For Windows, a quick and dirty solution consists in polling + # the `self.stop.wait()` with a timeout argument, so the call + # is non-permanently blocking. + # More resources on this: + # https://mail.python.org/pipermail/python-dev/2017-August/148800.html + # https://stackoverflow.com/a/51954792/4584444 + while not stop.is_set(): + stop.wait(Runner.POLLING_INTERVAL) - # get the error from the error queue and re-raise it - # a delay of 5 seconds is accorded for the error to be retrieved - try: - _, error, traceback = self.errors.get(5) - error.with_traceback(traceback) - raise error + return - # if there is no error in the error queue, raise a general error - # this case is very unlikely to happen and is not tested - except Empty as empty_error: - raise NoErrorCaughtError("Unknown error happened") from empty_error + # otherwise just wait for the program to stop + stop.wait() class UnredefinedTimerError(DakaraError): @@ -627,7 +644,7 @@ class UnredefinedThreadError(DakaraError): """ -class NoErrorCaughtError(RuntimeError): +class EmptyErrorsQueueError(RuntimeError): """No error caught error. Error raised if the safe workers mechanism stops for an error, but there is diff --git a/tests/resources/empty_runner.py b/tests/resources/empty_runner.py new file mode 100644 index 0000000..2469719 --- /dev/null +++ b/tests/resources/empty_runner.py @@ -0,0 +1,23 @@ +from dakara_base.safe_workers import Runner, WorkerSafeThread + + +class MyWorker(WorkerSafeThread): + def init_worker(self): + self.thread = self.create_thread(target=self.run_thread) + + def run_thread(self): + print("starting worker") + self.stop.wait() + print("ending worker") + + +class MyRunner(Runner): + def run(self): + print("starting runner") + self.run_safe(MyWorker) + print("ending runner") + + +if __name__ == "__main__": + runner = MyRunner() + runner.run() diff --git a/tests/test_safe_workers.py b/tests/test_safe_workers.py index 76cee26..2d5673b 100644 --- a/tests/test_safe_workers.py +++ b/tests/test_safe_workers.py @@ -3,11 +3,11 @@ from threading import Event, Timer from time import sleep from unittest import TestCase -from unittest.mock import MagicMock from dakara_base.safe_workers import ( BaseSafeThread, BaseWorker, + EmptyErrorsQueueError, Runner, SafeThread, SafeTimer, @@ -17,6 +17,7 @@ WorkerSafeThread, WorkerSafeTimer, safe, + wait, ) @@ -539,48 +540,63 @@ def setUp(self): # create class to test self.runner = Runner() - def test_run_safe_interrupt(self): - """Test a run with an interruption by KeyboardInterrupt exception. + def test_run_safe_error(self): + """Test a run with an error. - The run should end with a set stop event and an empty errors queue. + The run should raise a MyError, end with a set stop event and an + empty error queue. """ # pre assertions self.assertFalse(self.runner.stop.is_set()) self.assertTrue(self.runner.errors.empty()) - # modify stop event wait method - self.runner.stop.wait = MagicMock() - self.runner.stop.wait.side_effect = KeyboardInterrupt - # call the method - self.runner.run_safe(self.WorkerNormal) + with self.assertRaises(MyError): + self.runner.run_safe(self.WorkerError) # post assertions self.assertTrue(self.runner.stop.is_set()) self.assertTrue(self.runner.errors.empty()) - # assert stop event wait method was called - self.runner.stop.wait.assert_called_once() - - def test_run_safe_error(self): - """Test a run with an error. + def test_run_safe_function_quit(self): + """Test a run with a function to execute on main thread. - The run should raise a MyError, end with a set stop event and an - empty error queue. + The function simply exits. """ # pre assertions self.assertFalse(self.runner.stop.is_set()) self.assertTrue(self.runner.errors.empty()) + def function(stop, errors): + errors.put(None) + stop.set() + # call the method - with self.assertRaises(MyError): - self.runner.run_safe(self.WorkerError) + self.runner.run_safe(self.WorkerNormal, run_main=function) + + # post assertions + self.assertTrue(self.runner.stop.is_set()) + self.assertTrue(self.runner.errors.empty()) + + def test_run_safe_function_no_error(self): + """Test a run that sets stop event but does not give an error.""" + # pre assertions + self.assertFalse(self.runner.stop.is_set()) + self.assertTrue(self.runner.errors.empty()) + + def function(stop, errors): + stop.set() + + # call the method + with self.assertRaises(EmptyErrorsQueueError): + self.runner.ERROR_TIMEOUT = 0 + self.runner.run_safe(self.WorkerNormal, run_main=function) # post assertions self.assertTrue(self.runner.stop.is_set()) self.assertTrue(self.runner.errors.empty()) - def test_run_safe_function(self): + def test_run_safe_function_error(self): """Test a run with a function to execute on main thread. The function raises an exception. @@ -589,18 +605,18 @@ def test_run_safe_function(self): self.assertFalse(self.runner.stop.is_set()) self.assertTrue(self.runner.errors.empty()) - def function(stop): + def function(stop, errors): raise MyError("error") # call the method with self.assertRaises(MyError): - self.runner.run_safe(self.WorkerNormal, function=function) + self.runner.run_safe(self.WorkerNormal, run_main=function) # post assertions self.assertTrue(self.runner.stop.is_set()) self.assertTrue(self.runner.errors.empty()) - def test_run_safe_worker_function(self): + def test_run_safe_worker_function_error(self): """Test a run with a worker function to execute on main thread. The function raises an exception. @@ -610,7 +626,7 @@ def test_run_safe_worker_function(self): self.assertTrue(self.runner.errors.empty()) class WorkerWithFunction(self.WorkerNormal): - def run_function(self): + def run_main(self): raise MyError("error") # call the method @@ -620,3 +636,12 @@ def run_function(self): # post assertions self.assertTrue(self.runner.stop.is_set()) self.assertTrue(self.runner.errors.empty()) + + +class WaitTestCase(TestCase): + def test_wait(self): + """Test to wait for an event.""" + stop = Event() + stop.set() + wait(stop) + self.assertTrue(stop.is_set()) diff --git a/tests/test_safe_workers_integration.py b/tests/test_safe_workers_integration.py new file mode 100644 index 0000000..283cdc7 --- /dev/null +++ b/tests/test_safe_workers_integration.py @@ -0,0 +1,60 @@ +import signal +import subprocess +from importlib.resources import path +from time import sleep +from unittest import TestCase + +from path import Path, TempDir + +import dakara_base # noqa F401 + + +class RunnerIntegrationTestCase(TestCase): + @staticmethod + def wait_output(process, line, interval=0.1): + """Wait a process to output a line. + + Beware this method consumes the lines from the process, so they will + not be included in `process.communicate()`. + + Args: + process (subprocess.Popen): Process to evaluate. + line (str): Line of text to obtain to process output. + interval (float): Interval in seconds between two evaluations. + + Returns: + list of str: Lines outputed by the process before the expected one + showed up. + """ + lines = [] + while process.poll() is None: + sleep(interval) + out = process.stdout.readline() + print(out) + if not out: + continue + + lines.append(out.strip()) + + if line in lines: + return lines + + def test_run_safe_signal(self): + """Test to send an interruption signal to a runner.""" + with TempDir() as tempdir: + with path("tests.resources", "empty_runner.py") as resource_path: + file_path = Path(resource_path).copy(tempdir) + + process = subprocess.Popen( + ["python", "-u", str(file_path)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + start_lines = self.wait_output(process, "starting worker") + process.send_signal(signal.SIGINT) + process.poll() + out, _ = process.communicate() + end_lines = out.splitlines() + self.assertListEqual(start_lines, ["starting runner", "starting worker"]) + self.assertListEqual(end_lines, ["ending worker", "ending runner"]) From 894042d753e89abaf70fca439f5e7519c872495d Mon Sep 17 00:00:00 2001 From: Neraste Date: Sun, 8 May 2022 18:13:26 +0900 Subject: [PATCH 03/13] Skip runner integration test case if subprocess environment is different from main one --- .appveyor.yml | 3 +++ tests/test_safe_workers_integration.py | 13 +++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/.appveyor.yml b/.appveyor.yml index 4975ae0..222e826 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -100,6 +100,9 @@ for: - image: Visual Studio 2019 PYTHON: "3.10" + environment: + SUBPROCESS_SAME_ENV_TEST: true + cache: # enable cache for Python dependencies - "%LOCALAPPDATA%\\pip\\Cache" diff --git a/tests/test_safe_workers_integration.py b/tests/test_safe_workers_integration.py index 283cdc7..dbbf024 100644 --- a/tests/test_safe_workers_integration.py +++ b/tests/test_safe_workers_integration.py @@ -1,8 +1,10 @@ +import os +import platform import signal import subprocess from importlib.resources import path from time import sleep -from unittest import TestCase +from unittest import TestCase, skipUnless from path import Path, TempDir @@ -10,6 +12,10 @@ class RunnerIntegrationTestCase(TestCase): + IS_SUBPROCESS_SAME_ENV = platform.system() != "Windows" or os.environ.get( + "SUBPROCESS_SAME_ENV_TEST" + ) + @staticmethod def wait_output(process, line, interval=0.1): """Wait a process to output a line. @@ -30,7 +36,6 @@ def wait_output(process, line, interval=0.1): while process.poll() is None: sleep(interval) out = process.stdout.readline() - print(out) if not out: continue @@ -39,6 +44,10 @@ def wait_output(process, line, interval=0.1): if line in lines: return lines + @skipUnless( + IS_SUBPROCESS_SAME_ENV, + "Can only be tested if subprocess environment is same as current environment", + ) def test_run_safe_signal(self): """Test to send an interruption signal to a runner.""" with TempDir() as tempdir: From bc954e84db08db6d7a1b656f534edee8d93e3fc5 Mon Sep 17 00:00:00 2001 From: Neraste Date: Sun, 8 May 2022 18:17:42 +0900 Subject: [PATCH 04/13] Split unit tests and integration tests --- tests/integration/__init__.py | 0 .../test_safe_workers.py} | 0 tests/unit/__init__.py | 0 tests/{ => unit}/test_config.py | 0 tests/{ => unit}/test_directory.py | 0 tests/{ => unit}/test_exceptions.py | 0 tests/{ => unit}/test_http_client.py | 0 tests/{ => unit}/test_progress_bar.py | 0 tests/{ => unit}/test_safe_workers.py | 0 tests/{ => unit}/test_utils.py | 0 tests/{ => unit}/test_websocket_client.py | 0 11 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/integration/__init__.py rename tests/{test_safe_workers_integration.py => integration/test_safe_workers.py} (100%) create mode 100644 tests/unit/__init__.py rename tests/{ => unit}/test_config.py (100%) rename tests/{ => unit}/test_directory.py (100%) rename tests/{ => unit}/test_exceptions.py (100%) rename tests/{ => unit}/test_http_client.py (100%) rename tests/{ => unit}/test_progress_bar.py (100%) rename tests/{ => unit}/test_safe_workers.py (100%) rename tests/{ => unit}/test_utils.py (100%) rename tests/{ => unit}/test_websocket_client.py (100%) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_safe_workers_integration.py b/tests/integration/test_safe_workers.py similarity index 100% rename from tests/test_safe_workers_integration.py rename to tests/integration/test_safe_workers.py diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_config.py b/tests/unit/test_config.py similarity index 100% rename from tests/test_config.py rename to tests/unit/test_config.py diff --git a/tests/test_directory.py b/tests/unit/test_directory.py similarity index 100% rename from tests/test_directory.py rename to tests/unit/test_directory.py diff --git a/tests/test_exceptions.py b/tests/unit/test_exceptions.py similarity index 100% rename from tests/test_exceptions.py rename to tests/unit/test_exceptions.py diff --git a/tests/test_http_client.py b/tests/unit/test_http_client.py similarity index 100% rename from tests/test_http_client.py rename to tests/unit/test_http_client.py diff --git a/tests/test_progress_bar.py b/tests/unit/test_progress_bar.py similarity index 100% rename from tests/test_progress_bar.py rename to tests/unit/test_progress_bar.py diff --git a/tests/test_safe_workers.py b/tests/unit/test_safe_workers.py similarity index 100% rename from tests/test_safe_workers.py rename to tests/unit/test_safe_workers.py diff --git a/tests/test_utils.py b/tests/unit/test_utils.py similarity index 100% rename from tests/test_utils.py rename to tests/unit/test_utils.py diff --git a/tests/test_websocket_client.py b/tests/unit/test_websocket_client.py similarity index 100% rename from tests/test_websocket_client.py rename to tests/unit/test_websocket_client.py From b63af71016fc197ab2880c36358a42a2546c9df2 Mon Sep 17 00:00:00 2001 From: Neraste Date: Sun, 8 May 2022 19:07:14 +0900 Subject: [PATCH 05/13] Fix Mac test execution line --- .appveyor.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.appveyor.yml b/.appveyor.yml index 222e826..2211496 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -83,7 +83,7 @@ for: test_script: # run tests - - "python -m coverage run -m pytest -v" + - "python -m pytest -v --cov-report term" # run code formatting tests - "python -m black . --check" From 5f4e10595cf0d746fcbeefceb479e058fed469c7 Mon Sep 17 00:00:00 2001 From: Neraste Date: Sun, 8 May 2022 19:17:52 +0900 Subject: [PATCH 06/13] Send CTRL_C_EVENT instead of SIGINT on windows --- tests/integration/test_safe_workers.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_safe_workers.py b/tests/integration/test_safe_workers.py index dbbf024..f57cd16 100644 --- a/tests/integration/test_safe_workers.py +++ b/tests/integration/test_safe_workers.py @@ -61,7 +61,14 @@ def test_run_safe_signal(self): text=True, ) start_lines = self.wait_output(process, "starting worker") - process.send_signal(signal.SIGINT) + + system = platform.system() + if system == "Windows": + process.send_signal(signal.CTRL_C_EVENT) + + else: + process.send_signal(signal.SIGINT) + process.poll() out, _ = process.communicate() end_lines = out.splitlines() From 35be2e78db9ea0028167b0415bc74db336e82cda Mon Sep 17 00:00:00 2001 From: Neraste Date: Sun, 8 May 2022 19:30:00 +0900 Subject: [PATCH 07/13] Replace CTRL_C_EVENT by CTRL_BREAK_EVENT --- tests/integration/test_safe_workers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_safe_workers.py b/tests/integration/test_safe_workers.py index f57cd16..20d2168 100644 --- a/tests/integration/test_safe_workers.py +++ b/tests/integration/test_safe_workers.py @@ -64,7 +64,7 @@ def test_run_safe_signal(self): system = platform.system() if system == "Windows": - process.send_signal(signal.CTRL_C_EVENT) + process.send_signal(signal.CTRL_BREAK_EVENT) else: process.send_signal(signal.SIGINT) From f8455124529a73e998e4321709e55b26e98b0cd4 Mon Sep 17 00:00:00 2001 From: Neraste Date: Sun, 8 May 2022 19:51:42 +0900 Subject: [PATCH 08/13] Disable runner integration test for windows --- .appveyor.yml | 1 + tests/integration/test_safe_workers.py | 13 ++++++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/.appveyor.yml b/.appveyor.yml index 2211496..d375813 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -102,6 +102,7 @@ for: environment: SUBPROCESS_SAME_ENV_TEST: true + BATCH_JOB_TEST: true cache: # enable cache for Python dependencies diff --git a/tests/integration/test_safe_workers.py b/tests/integration/test_safe_workers.py index 20d2168..2256a18 100644 --- a/tests/integration/test_safe_workers.py +++ b/tests/integration/test_safe_workers.py @@ -4,7 +4,7 @@ import subprocess from importlib.resources import path from time import sleep -from unittest import TestCase, skipUnless +from unittest import TestCase, skipIf, skipUnless from path import Path, TempDir @@ -12,9 +12,10 @@ class RunnerIntegrationTestCase(TestCase): - IS_SUBPROCESS_SAME_ENV = platform.system() != "Windows" or os.environ.get( - "SUBPROCESS_SAME_ENV_TEST" + IS_SUBPROCESS_SAME_ENV = ( + platform.system() != "Windows" or "SUBPROCESS_SAME_ENV_TEST" in os.environ ) + IS_BATCH_JOB = platform.system() == "Windows" and "BATCH_JOB_TEST" not in os.environ @staticmethod def wait_output(process, line, interval=0.1): @@ -48,6 +49,7 @@ def wait_output(process, line, interval=0.1): IS_SUBPROCESS_SAME_ENV, "Can only be tested if subprocess environment is same as current environment", ) + @skipIf(IS_BATCH_JOB, "Can only be tested if script is not launched in batch job") def test_run_safe_signal(self): """Test to send an interruption signal to a runner.""" with TempDir() as tempdir: @@ -64,13 +66,14 @@ def test_run_safe_signal(self): system = platform.system() if system == "Windows": - process.send_signal(signal.CTRL_BREAK_EVENT) + process.send_signal(signal.CTRL_C_EVENT) else: process.send_signal(signal.SIGINT) - process.poll() out, _ = process.communicate() end_lines = out.splitlines() self.assertListEqual(start_lines, ["starting runner", "starting worker"]) self.assertListEqual(end_lines, ["ending worker", "ending runner"]) + + self.assertEqual(process.returncode, 0) From 0f681576eb6010a67096d264ce9fccd317f8d78e Mon Sep 17 00:00:00 2001 From: Neraste Date: Mon, 9 May 2022 02:39:18 +0900 Subject: [PATCH 09/13] Fix IS_BATCH_JOB condition --- tests/integration/test_safe_workers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_safe_workers.py b/tests/integration/test_safe_workers.py index 2256a18..7065e99 100644 --- a/tests/integration/test_safe_workers.py +++ b/tests/integration/test_safe_workers.py @@ -15,7 +15,7 @@ class RunnerIntegrationTestCase(TestCase): IS_SUBPROCESS_SAME_ENV = ( platform.system() != "Windows" or "SUBPROCESS_SAME_ENV_TEST" in os.environ ) - IS_BATCH_JOB = platform.system() == "Windows" and "BATCH_JOB_TEST" not in os.environ + IS_BATCH_JOB = platform.system() == "Windows" and "BATCH_JOB_TEST" in os.environ @staticmethod def wait_output(process, line, interval=0.1): From 10897731a681e502454a5ffcc66441b0a1ac9dfb Mon Sep 17 00:00:00 2001 From: Neraste Date: Sun, 10 Jul 2022 17:38:27 +0900 Subject: [PATCH 10/13] Improve test of waiting function Move waiting function interval from class attribute to argument. --- src/dakara_base/safe_workers.py | 30 +++++++++++++++--------------- tests/unit/test_safe_workers.py | 19 +++++++++++++++++-- 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/src/dakara_base/safe_workers.py b/src/dakara_base/safe_workers.py index f9bd572..9fc5b0b 100644 --- a/src/dakara_base/safe_workers.py +++ b/src/dakara_base/safe_workers.py @@ -506,15 +506,12 @@ class Runner: the custom init method. Attributes: - POLLING_INTERVAL (float): For Windows only, interval between two - attempts to wait for the stop event. stop (threading.Event): Stop event that notify to stop the execution of the thread. errors (queue.Queue): Error queue to communicate the exception of the thread. """ - POLLING_INTERVAL = 0.5 ERROR_TIMEOUT = 5 def __init__(self, *args, **kwargs): @@ -600,27 +597,30 @@ def on_sigint(signal_number, frame): raise error -def wait(stop): +def wait(stop, interval=0.5): """Wait for stop event to be set. + We have to use a specific code for Windows because the Ctrl+C event will + not be handled during `self.stop.wait()`. This method is blocking for + Windows, not for Linux, which is due to the way Ctrl+C is differently + handled by the two OSs. For Windows, a quick and dirty solution consists + in polling the `self.stop.wait()` with a timeout argument, so the call is + non-permanently blocking. + + See also: + https://mail.python.org/pipermail/python-dev/2017-August/148800.html + https://stackoverflow.com/a/51954792/4584444 + Args: stop (threading.Event): Stop event. + interval (float): For Windows only, interval between two + attempts to wait for the stop event. """ logger.debug("Waiting for stop event") if platform.system() == "Windows": - # We have to use a specific code for Windows because the - # Ctrl+C event will not be handled during `self.stop.wait()`. - # This method is blocking for Windows, not for Linux, which is - # due to the way Ctrl+C is differently handled by the two OSs. - # For Windows, a quick and dirty solution consists in polling - # the `self.stop.wait()` with a timeout argument, so the call - # is non-permanently blocking. - # More resources on this: - # https://mail.python.org/pipermail/python-dev/2017-August/148800.html - # https://stackoverflow.com/a/51954792/4584444 while not stop.is_set(): - stop.wait(Runner.POLLING_INTERVAL) + stop.wait(interval) return diff --git a/tests/unit/test_safe_workers.py b/tests/unit/test_safe_workers.py index 2d5673b..d8dd6a4 100644 --- a/tests/unit/test_safe_workers.py +++ b/tests/unit/test_safe_workers.py @@ -1,6 +1,6 @@ from contextlib import contextmanager from queue import Queue -from threading import Event, Timer +from threading import Event, Thread, Timer from time import sleep from unittest import TestCase @@ -639,9 +639,24 @@ def run_main(self): class WaitTestCase(TestCase): + DELAY = 0.5 + def test_wait(self): """Test to wait for an event.""" stop = Event() + ended = Event() + + def work(): + wait(stop) + ended.set() + + thread = Thread(target=work) + thread.start() + + sleep(self.DELAY) stop.set() - wait(stop) + + thread.join() + self.assertTrue(stop.is_set()) + self.assertTrue(ended.is_set()) From c55b69e5badd544a4b362713b463f24154dc89d1 Mon Sep 17 00:00:00 2001 From: Neraste Date: Sun, 10 Jul 2022 19:39:28 +0900 Subject: [PATCH 11/13] Improve documentation Force `run_main` to have the same arguments. --- src/dakara_base/safe_workers.py | 35 ++++++++++++++++++++++----------- tests/unit/test_safe_workers.py | 4 ++-- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/src/dakara_base/safe_workers.py b/src/dakara_base/safe_workers.py index 9fc5b0b..6225c06 100644 --- a/src/dakara_base/safe_workers.py +++ b/src/dakara_base/safe_workers.py @@ -499,8 +499,13 @@ class Runner: """Runner class. The runner creates the stop event and errors queue. It is designed to - execute the thread of a `WorkerSafeThread` instance until an error occurs - or an user interruption pops out (Ctrl+C). + execute the thread of a `WorkerSafeThread` instance until the end of the + program. This execution is done by the `run_safe` method, that will use a + blocking function. By default, the blocking function waits for an error to + occur or a user interruption to pop out (Ctrl+C). It can be replaced if a + `run_main` function is passed to `run_safe`, or if the `run_main` method of + the class is defined. The blocking function must accept a stop event and an + error queue, and be blocking untill the stop event is not set. The initialization creates the stop event and the errors queue and calls the custom init method. @@ -532,7 +537,13 @@ def run_safe(self, worker_class, args=None, kwargs=None, run_main=None): """Execute a WorkerSafeThread instance thread. The thread is executed and the method waits for the stop event to be - set or a user interruption to be triggered (Ctrl+C). + set or a user interruption to be triggered (Ctrl+C). An alternative + waiting function can be provided by `run_main`, or by the method + `run_main`. The choice of the function to run is: + + 1. Provided `run_main` function; + 2. Class `run_main` method; or + 3. Default module `wait` function. Args: worker_class (WorkerSafeThread): Worker class with safe thread. @@ -541,8 +552,8 @@ def run_safe(self, worker_class, args=None, kwargs=None, run_main=None): args (list): Positional arguments passed to the worker class constructor. kwargs (dict): Named arguments passed to the worker class constructor. run_main (function): Function to execute in the main thread. It - must accept the stop event and be blocking as long as the event - is not set. + must accept the stop event and the errors queue, and must be + blocking as long as the stop event is not set. """ if args is None: args = () @@ -552,7 +563,7 @@ def run_safe(self, worker_class, args=None, kwargs=None, run_main=None): # register user interruption def on_sigint(signal_number, frame): - logger.debug(f"Receiving signal {signal_number} to close") + logger.debug("Receiving signal %i to close", signal_number) self.errors.put(None) self.stop.set() @@ -563,6 +574,7 @@ def on_sigint(signal_number, frame): logger.debug("Create worker thread") worker.thread.start() + # wait if run_main is not None: # if a function is provided, run it # it must ruturn when the stop event is set @@ -571,10 +583,10 @@ def on_sigint(signal_number, frame): elif hasattr(worker, "run_main"): # if the worker has a function to run, run it # it must ruturn when the stop event is set - worker.run_main() + worker.run_main(self.stop, self.errors) else: - wait(self.stop) + wait(self.stop, self.errors) # get the error from the error queue # a delay of 5 seconds is accorded for the error to be retrieved @@ -597,7 +609,7 @@ def on_sigint(signal_number, frame): raise error -def wait(stop, interval=0.5): +def wait(stop, errors, interval=0.5): """Wait for stop event to be set. We have to use a specific code for Windows because the Ctrl+C event will @@ -613,8 +625,9 @@ def wait(stop, interval=0.5): Args: stop (threading.Event): Stop event. - interval (float): For Windows only, interval between two - attempts to wait for the stop event. + errors (queue.Queue): Errors queue. + interval (float): For Windows only, interval between two attempts to + wait for the stop event. """ logger.debug("Waiting for stop event") diff --git a/tests/unit/test_safe_workers.py b/tests/unit/test_safe_workers.py index d8dd6a4..153761c 100644 --- a/tests/unit/test_safe_workers.py +++ b/tests/unit/test_safe_workers.py @@ -626,7 +626,7 @@ def test_run_safe_worker_function_error(self): self.assertTrue(self.runner.errors.empty()) class WorkerWithFunction(self.WorkerNormal): - def run_main(self): + def run_main(self, stop, errors): raise MyError("error") # call the method @@ -647,7 +647,7 @@ def test_wait(self): ended = Event() def work(): - wait(stop) + wait(stop, None) ended.set() thread = Thread(target=work) From 2b0c165117712ee551d47b7337e1c9cc9c874e2b Mon Sep 17 00:00:00 2001 From: Neraste Date: Sun, 10 Jul 2022 19:47:27 +0900 Subject: [PATCH 12/13] Fix documentation and harmonize code --- src/dakara_base/safe_workers.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/dakara_base/safe_workers.py b/src/dakara_base/safe_workers.py index 6225c06..11a57e8 100644 --- a/src/dakara_base/safe_workers.py +++ b/src/dakara_base/safe_workers.py @@ -504,8 +504,9 @@ class Runner: blocking function. By default, the blocking function waits for an error to occur or a user interruption to pop out (Ctrl+C). It can be replaced if a `run_main` function is passed to `run_safe`, or if the `run_main` method of - the class is defined. The blocking function must accept a stop event and an - error queue, and be blocking untill the stop event is not set. + the used `WorkerSafeThread` class is defined. The blocking function must + accept a stop event and an error queue, and be blocking as long as the stop + event is not set. The initialization creates the stop event and the errors queue and calls the custom init method. @@ -542,7 +543,7 @@ def run_safe(self, worker_class, args=None, kwargs=None, run_main=None): `run_main`. The choice of the function to run is: 1. Provided `run_main` function; - 2. Class `run_main` method; or + 2. Worker class `run_main` method; or 3. Default module `wait` function. Args: @@ -574,19 +575,22 @@ def on_sigint(signal_number, frame): logger.debug("Create worker thread") worker.thread.start() - # wait + # select blocking function if run_main is not None: # if a function is provided, run it # it must ruturn when the stop event is set - run_main(self.stop, self.errors) + block = run_main elif hasattr(worker, "run_main"): # if the worker has a function to run, run it # it must ruturn when the stop event is set - worker.run_main(self.stop, self.errors) + block = getattr(worker, "run_main") else: - wait(self.stop, self.errors) + block = wait + + # wait + block(self.stop, self.errors) # get the error from the error queue # a delay of 5 seconds is accorded for the error to be retrieved From ce9eff8a27d2c9b21eb5d63e1b4bbe01d0f4a8c6 Mon Sep 17 00:00:00 2001 From: Neraste Date: Sat, 24 Feb 2024 22:03:41 +0100 Subject: [PATCH 13/13] Add env for tests --- .github/workflows/ci.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8382f6d..b1d1ced 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,6 +36,10 @@ jobs: run: pip install -e ".[dev]" - name: Run tests + env: + # is it really necessary? + SUBPROCESS_SAME_ENV_TEST: true + BATCH_JOB_TEST: true run: python -m pytest -v --cov src - name: Upload coverage reports to Codecov