Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ commands = [
["mypy", "tests"],
["pylint", "--ignore-paths=^src/cruiz/pyside6/.*$", "src/cruiz"],
["pylint", "--extension-pkg-allow-list=PySide6.QtCore", "--ignore-paths=^src/cruizlib/workers/api/v2/.*$", "--enable=all", "--disable=locally-disabled,suppressed-message,fixme,duplicate-code,unexpected-keyword-arg,no-value-for-parameter,no-member,too-many-function-args,useless-suppression", "src/cruizlib"],
["pylint", "--enable=all", "--disable=locally-disabled,suppressed-message,fixme,duplicate-code", "tests"],
["pylint", "--enable=all", "--extension-pkg-allow-list=PySide6.QtCore", "--disable=locally-disabled,suppressed-message,fixme,duplicate-code", "tests"],
]

[tool.tox.env.lint_conan1]
Expand All @@ -164,7 +164,7 @@ commands = [
["mypy", "tests"],
["pylint", "--ignore-paths=^src/cruiz/pyside6/.*$", "src/cruiz"],
["pylint", "--extension-pkg-allow-list=PySide6.QtCore", "--ignore-paths=^src/cruizlib/workers/api/v2/.*$", "--enable=all", "--disable=locally-disabled,suppressed-message,fixme,duplicate-code", "src/cruizlib"],
["pylint", "--enable=all", "--disable=locally-disabled,suppressed-message,fixme,duplicate-code", "tests"],
["pylint", "--enable=all", "--extension-pkg-allow-list=PySide6.QtCore", "--disable=locally-disabled,suppressed-message,fixme,duplicate-code", "tests"],
]

[tool.tox.env.lint_conan2]
Expand Down
8 changes: 3 additions & 5 deletions src/cruiz/commands/conaninvocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

from cruiz.settings.managers.generalpreferences import GeneralSettingsReader

import psutil
from cruizlib.messagereplyprocessor import MessageReplyProcessor

from .messagereplyprocessor import MessageReplyProcessor
import psutil

if typing.TYPE_CHECKING:
from cruizlib.interop.commandparameters import CommandParameters
Expand All @@ -43,7 +43,6 @@ class ConanInvocation(QtCore.QObject):

completed = QtCore.Signal(object, Exception)
finished = QtCore.Signal()
_begin_processing = QtCore.Signal()

def __del__(self) -> None:
"""Log when a ConanInvocation is deleted."""
Expand All @@ -58,7 +57,7 @@ def __init__(self) -> None:
self._thread = QtCore.QThread()
self._queue_processor = MessageReplyProcessor(self._process_queue)
self._queue_processor.moveToThread(self._thread)
self._begin_processing.connect(self._queue_processor.process)
self._thread.started.connect(self._queue_processor.process)
self._queue_processor.completed.connect(self.completed)
# self._thread.finished not guaranteed to be delivered in a
# qApp quitting scenario
Expand All @@ -69,7 +68,6 @@ def __init__(self) -> None:
self._cleanup_thread: typing.Optional[threading.Thread] = None

self._thread.start()
self._begin_processing.emit()

def close(self) -> None:
"""Tidy up any resources on the context that need closing."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@

from __future__ import annotations

import functools
import logging
import multiprocessing
import sys
import threading
import typing

from PySide6 import QtCore
Expand All @@ -33,6 +35,22 @@
logger = logging.getLogger(__name__)


# needed for coverage to record from a QThread
# see https://github.com/coveragepy/coveragepy/issues/686
def coverage_resolve_trace(fn: typing.Any) -> typing.Any:
"""."""

@functools.wraps(fn)
def _wrapped( # pragma: no cover
*args: typing.Any, **kwargs: typing.Any
) -> typing.Any:
if "coverage" in sys.modules:
sys.settrace(threading.gettrace())
fn(*args, **kwargs)

return _wrapped


class MessageReplyProcessor(QtCore.QObject):
"""
Process replies across a multiprocessing Queue in a background thread.
Expand Down Expand Up @@ -74,30 +92,33 @@ def stop(self) -> None:
def __check_for_conan_leakage(self, entry: typing.Any = None) -> bool:
if "conans" not in sys.modules:
return True
if entry:
if entry: # pragma: no cover
logger.critical("Conan has leaked into message queue object dump:")
dump_object_types(entry, loglevel="CRITICAL")
raise AssertionError("Conan has leaked into cruiz")
self.critical_failure.emit("Conan has leaked into cruiz")
return False
self.critical_failure.emit("Conan has leaked into cruiz") # pragma: no cover
return False # pragma: no cover

# pylint: disable=too-many-branches
@coverage_resolve_trace
def process(self) -> None:
"""Process messages received from a child process."""
try:
# make any debugger aware of this thread
# pylint: disable=import-outside-toplevel
import pydevd

