# m16_driver.py
import os
import serial
import struct
import json
import logging
from time import time, sleep
from typing import Optional, Dict, Any
class M16:
    """
    Library for controlling the M16 modem.

    Provides methods for configuring the modem, sending commands,
    reading diagnostic packets, and decoding them.

    The modem maintains internal state for:
      - channel (1-12)
      - power level (1-4)
      - mode (diagnostic or transparent)

    The instance can be used as a context manager; the serial port is closed
    when the ``with`` block exits.
    """

    # Valid channels (1 through 12) and levels (1 through 4)
    CHANNELS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
    LEVELS = [1, 2, 3, 4]
    PACKET_LENGTH = 18

    # Timing knobs (seconds). They are class attributes so that they can be
    # tuned per instance without touching the methods.
    COMMAND_DELAY = 1.0            # pause between the two command characters and after a command
    TRANSPARENT_CHUNK_DELAY = 2.0  # pause after each 2-byte chunk in transparent mode
    READ_TIMEOUT = 2.0             # how long read_packet() waits for a valid packet
    POLL_INTERVAL = 0.1            # pause between polls of the serial port

    # Diagnostic packet framing: '$' + 16 payload bytes + '\n'.
    _PACKET_START = b"$"
    _PACKET_END = b"\n"
    _PAYLOAD_FORMAT = "<HBBBHBBBBBHBB"

    def __init__(self, port: str, baudrate: int = 9600, channel: int = 1, level: int = 4, diagnostic: bool = False,
                 timeout: float = 0.5) -> None:
        """
        Open the serial connection and configure the modem.

        Any of channel, level or diagnostic may be passed as None, in which case
        no command is sent for that setting and the modem keeps its current
        configuration (the matching attribute then stays None, i.e. unknown).

        Parameters:
            port (str): Serial port (e.g. "COM3" on Windows or "/dev/ttyUSB0" on Linux).
            baudrate (int): Baud rate (default 9600).
            channel (int): Channel to set (valid values 1 to 12), (default 1).
            level (int): Power level to set (valid values 1 to 4), (default 4).
            diagnostic (bool): If True, set the modem to diagnostic mode; if False,
                               set transparent mode, (default False).
            timeout (float): Timeout for serial reads (default 0.5).

        Raises:
            ValueError: If the given port path does not exist.
        """
        # Logging
        self.logger = logging.getLogger(__name__)
        logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s:%(lineno)d=%(levelname)s:%(message)s')

        # Check if the port exists
        # NOTE(review): this is a filesystem check; confirm that it behaves as
        # intended for Windows device names such as "COM3".
        if not os.path.exists(port):
            raise ValueError(f"Port {port} does not exist")
        self.ser = serial.Serial(port, baudrate, timeout=timeout)

        # Initialize internal state with the requested values.
        self.channel = channel
        self.level = level
        self.diagnostic = diagnostic
        self.logger.info(f"Connecting to modem with: channel: {channel}, level: {level}, diagnostic: {diagnostic}")

        # None means "leave the modem as it is", so no command is sent.
        if channel is not None:
            self.set_channel(channel)
            self.logger.info(f"Setting channel: {channel}")
        if level is not None:
            self.set_level(level)
            self.logger.info(f"Setting level: {level}")
        if diagnostic is not None:
            if diagnostic:
                self.set_diagnostic_mode()
            else:
                self.reset_diagnostic_mode()
            self.logger.info(f"Setting diagnostic mode: {diagnostic}")

    def __enter__(self) -> "M16":
        """Return the modem itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the serial connection when leaving a ``with`` block."""
        self.close()

    def send_data(self, data: str) -> Optional[int]:
        """
        Send ASCII data to the modem.

        Parameters:
            data (str): The data to be sent.

        Returns:
            int: Number of characters written, as reported by the serial port.
        """
        return self.ser.write(data.encode('ascii'))

    def _send_command(self, command: str, argument: str = "") -> None:
        """
        Send a command using the sequence shared by all modem commands.

        The command character is sent, then after COMMAND_DELAY it is sent again,
        immediately followed by the optional argument. A final COMMAND_DELAY
        pause follows before returning.

        Parameters:
            command (str): Single command character (e.g. 'c', 'l', 'd').
            argument (str): Optional argument sent right after the second character.
        """
        self.send_data(command)
        sleep(self.COMMAND_DELAY)
        self.send_data(command)
        if argument:
            self.send_data(argument)
        sleep(self.COMMAND_DELAY)

    def set_channel(self, channel: int) -> bool:
        """
        Set the modem's communication channel.

        Parameters:
            channel (int): The channel number (1 to 12).

        Returns:
            bool: True if the command was sent, False if the channel is invalid.
        """
        if channel not in self.CHANNELS:
            self.logger.warning(f"Channel: {channel} is not a valid channel, needs to be between 1-12 ")
            return False
        # For channels 10-12, convert to letters: 10 -> 'a', 11 -> 'b', 12 -> 'c'
        ch_str = {10: 'a', 11: 'b', 12: 'c'}.get(channel, str(channel))
        self._send_command('c', ch_str)
        self.channel = channel  # Update internal state
        return True

    def set_level(self, level: int) -> bool:
        """
        Set the modem's power level.

        Parameters:
            level (int): The power level (1 to 4).

        Returns:
            bool: True if the command was sent, False if the level is invalid.
        """
        if level not in self.LEVELS:
            self.logger.warning(f"Level: {level} is not a valid level, needs to be between 1-4 ")
            return False
        self._send_command('l', str(level))
        self.level = level  # Update internal state
        return True

    def set_diagnostic_mode(self) -> None:
        """
        Set the modem in diagnostic mode.
        """
        self._send_command('d')
        self.diagnostic = True  # Update internal state

    def reset_diagnostic_mode(self) -> None:
        """
        Reset the modem from diagnostic mode (enter transparent mode).
        """
        self._send_command('t')
        self.diagnostic = False  # Update internal state

    def toggle_mode(self) -> None:
        """
        Toggle between diagnostic and transparent modes.
        """
        self._send_command('m')
        # Toggle internal state if already set; if not, we cannot infer reliably.
        if self.diagnostic is not None:
            self.diagnostic = not self.diagnostic

    def get_report(self) -> None:
        """
        Request a diagnostic report from the modem.

        Only the request is sent; use read_packet() / request_report() to
        collect the answer.
        """
        self._send_command('r')

    def request_report(self, filename: Optional[str] = None, overall_timeout: float = 5.0) -> Optional[Dict[str, Any]]:
        """
        Request a diagnostic report, decode it, update member variables from the report,
        and optionally save the report as a JSON file.

        This function sends the report request command and then listens for a valid packet
        until overall_timeout seconds have elapsed. At least one read is always attempted,
        and each read itself blocks for up to READ_TIMEOUT seconds.

        Parameters:
            filename (str, optional): If provided, the report is saved to this file.
            overall_timeout (float): Maximum time (in seconds) to wait for a valid report.

        Returns:
            Dict[str, Any]: The decoded report if successful; otherwise, None.
        """
        # Send the report request.
        self.get_report()

        deadline = time() + overall_timeout
        failure = "No valid packet received."
        while True:
            packet = self.read_packet()
            if packet is not None:
                self.logger.debug(f"Found a packet of length: {len(packet)} -> {str(packet)}")
                report = self.decode_packet(packet)
                if report is not None:
                    break
                # read_packet() hands back the raw buffer when no frame was found.
                failure = "Failed to decode the packet."
            if time() >= deadline:
                self.logger.info(failure)
                return None
        self.logger.debug(f"Decoded packet: \n{report}")

        # Update internal state from the report.
        self.update_state_from_report(report)

        # Optionally save the report as JSON.
        if filename is not None:
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(report, f, indent=4, default=self._default_converter)
            self.logger.info(f"Report saved to {filename}")
        return report

    def update_state_from_report(self, report: Dict[str, Any]) -> None:
        """
        Update the internal modem configuration state from a decoded report.

        Fields that are missing from the report leave the corresponding
        attribute untouched.

        Parameters:
            report (Dict[str, Any]): Decoded report from the modem containing configuration info.
        """
        if "CHANNEL" in report:
            self.channel = report["CHANNEL"]
        if "LEVEL" in report:
            # The report's level field is converted with 4 - value, as in the
            # original driver.
            # NOTE(review): confirm this mapping against the modem documentation.
            self.level = 4 - report["LEVEL"]
        if "DIAGNOSTIC_MODE" in report:
            self.diagnostic = bool(report["DIAGNOSTIC_MODE"])
        self.logger.debug(f"State updated: channel={self.channel}, level={self.level}, diagnostic={self.diagnostic}")

    def send_two_bytes(self, data: str) -> Optional[int]:
        """
        Send two bytes of data to the modem.

        Parameters:
            data (str): A string of exactly two characters.

        Returns:
            int: Number of characters written, or 0 if data is not two characters long.
        """
        if len(data) != 2:
            return 0
        written = self.send_data(data)
        sleep(self.COMMAND_DELAY)
        return written

    def send_msg(self, msg: str, timeout_per_chunk: float = 5.0) -> Optional[int]:
        """
        Send a longer message (more than 2 bytes) in 2-byte chunks.

        A message of odd length is padded with one trailing space.
        If in diagnostic mode, after sending each chunk, wait for a diagnostic report
        that indicates the transmission is complete (TX_COMPLETE == 1).
        If in transparent mode, simply wait TRANSPARENT_CHUNK_DELAY seconds between chunks.

        Parameters:
            msg (str): The message to be sent.
            timeout_per_chunk (float): Maximum time (in seconds) to wait for TX_COMPLETE
                                       after sending each 2-byte chunk (diagnostic mode only).

        Returns:
            int: Total number of characters written over all chunks (0 for an empty message).
        """
        sum_sent_char = 0
        # Break the message into 2-byte chunks.
        if len(msg) % 2 != 0:
            msg = msg + " "
        for i in range(0, len(msg), 2):
            chunk = msg[i:i + 2]
            sent_char = self.send_two_bytes(chunk)
            self.logger.info(f"Sent chunk: '{chunk}'")
            if sent_char is not None:
                sum_sent_char += sent_char
            if self.diagnostic:
                self._wait_for_tx_complete(chunk, timeout_per_chunk)
            else:
                # In transparent mode, simply wait the transmission duration.
                sleep(self.TRANSPARENT_CHUNK_DELAY)
        return sum_sent_char

    def _wait_for_tx_complete(self, chunk: str, timeout: float) -> bool:
        """
        Wait until a diagnostic report with TX_COMPLETE == 1 arrives.

        Parameters:
            chunk (str): The chunk that was just sent (used for logging only).
            timeout (float): Maximum time (in seconds) to wait.

        Returns:
            bool: True if TX_COMPLETE was seen before the timeout, otherwise False.
        """
        deadline = time() + timeout
        while time() < deadline:
            packet = self.read_packet()
            report = self.decode_packet(packet) if packet is not None else None
            if report is not None and report.get("TX_COMPLETE", 0) == 1:
                self.logger.info(f"Transmission complete for chunk: '{chunk}'")
                return True
            sleep(self.POLL_INTERVAL)
        self.logger.warning(f"Timed out waiting for TX_COMPLETE after chunk: '{chunk}'")
        return False

    @classmethod
    def _extract_packet(cls, buffer: bytes) -> Optional[bytes]:
        """
        Find the most recent complete diagnostic packet in a byte buffer.

        Every '$' in the buffer is tried as a frame start, newest first, and is
        accepted only if a '\\n' sits exactly PACKET_LENGTH - 1 bytes later. This keeps
        framing correct even when payload bytes happen to equal '$' or '\\n'.

        Parameters:
            buffer (bytes): Raw bytes read from the serial port.

        Returns:
            Optional[bytes]: The PACKET_LENGTH-byte packet, or None if no frame is complete.
        """
        start = buffer.rfind(cls._PACKET_START)
        while start != -1:
            end = start + cls.PACKET_LENGTH
            if end <= len(buffer) and buffer[end - 1:end] == cls._PACKET_END:
                return buffer[start:end]
            # Try the previous '$' (search only the part before this one).
            start = buffer.rfind(cls._PACKET_START, 0, start)
        return None

    def read_packet(self) -> Optional[bytes]:
        """
        Read data from the serial port and search for a valid diagnostic packet.

        A valid packet starts with '$' (0x24) and ends with '\\n' (0x0A) and is exactly 18 bytes long.
        The port is polled for up to READ_TIMEOUT seconds.

        Returns:
            Optional[bytes]: The valid packet if found, otherwise the raw buffer if it is
                             not empty, otherwise None.
        """
        buffer = b""
        deadline = time() + self.READ_TIMEOUT
        while time() < deadline:
            pending = self.ser.in_waiting
            if pending:
                buffer += self.ser.read(pending)
                self.logger.debug(f"Buffer length: {len(buffer)}, buffer: {str(buffer)}")
                packet = self._extract_packet(buffer)
                if packet is not None:
                    self.logger.debug(f"Returning packet: {str(packet)}")
                    return packet
            sleep(self.POLL_INTERVAL)
        if not buffer:
            self.logger.debug("Returning None")
            return None
        # No complete frame arrived; hand back what was received so the caller can inspect it.
        self.logger.debug(f"Returning buffer: {str(buffer)}")
        return buffer

    def decode_packet(self, packet: bytes) -> Optional[Dict[str, Any]]:
        """
        Decode a diagnostic packet received from the modem.

        The packet should be 18 bytes long, starting with '$' (0x24) and ending with '\\n' (0x0A).
        The bytes between contain the data in the following format:
          - Byte 0: '$'
          - Bytes 1-16: Data fields (see modem documentation)
          - Byte 17: '\\n'

        Parameters:
            packet (bytes): The raw packet.

        Returns:
            Optional[Dict[str, Any]]: A dictionary of decoded values if the packet is valid,
                                      otherwise None.
        """
        if (len(packet) != self.PACKET_LENGTH
                or packet[:1] != self._PACKET_START
                or packet[-1:] != self._PACKET_END):
            return None
        try:
            decoded = struct.unpack(self._PAYLOAD_FORMAT, packet[1:self.PACKET_LENGTH - 1])
        except struct.error:
            return None
        # The last two payload bytes pack several small fields as bit fields.
        flags = decoded[11]
        mode = decoded[12]
        return {
            "TR_BLOCK": decoded[0].to_bytes(2, "little"),
            "BER": decoded[1],
            "SIGNAL_POWER": decoded[2],
            "NOISE_POWER": decoded[3],
            "PACKET_VALID": decoded[4],
            "PACKET_INVALID": decoded[5],
            "GIT_REV": decoded[6].to_bytes(1, "little"),
            # 24-bit little-endian value assembled from three single bytes.
            "TIME": (decoded[9] << 16) | (decoded[8] << 8) | decoded[7],
            "CHIP_ID": decoded[10],
            "HW_REV": flags & 0b00000011,
            "CHANNEL": (flags & 0b00111100) >> 2,
            "TB_VALID": (flags & 0b01000000) >> 6,
            "TX_COMPLETE": (flags & 0b10000000) >> 7,
            "DIAGNOSTIC_MODE": mode & 0b00000001,
            "LEVEL": (mode & 0b00001100) >> 2,
        }

    def _default_converter(self, object: Any) -> Optional[str]:
        """
        JSON fallback serializer: convert a bytes object to its hex string.

        Raises:
            TypeError: If the object is not bytes.
        """
        if isinstance(object, bytes):
            return object.hex()
        raise TypeError(f"Object of type {type(object)} is not JSON serializable")

    def close(self) -> None:
        """
        Close the serial connection.
        """
        self.ser.close()
# Example usage:
if __name__ == "__main__":
    # Initialize modem on COM4 with diagnostic mode disabled.
    m = M16("COM4", diagnostic=False)
    try:
        # m.send_msg("Hello, this is a longer message.")
        # m.request_report("report.json")
        m.set_channel(1)
    finally:
        # Always release the serial port, even if a command above fails.
        m.close()