Skip to content
Open
328 changes: 328 additions & 0 deletions .cursor/rules/device-drivers.mdc
Original file line number Diff line number Diff line change
@@ -0,0 +1,328 @@
---
description: 设备驱动开发规范
globs: ["unilabos/devices/**/*.py"]
---

# 设备驱动开发规范

## 目录结构

```
unilabos/devices/
├── virtual/ # 虚拟设备(用于测试)
│ ├── virtual_stirrer.py
│ └── virtual_centrifuge.py
├── liquid_handling/ # 液体处理设备
├── balance/ # 天平设备
├── hplc/ # HPLC设备
├── pump_and_valve/ # 泵和阀门
├── temperature/ # 温度控制设备
├── workstation/ # 工作站(组合设备)
└── ...
```

## 设备类完整模板

```python
import asyncio
import logging
import time as time_module
from typing import Dict, Any, Optional

from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode


class MyDevice:
"""
设备类描述

Attributes:
device_id: 设备唯一标识
config: 设备配置字典
data: 设备状态数据
"""

_ros_node: BaseROS2DeviceNode

def __init__(
self,
device_id: str = None,
config: Dict[str, Any] = None,
**kwargs
):
"""
初始化设备

Args:
device_id: 设备ID
config: 配置字典
**kwargs: 其他参数
"""
# 兼容不同调用方式
if device_id is None and 'id' in kwargs:
device_id = kwargs.pop('id')
if config is None and 'config' in kwargs:
config = kwargs.pop('config')

self.device_id = device_id or "unknown_device"
self.config = config or {}
self.data = {}

# 从config读取参数
self.port = self.config.get('port') or kwargs.get('port', 'COM1')
self._max_value = self.config.get('max_value', 1000.0)

# 初始化日志
self.logger = logging.getLogger(f"MyDevice.{self.device_id}")

self.logger.info(f"设备 {self.device_id} 已创建")

def post_init(self, ros_node: BaseROS2DeviceNode):
"""
ROS节点注入 - 在ROS节点创建后调用

Args:
ros_node: ROS2设备节点实例
"""
self._ros_node = ros_node

async def initialize(self) -> bool:
"""
初始化设备 - 连接硬件、设置初始状态

Returns:
bool: 初始化是否成功
"""
self.logger.info(f"初始化设备 {self.device_id}")

try:
# 执行硬件初始化
# await self._connect_hardware()

# 设置初始状态
self.data.update({
"status": "待机",
"is_running": False,
"current_value": 0.0,
})

self.logger.info(f"设备 {self.device_id} 初始化完成")
return True

except Exception as e:
self.logger.error(f"初始化失败: {e}")
self.data["status"] = f"错误: {e}"
return False

async def cleanup(self) -> bool:
"""
清理设备 - 断开连接、释放资源

Returns:
bool: 清理是否成功
"""
self.logger.info(f"清理设备 {self.device_id}")

self.data.update({
"status": "离线",
"is_running": False,
})

return True

# ==================== 设备动作 ====================

async def execute_action(
self,
param1: float,
param2: str = "",
**kwargs
) -> bool:
"""
执行设备动作

Args:
param1: 参数1
param2: 参数2(可选)

Returns:
bool: 动作是否成功
"""
# 类型转换和验证
try:
param1 = float(param1)
except (ValueError, TypeError) as e:
self.logger.error(f"参数类型错误: {e}")
return False

# 参数验证
if param1 > self._max_value:
self.logger.error(f"参数超出范围: {param1} > {self._max_value}")
return False

self.logger.info(f"执行动作: param1={param1}, param2={param2}")

# 更新状态
self.data.update({
"status": "运行中",
"is_running": True,
})

# 执行动作(带进度反馈)
duration = 10.0 # 秒
start_time = time_module.time()

while True:
elapsed = time_module.time() - start_time
remaining = max(0, duration - elapsed)
progress = min(100, (elapsed / duration) * 100)

self.data.update({
"status": f"运行中: {progress:.0f}%",
"remaining_time": remaining,
})

if remaining <= 0:
break

await self._ros_node.sleep(1.0)

# 完成
self.data.update({
"status": "完成",
"is_running": False,
})

self.logger.info("动作执行完成")
return True

# ==================== 状态属性 ====================

@property
def status(self) -> str:
"""设备状态 - 自动发布为ROS Topic"""
return self.data.get("status", "未知")

@property
def is_running(self) -> bool:
"""是否正在运行"""
return self.data.get("is_running", False)

@property
def current_value(self) -> float:
"""当前值"""
return self.data.get("current_value", 0.0)

# ==================== 辅助方法 ====================

def get_device_info(self) -> Dict[str, Any]:
"""获取设备信息"""
return {
"device_id": self.device_id,
"status": self.status,
"is_running": self.is_running,
"current_value": self.current_value,
}

def __str__(self) -> str:
return f"MyDevice({self.device_id}: {self.status})"
```

## 关键规则

### 1. 参数处理

所有动作方法的参数都可能以字符串形式传入,必须进行类型转换:

```python
async def my_action(self, value: float, **kwargs) -> bool:
# 始终进行类型转换
try:
value = float(value)
except (ValueError, TypeError) as e:
self.logger.error(f"参数类型错误: {e}")
return False
```

### 2. vessel 参数处理

vessel 参数可能是字符串ID或字典:

```python
def extract_vessel_id(vessel: Union[str, dict]) -> str:
if isinstance(vessel, dict):
return vessel.get("id", "")
return str(vessel) if vessel else ""
```

### 3. 状态更新

使用 `self.data` 字典存储状态,属性读取状态:

```python
# 更新状态
self.data["status"] = "运行中"
self.data["current_speed"] = 300.0

# 读取状态(通过属性)
@property
def status(self) -> str:
return self.data.get("status", "待机")
```

### 4. 异步等待

使用 ROS 节点的 sleep 方法:

```python
# 正确
await self._ros_node.sleep(1.0)

# 避免(除非在纯 Python 测试环境)
await asyncio.sleep(1.0)
```

### 5. 进度反馈

长时间运行的操作需要提供进度反馈:

```python
while remaining > 0:
progress = (elapsed / total_time) * 100
self.data["status"] = f"运行中: {progress:.0f}%"
self.data["remaining_time"] = remaining

await self._ros_node.sleep(1.0)
```

## 虚拟设备

虚拟设备用于测试和演示,放在 `unilabos/devices/virtual/` 目录:

- 类名以 `Virtual` 开头
- 文件名以 `virtual_` 开头
- 模拟真实设备的行为和时序
- 使用表情符号增强日志可读性(可选)

## 工作站设备

工作站是组合多个设备的复杂设备:

```python
from unilabos.devices.workstation.workstation_base import WorkstationBase

class MyWorkstation(WorkstationBase):
"""组合工作站"""

async def execute_workflow(self, workflow: Dict[str, Any]) -> bool:
"""执行工作流"""
pass
```

## 设备注册

设备类开发完成后,需要在注册表中注册:

1. 创建/编辑 `unilabos/registry/devices/my_category.yaml`
2. 添加设备配置(参考 `virtual_device.yaml`)
3. 运行 `--complete_registry` 自动生成 schema
Loading
Loading