CleanSight 是一个用于长海医院内镜清洗过程 AI 检测的后端系统。它确保每个清洗步骤都正确执行,从而提高患者安全性和合规性。
- 实时视频流处理: 捕获视频,使用 AI 模型处理,并通过 WebSocket 推送结果。
- 三线程架构: 解耦帧捕获、AI 推理和 WebSocket 推送,优化性能。
- 多任务并行推理: 支持多种 AI 模型并行执行(关键点检测、动作分析、内镜弯折检测等)。
- 可扩展架构: 基于任务注册表的设计,便于添加新的检测任务。
- RTMP 流处理: 从 RTMP 流以固定帧率提取视频帧,支持实时监控。
- AI 推理: 关键点检测 + 动作分析,实时评估清洗过程。
- 实时推送: 通过 WebSocket 推送处理后的视频帧和推理结果。
- 视频追溯: 自动生成 HLS 视频段和关键点 JSON,支持任务回放。
- 多客户端支持: 同时处理多个 RTMP 流,每个客户端独立队列管理。
app/: 主应用代码,包括 API 路由和 WebSocket 处理程序。
models/: 包含用于请求和响应验证的 Pydantic 数据结构。routers/: API 路由定义。ai.py: AI 推理服务路由inspection.py: 检查流程路由task.py: 任务管理路由
services/: 业务逻辑和 AI 模型集成。ai.py: 推理管理器和任务架构ai_models/: AI 模型实现detection.py: 关键点检测motion.py: 动作分析yolo_detection.py: 内镜弯折检测器yolo_task.py: 内镜弯折检测任务
client.py: 与摄像头/客户端通信的工具和示例客户端实现infer_task.py: 推理任务基类与调度辅助逻辑(多个任务类型的共有行为)example_custom_task.py: 自定义任务示例task.py: 任务管理和视频追溯
test/: 测试客户端代码,用于上传视频帧和显示推理结果。integration_tests/: 集成测试与端到端/远程测试脚本(用于验证完整管道)。docs/: 项目文档AI_INFERENCE_ARCHITECTURE.md: 推理架构说明QUICK_START_CUSTOM_TASK.md: 自定义任务快速开始REFACTORING_SUMMARY.md: 架构重构总结
RTMP 流 → 帧捕获线程 → CA-ReadyQueue → CA-RawQueue & AI 推理 → CA-ProcessedQueue + RT-ProcessedQueue
↓ ↓
HLS 段 + JSON WebSocket 推送
- CA-ReadyQueue: 从 RTMP 流提取的原始帧,等待 AI 推理,AI服务启动后才会开始捕获
- CA-RawQueue: 等待落盘的原始视频
- CA-ProcessedQueue: 目标检测后的处理帧(含关键点),用于生成 HLS 段以及JSON数据
- RT-ProcessedQueue: 实时推理结果(约 1 秒缓存),用于 WebSocket 推送给前端展示
独立运行,使用 mediamtx 提供视频流中转功能。配置文件位于 mediamtx_v1.15.4 (for Windows Local Test), mediamtx_v1.15.5_linux_amd64(for Linux Remote Test)。
系统采用可扩展的任务注册架构,支持多种 AI 模型并行或串行执行:
- 关键点检测: 检测内窥镜清洗过程中的关键点
- 动作分析: 分析弯曲、浸泡等清洗动作
- 内镜弯折检测: 使用 YOLOv8 模型检测内镜是否弯折
系统支持快速扩展新的检测任务,只需 3 步:
- 创建继承
InferenceTask的任务类 - 实现
infer()和visualize()方法 - 在
ai.py中注册任务
详细说明请参考文档:
# 创建虚拟环境并激活
py -3.13 -m venv .venv
.\.venv\Scripts\activate
# 安装依赖(包含 ultralytics 用于内镜弯折检测)
pip install -r requirements.txt参考 .env.example 创建 .env.dev (开发) 和 .env (生产) 配置文件。
环境配置:
- 开发环境:默认加载
.env.dev - 生产环境:使用 start_prod.ps1 (Windows) 或 start_prod.sh (Linux) 启动,自动加载
.env
Docker 化的双服务开发栈(Postgres + MediaMTX)已经编排在 docker-compose.yml。
-
启动:第一次运行会拉取镜像并构建上述服务镜像。
# docker构建服务并启动 docker compose up --build # 本地启动后端 uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
启动完成后:
- API: http://localhost:8000/docs
- Postgres:
postgresql://cleansight:cleansight@localhost:5432/cleansight - MediaMTX:
rtmp://localhost:1935/live/<stream>、rtsp://localhost:8004/<path>
-
组件说明
db:postgres:15-alpine,持久化卷postgres_data保存数据文件。mediamtx:使用官方bluenviron/mediamtx:1.15.4镜像,并挂载 mediamtx_v1.15.4/mediamtx.yml 作为配置,可直接在宿主机修改后docker compose restart mediamtx生效。- (开发中),改用python代码访问数据库,而非使用脚本。
-
常用命令
- 停止并移除资源:
docker compose down -v - 查看应用日志:
docker compose logs -f app - 进入数据库:
docker compose exec db psql -U cleansight -d cleansight
- 停止并移除资源:
提示:如果需要变更数据库凭据或端口,可直接编辑 docker-compose.yml,同时更新
app服务的CLEANSIGHT_*变量即可。
# Windows
.\.venv\Scripts\activate
uvicorn app.main:app --reload
# Linux/Mac
source .venv/bin/activate
uvicorn app.main:app --reload默认加载 .env.dev 配置,本地访问:http://localhost:8000
Windows:
.\start_prod.ps1Linux/Mac:
chmod +x start_prod.sh
./start_prod.sh这些脚本会:
- 自动激活虚拟环境
- 设置
CLEANSIGHT_PROD=1加载.env配置 - 启动服务允许外部访问 (
0.0.0.0:8000)
# Windows
.\.venv\Scripts\activate
$env:CLEANSIGHT_PROD = '1'
uvicorn app.main:app --host 0.0.0.0 --port 8000
# Linux/Mac
source .venv/bin/activate
export CLEANSIGHT_PROD=1
uvicorn app.main:app --host 0.0.0.0 --port 8000系统按以下优先级加载配置文件(优先级从高到低):
-
CLEANSIGHT_ENV_FILE(最高优先级)
若设置此环境变量,则加载指定路径的配置文件(可为绝对或相对路径)$env:CLEANSIGHT_ENV_FILE = 'C:\secrets\custom.env'
-
CLEANSIGHT_PROD=1(生产模式)
设置此环境变量后,系统将加载项目根目录下的.env文件作为生产环境配置$env:CLEANSIGHT_PROD = '1'
-
.env.dev(开发模式,默认)
未设置以上环境变量时,系统默认加载项目根目录下的.env.dev文件作为开发环境配置
直接设置的环境变量 (CLEANSIGHT_*)
> CLEANSIGHT_ENV_FILE 指定的文件
> CLEANSIGHT_PROD=1 时的 .env 文件
> .env.dev 文件(默认)
> 代码中的默认值
CLEANSIGHT_ENV_FILE:指定配置文件路径(覆盖所有默认行为)CLEANSIGHT_PROD:设置为1、true或yes启用生产模式CLEANSIGHT_STRICT:设置为1时,在缺失关键配置时抛出异常(生产环境推荐);否则仅打印警告
# 开发模式(默认,自动加载 .env.dev)
uvicorn app.main:app --reload
# 生产模式(加载 .env)
$env:CLEANSIGHT_PROD = '1'
uvicorn app.main:app --host 0.0.0.0 --port 8000
# 使用自定义配置文件
$env:CLEANSIGHT_ENV_FILE = 'C:\secrets\cleansight.env'
uvicorn app.main:app --host 0.0.0.0 --port 8000
# 生产模式 + 严格校验
$env:CLEANSIGHT_PROD = '1'
$env:CLEANSIGHT_STRICT = '1'
uvicorn app.main:app --host 0.0.0.0 --port 8000建议:将 .env 或敏感文件路径通过 CI/服务管理器安全注入,而不是直接提交到仓库。
当允许外部访问时,请注意:
- 防火墙配置: 只开放必要端口
- HTTPS: 生产环境建议使用HTTPS
- 认证: 考虑添加API认证机制
- 反向代理: 建议使用nginx等反向代理
运行后,访问 http://localhost:8000/docs 查看交互式 HTTP API 文档。
-
URL:
GET /ai/status -
描述: 获取所有客户端的队列状态和AI服务运行信息
-
响应:
{ "clients": 2, "queues": { "camera_001": { "ca_raw": 15, "ca_processed": 120, "rt_processed": 30, "rtmp_url": "rtmp://192.168.1.100:1935/live/endoscope" } } }
-
URL:
GET /ai/load_task/{task_id} -
描述: 从数据库加载任务,为指定task_id的任务在AI服务中创建任务对象
-
路径参数:
task_id(int): 任务唯一标识符
-
响应:
{ "task_id": 0, "status": "running", "cleaning_stage": "1", "bending": false, "bubble_detected": false, "fully_submerged": false, "updated_at": "2024-01-01T12:00:00" }
-
URL:
POST /ai/terminate_task/{client_id} -
描述: 终止指定的清洗任务,清理所有AI服务资源
-
路径参数:
client_id(str): 客户端唯一标识符
-
响应:
{ "status": "success", "message": "Task terminated for client camera_001" }
-
URL:
POST /inspection/start_rtmp_stream -
描述: 启动RTMP流捕获,以固定帧率提取视频帧
-
请求体:
{ "client_id": "camera_001", "rtmp_url": "rtmp://localhost:1935/live/endoscope", "fps": 30 } -
响应:
{ "status": "success", "message": "RTMP 流捕获已启动 for camera_001" }
-
URL:
POST /inspection/stop_rtmp_stream?client_id={client_id} -
描述: 停止指定客户端的RTMP流捕获
-
查询参数:
client_id(str): 客户端唯一标识符
-
响应:
{ "status": "success", "message": "RTMP 流捕获已停止 for camera_001" }
-
URL:
POST /inspection/start_rtsp_stream -
描述: 启动 RTSP 流捕获;请求体包含
client_id,rtsp_url,fps。 -
请求体示例:
{ "client_id": "camera_001", "rtsp_url": "rtsp://localhost:8004/live/stream", "fps": 30 } -
响应示例:
{ "status": "success", "message": "RTSP 流捕获已启动 for camera_001" }
-
URL:
POST /inspection/stop_rtsp_stream?client_id={client_id} -
描述: 停止指定客户端的 RTSP 流捕获。
-
查询参数:
client_id(str): 客户端唯一标识符
-
响应示例:
{ "status": "success", "message": "RTSP 流捕获已停止 for camera_001" }
-
URL:
GET /task/traceback/{task_id}/segments -
描述: 获取任务的所有HLS视频段路径和关键点JSON路径
-
路径参数:
task_id(int): 任务ID
-
查询参数:
video_type(str, 可选): 视频类型 ("raw" 或 "processed", 默认 "processed")
-
响应:
{ "task_id": 0, "video_type": "processed", "total_segments": 5, "playlist_path": "/data/task_0/processed_playlist.m3u8", "segments": [ { "segment_id": 1, "segment_path": "/data/task_0/processed_segment_1735689600000.mp4", "start_time": "2024-01-01T12:00:00", "end_time": "2024-01-01T12:00:10", "client_id": "camera_001", "keypoints_path": "/data/task_0/keypoints_1735689600000.json" } ] }
- URL:
GET /task/traceback/{task_id}/playlist - 描述: 获取任务的HLS播放列表文件(.m3u8)
- 路径参数:
task_id(int): 任务ID
- 查询参数:
video_type(str, 可选): 视频类型 ("raw" 或 "processed", 默认 "processed")
- 响应: M3U8播放列表文件
- URL:
GET /task/traceback/{task_id}/video/{segment_id} - 描述: 流式传输指定的视频段
- 路径参数:
task_id(int): 任务IDsegment_id(int): 段ID
- 响应: MP4视频文件流
- URL:
GET /task/traceback/{task_id}/keypoints/{segment_id} - 描述: 获取指定视频段的关键点JSON数据
- 路径参数:
task_id(int): 任务IDsegment_id(int): 段ID
- 响应: 关键点JSON数据
-
URL:
GET /task/traceback/{task_id}/all_keypoints -
描述: 获取任务的所有关键点数据(合并所有段)
-
路径参数:
task_id(int): 任务ID
-
响应:
{ "task_id": 0, "total_frames": 900, "keypoints": [ { "frame_id": 1, "timestamp": 1735689600000, "keypoints": [...], "confidence": 0.95 } ] }
- URL:
GET /task/{task_id}/alarms - 描述: 查询本地数据库中
alarm_record表为指定task_id保存的所有告警记录,按created_at降序返回。适用于回溯某任务的所有异常事件与上报历史。 - 路径参数:
task_id(int): 任务ID
- 响应示例:
{
"task_id": 1,
"total": 2,
"alarms": [
{
"id": 123,
"task_id": 1,
"step_id": "0",
"alarm_type": "流程违规",
"alarm_level": "high",
"alarm_message": "检测到未按规范操作:操作员未佩戴手套",
"alarm_time": "2025-12-08T20:30:15",
"detection_result": {"detected_objects": ["person","glove"], "confidence": 0.95},
"camera_ip": "192.168.1.64",
"reader_ip": "172.16.77.221",
"created_at": "2025-12-08T20:30:20"
}
]
}cURL 示例:
curl -X GET "http://localhost:8000/task/1/alarms"注意:当前实现会在运行时尝试创建并写入 alarm_record 表(针对 PostgreSQL)。若使用其他数据库,请确保表结构兼容或采用 ORM/migration 管理表结构。
-
URL:
ws://localhost:8000/ai/video?client_id={client_id} -
描述: 实时接收 AI 处理后的视频帧(含关键点标注)
-
连接参数:
client_id(必需): 客户端唯一标识符
-
数据格式: Base64 编码的 JPEG 图像
// 接收示例 data:image/jpeg;base64,/9j/4AAQSkZJRgABAQAAAQ...
-
URL:
ws://localhost:8000/task/status/{client_id} -
描述: 实时接收任务状态更新
-
路径参数:
client_id(必需): 客户端唯一标识符
-
数据格式: JSON
// 有活跃任务时 { "task_id": "task_123", "status": "active", "cleaning_stage": 1, "bending_count": 5, "bubble_detected": false, "fully_submerged": true, "updated_at": "2024-01-01T12:00:00" } // 无活跃任务时 { "status": "no_active_task" }
# 运行完整的本地管道测试(需要本地MediaMTX服务)
# rtmp
python integration_tests/test_full_pipeline.py
# rtsp
python integration_tests/test_full_pipeline_rtsp.py用于测试部署在远程服务器上的CleanSight服务:
# 基本用法
python integration_tests/remote_test_pipeline.py --server 192.168.1.100
# 自定义参数
python integration_tests/remote_test_pipeline.py --server 192.168.1.100 --duration 120 --task_id 0 --client_id remote_test_client远程测试功能:
- 向远程服务器推送RTMP视频流
- 加载远程任务 (task_id=0)
- 实时接收AI推理结果和状态更新
- 本地可视化显示远程处理结果
- 自动化测试报告
详细使用说明见:远程测试框架文档
- 捕获线程: 持续从视频源捕获最新帧。
- 推理线程: 使用 AI 模型处理帧(当前为模拟实现)。
- WebSocket 线程: 将处理结果推送到连接的客户端。
- 帧丢弃: 自动丢弃旧帧以保持实时性能。
- URL:
ws://localhost:8000/ai/video?client_id={client_id} - 请求类型: WebSocket
- 描述: 实时视频流,包含 AI 处理结果。
- 连接参数:
client_id(必需): 客户端唯一标识符
- 数据格式: Base64 编码的 JPEG 图像 (
data:image/jpeg;base64,...)