diff --git a/.cursor/rules/device-drivers.mdc b/.cursor/rules/device-drivers.mdc new file mode 100644 index 00000000..8adfb33c --- /dev/null +++ b/.cursor/rules/device-drivers.mdc @@ -0,0 +1,328 @@ +--- +description: 设备驱动开发规范 +globs: ["unilabos/devices/**/*.py"] +--- + +# 设备驱动开发规范 + +## 目录结构 + +``` +unilabos/devices/ +├── virtual/ # 虚拟设备(用于测试) +│ ├── virtual_stirrer.py +│ └── virtual_centrifuge.py +├── liquid_handling/ # 液体处理设备 +├── balance/ # 天平设备 +├── hplc/ # HPLC设备 +├── pump_and_valve/ # 泵和阀门 +├── temperature/ # 温度控制设备 +├── workstation/ # 工作站(组合设备) +└── ... +``` + +## 设备类完整模板 + +```python +import asyncio +import logging +import time as time_module +from typing import Dict, Any, Optional + +from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode + + +class MyDevice: + """ + 设备类描述 + + Attributes: + device_id: 设备唯一标识 + config: 设备配置字典 + data: 设备状态数据 + """ + + _ros_node: BaseROS2DeviceNode + + def __init__( + self, + device_id: str = None, + config: Dict[str, Any] = None, + **kwargs + ): + """ + 初始化设备 + + Args: + device_id: 设备ID + config: 配置字典 + **kwargs: 其他参数 + """ + # 兼容不同调用方式 + if device_id is None and 'id' in kwargs: + device_id = kwargs.pop('id') + if config is None and 'config' in kwargs: + config = kwargs.pop('config') + + self.device_id = device_id or "unknown_device" + self.config = config or {} + self.data = {} + + # 从config读取参数 + self.port = self.config.get('port') or kwargs.get('port', 'COM1') + self._max_value = self.config.get('max_value', 1000.0) + + # 初始化日志 + self.logger = logging.getLogger(f"MyDevice.{self.device_id}") + + self.logger.info(f"设备 {self.device_id} 已创建") + + def post_init(self, ros_node: BaseROS2DeviceNode): + """ + ROS节点注入 - 在ROS节点创建后调用 + + Args: + ros_node: ROS2设备节点实例 + """ + self._ros_node = ros_node + + async def initialize(self) -> bool: + """ + 初始化设备 - 连接硬件、设置初始状态 + + Returns: + bool: 初始化是否成功 + """ + self.logger.info(f"初始化设备 {self.device_id}") + + try: + # 执行硬件初始化 + # await self._connect_hardware() + + # 设置初始状态 + self.data.update({ + "status": "待机", + "is_running": False, + "current_value": 0.0, + }) + + self.logger.info(f"设备 {self.device_id} 初始化完成") + return True + + except Exception as e: + self.logger.error(f"初始化失败: {e}") + self.data["status"] = f"错误: {e}" + return False + + async def cleanup(self) -> bool: + """ + 清理设备 - 断开连接、释放资源 + + Returns: + bool: 清理是否成功 + """ + self.logger.info(f"清理设备 {self.device_id}") + + self.data.update({ + "status": "离线", + "is_running": False, + }) + + return True + + # ==================== 设备动作 ==================== + + async def execute_action( + self, + param1: float, + param2: str = "", + **kwargs + ) -> bool: + """ + 执行设备动作 + + Args: + param1: 参数1 + param2: 参数2(可选) + + Returns: + bool: 动作是否成功 + """ + # 类型转换和验证 + try: + param1 = float(param1) + except (ValueError, TypeError) as e: + self.logger.error(f"参数类型错误: {e}") + return False + + # 参数验证 + if param1 > self._max_value: + self.logger.error(f"参数超出范围: {param1} > {self._max_value}") + return False + + self.logger.info(f"执行动作: param1={param1}, param2={param2}") + + # 更新状态 + self.data.update({ + "status": "运行中", + "is_running": True, + }) + + # 执行动作(带进度反馈) + duration = 10.0 # 秒 + start_time = time_module.time() + + while True: + elapsed = time_module.time() - start_time + remaining = max(0, duration - elapsed) + progress = min(100, (elapsed / duration) * 100) + + self.data.update({ + "status": f"运行中: {progress:.0f}%", + "remaining_time": remaining, + }) + + if remaining <= 0: + break + + await self._ros_node.sleep(1.0) + + # 完成 + self.data.update({ + "status": "完成", + "is_running": False, + }) + + self.logger.info("动作执行完成") + return True + + # ==================== 状态属性 ==================== + + @property + def status(self) -> str: + """设备状态 - 自动发布为ROS Topic""" + return self.data.get("status", "未知") + + @property + def is_running(self) -> bool: + """是否正在运行""" + return self.data.get("is_running", False) + + @property + def current_value(self) -> float: + """当前值""" + return self.data.get("current_value", 0.0) + + # ==================== 辅助方法 ==================== + + def get_device_info(self) -> Dict[str, Any]: + """获取设备信息""" + return { + "device_id": self.device_id, + "status": self.status, + "is_running": self.is_running, + "current_value": self.current_value, + } + + def __str__(self) -> str: + return f"MyDevice({self.device_id}: {self.status})" +``` + +## 关键规则 + +### 1. 参数处理 + +所有动作方法的参数都可能以字符串形式传入,必须进行类型转换: + +```python +async def my_action(self, value: float, **kwargs) -> bool: + # 始终进行类型转换 + try: + value = float(value) + except (ValueError, TypeError) as e: + self.logger.error(f"参数类型错误: {e}") + return False +``` + +### 2. vessel 参数处理 + +vessel 参数可能是字符串ID或字典: + +```python +def extract_vessel_id(vessel: Union[str, dict]) -> str: + if isinstance(vessel, dict): + return vessel.get("id", "") + return str(vessel) if vessel else "" +``` + +### 3. 状态更新 + +使用 `self.data` 字典存储状态,属性读取状态: + +```python +# 更新状态 +self.data["status"] = "运行中" +self.data["current_speed"] = 300.0 + +# 读取状态(通过属性) +@property +def status(self) -> str: + return self.data.get("status", "待机") +``` + +### 4. 异步等待 + +使用 ROS 节点的 sleep 方法: + +```python +# 正确 +await self._ros_node.sleep(1.0) + +# 避免(除非在纯 Python 测试环境) +await asyncio.sleep(1.0) +``` + +### 5. 进度反馈 + +长时间运行的操作需要提供进度反馈: + +```python +while remaining > 0: + progress = (elapsed / total_time) * 100 + self.data["status"] = f"运行中: {progress:.0f}%" + self.data["remaining_time"] = remaining + + await self._ros_node.sleep(1.0) +``` + +## 虚拟设备 + +虚拟设备用于测试和演示,放在 `unilabos/devices/virtual/` 目录: + +- 类名以 `Virtual` 开头 +- 文件名以 `virtual_` 开头 +- 模拟真实设备的行为和时序 +- 使用表情符号增强日志可读性(可选) + +## 工作站设备 + +工作站是组合多个设备的复杂设备: + +```python +from unilabos.devices.workstation.workstation_base import WorkstationBase + +class MyWorkstation(WorkstationBase): + """组合工作站""" + + async def execute_workflow(self, workflow: Dict[str, Any]) -> bool: + """执行工作流""" + pass +``` + +## 设备注册 + +设备类开发完成后,需要在注册表中注册: + +1. 创建/编辑 `unilabos/registry/devices/my_category.yaml` +2. 添加设备配置(参考 `virtual_device.yaml`) +3. 运行 `--complete_registry` 自动生成 schema diff --git a/.cursor/rules/protocol-development.mdc b/.cursor/rules/protocol-development.mdc new file mode 100644 index 00000000..a94f947d --- /dev/null +++ b/.cursor/rules/protocol-development.mdc @@ -0,0 +1,240 @@ +--- +description: 协议编译器开发规范 +globs: ["unilabos/compile/**/*.py"] +--- + +# 协议编译器开发规范 + +## 概述 + +协议编译器负责将高级实验操作(如 Stir、Add、Filter)编译为设备可执行的动作序列。 + +## 文件命名 + +- 位置: `unilabos/compile/` +- 命名: `{operation}_protocol.py` +- 示例: `stir_protocol.py`, `add_protocol.py`, `filter_protocol.py` + +## 协议函数模板 + +```python +from typing import List, Dict, Any, Union +import networkx as nx +import logging + +from .utils.unit_parser import parse_time_input +from .utils.vessel_parser import extract_vessel_id + +logger = logging.getLogger(__name__) + + +def generate_{operation}_protocol( + G: nx.DiGraph, + vessel: Union[str, dict], + param1: Union[str, float] = "0", + param2: float = 0.0, + **kwargs +) -> List[Dict[str, Any]]: + """ + 生成{操作}协议序列 + + Args: + G: 物理拓扑图 (NetworkX DiGraph) + vessel: 容器ID或Resource字典 + param1: 参数1(支持字符串单位,如 "5 min") + param2: 参数2 + **kwargs: 其他参数 + + Returns: + List[Dict]: 动作序列 + + Raises: + ValueError: 参数无效时 + """ + # 1. 提取 vessel_id + vessel_id = extract_vessel_id(vessel) + + # 2. 验证参数 + if not vessel_id: + raise ValueError("vessel 参数不能为空") + + if vessel_id not in G.nodes(): + raise ValueError(f"容器 '{vessel_id}' 不存在于系统中") + + # 3. 解析参数(支持单位) + parsed_param1 = parse_time_input(param1) # "5 min" -> 300.0 + + # 4. 查找设备 + device_id = find_connected_device(G, vessel_id, device_type="my_device") + + # 5. 生成动作序列 + action_sequence = [] + + action = { + "device_id": device_id, + "action_name": "my_action", + "action_kwargs": { + "vessel": {"id": vessel_id}, # 始终使用字典格式 + "param1": float(parsed_param1), + "param2": float(param2), + } + } + action_sequence.append(action) + + logger.info(f"生成协议: {len(action_sequence)} 个动作") + return action_sequence + + +def find_connected_device( + G: nx.DiGraph, + vessel_id: str, + device_type: str = "" +) -> str: + """ + 查找与容器相连的设备 + + Args: + G: 拓扑图 + vessel_id: 容器ID + device_type: 设备类型关键字 + + Returns: + str: 设备ID + """ + # 查找所有匹配类型的设备 + device_nodes = [] + for node in G.nodes(): + node_class = G.nodes[node].get('class', '') or '' + if device_type.lower() in node_class.lower(): + device_nodes.append(node) + + # 检查连接 + if vessel_id and device_nodes: + for device in device_nodes: + if G.has_edge(device, vessel_id) or G.has_edge(vessel_id, device): + return device + + # 返回第一个可用设备 + if device_nodes: + return device_nodes[0] + + # 默认设备 + return f"{device_type}_1" +``` + +## 关键规则 + +### 1. vessel 参数处理 + +vessel 参数可能是字符串或字典,需要统一处理: + +```python +def extract_vessel_id(vessel: Union[str, dict]) -> str: + """提取vessel_id""" + if isinstance(vessel, dict): + # 可能是 {"id": "xxx"} 或完整 Resource 对象 + return vessel.get("id", list(vessel.values())[0].get("id", "")) + return str(vessel) if vessel else "" +``` + +### 2. action_kwargs 中的 vessel + +始终使用 `{"id": vessel_id}` 格式传递 vessel: + +```python +# 正确 +"action_kwargs": { + "vessel": {"id": vessel_id}, # 字符串ID包装为字典 +} + +# 避免 +"action_kwargs": { + "vessel": vessel_resource, # 不要传递完整 Resource 对象 +} +``` + +### 3. 单位解析 + +使用 `parse_time_input` 解析时间参数: + +```python +from .utils.unit_parser import parse_time_input + +# 支持格式: "5 min", "1 h", "300", "1.5 hours" +time_seconds = parse_time_input("5 min") # -> 300.0 +time_seconds = parse_time_input(120) # -> 120.0 +time_seconds = parse_time_input("1 h") # -> 3600.0 +``` + +### 4. 参数验证 + +所有参数必须进行验证和类型转换: + +```python +# 验证范围 +if speed < 10.0 or speed > 1500.0: + logger.warning(f"速度 {speed} 超出范围,修正为 300") + speed = 300.0 + +# 类型转换 +param = float(param) if not isinstance(param, (int, float)) else param +``` + +### 5. 日志记录 + +使用项目日志记录器: + +```python +logger = logging.getLogger(__name__) + +def generate_protocol(...): + logger.info(f"开始生成协议...") + logger.debug(f"参数: vessel={vessel_id}, time={time}") + logger.warning(f"参数修正: {old_value} -> {new_value}") +``` + +## 便捷函数 + +为常用操作提供便捷函数: + +```python +def stir_briefly(G: nx.DiGraph, vessel: Union[str, dict], + speed: float = 300.0) -> List[Dict[str, Any]]: + """短时间搅拌(30秒)""" + return generate_stir_protocol(G, vessel, time="30", stir_speed=speed) + +def stir_vigorously(G: nx.DiGraph, vessel: Union[str, dict], + time: str = "5 min") -> List[Dict[str, Any]]: + """剧烈搅拌""" + return generate_stir_protocol(G, vessel, time=time, stir_speed=800.0) +``` + +## 测试函数 + +每个协议文件应包含测试函数: + +```python +def test_{operation}_protocol(): + """测试协议生成""" + # 测试参数处理 + vessel_dict = {"id": "flask_1", "name": "反应瓶1"} + vessel_id = extract_vessel_id(vessel_dict) + assert vessel_id == "flask_1" + + # 测试单位解析 + time_s = parse_time_input("5 min") + assert time_s == 300.0 + + +if __name__ == "__main__": + test_{operation}_protocol() +``` + +## 现有协议参考 + +- `stir_protocol.py` - 搅拌操作 +- `add_protocol.py` - 添加物料 +- `filter_protocol.py` - 过滤操作 +- `heatchill_protocol.py` - 加热/冷却 +- `separate_protocol.py` - 分离操作 +- `evaporate_protocol.py` - 蒸发操作 diff --git a/.cursor/rules/registry-config.mdc b/.cursor/rules/registry-config.mdc new file mode 100644 index 00000000..bba2f221 --- /dev/null +++ b/.cursor/rules/registry-config.mdc @@ -0,0 +1,319 @@ +--- +description: 注册表配置规范 (YAML) +globs: ["unilabos/registry/**/*.yaml"] +--- + +# 注册表配置规范 + +## 概述 + +注册表使用 YAML 格式定义设备和资源类型,是 Uni-Lab-OS 的核心配置系统。 + +## 目录结构 + +``` +unilabos/registry/ +├── devices/ # 设备类型注册 +│ ├── virtual_device.yaml +│ ├── liquid_handler.yaml +│ └── ... +├── device_comms/ # 通信设备配置 +│ ├── communication_devices.yaml +│ └── modbus_ioboard.yaml +└── resources/ # 资源类型注册 + ├── bioyond/ + ├── organic/ + ├── opentrons/ + └── ... +``` + +## 设备注册表格式 + +### 基本结构 + +```yaml +device_type_id: + # 基本信息 + description: "设备描述" + version: "1.0.0" + category: + - category_name + icon: "icon_device.webp" + + # 类配置 + class: + module: "unilabos.devices.my_module:MyClass" + type: python + + # 状态类型(属性 -> ROS消息类型) + status_types: + status: String + temperature: Float64 + is_running: Bool + + # 动作映射 + action_value_mappings: + action_name: + type: UniLabJsonCommand # 或 UniLabJsonCommandAsync + goal: {} + feedback: {} + result: {} + schema: {...} + handles: {} +``` + +### action_value_mappings 详细格式 + +```yaml +action_value_mappings: + # 同步动作 + my_sync_action: + type: UniLabJsonCommand + goal: + param1: param1 + param2: param2 + feedback: {} + result: + success: success + message: message + goal_default: + param1: 0.0 + param2: "" + handles: {} + placeholder_keys: + device_param: unilabos_devices # 设备选择器 + resource_param: unilabos_resources # 资源选择器 + schema: + title: "动作名称参数" + description: "动作描述" + type: object + properties: + goal: + type: object + properties: + param1: + type: number + param2: + type: string + required: + - param1 + feedback: {} + result: + type: object + properties: + success: + type: boolean + message: + type: string + required: + - goal + + # 异步动作 + my_async_action: + type: UniLabJsonCommandAsync + goal: {} + feedback: + progress: progress + current_status: status + result: + success: success + schema: {...} +``` + +### 自动生成的动作 + +以 `auto-` 开头的动作由系统自动生成: + +```yaml +action_value_mappings: + auto-initialize: + type: UniLabJsonCommandAsync + goal: {} + feedback: {} + result: {} + schema: {...} + + auto-cleanup: + type: UniLabJsonCommandAsync + goal: {} + feedback: {} + result: {} + schema: {...} +``` + +### handles 配置 + +用于工作流编辑器中的数据流连接: + +```yaml +handles: + input: + - handler_key: "input_resource" + data_type: "resource" + label: "输入资源" + data_source: "handle" + data_key: "resources" + output: + - handler_key: "output_labware" + data_type: "resource" + label: "输出器皿" + data_source: "executor" + data_key: "created_resource.@flatten" +``` + +## 资源注册表格式 + +```yaml +resource_type_id: + description: "资源描述" + version: "1.0.0" + category: + - category_name + icon: "" + handles: [] + init_param_schema: {} + + class: + module: "unilabos.resources.my_module:MyResource" + type: pylabrobot # 或 python +``` + +### PyLabRobot 资源示例 + +```yaml +BIOYOND_Electrolyte_6VialCarrier: + category: + - bottle_carriers + - bioyond + class: + module: "unilabos.resources.bioyond.bottle_carriers:BIOYOND_Electrolyte_6VialCarrier" + type: pylabrobot + version: "1.0.0" +``` + +## 状态类型映射 + +Python 类型到 ROS 消息类型的映射: + +| Python 类型 | ROS 消息类型 | +|------------|-------------| +| `str` | `String` | +| `bool` | `Bool` | +| `int` | `Int64` | +| `float` | `Float64` | +| `list` | `String` (序列化) | +| `dict` | `String` (序列化) | + +## 自动完善注册表 + +使用 `--complete_registry` 参数自动生成 schema: + +```bash +python -m unilabos.app.main --complete_registry +``` + +这会: +1. 扫描设备类的方法签名 +2. 自动生成 `auto-` 前缀的动作 +3. 生成 JSON Schema +4. 更新 YAML 文件 + +## 验证规则 + +1. **device_type_id** 必须唯一 +2. **module** 路径必须正确可导入 +3. **status_types** 的类型必须是有效的 ROS 消息类型 +4. **schema** 必须是有效的 JSON Schema + +## 示例:完整设备配置 + +```yaml +virtual_stirrer: + category: + - virtual_device + description: "虚拟搅拌器设备" + version: "1.0.0" + icon: "icon_stirrer.webp" + handles: [] + init_param_schema: {} + + class: + module: "unilabos.devices.virtual.virtual_stirrer:VirtualStirrer" + type: python + + status_types: + status: String + operation_mode: String + current_speed: Float64 + is_stirring: Bool + remaining_time: Float64 + + action_value_mappings: + auto-initialize: + type: UniLabJsonCommandAsync + goal: {} + feedback: {} + result: {} + schema: + title: "initialize参数" + type: object + properties: + goal: + type: object + properties: {} + feedback: {} + result: {} + required: + - goal + + stir: + type: UniLabJsonCommandAsync + goal: + stir_time: stir_time + stir_speed: stir_speed + settling_time: settling_time + feedback: + current_speed: current_speed + remaining_time: remaining_time + result: + success: success + goal_default: + stir_time: 60.0 + stir_speed: 300.0 + settling_time: 30.0 + handles: {} + schema: + title: "stir参数" + description: "搅拌操作" + type: object + properties: + goal: + type: object + properties: + stir_time: + type: number + description: "搅拌时间(秒)" + stir_speed: + type: number + description: "搅拌速度(RPM)" + settling_time: + type: number + description: "沉降时间(秒)" + required: + - stir_time + - stir_speed + feedback: + type: object + properties: + current_speed: + type: number + remaining_time: + type: number + result: + type: object + properties: + success: + type: boolean + required: + - goal +``` diff --git a/.cursor/rules/ros-integration.mdc b/.cursor/rules/ros-integration.mdc new file mode 100644 index 00000000..4057b48e --- /dev/null +++ b/.cursor/rules/ros-integration.mdc @@ -0,0 +1,233 @@ +--- +description: ROS 2 集成开发规范 +globs: ["unilabos/ros/**/*.py", "**/*_node.py"] +--- + +# ROS 2 集成开发规范 + +## 概述 + +Uni-Lab-OS 使用 ROS 2 作为设备通信中间件,基于 rclpy 实现。 + +## 核心组件 + +### BaseROS2DeviceNode + +设备节点基类,提供: +- ROS Topic 自动发布(状态属性) +- Action Server 自动创建(设备动作) +- 资源管理服务 +- 异步任务调度 + +```python +from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode +``` + +### 消息转换器 + +```python +from unilabos.ros.msgs.message_converter import ( + convert_to_ros_msg, + convert_from_ros_msg_with_mapping, + msg_converter_manager, + ros_action_to_json_schema, + ros_message_to_json_schema, +) +``` + +## 设备与 ROS 集成 + +### post_init 方法 + +设备类必须实现 `post_init` 方法接收 ROS 节点: + +```python +class MyDevice: + _ros_node: BaseROS2DeviceNode + + def post_init(self, ros_node: BaseROS2DeviceNode): + """ROS节点注入""" + self._ros_node = ros_node +``` + +### 状态属性发布 + +设备的 `@property` 属性会自动发布为 ROS Topic: + +```python +class MyDevice: + @property + def temperature(self) -> float: + return self._temperature + + # 自动发布到 /{namespace}/temperature Topic +``` + +### Topic 配置装饰器 + +```python +from unilabos.utils.decorator import topic_config + +class MyDevice: + @property + @topic_config(period=1.0, print_publish=False, qos=10) + def fast_data(self) -> float: + """高频数据 - 每秒发布一次""" + return self._fast_data + + @property + @topic_config(period=5.0) + def slow_data(self) -> str: + """低频数据 - 每5秒发布一次""" + return self._slow_data +``` + +### 订阅装饰器 + +```python +from unilabos.utils.decorator import subscribe + +class MyDevice: + @subscribe(topic="/external/sensor_data", qos=10) + def on_sensor_data(self, msg): + """订阅外部Topic""" + self._sensor_value = msg.data +``` + +## 异步操作 + +### 使用 ROS 节点睡眠 + +```python +# 推荐:使用ROS节点的睡眠方法 +await self._ros_node.sleep(1.0) + +# 不推荐:直接使用asyncio(可能导致回调阻塞) +await asyncio.sleep(1.0) +``` + +### 获取事件循环 + +```python +from unilabos.ros.x.rclpyx import get_event_loop + +loop = get_event_loop() +``` + +## 消息类型 + +### unilabos_msgs 包 + +```python +from unilabos_msgs.msg import Resource +from unilabos_msgs.srv import ( + ResourceAdd, + ResourceDelete, + ResourceUpdate, + ResourceList, + SerialCommand, +) +from unilabos_msgs.action import SendCmd +``` + +### Resource 消息结构 + +```python +Resource: + id: str + name: str + category: str + type: str + parent: str + children: List[str] + config: str # JSON字符串 + data: str # JSON字符串 + sample_id: str + pose: Pose +``` + +## 日志适配器 + +```python +from unilabos.utils.log import info, debug, warning, error, trace + +class MyDevice: + def __init__(self): + # 创建设备专属日志器 + self.logger = logging.getLogger(f"MyDevice.{self.device_id}") +``` + +ROSLoggerAdapter 同时向自定义日志和 ROS 日志发送消息。 + +## Action Server + +设备动作自动创建为 ROS Action Server: + +```yaml +# 在注册表中配置 +action_value_mappings: + my_action: + type: UniLabJsonCommandAsync # 异步Action + goal: {...} + feedback: {...} + result: {...} +``` + +### Action 类型 + +- **UniLabJsonCommand**: 同步动作 +- **UniLabJsonCommandAsync**: 异步动作(支持feedback) + +## 服务客户端 + +```python +from rclpy.client import Client + +# 调用其他节点的服务 +response = await self._ros_node.call_service( + service_name="/other_node/service", + request=MyServiceRequest(...) +) +``` + +## 命名空间 + +设备节点使用命名空间隔离: + +``` +/{device_id}/ # 设备命名空间 +/{device_id}/status # 状态Topic +/{device_id}/temperature # 温度Topic +/{device_id}/my_action # 动作Server +``` + +## 调试 + +### 查看 Topic + +```bash +ros2 topic list +ros2 topic echo /{device_id}/status +``` + +### 查看 Action + +```bash +ros2 action list +ros2 action info /{device_id}/my_action +``` + +### 查看 Service + +```bash +ros2 service list +ros2 service call /{device_id}/resource_list unilabos_msgs/srv/ResourceList +``` + +## 最佳实践 + +1. **状态属性命名**: 使用蛇形命名法(snake_case) +2. **Topic 频率**: 根据数据变化频率调整,避免过高频率 +3. **Action 反馈**: 长时间操作提供进度反馈 +4. **错误处理**: 使用 try-except 捕获并记录错误 +5. **资源清理**: 在 cleanup 方法中正确清理资源 diff --git a/.cursor/rules/testing-patterns.mdc b/.cursor/rules/testing-patterns.mdc new file mode 100644 index 00000000..73df7b0c --- /dev/null +++ b/.cursor/rules/testing-patterns.mdc @@ -0,0 +1,357 @@ +--- +description: 测试开发规范 +globs: ["tests/**/*.py", "**/test_*.py"] +--- + +# 测试开发规范 + +## 目录结构 + +``` +tests/ +├── __init__.py +├── devices/ # 设备测试 +│ └── liquid_handling/ +│ └── test_transfer_liquid.py +├── resources/ # 资源测试 +│ ├── test_bottle_carrier.py +│ └── test_resourcetreeset.py +├── ros/ # ROS消息测试 +│ └── msgs/ +│ ├── test_basic.py +│ ├── test_conversion.py +│ └── test_mapping.py +└── workflow/ # 工作流测试 + └── merge_workflow.py +``` + +## 测试框架 + +使用 pytest 作为测试框架: + +```bash +# 运行所有测试 +pytest tests/ + +# 运行特定测试文件 +pytest tests/resources/test_bottle_carrier.py + +# 运行特定测试函数 +pytest tests/resources/test_bottle_carrier.py::test_bottle_carrier + +# 显示详细输出 +pytest -v tests/ + +# 显示打印输出 +pytest -s tests/ +``` + +## 测试文件模板 + +```python +import pytest +from typing import List, Dict, Any + +# 导入被测试的模块 +from unilabos.resources.bioyond.bottle_carriers import ( + BIOYOND_Electrolyte_6VialCarrier, +) +from unilabos.resources.bioyond.bottles import ( + BIOYOND_PolymerStation_Solid_Vial, +) + + +class TestBottleCarrier: + """BottleCarrier 测试类""" + + def setup_method(self): + """每个测试方法前执行""" + self.carrier = BIOYOND_Electrolyte_6VialCarrier("test_carrier") + + def teardown_method(self): + """每个测试方法后执行""" + pass + + def test_carrier_creation(self): + """测试载架创建""" + assert self.carrier.name == "test_carrier" + assert len(self.carrier.sites) == 6 + + def test_bottle_placement(self): + """测试瓶子放置""" + bottle = BIOYOND_PolymerStation_Solid_Vial("test_bottle") + # 测试逻辑... + assert bottle.name == "test_bottle" + + +def test_standalone_function(): + """独立测试函数""" + result = some_function() + assert result is True + + +# 参数化测试 +@pytest.mark.parametrize("input,expected", [ + ("5 min", 300.0), + ("1 h", 3600.0), + ("120", 120.0), + (60, 60.0), +]) +def test_time_parsing(input, expected): + """测试时间解析""" + from unilabos.compile.utils.unit_parser import parse_time_input + assert parse_time_input(input) == expected + + +# 异常测试 +def test_invalid_input_raises_error(): + """测试无效输入抛出异常""" + with pytest.raises(ValueError) as exc_info: + invalid_function("bad_input") + assert "invalid" in str(exc_info.value).lower() + + +# 跳过条件测试 +@pytest.mark.skipif( + not os.environ.get("ROS_DISTRO"), + reason="需要ROS环境" +) +def test_ros_feature(): + """需要ROS环境的测试""" + pass +``` + +## 设备测试 + +### 虚拟设备测试 + +```python +import pytest +import asyncio +from unittest.mock import MagicMock, AsyncMock + +from unilabos.devices.virtual.virtual_stirrer import VirtualStirrer + + +class TestVirtualStirrer: + """VirtualStirrer 测试""" + + @pytest.fixture + def stirrer(self): + """创建测试用搅拌器""" + device = VirtualStirrer( + device_id="test_stirrer", + config={"max_speed": 1500.0, "min_speed": 50.0} + ) + + # Mock ROS节点 + mock_node = MagicMock() + mock_node.sleep = AsyncMock(return_value=None) + device.post_init(mock_node) + + return device + + @pytest.mark.asyncio + async def test_initialize(self, stirrer): + """测试初始化""" + result = await stirrer.initialize() + assert result is True + assert stirrer.status == "待机中" + + @pytest.mark.asyncio + async def test_stir_action(self, stirrer): + """测试搅拌动作""" + await stirrer.initialize() + + result = await stirrer.stir( + stir_time=5.0, + stir_speed=300.0, + settling_time=2.0 + ) + + assert result is True + assert stirrer.operation_mode == "Completed" + + @pytest.mark.asyncio + async def test_stir_invalid_speed(self, stirrer): + """测试无效速度""" + await stirrer.initialize() + + # 速度超出范围 + result = await stirrer.stir( + stir_time=5.0, + stir_speed=2000.0, # 超过max_speed + settling_time=0.0 + ) + + assert result is False + assert "错误" in stirrer.status +``` + +### 异步测试配置 + +```python +# conftest.py +import pytest +import asyncio + + +@pytest.fixture(scope="session") +def event_loop(): + """创建事件循环""" + loop = asyncio.get_event_loop_policy().new_event_loop() + yield loop + loop.close() +``` + +## 资源测试 + +```python +import pytest +from unilabos.resources.resource_tracker import ( + ResourceTreeSet, + ResourceTreeInstance, +) + + +def test_resource_tree_creation(): + """测试资源树创建""" + tree_set = ResourceTreeSet() + + # 添加资源 + resource = {"id": "res_1", "name": "Resource 1"} + tree_set.add_resource(resource) + + # 验证 + assert len(tree_set.all_nodes) == 1 + assert tree_set.get_resource("res_1") is not None + + +def test_resource_tree_merge(): + """测试资源树合并""" + local_set = ResourceTreeSet() + remote_set = ResourceTreeSet() + + # 设置数据... + + local_set.merge_remote_resources(remote_set) + + # 验证合并结果... +``` + +## ROS 消息测试 + +```python +import pytest +from unilabos.ros.msgs.message_converter import ( + convert_to_ros_msg, + convert_from_ros_msg_with_mapping, + msg_converter_manager, +) + + +def test_message_conversion(): + """测试消息转换""" + # Python -> ROS + python_data = {"id": "test", "value": 42} + ros_msg = convert_to_ros_msg(python_data, MyMsgType) + + assert ros_msg.id == "test" + assert ros_msg.value == 42 + + # ROS -> Python + result = convert_from_ros_msg_with_mapping(ros_msg, mapping) + assert result["id"] == "test" +``` + +## 协议测试 + +```python +import pytest +import networkx as nx +from unilabos.compile.stir_protocol import ( + generate_stir_protocol, + extract_vessel_id, +) + + +@pytest.fixture +def topology_graph(): + """创建测试拓扑图""" + G = nx.DiGraph() + G.add_node("flask_1", **{"class": "flask"}) + G.add_node("stirrer_1", **{"class": "virtual_stirrer"}) + G.add_edge("stirrer_1", "flask_1") + return G + + +def test_generate_stir_protocol(topology_graph): + """测试搅拌协议生成""" + actions = generate_stir_protocol( + G=topology_graph, + vessel="flask_1", + time="5 min", + stir_speed=300.0 + ) + + assert len(actions) == 1 + assert actions[0]["device_id"] == "stirrer_1" + assert actions[0]["action_name"] == "stir" + + +def test_extract_vessel_id(): + """测试vessel_id提取""" + # 字典格式 + assert extract_vessel_id({"id": "flask_1"}) == "flask_1" + + # 字符串格式 + assert extract_vessel_id("flask_2") == "flask_2" + + # 空值 + assert extract_vessel_id("") == "" +``` + +## 测试标记 + +```python +# 慢速测试 +@pytest.mark.slow +def test_long_running(): + pass + +# 需要网络 +@pytest.mark.network +def test_network_call(): + pass + +# 需要ROS +@pytest.mark.ros +def test_ros_feature(): + pass +``` + +运行特定标记的测试: + +```bash +pytest -m "not slow" # 排除慢速测试 +pytest -m ros # 仅ROS测试 +``` + +## 覆盖率 + +```bash +# 生成覆盖率报告 +pytest --cov=unilabos tests/ + +# HTML报告 +pytest --cov=unilabos --cov-report=html tests/ +``` + +## 最佳实践 + +1. **测试命名**: `test_{功能}_{场景}_{预期结果}` +2. **独立性**: 每个测试独立运行,不依赖其他测试 +3. **Mock外部依赖**: 使用 unittest.mock 模拟外部服务 +4. **参数化**: 使用 `@pytest.mark.parametrize` 减少重复代码 +5. **fixtures**: 使用 fixtures 共享测试设置 +6. **断言清晰**: 每个断言只验证一件事 diff --git a/.cursor/rules/unilabos-project.mdc b/.cursor/rules/unilabos-project.mdc new file mode 100644 index 00000000..1b6a24ee --- /dev/null +++ b/.cursor/rules/unilabos-project.mdc @@ -0,0 +1,353 @@ +--- +description: Uni-Lab-OS 实验室自动化平台开发规范 - 核心规则 +globs: ["**/*.py", "**/*.yaml", "**/*.json"] +--- + +# Uni-Lab-OS 项目开发规范 + +## 项目概述 + +Uni-Lab-OS 是一个实验室自动化操作系统,用于连接和控制各种实验设备,实现实验工作流的自动化和标准化。 + +## 技术栈 + +- **Python 3.11** - 核心开发语言 +- **ROS 2** - 设备通信中间件 (rclpy) +- **Conda/Mamba** - 包管理 (robostack-staging, conda-forge) +- **FastAPI** - Web API 服务 +- **WebSocket** - 实时通信 +- **NetworkX** - 拓扑图管理 +- **YAML** - 配置和注册表定义 +- **PyLabRobot** - 实验室自动化库集成 +- **pytest** - 测试框架 +- **asyncio** - 异步编程 + +## 项目结构 + +``` +unilabos/ +├── app/ # 应用入口、Web服务、后端 +├── compile/ # 协议编译器 (stir, add, filter 等) +├── config/ # 配置管理 +├── devices/ # 设备驱动 (真实/虚拟) +├── device_comms/ # 设备通信协议 +├── device_mesh/ # 3D网格和可视化 +├── registry/ # 设备和资源类型注册表 (YAML) +├── resources/ # 资源定义 +├── ros/ # ROS 2 集成 +├── utils/ # 工具函数 +└── workflow/ # 工作流管理 +``` + +## 代码规范 + +### Python 风格 + +1. **类型注解**:所有函数必须使用类型注解 + ```python + def transfer_liquid( + source: str, + destination: str, + volume: float, + **kwargs + ) -> List[Dict[str, Any]]: + ``` + +2. **Docstring**:使用 Google 风格的文档字符串 + ```python + def initialize(self) -> bool: + """ + 初始化设备 + + Returns: + bool: 初始化是否成功 + """ + ``` + +3. **导入顺序**: + - 标准库 + - 第三方库 + - ROS 相关 (rclpy, unilabos_msgs) + - 项目内部模块 + +### 异步编程 + +1. 设备操作方法使用 `async def` +2. 使用 `await self._ros_node.sleep()` 而非 `asyncio.sleep()` +3. 长时间运行操作需提供进度反馈 + +```python +async def stir(self, stir_time: float, stir_speed: float, **kwargs) -> bool: + """执行搅拌操作""" + start_time = time_module.time() + while True: + elapsed = time_module.time() - start_time + remaining = max(0, stir_time - elapsed) + + self.data.update({ + "remaining_time": remaining, + "status": f"搅拌中: {stir_speed} RPM" + }) + + if remaining <= 0: + break + await self._ros_node.sleep(1.0) + return True +``` + +### 日志规范 + +使用项目自定义日志系统: + +```python +from unilabos.utils.log import logger, info, debug, warning, error, trace + +# 在设备类中使用 +self.logger = logging.getLogger(f"DeviceName.{self.device_id}") +self.logger.info("设备初始化完成") +``` + +## 设备驱动开发 + +### 设备类结构 + +```python +from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode + +class MyDevice: + """设备驱动类""" + + _ros_node: BaseROS2DeviceNode + + def __init__(self, device_id: str = None, config: Dict[str, Any] = None, **kwargs): + self.device_id = device_id or "unknown_device" + self.config = config or {} + self.data = {} # 设备状态数据 + + def post_init(self, ros_node: BaseROS2DeviceNode): + """ROS节点注入""" + self._ros_node = ros_node + + async def initialize(self) -> bool: + """初始化设备""" + pass + + async def cleanup(self) -> bool: + """清理设备""" + pass + + # 状态属性 - 自动发布为 ROS Topic + @property + def status(self) -> str: + return self.data.get("status", "待机") +``` + +### 状态属性装饰器 + +```python +from unilabos.utils.decorator import topic_config + +class MyDevice: + @property + @topic_config(period=1.0, qos=10) # 每秒发布一次 + def temperature(self) -> float: + return self._temperature +``` + +### 虚拟设备 + +虚拟设备放置在 `unilabos/devices/virtual/` 目录下,命名为 `virtual_*.py` + +## 注册表配置 + +### 设备注册表 (YAML) + +位置: `unilabos/registry/devices/*.yaml` + +```yaml +my_device_type: + category: + - my_category + description: "设备描述" + version: "1.0.0" + class: + module: "unilabos.devices.my_device:MyDevice" + type: python + status_types: + status: String + temperature: Float64 + action_value_mappings: + auto-initialize: + type: UniLabJsonCommandAsync + goal: {} + feedback: {} + result: {} + schema: {...} +``` + +### 资源注册表 (YAML) + +位置: `unilabos/registry/resources/**/*.yaml` + +```yaml +my_container: + category: + - container + class: + module: "unilabos.resources.my_resource:MyContainer" + type: pylabrobot + version: "1.0.0" +``` + +## 协议编译器 + +位置: `unilabos/compile/*_protocol.py` + +### 协议生成函数模板 + +```python +from typing import List, Dict, Any, Union +import networkx as nx + +def generate_my_protocol( + G: nx.DiGraph, + vessel: Union[str, dict], + param1: float = 0.0, + **kwargs +) -> List[Dict[str, Any]]: + """ + 生成操作协议序列 + + Args: + G: 物理拓扑图 + vessel: 容器ID或字典 + param1: 参数1 + + Returns: + List[Dict]: 动作序列 + """ + # 提取vessel_id + vessel_id = vessel if isinstance(vessel, str) else vessel.get("id", "") + + # 查找设备 + device_id = find_connected_device(G, vessel_id) + + # 生成动作 + action_sequence = [{ + "device_id": device_id, + "action_name": "my_action", + "action_kwargs": { + "vessel": {"id": vessel_id}, + "param1": float(param1) + } + }] + + return action_sequence +``` + +## 测试规范 + +### 测试文件位置 + +- 单元测试: `tests/` 目录 +- 设备测试: `tests/devices/` +- 资源测试: `tests/resources/` +- ROS消息测试: `tests/ros/msgs/` + +### 测试命名 + +```python +# tests/devices/my_device/test_my_device.py + +import pytest + +def test_device_initialization(): + """测试设备初始化""" + pass + +def test_device_action(): + """测试设备动作""" + pass +``` + +## 错误处理 + +```python +from unilabos.utils.exception import UniLabException + +try: + result = await device.execute_action() +except ValueError as e: + self.logger.error(f"参数错误: {e}") + self.data["status"] = "错误: 参数无效" + return False +except Exception as e: + self.logger.error(f"执行失败: {e}") + raise +``` + +## 配置管理 + +```python +from unilabos.config.config import BasicConfig, HTTPConfig + +# 读取配置 +port = BasicConfig.port +is_host = BasicConfig.is_host_mode + +# 配置文件: local_config.py +``` + +## 常用工具 + +### 单例模式 + +```python +from unilabos.utils.decorator import singleton + +@singleton +class MyManager: + pass +``` + +### 类型检查 + +```python +from unilabos.utils.type_check import NoAliasDumper + +yaml.dump(data, f, Dumper=NoAliasDumper) +``` + +### 导入管理 + +```python +from unilabos.utils.import_manager import get_class + +device_class = get_class("unilabos.devices.my_device:MyDevice") +``` + +## Git 提交规范 + +提交信息格式: +``` +(): + + +``` + +类型: +- `feat`: 新功能 +- `fix`: 修复bug +- `docs`: 文档更新 +- `refactor`: 重构 +- `test`: 测试相关 +- `chore`: 构建/工具相关 + +示例: +``` +feat(devices): 添加虚拟搅拌器设备 + +- 实现VirtualStirrer类 +- 支持定时搅拌和持续搅拌模式 +- 添加速度验证逻辑 +``` diff --git a/.cursorignore b/.cursorignore new file mode 100644 index 00000000..0bd258b5 --- /dev/null +++ b/.cursorignore @@ -0,0 +1,188 @@ +# ============================================================ +# Uni-Lab-OS Cursor Ignore 配置,控制 Cursor AI 的文件索引范围 +# ============================================================ + +# ==================== 敏感配置文件 ==================== +# 本地配置(可能包含密钥) +**/local_config.py +test_config.py +local_test*.py + +# 环境变量和密钥 +.env +.env.* +**/.certs/ +*.pem +*.key +credentials.json +secrets.yaml + +# ==================== 二进制和 3D 模型文件 ==================== +# 3D 模型文件(无需索引) +*.stl +*.dae +*.glb +*.gltf +*.obj +*.fbx +*.blend + +# URDF/Xacro 机器人描述文件(大型XML) +*.xacro + +# 图片文件 +*.png +*.jpg +*.jpeg +*.gif +*.webp +*.ico +*.svg +*.bmp + +# 压缩包 +*.zip +*.tar +*.tar.gz +*.tgz +*.bz2 +*.rar +*.7z + +# ==================== Python 生成文件 ==================== +__pycache__/ +*.py[cod] +*$py.class +*.so +*.pyd +*.egg +*.egg-info/ +.eggs/ +dist/ +build/ +*.manifest +*.spec + +# ==================== IDE 和编辑器 ==================== +.idea/ +.vscode/ +*.swp +*.swo +*~ +.#* + +# ==================== 测试和覆盖率 ==================== +.pytest_cache/ +.coverage +.coverage.* +htmlcov/ +.tox/ +.nox/ +coverage.xml +*.cover + +# ==================== 虚拟环境 ==================== +.venv/ +venv/ +env/ +ENV/ + +# ==================== ROS 2 生成文件 ==================== +# ROS 构建目录 +build/ +install/ +log/ +logs/ +devel/ + +# ROS 消息生成 +msg_gen/ +srv_gen/ +msg/*Action.msg +msg/*ActionFeedback.msg +msg/*ActionGoal.msg +msg/*ActionResult.msg +msg/*Feedback.msg +msg/*Goal.msg +msg/*Result.msg +msg/_*.py +srv/_*.py +build_isolated/ +devel_isolated/ + +# ROS 动态配置 +*.cfgc +/cfg/cpp/ +/cfg/*.py + +# ==================== 项目特定目录 ==================== +# 工作数据目录 +unilabos_data/ + +# 临时和输出目录 +temp/ +output/ +cursor_docs/ +configs/ + +# 文档构建 +docs/_build/ +/site + +# ==================== 大型数据文件 ==================== +# 点云数据 +*.pcd + +# GraphML 图形文件 +*.graphml + +# 日志文件 +*.log + +# 数据库 +*.sqlite3 +*.db + +# Jupyter 检查点 +.ipynb_checkpoints/ + +# ==================== 设备网格资源 ==================== +# 3D 网格文件目录(包含大量 STL/DAE 文件) +unilabos/device_mesh/devices/**/*.stl +unilabos/device_mesh/devices/**/*.dae +unilabos/device_mesh/resources/**/*.stl +unilabos/device_mesh/resources/**/*.glb +unilabos/device_mesh/resources/**/*.xacro + +# RViz 配置 +*.rviz + +# ==================== 系统文件 ==================== +.DS_Store +Thumbs.db +desktop.ini + +# ==================== 锁文件 ==================== +poetry.lock +Pipfile.lock +pdm.lock +package-lock.json +yarn.lock + +# ==================== 类型检查缓存 ==================== +.mypy_cache/ +.dmypy.json +.pytype/ +.pyre/ +pyrightconfig.json + +# ==================== 其他 ==================== +# Catkin +CATKIN_IGNORE + +# Eclipse/Qt +.project +.cproject +CMakeLists.txt.user +*.user +qtcreator-* diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index 35aba210..24d85187 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -23,14 +23,8 @@ Trash, Tip, ) -from typing_extensions import TypedDict - -from unilabos.devices.liquid_handling.rviz_backend import UniLiquidHandlerRvizBackend -from unilabos.registry.placeholder_type import ResourceSlot -from unilabos.resources.resource_tracker import ResourceTreeSet, ResourceDict -from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode - +from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode class SimpleReturn(TypedDict): samples: List[List[ResourceDict]] volumes: List[float] @@ -52,6 +46,18 @@ class TransferLiquidReturn(TypedDict): targets: List[List[ResourceDict]] + +class SetLiquidReturn(TypedDict): + wells: list + volumes: list + + +class SetLiquidFromPlateReturn(TypedDict): + plate: list + wells: list + volumes: list + + class LiquidHandlerMiddleware(LiquidHandler): def __init__( self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False, channel_num: int = 8, **kwargs @@ -681,51 +687,7 @@ def set_liquid(cls, wells: list[Well], liquid_names: list[str], volumes: list[fl well.set_liquids([(liquid_name, volume)]) # type: ignore res_volumes.append(volume) - return SetLiquidReturn( - wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), volumes=res_volumes # type: ignore - ) - - def set_liquid_from_plate( - self, plate: List[ResourceSlot], well_names: list[str], liquid_names: list[str], volumes: list[float] - ) -> SetLiquidFromPlateReturn: - """Set the liquid in wells of a plate by well names (e.g., A1, A2, B3). - - 如果 liquid_names 和 volumes 为空,但 plate 和 well_names 不为空,直接返回 plate 和 wells。 - """ - if isinstance(plate, list): # 未来移除 - plate = plate[0] - assert issubclass(plate.__class__, Plate), "plate must be a Plate" - plate: Plate = cast(Plate, plate) - # 根据 well_names 获取对应的 Well 对象 - wells = [plate.get_well(name) for name in well_names] - res_volumes = [] - - # 如果 liquid_names 和 volumes 都为空,直接返回 - if not liquid_names and not volumes: - return SetLiquidFromPlateReturn( - plate=ResourceTreeSet.from_plr_resources([plate], known_newly_created=False).dump(), # type: ignore - wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), # type: ignore - volumes=res_volumes, - ) - - for well, liquid_name, volume in zip(wells, liquid_names, volumes): - well.set_liquids([(liquid_name, volume)]) # type: ignore - res_volumes.append(volume) - - task = ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{"resources": wells}) - submit_time = time.time() - while not task.done(): - if time.time() - submit_time > 10: - self._ros_node.lab_logger().info(f"set_liquid_from_plate {plate} 超时") - break - time.sleep(0.01) - - return SetLiquidFromPlateReturn( - plate=ResourceTreeSet.from_plr_resources([plate], known_newly_created=False).dump(), # type: ignore - wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), # type: ignore - volumes=res_volumes, - ) - + return SimpleReturn(samples=res_samples, volumes=res_volumes) # --------------------------------------------------------------- # REMOVE LIQUID -------------------------------------------------- # --------------------------------------------------------------- diff --git a/unilabos/devices/motor/ZDT_X42.py b/unilabos/devices/motor/ZDT_X42.py new file mode 100644 index 00000000..0d1566c3 --- /dev/null +++ b/unilabos/devices/motor/ZDT_X42.py @@ -0,0 +1,376 @@ +# -*- coding: utf-8 -*- +""" +ZDT X42 Closed-Loop Stepper Motor Driver +RS485 Serial Communication via USB-Serial Converter + +- Baudrate: 115200 +""" + +import serial +import time +import threading +import struct +import logging +from typing import Optional, Any + +try: + from unilabos.device_comms.universal_driver import UniversalDriver +except ImportError: + class UniversalDriver: + def __init__(self, *args, **kwargs): + self.logger = logging.getLogger(self.__class__.__name__) + def execute_command_from_outer(self, command: Any): pass + +from serial.rs485 import RS485Settings + + +class ZDTX42Driver(UniversalDriver): + """ + ZDT X42 闭环步进电机驱动器 + + 支持功能: + - 速度模式运行 + - 位置模式运行 (相对/绝对) + - 位置读取和清零 + - 使能/禁用控制 + + 通信协议: + - 帧格式: [设备ID] [功能码] [数据...] [校验位=0x6B] + - 响应长度根据功能码决定 + """ + + def __init__( + self, + port: str, + baudrate: int = 115200, + device_id: int = 1, + timeout: float = 0.5, + debug: bool = False + ): + """ + 初始化 ZDT X42 电机驱动 + + Args: + port: 串口设备路径 + baudrate: 波特率 (默认 115200) + device_id: 设备地址 (1-255) + timeout: 通信超时时间(秒) + debug: 是否启用调试输出 + """ + super().__init__() + self.id = device_id + self.debug = debug + self.lock = threading.RLock() + self.status = "idle" # 对应注册表中的 status (str) + self.position = 0 # 对应注册表中的 position (int) + + try: + self.ser = serial.Serial( + port=port, + baudrate=baudrate, + timeout=timeout, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE + ) + + # 启用 RS485 模式 + try: + self.ser.rs485_mode = RS485Settings( + rts_level_for_tx=True, + rts_level_for_rx=False + ) + except Exception: + pass # RS485 模式是可选的 + + self.logger.info( + f"ZDT X42 Motor connected: {port} " + f"(Baud: {baudrate}, ID: {device_id})" + ) + # 自动使能电机,确保初始状态可运动 + self.enable(True) + + # 启动背景轮询线程,确保 position 实时刷新 + self._stop_event = threading.Event() + self._polling_thread = threading.Thread( + target=self._update_loop, + name=f"ZDTPolling_{port}", + daemon=True + ) + self._polling_thread.start() + except Exception as e: + self.logger.error(f"Failed to open serial port {port}: {e}") + self.ser = None + + def _update_loop(self): + """背景循环读取电机位置""" + while not self._stop_event.is_set(): + try: + self.get_position() + except Exception as e: + if self.debug: + self.logger.error(f"Polling error: {e}") + time.sleep(1.0) # 每1秒刷新一次位置数据 + + def _send(self, func_code: int, payload: list) -> bytes: + """ + 发送指令并接收响应 + + Args: + func_code: 功能码 + payload: 数据负载 (list of bytes) + + Returns: + 响应数据 (bytes) + """ + if not self.ser: + self.logger.error("Serial port not available") + return b"" + + with self.lock: + # 清空输入缓冲区 + self.ser.reset_input_buffer() + + # 构建消息: [ID] [功能码] [数据...] [校验位=0x6B] + message = bytes([self.id, func_code] + payload + [0x6B]) + + # 发送 + self.ser.write(message) + + # 根据功能码决定响应长度 + # 查询类指令返回 10 字节,控制类指令返回 4 字节 + read_len = 10 if func_code in [0x31, 0x32, 0x35, 0x24, 0x27] else 4 + response = self.ser.read(read_len) + + # 调试输出 + if self.debug: + sent_hex = message.hex().upper() + recv_hex = response.hex().upper() if response else 'TIMEOUT' + print(f"[ID {self.id}] TX: {sent_hex} → RX: {recv_hex}") + + return response + + def enable(self, on: bool = True) -> bool: + """ + 使能/禁用电机 + + Args: + on: True=使能(锁轴), False=禁用(松轴) + + Returns: + 是否成功 + """ + state = 1 if on else 0 + resp = self._send(0xF3, [0xAB, state, 0]) + return len(resp) >= 4 + + def move_speed( + self, + speed_rpm: int, + direction: str = "CW", + acceleration: int = 10 + ) -> bool: + """ + 速度模式运行 + + Args: + speed_rpm: 转速 (RPM) + direction: 方向 ("CW"=顺时针, "CCW"=逆时针) + acceleration: 加速度 (0-255) + + Returns: + 是否成功 + """ + dir_val = 0 if direction.upper() in ["CW", "顺时针"] else 1 + speed_bytes = struct.pack('>H', int(speed_rpm)) + self.status = f"moving@{speed_rpm}rpm" + resp = self._send(0xF6, [dir_val, speed_bytes[0], speed_bytes[1], acceleration, 0]) + return len(resp) >= 4 + + def move_position( + self, + pulses: int, + speed_rpm: int, + direction: str = "CW", + acceleration: int = 10, + absolute: bool = False + ) -> bool: + """ + 位置模式运行 + + Args: + pulses: 脉冲数 + speed_rpm: 转速 (RPM) + direction: 方向 ("CW"=顺时针, "CCW"=逆时针) + acceleration: 加速度 (0-255) + absolute: True=绝对位置, False=相对位置 + + Returns: + 是否成功 + """ + dir_val = 0 if direction.upper() in ["CW", "顺时针"] else 1 + speed_bytes = struct.pack('>H', int(speed_rpm)) + self.status = f"moving_to_{pulses}" + pulse_bytes = struct.pack('>I', int(pulses)) + abs_flag = 1 if absolute else 0 + + payload = [ + dir_val, + speed_bytes[0], speed_bytes[1], + acceleration, + pulse_bytes[0], pulse_bytes[1], pulse_bytes[2], pulse_bytes[3], + abs_flag, + 0 + ] + + resp = self._send(0xFD, payload) + return len(resp) >= 4 + + def stop(self) -> bool: + """ + 停止电机 + + Returns: + 是否成功 + """ + self.status = "idle" + resp = self._send(0xFE, [0x98, 0]) + return len(resp) >= 4 + + def rotate_quarter(self, speed_rpm: int = 60, direction: str = "CW") -> bool: + """ + 电机旋转 1/4 圈 (阻塞式) + 假设电机细分为 3200 脉冲/圈,1/4 圈 = 800 脉冲 + """ + pulses = 800 + success = self.move_position(pulses=pulses, speed_rpm=speed_rpm, direction=direction, absolute=False) + + if success: + # 计算预估旋转时间并进行阻塞等待 (Time = revolutions / (RPM/60)) + # 1/4 rev / (RPM/60) = 15.0 / RPM + estimated_time = 15.0 / max(1, speed_rpm) + time.sleep(estimated_time + 0.5) # 额外给 0.5 秒缓冲 + self.status = "idle" + + return success + + def wait_time(self, duration_s: float) -> bool: + """ + 等待指定时间 (秒) + """ + self.logger.info(f"Waiting for {duration_s} seconds...") + time.sleep(duration_s) + return True + + def set_zero(self) -> bool: + """ + 清零当前位置 + + Returns: + 是否成功 + """ + resp = self._send(0x0A, []) + return len(resp) >= 4 + + def get_position(self) -> Optional[int]: + """ + 读取当前位置 (脉冲数) + + Returns: + 当前位置脉冲数,失败返回 None + """ + resp = self._send(0x32, []) + + if len(resp) >= 8: + # 响应格式: [ID] [Func] [符号位] [数值4字节] [校验] + sign = resp[2] # 0=正, 1=负 + value = struct.unpack('>I', resp[3:7])[0] + self.position = -value if sign == 1 else value + + if self.debug: + print(f"[Position] Raw: {resp.hex().upper()}, Parsed: {self.position}") + + return self.position + + self.logger.warning("Failed to read position") + return None + + def close(self): + """关闭串口连接并停止线程""" + if hasattr(self, '_stop_event'): + self._stop_event.set() + + if self.ser and self.ser.is_open: + self.ser.close() + self.logger.info("Serial port closed") + + +# ============================================================ +# 测试和调试代码 +# ============================================================ + +def test_motor(): + """基础功能测试""" + logging.basicConfig(level=logging.INFO) + + print("="*60) + print("ZDT X42 电机驱动测试") + print("="*60) + + driver = ZDTX42Driver( + port="/dev/tty.usbserial-3110", + baudrate=115200, + device_id=2, + debug=True + ) + + if not driver.ser: + print("❌ 串口打开失败") + return + + try: + # 测试 1: 读取位置 + print("\n[1] 读取当前位置") + pos = driver.get_position() + print(f"✓ 当前位置: {pos} 脉冲") + + # 测试 2: 使能 + print("\n[2] 使能电机") + driver.enable(True) + time.sleep(0.3) + print("✓ 电机已锁定") + + # 测试 3: 相对位置运动 + print("\n[3] 相对位置运动 (1000脉冲)") + driver.move_position(pulses=1000, speed_rpm=60, direction="CW") + time.sleep(2) + pos = driver.get_position() + print(f"✓ 新位置: {pos}") + + # 测试 4: 速度运动 + print("\n[4] 速度模式 (30RPM, 3秒)") + driver.move_speed(speed_rpm=30, direction="CW") + time.sleep(3) + driver.stop() + pos = driver.get_position() + print(f"✓ 停止后位置: {pos}") + + # 测试 5: 禁用 + print("\n[5] 禁用电机") + driver.enable(False) + print("✓ 电机已松开") + + print("\n" + "="*60) + print("✅ 测试完成") + print("="*60) + + except Exception as e: + print(f"\n❌ 测试失败: {e}") + import traceback + traceback.print_exc() + finally: + driver.close() + + +if __name__ == "__main__": + test_motor() diff --git a/unilabos/devices/separator/chinwe.py b/unilabos/devices/separator/chinwe.py index 8beac447..b8c36a72 100644 --- a/unilabos/devices/separator/chinwe.py +++ b/unilabos/devices/separator/chinwe.py @@ -623,6 +623,119 @@ def wait_time(self, duration: int) -> bool: time.sleep(duration) return True + def separation_step(self, motor_id: int = 5, speed: int = 60, pulses: int = 700, + max_cycles: int = 0, timeout: int = 300) -> bool: + """ + 分液步骤 - 液位传感器与电机联动 + 当液位传感器检测到"有液"时,电机顺时针旋转指定脉冲数 + 当液位传感器检测到"无液"时,电机逆时针旋转指定脉冲数 + + :param motor_id: 电机ID (必须在初始化时配置的motor_ids中) + :param speed: 电机转速 (RPM) + :param pulses: 每次旋转的脉冲数 (默认700约为1/4圈,假设3200脉冲/圈) + :param max_cycles: 最大执行循环次数 (0=无限制,默认0) + :param timeout: 整体超时时间 (秒) + :return: 成功返回True,超时或失败返回False + """ + motor_id = int(motor_id) + speed = int(speed) + pulses = int(pulses) + max_cycles = int(max_cycles) + timeout = int(timeout) + + # 检查电机是否存在 + if motor_id not in self.motors: + self.logger.error(f"Motor {motor_id} not found in configured motors: {list(self.motors.keys())}") + return False + + # 检查传感器是否可用 + if not self.sensor: + self.logger.error("Sensor not initialized") + return False + + motor = self.motors[motor_id] + + # 停止轮询线程,避免与 separation_step 同时读取传感器造成串口冲突 + self.logger.info("Stopping polling thread for separation_step...") + self._stop_event.set() + if self._poll_thread and self._poll_thread.is_alive(): + self._poll_thread.join(timeout=2.0) + + # 使能电机 + self.logger.info(f"Enabling motor {motor_id}...") + motor.enable(True) + time.sleep(0.2) + + self.logger.info(f"Starting separation step: motor_id={motor_id}, speed={speed} RPM, " + f"pulses={pulses}, max_cycles={max_cycles}, timeout={timeout}s") + + # 记录上一次的液位状态 + last_level = None + cycle_count = 0 + start_time = time.time() + error_count = 0 + + try: + while True: + # 检查超时 + if time.time() - start_time > timeout: + self.logger.warning(f"Separation step timeout after {timeout} seconds") + return False + + # 检查循环次数限制 + if max_cycles > 0 and cycle_count >= max_cycles: + self.logger.info(f"Separation step completed: reached max_cycles={max_cycles}") + return True + + # 读取传感器数据 + data = self.sensor.read_level() + + if data is None: + error_count += 1 + if error_count > 5: + self.logger.warning("Sensor read failed multiple times, retrying...") + error_count = 0 + time.sleep(0.5) + continue + + error_count = 0 + current_level = data['level'] + rssi = data['rssi'] + + # 检测状态变化 (包括首次检测) + if current_level != last_level: + cycle_count += 1 + + if current_level: + # 有液 -> 电机顺时针旋转 + self.logger.info(f"[Cycle {cycle_count}] Liquid detected (RSSI={rssi}), " + f"rotating motor {motor_id} clockwise {pulses} pulses") + motor.run_position(pulses=pulses, speed_rpm=speed, direction=0, absolute=False) + + # 等待电机完成 (预估时间) + estimated_time = 15.0 / max(1, speed) + time.sleep(estimated_time + 0.5) + + else: + # 无液 -> 电机逆时针旋转 + self.logger.info(f"[Cycle {cycle_count}] No liquid detected (RSSI={rssi}), " + f"rotating motor {motor_id} counter-clockwise {pulses} pulses") + motor.run_position(pulses=pulses, speed_rpm=speed, direction=1, absolute=False) + + # 等待电机完成 (预估时间) + estimated_time = 15.0 / max(1, speed) + time.sleep(estimated_time + 0.5) + + # 更新状态 + last_level = current_level + + # 轮询间隔 + time.sleep(0.1) + finally: + # 恢复轮询线程 + self.logger.info("Restarting polling thread...") + self._start_polling() + def execute_command_from_outer(self, command_dict: Dict[str, Any]) -> bool: """支持标准 JSON 指令调用""" return super().execute_command_from_outer(command_dict) diff --git a/unilabos/devices/separator/xkc_sensor.py b/unilabos/devices/separator/xkc_sensor.py new file mode 100644 index 00000000..c954a2e0 --- /dev/null +++ b/unilabos/devices/separator/xkc_sensor.py @@ -0,0 +1,379 @@ +# -*- coding: utf-8 -*- +""" +XKC RS485 液位传感器 (Modbus RTU) + +说明: + 1. 遵循 Modbus-RTU 协议。 + 2. 数据寄存器: 0x0001 (液位状态, 1=有液, 0=无液), 0x0002 (RSSI 信号强度)。 + 3. 地址寄存器: 0x0004 (可读写, 范围 1-254)。 + 4. 波特率寄存器: 0x0005 (可写, 代码表见 change_baudrate 方法)。 +""" + +import struct +import threading +import time +import logging +import serial +from typing import Optional, Dict, Any, List + +from unilabos.device_comms.universal_driver import UniversalDriver + +class TransportManager: + """ + 统一通信管理类。 + 仅支持 串口 (Serial/有线) 连接。 + """ + def __init__(self, port: str, baudrate: int = 9600, timeout: float = 3.0, logger=None): + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self.logger = logger + self.lock = threading.RLock() # 线程锁,确保多设备共用一个连接时不冲突 + + self.serial = None + self._connect_serial() + + def _connect_serial(self): + try: + self.serial = serial.Serial( + port=self.port, + baudrate=self.baudrate, + timeout=self.timeout + ) + except Exception as e: + raise ConnectionError(f"Serial open failed: {e}") + + def close(self): + """关闭连接""" + if self.serial and self.serial.is_open: + self.serial.close() + + def clear_buffer(self): + """清空缓冲区 (Thread-safe)""" + with self.lock: + if self.serial: + self.serial.reset_input_buffer() + + def write(self, data: bytes): + """发送原始字节""" + with self.lock: + if self.serial: + self.serial.write(data) + + def read(self, size: int) -> bytes: + """读取指定长度字节""" + if self.serial: + return self.serial.read(size) + return b'' + +class XKCSensorDriver(UniversalDriver): + """XKC RS485 液位传感器 (Modbus RTU)""" + + def __init__(self, port: str, baudrate: int = 9600, device_id: int = 6, + threshold: int = 300, timeout: float = 3.0, debug: bool = False): + super().__init__() + self.port = port + self.baudrate = baudrate + self.device_id = device_id + self.threshold = threshold + self.timeout = timeout + self.debug = debug + self.level = False + self.rssi = 0 + self.status = {"level": self.level, "rssi": self.rssi} + + try: + self.transport = TransportManager(port, baudrate, timeout, logger=self.logger) + self.logger.info(f"XKCSensorDriver connected to {port} (ID: {device_id})") + except Exception as e: + self.logger.error(f"Failed to connect XKCSensorDriver: {e}") + self.transport = None + + # 启动背景轮询线程,确保 status 实时刷新 + self._stop_event = threading.Event() + self._polling_thread = threading.Thread( + target=self._update_loop, + name=f"XKCPolling_{port}", + daemon=True + ) + if self.transport: + self._polling_thread.start() + + def _update_loop(self): + """背景循环读取传感器数据""" + while not self._stop_event.is_set(): + try: + self.read_level() + except Exception as e: + if self.debug: + self.logger.error(f"Polling error: {e}") + time.sleep(2.0) # 每2秒刷新一次数据 + + def _crc(self, data: bytes) -> bytes: + crc = 0xFFFF + for byte in data: + crc ^= byte + for _ in range(8): + if crc & 0x0001: crc = (crc >> 1) ^ 0xA001 + else: crc >>= 1 + return struct.pack(' Optional[Dict[str, Any]]: + """ + 读取液位。 + 返回: {'level': bool, 'rssi': int} + """ + if not self.transport: + return None + + with self.transport.lock: + self.transport.clear_buffer() + # Modbus Read Registers: 01 03 00 01 00 02 CRC + payload = struct.pack('>HH', 0x0001, 0x0002) + msg = struct.pack('BB', self.device_id, 0x03) + payload + msg += self._crc(msg) + + if self.debug: + self.logger.info(f"TX (ID {self.device_id}): {msg.hex().upper()}") + + self.transport.write(msg) + + # Read header + h = self.transport.read(3) # Addr, Func, Len + if self.debug: + self.logger.info(f"RX Header: {h.hex().upper()}") + + if len(h) < 3: return None + length = h[2] + + # Read body + CRC + body = self.transport.read(length + 2) + if self.debug: + self.logger.info(f"RX Body+CRC: {body.hex().upper()}") + if len(body) < length + 2: + # Firmware bug fix specific to some modules + if len(body) == 4 and length == 4: + pass + else: + return None + + data = body[:-2] + # 根据手册说明: + # 寄存器 0x0001 (data[0:2]): 液位状态 (00 01 为有液, 00 00 为无液) + # 寄存器 0x0002 (data[2:4]): 信号强度 RSSI + + hw_level = False + rssi = 0 + + if len(data) >= 4: + hw_level = ((data[0] << 8) | data[1]) == 1 + rssi = (data[2] << 8) | data[3] + elif len(data) == 2: + # 兼容模式: 某些老固件可能只返回 1 个寄存器 + rssi = (data[0] << 8) | data[1] + hw_level = rssi > self.threshold + else: + return None + + # 最终判定: 优先使用硬件层级的 level 判定,但 RSSI 阈值逻辑作为补充/校验 + # 注意: 如果用户显式设置了 THRESHOLD,我们可以在逻辑中做权衡 + self.level = hw_level or (rssi > self.threshold) + self.rssi = rssi + result = { + 'level': self.level, + 'rssi': self.rssi + } + self.status = result + return result + + def wait_level(self, target_state: bool, timeout: float = 60.0) -> bool: + """ + 等待液位达到目标状态 (阻塞式) + """ + self.logger.info(f"Waiting for level: {target_state}") + start_time = time.time() + while (time.time() - start_time) < timeout: + res = self.read_level() + if res and res.get('level') == target_state: + return True + time.sleep(0.5) + self.logger.warning(f"Wait level timeout ({timeout}s)") + return False + + def wait_for_liquid(self, target_state: bool, timeout: float = 120.0) -> bool: + """ + 实时检测电导率(RSSI)并等待用户指定的“有液”或“无液”状态。 + 一旦检测到符合目标状态,立即返回。 + + Args: + target_state: True 为“有液”, False 为“无液” + timeout: 最大等待时间(秒) + """ + state_str = "有液" if target_state else "无液" + self.logger.info(f"开始实时检测电导率,等待状态: {state_str} (超时: {timeout}s)") + + start_time = time.time() + while (time.time() - start_time) < timeout: + res = self.read_level() # 内部已更新 self.level 和 self.rssi + if res: + current_level = res.get('level') + current_rssi = res.get('rssi') + if current_level == target_state: + self.logger.info(f"✅ 检测到目标状态: {state_str} (当前电导率/RSSI: {current_rssi})") + return True + + if self.debug: + self.logger.debug(f"当前状态: {'有液' if current_level else '无液'}, RSSI: {current_rssi}") + + time.sleep(0.2) # 高频采样 + + self.logger.warning(f"❌ 等待 {state_str} 状态超时 ({timeout}s)") + return False + + def set_threshold(self, threshold: int): + """设置液位判定阈值""" + self.threshold = int(threshold) + self.logger.info(f"Threshold updated to: {self.threshold}") + + def change_device_id(self, new_id: int) -> bool: + """ + 修改设备的 Modbus 从站地址。 + 寄存器: 0x0004, 功能码: 0x06 + """ + if not (1 <= new_id <= 254): + self.logger.error(f"Invalid device ID: {new_id}. Must be 1-254.") + return False + + self.logger.info(f"Changing device ID from {self.device_id} to {new_id}") + success = self._write_single_register(0x0004, new_id) + if success: + self.device_id = new_id # 更新内存中的地址 + self.logger.info(f"Device ID update command sent successfully (target {new_id}).") + return success + + def change_baudrate(self, baud_code: int) -> bool: + """ + 更改通讯波特率 (寄存器: 0x0005)。 + 设置成功后传感器 LED 会闪烁,通常无数据返回。 + + 波特率代码对照表 (16进制): + 05: 2400 + 06: 4800 + 07: 9600 (默认) + 08: 14400 + 09: 19200 + 0A: 28800 + 0C: 57600 + 0D: 115200 + 0E: 128000 + 0F: 256000 + """ + self.logger.info(f"Sending baudrate change command (Code: {baud_code:02X})") + # 写入寄存器 0x0005 + self._write_single_register(0x0005, baud_code) + self.logger.info("Baudrate change command executed. Device LED should flash. Please update connection settings.") + return True + + def factory_reset(self) -> bool: + """ + 恢复出厂设置 (通过广播地址 FF)。 + 设置地址为 01,逻辑为向 0x0004 写入 0x0002 + """ + self.logger.info("Sending factory reset command via broadcast address FF...") + # 广播指令通常无回显 + self._write_single_register(0x0004, 0x0002, slave_id=0xFF) + self.logger.info("Factory reset command sent. Device address should be 01 now.") + return True + + def _write_single_register(self, reg_addr: int, value: int, slave_id: Optional[int] = None) -> bool: + """内部辅助函数: Modbus 功能码 06 写单个寄存器""" + if not self.transport: return False + + target_id = slave_id if slave_id is not None else self.device_id + msg = struct.pack('BBHH', target_id, 0x06, reg_addr, value) + msg += self._crc(msg) + + with self.transport.lock: + self.transport.clear_buffer() + if self.debug: + self.logger.info(f"TX Write (Reg {reg_addr:#06x}): {msg.hex().upper()}") + + self.transport.write(msg) + + # 广播地址、波特率修改或厂家特定指令可能无回显 + if target_id == 0xFF or reg_addr == 0x0005: + time.sleep(0.5) + return True + + # 等待返回 (正常应返回相同报文) + resp = self.transport.read(len(msg)) + if self.debug: + self.logger.info(f"RX Write Response: {resp.hex().upper()}") + + return resp == msg + + def close(self): + if self.transport: + self.transport.close() + +if __name__ == "__main__": + # 快速实例化测试 + import logging + # 减少冗余日志,仅显示重要信息 + logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') + + # 硬件配置 (根据实际情况修改) + TEST_PORT = "/dev/tty.usbserial-3110" + SLAVE_ID = 1 + THRESHOLD = 300 + + print("\n" + "="*50) + print(f" XKC RS485 传感器独立测试程序") + print(f" 端口: {TEST_PORT} | 地址: {SLAVE_ID} | 阈值: {THRESHOLD}") + print("="*50) + + sensor = XKCSensorDriver(port=TEST_PORT, device_id=SLAVE_ID, threshold=THRESHOLD, debug=False) + + try: + if sensor.transport: + print(f"\n开始实时连续采样测试 (持续 15 秒)...") + print(f"按 Ctrl+C 可提前停止\n") + + start_time = time.time() + duration = 15 + count = 0 + + while time.time() - start_time < duration: + count += 1 + res = sensor.read_level() + if res: + rssi = res['rssi'] + level = res['level'] + status_str = "【有液】" if level else "【无液】" + # 使用 \r 实现单行刷新显示 (或者不刷,直接打印历史) + # 为了方便查看变化,我们直接打印 + elapsed = time.time() - start_time + print(f" [{elapsed:4.1f}s] 采样 {count:<3}: 电导率/RSSI = {rssi:<5} | 判定结果: {status_str}") + else: + print(f" [{time.time()-start_time:4.1f}s] 采样 {count:<3}: 通信失败 (无响应)") + + time.sleep(0.5) # 每秒采样 2 次 + + print(f"\n--- 15 秒采样测试完成 (总计 {count} 次) ---") + + # [3] 测试动态修改阈值 + print(f"\n[3] 动态修改阈值演示...") + new_threshold = 400 + sensor.set_threshold(new_threshold) + res = sensor.read_level() + if res: + print(f" 采样 (当前阈值={new_threshold}): 电导率/RSSI = {res['rssi']:<5} | 判定结果: {'【有液】' if res['level'] else '【无液】'}") + sensor.set_threshold(THRESHOLD) # 还原 + + except KeyboardInterrupt: + print("\n[!] 用户中断测试") + except Exception as e: + print(f"\n[!] 测试运行出错: {e}") + finally: + sensor.close() + print("\n--- 测试程序已退出 ---\n") diff --git a/unilabos/devices/workstation/bioyond_studio/station.py b/unilabos/devices/workstation/bioyond_studio/station.py index 327d8195..60c18e1e 100644 --- a/unilabos/devices/workstation/bioyond_studio/station.py +++ b/unilabos/devices/workstation/bioyond_studio/station.py @@ -258,7 +258,7 @@ def sync_to_external(self, resource: Any) -> bool: logger.info(f"[同步→Bioyond] ➕ 物料不存在于 Bioyond,将创建新物料并入库") # 第1步:从配置中获取仓库配置 - warehouse_mapping = self.bioyond_config.get("warehouse_mapping", {}) + warehouse_mapping = self.workstation.bioyond_config.get("warehouse_mapping", {}) # 确定目标仓库名称 parent_name = None diff --git a/unilabos/registry/devices/chinwe.yaml b/unilabos/registry/devices/chinwe.yaml index 2078d0f0..468fc27c 100644 --- a/unilabos/registry/devices/chinwe.yaml +++ b/unilabos/registry/devices/chinwe.yaml @@ -317,6 +317,47 @@ separator.chinwe: - port type: object type: UniLabJsonCommand + separation_step: + goal: + max_cycles: 0 + motor_id: 5 + pulses: 700 + speed: 60 + timeout: 300 + handles: {} + schema: + description: 分液步骤 - 液位传感器与电机联动 (有液→顺时针, 无液→逆时针) + properties: + goal: + properties: + max_cycles: + default: 0 + description: 最大循环次数 (0=无限制) + type: integer + motor_id: + default: '5' + description: 选择电机 + enum: + - '4' + - '5' + title: '注: 4=搅拌, 5=旋钮' + type: string + pulses: + default: 700 + description: 每次旋转脉冲数 (约1/4圈) + type: integer + speed: + default: 60 + description: 电机转速 (RPM) + type: integer + timeout: + default: 300 + description: 超时时间 (秒) + type: integer + required: + - motor_id + type: object + type: UniLabJsonCommand wait_sensor_level: goal: target_state: 有液 diff --git a/unilabos/registry/devices/motor.yaml b/unilabos/registry/devices/motor.yaml new file mode 100644 index 00000000..7b603ae5 --- /dev/null +++ b/unilabos/registry/devices/motor.yaml @@ -0,0 +1,286 @@ +motor.zdt_x42: + category: + - motor + class: + action_value_mappings: + auto-enable: + feedback: {} + goal: {} + goal_default: + 'on': true + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 使能或禁用电机。使能后电机进入锁轴状态,可接收运动指令;禁用后电机进入松轴状态。 + properties: + feedback: {} + goal: + properties: + 'on': + default: true + type: boolean + required: [] + type: object + result: {} + required: + - goal + title: enable参数 + type: object + type: UniLabJsonCommand + auto-get_position: + feedback: {} + goal: {} + goal_default: {} + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 获取当前电机脉冲位置。 + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: + properties: + position: + type: integer + type: object + required: + - goal + title: get_position参数 + type: object + type: UniLabJsonCommand + auto-move_position: + feedback: {} + goal: {} + goal_default: + absolute: false + acceleration: 10 + direction: CW + pulses: 1000 + speed_rpm: 60 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 位置模式运行。控制电机移动到指定脉冲位置或相对于当前位置移动指定脉冲数。 + properties: + feedback: {} + goal: + properties: + absolute: + default: false + type: boolean + acceleration: + default: 10 + maximum: 255 + minimum: 0 + type: integer + direction: + default: CW + enum: + - CW + - CCW + type: string + pulses: + default: 1000 + type: integer + speed_rpm: + default: 60 + minimum: 0 + type: integer + required: + - pulses + - speed_rpm + type: object + result: {} + required: + - goal + title: move_position参数 + type: object + type: UniLabJsonCommand + auto-move_speed: + feedback: {} + goal: {} + goal_default: + acceleration: 10 + direction: CW + speed_rpm: 60 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 速度模式运行。控制电机以指定转速和方向持续转动。 + properties: + feedback: {} + goal: + properties: + acceleration: + default: 10 + maximum: 255 + minimum: 0 + type: integer + direction: + default: CW + enum: + - CW + - CCW + type: string + speed_rpm: + default: 60 + minimum: 0 + type: integer + required: + - speed_rpm + type: object + result: {} + required: + - goal + title: move_speed参数 + type: object + type: UniLabJsonCommand + auto-rotate_quarter: + feedback: {} + goal: {} + goal_default: + direction: CW + speed_rpm: 60 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 电机旋转 1/4 圈 (阻塞式)。 + properties: + feedback: {} + goal: + properties: + direction: + default: CW + enum: + - CW + - CCW + type: string + speed_rpm: + default: 60 + minimum: 1 + type: integer + required: [] + type: object + result: {} + required: + - goal + title: rotate_quarter参数 + type: object + type: UniLabJsonCommand + auto-set_zero: + feedback: {} + goal: {} + goal_default: {} + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 将当前电机位置设为零点。 + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: set_zero参数 + type: object + type: UniLabJsonCommand + auto-stop: + feedback: {} + goal: {} + goal_default: {} + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 立即停止电机运动。 + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: stop参数 + type: object + type: UniLabJsonCommand + auto-wait_time: + feedback: {} + goal: {} + goal_default: + duration_s: 1.0 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 等待指定时间 (秒)。 + properties: + feedback: {} + goal: + properties: + duration_s: + default: 1.0 + minimum: 0 + type: number + required: + - duration_s + type: object + result: {} + required: + - goal + title: wait_time参数 + type: object + type: UniLabJsonCommand + module: unilabos.devices.motor.ZDT_X42:ZDTX42Driver + status_types: + position: int + status: str + type: python + config_info: [] + description: ZDT X42 闭环步进电机驱动。支持速度运行、精确位置控制、位置查询和清零功能。适用于各种需要精确运动控制的实验室自动化场景。 + handles: [] + icon: '' + init_param_schema: + config: + properties: + baudrate: + default: 115200 + type: integer + debug: + default: false + type: boolean + device_id: + default: 1 + type: integer + port: + type: string + timeout: + default: 0.5 + type: number + required: + - port + type: object + data: + properties: + position: + type: integer + status: + type: string + required: + - status + - position + type: object + version: 1.0.0 diff --git a/unilabos/registry/devices/sensor.yaml b/unilabos/registry/devices/sensor.yaml new file mode 100644 index 00000000..81d05b0f --- /dev/null +++ b/unilabos/registry/devices/sensor.yaml @@ -0,0 +1,148 @@ +sensor.xkc_rs485: + category: + - sensor + - separator + class: + action_value_mappings: + auto-change_baudrate: + goal: + baud_code: 7 + handles: {} + schema: + description: '更改通讯波特率 (设置成功后无返回,且需手动切换波特率重连)。代码表 (16进制): 05=2400, 06=4800, + 07=9600, 08=14400, 09=19200, 0A=28800, 0C=57600, 0D=115200, 0E=128000, + 0F=256000' + properties: + goal: + properties: + baud_code: + description: '波特率代码 (例如: 7 为 9600, 13 即 0x0D 为 115200)' + type: integer + required: + - baud_code + type: object + type: UniLabJsonCommand + auto-change_device_id: + goal: + new_id: 1 + handles: {} + schema: + description: 修改传感器的 Modbus 从站地址 + properties: + goal: + properties: + new_id: + description: 新的从站地址 (1-254) + maximum: 254 + minimum: 1 + type: integer + required: + - new_id + type: object + type: UniLabJsonCommand + auto-factory_reset: + goal: {} + handles: {} + schema: + description: 恢复出厂设置 (地址重置为 01) + properties: + goal: + type: object + type: UniLabJsonCommand + auto-read_level: + goal: {} + handles: {} + schema: + description: 直接读取当前液位及信号强度 + properties: + goal: + type: object + type: object + type: UniLabJsonCommand + auto-set_threshold: + goal: + threshold: 300 + handles: {} + schema: + description: 设置液位判定阈值 + properties: + goal: + properties: + threshold: + type: integer + required: + - threshold + type: object + type: UniLabJsonCommand + auto-wait_for_liquid: + goal: + target_state: true + timeout: 120 + handles: {} + schema: + description: 实时检测电导率(RSSI)并等待用户指定的状态 + properties: + goal: + properties: + target_state: + default: true + description: 目标状态 (True=有液, False=无液) + type: boolean + timeout: + default: 120 + description: 超时时间 (秒) + required: + - target_state + type: object + type: UniLabJsonCommand + auto-wait_level: + goal: + level: true + timeout: 10 + handles: {} + schema: + description: 等待液位达到目标状态 + properties: + goal: + properties: + level: + type: boolean + timeout: + type: number + required: + - level + type: object + type: UniLabJsonCommand + module: unilabos.devices.separator.xkc_sensor:XKCSensorDriver + status_types: + level: bool + rssi: int + type: python + config_info: [] + description: XKC RS485 非接触式液位传感器 (Modbus RTU) + handles: [] + icon: '' + init_param_schema: + config: + properties: + baudrate: + default: 9600 + type: integer + debug: + default: false + type: boolean + device_id: + default: 1 + type: integer + port: + type: string + threshold: + default: 300 + type: integer + timeout: + default: 3.0 + type: number + required: + - port + type: object + version: 1.0.0 diff --git a/unilabos/registry/resources/bioyond/bottle_carriers.yaml b/unilabos/registry/resources/bioyond/bottle_carriers.yaml index 764a8aa5..89f2bdd1 100644 --- a/unilabos/registry/resources/bioyond/bottle_carriers.yaml +++ b/unilabos/registry/resources/bioyond/bottle_carriers.yaml @@ -46,3 +46,16 @@ BIOYOND_PolymerStation_8StockCarrier: init_param_schema: {} registry_type: resource version: 1.0.0 +BIOYOND_PolymerStation_TipBox: + category: + - bottle_carriers + - tip_racks + class: + module: unilabos.resources.bioyond.bottle_carriers:BIOYOND_PolymerStation_TipBox + type: pylabrobot + description: BIOYOND_PolymerStation_TipBox (4x6布局,24个枪头孔位) + handles: [] + icon: '' + init_param_schema: {} + registry_type: resource + version: 1.0.0 diff --git a/unilabos/registry/resources/bioyond/bottles.yaml b/unilabos/registry/resources/bioyond/bottles.yaml index ecc5525d..e493e7b1 100644 --- a/unilabos/registry/resources/bioyond/bottles.yaml +++ b/unilabos/registry/resources/bioyond/bottles.yaml @@ -82,14 +82,3 @@ BIOYOND_PolymerStation_Solution_Beaker: icon: '' init_param_schema: {} version: 1.0.0 -BIOYOND_PolymerStation_TipBox: - category: - - bottles - - tip_boxes - class: - module: unilabos.resources.bioyond.bottles:BIOYOND_PolymerStation_TipBox - type: pylabrobot - handles: [] - icon: '' - init_param_schema: {} - version: 1.0.0 diff --git a/unilabos/resources/bioyond/bottle_carriers.py b/unilabos/resources/bioyond/bottle_carriers.py index d79b8495..e1932b20 100644 --- a/unilabos/resources/bioyond/bottle_carriers.py +++ b/unilabos/resources/bioyond/bottle_carriers.py @@ -1,4 +1,4 @@ -from pylabrobot.resources import create_homogeneous_resources, Coordinate, ResourceHolder, create_ordered_items_2d +from pylabrobot.resources import create_homogeneous_resources, Coordinate, ResourceHolder, create_ordered_items_2d, Container from unilabos.resources.itemized_carrier import BottleCarrier from unilabos.resources.bioyond.bottles import ( @@ -9,6 +9,28 @@ BIOYOND_PolymerStation_Reagent_Bottle, BIOYOND_PolymerStation_Flask, ) + + +def BIOYOND_PolymerStation_Tip(name: str, size_x: float = 8.0, size_y: float = 8.0, size_z: float = 50.0) -> Container: + """创建单个枪头资源 + + Args: + name: 枪头名称 + size_x: 枪头宽度 (mm) + size_y: 枪头长度 (mm) + size_z: 枪头高度 (mm) + + Returns: + Container: 枪头容器 + """ + return Container( + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + category="tip", + model="BIOYOND_PolymerStation_Tip", + ) # 命名约定:试剂瓶-Bottle,烧杯-Beaker,烧瓶-Flask,小瓶-Vial @@ -322,3 +344,88 @@ def BIOYOND_Electrolyte_1BottleCarrier(name: str) -> BottleCarrier: carrier.num_items_z = 1 carrier[0] = BIOYOND_PolymerStation_Solution_Beaker(f"{name}_beaker_1") return carrier + + +def BIOYOND_PolymerStation_TipBox( + name: str, + size_x: float = 127.76, # 枪头盒宽度 + size_y: float = 85.48, # 枪头盒长度 + size_z: float = 100.0, # 枪头盒高度 + barcode: str = None, +) -> BottleCarrier: + """创建4×6枪头盒 (24个枪头) - 使用 BottleCarrier 结构 + + Args: + name: 枪头盒名称 + size_x: 枪头盒宽度 (mm) + size_y: 枪头盒长度 (mm) + size_z: 枪头盒高度 (mm) + barcode: 条形码 + + Returns: + BottleCarrier: 包含24个枪头孔位的枪头盒载架 + + 布局说明: + - 4行×6列 (A-D, 1-6) + - 枪头孔位间距: 18mm (x方向) × 18mm (y方向) + - 起始位置居中对齐 + - 索引顺序: 列优先 (0=A1, 1=B1, 2=C1, 3=D1, 4=A2, ...) + """ + # 枪头孔位参数 + num_cols = 6 # 1-6 (x方向) + num_rows = 4 # A-D (y方向) + tip_diameter = 8.0 # 枪头孔位直径 + tip_spacing_x = 18.0 # 列间距 (增加到18mm,更宽松) + tip_spacing_y = 18.0 # 行间距 (增加到18mm,更宽松) + + # 计算起始位置 (居中对齐) + total_width = (num_cols - 1) * tip_spacing_x + tip_diameter + total_height = (num_rows - 1) * tip_spacing_y + tip_diameter + start_x = (size_x - total_width) / 2 + start_y = (size_y - total_height) / 2 + + # 使用 create_ordered_items_2d 创建孔位 + # create_ordered_items_2d 返回的 key 是数字索引: 0, 1, 2, ... + # 顺序是列优先: 先y后x (即 0=A1, 1=B1, 2=C1, 3=D1, 4=A2, 5=B2, ...) + sites = create_ordered_items_2d( + klass=ResourceHolder, + num_items_x=num_cols, + num_items_y=num_rows, + dx=start_x, + dy=start_y, + dz=5.0, + item_dx=tip_spacing_x, + item_dy=tip_spacing_y, + size_x=tip_diameter, + size_y=tip_diameter, + size_z=50.0, # 枪头深度 + ) + + # 更新 sites 中每个 ResourceHolder 的名称 + for k, v in sites.items(): + v.name = f"{name}_{v.name}" + + # 创建枪头盒载架 + # 注意:不设置 category,使用默认的 "bottle_carrier",这样前端会显示为完整的矩形载架 + tip_box = BottleCarrier( + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + sites=sites, # 直接使用数字索引的 sites + model="BIOYOND_PolymerStation_TipBox", + ) + + # 设置自定义属性 + tip_box.barcode = barcode + tip_box.tip_count = 24 # 4行×6列 + tip_box.num_items_x = num_cols + tip_box.num_items_y = num_rows + tip_box.num_items_z = 1 + + # ⭐ 枪头盒不需要放入子资源 + # 与其他 carrier 不同,枪头盒在 Bioyond 中是一个整体 + # 不需要追踪每个枪头的状态,保持为空的 ResourceHolder 即可 + # 这样前端会显示24个空槽位,可以用于放置枪头 + + return tip_box diff --git a/unilabos/resources/bioyond/bottles.py b/unilabos/resources/bioyond/bottles.py index 7045d8b7..73343bc6 100644 --- a/unilabos/resources/bioyond/bottles.py +++ b/unilabos/resources/bioyond/bottles.py @@ -116,7 +116,9 @@ def BIOYOND_PolymerStation_TipBox( size_z: float = 100.0, # 枪头盒高度 barcode: str = None, ): - """创建4×6枪头盒 (24个枪头) + """创建4×6枪头盒 (24个枪头) - 使用 BottleCarrier 结构 + + 注意:此函数已弃用,请使用 bottle_carriers.py 中的版本 Args: name: 枪头盒名称 @@ -126,55 +128,11 @@ def BIOYOND_PolymerStation_TipBox( barcode: 条形码 Returns: - TipBoxCarrier: 包含24个枪头孔位的枪头盒 + BottleCarrier: 包含24个枪头孔位的枪头盒载架 """ - from pylabrobot.resources import Container, Coordinate - - # 创建枪头盒容器 - tip_box = Container( - name=name, - size_x=size_x, - size_y=size_y, - size_z=size_z, - category="tip_rack", - model="BIOYOND_PolymerStation_TipBox_4x6", - ) - - # 设置自定义属性 - tip_box.barcode = barcode - tip_box.tip_count = 24 # 4行×6列 - tip_box.num_items_x = 6 # 6列 - tip_box.num_items_y = 4 # 4行 - - # 创建24个枪头孔位 (4行×6列) - # 假设孔位间距为 9mm - tip_spacing_x = 9.0 # 列间距 - tip_spacing_y = 9.0 # 行间距 - start_x = 14.38 # 第一个孔位的x偏移 - start_y = 11.24 # 第一个孔位的y偏移 - - for row in range(4): # A, B, C, D - for col in range(6): # 1-6 - spot_name = f"{chr(65 + row)}{col + 1}" # A1, A2, ..., D6 - x = start_x + col * tip_spacing_x - y = start_y + row * tip_spacing_y - - # 创建枪头孔位容器 - tip_spot = Container( - name=spot_name, - size_x=8.0, # 单个枪头孔位大小 - size_y=8.0, - size_z=size_z - 10.0, # 略低于盒子高度 - category="tip_spot", - ) - - # 添加到枪头盒 - tip_box.assign_child_resource( - tip_spot, - location=Coordinate(x=x, y=y, z=0) - ) - - return tip_box + # 重定向到 bottle_carriers.py 中的实现 + from unilabos.resources.bioyond.bottle_carriers import BIOYOND_PolymerStation_TipBox as TipBox_Carrier + return TipBox_Carrier(name=name, size_x=size_x, size_y=size_y, size_z=size_z, barcode=barcode) def BIOYOND_PolymerStation_Flask( diff --git a/unilabos/resources/graphio.py b/unilabos/resources/graphio.py index 1b8f97f1..cae23a72 100644 --- a/unilabos/resources/graphio.py +++ b/unilabos/resources/graphio.py @@ -779,9 +779,12 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: Dict[st bottle = plr_material[number] = initialize_resource( {"name": f'{detail["name"]}_{number}', "class": reverse_type_mapping[typeName][0]}, resource_type=ResourcePLR ) - bottle.tracker.liquids = [ - (detail["name"], float(detail.get("quantity", 0)) if detail.get("quantity") else 0) - ] + # 只有具有 tracker 属性的容器才设置液体信息(如 Bottle, Well) + # ResourceHolder 等不支持液体追踪的容器跳过 + if hasattr(bottle, "tracker"): + bottle.tracker.liquids = [ + (detail["name"], float(detail.get("quantity", 0)) if detail.get("quantity") else 0) + ] bottle.code = detail.get("code", "") logger.debug(f" └─ [子物料] {detail['name']} → {plr_material.name}[{number}] (类型:{typeName})") else: @@ -790,9 +793,11 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: Dict[st # 只对有 capacity 属性的容器(液体容器)处理液体追踪 if hasattr(plr_material, 'capacity'): bottle = plr_material[0] if plr_material.capacity > 0 else plr_material - bottle.tracker.liquids = [ - (material["name"], float(material.get("quantity", 0)) if material.get("quantity") else 0) - ] + # 确保 bottle 有 tracker 属性才设置液体信息 + if hasattr(bottle, "tracker"): + bottle.tracker.liquids = [ + (material["name"], float(material.get("quantity", 0)) if material.get("quantity") else 0) + ] plr_materials.append(plr_material) @@ -821,24 +826,29 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: Dict[st wh_name = loc.get("whName") logger.debug(f"[物料位置] {unique_name} 尝试放置到 warehouse: {wh_name} (Bioyond坐标: x={loc.get('x')}, y={loc.get('y')}, z={loc.get('z')})") + # Bioyond坐标映射 (重要!): x→行(1=A,2=B...), y→列(1=01,2=02...), z→层(通常=1) + # 必须在warehouse映射之前先获取坐标,以便后续调整 + x = loc.get("x", 1) # 行号 (1-based: 1=A, 2=B, 3=C, 4=D) + y = loc.get("y", 1) # 列号 (1-based: 1=01, 2=02, 3=03...) + z = loc.get("z", 1) # 层号 (1-based, 通常为1) + # 特殊处理: Bioyond的"堆栈1"需要映射到"堆栈1左"或"堆栈1右" - # 根据列号(x)判断: 1-4映射到左侧, 5-8映射到右侧 + # 根据列号(y)判断: 1-4映射到左侧, 5-8映射到右侧 if wh_name == "堆栈1": - x_val = loc.get("x", 1) - if 1 <= x_val <= 4: + if 1 <= y <= 4: wh_name = "堆栈1左" - elif 5 <= x_val <= 8: + elif 5 <= y <= 8: wh_name = "堆栈1右" + y = y - 4 # 调整列号: 5-8映射到1-4 else: - logger.warning(f"物料 {material['name']} 的列号 x={x_val} 超出范围,无法映射到堆栈1左或堆栈1右") + logger.warning(f"物料 {material['name']} 的列号 y={y} 超出范围,无法映射到堆栈1左或堆栈1右") continue # 特殊处理: Bioyond的"站内Tip盒堆栈"也需要进行拆分映射 if wh_name == "站内Tip盒堆栈": - y_val = loc.get("y", 1) - if y_val == 1: + if y == 1: wh_name = "站内Tip盒堆栈(右)" - elif y_val in [2, 3]: + elif y in [2, 3]: wh_name = "站内Tip盒堆栈(左)" y = y - 1 # 调整列号,因为左侧仓库对应的 Bioyond y=2 实际上是它的第1列 @@ -846,15 +856,6 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: Dict[st warehouse = deck.warehouses[wh_name] logger.debug(f"[Warehouse匹配] 找到warehouse: {wh_name} (容量: {warehouse.capacity}, 行×列: {warehouse.num_items_x}×{warehouse.num_items_y})") - # Bioyond坐标映射 (重要!): x→行(1=A,2=B...), y→列(1=01,2=02...), z→层(通常=1) - x = loc.get("x", 1) # 行号 (1-based: 1=A, 2=B, 3=C, 4=D) - y = loc.get("y", 1) # 列号 (1-based: 1=01, 2=02, 3=03...) - z = loc.get("z", 1) # 层号 (1-based, 通常为1) - - # 如果是右侧堆栈,需要调整列号 (5→1, 6→2, 7→3, 8→4) - if wh_name == "堆栈1右": - y = y - 4 # 将5-8映射到1-4 - # 特殊处理竖向warehouse(站内试剂存放堆栈、测量小瓶仓库) # 这些warehouse使用 vertical-col-major 布局 if wh_name in ["站内试剂存放堆栈", "测量小瓶仓库(测密度)"]: diff --git a/unilabos/resources/plr_additional_res_reg.py b/unilabos/resources/plr_additional_res_reg.py index 1c019ded..4dc1c4b5 100644 --- a/unilabos/resources/plr_additional_res_reg.py +++ b/unilabos/resources/plr_additional_res_reg.py @@ -18,3 +18,9 @@ def register(): from unilabos.devices.liquid_handling.rviz_backend import UniLiquidHandlerRvizBackend from unilabos.devices.liquid_handling.laiyu.backend.laiyu_v_backend import UniLiquidHandlerLaiyuBackend + # noinspection PyUnresolvedReferences + from unilabos.resources.bioyond.decks import ( + BIOYOND_PolymerReactionStation_Deck, + BIOYOND_PolymerPreparationStation_Deck, + BIOYOND_YB_Deck, + ) diff --git a/unilabos/resources/resource_tracker.py b/unilabos/resources/resource_tracker.py index 8a0fef39..6d2d8d01 100644 --- a/unilabos/resources/resource_tracker.py +++ b/unilabos/resources/resource_tracker.py @@ -341,6 +341,7 @@ def replace_plr_type(source: str): "deck": "deck", "tip_rack": "tip_rack", "tip_spot": "tip_spot", + "tip": "tip", # 添加 tip 类型支持 "tube": "tube", "bottle_carrier": "bottle_carrier", } diff --git a/unilabos/test/experiments/xkc_sensor_test.json b/unilabos/test/experiments/xkc_sensor_test.json new file mode 100644 index 00000000..ef50ddef --- /dev/null +++ b/unilabos/test/experiments/xkc_sensor_test.json @@ -0,0 +1,29 @@ +{ + "nodes": [ + { + "id": "Liquid_Sensor_1", + "name": "XKC Sensor", + "children": [], + "parent": null, + "type": "device", + "class": "sensor.xkc_rs485", + "position": { + "x": 0, + "y": 0, + "z": 0 + }, + "config": { + "port": "/dev/tty.usbserial-3110", + "baudrate": 9600, + "device_id": 1, + "threshold": 300, + "timeout": 3.0 + }, + "data": { + "level": false, + "rssi": 0 + } + } + ], + "links": [] +} \ No newline at end of file diff --git a/unilabos/test/experiments/zdt_motor_test.json b/unilabos/test/experiments/zdt_motor_test.json new file mode 100644 index 00000000..692e40ef --- /dev/null +++ b/unilabos/test/experiments/zdt_motor_test.json @@ -0,0 +1,28 @@ +{ + "nodes": [ + { + "id": "ZDT_Motor", + "name": "ZDT Motor", + "children": [], + "parent": null, + "type": "device", + "class": "motor.zdt_x42", + "position": { + "x": 0, + "y": 0, + "z": 0 + }, + "config": { + "port": "/dev/tty.usbserial-3110", + "baudrate": 115200, + "device_id": 1, + "debug": true + }, + "data": { + "position": 0, + "status": "idle" + } + } + ], + "links": [] +} \ No newline at end of file