diff --git a/examples/throughput_server/README.md b/examples/throughput_server/README.md
new file mode 100644
index 0000000000..35a8e06b69
--- /dev/null
+++ b/examples/throughput_server/README.md
@@ -0,0 +1,179 @@
+# COSMOS Throughput Testing Server
+
+A standalone Python TCP/IP server for measuring COSMOS command and telemetry throughput.
+
+## Overview
+
+This server provides a lightweight test environment for measuring COSMOS throughput performance. It implements:
+
+- Dual-port TCP server (7778 for INST, 7780 for INST2)
+- CCSDS command packet parsing
+- CCSDS telemetry packet generation
+- LengthProtocol framing (compatible with COSMOS interfaces)
+- Configurable telemetry streaming rates
+- Real-time throughput metrics
+
+## Architecture
+
+```
+COSMOS Container                      Throughput Server (Python)
++-------------------+                 +-------------------+
+| INST (Ruby)       |----TCP/7778---->| Port 7778         |
+| TcpipClient       |                 | CCSDS Commands    |
+| LengthProtocol    |<----TCP/7778----| CCSDS Telemetry   |
++-------------------+                 +-------------------+
+| INST2 (Python)    |----TCP/7780---->| Port 7780         |
+| TcpipClient       |                 | CCSDS Commands    |
+| LengthProtocol    |<----TCP/7780----| CCSDS Telemetry   |
++-------------------+                 +-------------------+
+```
+
+## Requirements
+
+- Python 3.10 or higher
+- No external dependencies (uses only standard library)
+
+## Usage
+
+### Starting the Server
+
+```bash
+# Default ports (7778 for INST, 7780 for INST2)
+python throughput_server.py
+
+# Custom ports
+python throughput_server.py --inst-port 8778 --inst2-port 8780
+
+# Debug logging
+python throughput_server.py --debug
+```
+
+### Command Line Options
+
+| Option | Default | Description |
+|--------|---------|-------------|
+| `--inst-port` | 7778 | Port for INST (Ruby) target |
+| `--inst2-port` | 7780 | Port for INST2 (Python) target |
+| `--debug` | False | Enable debug logging |
+
+## Supported Commands
+
+The server responds to the following CCSDS commands:
+
+| Command | PKTID | Description |
+|---------|-------|-------------|
+| START_STREAM | 200 | Start telemetry streaming at specified rate |
+| STOP_STREAM | 201 | Stop telemetry streaming |
+| GET_STATS | 202 | Request THROUGHPUT_STATUS telemetry packet |
+| GET_STATS_NO_MSG | 203 | Request THROUGHPUT_STATUS telemetry packet without logging |
+| RESET_STATS | 204 | Reset all throughput statistics |
+
+### START_STREAM Payload
+
+| Field | Type | Description |
+|-------|------|-------------|
+| RATE | 32-bit UINT | Packets per second (1-100000) |
+| PACKET_TYPES | 32-bit UINT | Bitmask of packet types (default: 0x01) |
+
+## Telemetry Packets
+
+### THROUGHPUT_STATUS (APID 100)
+
+| Field | Type | Description |
+|-------|------|-------------|
+| CMD_RECV_COUNT | 32-bit UINT | Total commands received |
+| CMD_RECV_RATE | 32-bit FLOAT | Commands per second |
+| TLM_SENT_COUNT | 32-bit UINT | Total telemetry packets sent |
+| TLM_SENT_RATE | 32-bit FLOAT | Telemetry packets per second |
+| TLM_TARGET_RATE | 32-bit UINT | Configured streaming rate |
+| BYTES_RECV | 64-bit UINT | Total bytes received |
+| BYTES_SENT | 64-bit UINT | Total bytes sent |
+| UPTIME_SEC | 32-bit UINT | Server uptime in seconds |
+
+## CCSDS Packet Format
+
+### Command Header (8 bytes)
+
+```
+Bits 0-2: CCSDSVER (3 bits) = 0
+Bit 3: CCSDSTYPE (1 bit) = 1 (command)
+Bit 4: CCSDSSHF (1 bit) = 0
+Bits 5-15: CCSDSAPID (11 bits)
+Bits 16-17: CCSDSSEQFLAGS (2 bits) = 3
+Bits 18-31: CCSDSSEQCNT (14 bits)
+Bits 32-47: CCSDSLENGTH (16 bits) = packet_length - 7
+Bits 48-63: PKTID (16 bits)
+```
+
+### Telemetry Header (16 bytes)
+
+```
+Bits 0-2: CCSDSVER (3 bits) = 0
+Bit 3: CCSDSTYPE (1 bit) = 0 (telemetry)
+Bit 4: CCSDSSHF (1 bit) = 1
+Bits 5-15: CCSDSAPID (11 bits)
+Bits 16-17: CCSDSSEQFLAGS (2 bits) = 3
+Bits 18-31: CCSDSSEQCNT (14 bits)
+Bits 32-47: CCSDSLENGTH (16 bits)
+Bits 48-79: TIMESEC (32 bits)
+Bits 80-111: TIMEUS (32 bits)
+Bits 112-127: PKTID (16 bits)
+```
+
+## COSMOS Integration
+
+### Installing the Modified DEMO Plugin
+
+To use the throughput server with COSMOS, install the modified DEMO plugin with
throughput server enabled:
+
+```bash
+./openc3.sh cli load openc3-cosmos-demo-*.gem \
+  use_throughput_server=true \
+  throughput_server_host=host.docker.internal
+```
+
+### Plugin Variables
+
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `use_throughput_server` | false | Enable throughput server mode |
+| `throughput_server_host` | host.docker.internal | Hostname of throughput server |
+| `inst_throughput_port` | 7778 | Port for INST connection |
+| `inst2_throughput_port` | 7780 | Port for INST2 connection |
+
+### Running Throughput Tests
+
+1. Start the throughput server:
+   ```bash
+   python throughput_server.py
+   ```
+
+2. Install the DEMO plugin with throughput mode enabled
+
+3. Run the Ruby throughput test:
+   - Open Script Runner in COSMOS
+   - Load `INST/procedures/throughput_test.rb`
+   - Execute and observe results
+
+4. Run the Python throughput test:
+   - Open Script Runner in COSMOS
+   - Load `INST2/procedures/throughput_test.py`
+   - Execute and observe results
+
+5. Monitor via the THROUGHPUT screen in Telemetry Viewer
+
+## File Structure
+
+```
+examples/throughput_server/
+├── throughput_server.py   # Main entry point
+├── ccsds.py               # CCSDS packet encoding/decoding
+├── metrics.py             # Throughput statistics
+├── config.py              # Configuration constants
+├── max_cmd_rate_test.py   # Direct TCP max command rate test (Python)
+├── max_cmd_rate_test.rb   # Direct TCP max command rate test (Ruby)
+├── requirements.txt       # Dependencies (none required)
+└── README.md              # This file
+```
+
+## License
+
+Copyright 2026 OpenC3, Inc.
+Licensed under the GNU Affero General Public License v3.
diff --git a/examples/throughput_server/ccsds.py b/examples/throughput_server/ccsds.py
new file mode 100644
index 0000000000..41b5586231
--- /dev/null
+++ b/examples/throughput_server/ccsds.py
@@ -0,0 +1,197 @@
+# Copyright 2026 OpenC3, Inc.
+# All Rights Reserved.
+# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +"""CCSDS packet encoding and decoding utilities.""" + +import struct +import time +from dataclasses import dataclass +from typing import Tuple + +from config import ( + CCSDS_CMD_HEADER_SIZE, + CCSDS_TLM_HEADER_SIZE, + LENGTH_ADJUSTMENT, +) + + +@dataclass +class CcsdsCommand: + """Parsed CCSDS command packet.""" + + version: int + packet_type: int + secondary_header_flag: int + apid: int + sequence_flags: int + sequence_count: int + length: int + pktid: int + payload: bytes + + +@dataclass +class CcsdsTelemetry: + """CCSDS telemetry packet structure.""" + + apid: int + sequence_count: int + pktid: int + payload: bytes + time_sec: int = 0 + time_us: int = 0 + + +def parse_ccsds_command(data: bytes) -> CcsdsCommand: + """Parse a CCSDS command packet. 
+ + CCSDS Command Header (8 bytes): + Bits 0-2: CCSDSVER (3 bits) = 0 + Bit 3: CCSDSTYPE (1 bit) = 1 (command) + Bit 4: CCSDSSHF (1 bit) = 0 + Bits 5-15: CCSDSAPID (11 bits) + Bits 16-17: CCSDSSEQFLAGS (2 bits) = 3 + Bits 18-31: CCSDSSEQCNT (14 bits) + Bits 32-47: CCSDSLENGTH (16 bits) = packet_length - 7 + Bits 48-63: PKTID (16 bits) + + Args: + data: Raw packet bytes (must be at least CCSDS_CMD_HEADER_SIZE bytes) + + Returns: + Parsed CcsdsCommand object + """ + if len(data) < CCSDS_CMD_HEADER_SIZE: + raise ValueError( + f"Command packet too short: {len(data)} < {CCSDS_CMD_HEADER_SIZE}" + ) + + # Parse first two bytes (big-endian): version(3) + type(1) + shf(1) + apid(11) + word0 = struct.unpack(">H", data[0:2])[0] + version = (word0 >> 13) & 0x07 + packet_type = (word0 >> 12) & 0x01 + secondary_header_flag = (word0 >> 11) & 0x01 + apid = word0 & 0x07FF + + # Parse next two bytes: seqflags(2) + seqcnt(14) + word1 = struct.unpack(">H", data[2:4])[0] + sequence_flags = (word1 >> 14) & 0x03 + sequence_count = word1 & 0x3FFF + + # Parse length field (2 bytes) + length = struct.unpack(">H", data[4:6])[0] + + # Parse PKTID (2 bytes) + pktid = struct.unpack(">H", data[6:8])[0] + + # Extract payload (after 8-byte header) + payload = data[CCSDS_CMD_HEADER_SIZE:] + + return CcsdsCommand( + version=version, + packet_type=packet_type, + secondary_header_flag=secondary_header_flag, + apid=apid, + sequence_flags=sequence_flags, + sequence_count=sequence_count, + length=length, + pktid=pktid, + payload=payload, + ) + + +def build_ccsds_telemetry( + tlm: CcsdsTelemetry, use_current_time: bool = True +) -> bytes: + """Build a CCSDS telemetry packet. 
+ + CCSDS Telemetry Header (16 bytes): + Bits 0-2: CCSDSVER (3 bits) = 0 + Bit 3: CCSDSTYPE (1 bit) = 0 (telemetry) + Bit 4: CCSDSSHF (1 bit) = 1 (secondary header present) + Bits 5-15: CCSDSAPID (11 bits) + Bits 16-17: CCSDSSEQFLAGS (2 bits) = 3 + Bits 18-31: CCSDSSEQCNT (14 bits) + Bits 32-47: CCSDSLENGTH (16 bits) + Bits 48-79: TIMESEC (32 bits) + Bits 80-111: TIMEUS (32 bits) + Bits 112-127: PKTID (16 bits) + + Args: + tlm: Telemetry packet data + use_current_time: If True, use current time; otherwise use tlm.time_sec/time_us + + Returns: + Raw packet bytes + """ + if use_current_time: + now = time.time() + time_sec = int(now) + time_us = int((now - time_sec) * 1_000_000) + else: + time_sec = tlm.time_sec + time_us = tlm.time_us + + # Calculate total packet length + # CCSDSLENGTH = (packet_length - 7), where packet_length includes primary header + # Packet = primary header (6) + secondary header (8: TIMESEC+TIMEUS) + PKTID (2) + payload + total_length = 6 + 8 + 2 + len(tlm.payload) + ccsds_length = total_length - LENGTH_ADJUSTMENT + + # Build first word: version(3) + type(1) + shf(1) + apid(11) + # version=0, type=0 (telemetry), shf=1 (secondary header present) + word0 = (0 << 13) | (0 << 12) | (1 << 11) | (tlm.apid & 0x07FF) + + # Build second word: seqflags(2) + seqcnt(14) + # seqflags=3 (standalone packet) + word1 = (3 << 14) | (tlm.sequence_count & 0x3FFF) + + # Pack header + header = struct.pack( + ">HHHIIH", + word0, + word1, + ccsds_length, + time_sec, + time_us, + tlm.pktid, + ) + + return header + tlm.payload + + +def read_packet_from_stream(data: bytes) -> Tuple[bytes, bytes]: + """Extract a complete packet from a data stream using LengthProtocol. + + Uses the CCSDSLENGTH field to determine packet boundaries. 
+ + Args: + data: Raw data buffer + + Returns: + Tuple of (complete_packet, remaining_data) + If no complete packet is available, returns (b"", data) + """ + if len(data) < 6: + # Not enough data for primary header + return b"", data + + # Read length field at bytes 4-5 + ccsds_length = struct.unpack(">H", data[4:6])[0] + total_length = ccsds_length + LENGTH_ADJUSTMENT + + if len(data) < total_length: + # Not enough data for complete packet + return b"", data + + return data[:total_length], data[total_length:] diff --git a/examples/throughput_server/config.py b/examples/throughput_server/config.py new file mode 100644 index 0000000000..a52399d892 --- /dev/null +++ b/examples/throughput_server/config.py @@ -0,0 +1,44 @@ +# Copyright 2026 OpenC3, Inc. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. 
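The `read_packet_from_stream` framing above can be exercised standalone. A minimal sketch, re-deriving the total length from the CCSDSLENGTH field with the same `+7` adjustment; the function and packet bytes here are illustrative, not part of the module:

```python
import struct

LENGTH_ADJUSTMENT = 7  # CCSDS length = total_length - 7, mirrored from config.py


def extract_packet(data: bytes) -> tuple[bytes, bytes]:
    """Return (complete_packet, remaining) or (b"", data) if incomplete."""
    if len(data) < 6:
        return b"", data  # not enough bytes for the primary header
    ccsds_length = struct.unpack(">H", data[4:6])[0]
    total_length = ccsds_length + LENGTH_ADJUSTMENT
    if len(data) < total_length:
        return b"", data  # wait for the rest of the packet
    return data[:total_length], data[total_length:]


# Frame a 10-byte packet: CCSDSLENGTH = 10 - 7 = 3
pkt = bytes([0x18, 0x01, 0xC0, 0x00]) + struct.pack(">H", 3) + b"\x00\xC8\xAB\xCD"
stream = pkt + pkt[:4]  # one complete packet plus the start of a second

first, rest = extract_packet(stream)
assert first == pkt                       # complete packet extracted
partial, rest2 = extract_packet(rest)
assert partial == b"" and rest2 == pkt[:4]  # incomplete packet stays buffered
```

This is the same contract COSMOS's LengthProtocol relies on: the length field alone determines the packet boundary, so partial TCP reads are simply buffered until enough bytes arrive.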
+
+"""Configuration constants for the throughput server."""
+
+# Network configuration
+INST_PORT = 7778  # Port for INST (Ruby) target
+INST2_PORT = 7780  # Port for INST2 (Python) target
+
+# CCSDS Command Packet IDs (from _ccsds_cmd.txt template)
+CMD_START_STREAM = 200
+CMD_STOP_STREAM = 201
+CMD_GET_STATS = 202
+CMD_GET_STATS_NO_MSG = 203
+CMD_RESET_STATS = 204
+
+# CCSDS Telemetry APIDs
+APID_THROUGHPUT_STATUS = 100
+
+# Telemetry Packet IDs
+PKTID_THROUGHPUT_STATUS = 1
+
+# Default streaming configuration
+DEFAULT_STREAM_RATE = 100  # Packets per second
+MAX_STREAM_RATE = 100000  # Maximum packets per second
+
+# CCSDS Header sizes
+CCSDS_CMD_HEADER_SIZE = 8  # Primary header (6) + PKTID (2)
+CCSDS_TLM_HEADER_SIZE = 16  # Primary header (6) + Secondary header (8: TIMESEC + TIMEUS) + PKTID (2)
+
+# Length protocol configuration (for LengthProtocol framing)
+LENGTH_OFFSET = 4  # Byte offset of CCSDSLENGTH field
+LENGTH_SIZE = 2  # Size of length field in bytes
+LENGTH_ADJUSTMENT = 7  # CCSDS length = total_length - 7
diff --git a/examples/throughput_server/max_cmd_rate_test.py b/examples/throughput_server/max_cmd_rate_test.py
new file mode 100755
index 0000000000..5732da51e2
--- /dev/null
+++ b/examples/throughput_server/max_cmd_rate_test.py
@@ -0,0 +1,174 @@
+# Copyright 2026 OpenC3, Inc.
+# All Rights Reserved.
+#
+# This program is free software; you can modify and/or redistribute it
+# under the terms of the GNU Affero General Public License
+# as published by the Free Software Foundation; version 3 with
+# attribution addendums as found in the LICENSE.txt
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
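The command-header bit packing used throughout these files squeezes five fields into the first two big-endian 16-bit words. A standalone round-trip sketch of that layout (helper names are illustrative; the real code lives in `ccsds.py` and the test scripts):

```python
import struct


def build_header(apid: int, seqcnt: int, pktid: int, payload_len: int = 0) -> bytes:
    """Pack an 8-byte CCSDS command header (primary header + PKTID)."""
    word0 = (0 << 13) | (1 << 12) | (0 << 11) | (apid & 0x07FF)  # ver=0, type=1 (cmd), shf=0
    word1 = (3 << 14) | (seqcnt & 0x3FFF)                        # seqflags=3 (standalone)
    ccsds_length = (6 + 2 + payload_len) - 7                     # total_length - 7
    return struct.pack(">HHHH", word0, word1, ccsds_length, pktid)


def parse_header(data: bytes) -> dict:
    """Unpack the same five fields back out of the first 8 bytes."""
    word0, word1, length, pktid = struct.unpack(">HHHH", data[:8])
    return {
        "apid": word0 & 0x07FF,
        "type": (word0 >> 12) & 0x01,
        "seqcnt": word1 & 0x3FFF,
        "length": length,
        "pktid": pktid,
    }


hdr = build_header(apid=1, seqcnt=42, pktid=202)  # 202 = GET_STATS
fields = parse_header(hdr)
assert fields == {"apid": 1, "type": 1, "seqcnt": 42, "length": 1, "pktid": 202}
```

Note the length convention: an 8-byte packet with no payload carries CCSDSLENGTH = 1, which is why both the server and the framing code apply the same `- 7` / `+ 7` adjustment.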
+ +""" +Maximum Command Rate Test (Python) + +Tests the theoretical maximum command rate by directly connecting to the +throughput server and sending CCSDS commands as fast as possible. +This bypasses COSMOS entirely to measure raw TCP throughput. + +Usage: + python max_cmd_rate_test.py [host] [port] [duration_seconds] + +Example: + python max_cmd_rate_test.py host.docker.internal 7778 10 +""" + +import socket +import struct +import sys +import time + +# Configuration +DEFAULT_HOST = "host.docker.internal" +DEFAULT_PORT = 7778 +DEFAULT_DURATION = 10 + +# CCSDS Command constants +CMD_GET_STATS = 202 +APID = 1 # Use APID 1 for commands + + +def build_ccsds_command( + pktid: int, sequence_count: int = 0, payload: bytes = b"" +) -> bytes: + """Build a CCSDS command packet. + + Format: + word0 (2 bytes): version(3) | type(1)=1 | shf(1)=0 | apid(11) + word1 (2 bytes): seqflags(2)=3 | seqcnt(14) + length (2 bytes): total_length - 7 + pktid (2 bytes): command ID + """ + # word0: version=0, type=1 (command), shf=0, apid + word0 = (0 << 13) | (1 << 12) | (0 << 11) | (APID & 0x07FF) + + # word1: seqflags=3, seqcnt + word1 = (3 << 14) | (sequence_count & 0x3FFF) + + # Total packet length = 6 (primary header) + 2 (pktid) + payload + total_length = 6 + 2 + len(payload) + ccsds_length = total_length - 7 + + # Pack the header + header = struct.pack(">HHHH", word0, word1, ccsds_length, pktid) + + return header + payload + + +def run_test(host: str, port: int, duration: int) -> None: + """Run the maximum command rate test.""" + print("=" * 60) + print("Maximum Command Rate Test (Python)") + print("=" * 60) + print(f"Host: {host}:{port}") + print(f"Duration: {duration} seconds") + print() + + # Pre-build the command packet (GET_STATS with no payload) + cmd_packet = build_ccsds_command(CMD_GET_STATS) + print(f"Command packet size: {len(cmd_packet)} bytes") + print(f"Command packet (hex): {cmd_packet.hex()}") + print() + + # Connect to server + print(f"Connecting to {host}:{port}...") 
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.connect((host, port)) + sock.setblocking(False) + print("Connected!") + print() + + # Discard any initial telemetry from the server + try: + sock.recv(65536) + except BlockingIOError: + pass + + # Run the test + print(f"Sending commands for {duration} seconds...") + print() + + cmd_count = 0 + start_time = time.time() + end_time = start_time + duration + last_report_time = start_time + last_report_count = 0 + + # Set socket back to blocking for writes + sock.setblocking(True) + + try: + while time.time() < end_time: + # Send the command + sock.sendall(cmd_packet) + cmd_count += 1 + + # Periodic progress report (every second) + now = time.time() + if now - last_report_time >= 1.0: + interval_count = cmd_count - last_report_count + interval_rate = interval_count / (now - last_report_time) + elapsed = now - start_time + print( + f" {elapsed:.1f}s: {cmd_count} commands sent ({interval_rate:.0f} cmd/s current)" + ) + last_report_time = now + last_report_count = cmd_count + + # Occasionally drain responses to prevent buffer buildup + if cmd_count % 1000 == 0: + sock.setblocking(False) + try: + sock.recv(65536) + except BlockingIOError: + pass + sock.setblocking(True) + + except (BrokenPipeError, ConnectionResetError) as e: + print(f"Connection error: {e}") + finally: + sock.close() + + # Calculate results + actual_duration = time.time() - start_time + overall_rate = cmd_count / actual_duration + bytes_sent = cmd_count * len(cmd_packet) + throughput_mbps = (bytes_sent * 8) / (actual_duration * 1_000_000) + + # Print results + print() + print("=" * 60) + print("RESULTS") + print("=" * 60) + print(f"Commands sent: {cmd_count}") + print(f"Actual duration: {actual_duration:.3f} seconds") + print(f"Command rate: {overall_rate:.0f} commands/second") + print(f"Bytes sent: {bytes_sent} ({bytes_sent / 1024 / 1024:.2f} MB)") + print(f"Throughput: 
{throughput_mbps:.2f} Mbps") + print("=" * 60) + + +def main(): + """Main entry point.""" + host = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_HOST + port = int(sys.argv[2]) if len(sys.argv) > 2 else DEFAULT_PORT + duration = int(sys.argv[3]) if len(sys.argv) > 3 else DEFAULT_DURATION + + run_test(host, port, duration) + + +if __name__ == "__main__": + main() diff --git a/examples/throughput_server/max_cmd_rate_test.rb b/examples/throughput_server/max_cmd_rate_test.rb new file mode 100755 index 0000000000..607f5907b8 --- /dev/null +++ b/examples/throughput_server/max_cmd_rate_test.rb @@ -0,0 +1,173 @@ +# Copyright 2026 OpenC3, Inc. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# Maximum Command Rate Test (Ruby) +# +# Tests the theoretical maximum command rate by directly connecting to the +# throughput server and sending CCSDS commands as fast as possible. +# This bypasses COSMOS entirely to measure raw TCP throughput. 
+# +# Usage: +# ruby max_cmd_rate_test.rb [host] [port] [duration_seconds] +# +# Example: +# ruby max_cmd_rate_test.rb host.docker.internal 7778 10 + +require 'socket' + +# Configuration +DEFAULT_HOST = 'host.docker.internal' +DEFAULT_PORT = 7778 +DEFAULT_DURATION = 10 + +# CCSDS Command constants +CMD_GET_STATS = 202 +APID = 1 # Use APID 1 for commands + +# Build a CCSDS command packet +# Format: +# word0 (2 bytes): version(3) | type(1)=1 | shf(1)=0 | apid(11) +# word1 (2 bytes): seqflags(2)=3 | seqcnt(14) +# length (2 bytes): total_length - 7 +# pktid (2 bytes): command ID +def build_ccsds_command(pktid, sequence_count = 0, payload = '') + # word0: version=0, type=1 (command), shf=0, apid + word0 = (0 << 13) | (1 << 12) | (0 << 11) | (APID & 0x07FF) + + # word1: seqflags=3, seqcnt + word1 = (3 << 14) | (sequence_count & 0x3FFF) + + # Total packet length = 6 (primary header) + 2 (pktid) + payload + total_length = 6 + 2 + payload.length + ccsds_length = total_length - 7 + + # Pack the header + header = [word0, word1, ccsds_length, pktid].pack('n4') + + header + payload +end + +def run_test(host, port, duration) + $stdout.sync = true # Ensure output is flushed immediately + + puts "=" * 60 + puts "Maximum Command Rate Test (Ruby)" + puts "=" * 60 + puts "Host: #{host}:#{port}" + puts "Duration: #{duration} seconds" + puts "" + + # Pre-build the command packet (GET_STATS with no payload) + cmd_packet = build_ccsds_command(CMD_GET_STATS) + puts "Command packet size: #{cmd_packet.length} bytes" + puts "Command packet (hex): #{cmd_packet.unpack1('H*')}" + puts "" + + # Connect to server + puts "Connecting to #{host}:#{port}..." + socket = TCPSocket.new(host, port) + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, 4 * 1024 * 1024) # 4MB send buffer + socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, 4 * 1024 * 1024) # 4MB receive buffer + puts "Connected!" 
+ puts "" + + # Discard any initial telemetry from the server + puts "Draining initial telemetry..." + socket.read_nonblock(65536) rescue nil + puts "Ready to send." + puts "" + + # Run the test + puts "Sending commands for #{duration} seconds..." + puts "" + + cmd_count = 0 + start_time = Time.now + end_time = start_time + duration + last_report_time = start_time + last_report_count = 0 + + # Use non-blocking I/O with select() to avoid TCP deadlock + # Batch multiple commands per select() cycle for efficiency + batch_size = 100 + batch_packet = cmd_packet * batch_size + + begin + while Time.now < end_time + # Use select with short timeout + readable, writable, = IO.select([socket], [socket], nil, 0.0001) + + # Drain any pending responses to prevent receive buffer from filling + if readable && readable.include?(socket) + begin + socket.read_nonblock(65536) + rescue IO::WaitReadable, Errno::EAGAIN + # No data available + end + end + + # Send batch of commands if socket is writable + if writable && writable.include?(socket) + begin + written = socket.write_nonblock(batch_packet) + cmd_count += written / cmd_packet.length + rescue IO::WaitWritable, Errno::EAGAIN + # Can't write right now, will retry + end + end + + # Periodic progress report (every second) + now = Time.now + if now - last_report_time >= 1.0 + interval_count = cmd_count - last_report_count + interval_rate = interval_count / (now - last_report_time) + elapsed = now - start_time + puts " #{elapsed.round(1)}s: #{cmd_count} commands sent (#{interval_rate.round(0)} cmd/s current)" + last_report_time = now + last_report_count = cmd_count + end + end + rescue Errno::EPIPE, Errno::ECONNRESET => e + puts "Connection error: #{e.message}" + ensure + socket.close rescue nil + end + + # Calculate results + actual_duration = Time.now - start_time + overall_rate = cmd_count / actual_duration + bytes_sent = cmd_count * cmd_packet.length + throughput_mbps = (bytes_sent * 8) / (actual_duration * 1_000_000) + + # Print 
results + puts "" + puts "=" * 60 + puts "RESULTS" + puts "=" * 60 + puts "Commands sent: #{cmd_count}" + puts "Actual duration: #{actual_duration.round(3)} seconds" + puts "Command rate: #{overall_rate.round(0)} commands/second" + puts "Bytes sent: #{bytes_sent} (#{(bytes_sent / 1024.0 / 1024.0).round(2)} MB)" + puts "Throughput: #{throughput_mbps.round(2)} Mbps" + puts "=" * 60 +end + +# Main +if __FILE__ == $0 + host = ARGV[0] || DEFAULT_HOST + port = (ARGV[1] || DEFAULT_PORT).to_i + duration = (ARGV[2] || DEFAULT_DURATION).to_i + + run_test(host, port, duration) +end diff --git a/examples/throughput_server/metrics.py b/examples/throughput_server/metrics.py new file mode 100644 index 0000000000..eb20efdd0e --- /dev/null +++ b/examples/throughput_server/metrics.py @@ -0,0 +1,229 @@ +# Copyright 2026 OpenC3, Inc. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. 
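Both rate tests report a windowed rate (delta count over delta time, refreshed roughly every second), which is also the scheme the metrics module that follows uses. A standalone sketch of that bookkeeping; note it uses `time.monotonic()` where the source files use `time.time()`, a deliberate substitution to avoid wall-clock jumps:

```python
import time


class RateTracker:
    """Windowed rate: (count delta) / (time delta), updated at ~1 s intervals."""

    def __init__(self):
        self.count = 0
        self.rate = 0.0
        self._last_time = time.monotonic()
        self._last_count = 0

    def record(self, n: int = 1) -> None:
        self.count += n

    def update(self) -> None:
        now = time.monotonic()
        elapsed = now - self._last_time
        if elapsed >= 1.0:  # only refresh once per second, like metrics.update_rates()
            self.rate = (self.count - self._last_count) / elapsed
            self._last_time = now
            self._last_count = self.count


tracker = RateTracker()
tracker._last_time -= 2.0  # simulate 2 seconds having elapsed
tracker.record(500)
tracker.update()
assert 240.0 <= tracker.rate <= 260.0  # ~500 events over ~2 s is ~250/s
```

Resetting `_last_count` on each refresh is what makes the reported figure an interval rate rather than a lifetime average, so a stall shows up within a second instead of being smoothed away.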
+ +"""Throughput metrics tracking and reporting.""" + +import struct +import time +from dataclasses import dataclass, field +from threading import Lock + +from config import APID_THROUGHPUT_STATUS, PKTID_THROUGHPUT_STATUS + + +@dataclass +class ThroughputMetrics: + """Track throughput statistics for a connection.""" + + cmd_recv_count: int = 0 + tlm_sent_count: int = 0 + bytes_recv: int = 0 + bytes_sent: int = 0 + start_time: float = field(default_factory=time.time) + target_rate: int = 0 + _lock: Lock = field(default_factory=Lock, repr=False) + + # For rate calculation + _last_rate_time: float = field(default_factory=time.time) + _last_cmd_count: int = 0 + _last_tlm_count: int = 0 + _cmd_rate: float = 0.0 + _tlm_rate: float = 0.0 + + def record_command(self, bytes_received: int) -> None: + """Record a received command.""" + with self._lock: + self.cmd_recv_count += 1 + self.bytes_recv += bytes_received + + def record_telemetry(self, bytes_sent: int) -> None: + """Record a sent telemetry packet.""" + with self._lock: + self.tlm_sent_count += 1 + self.bytes_sent += bytes_sent + + def update_rates(self) -> None: + """Update rate calculations (call periodically).""" + with self._lock: + now = time.time() + elapsed = now - self._last_rate_time + + if elapsed >= 1.0: # Update every second + self._cmd_rate = ( + self.cmd_recv_count - self._last_cmd_count + ) / elapsed + self._tlm_rate = ( + self.tlm_sent_count - self._last_tlm_count + ) / elapsed + + self._last_rate_time = now + self._last_cmd_count = self.cmd_recv_count + self._last_tlm_count = self.tlm_sent_count + + def get_uptime(self) -> int: + """Get server uptime in seconds.""" + return int(time.time() - self.start_time) + + def get_cmd_rate(self) -> float: + """Get current command rate.""" + with self._lock: + return self._cmd_rate + + def get_tlm_rate(self) -> float: + """Get current telemetry rate.""" + with self._lock: + return self._tlm_rate + + def reset(self) -> None: + """Reset all metrics.""" + with 
self._lock: + self.cmd_recv_count = 0 + self.tlm_sent_count = 0 + self.bytes_recv = 0 + self.bytes_sent = 0 + self.start_time = time.time() + self._last_rate_time = time.time() + self._last_cmd_count = 0 + self._last_tlm_count = 0 + self._cmd_rate = 0.0 + self._tlm_rate = 0.0 + + +class ThroughputStatusPacket: + """Build THROUGHPUT_STATUS telemetry packets with pre-allocated buffer. + + Optimized for high-throughput streaming with minimal allocations. + Uses struct.pack_into() to update fields in-place. + + Packet layout (56 bytes total): + Header (16 bytes): + 0-1: word0 (version/type/shf/apid) - STATIC + 2-3: word1 (seqflags/seqcnt) - seqcnt updates + 4-5: ccsdslength - STATIC + 6-9: timesec - updates + 10-13: timeus - updates + 14-15: pktid - STATIC + Payload (40 bytes): + 16-19: CMD_RECV_COUNT + 20-23: CMD_RECV_RATE + 24-27: TLM_SENT_COUNT + 28-31: TLM_SENT_RATE + 32-35: TLM_TARGET_RATE + 36-43: BYTES_RECV + 44-51: BYTES_SENT + 52-55: UPTIME_SEC + """ + + PACKET_SIZE = 56 + PAYLOAD_OFFSET = 16 + + def __init__(self): + self._sequence_count = 0 + # Pre-allocate buffer + self._buffer = bytearray(self.PACKET_SIZE) + + # Build static header fields once + # word0: version=0, type=0 (telemetry), shf=1, apid + word0 = (0 << 13) | (0 << 12) | (1 << 11) | (APID_THROUGHPUT_STATUS & 0x07FF) + struct.pack_into(">H", self._buffer, 0, word0) + + # ccsdslength: total_length - 7 = 56 - 7 = 49 + struct.pack_into(">H", self._buffer, 4, 49) + + # pktid + struct.pack_into(">H", self._buffer, 14, PKTID_THROUGHPUT_STATUS) + + def update(self, metrics: ThroughputMetrics) -> None: + """Update the packet buffer with current values. + + Call this before sending. Does not allocate memory. 
+ """ + # Update sequence count (preserving seqflags=3 in upper 2 bits) + word1 = (3 << 14) | (self._sequence_count & 0x3FFF) + struct.pack_into(">H", self._buffer, 2, word1) + self._sequence_count = (self._sequence_count + 1) & 0x3FFF + + # Update timestamp + now = time.time() + struct.pack_into(">II", self._buffer, 6, int(now), int((now % 1) * 1_000_000)) + + # Update payload - grab all values under lock, then pack + with metrics._lock: + struct.pack_into( + ">IfIfIQQI", + self._buffer, + self.PAYLOAD_OFFSET, + metrics.cmd_recv_count, + metrics._cmd_rate, + metrics.tlm_sent_count, + metrics._tlm_rate, + metrics.target_rate, + metrics.bytes_recv, + metrics.bytes_sent, + int(now - metrics.start_time), # uptime inline + ) + + def get_buffer(self) -> bytearray: + """Get the packet buffer for sending. + + Returns the internal buffer directly - do not modify. + """ + return self._buffer + + def reset_sequence(self) -> None: + """Reset the sequence counter to 0.""" + self._sequence_count = 0 + + def build(self, metrics: ThroughputMetrics) -> bytes: + """Build and return packet as bytes (for compatibility). + + Less efficient than update() + get_buffer() but simpler for one-off sends. 
+ """ + self.update(metrics) + return bytes(self._buffer) + + +@dataclass +class StreamState: + """Track streaming state for a connection.""" + + streaming: bool = False + rate: int = 0 # Packets per second + packet_types: int = 0x01 # Bitmask of packet types to stream + last_send_time: float = 0.0 + send_interval: float = 0.0 # Seconds between packets + + def start(self, rate: int, packet_types: int = 0x01) -> None: + """Start streaming at the specified rate.""" + self.streaming = True + self.rate = rate + self.packet_types = packet_types + self.send_interval = 1.0 / rate if rate > 0 else 0.0 + self.last_send_time = time.time() + + def stop(self) -> None: + """Stop streaming.""" + self.streaming = False + self.rate = 0 + self.send_interval = 0.0 + + def should_send(self) -> bool: + """Check if it's time to send the next packet.""" + if not self.streaming: + return False + + now = time.time() + if now - self.last_send_time >= self.send_interval: + self.last_send_time = now + return True + return False diff --git a/examples/throughput_server/requirements.txt b/examples/throughput_server/requirements.txt new file mode 100644 index 0000000000..4581570749 --- /dev/null +++ b/examples/throughput_server/requirements.txt @@ -0,0 +1,2 @@ +# COSMOS Throughput Server Requirements +# No external dependencies required - uses only Python standard library diff --git a/examples/throughput_server/throughput_server.py b/examples/throughput_server/throughput_server.py new file mode 100644 index 0000000000..6609a9775e --- /dev/null +++ b/examples/throughput_server/throughput_server.py @@ -0,0 +1,458 @@ +#!/usr/bin/env python3 +# Copyright 2026 OpenC3, Inc. +# All Rights Reserved. 
+# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +""" +COSMOS Throughput Testing Server + +A standalone TCP/IP server for measuring COSMOS command and telemetry throughput. +Supports dual-port operation for INST (Ruby) and INST2 (Python) targets. + +Usage: + python throughput_server.py [--inst-port PORT] [--inst2-port PORT] +""" + +import argparse +import asyncio +import logging +import signal +import struct +import sys +import time +from typing import Dict + +from ccsds import ( + parse_ccsds_command, + read_packet_from_stream, +) +from config import ( + CMD_GET_STATS, + CMD_GET_STATS_NO_MSG, + CMD_RESET_STATS, + CMD_START_STREAM, + CMD_STOP_STREAM, + DEFAULT_STREAM_RATE, + INST2_PORT, + INST_PORT, + MAX_STREAM_RATE, +) +from metrics import StreamState, ThroughputMetrics, ThroughputStatusPacket + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) + + +class ClientHandler: + """Handle a single client connection.""" + + def __init__( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + port: int, + port_name: str, + ): + self.reader = reader + self.writer = writer + self.port = port + self.port_name = port_name + self.metrics = ThroughputMetrics() + self.stream_state = StreamState() + self.status_builder = ThroughputStatusPacket() + self.running = True + self._buffer = b"" + self._tlm_sequence = 0 + + # Get client info + peername = 
writer.get_extra_info("peername") + self.client_addr = f"{peername[0]}:{peername[1]}" if peername else "unknown" + + async def handle(self) -> None: + """Main handler loop for this client.""" + logger.info(f"[{self.port_name}] Client connected: {self.client_addr}") + + # Send initial status packet so COSMOS has data immediately + await self._send_status_packet() + + # Start background tasks + rate_update_task = asyncio.create_task(self._rate_update_loop()) + stream_task = asyncio.create_task(self._stream_loop()) + + try: + while self.running: + try: + data = await asyncio.wait_for(self.reader.read(65536), timeout=0.1) + if not data: + logger.info( + f"[{self.port_name}] Client disconnected: {self.client_addr}" + ) + break + + logger.debug( + f"[{self.port_name}] Received {len(data)} bytes: {data[:32].hex()}..." + ) + self._buffer += data + await self._process_buffer() + + except asyncio.TimeoutError: + # No data, continue loop (allows checking self.running) + continue + except ConnectionResetError: + logger.info( + f"[{self.port_name}] Connection reset: {self.client_addr}" + ) + break + + finally: + self.running = False + rate_update_task.cancel() + stream_task.cancel() + + try: + await rate_update_task + except asyncio.CancelledError: + pass + + try: + await stream_task + except asyncio.CancelledError: + pass + + self.writer.close() + try: + await self.writer.wait_closed() + except Exception: + pass + + logger.info( + f"[{self.port_name}] Session stats for {self.client_addr}: " + f"cmds={self.metrics.cmd_recv_count}, " + f"tlm={self.metrics.tlm_sent_count}, " + f"bytes_in={self.metrics.bytes_recv}, " + f"bytes_out={self.metrics.bytes_sent}" + ) + + async def _process_buffer(self) -> None: + """Process complete packets from the buffer.""" + while True: + if len(self._buffer) < 6: + break + + # Parse CCSDS primary header fields for validation + word0 = struct.unpack(">H", self._buffer[0:2])[0] + version = (word0 >> 13) & 0x07 + packet_type = (word0 >> 12) & 0x01 + + 
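+            # Illustrative example (hypothetical bytes, not from live traffic):
+            # for header bytes 0x18 0x64, word0 = 0x1864, so
+            #   version     = (0x1864 >> 13) & 0x07 = 0
+            #   packet_type = (0x1864 >> 12) & 0x01 = 1  (command)
+            #   apid        =  0x1864 & 0x7FF        = 0x064
+            # i.e. a valid command header for APID 100.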
ccsds_len = struct.unpack(">H", self._buffer[4:6])[0] + expected_total = ccsds_len + 7 + + # Validate CCSDS command header + # version should be 0, packet_type should be 1 (command) + # CCSDSLENGTH should be reasonable (< 1000 bytes) + valid_header = version == 0 and packet_type == 1 and ccsds_len < 1000 + + if not valid_header: + # Invalid header - try to resync by discarding 1 byte + logger.debug( + f"[{self.port_name}] Invalid header: ver={version} type={packet_type} " + f"len={ccsds_len}, buffer[0:8]={self._buffer[0:8].hex()}" + ) + self._buffer = self._buffer[1:] + continue + + packet, remaining = read_packet_from_stream(self._buffer) + if not packet: + break + + self._buffer = remaining + await self._handle_packet(packet) + + async def _handle_packet(self, packet: bytes) -> None: + """Handle a complete CCSDS command packet.""" + try: + cmd = parse_ccsds_command(packet) + self.metrics.record_command(len(packet)) + + # Dispatch based on PKTID + if cmd.pktid == CMD_START_STREAM: + await self._handle_start_stream(cmd.payload) + elif cmd.pktid == CMD_STOP_STREAM: + await self._handle_stop_stream() + elif cmd.pktid == CMD_GET_STATS: + await self._handle_get_stats() + elif cmd.pktid == CMD_GET_STATS_NO_MSG: + await self._handle_get_stats() + elif cmd.pktid == CMD_RESET_STATS: + await self._handle_reset_stats() + else: + logger.warning(f"[{self.port_name}] Unknown command PKTID: {cmd.pktid}") + + except Exception as e: + logger.error(f"[{self.port_name}] Error handling packet: {e}") + + async def _handle_start_stream(self, payload: bytes) -> None: + """Handle START_STREAM command. 
+ + Payload: RATE (32 UINT), PACKET_TYPES (32 UINT) + """ + if len(payload) >= 8: + rate, packet_types = struct.unpack(">II", payload[:8]) + elif len(payload) >= 4: + rate = struct.unpack(">I", payload[:4])[0] + packet_types = 0x01 + else: + rate = DEFAULT_STREAM_RATE + packet_types = 0x01 + + # Clamp rate to valid range + rate = max(1, min(rate, MAX_STREAM_RATE)) + + self.stream_state.start(rate, packet_types) + self.metrics.target_rate = rate + + logger.info( + f"[{self.port_name}] Started streaming at {rate} Hz " + f"(packet_types=0x{packet_types:08X}, interval={self.stream_state.send_interval:.6f}s)" + ) + + async def _handle_stop_stream(self) -> None: + """Handle STOP_STREAM command.""" + self.stream_state.stop() + self.metrics.target_rate = 0 + logger.info(f"[{self.port_name}] Stopped streaming") + + async def _handle_get_stats(self) -> None: + """Handle GET_STATS command - send THROUGHPUT_STATUS packet.""" + await self._send_status_packet() + + async def _handle_reset_stats(self) -> None: + """Handle RESET_STATS command.""" + self.metrics.reset() + self.status_builder.reset_sequence() + logger.info(f"[{self.port_name}] Reset statistics") + + async def _send_status_packet(self) -> None: + """Send a THROUGHPUT_STATUS telemetry packet (non-streaming path).""" + packet = self.status_builder.build(self.metrics) + await self._send_packet(packet) + + async def _send_packet(self, packet: bytes) -> None: + """Send a packet to the client (non-streaming path).""" + try: + self.writer.write(packet) + await self.writer.drain() + self.metrics.record_telemetry(len(packet)) + except Exception as e: + logger.error(f"[{self.port_name}] Error sending packet: {e}") + self.running = False + + async def _rate_update_loop(self) -> None: + """Background task to update rate calculations.""" + while self.running: + await asyncio.sleep(1.0) + self.metrics.update_rates() + + async def _stream_loop(self) -> None: + """Background task to send telemetry at configured rate. 
+ + Uses time-compensated streaming to maintain accurate rates despite + asyncio.sleep() imprecision. Tracks elapsed time and sends packets + to catch up to where we should be. + """ + BATCH_INTERVAL = 0.05 # Check every 50ms + + while self.running: + if self.stream_state.streaming and self.stream_state.rate > 0: + try: + rate = self.stream_state.rate + stream_start = time.time() + packets_sent = 0 + + while self.stream_state.streaming and self.running: + elapsed = time.time() - stream_start + + # How many packets should have been sent by now? + expected_packets = int(elapsed * rate) + packets_to_send = expected_packets - packets_sent + + # Only send if we're behind schedule + if packets_to_send > 0: + for _ in range(packets_to_send): + self.status_builder.update(self.metrics) + # Must copy buffer - write() may not copy immediately + self.writer.write( + bytes(self.status_builder.get_buffer()) + ) + self.metrics.record_telemetry( + self.status_builder.PACKET_SIZE + ) + packets_sent += packets_to_send + await self.writer.drain() + + await asyncio.sleep(BATCH_INTERVAL) + + except Exception as e: + logger.error(f"[{self.port_name}] Stream error: {e}") + self.running = False + break + else: + # Not streaming - ensure buffer is flushed, then wait + try: + await self.writer.drain() + except Exception: + pass + await asyncio.sleep(0.1) + + +class ThroughputServer: + """Multi-port throughput testing server.""" + + def __init__(self, inst_port: int = INST_PORT, inst2_port: int = INST2_PORT): + self.inst_port = inst_port + self.inst2_port = inst2_port + self.servers: Dict[int, asyncio.Server] = {} + self.clients: Dict[str, ClientHandler] = {} + self.running = False + + async def start(self) -> None: + """Start the server on all configured ports.""" + self.running = True + + # Start INST port server + inst_server = await asyncio.start_server( + lambda r, w: self._handle_client(r, w, self.inst_port, "INST"), + "0.0.0.0", + self.inst_port, + ) + self.servers[self.inst_port] = 
inst_server + logger.info(f"INST server listening on port {self.inst_port}") + + # Start INST2 port server + inst2_server = await asyncio.start_server( + lambda r, w: self._handle_client(r, w, self.inst2_port, "INST2"), + "0.0.0.0", + self.inst2_port, + ) + self.servers[self.inst2_port] = inst2_server + logger.info(f"INST2 server listening on port {self.inst2_port}") + + logger.info("Throughput server started. Press Ctrl+C to stop.") + + async def _handle_client( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + port: int, + port_name: str, + ) -> None: + """Handle a new client connection.""" + handler = ClientHandler(reader, writer, port, port_name) + client_key = f"{port_name}:{handler.client_addr}" + self.clients[client_key] = handler + + try: + await handler.handle() + finally: + del self.clients[client_key] + + async def stop(self) -> None: + """Stop all servers and disconnect clients.""" + self.running = False + + # Stop all client handlers + for handler in list(self.clients.values()): + handler.running = False + + # Close servers + for port, server in self.servers.items(): + server.close() + await server.wait_closed() + logger.info(f"Server on port {port} stopped") + + logger.info("Throughput server stopped") + + async def run_forever(self) -> None: + """Run the server until interrupted.""" + await self.start() + + # Wait for shutdown signal + stop_event = asyncio.Event() + + def signal_handler(): + logger.info("Shutdown signal received") + stop_event.set() + + loop = asyncio.get_event_loop() + for sig in (signal.SIGINT, signal.SIGTERM): + try: + loop.add_signal_handler(sig, signal_handler) + except NotImplementedError: + # Windows doesn't support add_signal_handler + pass + + try: + await stop_event.wait() + except asyncio.CancelledError: + pass + + await self.stop() + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser( + description="COSMOS Throughput Testing Server", + 
formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + "--inst-port", + type=int, + default=INST_PORT, + help="Port for INST (Ruby) target", + ) + parser.add_argument( + "--inst2-port", + type=int, + default=INST2_PORT, + help="Port for INST2 (Python) target", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging", + ) + + args = parser.parse_args() + + if args.debug: + logging.getLogger().setLevel(logging.DEBUG) + + server = ThroughputServer( + inst_port=args.inst_port, + inst2_port=args.inst2_port, + ) + + try: + asyncio.run(server.run_forever()) + except KeyboardInterrupt: + logger.info("Interrupted by user") + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/plugin.txt b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/plugin.txt index 402e750b7e..b377eff919 100644 --- a/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/plugin.txt +++ b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/plugin.txt @@ -47,6 +47,18 @@ VARIABLE reduced_log_retain_time 2592000 VARIABLE_STATE "90 days" 7776000 VARIABLE_STATE "1 year" 31536000 +# Throughput Server Configuration +VARIABLE use_throughput_server false + VARIABLE_DESCRIPTION "Use external throughput server instead of built-in simulator" + VARIABLE_STATE "No (use simulator)" false + VARIABLE_STATE "Yes (use throughput server)" true +VARIABLE throughput_server_host host.docker.internal + VARIABLE_DESCRIPTION "Hostname for the throughput server" +VARIABLE inst_throughput_port 7778 + VARIABLE_DESCRIPTION "Port for INST throughput server connection" +VARIABLE inst2_throughput_port 7780 + VARIABLE_DESCRIPTION "Port for INST2 throughput server connection" + <% include_inst = (inst_target_name.to_s.strip.length > 0) %> <% include_inst2 = (inst2_target_name.to_s.strip.length > 0) %> <% include_example = (example_target_name.to_s.strip.length > 0) %> @@ -104,13 +116,28 @@ 
VARIABLE reduced_log_retain_time 2592000 <% end %> <% if include_inst and include_inst_int %> - INTERFACE <%= inst_int_name %> simulated_target_interface.rb sim_inst.rb - MAP_TARGET <%= inst_target_name %> + <% if use_throughput_server.to_s == "true" %> + # Connect to external throughput server using TCP/IP with LengthProtocol + # LengthProtocol params: bit_offset=32, bit_size=16, len_offset=7, bytes_per_count=1 + INTERFACE <%= inst_int_name %> tcpip_client_interface.rb <%= throughput_server_host %> <%= inst_throughput_port %> <%= inst_throughput_port %> 10.0 nil LENGTH 32 16 7 1 BIG_ENDIAN 0 nil nil true + MAP_TARGET <%= inst_target_name %> + <% else %> + INTERFACE <%= inst_int_name %> simulated_target_interface.rb sim_inst.rb + MAP_TARGET <%= inst_target_name %> + <% end %> <% end %> <% if include_inst2 and include_inst2_int %> - INTERFACE <%= inst2_int_name %> openc3/interfaces/simulated_target_interface.py sim_inst.py - MAP_TARGET <%= inst2_target_name %> + <% if use_throughput_server.to_s == "true" %> + # Connect to external throughput server using Python TCP/IP with LengthProtocol + # Must use Python interface for INST2 because target uses Python conversions + # LengthProtocol params: bit_offset=32, bit_size=16, len_offset=7, bytes_per_count=1 + INTERFACE <%= inst2_int_name %> openc3/interfaces/tcpip_client_interface.py <%= throughput_server_host %> <%= inst2_throughput_port %> <%= inst2_throughput_port %> 10.0 None LENGTH 32 16 7 1 BIG_ENDIAN 0 None None True + MAP_TARGET <%= inst2_target_name %> + <% else %> + INTERFACE <%= inst2_int_name %> openc3/interfaces/simulated_target_interface.py sim_inst.py + MAP_TARGET <%= inst2_target_name %> + <% end %> <% end %> <% if include_example and include_example_int %> diff --git a/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST/cmd_tlm/inst_cmds.txt b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST/cmd_tlm/inst_cmds.txt index de7310a5c7..281de58551 100644 --- 
a/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST/cmd_tlm/inst_cmds.txt +++ b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST/cmd_tlm/inst_cmds.txt @@ -256,4 +256,24 @@ COMMAND <%= target_name %> HYBRIDCMD BIG_ENDIAN "Hybrid Accessor Command" APPEND_PARAMETER CBOR_LENGTH 32 UINT MIN MAX 0 HIDDEN APPEND_STRUCTURE CBOR 0 CMD <%= target_name %> CBORSTRUCT - VARIABLE_BIT_SIZE CBOR_LENGTH \ No newline at end of file + VARIABLE_BIT_SIZE CBOR_LENGTH + +# Throughput Testing Commands (for use with external throughput server) +COMMAND <%= target_name %> START_STREAM BIG_ENDIAN "Start telemetry streaming from throughput server" + <%= render "_ccsds_cmd.txt", locals: {id: 200} %> + APPEND_PARAMETER RATE 32 UINT 1 100000 100 "Packets per second" + APPEND_PARAMETER PACKET_TYPES 32 UINT 0 0xFFFFFFFF 0x01 "Bitmask of packet types to stream" + RELATED_ITEM <%= target_name %> THROUGHPUT_STATUS TLM_TARGET_RATE + +COMMAND <%= target_name %> STOP_STREAM BIG_ENDIAN "Stop telemetry streaming from throughput server" + <%= render "_ccsds_cmd.txt", locals: {id: 201} %> + +COMMAND <%= target_name %> GET_STATS BIG_ENDIAN "Request throughput statistics from server" + <%= render "_ccsds_cmd.txt", locals: {id: 202} %> + +COMMAND <%= target_name %> GET_STATS_NO_MSG BIG_ENDIAN "Request throughput statistics from server" + <%= render "_ccsds_cmd.txt", locals: {id: 203} %> + DISABLE_MESSAGES + +COMMAND <%= target_name %> RESET_STATS BIG_ENDIAN "Reset throughput statistics on server" + <%= render "_ccsds_cmd.txt", locals: {id: 204} %> diff --git a/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST/cmd_tlm/inst_tlm.txt b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST/cmd_tlm/inst_tlm.txt index 0bf87a5364..a974b1f03d 100644 --- a/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST/cmd_tlm/inst_tlm.txt +++ b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST/cmd_tlm/inst_tlm.txt @@ -296,4 
+296,26 @@ TELEMETRY <%= target_name %> HYBRIDTLM BIG_ENDIAN "Hybrid Accessor Telemetry" VARIABLE_BIT_SIZE JSON_LENGTH APPEND_ITEM CBOR_LENGTH 32 UINT APPEND_STRUCTURE CBOR 0 CMD <%= target_name %> CBORSTRUCT - VARIABLE_BIT_SIZE CBOR_LENGTH \ No newline at end of file + VARIABLE_BIT_SIZE CBOR_LENGTH + +# Throughput Status Telemetry (from external throughput server) +TELEMETRY <%= target_name %> THROUGHPUT_STATUS BIG_ENDIAN "Throughput statistics from server" + <%= render "_ccsds_tlm.txt", locals: {apid: 100} %> + APPEND_ITEM CMD_RECV_COUNT 32 UINT "Commands received by server" + APPEND_ITEM CMD_RECV_RATE 32 FLOAT "Commands per second" + FORMAT_STRING "%.1f" + UNITS Hz Hz + APPEND_ITEM TLM_SENT_COUNT 32 UINT "Telemetry packets sent by server" + APPEND_ITEM TLM_SENT_RATE 32 FLOAT "Telemetry packets per second" + FORMAT_STRING "%.1f" + UNITS Hz Hz + APPEND_ITEM TLM_TARGET_RATE 32 UINT "Configured streaming rate" + UNITS Hz Hz + APPEND_ITEM BYTES_RECV 64 UINT "Total bytes received by server" + UNITS Bytes B + APPEND_ITEM BYTES_SENT 64 UINT "Total bytes sent by server" + UNITS Bytes B + APPEND_ITEM UPTIME_SEC 32 UINT "Server uptime" + UNITS Seconds s + ITEM PACKET_TIME 0 0 DERIVED "Ruby time based on TIMESEC and TIMEUS" + READ_CONVERSION unix_time_conversion.rb TIMESEC TIMEUS \ No newline at end of file diff --git a/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST/procedures/throughput_test.rb b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST/procedures/throughput_test.rb new file mode 100644 index 0000000000..0c1e5b9ff4 --- /dev/null +++ b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST/procedures/throughput_test.rb @@ -0,0 +1,203 @@ +# Throughput Test Script for <%= target_name %> (Ruby) +# +# This script tests command and telemetry throughput performance +# when connected to the external throughput server. +# +# Prerequisites: +# 1. 
Start the throughput server: python examples/throughput_server/throughput_server.py +# 2. Install the DEMO plugin with: use_throughput_server=true + +set_line_delay(0) +TARGET = "<%= target_name %>" + +def test_command_throughput(num_commands, description) + puts "=" * 60 + puts "Test: #{description}" + puts "Sending #{num_commands} commands..." + + # Reset stats before test + cmd("#{TARGET} RESET_STATS") + wait(1) + + start_time = Time.now + num_commands.times { cmd("#{TARGET} GET_STATS") } + elapsed = Time.now - start_time + rate = num_commands / elapsed + + puts "Completed: #{num_commands} commands in #{elapsed.round(3)} seconds" + puts "Command rate (message): #{rate.round(1)} commands/second" + puts "" + + start_time = Time.now + num_commands.times { cmd("#{TARGET} GET_STATS_NO_MSG") } + elapsed = Time.now - start_time + rate = num_commands / elapsed + + puts "Completed: #{num_commands} commands in #{elapsed.round(3)} seconds" + puts "Command rate (no msg): #{rate.round(1)} commands/second" + puts "" + + start_time = Time.now + num_commands.times { cmd("#{TARGET} GET_STATS_NO_MSG", timeout: 0) } + elapsed = Time.now - start_time + rate = num_commands / elapsed + + puts "Completed: #{num_commands} commands in #{elapsed.round(3)} seconds" + puts "Command rate (no ack): #{rate.round(1)} commands/second" + puts "" + + disable_instrumentation do + start_time = Time.now + num_commands.times { cmd("#{TARGET} GET_STATS_NO_MSG", timeout: 0) } + elapsed = Time.now - start_time + rate = num_commands / elapsed + end + + puts "Completed: #{num_commands} commands in #{elapsed.round(3)} seconds" + puts "Command rate (no instrumentation): #{rate.round(1)} commands/second" + puts "" + + rate +end + +def test_telemetry_throughput(stream_rate, duration, description) + puts "=" * 60 + puts "Test: #{description}" + puts "Streaming at #{stream_rate} Hz for #{duration} seconds..." 
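+  # For example, at stream_rate = 1000 Hz over duration = 5 seconds the server
+  # should emit roughly 1000 * 5 = 5000 THROUGHPUT_STATUS packets; the
+  # packets_sent and actual_rate values computed below are compared against
+  # that expectation.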
+ + # Reset stats on the server and request fresh telemetry + cmd("#{TARGET} RESET_STATS") + wait(2) + cmd("#{TARGET} GET_STATS") + # Wait for fresh packet to arrive with reset values (count is 0 in the packet) + wait_check("#{TARGET} THROUGHPUT_STATUS TLM_SENT_COUNT == 0", 5) + + # Get initial counts - capture packet count FIRST to minimize race + initial_cosmos_count = get_tlm_cnt("#{TARGET} THROUGHPUT_STATUS") + initial_server_count = tlm("#{TARGET} THROUGHPUT_STATUS TLM_SENT_COUNT") + initial_seq = tlm("#{TARGET} THROUGHPUT_STATUS CCSDSSEQCNT") + + # Start streaming + cmd("#{TARGET} START_STREAM with RATE #{stream_rate}") + + # Wait for specified duration + wait(duration) + + # Stop streaming and wait for in-flight packets to arrive + cmd("#{TARGET} STOP_STREAM") + wait(2.0) + + # Request a status packet to trigger COSMOS telemetry count sync to Redis + # (counts are batched every 1 second and only sync when packets arrive) + cmd("#{TARGET} GET_STATS") + wait(2.0) + + # Get final counts + final_cosmos_count = get_tlm_cnt("#{TARGET} THROUGHPUT_STATUS") + final_server_count = tlm("#{TARGET} THROUGHPUT_STATUS TLM_SENT_COUNT") + final_seq = tlm("#{TARGET} THROUGHPUT_STATUS CCSDSSEQCNT") + + # Calculate packets sent by server during streaming + # Note: final_server_count is from the GET_STATS packet, which has the count BEFORE that packet was sent + packets_sent = final_server_count - initial_server_count + + # Calculate packets received by COSMOS during streaming + packets_received = final_cosmos_count - initial_cosmos_count + + # Calculate actual rate from test data (more accurate than server's TLM_SENT_RATE which is stale) + actual_rate = packets_sent.to_f / duration + + # Calculate sequence span (handles 14-bit wrap-around) + if final_seq >= initial_seq + seq_span = final_seq - initial_seq + else + seq_span = (0x3FFF - initial_seq) + final_seq + 1 + end + # Subtract 1 from seq_span to exclude the GET_STATS packet after STOP_STREAM + streaming_seq_span = seq_span 
- 1 + + # Calculate loss based on what server actually sent vs what COSMOS received + if packets_sent > 0 + loss_percent = [(packets_sent - packets_received).to_f / packets_sent * 100, 0].max.round(2) + else + loss_percent = 0 + end + + # Check for sequence gaps (comparing streaming packets only) + seq_gaps = streaming_seq_span - packets_received + seq_gaps = 0 if seq_gaps < 0 + + puts "Server sent: #{packets_sent} packets" + puts "COSMOS received: #{packets_received} packets" + puts "Packet loss: #{loss_percent}%" + puts "Sequence span: #{initial_seq} -> #{final_seq - 1} (#{streaming_seq_span} streaming packets)" + if seq_gaps > 0 + puts "Sequence gaps detected: #{seq_gaps} missing packets in sequence" + end + puts "Actual rate: #{actual_rate.round(1)} Hz (target: #{stream_rate} Hz)" + puts "" + + { + rate: actual_rate, + packets: packets_received, + sent: packets_sent, + loss_percent: loss_percent + } +end + +def run_throughput_tests + puts "" + puts "#" * 60 + puts "# #{TARGET} Throughput Test Suite" + puts "#" * 60 + puts "" + + results = {} + + # Command throughput tests + puts "\n### COMMAND THROUGHPUT TESTS ###\n" + + results[:cmd_burst_100] = test_command_throughput(100, "Burst 100 commands") + results[:cmd_burst_500] = test_command_throughput(500, "Burst 500 commands") + results[:cmd_burst_1000] = test_command_throughput(1000, "Burst 1000 commands") + + # Wait for all command test responses to be fully processed by COSMOS + # (command tests generate many THROUGHPUT_STATUS packets that may still be in flight) + puts "\nWaiting for command test packets to settle..." 
+ wait(3.0) + + # Telemetry throughput tests + puts "\n### TELEMETRY THROUGHPUT TESTS ###\n" + + results[:tlm_100hz] = test_telemetry_throughput(100, 5, "100 Hz for 5 seconds") + results[:tlm_1000hz] = test_telemetry_throughput(1000, 5, "1000 Hz for 5 seconds") + results[:tlm_2000hz] = test_telemetry_throughput(2000, 5, "2000 Hz for 5 seconds") + results[:tlm_3000hz] = test_telemetry_throughput(3000, 5, "3000 Hz for 5 seconds") + results[:tlm_4000hz] = test_telemetry_throughput(4000, 5, "4000 Hz for 5 seconds") + + # Summary + puts "\n" + "=" * 60 + puts "SUMMARY" + puts "=" * 60 + + puts "\nCommand Throughput:" + puts " 100 cmd burst: #{results[:cmd_burst_100].round(1)} cmd/s" + puts " 500 cmd burst: #{results[:cmd_burst_500].round(1)} cmd/s" + puts " 1000 cmd burst: #{results[:cmd_burst_1000].round(1)} cmd/s" + + puts "\nTelemetry Throughput:" + puts " 100 Hz target: #{results[:tlm_100hz][:rate].round(1)} Hz (#{results[:tlm_100hz][:loss_percent]}% loss)" + puts " 1000 Hz target: #{results[:tlm_1000hz][:rate].round(1)} Hz (#{results[:tlm_1000hz][:loss_percent]}% loss)" + puts " 2000 Hz target: #{results[:tlm_2000hz][:rate].round(1)} Hz (#{results[:tlm_2000hz][:loss_percent]}% loss)" + puts " 3000 Hz target: #{results[:tlm_3000hz][:rate].round(1)} Hz (#{results[:tlm_3000hz][:loss_percent]}% loss)" + puts " 4000 Hz target: #{results[:tlm_4000hz][:rate].round(1)} Hz (#{results[:tlm_4000hz][:loss_percent]}% loss)" + + puts "\n" + "#" * 60 + puts "# Test Complete" + puts "#" * 60 + + results +end + +# Run the tests +run_throughput_tests diff --git a/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST/screens/throughput.txt b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST/screens/throughput.txt new file mode 100644 index 0000000000..a8fe22ef86 --- /dev/null +++ b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST/screens/throughput.txt @@ -0,0 +1,38 @@ +SCREEN AUTO AUTO 1.0 +GLOBAL_SETTING LABELVALUE TEXTCOLOR 
"#bababa" + +VERTICAL + TITLE "<%= target_name %> Throughput Statistics" + + VERTICALBOX "Server Status" + LABELVALUE <%= target_name %> THROUGHPUT_STATUS UPTIME_SEC + LABELVALUE <%= target_name %> THROUGHPUT_STATUS TLM_TARGET_RATE + END + + VERTICALBOX "Command Metrics" + LABELVALUE <%= target_name %> THROUGHPUT_STATUS CMD_RECV_COUNT + LABELVALUE <%= target_name %> THROUGHPUT_STATUS CMD_RECV_RATE + LABELVALUE <%= target_name %> THROUGHPUT_STATUS BYTES_RECV + END + + VERTICALBOX "Telemetry Metrics" + LABELVALUE <%= target_name %> THROUGHPUT_STATUS TLM_SENT_COUNT + LABELVALUE <%= target_name %> THROUGHPUT_STATUS TLM_SENT_RATE + LABELVALUE <%= target_name %> THROUGHPUT_STATUS BYTES_SENT + END + + VERTICALBOX "Controls" + HORIZONTAL + BUTTON 'Start Stream 10 Hz' 'api.cmd("<%= target_name %> START_STREAM with RATE 10")' + BUTTON 'Start Stream 100 Hz' 'api.cmd("<%= target_name %> START_STREAM with RATE 100")' + BUTTON 'Start Stream 1000 Hz' 'api.cmd("<%= target_name %> START_STREAM with RATE 1000")' + END + HORIZONTAL + BUTTON 'Stop Stream' 'api.cmd("<%= target_name %> STOP_STREAM")' + BUTTON 'Get Stats' 'api.cmd("<%= target_name %> GET_STATS")' + BUTTON 'Reset Stats' 'api.cmd("<%= target_name %> RESET_STATS")' + END + END + + <%= render "_footer.txt" %> +END diff --git a/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST2/cmd_tlm/inst_cmds.txt b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST2/cmd_tlm/inst_cmds.txt index 876c45ba5c..8b577fd07c 100644 --- a/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST2/cmd_tlm/inst_cmds.txt +++ b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST2/cmd_tlm/inst_cmds.txt @@ -256,4 +256,24 @@ COMMAND <%= target_name %> HYBRIDCMD BIG_ENDIAN "Hybrid Accessor Command" APPEND_PARAMETER CBOR_LENGTH 32 UINT MIN MAX 0 HIDDEN APPEND_STRUCTURE CBOR 0 CMD <%= target_name %> CBORSTRUCT - VARIABLE_BIT_SIZE CBOR_LENGTH \ No newline at end of file + VARIABLE_BIT_SIZE 
CBOR_LENGTH + +# Throughput Testing Commands (for use with external throughput server) +COMMAND <%= target_name %> START_STREAM BIG_ENDIAN "Start telemetry streaming from throughput server" + <%= render "_ccsds_cmd.txt", locals: {id: 200} %> + APPEND_PARAMETER RATE 32 UINT 1 100000 100 "Packets per second" + APPEND_PARAMETER PACKET_TYPES 32 UINT 0 0xFFFFFFFF 0x01 "Bitmask of packet types to stream" + RELATED_ITEM <%= target_name %> THROUGHPUT_STATUS TLM_TARGET_RATE + +COMMAND <%= target_name %> STOP_STREAM BIG_ENDIAN "Stop telemetry streaming from throughput server" + <%= render "_ccsds_cmd.txt", locals: {id: 201} %> + +COMMAND <%= target_name %> GET_STATS BIG_ENDIAN "Request throughput statistics from server" + <%= render "_ccsds_cmd.txt", locals: {id: 202} %> + +COMMAND <%= target_name %> GET_STATS_NO_MSG BIG_ENDIAN "Request throughput statistics from server" + <%= render "_ccsds_cmd.txt", locals: {id: 203} %> + DISABLE_MESSAGES + +COMMAND <%= target_name %> RESET_STATS BIG_ENDIAN "Reset throughput statistics on server" + <%= render "_ccsds_cmd.txt", locals: {id: 204} %> diff --git a/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST2/cmd_tlm/inst_tlm.txt b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST2/cmd_tlm/inst_tlm.txt index 7f02d74d92..9d16976e05 100644 --- a/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST2/cmd_tlm/inst_tlm.txt +++ b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST2/cmd_tlm/inst_tlm.txt @@ -290,4 +290,26 @@ TELEMETRY <%= target_name %> HYBRIDTLM BIG_ENDIAN "Hybrid Accessor Telemetry" VARIABLE_BIT_SIZE JSON_LENGTH APPEND_ITEM CBOR_LENGTH 32 UINT APPEND_STRUCTURE CBOR 0 CMD <%= target_name %> CBORSTRUCT - VARIABLE_BIT_SIZE CBOR_LENGTH \ No newline at end of file + VARIABLE_BIT_SIZE CBOR_LENGTH + +# Throughput Status Telemetry (from external throughput server) +TELEMETRY <%= target_name %> THROUGHPUT_STATUS BIG_ENDIAN "Throughput statistics from server" + <%= render 
"_ccsds_tlm.txt", locals: {apid: 100} %> + APPEND_ITEM CMD_RECV_COUNT 32 UINT "Commands received by server" + APPEND_ITEM CMD_RECV_RATE 32 FLOAT "Commands per second" + FORMAT_STRING "%.1f" + UNITS Hz Hz + APPEND_ITEM TLM_SENT_COUNT 32 UINT "Telemetry packets sent by server" + APPEND_ITEM TLM_SENT_RATE 32 FLOAT "Telemetry packets per second" + FORMAT_STRING "%.1f" + UNITS Hz Hz + APPEND_ITEM TLM_TARGET_RATE 32 UINT "Configured streaming rate" + UNITS Hz Hz + APPEND_ITEM BYTES_RECV 64 UINT "Total bytes received by server" + UNITS Bytes B + APPEND_ITEM BYTES_SENT 64 UINT "Total bytes sent by server" + UNITS Bytes B + APPEND_ITEM UPTIME_SEC 32 UINT "Server uptime" + UNITS Seconds s + ITEM PACKET_TIME 0 0 DERIVED "Python time based on TIMESEC and TIMEUS" + READ_CONVERSION openc3/conversions/unix_time_conversion.py TIMESEC TIMEUS \ No newline at end of file diff --git a/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST2/procedures/throughput_test.py b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST2/procedures/throughput_test.py new file mode 100644 index 0000000000..ced0e4b683 --- /dev/null +++ b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST2/procedures/throughput_test.py @@ -0,0 +1,221 @@ +# Throughput Test Script for <%= target_name %> (Python) +# +# This script tests command and telemetry throughput performance +# when connected to the external throughput server. +# +# Prerequisites: +# 1. Start the throughput server: python examples/throughput_server/throughput_server.py +# 2. 
Install the DEMO plugin with: use_throughput_server=true + +import time + +set_line_delay(0) +TARGET = "<%= target_name %>" + + +def test_command_throughput(num_commands, description): + print("=" * 60) + print(f"Test: {description}") + print(f"Sending {num_commands} commands...") + + # Reset stats before test + cmd(f"{TARGET} RESET_STATS") + wait(2) + + start_time = time.time() + for i in range(num_commands): + cmd(f"{TARGET} GET_STATS") + elapsed = time.time() - start_time + rate = num_commands / elapsed + + print(f"Completed: {num_commands} commands in {elapsed:.3f} seconds") + print(f"Command rate (msg): {rate:.1f} commands/second") + print("") + + start_time = time.time() + for i in range(num_commands): + cmd(f"{TARGET} GET_STATS_NO_MSG") + elapsed = time.time() - start_time + rate = num_commands / elapsed + + print(f"Completed: {num_commands} commands in {elapsed:.3f} seconds") + print(f"Command rate (no msg): {rate:.1f} commands/second") + print("") + + start_time = time.time() + for i in range(num_commands): + cmd(f"{TARGET} GET_STATS_NO_MSG", timeout=0) + elapsed = time.time() - start_time + rate = num_commands / elapsed + + print(f"Completed: {num_commands} commands in {elapsed:.3f} seconds") + print(f"Command rate (no ack): {rate:.1f} commands/second") + print("") + + with disable_instrumentation(): + start_time = time.time() + for i in range(num_commands): + cmd(f"{TARGET} GET_STATS_NO_MSG", timeout=0) + elapsed = time.time() - start_time + rate = num_commands / elapsed + + print(f"Completed: {num_commands} commands in {elapsed:.3f} seconds") + print(f"Command rate (no instrumentation): {rate:.1f} commands/second") + print("") + + return rate + + +def test_telemetry_throughput(stream_rate, duration, description): + print("=" * 60) + print(f"Test: {description}") + print(f"Streaming at {stream_rate} Hz for {duration} seconds...") + + # Reset stats on the server and request fresh telemetry + cmd(f"{TARGET} RESET_STATS") + wait(2) + cmd(f"{TARGET} 
GET_STATS") + # Wait for fresh packet to arrive with reset values (count is 0 in the packet) + wait_check(f"{TARGET} THROUGHPUT_STATUS TLM_SENT_COUNT == 0", 5) + + # Get initial counts - capture packet count FIRST to minimize race + initial_cosmos_count = get_tlm_cnt(f"{TARGET} THROUGHPUT_STATUS") + initial_server_count = tlm(f"{TARGET} THROUGHPUT_STATUS TLM_SENT_COUNT") + initial_seq = tlm(f"{TARGET} THROUGHPUT_STATUS CCSDSSEQCNT") + + # Start streaming + cmd(f"{TARGET} START_STREAM with RATE {stream_rate}") + + # Wait for specified duration + wait(duration) + + # Stop streaming and wait for in-flight packets to arrive + cmd(f"{TARGET} STOP_STREAM") + wait(2.0) + + # Request a status packet to trigger COSMOS telemetry count sync to Redis + # (counts are batched every 1 second and only sync when packets arrive) + cmd(f"{TARGET} GET_STATS") + wait(2.0) + + # Get final counts + final_cosmos_count = get_tlm_cnt(f"{TARGET} THROUGHPUT_STATUS") + final_server_count = tlm(f"{TARGET} THROUGHPUT_STATUS TLM_SENT_COUNT") + final_seq = tlm(f"{TARGET} THROUGHPUT_STATUS CCSDSSEQCNT") + + # Calculate packets sent by server during streaming + # Note: final_server_count is from the GET_STATS packet, which has the count BEFORE that packet was sent + packets_sent = final_server_count - initial_server_count + + # Calculate packets received by COSMOS during streaming + packets_received = final_cosmos_count - initial_cosmos_count + + # Calculate actual rate from test data (more accurate than server's TLM_SENT_RATE which is stale) + actual_rate = packets_sent / duration + + # Calculate sequence span (handles 14-bit wrap-around) + if final_seq >= initial_seq: + seq_span = final_seq - initial_seq + else: + seq_span = (0x3FFF - initial_seq) + final_seq + 1 + # Subtract 1 from seq_span to exclude the GET_STATS packet after STOP_STREAM + streaming_seq_span = seq_span - 1 + + # Calculate loss based on what server actually sent vs what COSMOS received + if packets_sent > 0: + loss_percent = 
max( + 0, round((packets_sent - packets_received) / packets_sent * 100, 2) + ) + else: + loss_percent = 0 + + # Check for sequence gaps (comparing streaming packets only) + seq_gaps = streaming_seq_span - packets_received + if seq_gaps < 0: + seq_gaps = 0 + + print(f"Server sent: {packets_sent} packets") + print(f"COSMOS received: {packets_received} packets") + print(f"Packet loss: {loss_percent}%") + print( + f"Sequence span: {initial_seq} -> {final_seq - 1} ({streaming_seq_span} streaming packets)" + ) + if seq_gaps > 0: + print(f"Sequence gaps detected: {seq_gaps} missing packets in sequence") + print(f"Actual rate: {actual_rate:.1f} Hz (target: {stream_rate} Hz)") + print("") + + return { + "rate": actual_rate, + "packets": packets_received, + "sent": packets_sent, + "loss_percent": loss_percent, + } + + +def run_throughput_tests(): + print("") + print("#" * 60) + print(f"# {TARGET} Throughput Test Suite (Python)") + print("#" * 60) + print("") + + results = {} + + # Command throughput tests + print("\n### COMMAND THROUGHPUT TESTS ###\n") + + results["cmd_burst_100"] = test_command_throughput(100, "Burst 100 commands") + results["cmd_burst_500"] = test_command_throughput(500, "Burst 500 commands") + results["cmd_burst_1000"] = test_command_throughput(1000, "Burst 1000 commands") + + # Wait for all command test responses to be fully processed by COSMOS + # (command tests generate many THROUGHPUT_STATUS packets that may still be in flight) + print("\nWaiting for command test packets to settle...") + wait(3.0) + + # Telemetry throughput tests + print("\n### TELEMETRY THROUGHPUT TESTS ###\n") + + results["tlm_100hz"] = test_telemetry_throughput(100, 5, "100 Hz for 5 seconds") + results["tlm_1000hz"] = test_telemetry_throughput(1000, 5, "1000 Hz for 5 seconds") + results["tlm_2000hz"] = test_telemetry_throughput(2000, 5, "2000 Hz for 5 seconds") + results["tlm_3000hz"] = test_telemetry_throughput(3000, 5, "3000 Hz for 5 seconds") + results["tlm_4000hz"] = 
test_telemetry_throughput(4000, 5, "4000 Hz for 5 seconds") + + # Summary + print("\n" + "=" * 60) + print("SUMMARY") + print("=" * 60) + + print("\nCommand Throughput (no instrumentation):") + print(f" 100 cmd burst: {results['cmd_burst_100']:.1f} cmd/s") + print(f" 500 cmd burst: {results['cmd_burst_500']:.1f} cmd/s") + print(f" 1000 cmd burst: {results['cmd_burst_1000']:.1f} cmd/s") + + print("\nTelemetry Throughput:") + print( + f" 100 Hz target: {results['tlm_100hz']['rate']:.1f} Hz ({results['tlm_100hz']['loss_percent']}% loss)" + ) + print( + f" 1000 Hz target: {results['tlm_1000hz']['rate']:.1f} Hz ({results['tlm_1000hz']['loss_percent']}% loss)" + ) + print( + f" 2000 Hz target: {results['tlm_2000hz']['rate']:.1f} Hz ({results['tlm_2000hz']['loss_percent']}% loss)" + ) + print( + f" 3000 Hz target: {results['tlm_3000hz']['rate']:.1f} Hz ({results['tlm_3000hz']['loss_percent']}% loss)" + ) + print( + f" 4000 Hz target: {results['tlm_4000hz']['rate']:.1f} Hz ({results['tlm_4000hz']['loss_percent']}% loss)" + ) + + print("\n" + "#" * 60) + print("# Test Complete") + print("#" * 60) + + return results + + +# Run the tests +run_throughput_tests() diff --git a/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST2/screens/throughput.txt b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST2/screens/throughput.txt new file mode 100644 index 0000000000..a8fe22ef86 --- /dev/null +++ b/openc3-cosmos-init/plugins/packages/openc3-cosmos-demo/targets/INST2/screens/throughput.txt @@ -0,0 +1,38 @@ +SCREEN AUTO AUTO 1.0 +GLOBAL_SETTING LABELVALUE TEXTCOLOR "#bababa" + +VERTICAL + TITLE "<%= target_name %> Throughput Statistics" + + VERTICALBOX "Server Status" + LABELVALUE <%= target_name %> THROUGHPUT_STATUS UPTIME_SEC + LABELVALUE <%= target_name %> THROUGHPUT_STATUS TLM_TARGET_RATE + END + + VERTICALBOX "Command Metrics" + LABELVALUE <%= target_name %> THROUGHPUT_STATUS CMD_RECV_COUNT + LABELVALUE <%= target_name %> THROUGHPUT_STATUS CMD_RECV_RATE + LABELVALUE 
<%= target_name %> THROUGHPUT_STATUS BYTES_RECV + END + + VERTICALBOX "Telemetry Metrics" + LABELVALUE <%= target_name %> THROUGHPUT_STATUS TLM_SENT_COUNT + LABELVALUE <%= target_name %> THROUGHPUT_STATUS TLM_SENT_RATE + LABELVALUE <%= target_name %> THROUGHPUT_STATUS BYTES_SENT + END + + VERTICALBOX "Controls" + HORIZONTAL + BUTTON 'Start Stream 10 Hz' 'api.cmd("<%= target_name %> START_STREAM with RATE 10")' + BUTTON 'Start Stream 100 Hz' 'api.cmd("<%= target_name %> START_STREAM with RATE 100")' + BUTTON 'Start Stream 1000 Hz' 'api.cmd("<%= target_name %> START_STREAM with RATE 1000")' + END + HORIZONTAL + BUTTON 'Stop Stream' 'api.cmd("<%= target_name %> STOP_STREAM")' + BUTTON 'Get Stats' 'api.cmd("<%= target_name %> GET_STATS")' + BUTTON 'Reset Stats' 'api.cmd("<%= target_name %> RESET_STATS")' + END + END + + <%= render "_footer.txt" %> +END diff --git a/openc3/lib/openc3/microservices/interface_microservice.rb b/openc3/lib/openc3/microservices/interface_microservice.rb index 46e50743b6..a324f4d596 100644 --- a/openc3/lib/openc3/microservices/interface_microservice.rb +++ b/openc3/lib/openc3/microservices/interface_microservice.rb @@ -14,7 +14,7 @@ # GNU Affero General Public License for more details. # Modified by OpenC3, Inc. -# All changes Copyright 2025, OpenC3, Inc. +# All changes Copyright 2026, OpenC3, Inc. 
# All Rights Reserved # # This file may also be used under the terms of a commercial license @@ -573,7 +573,8 @@ def initialize(name) @queued = false @interface.options.each do |option_name, option_values| - if option_name.upcase == 'OPTIMIZE_THROUGHPUT' + # OPTIMIZE_THROUGHPUT was changed to UPDATE_INTERVAL to better represent the setting + if option_name.upcase == 'UPDATE_INTERVAL' or option_name.upcase == 'OPTIMIZE_THROUGHPUT' @queued = true update_interval = option_values[0].to_f EphemeralStoreQueued.instance.set_update_interval(update_interval) diff --git a/openc3/lib/openc3/models/target_model.rb b/openc3/lib/openc3/models/target_model.rb index 7ab37ac78a..9364c86131 100644 --- a/openc3/lib/openc3/models/target_model.rb +++ b/openc3/lib/openc3/models/target_model.rb @@ -14,7 +14,7 @@ # GNU Affero General Public License for more details. # Modified by OpenC3, Inc. -# All changes Copyright 2025, OpenC3, Inc. +# All changes Copyright 2026, OpenC3, Inc. # All Rights Reserved # # This file may also be used under the terms of a commercial license @@ -53,7 +53,10 @@ class TargetModel < Model VALID_TYPES = %i(CMD TLM) ERB_EXTENSIONS = %w(.txt .rb .py .json .yaml .yml) ITEM_MAP_CACHE_TIMEOUT = 10.0 + PACKET_CACHE_TIMEOUT = 10.0 @@item_map_cache = {} + @@packet_cache = {} + @@packet_cache_mutex = Mutex.new @@sync_packet_count_data = {} @@sync_packet_count_time = nil @@sync_packet_count_delay_seconds = 1.0 # Sync packet counts every second @@ -208,11 +211,41 @@ def self.download(target_name, scope:) def self.packet(target_name, packet_name, type: :TLM, scope:) raise "Unknown type #{type} for #{target_name} #{packet_name}" unless VALID_TYPES.include?(type) + # Check cache first + cache_key = "#{scope}__#{type}__#{target_name}__#{packet_name}" + @@packet_cache_mutex.synchronize do + cached = @@packet_cache[cache_key] + if cached && (Time.now - cached[:time]) < PACKET_CACHE_TIMEOUT + @@packet_cache_hits ||= 0 + @@packet_cache_hits += 1 + return cached[:packet] + end + end 
+ # Assume it exists and just try to get it to avoid an extra call to Store.exist? json = Store.hget("#{scope}__openc3#{type.to_s.downcase}__#{target_name}", packet_name) raise "Packet '#{target_name} #{packet_name}' does not exist" if json.nil? - JSON.parse(json, allow_nan: true, create_additions: true) + packet = JSON.parse(json, allow_nan: true, create_additions: true) + + # Store in cache + @@packet_cache_mutex.synchronize do + @@packet_cache[cache_key] = { packet: packet, time: Time.now } + @@packet_cache_misses ||= 0 + @@packet_cache_misses += 1 + end + + packet + end + + def self.packet_cache_stats + @@packet_cache_mutex.synchronize do + # Use defined? guards: the hit/miss counters are lazily initialized, and reading + # an uninitialized class variable raises NameError rather than returning nil + { + hits: (defined?(@@packet_cache_hits) ? @@packet_cache_hits : 0), + misses: (defined?(@@packet_cache_misses) ? @@packet_cache_misses : 0), + size: @@packet_cache.size + } + end end # @return [Array] All packet hashes under the target_name @@ -236,6 +269,12 @@ def self.all_packet_name_descriptions(target_name, type: :TLM, scope:) def self.set_packet(target_name, packet_name, packet, type: :TLM, scope:) raise "Unknown type #{type} for #{target_name} #{packet_name}" unless VALID_TYPES.include?(type) + # Invalidate cache entry + cache_key = "#{scope}__#{type}__#{target_name}__#{packet_name}" + @@packet_cache_mutex.synchronize do + @@packet_cache.delete(cache_key) + end + begin Store.hset("#{scope}__openc3#{type.to_s.downcase}__#{target_name}", packet_name, JSON.generate(packet.as_json, allow_nan: true)) rescue JSON::GeneratorError => e diff --git a/openc3/lib/openc3/topics/command_topic.rb b/openc3/lib/openc3/topics/command_topic.rb index 29eee4b572..4b6aa64fca 100644 --- a/openc3/lib/openc3/topics/command_topic.rb +++ b/openc3/lib/openc3/topics/command_topic.rb @@ -14,7 +14,7 @@ # GNU Affero General Public License for more details. # Modified by OpenC3, Inc. -# All changes Copyright 2025, OpenC3, Inc. +# All changes Copyright 2026, OpenC3, Inc. 
# All Rights Reserved # # This file may also be used under the terms of a commercial license @@ -41,14 +41,23 @@ def self.write_packet(packet, scope:) end # @param command [Hash] Command hash structure read to be written to a topic + # @param timeout [Float] Timeout in seconds. Set to 0 or negative for fire-and-forget mode (no ACK waiting). def self.send_command(command, timeout: COMMAND_ACK_TIMEOUT_S, scope:, obfuscated_items: []) timeout = COMMAND_ACK_TIMEOUT_S unless timeout - ack_topic = "{#{scope}__ACKCMD}TARGET__#{command['target_name']}" - Topic.update_topic_offsets([ack_topic]) # Save the existing cmd_params Hash and JSON generate before writing to the topic cmd_params = command['cmd_params'] command['cmd_params'] = JSON.generate(command['cmd_params'].as_json, allow_nan: true) OpenC3.inject_context(command) + + # Fire-and-forget mode: skip ACK waiting when timeout <= 0 + if timeout <= 0 + Topic.write_topic("{#{scope}__CMD}TARGET__#{command['target_name']}", command, '*', 100) + command["cmd_params"] = cmd_params # Restore the original cmd_params Hash + return command + end + + ack_topic = "{#{scope}__ACKCMD}TARGET__#{command['target_name']}" + Topic.update_topic_offsets([ack_topic]) cmd_id = Topic.write_topic("{#{scope}__CMD}TARGET__#{command['target_name']}", command, '*', 100) command["cmd_params"] = cmd_params # Restore the original cmd_params Hash time = Time.now diff --git a/openc3/python/openc3/accessors/json_accessor.py b/openc3/python/openc3/accessors/json_accessor.py index 2d3f0b89d4..929afbbc20 100644 --- a/openc3/python/openc3/accessors/json_accessor.py +++ b/openc3/python/openc3/accessors/json_accessor.py @@ -1,4 +1,4 @@ -# Copyright 2024 OpenC3, Inc. +# Copyright 2026 OpenC3, Inc. # All Rights Reserved. # # This program is free software; you can modify and/or redistribute it @@ -14,19 +14,46 @@ # This file may also be used under the terms of a commercial license # if purchased from OpenC3, Inc. 
-import json +from functools import lru_cache + +try: + import orjson + + def json_loads(data): + return orjson.loads(data) + + def json_dumps(data): + return orjson.dumps(data, option=orjson.OPT_SERIALIZE_NUMPY).decode("utf-8") + +except ImportError: + import json + + def json_loads(data): + return json.loads(data) + + def json_dumps(data): + return json.dumps(data, separators=(",", ":")) + + from jsonpath_ng import parse from .accessor import Accessor +# Cache parsed JSONPath expressions to avoid recompiling on every call +# This provides a ~1000x speedup for repeated accesses (2.7ms -> 2.7µs) +@lru_cache(maxsize=256) +def _parse_jsonpath(path): + return parse(path) + + class JsonAccessor(Accessor): @classmethod def class_read_item(cls, item, buffer): if item.data_type == "DERIVED": return None if isinstance(buffer, bytearray): - buffer = json.loads(buffer.decode()) - result = parse(item.key).find(buffer) + buffer = json_loads(buffer) + result = _parse_jsonpath(item.key).find(buffer) if len(result) == 0: return None return cls.convert_to_type(result[0].value, item) @@ -36,12 +63,12 @@ def class_write_item(cls, item, value, buffer): if item.data_type == "DERIVED": return None if isinstance(buffer, bytearray): - decoded = json.loads(buffer.decode()) + decoded = json_loads(buffer) else: decoded = buffer value = cls.convert_to_type(value, item) - jsonpath = parse(item.key) + jsonpath = _parse_jsonpath(item.key) # update_or_create doesn't always work if the item is there # so first find the item before updating matches = jsonpath.find(decoded) @@ -52,25 +79,25 @@ def class_write_item(cls, item, value, buffer): if isinstance(buffer, bytearray): # buffer[0:] syntax so we copy into the buffer - buffer[0:] = bytearray(json.dumps(decoded, separators=(',', ':')), encoding="utf-8") + buffer[0:] = bytearray(json_dumps(decoded), encoding="utf-8") @classmethod def class_read_items(cls, items, buffer): if isinstance(buffer, bytearray): - buffer = json.loads(buffer.decode()) + 
buffer = json_loads(buffer) return super().class_read_items(items, buffer) @classmethod def class_write_items(cls, items, values, buffer): if isinstance(buffer, bytearray): - decoded = json.loads(buffer.decode()) + decoded = json_loads(buffer) else: decoded = buffer super().class_write_items(items, values, decoded) if isinstance(buffer, bytearray): # buffer[0:] syntax so we copy into the buffer - buffer[0:] = bytearray(json.dumps(decoded, separators=(',', ':')), encoding="utf-8") + buffer[0:] = bytearray(json_dumps(decoded), encoding="utf-8") def enforce_encoding(self): return None diff --git a/openc3/python/openc3/interfaces/protocols/burst_protocol.py b/openc3/python/openc3/interfaces/protocols/burst_protocol.py index ae21f13561..abe0494f56 100644 --- a/openc3/python/openc3/interfaces/protocols/burst_protocol.py +++ b/openc3/python/openc3/interfaces/protocols/burst_protocol.py @@ -1,4 +1,4 @@ -# Copyright 2025 OpenC3, Inc. +# Copyright 2026 OpenC3, Inc. # All Rights Reserved. # # This program is free software; you can modify and/or redistribute it @@ -51,7 +51,6 @@ def __init__( def reset(self): super().reset() self.data = b"" - # self.data.force_encoding('ASCII-8BIT') self.sync_state = "SEARCHING" # Reads from the interface. It can look for a sync pattern before @@ -181,7 +180,7 @@ def handle_sync_pattern(self): else: # not found self.log_discard(sync_index, False) # Delete Data Before and including first character of suspected sync Pattern - self.data = self.data[(sync_index + 1) :] + self.data = self.data[(sync_index + 1):] continue except ValueError: # sync_index = None diff --git a/openc3/python/openc3/interfaces/protocols/fixed_protocol.py b/openc3/python/openc3/interfaces/protocols/fixed_protocol.py index c9cc2d3a17..0de3847e62 100644 --- a/openc3/python/openc3/interfaces/protocols/fixed_protocol.py +++ b/openc3/python/openc3/interfaces/protocols/fixed_protocol.py @@ -1,4 +1,4 @@ -# Copyright 2025 OpenC3, Inc. +# Copyright 2026 OpenC3, Inc. 
# All Rights Reserved. # # This program is free software; you can modify and/or redistribute it @@ -134,8 +134,8 @@ def identify_and_finish_packet(self): self.packet_name = identified_packet.packet_name # Get the data from this packet - packet_data = self.data[0 : (identified_packet.defined_length + self.discard_leading_bytes)] - self.data = self.data[(identified_packet.defined_length + self.discard_leading_bytes) :] + packet_data = self.data[0:(identified_packet.defined_length + self.discard_leading_bytes)] + self.data = self.data[(identified_packet.defined_length + self.discard_leading_bytes):] break if identified_packet is None: diff --git a/openc3/python/openc3/interfaces/protocols/length_protocol.py b/openc3/python/openc3/interfaces/protocols/length_protocol.py index 2ad79fb253..a80549436d 100644 --- a/openc3/python/openc3/interfaces/protocols/length_protocol.py +++ b/openc3/python/openc3/interfaces/protocols/length_protocol.py @@ -1,4 +1,4 @@ -# Copyright 2025 OpenC3, Inc. +# Copyright 2026 OpenC3, Inc. # All Rights Reserved. # # This program is free software; you can modify and/or redistribute it diff --git a/openc3/python/openc3/interfaces/protocols/preidentified_protocol.py b/openc3/python/openc3/interfaces/protocols/preidentified_protocol.py index f935f022d8..68d7c39417 100644 --- a/openc3/python/openc3/interfaces/protocols/preidentified_protocol.py +++ b/openc3/python/openc3/interfaces/protocols/preidentified_protocol.py @@ -1,4 +1,4 @@ -# Copyright 2025 OpenC3, Inc. +# Copyright 2026 OpenC3, Inc. # All Rights Reserved. 
# # This program is free software; you can modify and/or redistribute it @@ -136,7 +136,7 @@ def reduce_to_single_packet(self): if len(self.data) < len(self.sync_pattern): return ("STOP", self.extra) - self.data = self.data[(len(self.sync_pattern)) :] + self.data = self.data[(len(self.sync_pattern)):] self.reduction_state = "SYNC_REMOVED" elif self.reduction_state == "START": self.reduction_state = "SYNC_REMOVED" diff --git a/openc3/python/openc3/interfaces/protocols/slip_protocol.py b/openc3/python/openc3/interfaces/protocols/slip_protocol.py index c5da24994a..03643b1380 100644 --- a/openc3/python/openc3/interfaces/protocols/slip_protocol.py +++ b/openc3/python/openc3/interfaces/protocols/slip_protocol.py @@ -1,4 +1,4 @@ -# Copyright 2025 OpenC3, Inc. +# Copyright 2026 OpenC3, Inc. # All Rights Reserved. # # This program is free software; you can modify and/or redistribute it @@ -132,10 +132,10 @@ def reduce_to_single_packet(self): # Reduce to packet data and setup current_data for next packet if index is not None: if index > 0: - packet_data = self.data[0 : (index + len(self.read_termination_characters))] + packet_data = self.data[0:(index + len(self.read_termination_characters))] else: # self.data begins with the termination characters - packet_data = self.data[0 : (len(self.read_termination_characters))] - self.data = self.data[(index + len(self.read_termination_characters)) :] + packet_data = self.data[0:(len(self.read_termination_characters))] + self.data = self.data[(index + len(self.read_termination_characters)):] return (packet_data, self.extra) else: return ("STOP", self.extra) diff --git a/openc3/python/openc3/interfaces/protocols/terminated_protocol.py b/openc3/python/openc3/interfaces/protocols/terminated_protocol.py index 81119667b2..1fc21033d5 100644 --- a/openc3/python/openc3/interfaces/protocols/terminated_protocol.py +++ b/openc3/python/openc3/interfaces/protocols/terminated_protocol.py @@ -1,4 +1,4 @@ -# Copyright 2025 OpenC3, Inc. 
+# Copyright 2026 OpenC3, Inc. # All Rights Reserved. # # This program is free software; you can modify and/or redistribute it @@ -74,13 +74,13 @@ def reduce_to_single_packet(self): if self.strip_read_termination: packet_data = self.data[0:index] else: - packet_data = self.data[0 : (index + len(self.read_termination_characters))] + packet_data = self.data[0:(index + len(self.read_termination_characters))] else: # self.data begins with the termination characters if self.strip_read_termination: packet_data = b"" else: # Keep everything - packet_data = self.data[0 : (len(self.read_termination_characters))] - self.data = self.data[(index + len(self.read_termination_characters)) :] + packet_data = self.data[0:(len(self.read_termination_characters))] + self.data = self.data[(index + len(self.read_termination_characters)):] return (packet_data, self.extra) except ValueError: # sync_index = None return ("STOP", self.extra) diff --git a/openc3/python/openc3/microservices/interface_microservice.py b/openc3/python/openc3/microservices/interface_microservice.py index ab2bb2c8aa..12155e5d06 100644 --- a/openc3/python/openc3/microservices/interface_microservice.py +++ b/openc3/python/openc3/microservices/interface_microservice.py @@ -1,4 +1,4 @@ -# Copyright 2025 OpenC3, Inc. +# Copyright 2026 OpenC3, Inc. # All Rights Reserved. 
# # This program is free software; you can modify and/or redistribute it @@ -220,7 +220,9 @@ def process_cmd(self, topic, msg_id, msg_hash, _redis): return "SUCCESS" if msg_hash.get(b"release_critical"): # Note: intentional fall through below this point - critical_model = CriticalCmdModel.get_model(name=msg_hash[b"release_critical"].decode(), scope=self.scope) + critical_model = CriticalCmdModel.get_model( + name=msg_hash[b"release_critical"].decode(), scope=self.scope + ) if critical_model is not None: msg_hash = critical_model.cmd_hash release_critical = True @@ -250,7 +252,9 @@ def process_cmd(self, topic, msg_id, msg_hash, _redis): f"{self.interface.name}: target_enable: {target_name} cmd_only:{cmd_only} tlm_only:{tlm_only}" ) except Exception as e: - self.logger.error(f"{self.interface.name}: target_control: {''.join(traceback.format_exception(e))}") + self.logger.error( + f"{self.interface.name}: target_control: {''.join(traceback.format_exception(e))}" + ) return str(e) return "SUCCESS" if msg_hash.get(b"interface_details"): @@ -544,7 +548,9 @@ def run(self): ) result = "SUCCESS" except Exception as e: - self.logger.error(f"{self.router.name}: target_control: {''.join(traceback.format_exception(e))}") + self.logger.error( + f"{self.router.name}: target_control: {''.join(traceback.format_exception(e))}" + ) result = str(e) elif msg_hash.get(b"router_details"): result = json.dumps(self.router.details(), cls=JsonEncoder) diff --git a/openc3/python/openc3/models/target_model.py b/openc3/python/openc3/models/target_model.py index 85ebf899ef..54eef26578 100644 --- a/openc3/python/openc3/models/target_model.py +++ b/openc3/python/openc3/models/target_model.py @@ -1,4 +1,4 @@ -# Copyright 2025 OpenC3, Inc. +# Copyright 2026 OpenC3, Inc. # All Rights Reserved. 
# # This program is free software; you can modify and/or redistribute it @@ -47,7 +47,10 @@ class TargetModel(Model): PRIMARY_KEY = "openc3_targets" VALID_TYPES = {"CMD", "TLM"} ITEM_MAP_CACHE_TIMEOUT = 10.0 + PACKET_CACHE_TIMEOUT = 10.0 item_map_cache = {} + packet_cache = {} + packet_cache_lock = threading.Lock() sync_packet_count_data = {} sync_packet_count_time = None sync_packet_count_delay_seconds = 1.0 # Sync packet counts every second @@ -78,11 +81,24 @@ def packet( if type not in cls.VALID_TYPES: raise RuntimeError(f"Unknown type {type} for {target_name} {packet_name}") + # Check cache first + cache_key = f"{scope}__{type}__{target_name}__{packet_name}" + with cls.packet_cache_lock: + cached = cls.packet_cache.get(cache_key) + if cached and (time.time() - cached["time"]) < cls.PACKET_CACHE_TIMEOUT: + return cached["packet"] + # Assume it exists and just try to get it to avoid an extra call to Store.exist? json_data = Store.hget(f"{scope}__openc3{type.lower()}__{target_name}", packet_name) if not json_data: raise RuntimeError(f"Packet '{target_name} {packet_name}' does not exist") - return json.loads(json_data) + packet = json.loads(json_data) + + # Store in cache + with cls.packet_cache_lock: + cls.packet_cache[cache_key] = {"packet": packet, "time": time.time()} + + return packet @classmethod def packets(cls, target_name: str, type: str = "TLM", scope: str = OPENC3_SCOPE): @@ -110,6 +126,11 @@ def set_packet( if type not in cls.VALID_TYPES: raise RuntimeError(f"Unknown type {type} for {target_name} {packet_name}") + # Invalidate cache entry + cache_key = f"{scope}__{type}__{target_name}__{packet_name}" + with cls.packet_cache_lock: + cls.packet_cache.pop(cache_key, None) + try: Store.hset( f"{scope}__openc3{type.lower()}__{target_name}", diff --git a/openc3/python/openc3/streams/tcpip_socket_stream.py b/openc3/python/openc3/streams/tcpip_socket_stream.py index 373a0e32a5..9a599fe739 100644 --- a/openc3/python/openc3/streams/tcpip_socket_stream.py +++ 
b/openc3/python/openc3/streams/tcpip_socket_stream.py @@ -1,4 +1,4 @@ -# Copyright 2024 OpenC3, Inc. +# Copyright 2026 OpenC3, Inc. # All Rights Reserved. # # This program is free software; you can modify and/or redistribute it @@ -59,7 +59,7 @@ def read(self): # No read mutex is needed because reads happen serially while True: # Loop until we get some data try: - data = self.read_socket.recv(4096, socket.MSG_DONTWAIT) + data = self.read_socket.recv(65535, socket.MSG_DONTWAIT) # Non-blocking sockets return an errno EAGAIN or EWOULDBLOCK # if there is no data available except socket.error as error: diff --git a/openc3/python/openc3/topics/command_topic.py b/openc3/python/openc3/topics/command_topic.py index cd04269563..60f5d766d9 100644 --- a/openc3/python/openc3/topics/command_topic.py +++ b/openc3/python/openc3/topics/command_topic.py @@ -1,4 +1,4 @@ -# Copyright 2024 OpenC3, Inc. +# Copyright 2026 OpenC3, Inc. # All Rights Reserved. # # This program is free software; you can modify and/or redistribute it @@ -42,13 +42,33 @@ def write_packet(cls, packet, scope): @classmethod def send_command(cls, command, timeout, scope, obfuscated_items=[]): + """Send a command to a target. + + Args: + command: Command hash structure to be written to a topic + timeout: Timeout in seconds. Set to 0 or negative for fire-and-forget mode (no ACK waiting). 
+ scope: COSMOS scope + obfuscated_items: List of obfuscated items + """ if timeout is None: timeout = cls.COMMAND_ACK_TIMEOUT_S - ack_topic = f"{{{scope}__ACKCMD}}TARGET__{command['target_name']}" - Topic.update_topic_offsets([ack_topic]) # Save the existing cmd_params Hash and JSON generate before writing to the topic cmd_params = command["cmd_params"] command["cmd_params"] = json.dumps(command["cmd_params"], cls=JsonEncoder) + + # Fire-and-forget mode: skip ACK waiting when timeout <= 0 + if timeout <= 0: + Topic.write_topic( + f"{{{scope}__CMD}}TARGET__{command['target_name']}", + command, + "*", + 100, + ) + command["cmd_params"] = cmd_params # Restore the original cmd_params dict + return command + + ack_topic = f"{{{scope}__ACKCMD}}TARGET__{command['target_name']}" + Topic.update_topic_offsets([ack_topic]) cmd_id = Topic.write_topic( f"{{{scope}__CMD}}TARGET__{command['target_name']}", command, diff --git a/openc3/python/poetry.lock b/openc3/python/poetry.lock index e9ea60685f..ecc04eb8d9 100644 --- a/openc3/python/poetry.lock +++ b/openc3/python/poetry.lock @@ -83,18 +83,18 @@ uvloop = ["uvloop (>=0.15.2)"] [[package]] name = "boto3" -version = "1.42.26" +version = "1.42.30" description = "The AWS SDK for Python" optional = false python-versions = ">=3.9" groups = ["main"] files = [ - {file = "boto3-1.42.26-py3-none-any.whl", hash = "sha256:f116cfbe7408e0a9153da363f134d2f1b5008f17ee86af104f0ce59a62be1833"}, - {file = "boto3-1.42.26.tar.gz", hash = "sha256:0fbcf1922e62d180f3644bc1139425821b38d93c1e6ec27409325d2ae86131aa"}, + {file = "boto3-1.42.30-py3-none-any.whl", hash = "sha256:d7e548bea65e0ae2c465c77de937bc686b591aee6a352d5a19a16bc751e591c1"}, + {file = "boto3-1.42.30.tar.gz", hash = "sha256:ba9cd2f7819637d15bfbeb63af4c567fcc8a7dcd7b93dd12734ec58601169538"}, ] [package.dependencies] -botocore = ">=1.42.26,<1.43.0" +botocore = ">=1.42.30,<1.43.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.16.0,<0.17.0" @@ -103,14 +103,14 @@ crt = ["botocore[crt] 
(>=1.21.0,<2.0a0)"]
 
 [[package]]
 name = "botocore"
-version = "1.42.26"
+version = "1.42.30"
 description = "Low-level, data-driven core of boto 3."
 optional = false
 python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "botocore-1.42.26-py3-none-any.whl", hash = "sha256:71171c2d09ac07739f4efce398b15a4a8bc8769c17fb3bc99625e43ed11ad8b7"},
-    {file = "botocore-1.42.26.tar.gz", hash = "sha256:1c8855e3e811f015d930ccfe8751d4be295aae0562133d14b6f0b247cd6fd8d3"},
+    {file = "botocore-1.42.30-py3-none-any.whl", hash = "sha256:97070a438cac92430bb7b65f8ebd7075224f4a289719da4ee293d22d1e98db02"},
+    {file = "botocore-1.42.30.tar.gz", hash = "sha256:9bf1662b8273d5cc3828a49f71ca85abf4e021011c1f0a71f41a2ea5769a5116"},
 ]
 
 [package.dependencies]
@@ -1034,6 +1034,103 @@ files = [
     {file = "numpy-2.4.1.tar.gz", hash = "sha256:a1ceafc5042451a858231588a104093474c6a5c57dcc724841f5c888d237d690"},
 ]
+
+[[package]]
+name = "orjson"
+version = "3.11.5"
+description = "Fast, correct Python JSON library supporting dataclasses, datetimes, and numpy"
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+    {file = "orjson-3.11.5-cp310-cp310-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:df9eadb2a6386d5ea2bfd81309c505e125cfc9ba2b1b99a97e60985b0b3665d1"},
+    {file = "orjson-3.11.5-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ccc70da619744467d8f1f49a8cadae5ec7bbe054e5232d95f92ed8737f8c5870"},
+    {file = "orjson-3.11.5-cp310-cp310-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:073aab025294c2f6fc0807201c76fdaed86f8fc4be52c440fb78fbb759a1ac09"},
+    {file = "orjson-3.11.5-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:835f26fa24ba0bb8c53ae2a9328d1706135b74ec653ed933869b74b6909e63fd"},
+    {file = "orjson-3.11.5-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:667c132f1f3651c14522a119e4dd631fad98761fa960c55e8e7430bb2a1ba4ac"},
+    {file = "orjson-3.11.5-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:42e8961196af655bb5e63ce6c60d25e8798cd4dfbc04f4203457fa3869322c2e"},
+    {file = "orjson-3.11.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:75412ca06e20904c19170f8a24486c4e6c7887dea591ba18a1ab572f1300ee9f"},
+    {file = "orjson-3.11.5-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:6af8680328c69e15324b5af3ae38abbfcf9cbec37b5346ebfd52339c3d7e8a18"},
+    {file = "orjson-3.11.5-cp310-cp310-musllinux_1_2_armv7l.whl", hash = "sha256:a86fe4ff4ea523eac8f4b57fdac319faf037d3c1be12405e6a7e86b3fbc4756a"},
+    {file = "orjson-3.11.5-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:e607b49b1a106ee2086633167033afbd63f76f2999e9236f638b06b112b24ea7"},
+    {file = "orjson-3.11.5-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:7339f41c244d0eea251637727f016b3d20050636695bc78345cce9029b189401"},
+    {file = "orjson-3.11.5-cp310-cp310-win32.whl", hash = "sha256:8be318da8413cdbbce77b8c5fac8d13f6eb0f0db41b30bb598631412619572e8"},
+    {file = "orjson-3.11.5-cp310-cp310-win_amd64.whl", hash = "sha256:b9f86d69ae822cabc2a0f6c099b43e8733dda788405cba2665595b7e8dd8d167"},
+    {file = "orjson-3.11.5-cp311-cp311-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:9c8494625ad60a923af6b2b0bd74107146efe9b55099e20d7740d995f338fcd8"},
+    {file = "orjson-3.11.5-cp311-cp311-macosx_15_0_arm64.whl", hash = "sha256:7bb2ce0b82bc9fd1168a513ddae7a857994b780b2945a8c51db4ab1c4b751ebc"},
+    {file = "orjson-3.11.5-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:67394d3becd50b954c4ecd24ac90b5051ee7c903d167459f93e77fc6f5b4c968"},
+    {file = "orjson-3.11.5-cp311-cp311-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:298d2451f375e5f17b897794bcc3e7b821c0f32b4788b9bcae47ada24d7f3cf7"},
+    {file = "orjson-3.11.5-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:aa5e4244063db8e1d87e0f54c3f7522f14b2dc937e65d5241ef0076a096409fd"},
+    {file = "orjson-3.11.5-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1db2088b490761976c1b2e956d5d4e6409f3732e9d79cfa69f876c5248d1baf9"},
+    {file = "orjson-3.11.5-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:c2ed66358f32c24e10ceea518e16eb3549e34f33a9d51f99ce23b0251776a1ef"},
+    {file = "orjson-3.11.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c2021afda46c1ed64d74b555065dbd4c2558d510d8cec5ea6a53001b3e5e82a9"},
+    {file = "orjson-3.11.5-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:b42ffbed9128e547a1647a3e50bc88ab28ae9daa61713962e0d3dd35e820c125"},
+    {file = "orjson-3.11.5-cp311-cp311-musllinux_1_2_armv7l.whl", hash = "sha256:8d5f16195bb671a5dd3d1dbea758918bada8f6cc27de72bd64adfbd748770814"},
+    {file = "orjson-3.11.5-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:c0e5d9f7a0227df2927d343a6e3859bebf9208b427c79bd31949abcc2fa32fa5"},
+    {file = "orjson-3.11.5-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:23d04c4543e78f724c4dfe656b3791b5f98e4c9253e13b2636f1af5d90e4a880"},
+    {file = "orjson-3.11.5-cp311-cp311-win32.whl", hash = "sha256:c404603df4865f8e0afe981aa3c4b62b406e6d06049564d58934860b62b7f91d"},
+    {file = "orjson-3.11.5-cp311-cp311-win_amd64.whl", hash = "sha256:9645ef655735a74da4990c24ffbd6894828fbfa117bc97c1edd98c282ecb52e1"},
+    {file = "orjson-3.11.5-cp311-cp311-win_arm64.whl", hash = "sha256:1cbf2735722623fcdee8e712cbaaab9e372bbcb0c7924ad711b261c2eccf4a5c"},
+    {file = "orjson-3.11.5-cp312-cp312-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:334e5b4bff9ad101237c2d799d9fd45737752929753bf4faf4b207335a416b7d"},
+    {file = "orjson-3.11.5-cp312-cp312-macosx_15_0_arm64.whl", hash = "sha256:ff770589960a86eae279f5d8aa536196ebda8273a2a07db2a54e82b93bc86626"},
+    {file = "orjson-3.11.5-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ed24250e55efbcb0b35bed7caaec8cedf858ab2f9f2201f17b8938c618c8ca6f"},
+    {file = "orjson-3.11.5-cp312-cp312-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:a66d7769e98a08a12a139049aac2f0ca3adae989817f8c43337455fbc7669b85"},
+    {file = "orjson-3.11.5-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:86cfc555bfd5794d24c6a1903e558b50644e5e68e6471d66502ce5cb5fdef3f9"},
+    {file = "orjson-3.11.5-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a230065027bc2a025e944f9d4714976a81e7ecfa940923283bca7bbc1f10f626"},
+    {file = "orjson-3.11.5-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b29d36b60e606df01959c4b982729c8845c69d1963f88686608be9ced96dbfaa"},
+    {file = "orjson-3.11.5-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c74099c6b230d4261fdc3169d50efc09abf38ace1a42ea2f9994b1d79153d477"},
+    {file = "orjson-3.11.5-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:e697d06ad57dd0c7a737771d470eedc18e68dfdefcdd3b7de7f33dfda5b6212e"},
+    {file = "orjson-3.11.5-cp312-cp312-musllinux_1_2_armv7l.whl", hash = "sha256:e08ca8a6c851e95aaecc32bc44a5aa75d0ad26af8cdac7c77e4ed93acf3d5b69"},
+    {file = "orjson-3.11.5-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:e8b5f96c05fce7d0218df3fdfeb962d6b8cfff7e3e20264306b46dd8b217c0f3"},
+    {file = "orjson-3.11.5-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:ddbfdb5099b3e6ba6d6ea818f61997bb66de14b411357d24c4612cf1ebad08ca"},
+    {file = "orjson-3.11.5-cp312-cp312-win32.whl", hash = "sha256:9172578c4eb09dbfcf1657d43198de59b6cef4054de385365060ed50c458ac98"},
+    {file = "orjson-3.11.5-cp312-cp312-win_amd64.whl", hash = "sha256:2b91126e7b470ff2e75746f6f6ee32b9ab67b7a93c8ba1d15d3a0caaf16ec875"},
+    {file = "orjson-3.11.5-cp312-cp312-win_arm64.whl", hash = "sha256:acbc5fac7e06777555b0722b8ad5f574739e99ffe99467ed63da98f97f9ca0fe"},
+    {file = "orjson-3.11.5-cp313-cp313-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:3b01799262081a4c47c035dd77c1301d40f568f77cc7ec1bb7db5d63b0a01629"},
+    {file = "orjson-3.11.5-cp313-cp313-macosx_15_0_arm64.whl", hash = "sha256:61de247948108484779f57a9f406e4c84d636fa5a59e411e6352484985e8a7c3"},
+    {file = "orjson-3.11.5-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:894aea2e63d4f24a7f04a1908307c738d0dce992e9249e744b8f4e8dd9197f39"},
+    {file = "orjson-3.11.5-cp313-cp313-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ddc21521598dbe369d83d4d40338e23d4101dad21dae0e79fa20465dbace019f"},
+    {file = "orjson-3.11.5-cp313-cp313-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7cce16ae2f5fb2c53c3eafdd1706cb7b6530a67cc1c17abe8ec747f5cd7c0c51"},
+    {file = "orjson-3.11.5-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e46c762d9f0e1cfb4ccc8515de7f349abbc95b59cb5a2bd68df5973fdef913f8"},
+    {file = "orjson-3.11.5-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d7345c759276b798ccd6d77a87136029e71e66a8bbf2d2755cbdde1d82e78706"},
+    {file = "orjson-3.11.5-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:75bc2e59e6a2ac1dd28901d07115abdebc4563b5b07dd612bf64260a201b1c7f"},
+    {file = "orjson-3.11.5-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:54aae9b654554c3b4edd61896b978568c6daa16af96fa4681c9b5babd469f863"},
+    {file = "orjson-3.11.5-cp313-cp313-musllinux_1_2_armv7l.whl", hash = "sha256:4bdd8d164a871c4ec773f9de0f6fe8769c2d6727879c37a9666ba4183b7f8228"},
+    {file = "orjson-3.11.5-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:a261fef929bcf98a60713bf5e95ad067cea16ae345d9a35034e73c3990e927d2"},
+    {file = "orjson-3.11.5-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:c028a394c766693c5c9909dec76b24f37e6a1b91999e8d0c0d5feecbe93c3e05"},
+    {file = "orjson-3.11.5-cp313-cp313-win32.whl", hash = "sha256:2cc79aaad1dfabe1bd2d50ee09814a1253164b3da4c00a78c458d82d04b3bdef"},
+    {file = "orjson-3.11.5-cp313-cp313-win_amd64.whl", hash = "sha256:ff7877d376add4e16b274e35a3f58b7f37b362abf4aa31863dadacdd20e3a583"},
+    {file = "orjson-3.11.5-cp313-cp313-win_arm64.whl", hash = "sha256:59ac72ea775c88b163ba8d21b0177628bd015c5dd060647bbab6e22da3aad287"},
+    {file = "orjson-3.11.5-cp314-cp314-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:e446a8ea0a4c366ceafc7d97067bfd55292969143b57e3c846d87fc701e797a0"},
+    {file = "orjson-3.11.5-cp314-cp314-macosx_15_0_arm64.whl", hash = "sha256:53deb5addae9c22bbe3739298f5f2196afa881ea75944e7720681c7080909a81"},
+    {file = "orjson-3.11.5-cp314-cp314-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:82cd00d49d6063d2b8791da5d4f9d20539c5951f965e45ccf4e96d33505ce68f"},
+    {file = "orjson-3.11.5-cp314-cp314-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:3fd15f9fc8c203aeceff4fda211157fad114dde66e92e24097b3647a08f4ee9e"},
+    {file = "orjson-3.11.5-cp314-cp314-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9df95000fbe6777bf9820ae82ab7578e8662051bb5f83d71a28992f539d2cda7"},
+    {file = "orjson-3.11.5-cp314-cp314-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:92a8d676748fca47ade5bc3da7430ed7767afe51b2f8100e3cd65e151c0eaceb"},
+    {file = "orjson-3.11.5-cp314-cp314-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:aa0f513be38b40234c77975e68805506cad5d57b3dfd8fe3baa7f4f4051e15b4"},
+    {file = "orjson-3.11.5-cp314-cp314-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fa1863e75b92891f553b7922ce4ee10ed06db061e104f2b7815de80cdcb135ad"},
+    {file = "orjson-3.11.5-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:d4be86b58e9ea262617b8ca6251a2f0d63cc132a6da4b5fcc8e0a4128782c829"},
+    {file = "orjson-3.11.5-cp314-cp314-musllinux_1_2_armv7l.whl", hash = "sha256:b923c1c13fa02084eb38c9c065afd860a5cff58026813319a06949c3af5732ac"},
+    {file = "orjson-3.11.5-cp314-cp314-musllinux_1_2_i686.whl", hash = "sha256:1b6bd351202b2cd987f35a13b5e16471cf4d952b42a73c391cc537974c43ef6d"},
+    {file = "orjson-3.11.5-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:bb150d529637d541e6af06bbe3d02f5498d628b7f98267ff87647584293ab439"},
+    {file = "orjson-3.11.5-cp314-cp314-win32.whl", hash = "sha256:9cc1e55c884921434a84a0c3dd2699eb9f92e7b441d7f53f3941079ec6ce7499"},
+    {file = "orjson-3.11.5-cp314-cp314-win_amd64.whl", hash = "sha256:a4f3cb2d874e03bc7767c8f88adaa1a9a05cecea3712649c3b58589ec7317310"},
+    {file = "orjson-3.11.5-cp314-cp314-win_arm64.whl", hash = "sha256:38b22f476c351f9a1c43e5b07d8b5a02eb24a6ab8e75f700f7d479d4568346a5"},
+    {file = "orjson-3.11.5-cp39-cp39-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:1b280e2d2d284a6713b0cfec7b08918ebe57df23e3f76b27586197afca3cb1e9"},
+    {file = "orjson-3.11.5-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3c8d8a112b274fae8c5f0f01954cb0480137072c271f3f4958127b010dfefaec"},
+    {file = "orjson-3.11.5-cp39-cp39-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:5f0a2ae6f09ac7bd47d2d5a5305c1d9ed08ac057cda55bb0a49fa506f0d2da00"},
+    {file = "orjson-3.11.5-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c0d87bd1896faac0d10b4f849016db81a63e4ec5df38757ffae84d45ab38aa71"},
+    {file = "orjson-3.11.5-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:801a821e8e6099b8c459ac7540b3c32dba6013437c57fdcaec205b169754f38c"},
+    {file = "orjson-3.11.5-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:69a0f6ac618c98c74b7fbc8c0172ba86f9e01dbf9f62aa0b1776c2231a7bffe5"},
+    {file = "orjson-3.11.5-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fea7339bdd22e6f1060c55ac31b6a755d86a5b2ad3657f2669ec243f8e3b2bdb"},
+    {file = "orjson-3.11.5-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:4dad582bc93cef8f26513e12771e76385a7e6187fd713157e971c784112aad56"},
+    {file = "orjson-3.11.5-cp39-cp39-musllinux_1_2_armv7l.whl", hash = "sha256:0522003e9f7fba91982e83a97fec0708f5a714c96c4209db7104e6b9d132f111"},
+    {file = "orjson-3.11.5-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:7403851e430a478440ecc1258bcbacbfbd8175f9ac1e39031a7121dd0de05ff8"},
+    {file = "orjson-3.11.5-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:5f691263425d3177977c8d1dd896cde7b98d93cbf390b2544a090675e83a6a0a"},
+    {file = "orjson-3.11.5-cp39-cp39-win32.whl", hash = "sha256:61026196a1c4b968e1b1e540563e277843082e9e97d78afa03eb89315af531f1"},
+    {file = "orjson-3.11.5-cp39-cp39-win_amd64.whl", hash = "sha256:09b94b947ac08586af635ef922d69dc9bc63321527a3a04647f4986a73f4bd30"},
+    {file = "orjson-3.11.5.tar.gz", hash = "sha256:82393ab47b4fe44ffd0a7659fa9cfaacc717eb617c93cde83795f14af5c2e9d5"},
+]
 
 [[package]]
 name = "packaging"
 version = "25.0"
@@ -1475,31 +1572,31 @@ use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
 
 [[package]]
 name = "ruff"
-version = "0.14.11"
+version = "0.14.13"
 description = "An extremely fast Python linter and code formatter, written in Rust."
 optional = false
 python-versions = ">=3.7"
 groups = ["dev"]
 files = [
-    {file = "ruff-0.14.11-py3-none-linux_armv6l.whl", hash = "sha256:f6ff2d95cbd335841a7217bdfd9c1d2e44eac2c584197ab1385579d55ff8830e"},
-    {file = "ruff-0.14.11-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:6f6eb5c1c8033680f4172ea9c8d3706c156223010b8b97b05e82c59bdc774ee6"},
-    {file = "ruff-0.14.11-py3-none-macosx_11_0_arm64.whl", hash = "sha256:f2fc34cc896f90080fca01259f96c566f74069a04b25b6205d55379d12a6855e"},
-    {file = "ruff-0.14.11-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:53386375001773ae812b43205d6064dae49ff0968774e6befe16a994fc233caa"},
-    {file = "ruff-0.14.11-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:a697737dce1ca97a0a55b5ff0434ee7205943d4874d638fe3ae66166ff46edbe"},
-    {file = "ruff-0.14.11-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6845ca1da8ab81ab1dce755a32ad13f1db72e7fba27c486d5d90d65e04d17b8f"},
-    {file = "ruff-0.14.11-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:e36ce2fd31b54065ec6f76cb08d60159e1b32bdf08507862e32f47e6dde8bcbf"},
-    {file = "ruff-0.14.11-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:590bcc0e2097ecf74e62a5c10a6b71f008ad82eb97b0a0079e85defe19fe74d9"},
-    {file = "ruff-0.14.11-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:53fe71125fc158210d57fe4da26e622c9c294022988d08d9347ec1cf782adafe"},
-    {file = "ruff-0.14.11-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a35c9da08562f1598ded8470fcfef2afb5cf881996e6c0a502ceb61f4bc9c8a3"},
-    {file = "ruff-0.14.11-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:0f3727189a52179393ecf92ec7057c2210203e6af2676f08d92140d3e1ee72c1"},
-    {file = "ruff-0.14.11-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:eb09f849bd37147a789b85995ff734a6c4a095bed5fd1608c4f56afc3634cde2"},
-    {file = "ruff-0.14.11-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:c61782543c1231bf71041461c1f28c64b961d457d0f238ac388e2ab173d7ecb7"},
-    {file = "ruff-0.14.11-py3-none-musllinux_1_2_i686.whl", hash = "sha256:82ff352ea68fb6766140381748e1f67f83c39860b6446966cff48a315c3e2491"},
-    {file = "ruff-0.14.11-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:728e56879df4ca5b62a9dde2dd0eb0edda2a55160c0ea28c4025f18c03f86984"},
-    {file = "ruff-0.14.11-py3-none-win32.whl", hash = "sha256:337c5dd11f16ee52ae217757d9b82a26400be7efac883e9e852646f1557ed841"},
-    {file = "ruff-0.14.11-py3-none-win_amd64.whl", hash = "sha256:f981cea63d08456b2c070e64b79cb62f951aa1305282974d4d5216e6e0178ae6"},
-    {file = "ruff-0.14.11-py3-none-win_arm64.whl", hash = "sha256:649fb6c9edd7f751db276ef42df1f3df41c38d67d199570ae2a7bd6cbc3590f0"},
-    {file = "ruff-0.14.11.tar.gz", hash = "sha256:f6dc463bfa5c07a59b1ff2c3b9767373e541346ea105503b4c0369c520a66958"},
+    {file = "ruff-0.14.13-py3-none-linux_armv6l.whl", hash = "sha256:76f62c62cd37c276cb03a275b198c7c15bd1d60c989f944db08a8c1c2dbec18b"},
+    {file = "ruff-0.14.13-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:914a8023ece0528d5cc33f5a684f5f38199bbb566a04815c2c211d8f40b5d0ed"},
+    {file = "ruff-0.14.13-py3-none-macosx_11_0_arm64.whl", hash = "sha256:d24899478c35ebfa730597a4a775d430ad0d5631b8647a3ab368c29b7e7bd063"},
+    {file = "ruff-0.14.13-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9aaf3870f14d925bbaf18b8a2347ee0ae7d95a2e490e4d4aea6813ed15ebc80e"},
+    {file = "ruff-0.14.13-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ac5b7f63dd3b27cc811850f5ffd8fff845b00ad70e60b043aabf8d6ecc304e09"},
+    {file = "ruff-0.14.13-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:78d2b1097750d90ba82ce4ba676e85230a0ed694178ca5e61aa9b459970b3eb9"},
+    {file = "ruff-0.14.13-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:7d0bf87705acbbcb8d4c24b2d77fbb73d40210a95c3903b443cd9e30824a5032"},
+    {file = "ruff-0.14.13-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a3eb5da8e2c9e9f13431032fdcbe7681de9ceda5835efee3269417c13f1fed5c"},
+    {file = "ruff-0.14.13-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:642442b42957093811cd8d2140dfadd19c7417030a7a68cf8d51fcdd5f217427"},
+    {file = "ruff-0.14.13-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4acdf009f32b46f6e8864af19cbf6841eaaed8638e65c8dac845aea0d703c841"},
+    {file = "ruff-0.14.13-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:591a7f68860ea4e003917d19b5c4f5ac39ff558f162dc753a2c5de897fd5502c"},
+    {file = "ruff-0.14.13-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:774c77e841cc6e046fc3e91623ce0903d1cd07e3a36b1a9fe79b81dab3de506b"},
+    {file = "ruff-0.14.13-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:61f4e40077a1248436772bb6512db5fc4457fe4c49e7a94ea7c5088655dd21ae"},
+    {file = "ruff-0.14.13-py3-none-musllinux_1_2_i686.whl", hash = "sha256:6d02f1428357fae9e98ac7aa94b7e966fd24151088510d32cf6f902d6c09235e"},
+    {file = "ruff-0.14.13-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:e399341472ce15237be0c0ae5fbceca4b04cd9bebab1a2b2c979e015455d8f0c"},
+    {file = "ruff-0.14.13-py3-none-win32.whl", hash = "sha256:ef720f529aec113968b45dfdb838ac8934e519711da53a0456038a0efecbd680"},
+    {file = "ruff-0.14.13-py3-none-win_amd64.whl", hash = "sha256:6070bd026e409734b9257e03e3ef18c6e1a216f0435c6751d7a8ec69cb59abef"},
+    {file = "ruff-0.14.13-py3-none-win_arm64.whl", hash = "sha256:7ab819e14f1ad9fe39f246cfcc435880ef7a9390d81a2b6ac7e01039083dd247"},
+    {file = "ruff-0.14.13.tar.gz", hash = "sha256:83cd6c0763190784b99650a20fec7633c59f6ebe41c5cc9d45ee42749563ad47"},
 ]
 
 [[package]]
@@ -1818,4 +1915,4 @@ files = [
 [metadata]
 lock-version = "2.1"
 python-versions = ">=3.10,<4.0"
-content-hash = "b6904c546db5c39ac9ddfb7761a43dbed78c85a0a6b97e7c1861e95077aec2ae"
+content-hash = "d86bf0f9368b132d24a312150a7774d07376e41895ecd62a3813134fc6e4b823"
diff --git a/openc3/python/pyproject.toml b/openc3/python/pyproject.toml
index 351d37f2df..49e1a3c83c 100644
--- a/openc3/python/pyproject.toml
+++ b/openc3/python/pyproject.toml
@@ -11,6 +11,7 @@ dependencies = [
     "boto3 (>=1.36.13,<2.0.0)",
     "cbor2 (>=5.6.5,<6.0.0)",
     "jsonpath-ng (>=1.7.0,<2.0.0)",
+    "orjson (>=3.10.0,<4.0.0)",
     "lxml (>=6.0.0,<7.0.0)",
     "numpy (>= 2.0, < 3.0)",
     "paho-mqtt (>=2.1.0,<3.0.0)",
diff --git a/openc3/python/test/microservices/test_interface_microservice.py b/openc3/python/test/microservices/test_interface_microservice.py
index 66b0ca9619..33cb91a690 100644
--- a/openc3/python/test/microservices/test_interface_microservice.py
+++ b/openc3/python/test/microservices/test_interface_microservice.py
@@ -1,4 +1,4 @@
-# Copyright 2024 OpenC3, Inc.
+# Copyright 2026 OpenC3, Inc.
 # All Rights Reserved.
 #
 # This program is free software; you can modify and/or redistribute it
@@ -33,6 +33,7 @@
 from openc3.topics.interface_topic import InterfaceTopic
 from openc3.microservices.interface_microservice import InterfaceMicroservice
 from openc3.utilities.time import from_nsec_from_epoch
+from openc3.utilities.store_queued import StoreQueued, EphemeralStoreQueued
 
 # This must be here in order to work when running more than this individual file
@@ -422,4 +423,54 @@ def test_supports_protocol_cmd(self):
         self.assertEqual(("PARAM2", 3), im.interface.protocol_cmd_args)
         self.assertEqual("READ", im.interface.protocol_read_write)
         self.assertEqual(3, im.interface.protocol_index)
-        im.shutdown()
\ No newline at end of file
+        im.shutdown()
+
+    def test_supports_update_interval_option_to_enable_queued_writes(self):
+        # Update the model to use UPDATE_INTERVAL option
+        model = InterfaceModel(
+            name="INST_INT",
+            scope="DEFAULT",
+            target_names=["INST"],
+            cmd_target_names=["INST"],
+            tlm_target_names=["INST"],
+            config_params=["test_interface.py"],
+            options=[["UPDATE_INTERVAL", "0.2"]],
+        )
+        model.update()
+
+        # Reset the store update intervals to 0 to verify they get set
+        StoreQueued.instance().set_update_interval(0)
+        EphemeralStoreQueued.instance().set_update_interval(0)
+
+        im = InterfaceMicroservice("DEFAULT__INTERFACE__INST_INT")
+        self.assertEqual(im.queued, True)
+        self.assertEqual(StoreQueued.instance().update_interval, 0.2)
+        self.assertEqual(EphemeralStoreQueued.instance().update_interval, 0.2)
+
+        im.shutdown()
+        time.sleep(0.1)  # Allow threads to exit
+
+    def test_supports_optimize_throughput_option_for_backward_compatibility(self):
+        # Update the model to use OPTIMIZE_THROUGHPUT option (legacy name)
+        model = InterfaceModel(
+            name="INST_INT",
+            scope="DEFAULT",
+            target_names=["INST"],
+            cmd_target_names=["INST"],
+            tlm_target_names=["INST"],
+            config_params=["test_interface.py"],
+            options=[["OPTIMIZE_THROUGHPUT", "0.3"]],
+        )
+        model.update()
+
+        # Reset the store update intervals to 0 to verify they get set
+        StoreQueued.instance().set_update_interval(0)
+        EphemeralStoreQueued.instance().set_update_interval(0)
+
+        im = InterfaceMicroservice("DEFAULT__INTERFACE__INST_INT")
+        self.assertEqual(im.queued, True)
+        self.assertEqual(StoreQueued.instance().update_interval, 0.3)
+        self.assertEqual(EphemeralStoreQueued.instance().update_interval, 0.3)
+
+        im.shutdown()
+        time.sleep(0.1)  # Allow threads to exit
\ No newline at end of file
diff --git a/openc3/python/test/models/test_target_model.py b/openc3/python/test/models/test_target_model.py
index 9e8658ed2d..410fe8f467 100644
--- a/openc3/python/test/models/test_target_model.py
+++ b/openc3/python/test/models/test_target_model.py
@@ -1,4 +1,4 @@
-# Copyright 2025 OpenC3, Inc.
+# Copyright 2026 OpenC3, Inc.
 # All Rights Reserved.
 #
 # This program is free software; you can modify and/or redistribute it
@@ -163,6 +163,8 @@ def setUp(self):
         setup_system()
         model = TargetModel(folder_name="INST", name="INST", scope="DEFAULT")
         model.create()
+        # Clear packet cache before each test
+        TargetModel.packet_cache = {}
 
     def test_raises_for_an_unknown_type(self):
         with self.assertRaisesRegex(RuntimeError, "Unknown type OTHER"):
@@ -186,6 +188,70 @@ def test_returns_packet_hash_if_the_command_exists(self):
         self.assertEqual(pkt["target_name"], "INST")
         self.assertEqual(pkt["packet_name"], "ABORT")
 
+    def test_caches_packet_lookups(self):
+        # First call should populate cache
+        pkt1 = TargetModel.packet("INST", "HEALTH_STATUS", type="TLM", scope="DEFAULT")
+        self.assertEqual(len(TargetModel.packet_cache), 1)
+
+        # Second call should hit cache and return same result
+        pkt2 = TargetModel.packet("INST", "HEALTH_STATUS", type="TLM", scope="DEFAULT")
+        self.assertEqual(len(TargetModel.packet_cache), 1)
+
+        # Both should be equal
+        self.assertEqual(pkt1, pkt2)
+
+    def test_expires_cache_after_timeout(self):
+        import time as time_module
+
+        # First call populates cache
+        pkt1 = TargetModel.packet("INST", "HEALTH_STATUS", type="TLM", scope="DEFAULT")
+        self.assertEqual(len(TargetModel.packet_cache), 1)
+
+        # Save the original timeout and set it to 0 to force expiration
+        original_timeout = TargetModel.PACKET_CACHE_TIMEOUT
+        TargetModel.PACKET_CACHE_TIMEOUT = 0
+
+        try:
+            # Wait a tiny bit to ensure cache is expired
+            time_module.sleep(0.001)
+
+            # Next call should miss cache due to expiration
+            pkt2 = TargetModel.packet("INST", "HEALTH_STATUS", type="TLM", scope="DEFAULT")
+
+            # Still only one entry but cache time should have been updated
+            self.assertEqual(len(TargetModel.packet_cache), 1)
+            self.assertEqual(pkt1, pkt2)
+        finally:
+            # Restore timeout
+            TargetModel.PACKET_CACHE_TIMEOUT = original_timeout
+
+    def test_invalidates_cache_on_set_packet(self):
+        # First call populates cache
+        pkt1 = TargetModel.packet("INST", "HEALTH_STATUS", type="TLM", scope="DEFAULT")
+        self.assertEqual(len(TargetModel.packet_cache), 1)
+
+        # set_packet should invalidate the cache entry
+        TargetModel.set_packet("INST", "HEALTH_STATUS", pkt1, type="TLM", scope="DEFAULT")
+        self.assertEqual(len(TargetModel.packet_cache), 0)
+
+        # Next packet() call should re-populate cache
+        pkt2 = TargetModel.packet("INST", "HEALTH_STATUS", type="TLM", scope="DEFAULT")
+        self.assertEqual(len(TargetModel.packet_cache), 1)
+        self.assertEqual(pkt1, pkt2)
+
+    def test_caches_different_packet_types_separately(self):
+        # Get TLM packet
+        tlm_pkt = TargetModel.packet("INST", "HEALTH_STATUS", type="TLM", scope="DEFAULT")
+        self.assertEqual(len(TargetModel.packet_cache), 1)
+
+        # Get CMD packet
+        cmd_pkt = TargetModel.packet("INST", "ABORT", type="CMD", scope="DEFAULT")
+        self.assertEqual(len(TargetModel.packet_cache), 2)
+
+        # Verify different packets
+        self.assertEqual(tlm_pkt["packet_name"], "HEALTH_STATUS")
+        self.assertEqual(cmd_pkt["packet_name"], "ABORT")
+
 
 class TestTargetModelPacketItem(unittest.TestCase):
     def setUp(self):
diff --git a/openc3/spec/microservices/interface_microservice_spec.rb b/openc3/spec/microservices/interface_microservice_spec.rb
index 92aea32e28..4ce2a317ad 100644
--- a/openc3/spec/microservices/interface_microservice_spec.rb
+++ b/openc3/spec/microservices/interface_microservice_spec.rb
@@ -163,6 +163,21 @@ class ApiTest
       expect(Thread.list.count).to eql init_threads
     end
 
+    it "supports UPDATE_INTERVAL option to enable queued writes" do
+      # First update the model to use UPDATE_INTERVAL instead of OPTIMIZE_THROUGHPUT
+      InterfaceModel.new(name: "INST_INT", scope: "DEFAULT", target_names: ["INST"], cmd_target_names: ["INST"], tlm_target_names: ["INST"], config_params: ["TestInterface"], options: [["UPDATE_INTERVAL", "0.2"]]).update
+      StoreQueued.instance.set_update_interval(0)
+      EphemeralStoreQueued.instance.set_update_interval(0)
+
+      im = InterfaceMicroservice.new("DEFAULT__INTERFACE__INST_INT")
+      expect(im.instance_variable_get(:@queued)).to eql true
+      expect(StoreQueued.instance.update_interval).to eql 0.2
+      expect(EphemeralStoreQueued.instance.update_interval).to eql 0.2
+
+      im.shutdown
+      sleep 0.1 # Allow threads to exit
+    end
+
     it "preserves existing packet counts" do
       # Initialize the telemetry topic with a non-zero RECEIVED_COUNT
       System.telemetry.packets("INST").each do |_packet_name, packet|
diff --git a/openc3/spec/models/target_model_spec.rb b/openc3/spec/models/target_model_spec.rb
index 2718f0e696..1b794b6cfa 100644
--- a/openc3/spec/models/target_model_spec.rb
+++ b/openc3/spec/models/target_model_spec.rb
@@ -333,6 +333,92 @@ module OpenC3
        expect(pkt['target_name']).to eql "INST"
        expect(pkt['packet_name']).to eql "ABORT"
      end
+
+      it "caches packet lookups" do
+        # Clear cache and stats before test
+        TargetModel.class_variable_set(:@@packet_cache, {})
+        TargetModel.class_variable_set(:@@packet_cache_hits, 0)
+        TargetModel.class_variable_set(:@@packet_cache_misses, 0)
+
+        # First call should miss cache
+        pkt1 = TargetModel.packet("INST", "HEALTH_STATUS", type: :TLM, scope: "DEFAULT")
+        stats = TargetModel.packet_cache_stats
+        expect(stats[:hits]).to eql 0
+        expect(stats[:misses]).to eql 1
+        expect(stats[:size]).to eql 1
+
+        # Second call should hit cache
+        pkt2 = TargetModel.packet("INST", "HEALTH_STATUS", type: :TLM, scope: "DEFAULT")
+        stats = TargetModel.packet_cache_stats
+        expect(stats[:hits]).to eql 1
+        expect(stats[:misses]).to eql 1
+
+        # Both packets should be equivalent
+        expect(pkt1).to eql pkt2
+      end
+
+      it "expires cache after timeout" do
+        # Clear cache before test
+        TargetModel.class_variable_set(:@@packet_cache, {})
+        TargetModel.class_variable_set(:@@packet_cache_hits, 0)
+        TargetModel.class_variable_set(:@@packet_cache_misses, 0)
+
+        # First call populates cache
+        TargetModel.packet("INST", "HEALTH_STATUS", type: :TLM, scope: "DEFAULT")
+        expect(TargetModel.packet_cache_stats[:misses]).to eql 1
+
+        # Set timeout to 0 to force expiration
+        timeout = TargetModel::PACKET_CACHE_TIMEOUT
+        OpenC3.disable_warnings do
+          TargetModel::PACKET_CACHE_TIMEOUT = 0
+        end
+
+        # Next call should miss cache due to expiration
+        TargetModel.packet("INST", "HEALTH_STATUS", type: :TLM, scope: "DEFAULT")
+        expect(TargetModel.packet_cache_stats[:misses]).to eql 2
+        expect(TargetModel.packet_cache_stats[:hits]).to eql 0
+
+        # Restore timeout
+        OpenC3.disable_warnings do
+          TargetModel::PACKET_CACHE_TIMEOUT = timeout
+        end
+      end
+
+      it "invalidates cache on set_packet" do
+        # Clear cache before test
+        TargetModel.class_variable_set(:@@packet_cache, {})
+        TargetModel.class_variable_set(:@@packet_cache_hits, 0)
+        TargetModel.class_variable_set(:@@packet_cache_misses, 0)
+
+        # Populate cache
+        pkt = TargetModel.packet("INST", "HEALTH_STATUS", type: :TLM, scope: "DEFAULT")
+        expect(TargetModel.packet_cache_stats[:size]).to eql 1
+
+        # set_packet should invalidate the cache entry
+        TargetModel.set_packet("INST", "HEALTH_STATUS", pkt, type: :TLM, scope: "DEFAULT")
+        expect(TargetModel.packet_cache_stats[:size]).to eql 0
+
+        # Next get should miss cache
+        TargetModel.packet("INST", "HEALTH_STATUS", type: :TLM, scope: "DEFAULT")
+        expect(TargetModel.packet_cache_stats[:misses]).to eql 2
+      end
+
+      it "caches different packet types separately" do
+        # Clear cache before test
+        TargetModel.class_variable_set(:@@packet_cache, {})
+
+        # Get telemetry packet
+        tlm_pkt = TargetModel.packet("INST", "HEALTH_STATUS", type: :TLM, scope: "DEFAULT")
+        expect(TargetModel.packet_cache_stats[:size]).to eql 1
+
+        # Get command packet
+        cmd_pkt = TargetModel.packet("INST", "ABORT", type: :CMD, scope: "DEFAULT")
+        expect(TargetModel.packet_cache_stats[:size]).to eql 2
+
+        # Verify they are different packets
+        expect(tlm_pkt['packet_name']).to eql "HEALTH_STATUS"
+        expect(cmd_pkt['packet_name']).to eql "ABORT"
+      end
    end
 
    describe "self.packet_item" do