Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,19 @@ $ pip install -I client_test_lib*.whl
- Windows: `set CLOUD_API_KEY=<access_key_here>`
- Default API address is `https://api.us-east-1.mbedcloud.com`. You can change this by defining `CLOUD_API_GW` environment variable in similar way as `CLOUD_API_KEY` is done above.
- Test run will create temporary API key for the WebSocket callback channel by default. If you want to prevent that and use only the exported API key, add `--use_one_apikey` startup argument.
- Tests use [Mbed LS](https://github.com/ARMmbed/mbed-os-tools/tree/master/packages/mbed-ls) to select the board from the serial port.
- Tests use [pyOCD](https://pyocd.io/) for device discovery, with automatic fallback to [Mbed LS](https://github.com/ARMmbed/mbed-os-tools/tree/master/packages/mbed-ls) if pyOCD is not available.
- If you have only one board connected to the serial port, you don't need to select the device for the tests.
- If there are multiple boards connected to the serial port, run `mbedls` to check the target board's ID, and use it in the test run's argument `--target_id=[id]`.
- If there are multiple boards connected to the serial port, you can use either:
- `pyocd list` to check pyOCD-discovered boards and use the board ID with `--target_id=[id]`
- `mbedls` to check mbed-discovered boards and use the target ID with `--target_id=[id]`

```bash
# Using pyOCD (preferred)
$ pyocd list
[INFO] Available debug probes:
0: 0240000032044e4500257009997b00386781000097969900 (ST-Link V2-1)

# Using mbed-ls (fallback)
$ mbedls
+---------------+----------------------+-------------+--------------+--------------------------------------------------+-----------------+
| platform_name | platform_name_unique | mount_point | serial_port | target_id | daplink_version |
Expand Down
14 changes: 11 additions & 3 deletions client_test_lib/fixtures/client_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
from client_test_lib.tools.external_conn import ExternalConnection
from client_test_lib.tools.local_conn import LocalConnection
from client_test_lib.tools.serial_conn import SerialConnection
from client_test_lib.tools.utils import get_serial_port_for_mbed
from client_test_lib.tools.utils import (
get_serial_port_for_mbed,
get_serial_port_for_pyocd,
)

log = logging.getLogger(__name__)

Expand All @@ -40,23 +43,28 @@ def client_internal(request):
log.info("Using local binary process")
conn = LocalConnection(request.config.getoption("local_binary"))
else:
address = get_serial_port_for_mbed(
# Try pyocd first, fall back to mbed-ls if needed
address = get_serial_port_for_pyocd(
request.config.getoption("target_id")
)
log.info("Serial connection address: {}".format(address))
if address:
conn = SerialConnection(address, 115200)
log.info("Serial connection opened successfully")
else:
err_msg = "No serial connection to open for test device"
log.error(err_msg)
assert False, err_msg

cli = Client(conn)
cli = Client(conn, trace=True)

# reset the serial connection device
if not request.config.getoption(
"ext_conn"
) and not request.config.getoption("local_binary"):
log.info("Resetting device before test...")
cli.reset()
sleep(2) # Give device time to reset and stabilize

cli.wait_for_output("Client registered", 300)
ep_id = cli.endpoint_id(120)
Expand Down
127 changes: 114 additions & 13 deletions client_test_lib/tools/client_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ class Client:
:param dut: Running client object
:param trace: Log the raw client output
:param name: Logging name for the client
:param filter_debug: Filter out debug messages to reduce log noise (default: True)
"""

def __init__(self, dut, trace=False, name="0"):
def __init__(self, dut, trace=False, name="0", filter_debug=True):
self._ep_id = None
self.name = name
self.trace = trace
self.filter_debug = filter_debug
self.run = True
self.iq = queue.Queue()
self.dut = dut
Expand All @@ -64,21 +66,120 @@ def _input_thread(self):
while self.run:
line = self.dut.readline()
if line:
plain_line = utils.strip_escape(line)
if b"\r" in line and line.count(b"\r") > 1:
plain_line = plain_line.split(b"\r")[-2]
plain_line = plain_line.replace(b"\t", b" ").decode(
"utf-8", "replace"
)
flog.info("<--|D{}| {}".format(self.name, plain_line.strip()))
if self.trace:
log.debug("Raw output: {}".format(line))
if b"Error" in line:
log.error("Output: {}".format(line))
self.iq.put(plain_line)
plain_line = self._parse_serial_line(line)
if plain_line: # Only process non-empty lines
flog.info("<--|D{}| {}".format(self.name, plain_line.strip()))
if self.trace:
log.debug("Raw output: {}".format(line))
self._check_for_errors(line, plain_line)
self.iq.put(plain_line)
else:
pass

def _parse_serial_line(self, line):
"""
Parse serial line to extract clean content
:param line: Raw serial line bytes
:return: Cleaned string or None if line should be ignored
"""
if not line or line == b'':
return None

# Strip escape sequences first
plain_line = utils.strip_escape(line)

# Handle multiple carriage returns and newlines
# Split on \r\n or \r\r\n patterns and take the last meaningful part
if b"\r" in plain_line:
# Split on carriage returns and filter out empty parts
parts = plain_line.split(b"\r")
# Find the last non-empty part that contains actual content
for part in reversed(parts):
if part.strip() and not part.startswith(b"\n"):
plain_line = part
break

# Remove leading/trailing newlines and whitespace
plain_line = plain_line.strip(b"\n\r\t ")

# Skip empty lines
if not plain_line:
return None

# Convert tabs to spaces and decode to string
plain_line = plain_line.replace(b"\t", b" ").decode("utf-8", "replace")

# Skip lines that are just whitespace or control characters
if not plain_line.strip():
return None

# Filter debug output if enabled
if self.filter_debug and self._is_debug_line(plain_line):
return None

return plain_line

def _is_debug_line(self, line):
"""
Check if a line is debug output that should be filtered
:param line: Parsed line string
:return: True if line should be filtered out
"""
line_lower = line.lower().strip()

# log.info("Checking if line is debug: {}".format(line_lower))
# Common debug patterns to filter
debug_patterns = [
"[trace][paal]",
"debug: ",
]

for pattern in debug_patterns:
if pattern in line_lower:
return True

return False

def _check_for_errors(self, raw_line, parsed_line):
"""
Check for error conditions in the serial output
:param raw_line: Raw bytes from serial
:param parsed_line: Parsed string line
"""
# Check for various error patterns (case insensitive)
error_patterns = [
b"Error ",
b"ERROR:",
b"error:",
b"Error:",
b"FAIL",
b"fail",
b"Exception",
b"exception",
b"Fatal",
b"fatal",
b"Critical",
b"critical"
]

# Check raw line for error patterns
for pattern in error_patterns:
if pattern in raw_line:
log.error("Output: {}".format(raw_line))
return

# Also check parsed line for error keywords
parsed_lower = parsed_line.lower()
error_keywords = [
"error", "fail", "exception", "fatal", "critical",
"timeout", "abort", "crash", "panic"
]

for keyword in error_keywords:
if keyword in parsed_lower:
log.error("Output: {}".format(raw_line))
return

def _read_line(self, timeout):
"""
Read data from input queue
Expand Down
110 changes: 110 additions & 0 deletions client_test_lib/tools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import re
import string
import mbed_lstools
import serial.tools.list_ports

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -157,6 +158,115 @@ def get_serial_port_for_mbed(target_id):
return None


def get_serial_port_for_pyocd(target_id):
"""
Gets serial port address for the device using pyocd for device discovery
Falls back to mbed-ls if pyocd is not available or fails
:param target_id: device target_id (can be pyocd board ID or mbed target_id)
:return: Serial port address
"""
try:
from pyocd.core.helpers import ConnectHelper

log.debug("Attempting to discover devices using pyocd")

# Try to create a session with pyocd
session = ConnectHelper.session_with_chosen_probe()

if session is None:
log.warning("No devices found with pyocd, falling back to mbed-ls")
return get_serial_port_for_mbed(target_id)

# Get the probe from the session
probe = session.probe
if probe:
# Map pyocd probe to serial port
serial_port = _map_pyocd_probe_to_serial_port(probe)
if serial_port:
log.info(
'Using pyocd-discovered device "{}" at "{}" port for tests'.format(
getattr(probe, 'unique_id', 'Unknown'),
serial_port
)
)
session.close()
return serial_port
else:
log.warning("Could not map pyocd probe to serial port, falling back to mbed-ls")
session.close()
return get_serial_port_for_mbed(target_id)
else:
log.warning("No probe found in pyocd session, falling back to mbed-ls")
session.close()
return get_serial_port_for_mbed(target_id)

except ImportError:
log.debug("pyocd not available, falling back to mbed-ls")
return get_serial_port_for_mbed(target_id)
except Exception as e:
log.warning("pyocd device discovery failed: {}, falling back to mbed-ls".format(e))
return get_serial_port_for_mbed(target_id)


def _map_pyocd_probe_to_serial_port(probe):
"""
Maps a pyocd probe to its corresponding serial port
:param probe: pyocd probe object
:return: Serial port path or None if not found
"""
try:
# Get all available serial ports
ports = serial.tools.list_ports.comports()

# Try to match based on USB VID/PID if available
if hasattr(probe, 'vid') and hasattr(probe, 'pid'):
target_vid = probe.vid
target_pid = probe.pid

for port in ports:
if port.vid == target_vid and port.pid == target_pid:
log.debug("Matched pyocd probe to serial port {} by VID/PID".format(port.device))
return port.device

# Prioritize USB serial ports over system serial ports
# Common patterns for ARM development boards (in order of preference)
arm_patterns = [
'ttyACM', # Linux USB CDC-ACM (most common for ARM boards)
'ttyUSB', # Linux USB serial
'cu.usbmodem', # macOS USB
'COM', # Windows
]

# First pass: look for USB serial ports
for port in ports:
port_name = port.device.lower()
for pattern in arm_patterns:
if pattern in port_name:
log.debug("Matched pyocd probe to USB serial port {} by name pattern".format(port.device))
return port.device

# Second pass: exclude system serial ports and use first available USB port
usb_ports = []
for port in ports:
port_name = port.device.lower()
# Skip system serial ports (ttyS*) and virtual ports
if not any(skip in port_name for skip in ['ttys', 'pts', 'ttyprintk']):
usb_ports.append(port)

if usb_ports:
log.debug("Using first available USB serial port {} for pyocd probe".format(usb_ports[0].device))
return usb_ports[0].device

# Last resort: return None to fall back to mbed-ls
log.debug("No suitable USB serial port found for pyocd probe")
return None

except Exception as e:
log.debug("Error mapping pyocd probe to serial port: {}".format(e))

return None


def get_path(path):
if "WORKSPACE" in os.environ:
log.debug("$WORKSPACE: {}".format(os.environ["WORKSPACE"]))
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
manifest-tool==2.6.2
mbed-ls==1.8.*
pyocd>=0.35.0
pytest==7.4.4
pytest-html
pyserial
Expand Down
Loading