pydevd.settrace(suspend=False)
pydevd.settrace(suspend=False) # pragma: no cover
except ModuleNotFoundError:
pass
while True:
logger.debug("(%d) wait for queue entry...", id(self))
try:
if not self.__check_for_conan_leakage(None):
break
break # pragma: no cover
entry = self._queue.get()
if not self.__check_for_conan_leakage(entry):
break
break # pragma: no cover
if isinstance(entry, End):
break
if isinstance(entry, Stdout):
Expand All @@ -121,7 +142,7 @@ def process(self) -> None:
self.completed.emit(None, Exception(entry.message))
else:
logger.error("Unknown message type: '%s'", entry)
except EOFError as exception:
except EOFError as exception: # pragma: no cover
logger.debug(
"(%d) queue failed because %s",
id(self),
Expand Down
8 changes: 7 additions & 1 deletion src/cruizlib/workers/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

import cruizlib.globals as cg

from .common import endmessagethread
from .common import (
endmessagethread,
failuretest,
messagingtest,
successtest,
unknownmessagetest,
)

if cg.CONAN_MAJOR_VERSION == 1:
from .v1 import ( # noqa: E402
Expand Down
32 changes: 32 additions & 0 deletions src/cruizlib/workers/api/common/failuretest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env python3

"""Test a Failure reply."""

from __future__ import annotations

import contextlib
import traceback
import typing

from cruizlib.interop.message import End, Failure

if typing.TYPE_CHECKING:
from cruizlib.multiprocessingmessagequeuetype import MultiProcessingMessageQueueType


def invoke(queue: MultiProcessingMessageQueueType, html: typing.Optional[str]) -> None:
"""Return a failed message."""
try:
raise ValueError("Failed Test!")
except ValueError as exc:
failure = Failure(
str(exc), type(exc).__name__, traceback.format_tb(exc.__traceback__)
)
if html is not None:
failure.html = html
queue.put(failure)
queue.put(End())
with contextlib.suppress(AttributeError):
# may throw exception if used with a queue.queue rather than multiprocessing
queue.close()
queue.join_thread()
25 changes: 25 additions & 0 deletions src/cruizlib/workers/api/common/messagingtest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/usr/bin/env python3

"""Test the messaging system."""

from __future__ import annotations

import contextlib
import typing

from cruizlib.interop.message import ConanLogMessage, End, Stderr, Stdout

if typing.TYPE_CHECKING:
from cruizlib.multiprocessingmessagequeuetype import MultiProcessingMessageQueueType


def invoke(queue: MultiProcessingMessageQueueType) -> None:
"""Run messaging commands."""
queue.put(Stdout("Stdout Test"))
queue.put(Stderr("Stderr Test"))
queue.put(ConanLogMessage("ConanLogMessage Test"))
queue.put(End())
with contextlib.suppress(AttributeError):
# may throw exception if used with a queue.queue rather than multiprocessing
queue.close()
queue.join_thread()
23 changes: 23 additions & 0 deletions src/cruizlib/workers/api/common/successtest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/usr/bin/env python3

"""Test a Successful reply."""

from __future__ import annotations

import contextlib
import typing

from cruizlib.interop.message import End, Success

if typing.TYPE_CHECKING:
from cruizlib.multiprocessingmessagequeuetype import MultiProcessingMessageQueueType


def invoke(queue: MultiProcessingMessageQueueType) -> None:
"""Return a successful message."""
queue.put(Success("This was a success!"))
queue.put(End())
with contextlib.suppress(AttributeError):
# may throw exception if used with a queue.queue rather than multiprocessing
queue.close()
queue.join_thread()
23 changes: 23 additions & 0 deletions src/cruizlib/workers/api/common/unknownmessagetest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/usr/bin/env python3

"""Test an unknown reply."""

from __future__ import annotations

import contextlib
import typing

from cruizlib.interop.message import End

if typing.TYPE_CHECKING:
from cruizlib.multiprocessingmessagequeuetype import MultiProcessingMessageQueueType


def invoke(queue: MultiProcessingMessageQueueType) -> None:
"""Return an unknown message type."""
queue.put(42) # type: ignore[arg-type]
queue.put(End())
with contextlib.suppress(AttributeError):
# may throw exception if used with a queue.queue rather than multiprocessing
queue.close()
queue.join_thread()
34 changes: 34 additions & 0 deletions tests/ttypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
import queue
import typing

from PySide6 import QtCore

from cruizlib.interop.commandparameters import CommandParameters
from cruizlib.interop.message import Message
from cruizlib.interop.packagebinaryparameters import PackageBinaryParameters
from cruizlib.interop.packageidparameters import PackageIdParameters
from cruizlib.interop.packagerevisionsparameters import PackageRevisionsParameters
from cruizlib.interop.reciperevisionsparameters import RecipeRevisionsParameters
from cruizlib.interop.searchrecipesparameters import SearchRecipesParameters
from cruizlib.messagereplyprocessor import MessageReplyProcessor
from cruizlib.multiprocessingmessagequeuetype import (
MultiProcessingMessageQueueType,
MultiProcessingStringJoinableQueueType,
Expand Down Expand Up @@ -41,6 +44,17 @@

MultiprocessReplyQueueFixture = typing.Callable[[], MultiprocessReplyQueueReturnType]

# MessageReplyProcessor
MessageReplyProcessorReturnType = typing.Tuple[
MultiProcessingMessageQueueType,
typing.List[Message],
QtCore.QThread,
MessageReplyProcessor,
multiprocessing.context.SpawnContext,
]

MessageReplyProcessorFixture = typing.Callable[[], MessageReplyProcessorReturnType]

# Run worker
RunWorkerFixture = typing.Callable[
[
Expand All @@ -60,6 +74,26 @@
None,
]

# Run worker using the MessageReplyProcessor
RunWorkerMessageProcessorFixture = typing.Callable[
[
typing.Any,
typing.Union[queue.Queue[Message], MultiProcessingMessageQueueType],
typing.Union[
CommandParameters,
PackageBinaryParameters,
PackageIdParameters,
PackageRevisionsParameters,
RecipeRevisionsParameters,
SearchRecipesParameters,
],
QtCore.QThread,
MessageReplyProcessor,
typing.Optional[multiprocessing.context.SpawnContext],
],
None,
]

# Meta processing
MetaFixture = typing.Tuple[
MultiProcessingStringJoinableQueueType, MultiProcessingMessageQueueType
Expand Down
Loading