diff --git a/docs/apis/pytest-embedded.rst b/docs/apis/pytest-embedded.rst index a427a198..2149ee13 100644 --- a/docs/apis/pytest-embedded.rst +++ b/docs/apis/pytest-embedded.rst @@ -12,6 +12,11 @@ :undoc-members: :show-inheritance: +.. automodule:: pytest_embedded.group + :members: + :undoc-members: + :show-inheritance: + .. automodule:: pytest_embedded.dut_factory :members: :undoc-members: diff --git a/docs/usages/expecting.rst b/docs/usages/expecting.rst index 36a15825..96fc9e6e 100644 --- a/docs/usages/expecting.rst +++ b/docs/usages/expecting.rst @@ -2,7 +2,7 @@ Expecting Functions ##################### -In testing, most of the work involves expecting a certain string or pattern and then making assertions. This is supported by the functions :func:`~pytest_embedded.dut.Dut.expect`, :func:`~pytest_embedded.dut.Dut.expect_exact`, and :func:`~pytest_embedded.dut.Dut.expect_unity_test_output`. +In testing, most of the work involves expecting a certain string or pattern and then making assertions. This is supported by the functions :func:`~pytest_embedded.dut.Dut.expect`, :func:`~pytest_embedded.dut.Dut.expect_exact`, :class:`~pytest_embedded.group.DutGroup` (multi-DUT synchronization), and :func:`~pytest_embedded.dut.Dut.expect_unity_test_output`. All of these functions accept the following keyword arguments: @@ -186,6 +186,143 @@ As with the :func:`~pytest_embedded.dut.Dut.expect` function, the ``pattern`` ar for _ in range(2): dut.expect_exact(pattern_list) +*************************** + Multi-DUT Synchronization +*************************** + +When you use ``--count N`` (or equivalent), each board has its own serial stream and its own :class:`~pytest_embedded.dut.Dut` instance. Waiting for readiness on each device with separate ``expect`` calls works, but: + +- Sequential calls use **per-call** timeouts, so two ``expect_exact(..., timeout=120)`` lines can behave like a much larger wall-clock budget than a single 120s deadline. +- The **slowest** device should not delay matching on others more than your chosen global timeout. + +:class:`~pytest_embedded.group.DutGroup` +======================================== + +``DutGroup`` is a transparent proxy: **every method** available on a single :class:`~pytest_embedded.dut.Dut` can be called on the group. The call runs on all members **in parallel** and returns a list of per-DUT results. + +.. code:: python + + from pytest_embedded import DutGroup + + def test_two_boards(dut): + group = DutGroup(dut[0], dut[1]) + # or from a list: + group = DutGroup(*dut) + +It is also available as ``Dut.DutGroup`` for discoverability. + +expect / expect_exact +--------------------- + +``expect`` and ``expect_exact`` support both **broadcast** (one pattern for all DUTs) and **per-DUT** patterns (N patterns for N DUTs), all running in parallel: + +.. code:: python + + # Broadcast -- same pattern to every DUT + group.expect_exact("[READY]", timeout=120) + + # Per-DUT patterns -- one per DUT, in constructor order + group.expect_exact("[AP] ready", "[CLIENT] ready", timeout=120) + + # Regex -- also supports broadcast and per-DUT forms + results = group.expect(r"IP=(\S+)", timeout=10) + ip0 = results[0].group(1).decode() + ip1 = results[1].group(1).decode() + + # Same as :class:`~pytest_embedded.dut.Dut`: a single pattern may use the keyword form + group.expect_exact(pattern="[READY]", timeout=120) + +Other methods +------------- + +Any other :class:`~pytest_embedded.dut.Dut` method called on the group is forwarded with the **same arguments** to every DUT in parallel: + +.. code:: python + + group.write(ssid) + +For per-DUT arguments on non-expect methods, index into the group: + +.. code:: python + + group[0].write(ap_config) + group[1].write(client_config) + +Container protocol +------------------ + +``DutGroup`` supports indexing, iteration, and length: + +.. code:: python + + group[0] # first DUT + group[-1] # last DUT + len(group) # number of DUTs + list(group) # iterate over DUTs + group.duts # underlying tuple (read-only) + +Non-callable attributes are returned as a list: + +.. code:: python + + procs = group.pexpect_proc # [proc_0, proc_1, ...] + +Names and clearer errors +------------------------ + +Pass optional **member** labels and an optional **group** label so logs and failures are easy to read: + +.. code:: python + + group = DutGroup(*dut, names=("ap", "client"), group_name="wifi_ap") + # group.names -> ("ap", "client"); group.group_name -> "wifi_ap" + +If you omit ``names``, members default to ``dut-0``, ``dut-1``, … (aligned with per-DUT log file names when using ``--count``). + +When any parallel call fails on one DUT, pytest-embedded raises :exc:`pytest_embedded.group.DutGroupMemberError`. Its message and attributes identify the member (``member_name``, ``member_index``) and group (``group_name``), and the original error (for example :exc:`pexpect.TIMEOUT`) is chained as :attr:`__cause__`. A structured line is also written to the Python logger at ERROR (including the underlying exception context). + +Full example +------------ + +.. code:: python + + from pytest_embedded import DutGroup + + def test_wifi_ap(dut): + group = DutGroup(*dut) + + # Phase 1: wait for both devices to be ready + group.expect_exact("[READY]", timeout=120) + + # Phase 2: exchange SSID + group.expect_exact("Send SSID:", timeout=10) + group.write(ap_ssid) + + # Phase 3: exchange password + group.expect_exact("Send Password:", timeout=10) + group.write(ap_password) + + # Phase 4: verify connection + results = group.expect(r"IP=(\S+)", timeout=30) + for r in results: + assert r.group(1) != b"" + +Phase synchronization +===================== + +``DutGroup`` methods can be called **multiple times** in one test to synchronize phases. Each call blocks until every DUT has matched before continuing. After a successful match, those substrings are consumed from each DUT's buffer; emit new output for the next phase. + +.. code:: python + + group = DutGroup(*dut) + group.expect_exact("Init OK", timeout=30) + group.expect_exact("Server started", timeout=10) + group.expect_exact("Connected", timeout=60) + +.. note:: + + If one DUT fails, pending work is cancelled where possible; expects that have already started may still run until they match or time out, because pexpect cannot always be interrupted from another thread. The failure is reported as :exc:`~pytest_embedded.group.DutGroupMemberError` with the underlying error as :attr:`~BaseException.__cause__`. + *********************************************************** :func:`~pytest_embedded.dut.Dut.expect_unity_test_output` *********************************************************** diff --git a/pytest-embedded/pytest_embedded/__init__.py b/pytest-embedded/pytest_embedded/__init__.py index e400978e..661ad7e3 100644 --- a/pytest-embedded/pytest_embedded/__init__.py +++ b/pytest-embedded/pytest_embedded/__init__.py @@ -3,7 +3,8 @@ from .app import App from .dut import Dut from .dut_factory import DutFactory +from .group import DutGroup, DutGroupMemberError -__all__ = ['App', 'Dut', 'DutFactory'] +__all__ = ['App', 'Dut', 'DutFactory', 'DutGroup', 'DutGroupMemberError'] __version__ = '2.7.0' diff --git a/pytest-embedded/pytest_embedded/dut.py b/pytest-embedded/pytest_embedded/dut.py index 6104b200..d0ad227d 100644 --- a/pytest-embedded/pytest_embedded/dut.py +++ b/pytest-embedded/pytest_embedded/dut.py @@ -9,6 +9,7 @@ import pexpect from .app import App +from .group import DutGroup from .log import MessageQueue, PexpectProcess from .unity import UNITY_SUMMARY_LINE_REGEX, TestSuite from .utils import Meta, _InjectMixinCls, remove_asci_color_code, to_bytes, to_list @@ -232,3 +233,7 @@ def run_all_single_board_cases( requires enable service ``idf`` """ pass + + +#: Alias for :class:`~pytest_embedded.group.DutGroup` for discoverability. +Dut.DutGroup = DutGroup diff --git a/pytest-embedded/pytest_embedded/group.py b/pytest-embedded/pytest_embedded/group.py new file mode 100644 index 00000000..16d1f186 --- /dev/null +++ b/pytest-embedded/pytest_embedded/group.py @@ -0,0 +1,260 @@ +"""Multi-DUT synchronization helpers.""" + +from __future__ import annotations + +import logging +from collections.abc import Iterator, Sequence +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any + +logger = logging.getLogger(__name__) + + +def _normalize_expect_patterns(*patterns: Any, **kwargs: Any) -> tuple[tuple[Any, ...], dict[str, Any]]: + """Match :meth:`~pytest_embedded.dut.Dut.expect` / ``expect_exact``: allow ``pattern=`` keyword.""" + out_kw = dict(kwargs) + if 'pattern' in out_kw: + if patterns: + raise TypeError('multiple values for argument pattern') + patterns = (out_kw.pop('pattern'),) + return patterns, out_kw + + +class DutGroupMemberError(Exception): + """Raised when a parallel :class:`DutGroup` operation fails on one member. + + The original exception (e.g. :exc:`pexpect.TIMEOUT`) is chained as :attr:`__cause__`. + Use :attr:`member_name`, :attr:`member_index`, and :attr:`group_name` for filtering + or reporting. + """ + + def __init__( + self, + message: str, + *, + member_index: int, + member_name: str, + group_name: str | None = None, + ) -> None: + super().__init__(message) + self.member_index = member_index + self.member_name = member_name + self.group_name = group_name + + +# --------------------------------------------------------------------------- +# DutGroup class +# --------------------------------------------------------------------------- + + +class DutGroup: + """Transparent proxy that forwards method calls to every wrapped + :class:`~pytest_embedded.dut.Dut` **in parallel**. + + Create one from any number of DUTs:: + + group = DutGroup(dut[0], dut[1]) + # or + group = DutGroup(*dut) + + Any :class:`~pytest_embedded.dut.Dut` method can be called on the group. + It runs on **every** member concurrently and returns a list of per-DUT + results:: + + group.expect_exact('[READY]', timeout=120) + group.write(ssid) + + :meth:`expect` and :meth:`expect_exact` additionally support **per-DUT + patterns** -- pass N patterns for N DUTs:: + + group.expect_exact('[AP] ready', '[CLIENT] ready', timeout=120) + + Keyword Args: + names: Optional label per DUT (same length as *duts*), used in logs and in + :exc:`DutGroupMemberError`. If omitted, defaults to ``dut-0``, ``dut-1``, ... + group_name: Optional label for this group in logs and exception messages. + """ + + def __init__( + self, + *duts: Any, + names: Sequence[str] | None = None, + group_name: str | None = None, + ) -> None: + if not duts: + raise ValueError('DutGroup requires at least one DUT') + self._duts: tuple[Any, ...] = duts + n = len(duts) + if names is not None: + if len(names) != n: + raise ValueError(f'names must have length {n} (same as number of DUTs), got {len(names)}') + self._names: tuple[str, ...] = tuple(str(x) for x in names) + else: + self._names = tuple(f'dut-{i}' for i in range(n)) + self._group_name = group_name + + # -- container protocol -------------------------------------------------- + + @property + def duts(self) -> tuple[Any, ...]: + """The underlying DUT objects (read-only).""" + return self._duts + + @property + def names(self) -> tuple[str, ...]: + """Human-readable member labels (same order as :attr:`duts`).""" + return self._names + + @property + def group_name(self) -> str | None: + """Optional label for this group, if set in the constructor.""" + return self._group_name + + def __len__(self) -> int: + return len(self._duts) + + def __getitem__(self, index: int | slice) -> Any: + return self._duts[index] + + def __iter__(self) -> Iterator[Any]: + return iter(self._duts) + + def __repr__(self) -> str: + parts = [repr(d) for d in self._duts] + if self._group_name is not None: + parts.insert(0, f'group_name={self._group_name!r}') + parts.insert(0, f'names={self._names!r}') + return f'DutGroup({", ".join(parts)})' + + def _format_member_head(self, member_index: int) -> str: + gn = self._group_name + name = self._names[member_index] + if gn: + return f'DutGroup {gn!r} member {name!r} (index {member_index})' + return f'DutGroup member {name!r} (index {member_index})' + + def _wrap_member_failure( + self, + member_index: int, + cause: BaseException, + *, + operation: str | None, + ) -> DutGroupMemberError: + head = self._format_member_head(member_index) + op = f' during {operation}' if operation else '' + logger.error('%s failed%s: %s', head, op, cause, exc_info=cause) + msg_body = f'{head} failed{op}.\n\n{cause}' + return DutGroupMemberError( + msg_body, + member_index=member_index, + member_name=self._names[member_index], + group_name=self._group_name, + ) + + def _run_parallel( + self, + callables: list[Any], + args_per_call: list[tuple], + kwargs_per_call: list[dict], + *, + operation: str | None = None, + ) -> list[Any]: + """Run *callables* concurrently, one per DUT, and return ordered results.""" + n = len(callables) + if n == 1: + try: + return [callables[0](*args_per_call[0], **kwargs_per_call[0])] + except BaseException as e: + raise self._wrap_member_failure(0, e, operation=operation) from e + + results: list[Any] = [None] * n + executor = ThreadPoolExecutor(max_workers=n) + future_to_idx = {executor.submit(callables[i], *args_per_call[i], **kwargs_per_call[i]): i for i in range(n)} + failed_early = False + try: + for fut in as_completed(future_to_idx): + try: + results[future_to_idx[fut]] = fut.result() + except BaseException as e: + failed_early = True + idx = future_to_idx[fut] + executor.shutdown(wait=False, cancel_futures=True) + raise self._wrap_member_failure(idx, e, operation=operation) from e + finally: + if not failed_early: + executor.shutdown(wait=True, cancel_futures=False) + return results + + # -- expect / expect_exact with per-DUT pattern support ------------------ + + def _expect_impl(self, method_name: str, *patterns: Any, **kwargs: Any) -> list[Any]: + """Shared implementation for :meth:`expect` and :meth:`expect_exact`. + + * 1 pattern -> broadcast to every DUT. + * N patterns -> one per DUT (positional, same order as constructor). + + Callers must pass patterns already normalized via :func:`_normalize_expect_patterns`. + """ + n = len(self._duts) + methods = [getattr(dut, method_name) for dut in self._duts] + + if len(patterns) == 1: + args_list = [(patterns[0],)] * n + elif len(patterns) == n: + args_list = [(p,) for p in patterns] + else: + raise ValueError(f'Expected 1 (broadcast) or {n} (per-DUT) patterns, got {len(patterns)}') + + kwargs_list = [dict(kwargs) for _ in range(n)] + return self._run_parallel(methods, args_list, kwargs_list, operation=method_name) + + def expect(self, *patterns: Any, **kwargs: Any) -> list[Any]: + """Parallel :meth:`~pytest_embedded.dut.Dut.expect` (regex) across all DUTs. + + Args: + *patterns: One pattern broadcast to all DUTs, **or** one per DUT + (positional, same order as constructor). Same as :class:`~pytest_embedded.dut.Dut`, + you may pass a single pattern as ``pattern=...`` instead of positionally. + **kwargs: Forwarded to each DUT's ``expect`` call (e.g. ``timeout``). + + Returns: + Per-DUT match results in the same order as DUTs. + """ + patterns, kwargs = _normalize_expect_patterns(*patterns, **kwargs) + return self._expect_impl('expect', *patterns, **kwargs) + + def expect_exact(self, *patterns: Any, **kwargs: Any) -> list[Any]: + """Parallel :meth:`~pytest_embedded.dut.Dut.expect_exact` (literal) across all DUTs. + + Args: + *patterns: One pattern broadcast to all DUTs, **or** one per DUT + (positional, same order as constructor). Same as :class:`~pytest_embedded.dut.Dut`, + you may pass a single pattern as ``pattern=...`` instead of positionally. + **kwargs: Forwarded to each DUT's ``expect_exact`` call (e.g. ``timeout``). + + Returns: + Per-DUT match results in the same order as DUTs. + """ + patterns, kwargs = _normalize_expect_patterns(*patterns, **kwargs) + return self._expect_impl('expect_exact', *patterns, **kwargs) + + # -- transparent proxy for everything else ------------------------------- + + def __getattr__(self, name: str) -> Any: + attrs = [] + for dut in self._duts: + try: + attrs.append(getattr(dut, name)) + except AttributeError: + raise AttributeError(f"'{type(dut).__name__}' object has no attribute '{name}'") from None + + if not all(callable(a) for a in attrs): + return attrs + + def _proxy(*args: Any, **kwargs: Any) -> list[Any]: + n = len(attrs) + args_list = [args] * n + kwargs_list = [dict(kwargs) for _ in range(n)] + return self._run_parallel(attrs, args_list, kwargs_list, operation=name) + + return _proxy diff --git a/pytest-embedded/tests/test_base.py b/pytest-embedded/tests/test_base.py index 9df1c51f..84b1c4f7 100644 --- a/pytest-embedded/tests/test_base.py +++ b/pytest-embedded/tests/test_base.py @@ -97,6 +97,191 @@ def test_fixture_redirect(pexpect_proc, dut, redirect): result.assert_outcomes(passed=6) +def test_multi_dut(testdir): + testdir.makepyfile(""" + import re + import threading + import time + + import pexpect + import pytest + + from pytest_embedded import Dut, DutGroup, DutGroupMemberError + + # -- container protocol -- + + @pytest.mark.parametrize('count', [3], indirect=True) + def test_multi_dut_container(dut): + group = DutGroup(*dut) + assert len(group) == 3 + assert group[0] is dut[0] + assert group[2] is dut[2] + assert group.duts == tuple(dut) + assert list(group) == list(dut) + assert group[-1] is dut[2] + # also reachable via Dut class attribute + group2 = Dut.DutGroup(dut[0], dut[1]) + assert len(group2) == 2 + + # -- expect_exact: broadcast same pattern -- + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_expect_exact_broadcast(dut): + group = DutGroup(*dut) + dut[0].write('[READY]') + dut[1].write('[READY]') + r = group.expect_exact('[READY]', timeout=5) + assert len(r) == 2 + + # -- expect: broadcast same regex -- + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_expect_regex(dut): + group = DutGroup(*dut) + dut[0].write('id=42') + dut[1].write('id=99') + r = group.expect(re.compile(br'id=(\\d+)'), timeout=5) + assert r[0].group(1) == b'42' + assert r[1].group(1) == b'99' + + # -- keyword pattern= (aligned with Dut.expect / Dut.expect_exact) -- + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_expect_exact_keyword_pattern(dut): + group = DutGroup(*dut) + dut[0].write('[READY]') + dut[1].write('[READY]') + r = group.expect_exact(pattern='[READY]', timeout=5) + assert len(r) == 2 + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_expect_keyword_pattern(dut): + group = DutGroup(*dut) + dut[0].write('id=42') + dut[1].write('id=99') + r = group.expect(pattern=re.compile(br'id=(\\d+)'), timeout=5) + assert r[0].group(1) == b'42' + assert r[1].group(1) == b'99' + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_expect_duplicate_pattern_raises(dut): + group = DutGroup(*dut) + with pytest.raises(TypeError, match='multiple values'): + group.expect('a', pattern=re.compile(br'x')) + + # -- write: broadcast and verify -- + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_write_broadcast(dut): + group = DutGroup(*dut) + group.write('hello all') + r = group.expect_exact('hello all', timeout=5) + assert len(r) == 2 + + # -- proxy forwards arbitrary Dut methods -- + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_proxy_method(dut): + group = DutGroup(*dut) + group.write('ping') + r = group.expect_exact('ping', timeout=5) + assert len(r) == 2 + assert all(v == b'ping' for v in r) + + # -- proxy returns list of non-callable attributes -- + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_proxy_attribute(dut): + group = DutGroup(*dut) + procs = group.pexpect_proc + assert isinstance(procs, list) + assert len(procs) == 2 + + # -- per-DUT operations via indexing -- + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_per_dut_indexing(dut): + group = DutGroup(*dut) + group[0].write('[AP] ready') + group[1].write('[CLIENT] ready') + group[0].expect_exact('[AP] ready', timeout=5) + group[1].expect_exact('[CLIENT] ready', timeout=5) + + # -- timeout -- + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_timeout(dut): + group = DutGroup(*dut) + dut[0].write('only one') + with pytest.raises(DutGroupMemberError) as ei: + group.expect_exact('never appears', timeout=1) + assert isinstance(ei.value.__cause__, pexpect.TIMEOUT) + assert ei.value.member_name in ('dut-0', 'dut-1') + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_timeout_named(dut): + group = DutGroup(*dut, names=('ap', 'client'), group_name='wifi') + dut[0].write('only one') + with pytest.raises(DutGroupMemberError) as ei: + group.expect_exact('never appears', timeout=1) + assert ei.value.group_name == 'wifi' + assert ei.value.member_name in ('ap', 'client') + assert isinstance(ei.value.__cause__, pexpect.TIMEOUT) + assert 'wifi' in str(ei.value) + + # -- parallel execution: wall time bounded -- + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_parallel(dut): + group = DutGroup(*dut) + def late(): + time.sleep(0.4) + dut[0].write('marker') + dut[1].write('marker') + threading.Thread(target=late, daemon=True).start() + t0 = time.monotonic() + group.expect_exact('marker', timeout=3) + assert time.monotonic() - t0 < 2.0 + + # -- expect_exact: per-DUT patterns -- + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_expect_exact_per_dut(dut): + group = DutGroup(*dut) + dut[0].write('[AP] ready') + dut[1].write('[CLIENT] ready') + r = group.expect_exact('[AP] ready', '[CLIENT] ready', timeout=5) + assert r[0] == b'[AP] ready' + assert r[1] == b'[CLIENT] ready' + + # -- expect: per-DUT regex patterns -- + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_expect_per_dut_regex(dut): + group = DutGroup(*dut) + dut[0].write('alpha=hello') + dut[1].write('beta=world') + r = group.expect(r'alpha=(\\w+)', r'beta=(\\w+)', timeout=5) + assert r[0].group(1) == b'hello' + assert r[1].group(1) == b'world' + + # -- multi-phase synchronization -- + + @pytest.mark.parametrize('count', [2], indirect=True) + def test_multi_dut_multi_phase(dut): + group = DutGroup(*dut) + group.write('phase1') + group.expect_exact('phase1', timeout=5) + group.write('phase2') + group.expect_exact('phase2', timeout=5) + group.write('done') + group.expect_exact('done', timeout=5) + """) + + result = testdir.runpytest('-s') + result.assert_outcomes(passed=16) + + def test_multi_count_fixtures(testdir): testdir.makepyfile(""" import pytest