diff --git a/examples/SimpleDevice/simple_device.cpp b/examples/SimpleDevice/simple_device.cpp index 37bc0ae7..6f39a647 100644 --- a/examples/SimpleDevice/simple_device.cpp +++ b/examples/SimpleDevice/simple_device.cpp @@ -80,8 +80,8 @@ void printPacket(const cbPKT_GENERIC& pkt) { /// Map device type string to DeviceType enum DeviceType parseDeviceType(const std::string& type_str) { - if (type_str == "NSP") return DeviceType::LEGACY_NSP; - if (type_str == "GEMINI_NSP") return DeviceType::NSP; + if (type_str == "LEGACY_NSP") return DeviceType::LEGACY_NSP; + if (type_str == "NSP") return DeviceType::NSP; if (type_str == "HUB1") return DeviceType::HUB1; if (type_str == "HUB2") return DeviceType::HUB2; if (type_str == "HUB3") return DeviceType::HUB3; diff --git a/pycbsdk/src/pycbsdk/cli/clock_check.py b/pycbsdk/src/pycbsdk/cli/clock_check.py new file mode 100644 index 00000000..a9f12021 --- /dev/null +++ b/pycbsdk/src/pycbsdk/cli/clock_check.py @@ -0,0 +1,258 @@ +"""Validate device-to-host clock conversion. + +Receives live packets via callbacks, converts each ``header.time`` to +``time.monotonic()`` via ``session.device_to_monotonic()``, and compares +against the actual ``time.monotonic()`` at arrival. The difference is the +one-way delivery latency — it should be small and stable when clock sync +is working correctly. + +Reports the detected protocol version. Protocol 4.0+ uses native PTP +nanosecond timestamps; protocol 3.11 uses 30 kHz sample counters that the +protocol wrapper upconverts to nanoseconds. Clock sync should work in both +cases since the probe echo uses the same time base as data packets. + +By default listens for event packets (spikes). Use ``--group`` to listen +on a continuous sample group instead (useful when no spike channels are +configured). + +Usage:: + + python -m pycbsdk.cli.clock_check HUB1 + python -m pycbsdk.cli.clock_check HUB1 --interval 2.0 + python -m pycbsdk.cli.clock_check HUB1 --group 6 +""" + +from __future__ import annotations + +import argparse +import sys +import time +import threading +from collections import deque + +from pycbsdk import DeviceType, SampleRate, Session +from pycbsdk.session import ProtocolVersion + + +class ClockChecker: + """Accumulates packet arrival-vs-converted timestamps.""" + + def __init__(self, session: Session, window_sec: float): + self._session = session + self._window = window_sec + self._lock = threading.Lock() + # (arrival_mono, converted_mono, device_ns) + self._samples: deque[tuple[float, float, int]] = deque() + self._total = 0 + + def on_packet(self, header, data) -> None: + arrival = time.monotonic() + try: + converted = self._session.device_to_monotonic(header.time) + except Exception: + return + with self._lock: + self._samples.append((arrival, converted, header.time)) + self._total += 1 + + def snapshot(self) -> dict | None: + cutoff = time.monotonic() - self._window + with self._lock: + while self._samples and self._samples[0][0] < cutoff: + self._samples.popleft() + samples = list(self._samples) + total = self._total + if not samples: + return None + + # latency = arrival - converted (positive means packet arrived + # after the converted timestamp, i.e. normal delivery delay) + latencies = [(a - c) * 1000 for a, c, _ in samples] + return { + "n_window": len(samples), + "n_total": total, + "min_ms": min(latencies), + "max_ms": max(latencies), + "mean_ms": sum(latencies) / len(latencies), + "last_latency_ms": latencies[-1], + "last_device_ns": samples[-1][2], + "last_arrival": samples[-1][0], + } + + +def clock_check( + device_type: DeviceType, + interval: float = 1.0, + group: int | None = None, + timeout: float = 10.0, +) -> None: + """Connect to a device and print clock-conversion diagnostics. + + Args: + device_type: Device to connect to. + interval: Seconds between reports. + group: If set, listen on this sample group instead of events. + timeout: Connection timeout in seconds. + """ + with Session(device_type=device_type) as session: + deadline = time.monotonic() + timeout + while not session.running: + if time.monotonic() > deadline: + raise TimeoutError( + f"Session for {device_type.name} did not start within {timeout}s" + ) + time.sleep(0.1) + + proto = session.protocol_version + ts_kind = ("30 kHz ticks (upconverted to ns)" + if proto == ProtocolVersion.V3_11 + else "PTP nanoseconds") + print(f"Connected to {device_type.name} protocol: {proto.name} " + f"timestamps: {ts_kind}") + + if proto == ProtocolVersion.UNKNOWN: + print("WARNING: Protocol version unknown — results may be unreliable.", + file=sys.stderr) + + # Wait for at least one clock sync + print("Waiting for clock sync ...") + while session.clock_offset_ns is None: + if time.monotonic() > deadline: + raise TimeoutError("Clock sync did not arrive within timeout") + session.send_clock_probe() + time.sleep(0.25) + + checker = ClockChecker(session, window_sec=interval) + + if group is not None: + rate = SampleRate(group) + source = f"group {rate.name}" + + @session.on_group(rate) + def _on_group(header, data): + checker.on_packet(header, data) + else: + source = "events (all channel types)" + + @session.on_event(None) + def _on_event(header, data): + checker.on_packet(header, data) + + print(f"Listening on {source} ...") + + # Wait for at least one packet to arrive so we can sanity-check + # that data-packet timestamps and clock-probe timestamps share + # the same time base. + pkt_deadline = time.monotonic() + timeout + while True: + snap = checker.snapshot() + if snap is not None: + break + if time.monotonic() > pkt_deadline: + raise TimeoutError( + f"No packets received within {timeout}s — try --group" + ) + time.sleep(0.1) + + first_latency_ms = snap["mean_ms"] + offset_ns = session.clock_offset_ns + offset_s = offset_ns / 1e9 if offset_ns is not None else float("nan") + print( + f"\nClock sanity check (first packets):\n" + f" Clock offset: {offset_s:+.6f} s " + f"(device_ns - host_steady_ns)\n" + f" First data pkt: {snap['last_device_ns']} ns " + f"({snap['last_device_ns'] / 1e9:.6f} s)\n" + f" Delivery latency: {first_latency_ms:+.3f} ms " + f"(arrival_mono - device_to_monotonic(pkt.time))" + ) + if abs(first_latency_ms) > 1000: + print( + f"\n WARNING: Latency magnitude > 1 s — data packet " + f"timestamps and clock probe\n responses appear to use " + f"different time bases. device_to_monotonic() results\n" + f" will not be meaningful for this device.", + file=sys.stderr, + ) + + print(f"\nReporting every {interval}s ...\n") + + hdr = (f"{'Pkts':>6s} {'Offset (ms)':>14s} {'Uncert (ms)':>11s} " + f"{'Latency min':>11s} {'mean':>8s} {'max':>8s} {'last':>8s}") + print(hdr) + print("─" * len(hdr)) + + all_means: list[float] = [] + try: + while True: + time.sleep(interval) + snap = checker.snapshot() + + offset_ns = session.clock_offset_ns + uncert_ns = session.clock_uncertainty_ns + offset_ms = offset_ns / 1e6 if offset_ns is not None else float("nan") + uncert_ms = uncert_ns / 1e6 if uncert_ns is not None else float("nan") + + if snap is None: + print(f"{'0':>6s} {offset_ms:>14.3f} {uncert_ms:>11.3f}" + f" (no packets)") + continue + + all_means.append(snap["mean_ms"]) + print( + f"{snap['n_window']:>6d} {offset_ms:>14.3f} {uncert_ms:>11.3f} " + f"{snap['min_ms']:>+11.3f} {snap['mean_ms']:>+8.3f} " + f"{snap['max_ms']:>+8.3f} {snap['last_latency_ms']:>+8.3f}" + ) + + except KeyboardInterrupt: + print() + + if all_means: + print(f"\nReports: {len(all_means)} " + f"Mean latency min: {min(all_means):+.3f} ms " + f"max: {max(all_means):+.3f} ms " + f"overall: {sum(all_means) / len(all_means):+.3f} ms") + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + prog="python -m pycbsdk.cli.clock_check", + description="Validate device-to-host clock conversion.", + ) + parser.add_argument( + "device", nargs="?", default="NPLAY", + help="Device type (default: NPLAY)", + ) + parser.add_argument( + "--interval", "-i", type=float, default=1.0, + help="Seconds between reports (default: 1.0)", + ) + parser.add_argument( + "--group", "-g", type=int, default=None, + help="Listen on sample group N instead of events. " + f"Choices: {', '.join(f'{r.value}={r.name}' for r in SampleRate if r.value > 0)}", + ) + parser.add_argument( + "--timeout", type=float, default=10.0, + help="Connection timeout in seconds (default: 10)", + ) + args = parser.parse_args(argv) + + try: + device_type = DeviceType[args.device.upper()] + except KeyError: + names = ", ".join(dt.name for dt in DeviceType if dt != DeviceType.CUSTOM) + parser.error(f"Unknown device: {args.device}. Choices: {names}") + return 1 + + try: + clock_check(device_type, args.interval, group=args.group, timeout=args.timeout) + return 0 + except Exception as e: + print(f"ERROR: {e}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/pycbsdk/src/pycbsdk/cli/load_ccf.py b/pycbsdk/src/pycbsdk/cli/load_ccf.py new file mode 100644 index 00000000..4ba6d5fd --- /dev/null +++ b/pycbsdk/src/pycbsdk/cli/load_ccf.py @@ -0,0 +1,85 @@ +"""Load a CCF file to configure a Cerebus device. + +Reads a CCF (XML) configuration file and sends its contents to the device. + +Usage:: + + python -m pycbsdk.cli.load_ccf my_config.ccf + python -m pycbsdk.cli.load_ccf my_config.ccf --device NPLAY + python -m pycbsdk.cli.load_ccf my_config.ccf --timeout 15 +""" + +from __future__ import annotations + +import argparse +import sys +import time + +from pycbsdk import DeviceType, Session +from pycbsdk.session import _coerce_enum + + +def load_ccf( + filename: str, + device_type: DeviceType, + timeout: float = 10.0, +) -> None: + """Connect to a device and apply a CCF configuration file. + + Args: + filename: Path to the CCF file to load. + device_type: Device to connect to. + timeout: Connection timeout in seconds. + """ + with Session(device_type=device_type) as session: + deadline = time.monotonic() + timeout + while not session.running: + if time.monotonic() > deadline: + raise TimeoutError( + f"Session for {device_type.name} did not start within {timeout}s" + ) + time.sleep(0.1) + # Let initial config settle + time.sleep(0.5) + + print(f"Loading CCF {filename!r} onto {device_type.name} ...") + session.load_ccf(filename) + print("CCF loaded successfully.") + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + prog="python -m pycbsdk.cli.load_ccf", + description="Load a CCF file to configure a Cerebus device.", + ) + parser.add_argument( + "filename", + help="Path to the CCF file to load.", + ) + parser.add_argument( + "--device", default="NPLAY", + help="Device type (default: NPLAY). " + f"Choices: {', '.join(dt.name for dt in DeviceType)}", + ) + parser.add_argument( + "--timeout", type=float, default=10.0, + help="Connection timeout in seconds (default: 10).", + ) + args = parser.parse_args(argv) + + try: + device_type = _coerce_enum(DeviceType, args.device) + except (ValueError, TypeError) as e: + parser.error(str(e)) + return 1 + + try: + load_ccf(args.filename, device_type, timeout=args.timeout) + return 0 + except Exception as e: + print(f"ERROR: {e}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/pycbsdk/src/pycbsdk/session.py b/pycbsdk/src/pycbsdk/session.py index 56aba0c6..587c196e 100644 --- a/pycbsdk/src/pycbsdk/session.py +++ b/pycbsdk/src/pycbsdk/session.py @@ -1181,30 +1181,30 @@ def set_channel_spike_sorting( def _calibrate_monotonic_offset() -> int: """Compute offset between time.monotonic() and C++ steady_clock. - On Linux, macOS, and Windows with Python 3.12+, both clocks use the - same underlying source (CLOCK_MONOTONIC / mach_absolute_time / - QueryPerformanceCounter) so the offset is exactly 0. + On modern macOS, libc++ steady_clock uses mach_continuous_time() + (includes sleep) while Python time.monotonic() uses + mach_absolute_time() (excludes sleep), so the clocks can diverge + by the total accumulated sleep time. On older Windows Python (<3.12), time.monotonic() may use - GetTickCount64 while steady_clock uses QueryPerformanceCounter, - so we measure the offset empirically. + GetTickCount64 while libc++ steady_clock uses QueryPerformanceCounter. + + Always measure empirically. Returns: steady_clock_ns - monotonic_ns (int). """ - import sys - import platform - - if platform.system() != "Windows" or sys.version_info >= (3, 12): - return 0 - - # Windows < 3.12: clocks may differ, measure empirically _lib = _get_lib() t1 = _time.monotonic() steady_ns = _lib.cbsdk_get_steady_clock_ns() t2 = _time.monotonic() mono_ns = int((t1 + t2) / 2 * 1_000_000_000) - return steady_ns - mono_ns + offset = steady_ns - mono_ns + # If the offset is small enough to be measurement noise, treat + # the clocks as identical (common case on Linux and Windows 3.12+). + if abs(offset) < 1_000_000: # < 1 ms + return 0 + return offset @property def clock_offset_ns(self) -> Optional[int]: diff --git a/src/cbdev/src/clock_sync.cpp b/src/cbdev/src/clock_sync.cpp index b5d9336c..7775fbf8 100644 --- a/src/cbdev/src/clock_sync.cpp +++ b/src/cbdev/src/clock_sync.cpp @@ -59,6 +59,13 @@ void ClockSync::addProbeSample(time_point t1_local, uint64_t t3_device_ns, time_ recomputeEstimate(); } +void ClockSync::reset() { + std::lock_guard lock(m_mutex); + m_probe_samples.clear(); + m_current_offset_ns = std::nullopt; + m_current_uncertainty_ns = std::nullopt; +} + std::optional ClockSync::toLocalTime(uint64_t device_time_ns) const { std::lock_guard lock(m_mutex); if (!m_current_offset_ns) diff --git a/src/cbdev/src/clock_sync.h b/src/cbdev/src/clock_sync.h index 4752b0a1..6f61134e 100644 --- a/src/cbdev/src/clock_sync.h +++ b/src/cbdev/src/clock_sync.h @@ -53,6 +53,9 @@ class ClockSync { /// Convert host steady_clock time_point to device timestamp (nanoseconds). [[nodiscard]] std::optional toDeviceTime(time_point local_time) const; + /// Discard all probe samples and reset the offset estimate. + void reset(); + /// Returns true if at least one probe sample has been ingested. [[nodiscard]] bool hasSyncData() const; diff --git a/src/cbdev/src/device_session.cpp b/src/cbdev/src/device_session.cpp index e25ff1a2..7a3f2062 100644 --- a/src/cbdev/src/device_session.cpp +++ b/src/cbdev/src/device_session.cpp @@ -232,9 +232,11 @@ struct DeviceSession::Impl { cbproto::DeviceConfig device_config{}; // Timestamp conversion for non-Gemini devices - // Gemini devices send timestamps in nanoseconds; non-Gemini (e.g. NPlay) send sample counts. + // Gemini devices, our primary use case, send timestamps in nanoseconds; default to true. + // Non-Gemini (i.e. NPlay and Legacy NSP) send sample counts. // Determined from PROCREP ident field: if ident contains "gemini", timestamps are nanoseconds. - bool timestamps_are_nanoseconds = true; // Default true until PROCREP says otherwise + // PROCREP may be received several seconds after device start. + bool timestamps_are_nanoseconds = true; uint64_t ts_convert_num = 1; // Numerator of reduced (1e9/sysfreq) fraction uint64_t ts_convert_den = 1; // Denominator of reduced (1e9/sysfreq) fraction @@ -345,6 +347,12 @@ Result DeviceSession::create(const ConnectionParams& config) { session.m_impl = std::make_unique(); session.m_impl->config = config; + // LEGACY_NSP is known to use sample-count timestamps (never Gemini). + // For other types, we default to true and let PROCREP confirm. + if (config.type == DeviceType::LEGACY_NSP) { + session.m_impl->timestamps_are_nanoseconds = false; + } + // Auto-detect client address if not specified if (session.m_impl->config.client_address.empty()) { session.m_impl->config.client_address = detectClientAddress(session.m_impl->config.type); @@ -546,7 +554,7 @@ Result DeviceSession::receivePackets(void* buffer, const size_t buffer_size // Convert timestamps from sample counts to nanoseconds for non-Gemini devices. // The flag is set when PROCREP is processed in updateConfigFromBuffer above. - if (!m_impl->timestamps_are_nanoseconds && m_impl->ts_convert_den > 0) { + if (!m_impl->timestamps_are_nanoseconds && m_impl->ts_convert_den > 1) { auto* bytes_ptr = static_cast(buffer); const size_t total_bytes = result.value(); size_t offset = 0; @@ -1361,6 +1369,7 @@ void DeviceSession::updateConfigFromBuffer(const void* buffer, const size_t byte m_impl->timestamps_are_nanoseconds = is_gemini; if (!is_gemini) { + m_impl->clock_sync.reset(); uint32_t sysfreq = m_impl->device_config.sysinfo.sysfreq; if (sysfreq > 0) { uint64_t g = std::gcd(uint64_t(1000000000), uint64_t(sysfreq)); @@ -1479,15 +1488,22 @@ void DeviceSession::updateConfigFromBuffer(const void* buffer, const size_t byte const auto* nplay = reinterpret_cast(buff_bytes + offset); std::lock_guard lock(m_impl->clock_probe_mutex); if (m_impl->pending_clock_probe.active) { - // New firmware (>=7.8 with clock sync mod) writes fresh + // New firmware (>=7.9? with clock sync mod) writes fresh // clock_gettime(ptp_clkid) into .etime — zero staleness. // Old firmware echoes the packet unchanged, so .etime stays 0 // (we zero-initialize it). Fall back to header->time which is // the stale ptptime from the previous main loop iteration. constexpr uint64_t STALENESS_CORRECTION_NS = 165000; - const uint64_t device_time_ns = (nplay->etime != 0) - ? nplay->etime - : header->time + STALENESS_CORRECTION_NS; + uint64_t device_time_ns; + if (nplay->etime != 0) { + device_time_ns = nplay->etime; + } else { + device_time_ns = header->time; + if (!m_impl->timestamps_are_nanoseconds && m_impl->ts_convert_den > 1) { + device_time_ns = device_time_ns * m_impl->ts_convert_num / m_impl->ts_convert_den; + } + device_time_ns += STALENESS_CORRECTION_NS; + } m_impl->clock_sync.addProbeSample( m_impl->pending_clock_probe.t1_local,