-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
494 lines (395 loc) · 14.1 KB
/
main.py
File metadata and controls
494 lines (395 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
"""
serial_debug_mcp - 给AI使用的串口调试MCP服务器
"""
import threading
from collections import deque
import serial
import serial.tools.list_ports
from fastmcp import FastMCP
# 全局常量
HEADERS = ["send_tick", "left_encoder", "right_encoder"]
# MCP服务器实例
mcp = FastMCP("serial_debug_mcp")
class SerialStateManager:
"""全局状态管理器 - 单例模式"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 串口连接实例
self.serial_conn: serial.Serial | None = None
# 采集状态标志
self.is_capturing: bool = False
# 采集线程句柄
self.capture_thread: threading.Thread | None = None
# 数据缓冲区 - 使用deque实现环形缓冲区
self.data_buffer: deque = deque(maxlen=10000)
# 缓冲区锁 - 确保线程安全
self.buffer_lock: threading.Lock = threading.Lock()
self._initialized = True
# 全局状态实例
state = SerialStateManager()
@mcp.tool()
def list_ports() -> str:
"""列出系统当前所有可用的串口设备"""
try:
ports = serial.tools.list_ports.comports()
result = []
for port in ports:
result.append({"name": port.device, "description": port.description})
import json
return json.dumps(result, ensure_ascii=False)
except Exception as e:
return f"Error: {str(e)}"
@mcp.tool()
def open_port(
port: str,
baudrate: int = 115200,
bytesize: int = 8,
parity: str = "N",
stopbits: float = 1,
timeout: float = 1,
) -> str:
"""打开指定串口"""
try:
# 检查是否已有连接,有则先关闭
if state.serial_conn is not None and state.serial_conn.is_open:
state.serial_conn.close()
# 校验位映射
parity_map = {
"N": serial.PARITY_NONE,
"E": serial.PARITY_EVEN,
"O": serial.PARITY_ODD,
}
parity_val = parity_map.get(parity.upper(), serial.PARITY_NONE)
# 数据位映射
bytesize_map = {
5: serial.FIVEBITS,
6: serial.SIXBITS,
7: serial.SEVENBITS,
8: serial.EIGHTBITS,
}
bytesize_val = bytesize_map.get(bytesize, serial.EIGHTBITS)
# 停止位映射
stopbits_map = {
1: serial.STOPBITS_ONE,
1.5: serial.STOPBITS_ONE_POINT_FIVE,
2: serial.STOPBITS_TWO,
}
stopbits_val = stopbits_map.get(stopbits, serial.STOPBITS_ONE)
# 打开串口
state.serial_conn = serial.Serial(
port=port,
baudrate=baudrate,
bytesize=bytesize_val,
parity=parity_val,
stopbits=stopbits_val,
timeout=timeout,
)
return f"Opened {port} at {baudrate}"
except serial.SerialException as e:
return f"Error: {str(e)}"
except Exception as e:
return f"Error: {str(e)}"
@mcp.tool()
def close_port() -> str:
"""关闭当前打开的串口"""
try:
if state.serial_conn is None or not state.serial_conn.is_open:
return "Error: No open port"
# 如果正在采集,先停止
if state.is_capturing:
stop_capture()
# 关闭端口
state.serial_conn.close()
state.serial_conn = None
return "Closed"
except Exception as e:
return f"Error: {str(e)}"
def _capture_worker():
"""后台采集线程函数"""
first_line = True
while (
state.is_capturing
and state.serial_conn is not None
and state.serial_conn.is_open
):
try:
# 读取一行数据
line = state.serial_conn.readline()
if not line:
continue
# 解码数据
try:
data_str = line.decode("utf-8").strip()
except UnicodeDecodeError:
data_str = line.decode("latin-1").strip()
# 首行处理(表头)- 仅记录日志,不存储
if first_line:
first_line = False
print(f"Header received: {data_str}")
continue
# 获取当前时间戳
import time
timestamp = time.time()
time_str = time.strftime("%H:%M:%S", time.localtime(timestamp))
ms = int((timestamp % 1) * 1000)
formatted_time = f"{time_str}.{ms:03d}"
# 存储数据(带锁)
with state.buffer_lock:
state.data_buffer.append((timestamp, formatted_time, data_str))
except serial.SerialException:
# 串口错误,停止采集
state.is_capturing = False
break
except Exception as e:
print(f"Capture error: {e}")
continue
@mcp.tool()
def start_capture() -> str:
"""开始从已打开的串口读取数据"""
try:
# 检查串口是否打开
if state.serial_conn is None or not state.serial_conn.is_open:
return "Error: No open port"
# 检查是否已在采集
if state.is_capturing:
return "Error: Capture already running"
# 清空旧缓冲区
with state.buffer_lock:
state.data_buffer.clear()
# 启动采集
state.is_capturing = True
state.capture_thread = threading.Thread(target=_capture_worker, daemon=True)
state.capture_thread.start()
return "Capture started"
except Exception as e:
return f"Error: {str(e)}"
@mcp.tool()
def stop_capture() -> str:
"""停止数据采集"""
try:
# 检查是否在采集
if not state.is_capturing:
return "Error: Capture not running"
# 停止采集
state.is_capturing = False
# 等待线程结束
if state.capture_thread is not None and state.capture_thread.is_alive():
state.capture_thread.join(timeout=2)
state.capture_thread = None
return "Capture stopped"
except Exception as e:
return f"Error: {str(e)}"
@mcp.tool()
def clear_data() -> str:
"""清空所有已经采集的数据"""
try:
with state.buffer_lock:
state.data_buffer.clear()
return "Clear data success"
except Exception as e:
return f"Error: {str(e)}"
def _query_data_impl(
start_time: float = 0, end_time: float = 0, max_points: int = 1000
) -> str:
"""query_data的实际实现"""
# 加锁读取缓冲区副本
with state.buffer_lock:
buffer_copy = list(state.data_buffer)
# 数据筛选
filtered_data = []
for item in buffer_copy:
timestamp, formatted_time, data = item
# 时间过滤(0表示无界)
if start_time != 0 and timestamp < start_time:
continue
if end_time != 0 and timestamp > end_time:
continue
filtered_data.append(item)
# 降采样
n = len(filtered_data)
if n > max_points and max_points > 0:
step = n // max_points
filtered_data = filtered_data[::step][:max_points]
n = len(filtered_data)
# 构建YAML元数据
if n == 0:
yaml_meta = "---\ndata_rows: 0\n---"
else:
start_str = filtered_data[0][1]
end_str = filtered_data[-1][1]
yaml_meta = f'---\ndata_rows: {n}\nstart_time: "{start_str}"\nend_time: "{end_str}"\n---'
# 构建CSV数据体
csv_header = "recv_time, " + ", ".join(HEADERS)
csv_rows = []
for item in filtered_data:
_, formatted_time, data = item
csv_rows.append(f"{formatted_time}, {data}")
# 拼接结果
result = yaml_meta + "\n" + csv_header
if csv_rows:
result += "\n" + "\n".join(csv_rows)
return result
@mcp.tool()
def query_data(
start_time: float = 0, end_time: float = 0, max_points: int = 1000
) -> str:
"""按时间窗口查询缓冲数据"""
try:
return _query_data_impl(start_time, end_time, max_points)
except Exception as e:
return f"Error: {str(e)}"
@mcp.tool()
def send_command(
command: str, response_timeout: float = 1.0, expect_response: bool = True
) -> str:
"""向已打开的串口发送一条命令"""
try:
# 检查串口是否打开
if state.serial_conn is None or not state.serial_conn.is_open:
return "Error: No open port"
# 发送命令(自动追加换行符)
cmd_bytes = (command + "\n").encode("utf-8")
state.serial_conn.write(cmd_bytes)
# 如果不期望响应,直接返回
if not expect_response:
return "Command sent"
# 处理响应
if response_timeout > 0:
# 阻塞读取指定时间
import time
start_time = time.time()
response_lines = []
while time.time() - start_time < response_timeout:
if state.serial_conn.in_waiting > 0:
try:
line = state.serial_conn.readline()
if line:
try:
line_str = line.decode("utf-8").strip()
except UnicodeDecodeError:
line_str = line.decode("latin-1").strip()
response_lines.append(line_str)
except:
pass
else:
time.sleep(0.01) # 短暂休眠避免CPU占用过高
if response_lines:
return "\n".join(response_lines)
else:
return "No response within timeout"
else:
# 非阻塞尝试读取一次
response_lines = []
if state.serial_conn.in_waiting > 0:
try:
line = state.serial_conn.readline()
if line:
try:
line_str = line.decode("utf-8").strip()
except UnicodeDecodeError:
line_str = line.decode("latin-1").strip()
response_lines.append(line_str)
except:
pass
return "\n".join(response_lines) if response_lines else ""
except serial.SerialException as e:
return f"Error: {str(e)}"
except Exception as e:
return f"Error: {str(e)}"
@mcp.tool()
def send_raw_string(data: str, add_newline: bool = True) -> str:
"""向串口发送原始字符串数据,用于模拟测试"""
try:
# 检查串口是否打开
if state.serial_conn is None or not state.serial_conn.is_open:
return "Error: No open port"
# 发送数据
if add_newline:
data_bytes = (data + "\n").encode("utf-8")
else:
data_bytes = data.encode("utf-8")
bytes_sent = state.serial_conn.write(data_bytes)
return f"Sent {bytes_sent} bytes"
except serial.SerialException as e:
return f"Error: {str(e)}"
except Exception as e:
return f"Error: {str(e)}"
@mcp.tool()
def debug_get_raw_data(max_entries: int = 100) -> str:
"""获取缓冲区原始数据(用于调试)"""
try:
with state.buffer_lock:
buffer_copy = list(state.data_buffer)
if not buffer_copy:
return "Buffer is empty"
lines = []
lines.append(
f"Buffer size: {len(buffer_copy)} (maxlen: {state.data_buffer.maxlen})"
)
lines.append(f"Showing first {min(max_entries, len(buffer_copy))} entries:")
lines.append("")
for i, item in enumerate(buffer_copy[:max_entries]):
timestamp, formatted_time, data = item
lines.append(f"[{i}] {formatted_time} | ts={timestamp:.6f} | data={data}")
return "\n".join(lines)
except Exception as e:
return f"Error: {str(e)}"
@mcp.tool()
def debug_inject_data(data_lines: str, interval_ms: float = 100) -> str:
"""向缓冲区注入测试数据(用于调试)
Args:
data_lines: 数据行,每行一个,格式: "send_tick, left_encoder, right_encoder"
interval_ms: 每条数据的时间间隔(毫秒)
"""
try:
import time
lines = data_lines.strip().split("\n")
base_time = time.time()
count = 0
with state.buffer_lock:
for i, line in enumerate(lines):
ts = base_time + (i * interval_ms / 1000)
time_str = time.strftime("%H:%M:%S", time.localtime(ts))
ms = int((ts % 1) * 1000)
formatted_time = f"{time_str}.{ms:03d}"
state.data_buffer.append((ts, formatted_time, line.strip()))
count += 1
return f"Injected {count} data entries"
except Exception as e:
return f"Error: {str(e)}"
def main():
"""MCP服务器入口函数"""
import sys
# 检查参数
if len(sys.argv) > 1 and sys.argv[1] == "--sse":
# SSE 模式 - 用于 VSCode Chat 等通过网络连接
host = "127.0.0.1"
port = 8000
if len(sys.argv) > 2:
port = int(sys.argv[2])
print(f"Starting MCP server in SSE mode...")
print(f"Server: http://{host}:{port}/sse")
print(f"Health: http://{host}:{port}/health")
print(f"Docs: http://{host}:{port}/docs")
print(f"\nUse this URL in VSCode Chat or other MCP clients:")
print(f" http://{host}:{port}/sse")
print(f"\nPress Ctrl+C to stop\n")
mcp.run(transport="sse", host=host, port=port)
else:
# 默认使用stdio传输(用于OpenCode等本地工具)
print("Starting MCP server in stdio mode...")
print("This mode is used by OpenCode and other local MCP clients.")
print("For network access, use: python main.py --sse [port]\n")
mcp.run()
if __name__ == "__main__":
main()