diff --git a/pyproject.toml b/pyproject.toml index bd6f20f8..a38c7e7c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -147,7 +147,7 @@ commands = [ ["mypy", "tests"], ["pylint", "--ignore-paths=^src/cruiz/pyside6/.*$", "src/cruiz"], ["pylint", "--extension-pkg-allow-list=PySide6.QtCore", "--ignore-paths=^src/cruizlib/workers/api/v2/.*$", "--enable=all", "--disable=locally-disabled,suppressed-message,fixme,duplicate-code,unexpected-keyword-arg,no-value-for-parameter,no-member,too-many-function-args,useless-suppression", "src/cruizlib"], - ["pylint", "--enable=all", "--disable=locally-disabled,suppressed-message,fixme,duplicate-code", "tests"], + ["pylint", "--enable=all", "--extension-pkg-allow-list=PySide6.QtCore", "--disable=locally-disabled,suppressed-message,fixme,duplicate-code", "tests"], ] [tool.tox.env.lint_conan1] @@ -164,7 +164,7 @@ commands = [ ["mypy", "tests"], ["pylint", "--ignore-paths=^src/cruiz/pyside6/.*$", "src/cruiz"], ["pylint", "--extension-pkg-allow-list=PySide6.QtCore", "--ignore-paths=^src/cruizlib/workers/api/v2/.*$", "--enable=all", "--disable=locally-disabled,suppressed-message,fixme,duplicate-code", "src/cruizlib"], - ["pylint", "--enable=all", "--disable=locally-disabled,suppressed-message,fixme,duplicate-code", "tests"], + ["pylint", "--enable=all", "--extension-pkg-allow-list=PySide6.QtCore", "--disable=locally-disabled,suppressed-message,fixme,duplicate-code", "tests"], ] [tool.tox.env.lint_conan2] diff --git a/src/cruiz/commands/conaninvocation.py b/src/cruiz/commands/conaninvocation.py index 89125fa5..581fd223 100755 --- a/src/cruiz/commands/conaninvocation.py +++ b/src/cruiz/commands/conaninvocation.py @@ -21,9 +21,9 @@ from cruiz.settings.managers.generalpreferences import GeneralSettingsReader -import psutil +from cruizlib.messagereplyprocessor import MessageReplyProcessor -from .messagereplyprocessor import MessageReplyProcessor +import psutil if typing.TYPE_CHECKING: from cruizlib.interop.commandparameters import CommandParameters @@ -43,7 +43,6 @@ class ConanInvocation(QtCore.QObject): completed = QtCore.Signal(object, Exception) finished = QtCore.Signal() - _begin_processing = QtCore.Signal() def __del__(self) -> None: """Log when a ConanInvocation is deleted.""" @@ -58,7 +57,7 @@ def __init__(self) -> None: self._thread = QtCore.QThread() self._queue_processor = MessageReplyProcessor(self._process_queue) self._queue_processor.moveToThread(self._thread) - self._begin_processing.connect(self._queue_processor.process) + self._thread.started.connect(self._queue_processor.process) self._queue_processor.completed.connect(self.completed) # self._thread.finished not guaranteed to be delivered in a # qApp quitting scenario @@ -69,7 +68,6 @@ def __init__(self) -> None: self._cleanup_thread: typing.Optional[threading.Thread] = None self._thread.start() - self._begin_processing.emit() def close(self) -> None: """Tidy up any resources on the context that need closing.""" diff --git a/src/cruiz/commands/messagereplyprocessor.py b/src/cruizlib/messagereplyprocessor.py similarity index 82% rename from src/cruiz/commands/messagereplyprocessor.py rename to src/cruizlib/messagereplyprocessor.py index ded632c2..3b07755c 100755 --- a/src/cruiz/commands/messagereplyprocessor.py +++ b/src/cruizlib/messagereplyprocessor.py @@ -8,9 +8,11 @@ from __future__ import annotations +import functools import logging import multiprocessing import sys +import threading import typing from PySide6 import QtCore @@ -33,6 +35,22 @@ logger = logging.getLogger(__name__) +# needed for coverage to record from a QThread +# see https://github.com/coveragepy/coveragepy/issues/686 +def coverage_resolve_trace(fn: typing.Any) -> typing.Any: + """.""" + + @functools.wraps(fn) + def _wrapped( # pragma: no cover + *args: typing.Any, **kwargs: typing.Any + ) -> typing.Any: + if "coverage" in sys.modules: + sys.settrace(threading.gettrace()) + fn(*args, **kwargs) + + return _wrapped + + class MessageReplyProcessor(QtCore.QObject): """ Process replies across a multiprocessing Queue in a background thread. @@ -74,30 +92,33 @@ def stop(self) -> None: def __check_for_conan_leakage(self, entry: typing.Any = None) -> bool: if "conans" not in sys.modules: return True - if entry: + if entry: # pragma: no cover logger.critical("Conan has leaked into message queue object dump:") dump_object_types(entry, loglevel="CRITICAL") raise AssertionError("Conan has leaked into cruiz") - self.critical_failure.emit("Conan has leaked into cruiz") - return False + self.critical_failure.emit("Conan has leaked into cruiz") # pragma: no cover + return False # pragma: no cover + # pylint: disable=too-many-branches + @coverage_resolve_trace def process(self) -> None: """Process messages received from a child process.""" try: # make any debugger aware of this thread + # pylint: disable=import-outside-toplevel import pydevd - pydevd.settrace(suspend=False) + pydevd.settrace(suspend=False) # pragma: no cover except ModuleNotFoundError: pass while True: logger.debug("(%d) wait for queue entry...", id(self)) try: if not self.__check_for_conan_leakage(None): - break + break # pragma: no cover entry = self._queue.get() if not self.__check_for_conan_leakage(entry): - break + break # pragma: no cover if isinstance(entry, End): break if isinstance(entry, Stdout): @@ -121,7 +142,7 @@ def process(self) -> None: self.completed.emit(None, Exception(entry.message)) else: logger.error("Unknown message type: '%s'", entry) - except EOFError as exception: + except EOFError as exception: # pragma: no cover logger.debug( "(%d) queue failed because %s", id(self), diff --git a/src/cruizlib/workers/api/__init__.py b/src/cruizlib/workers/api/__init__.py index 1e66eedc..d8902575 100644 --- a/src/cruizlib/workers/api/__init__.py +++ b/src/cruizlib/workers/api/__init__.py @@ -2,7 +2,13 @@ import cruizlib.globals as cg -from .common import endmessagethread +from .common import ( + endmessagethread, + failuretest, + messagingtest, + successtest, + unknownmessagetest, +) if cg.CONAN_MAJOR_VERSION == 1: from .v1 import ( # noqa: E402 diff --git a/src/cruizlib/workers/api/common/failuretest.py b/src/cruizlib/workers/api/common/failuretest.py new file mode 100755 index 00000000..2cee692f --- /dev/null +++ b/src/cruizlib/workers/api/common/failuretest.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 + +"""Test a Failure reply.""" + +from __future__ import annotations + +import contextlib +import traceback +import typing + +from cruizlib.interop.message import End, Failure + +if typing.TYPE_CHECKING: + from cruizlib.multiprocessingmessagequeuetype import MultiProcessingMessageQueueType + + +def invoke(queue: MultiProcessingMessageQueueType, html: typing.Optional[str]) -> None: + """Return a failed message.""" + try: + raise ValueError("Failed Test!") + except ValueError as exc: + failure = Failure( + str(exc), type(exc).__name__, traceback.format_tb(exc.__traceback__) + ) + if html is not None: + failure.html = html + queue.put(failure) + queue.put(End()) + with contextlib.suppress(AttributeError): + # may throw exception if used with a queue.queue rather than multiprocessing + queue.close() + queue.join_thread() diff --git a/src/cruizlib/workers/api/common/messagingtest.py b/src/cruizlib/workers/api/common/messagingtest.py new file mode 100755 index 00000000..a17692b0 --- /dev/null +++ b/src/cruizlib/workers/api/common/messagingtest.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 + +"""Test the messaging system.""" + +from __future__ import annotations + +import contextlib +import typing + +from cruizlib.interop.message import ConanLogMessage, End, Stderr, Stdout + +if typing.TYPE_CHECKING: + from cruizlib.multiprocessingmessagequeuetype import MultiProcessingMessageQueueType + + +def invoke(queue: MultiProcessingMessageQueueType) -> None: + """Run messaging commands.""" + queue.put(Stdout("Stdout Test")) + queue.put(Stderr("Stderr Test")) + queue.put(ConanLogMessage("ConanLogMessage Test")) + queue.put(End()) + with contextlib.suppress(AttributeError): + # may throw exception if used with a queue.queue rather than multiprocessing + queue.close() + queue.join_thread() diff --git a/src/cruizlib/workers/api/common/successtest.py b/src/cruizlib/workers/api/common/successtest.py new file mode 100755 index 00000000..026a353d --- /dev/null +++ b/src/cruizlib/workers/api/common/successtest.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 + +"""Test a Successful reply.""" + +from __future__ import annotations + +import contextlib +import typing + +from cruizlib.interop.message import End, Success + +if typing.TYPE_CHECKING: + from cruizlib.multiprocessingmessagequeuetype import MultiProcessingMessageQueueType + + +def invoke(queue: MultiProcessingMessageQueueType) -> None: + """Return a successful message.""" + queue.put(Success("This was a success!")) + queue.put(End()) + with contextlib.suppress(AttributeError): + # may throw exception if used with a queue.queue rather than multiprocessing + queue.close() + queue.join_thread() diff --git a/src/cruizlib/workers/api/common/unknownmessagetest.py b/src/cruizlib/workers/api/common/unknownmessagetest.py new file mode 100755 index 00000000..4f19e644 --- /dev/null +++ b/src/cruizlib/workers/api/common/unknownmessagetest.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 + +"""Test an unknown reply.""" + +from __future__ import annotations + +import contextlib +import typing + +from cruizlib.interop.message import End + +if typing.TYPE_CHECKING: + from cruizlib.multiprocessingmessagequeuetype import MultiProcessingMessageQueueType + + +def invoke(queue: MultiProcessingMessageQueueType) -> None: + """Return an unknown message type.""" + queue.put(42) # type: ignore[arg-type] + queue.put(End()) + with contextlib.suppress(AttributeError): + # may throw exception if used with a queue.queue rather than multiprocessing + queue.close() + queue.join_thread() diff --git a/tests/ttypes.py b/tests/ttypes.py index becc90cc..dfa20364 100644 --- a/tests/ttypes.py +++ b/tests/ttypes.py @@ -4,6 +4,8 @@ import queue import typing +from PySide6 import QtCore + from cruizlib.interop.commandparameters import CommandParameters from cruizlib.interop.message import Message from cruizlib.interop.packagebinaryparameters import PackageBinaryParameters @@ -11,6 +13,7 @@ from cruizlib.interop.packagerevisionsparameters import PackageRevisionsParameters from cruizlib.interop.reciperevisionsparameters import RecipeRevisionsParameters from cruizlib.interop.searchrecipesparameters import SearchRecipesParameters +from cruizlib.messagereplyprocessor import MessageReplyProcessor from cruizlib.multiprocessingmessagequeuetype import ( MultiProcessingMessageQueueType, MultiProcessingStringJoinableQueueType, @@ -41,6 +44,17 @@ MultiprocessReplyQueueFixture = typing.Callable[[], MultiprocessReplyQueueReturnType] +# MessageReplyProcessor +MessageReplyProcessorReturnType = typing.Tuple[ + MultiProcessingMessageQueueType, + typing.List[Message], + QtCore.QThread, + MessageReplyProcessor, + multiprocessing.context.SpawnContext, +] + +MessageReplyProcessorFixture = typing.Callable[[], MessageReplyProcessorReturnType] + # Run worker RunWorkerFixture = typing.Callable[ [ @@ -60,6 +74,26 @@ None, ] +# Run worker using the MessageReplyProcessor +RunWorkerMessageProcessorFixture = typing.Callable[ + [ + typing.Any, + typing.Union[queue.Queue[Message], MultiProcessingMessageQueueType], + typing.Union[ + CommandParameters, + PackageBinaryParameters, + PackageIdParameters, + PackageRevisionsParameters, + RecipeRevisionsParameters, + SearchRecipesParameters, + ], + QtCore.QThread, + MessageReplyProcessor, + typing.Optional[multiprocessing.context.SpawnContext], + ], + None, +] + # Meta processing MetaFixture = typing.Tuple[ MultiProcessingStringJoinableQueueType, MultiProcessingMessageQueueType diff --git a/tests/workers/conftest.py b/tests/workers/conftest.py index 1e626aed..ec547e7b 100644 --- a/tests/workers/conftest.py +++ b/tests/workers/conftest.py @@ -14,6 +14,8 @@ import typing from unittest.mock import MagicMock +from PySide6 import QtCore + import cruizlib.workers.api as workers_api from cruizlib.globals import CONAN_MAJOR_VERSION, CONAN_VERSION_COMPONENTS from cruizlib.interop.commandparameters import CommandParameters @@ -26,6 +28,7 @@ Stdout, Success, ) +from cruizlib.messagereplyprocessor import MessageReplyProcessor from cruizlib.workers.metarequestconaninvocation import MetaRequestConanInvocation # pylint: disable=wrong-import-order @@ -48,10 +51,13 @@ ) from ttypes import ( + MessageReplyProcessorFixture, + MessageReplyProcessorReturnType, MetaFixture, MultiprocessReplyQueueFixture, MultiprocessReplyQueueReturnType, RunWorkerFixture, + RunWorkerMessageProcessorFixture, SingleprocessReplyQueueFixture, SingleprocessReplyQueueReturnType, ) @@ -309,6 +315,47 @@ def _reply_watcher( return _the_fixture +@pytest.fixture() +def messagereplyprocessor_fixture() -> MessageReplyProcessorFixture: + """ + Fixture factory to create a reply queue for a worker invocation on a child process. + + Uses a thread for message processing. + The calling test must send the End message and join the thread, before making any + assertions on the responses. + """ + replies: typing.List[typing.Any] = [] + + def _append(obj: object, exc: object) -> None: + if obj: + replies.append(obj) + if exc: + replies.append(exc) + + def _message(msg: str) -> None: + LOGGER.info(msg) + + def _the_fixture() -> MessageReplyProcessorReturnType: + context = multiprocessing.get_context("spawn") + reply_queue = context.Queue() + watcher_thread = QtCore.QThread() + processor = MessageReplyProcessor(reply_queue) + processor.moveToThread(watcher_thread) + watcher_thread.started.connect(processor.process) # pylint: disable=no-member + watcher_thread.finished.connect( # pylint: disable=no-member + watcher_thread.deleteLater + ) + processor.completed.connect(_append) + processor.stdout_message.connect(_message) + processor.stderr_message.connect(_message) + processor.conan_log_message.connect(_message) + processor.critical_failure.connect(_message) + watcher_thread.start() + return reply_queue, replies, watcher_thread, processor, context + + return _the_fixture + + @pytest.fixture() def run_worker() -> RunWorkerFixture: """Fixture factory returning how to run the worker.""" @@ -344,8 +391,8 @@ def _have_conan_symbols_leaked() -> None: # tell the watcher thread that there is no more information coming if context is not None: - # this must be done in a separate process because it closes the other side - # of the queue + # this must be done in a separate process because it closes the other + # side of the queue process = context.Process( target=workers_api.endmessagethread.invoke, args=(reply_queue,) ) @@ -365,6 +412,53 @@ def _have_conan_symbols_leaked() -> None: return _the_fixture +@pytest.fixture() +def run_worker_message_processor() -> RunWorkerMessageProcessorFixture: + """Fixture factory returning how to run the worker.""" # noqa: D202 + + # pylint: disable=too-many-arguments,too-many-positional-arguments + def _the_fixture( + worker: typing.Any, + reply_queue: typing.Union[ + queue.Queue[Message], MultiProcessingMessageQueueType + ], + params: typing.Union[ + CommandParameters, + PackageBinaryParameters, + PackageIdParameters, + PackageRevisionsParameters, + RecipeRevisionsParameters, + SearchRecipesParameters, + ], + watcher_thread: QtCore.QThread, + processor: MessageReplyProcessor, + context: typing.Optional[multiprocessing.context.SpawnContext], + ) -> None: + def _have_conan_symbols_leaked() -> None: + assert "conans" not in sys.modules + + if context is None: + # abusing the type system, as the API used for queue.Queue is the same + # as for multiprocessing.Queue + worker(reply_queue, params) + else: + _have_conan_symbols_leaked() + process = context.Process(target=worker, args=(reply_queue, params)) + process.start() + process.join() + + processor.stop() + + watcher_thread.wait(5) + if not watcher_thread.isFinished(): + raise texceptions.WatcherThreadTimeoutError() + + if context is not None: + _have_conan_symbols_leaked() + + return _the_fixture + + @pytest.fixture(name="conan_recipe_name_invalid") def fixture_conan_recipe_name_invalid() -> str: """Return an invalid recipe name.""" diff --git a/tests/workers/test_message_reply_processor.py b/tests/workers/test_message_reply_processor.py new file mode 100644 index 00000000..f0737a19 --- /dev/null +++ b/tests/workers/test_message_reply_processor.py @@ -0,0 +1,130 @@ +""".""" + +from __future__ import annotations + +import logging +import typing + +import cruizlib.workers.api as workers_api + +# pylint: disable=wrong-import-order +import pytest + +import texceptions + +if typing.TYPE_CHECKING: + from ttypes import MessageReplyProcessorFixture + +LOGGER = logging.getLogger(__name__) + + +def test_message_reply_processor_messaging( + messagereplyprocessor_fixture: MessageReplyProcessorFixture, + caplog: pytest.LogCaptureFixture, +) -> None: + """Exercise messaging in the message reply processor.""" + caplog.set_level(logging.INFO) + worker = workers_api.messagingtest.invoke + reply_queue, replies, watcher_thread, processor, context = ( + messagereplyprocessor_fixture() + ) + + process = context.Process(target=worker, args=(reply_queue,)) + process.start() + process.join() + + processor.stop() + + watcher_thread.wait(5) + if not watcher_thread.isFinished(): + raise texceptions.WatcherThreadTimeoutError() + + assert "Stdout Test" in caplog.text + assert "Stderr Test" in caplog.text + assert "ConanLogMessage Test" in caplog.text + assert not replies + + +def test_message_reply_processor_success( + messagereplyprocessor_fixture: MessageReplyProcessorFixture, + caplog: pytest.LogCaptureFixture, +) -> None: + """Exercise Successful replies in the message reply processor.""" + caplog.set_level(logging.INFO) + worker = workers_api.successtest.invoke + reply_queue, replies, watcher_thread, processor, context = ( + messagereplyprocessor_fixture() + ) + + process = context.Process(target=worker, args=(reply_queue,)) + process.start() + process.join() + + processor.stop() + + watcher_thread.wait(5) + if not watcher_thread.isFinished(): + raise texceptions.WatcherThreadTimeoutError() + + assert replies + assert len(replies) == 1 + assert isinstance(replies[0], str) + assert replies[0] == "This was a success!" + + +@pytest.mark.parametrize("html", [None, "

