From 387b877f8e4e3bd3d40b28a12ee445d18000b889 Mon Sep 17 00:00:00 2001 From: Fu Hanxi Date: Thu, 11 Sep 2025 12:27:44 +0200 Subject: [PATCH] refactor: always use multiprocess spawn --- pytest-embedded/pytest_embedded/dut.py | 5 ++- .../pytest_embedded/dut_factory.py | 18 +++++------ pytest-embedded/pytest_embedded/log.py | 32 +++++++++++-------- pytest-embedded/pytest_embedded/plugin.py | 21 +++++++++--- pytest-embedded/tests/test_base.py | 1 - 5 files changed, 45 insertions(+), 32 deletions(-) diff --git a/pytest-embedded/pytest_embedded/dut.py b/pytest-embedded/pytest_embedded/dut.py index 4164f3c7..6104b200 100644 --- a/pytest-embedded/pytest_embedded/dut.py +++ b/pytest-embedded/pytest_embedded/dut.py @@ -1,6 +1,5 @@ import functools import logging -import multiprocessing import os.path import re from collections.abc import Callable @@ -10,7 +9,7 @@ import pexpect from .app import App -from .log import PexpectProcess +from .log import MessageQueue, PexpectProcess from .unity import UNITY_SUMMARY_LINE_REGEX, TestSuite from .utils import Meta, _InjectMixinCls, remove_asci_color_code, to_bytes, to_list @@ -29,7 +28,7 @@ class Dut(_InjectMixinCls): def __init__( self, pexpect_proc: PexpectProcess, - msg_queue: multiprocessing.Queue, + msg_queue: MessageQueue, app: App, pexpect_logfile: str, test_case_name: str, diff --git a/pytest-embedded/pytest_embedded/dut_factory.py b/pytest-embedded/pytest_embedded/dut_factory.py index 8eee222c..d2e03186 100644 --- a/pytest-embedded/pytest_embedded/dut_factory.py +++ b/pytest-embedded/pytest_embedded/dut_factory.py @@ -28,10 +28,7 @@ def _drop_none_kwargs(kwargs: dict[t.Any, t.Any]): return {k: v for k, v in kwargs.items() if v is not None} -if sys.platform == 'darwin': - _ctx = multiprocessing.get_context('fork') -else: - _ctx = multiprocessing.get_context() +_ctx = multiprocessing.get_context('spawn') _stdout = sys.__stdout__ @@ -45,10 +42,6 @@ def _drop_none_kwargs(kwargs: dict[t.Any, t.Any]): PARAMETRIZED_FIXTURES_CACHE = {} -def msg_queue_gn() -> MessageQueue: - return MessageQueue() - - def _listen(q: MessageQueue, filepath: str, with_timestamp: bool = True, count: int = 1, total: int = 1) -> None: shall_add_prefix = True while True: @@ -741,10 +734,15 @@ def create( """ layout = [] try: - global PARAMETRIZED_FIXTURES_CACHE - msg_queue = msg_queue_gn() + from .plugin import _MP_MANAGER # avoid circular import + + if _MP_MANAGER is None: + raise SystemExit('The _MP_MANAGER is not initialized, please use this function under pytest.') + + msg_queue = _MP_MANAGER.MessageQueue() layout.append(msg_queue) + global PARAMETRIZED_FIXTURES_CACHE _pexpect_logfile = os.path.join( PARAMETRIZED_FIXTURES_CACHE['_meta'].logdir, f'custom-dut-{DUT_GLOBAL_INDEX}.txt' ) diff --git a/pytest-embedded/pytest_embedded/log.py b/pytest-embedded/pytest_embedded/log.py index 20d2b4c6..7d7382f6 100644 --- a/pytest-embedded/pytest_embedded/log.py +++ b/pytest-embedded/pytest_embedded/log.py @@ -3,11 +3,11 @@ import multiprocessing import os import subprocess -import sys import tempfile import textwrap import uuid from multiprocessing import queues +from multiprocessing.managers import BaseManager from typing import AnyStr import pexpect.fdpexpect @@ -16,10 +16,11 @@ from .utils import Meta, remove_asci_color_code, to_bytes, to_str, utcnow_str -if sys.platform == 'darwin': - _ctx = multiprocessing.get_context('fork') -else: - _ctx = multiprocessing.get_context() +_ctx = multiprocessing.get_context('spawn') + + +class MessageQueueManager(BaseManager): + pass class MessageQueue(queues.Queue): @@ -40,7 +41,7 @@ def put(self, obj, **kwargs): _b = to_bytes(obj) try: super().put(_b, **kwargs) - except: # noqa # queue might be closed + except Exception: # queue might be closed pass def write(self, s: AnyStr): @@ -53,6 +54,9 @@ def isatty(self): return True +MessageQueueManager.register('MessageQueue', MessageQueue) + + class PexpectProcess(pexpect.fdpexpect.fdspawn): """ Use a temp file to gather multiple inputs into one output, and do `pexpect.expect()` from one place. @@ -146,16 +150,16 @@ def live_print_call(*args, msg_queue: MessageQueue | None = None, expect_returnc class _PopenRedirectProcess(_ctx.Process): def __init__(self, msg_queue: MessageQueue, logfile: str): - self._q = msg_queue - - self.logfile = logfile - - super().__init__(target=self._forward_io, daemon=True) # killed by the main process + super().__init__(target=self._forward_io, args=(msg_queue, logfile), daemon=True) - def _forward_io(self) -> None: - with open(self.logfile) as fr: + @staticmethod + def _forward_io(msg_queue, logfile) -> None: + with open(logfile) as fr: while True: - self._q.put(fr.read()) + try: + msg_queue.put(fr.read()) # msg_queue may be closed + except Exception: + break class DuplicateStdoutPopen(subprocess.Popen): diff --git a/pytest-embedded/pytest_embedded/plugin.py b/pytest-embedded/pytest_embedded/plugin.py index baa6b8c8..aa98d86c 100644 --- a/pytest-embedded/pytest_embedded/plugin.py +++ b/pytest-embedded/pytest_embedded/plugin.py @@ -36,7 +36,6 @@ app_fn, dut_gn, gdb_gn, - msg_queue_gn, openocd_gn, pexpect_proc_fn, qemu_gn, @@ -44,7 +43,7 @@ set_parametrized_fixtures_cache, wokwi_gn, ) -from .log import MessageQueue, PexpectProcess +from .log import MessageQueue, MessageQueueManager, PexpectProcess from .unity import JunitMerger, UnityTestReportMode, escape_illegal_xml_chars from .utils import ( SERVICE_LIB_NAMES, @@ -300,6 +299,7 @@ def pytest_addoption(parser): # helpers # ########### _COUNT = 1 +_MP_MANAGER: MessageQueueManager | None = None def _gte_one_int(v) -> int: @@ -630,6 +630,19 @@ def port_app_cache() -> dict[str, str]: return {} +@pytest.fixture(scope='session', autouse=True) +def _mp_manager(): + manager = MessageQueueManager() + manager.start() + + global _MP_MANAGER + _MP_MANAGER = manager + + yield manager + + manager.shutdown() + + @pytest.fixture def test_case_tempdir(test_case_name: str, session_tempdir: str) -> str: """Function scoped temp dir for pytest-embedded""" @@ -668,8 +681,8 @@ def _pexpect_logfile(test_case_tempdir, logfile_extension, dut_index, dut_total) @pytest.fixture @multi_dut_generator_fixture -def msg_queue() -> MessageQueue: # kwargs passed by `multi_dut_generator_fixture()` - return msg_queue_gn() +def msg_queue(_mp_manager) -> MessageQueue: # kwargs passed by `multi_dut_generator_fixture()` + return _mp_manager.MessageQueue() @pytest.fixture diff --git a/pytest-embedded/tests/test_base.py b/pytest-embedded/tests/test_base.py index aa7f24f7..d18d17c7 100644 --- a/pytest-embedded/tests/test_base.py +++ b/pytest-embedded/tests/test_base.py @@ -290,7 +290,6 @@ def test_expect_all_failed(dut): result.assert_outcomes(passed=10) -@pytest.mark.xfail(reason='unstable') def test_expect_from_timeout(testdir): testdir.makepyfile(r""" import threading