A failure

"]) +def test_message_reply_processor_failure( + messagereplyprocessor_fixture: MessageReplyProcessorFixture, + caplog: pytest.LogCaptureFixture, + html: typing.Optional[str], +) -> None: + """Exercise Failed replies in the message reply processor.""" + caplog.set_level(logging.INFO) + worker = workers_api.failuretest.invoke + reply_queue, replies, watcher_thread, processor, context = ( + messagereplyprocessor_fixture() + ) + + process = context.Process(target=worker, args=(reply_queue, html)) + process.start() + process.join() + + processor.stop() + + watcher_thread.wait(5) + if not watcher_thread.isFinished(): + raise texceptions.WatcherThreadTimeoutError() + + assert replies + assert len(replies) == 1 + assert isinstance(replies[0], Exception) + assert str(replies[0]) == "Failed Test!" + if html is not None: + assert html in caplog.text + else: + assert "" in caplog.text + + +def test_message_reply_processor_unknown( + messagereplyprocessor_fixture: MessageReplyProcessorFixture, + caplog: pytest.LogCaptureFixture, +) -> None: + """Exercise an unknown message in the message reply processor.""" + caplog.set_level(logging.INFO) + worker = workers_api.unknownmessagetest.invoke + reply_queue, replies, watcher_thread, processor, context = ( + messagereplyprocessor_fixture() + ) + + process = context.Process(target=worker, args=(reply_queue,)) + process.start() + process.join() + + processor.stop() + + watcher_thread.wait(5) + if not watcher_thread.isFinished(): + raise texceptions.WatcherThreadTimeoutError() + + assert not replies + assert "Unknown message type" in caplog.text