diff --git a/memory-networkx-service/.env.example b/memory-networkx-service/.env.example new file mode 100644 index 0000000000..ee1bd8fd1c --- /dev/null +++ b/memory-networkx-service/.env.example @@ -0,0 +1,15 @@ +# 环境变量配置文件 +# Redis 配置 +REDIS_HOST=redis.higress-system.svc.cluster.local +REDIS_PORT=6379 +REDIS_PASSWORD= +REDIS_DB=0 + +# 服务配置 +SERVICE_PORT=8080 +LOG_LEVEL=INFO + +# 记忆配置 +SHORT_TERM_MEMORY_SIZE=20 +RETRIEVE_TOP_N=5 +SUMMARY_MAX_TAGS=30 diff --git a/memory-networkx-service/Dockerfile b/memory-networkx-service/Dockerfile new file mode 100644 index 0000000000..73e4fe4b62 --- /dev/null +++ b/memory-networkx-service/Dockerfile @@ -0,0 +1,28 @@ +# Memory Enhancement Service Dockerfile +FROM python:3.11-slim + +WORKDIR /app + +# 安装系统依赖 +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc \ + && rm -rf /var/lib/apt/lists/* + +# 复制依赖文件 +COPY requirements.txt . + +# 安装 Python 依赖 +RUN pip install --no-cache-dir -r requirements.txt + +# 复制应用代码 +COPY app/ ./app/ + +# 暴露端口 +EXPOSE 8080 + +# 健康检查 +HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ + CMD python -c "import requests; requests.get('http://localhost:8080/health')" + +# 启动应用 +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/memory-networkx-service/INTEGRATION_GUIDE.md b/memory-networkx-service/INTEGRATION_GUIDE.md new file mode 100644 index 0000000000..f75a61b175 --- /dev/null +++ b/memory-networkx-service/INTEGRATION_GUIDE.md @@ -0,0 +1,360 @@ +# Memory Enhancement Service 集成指南 + +本指南将帮助您将 Memory Enhancement Service 与 Higress ai-history 插件集成。 + +## 系统架构 + +``` +┌─────────────┐ +│ AI Client │ +└──────┬──────┘ + │ + ▼ +┌─────────────────────────────────────────┐ +│ Higress Gateway │ +│ ┌───────────────────────────────────┐ │ +│ │ ai-history Plugin (WASM Go) │ │ +│ │ │ │ +│ │ 1. 检索增强记忆 ────────┐ │ │ +│ │ 2. 注入上下文 │ │ │ +│ │ 3. 
异步保存记忆 ────┐ │ │ │ +│ └─────────────────────┼───┼───────┘ │ +└────────────────────────┼───┼──────────┘ + │ │ + ┌─────────────────┘ └─────────────────┐ + ▼ ▼ +┌──────────────────────┐ ┌──────────────────┐ +│ Memory Enhancement │ │ Redis (对话历史) │ +│ Service (Python) │◄───────────┤ │ +│ │ └──────────────────┘ +│ - NetworkX 图谱 │ +│ - 记忆检索算法 │ +│ - 关键词提取 │ +└──────────────────────┘ +``` + +## 工作流程 + +### 请求流程 + +1. **用户请求** → Higress Gateway +2. **ai-history 插件** 拦截请求 +3. **检索记忆**:调用 Memory Service `/api/v1/memory/retrieve` +4. **注入上下文**:将检索到的记忆插入到 `messages` 中 +5. **转发请求** → LLM Provider +6. **返回响应** → 用户 + +### 响应流程 + +1. **LLM 响应** → Higress Gateway +2. **ai-history 插件** 拦截响应 +3. **保存对话**:保存到 Redis(原有功能) +4. **异步增强**:调用 Memory Service `/api/v1/memory/enhance` +5. **更新图谱**:Memory Service 提取关键词、更新 NetworkX 图 +6. **返回响应** → 用户 + +## 部署步骤 + +### 步骤 1:准备环境 + +确保您已安装: +- Kubernetes 集群 +- Higress Gateway +- kubectl 命令行工具 + +### 步骤 2:构建 Memory Service 镜像 + +```bash +cd memory-networkx-service + +# 构建 Docker 镜像 +docker build -t your-registry/memory-enhancement-service:latest . + +# 推送到镜像仓库 +docker push your-registry/memory-enhancement-service:latest +``` + +### 步骤 3:部署 Memory Service + +```bash +# 修改 deploy/complete-deployment.yaml 中的镜像地址 +# 将 your-registry 替换为您的实际镜像仓库地址 + +# 部署所有组件 +kubectl apply -f deploy/complete-deployment.yaml + +# 检查部署状态 +kubectl get pods -n higress-system +``` + +### 步骤 4:验证部署 + +```bash +# 查看 Memory Service 日志 +kubectl logs -n higress-system -l app=memory-service --tail=50 + +# 检查服务健康状态 +kubectl exec -it -n higress-system deployment/memory-enhancement-service -- \ + curl http://localhost:8080/health + +# 预期输出: {"status":"healthy"} +``` + +### 步骤 5:配置 ai-history 插件 + +ai-history 插件已在原代码基础上扩展,支持 Memory Service 配置。 + +**重要**:需要重新编译 ai-history 插件: + +```bash +cd plugins/wasm-go/extensions/ai-history + +# 编译插件 +GOOS=wasip1 GOARCH=wasm go build -o main.wasm . + +# 构建 OCI 镜像 +docker build -t your-registry/ai-history:1.0.0 -f Dockerfile . 
+docker push your-registry/ai-history:1.0.0 +``` + +**Dockerfile 示例**(在 ai-history 目录下创建): +```dockerfile +FROM scratch +COPY main.wasm plugin.wasm +``` + +### 步骤 6:应用插件配置 + +```bash +# 修改 deploy/ai-history-plugin-config.yaml 中的镜像地址 +# 应用配置 +kubectl apply -f deploy/ai-history-plugin-config.yaml + +# 验证插件已加载 +kubectl get wasmplugin -n higress-system ai-history +``` + +## 配置说明 + +### Memory Service 配置项 + +在 ai-history 插件配置中添加 `memoryService` 部分: + +```yaml +memoryService: + enabled: true # 是否启用记忆增强(默认:false) + serviceName: memory-service.... # 服务地址(FQDN) + servicePort: 8080 # 服务端口(默认:8080) + timeout: 5000 # 超时时间,毫秒(默认:5000) + topK: 5 # 检索记忆数量(默认:5) +``` + +### 环境变量配置 + +Memory Service 支持以下环境变量(在 ConfigMap 中配置): + +| 变量名 | 说明 | 默认值 | +|--------|------|--------| +| `REDIS_HOST` | Redis 主机地址 | `localhost` | +| `REDIS_PORT` | Redis 端口 | `6379` | +| `REDIS_PASSWORD` | Redis 密码(可选) | `""` | +| `REDIS_DB` | Redis 数据库编号 | `0` | +| `SHORT_TERM_MEMORY_SIZE` | 短期记忆大小(对话数) | `20` | +| `RETRIEVE_TOP_N` | 默认检索数量 | `5` | +| `SUMMARY_MAX_TAGS` | 每个记忆的最大标签数 | `30` | + +## 测试验证 + +### 1. 手动测试 Memory Service + +```bash +# 端口转发 +kubectl port-forward -n higress-system svc/memory-service 8080:8080 + +# 测试健康检查 +curl http://localhost:8080/health + +# 测试添加记忆 +curl -X POST http://localhost:8080/api/v1/memory/enhance \ + -H "Content-Type: application/json" \ + -d '{ + "session_id": "test-session", + "question": "什么是 NetworkX?", + "answer": "NetworkX 是一个用于创建、操作和研究复杂网络的 Python 库。" + }' + +# 测试检索记忆 +curl -X POST http://localhost:8080/api/v1/memory/retrieve \ + -H "Content-Type: application/json" \ + -d '{ + "session_id": "test-session", + "query": "如何使用图数据库?", + "top_k": 5 + }' + +# 查看统计信息 +curl http://localhost:8080/api/v1/memory/stats/test-session +``` + +### 2. 
端到端测试 + +发送 AI 对话请求: + +```bash +# 通过 Higress Gateway 发送请求 +curl -X POST http://your-higress-gateway/v1/chat/completions \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer your-token" \ + -d '{ + "model": "gpt-3.5-turbo", + "messages": [ + {"role": "user", "content": "介绍一下 NetworkX"} + ] + }' +``` + +检查日志验证集成: + +```bash +# 查看 ai-history 插件日志(在 Higress 日志中) +kubectl logs -n higress-system -l app=higress-gateway --tail=100 | grep "ai-history" + +# 应该看到类似日志: +# retrieved 3 enhanced memories +# memory enhanced successfully for session: xxx + +# 查看 Memory Service 日志 +kubectl logs -n higress-system -l app=memory-service --tail=100 + +# 应该看到: +# 收到增强记忆请求: session_id=xxx +# 收到检索请求: session_id=xxx, query=... +# 检索到 N 条相关记忆 +``` + +## 故障排查 + +### Memory Service 无法启动 + +**症状**:Pod 处于 CrashLoopBackOff 状态 + +**排查步骤**: +```bash +# 查看详细日志 +kubectl logs -n higress-system -l app=memory-service --tail=200 + +# 常见原因: +# 1. Redis 连接失败 - 检查 Redis 服务是否运行 +kubectl get svc -n higress-system redis + +# 2. 镜像拉取失败 - 检查镜像仓库凭证 +kubectl describe pod -n higress-system -l app=memory-service +``` + +### 记忆检索没有结果 + +**可能原因**: +1. 对话数量不足,尚未转换为长期记忆 +2. 查询关键词与记忆标签匹配度低 + +**解决方案**: +```bash +# 查看记忆统计 +curl http://localhost:8080/api/v1/memory/stats/your-session-id + +# 如果 total_memories 为 0,说明还没有长期记忆 +# 需要积累更多对话(默认 20 条短期记忆才会转换) +``` + +### ai-history 插件未调用 Memory Service + +**排查步骤**: +```bash +# 1. 检查插件配置 +kubectl get wasmplugin -n higress-system ai-history -o yaml + +# 确认 memoryService.enabled 为 true + +# 2. 检查服务连通性 +kubectl exec -it -n higress-system deployment/higress-gateway -- \ + curl -v http://memory-service.higress-system.svc.cluster.local:8080/health + +# 3. 检查 Higress 日志 +kubectl logs -n higress-system -l app=higress-gateway | grep -i "memory" +``` + +## 性能优化 + +### 1. 调整副本数 + +根据负载调整 Memory Service 副本数: + +```bash +kubectl scale deployment memory-enhancement-service \ + -n higress-system --replicas=5 +``` + +### 2. 
资源限制 + +根据实际使用情况调整资源配置: + +```yaml +resources: + requests: + memory: "512Mi" # 增加内存 + cpu: "500m" + limits: + memory: "1Gi" + cpu: "1000m" +``` + +### 3. Redis 优化 + +```bash +# 使用 Redis Cluster 提升性能 +# 或使用 Redis 持久化避免数据丢失 + +# 添加 Redis 配置 +kubectl create configmap redis-config \ + --from-literal=maxmemory=256mb \ + --from-literal=maxmemory-policy=allkeys-lru \ + -n higress-system +``` + +## 监控和告警 + +### Prometheus 指标 + +Memory Service 暴露以下指标(通过 FastAPI): + +``` +# 请求计数 +http_requests_total{method="POST",path="/api/v1/memory/enhance"} +http_requests_total{method="POST",path="/api/v1/memory/retrieve"} + +# 响应时间 +http_request_duration_seconds{method="POST",path="/api/v1/memory/enhance"} +``` + +### 日志级别调整 + +```yaml +# 在 ConfigMap 中修改 +data: + LOG_LEVEL: "DEBUG" # 改为 DEBUG 获取更详细日志 +``` + +## 最佳实践 + +1. **分离环境**:开发、测试、生产环境使用不同的 Redis 数据库编号 +2. **备份策略**:定期备份 Redis 数据 +3. **性能测试**:在生产环境前进行压力测试 +4. **监控告警**:配置 Memory Service 和 Redis 的监控告警 +5. **降级策略**:Memory Service 不可用时,ai-history 应能正常工作(仅使用 Redis 对话历史) + +## 下一步 + +- 集成专业的中文分词工具(如 jieba)提升关键词提取质量 +- 使用 LLM API 进行摘要生成 +- 添加更多图算法(如社区发现、中心性分析) +- 实现记忆的主动推送机制 diff --git a/memory-networkx-service/LIGHTRAG_INTEGRATION.md b/memory-networkx-service/LIGHTRAG_INTEGRATION.md new file mode 100644 index 0000000000..60e1379032 --- /dev/null +++ b/memory-networkx-service/LIGHTRAG_INTEGRATION.md @@ -0,0 +1,1240 @@ +# LightRAG 集成方案 - 增强长期记忆系统 + +## 📋 方案概述 + +本方案详细说明如何在现有 Memory Enhancement Service 中集成 LightRAG,用于长期记忆的构建和检索,实现更智能的知识图谱和语义检索能力。 + +## 🎯 为什么选择 LightRAG + +### LightRAG 核心优势 + +1. **双层检索架构** + - 低层级:实体/关系级别的细粒度检索 + - 高层级:主题级别的粗粒度检索 + - 混合检索:结合两种策略获得最佳效果 + +2. **图增强生成** + - 自动从文本中提取实体和关系 + - 构建知识图谱进行推理 + - 支持多跳查询和关联分析 + +3. **增量更新** + - 支持动态添加新知识 + - 无需重建整个索引 + - 适合对话场景的持续学习 + +4. 
**高性能检索** + - 向量化存储和快速检索 + - 支持混合搜索(向量+关键词) + - 可扩展的索引结构 + +### 与现有 NetworkX 方案的对比 + +| 特性 | NetworkX 方案 | LightRAG 方案 | 混合方案 | +|------|--------------|---------------|----------| +| 关键词提取 | 简单分词 | LLM 提取实体 | ✅ LightRAG | +| 图谱构建 | 共现关系 | 实体关系图 | ✅ 两者结合 | +| 语义检索 | Jaccard 相似度 | 向量相似度 | ✅ LightRAG | +| 多跳推理 | 有限支持 | 原生支持 | ✅ LightRAG | +| 摘要生成 | 截断文本 | LLM 生成 | ✅ LightRAG | +| 增量更新 | ✅ 支持 | ✅ 支持 | ✅ 两者都支持 | + +**推荐方案**:采用 LightRAG 作为主要长期记忆系统,保留 NetworkX 用于快速关键词关联分析。 + +## 🏗️ 架构设计 + +### 整体架构 + +```mermaid +graph TB + Client[AI Client] --> Gateway[Higress Gateway] + Gateway --> AIHistory[ai-history Plugin] + + AIHistory --> |检索记忆| MemService[Memory Service] + AIHistory --> |保存对话| Redis[(Redis)] + AIHistory --> |异步增强| MemService + + MemService --> ShortMem[短期记忆缓存] + ShortMem --> |达到阈值| LongMem[长期记忆转换] + + LongMem --> LightRAG[LightRAG Engine] + LongMem --> NetworkX[NetworkX Graph] + + LightRAG --> |实体提取| LLM[LLM API] + LightRAG --> |向量化| VectorDB[(Vector DB)] + LightRAG --> |图存储| GraphStore[(Graph Store)] + + NetworkX --> |关键词| NXGraph[(NetworkX Graph)] + + MemService --> |混合检索| Retriever[混合检索器] + Retriever --> |语义检索| LightRAG + Retriever --> |关键词扩展| NetworkX + + style LightRAG fill:#e1f5ff + style NetworkX fill:#fff4e1 + style Retriever fill:#e8f5e9 +``` + +### 数据流程 + +```mermaid +sequenceDiagram + participant User + participant AIHistory + participant MemService + participant LightRAG + participant LLM + participant VectorDB + + User->>AIHistory: 发送对话请求 + AIHistory->>MemService: 检索增强记忆 + MemService->>LightRAG: 语义检索 + LightRAG->>VectorDB: 向量查询 + VectorDB-->>LightRAG: 相关文档 + LightRAG->>LLM: 生成增强上下文 + LLM-->>LightRAG: 增强结果 + LightRAG-->>MemService: 返回上下文 + MemService-->>AIHistory: 注入上下文 + AIHistory->>LLM: 转发增强请求 + LLM-->>AIHistory: 返回响应 + AIHistory-->>User: 返回结果 + + AIHistory->>MemService: 异步保存对话 + MemService->>MemService: 累积短期记忆 + + alt 达到转换阈值 + MemService->>LightRAG: 插入新知识 + LightRAG->>LLM: 提取实体和关系 + LLM-->>LightRAG: 实体关系数据 + LightRAG->>VectorDB: 存储向量 + 
LightRAG->>GraphStore: 更新图谱 + end +``` + +## 💻 技术实现 + +### 1. 依赖安装 + +```python +# requirements.txt 更新 +fastapi==0.109.0 +uvicorn[standard]==0.27.0 +pydantic==2.5.3 +pydantic-settings==2.1.0 +redis==5.0.1 +networkx==3.2.1 +numpy==1.26.3 +python-multipart==0.0.6 + +# LightRAG 相关依赖 +lightrag==0.1.0 # LightRAG 核心库 +openai==1.12.0 # OpenAI API 客户端 +tiktoken==0.5.2 # Token 计数 +chromadb==0.4.22 # 向量数据库(可选) +# 或使用其他向量数据库 +# qdrant-client==1.7.0 +# weaviate-client==3.25.0 + +# 图数据库支持(可选) +neo4j==5.16.0 # Neo4j 驱动 +``` + +### 2. LightRAG 配置模块 + +```python +# app/lightrag_config.py +""" +LightRAG 配置和初始化 +""" +from typing import Optional +from pydantic import BaseModel +from lightrag import LightRAG, QueryParam +from lightrag.llm import openai_complete_if_cache, openai_embedding +from lightrag.base import BaseVectorStorage, BaseGraphStorage +import os + + +class LightRAGConfig(BaseModel): + """LightRAG 配置""" + # LLM 配置 + llm_model: str = "gpt-4o-mini" + llm_api_base: str = "https://api.openai.com/v1" + llm_api_key: str = "" + + # Embedding 配置 + embedding_model: str = "text-embedding-3-small" + embedding_dim: int = 1536 + + # 存储配置 + working_dir: str = "./lightrag_data" + vector_storage: str = "chroma" # chroma, qdrant, weaviate + graph_storage: str = "networkx" # networkx, neo4j + + # 检索配置 + top_k: int = 10 + max_token_for_text_unit: int = 4000 + max_token_for_global_context: int = 4000 + max_token_for_local_context: int = 4000 + + +class LightRAGManager: + """LightRAG 管理器 - 单例模式""" + + _instance = None + _rag_instances = {} # session_id -> LightRAG 实例 + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + if not hasattr(self, 'initialized'): + self.config = LightRAGConfig( + llm_api_key=os.getenv("OPENAI_API_KEY", ""), + llm_api_base=os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1"), + llm_model=os.getenv("LLM_MODEL", "gpt-4o-mini"), + embedding_model=os.getenv("EMBEDDING_MODEL", 
"text-embedding-3-small"), + working_dir=os.getenv("LIGHTRAG_WORKING_DIR", "./lightrag_data"), + ) + self.initialized = True + + def get_rag_instance(self, session_id: str) -> LightRAG: + """获取或创建指定 session 的 LightRAG 实例""" + if session_id not in self._rag_instances: + working_dir = os.path.join(self.config.working_dir, session_id) + os.makedirs(working_dir, exist_ok=True) + + self._rag_instances[session_id] = LightRAG( + working_dir=working_dir, + llm_model_func=self._create_llm_func(), + embedding_func=self._create_embedding_func(), + # 可选:自定义向量存储 + # vector_storage=self._create_vector_storage(), + # 可选:自定义图存储 + # graph_storage=self._create_graph_storage(), + ) + + return self._rag_instances[session_id] + + def _create_llm_func(self): + """创建 LLM 函数""" + async def llm_func( + prompt, + system_prompt=None, + history_messages=[], + **kwargs + ) -> str: + return await openai_complete_if_cache( + model=self.config.llm_model, + prompt=prompt, + system_prompt=system_prompt, + history_messages=history_messages, + api_key=self.config.llm_api_key, + base_url=self.config.llm_api_base, + **kwargs + ) + return llm_func + + def _create_embedding_func(self): + """创建 Embedding 函数""" + async def embedding_func(texts: list[str]) -> np.ndarray: + return await openai_embedding( + texts=texts, + model=self.config.embedding_model, + api_key=self.config.llm_api_key, + base_url=self.config.llm_api_base, + ) + return embedding_func + + def remove_instance(self, session_id: str): + """移除指定 session 的 RAG 实例""" + if session_id in self._rag_instances: + del self._rag_instances[session_id] +``` + +### 3. 
增强的 Memory Engine + +```python +# app/memory_engine.py(修改版本) +""" +Memory Engine - 集成 LightRAG 的记忆引擎 +""" +import asyncio +import json +import logging +from typing import List, Tuple, Dict, Any, Optional +from datetime import datetime +import redis.asyncio as redis +import numpy as np + +from app.memory_graph import MemoryGraph +from app.memory_item import MemoryItem +from app.text_processor import TextProcessor +from app.lightrag_config import LightRAGManager, QueryParam + +logger = logging.getLogger(__name__) + + +class MemoryEngine: + """记忆引擎 - 集成 LightRAG""" + + def __init__( + self, + session_id: str, + redis_host: str = "localhost", + redis_port: int = 6379, + redis_password: Optional[str] = None, + redis_db: int = 0, + use_lightrag: bool = True, # 是否启用 LightRAG + ): + self.session_id = session_id + self.redis_host = redis_host + self.redis_port = redis_port + self.redis_password = redis_password + self.redis_db = redis_db + self.use_lightrag = use_lightrag + + # 初始化组件 + self.memory_graph = MemoryGraph() # NetworkX 图谱(用于关键词扩展) + self.text_processor = TextProcessor() + + # LightRAG 管理器 + self.lightrag_manager = LightRAGManager() if use_lightrag else None + self.lightrag = None + if use_lightrag: + self.lightrag = self.lightrag_manager.get_rag_instance(session_id) + + # Redis 客户端 + self._redis_client: Optional[redis.Redis] = None + + # 短期记忆缓存 + self.short_term_memory: List[Dict[str, Any]] = [] + self.short_term_memory_size = 20 + + # 长期记忆列表 + self.long_term_memory: List[MemoryItem] = [] + + # 配置参数 + self.retrieve_top_n = 5 + self.summary_max_tags = 30 + + logger.info(f"记忆引擎已初始化: session_id={session_id}, use_lightrag={use_lightrag}") + + async def _get_redis_client(self) -> redis.Redis: + """获取 Redis 客户端(懒加载)""" + if self._redis_client is None: + self._redis_client = redis.Redis( + host=self.redis_host, + port=self.redis_port, + password=self.redis_password, + db=self.redis_db, + decode_responses=True + ) + await self._redis_client.ping() + logger.info(f"Redis 
连接成功: {self.redis_host}:{self.redis_port}") + return self._redis_client + + async def add_conversation( + self, + question: str, + answer: str, + metadata: Dict[str, Any] + ) -> None: + """添加对话到记忆系统""" + try: + # 添加到短期记忆 + conversation = { + "role": "user", + "content": question, + "timestamp": datetime.now().isoformat() + } + self.short_term_memory.append(conversation) + + conversation = { + "role": "assistant", + "content": answer, + "timestamp": datetime.now().isoformat() + } + self.short_term_memory.append(conversation) + + # 如果短期记忆超出限制,转换为长期记忆 + if len(self.short_term_memory) >= self.short_term_memory_size: + await self._convert_to_long_term_memory() + + # 保存到 Redis + await self._save_to_redis() + + except Exception as e: + logger.error(f"添加对话失败: {e}", exc_info=True) + raise + + async def _convert_to_long_term_memory(self) -> None: + """ + 将短期记忆转换为长期记忆 + 核心改动:集成 LightRAG + """ + try: + # 合并对话内容 + conversation_text = "\n".join([ + f"{msg['role']}: {msg['content']}" + for msg in self.short_term_memory[-10:] # 取最近10条 + ]) + + if self.use_lightrag and self.lightrag: + # ==================== LightRAG 路径 ==================== + logger.info("使用 LightRAG 进行长期记忆转换") + + # 1. 插入文本到 LightRAG(自动提取实体、关系、生成摘要) + await self.lightrag.ainsert(conversation_text) + + # 2. 同时使用 TextProcessor 提取关键词(用于 NetworkX 图谱) + tags = await self.text_processor.extract_tags(conversation_text) + + # 3. 
使用 LightRAG 生成摘要(可选,也可以用自己的) + # summary = await self._generate_summary_with_lightrag(conversation_text) + summary = conversation_text[:200] # 简化:直接截断 + + else: + # ==================== 原有路径(不使用 LightRAG)==================== + logger.info("使用原有方式进行长期记忆转换") + summary = await self.text_processor.generate_summary(conversation_text) + tags = await self.text_processor.extract_tags(conversation_text) + + # 限制标签数量 + if len(tags) > self.summary_max_tags: + tags = tags[:self.summary_max_tags] + + # 添加时间标签 + now = datetime.now() + time_tag = f"DATETIME:{now.strftime('%Y-%m-%d %H:%M:%S')}" + tags.append(time_tag) + + # 创建记忆项(用于 NetworkX 图谱) + memory_item = MemoryItem(summary=summary, tags=tags) + self.long_term_memory.append(memory_item) + + # 更新 NetworkX 知识图谱(用于关键词扩展) + self.memory_graph.add_memory(memory_item) + + # 清理部分短期记忆 + self.short_term_memory = self.short_term_memory[-10:] + + logger.info(f"长期记忆转换完成: tags={len(tags)}, summary_len={len(summary)}") + + except Exception as e: + logger.error(f"转换长期记忆失败: {e}", exc_info=True) + + async def _generate_summary_with_lightrag(self, text: str) -> str: + """使用 LightRAG 生成摘要""" + if not self.lightrag: + return text[:200] + + try: + # 使用 LightRAG 的 query 功能生成摘要 + prompt = f"请用一句话总结以下对话的核心内容:\n{text}" + result = await self.lightrag.aquery( + prompt, + param=QueryParam(mode="naive") # 使用简单模式 + ) + return result + except Exception as e: + logger.error(f"LightRAG 生成摘要失败: {e}") + return text[:200] + + async def retrieve_relevant_context( + self, + query: str, + top_k: int = 5 + ) -> Tuple[List[str], Dict[str, Any]]: + """ + 检索相关上下文 + 核心改动:优先使用 LightRAG 进行语义检索 + """ + try: + if self.use_lightrag and self.lightrag: + # ==================== LightRAG 混合检索 ==================== + logger.info(f"使用 LightRAG 进行语义检索: query={query[:50]}...") + + # 1. LightRAG 语义检索(主要方法) + lightrag_result = await self._retrieve_with_lightrag(query, top_k) + + # 2. NetworkX 关键词扩展(辅助方法) + expanded_keywords = await self._expand_keywords_with_networkx(query) + + # 3. 
合并结果 + context = lightrag_result.get("context", []) + metadata = { + "method": "lightrag_hybrid", + "lightrag_result": lightrag_result, + "expanded_keywords": expanded_keywords[:10], + "retrieved_count": len(context) + } + + logger.info(f"LightRAG 检索到 {len(context)} 条相关记忆") + return context, metadata + + else: + # ==================== 原有 NetworkX 方法 ==================== + logger.info("使用 NetworkX 方法进行检索") + return await self._retrieve_with_networkx(query, top_k) + + except Exception as e: + logger.error(f"检索上下文失败: {e}", exc_info=True) + return [], {"error": str(e)} + + async def _retrieve_with_lightrag( + self, + query: str, + top_k: int + ) -> Dict[str, Any]: + """使用 LightRAG 进行检索""" + try: + # LightRAG 支持三种检索模式: + # - naive: 简单检索 + # - local: 局部上下文检索(实体级别) + # - global: 全局上下文检索(主题级别) + # - hybrid: 混合检索(推荐) + + result = await self.lightrag.aquery( + query, + param=QueryParam( + mode="hybrid", # 使用混合检索 + only_need_context=True, # 只需要上下文,不需要生成答案 + top_k=top_k, + ) + ) + + # 解析结果 + if isinstance(result, str): + # 如果返回的是字符串,说明是上下文 + context_list = [result] + elif isinstance(result, dict): + context_list = result.get("context", []) + else: + context_list = [] + + return { + "context": context_list, + "mode": "hybrid", + "top_k": top_k + } + + except Exception as e: + logger.error(f"LightRAG 检索失败: {e}", exc_info=True) + return {"context": [], "error": str(e)} + + async def _expand_keywords_with_networkx(self, query: str) -> List[str]: + """使用 NetworkX 图谱扩展关键词""" + try: + # 提取查询关键词 + query_tags = await self.text_processor.extract_tags(query) + + if not query_tags: + return [] + + # 使用 NetworkX 图谱扩展 + expanded_tags = self.memory_graph.get_related_keywords(set(query_tags)) + + return expanded_tags + + except Exception as e: + logger.error(f"关键词扩展失败: {e}") + return [] + + async def _retrieve_with_networkx( + self, + query: str, + top_k: int + ) -> Tuple[List[str], Dict[str, Any]]: + """使用原有 NetworkX 方法检索(保持向后兼容)""" + # 提取查询关键词 + query_tags = await 
self.text_processor.extract_tags(query) + + if not query_tags: + logger.warning("查询未提取到关键词") + return [], {"query_tags": []} + + # 使用知识图谱扩展关键词 + expanded_tags = self.memory_graph.get_related_keywords(set(query_tags)) + + # 合并查询关键词和扩展关键词 + all_tags = set(query_tags + expanded_tags[:20]) + + logger.info(f"查询关键词: {query_tags}") + logger.info(f"扩展关键词: {expanded_tags[:10]}") + + # 计算每个记忆的相关性得分 + scored_memories = [] + for memory in self.long_term_memory: + memory_tags = set(memory.tags()) + + # 计算 Jaccard 相似度 + intersection = len(memory_tags & all_tags) + union = len(memory_tags | all_tags) + jaccard = intersection / union if union > 0 else 0 + + # 时间衰减因子 + time_diff = (datetime.now() - memory.time()).total_seconds() + time_decay = np.exp(-time_diff / (7 * 24 * 3600)) # 7天半衰期 + + # 综合得分 + score = jaccard * 0.7 + time_decay * 0.3 + + if score > 0: + scored_memories.append((memory, score)) + + # 排序并取 top_k + scored_memories.sort(key=lambda x: x[1], reverse=True) + top_memories = scored_memories[:top_k] + + # 提取摘要 + context = [memory.summary() for memory, score in top_memories] + + # 构建元数据 + metadata = { + "method": "networkx", + "query_tags": query_tags, + "expanded_tags": expanded_tags[:10], + "retrieved_count": len(context), + "scores": [float(score) for _, score in top_memories] + } + + logger.info(f"NetworkX 检索到 {len(context)} 条相关记忆") + + return context, metadata + + async def get_stats(self) -> Dict[str, Any]: + """获取记忆统计信息""" + stats = { + "total_memories": len(self.long_term_memory), + "graph_nodes": self.memory_graph.get_node_count(), + "graph_edges": self.memory_graph.get_edge_count(), + "avg_degree": self.memory_graph.get_avg_degree(), + "use_lightrag": self.use_lightrag + } + + if self.use_lightrag and self.lightrag: + # 可以添加 LightRAG 相关统计 + stats["lightrag_enabled"] = True + # stats["lightrag_entities"] = ... 
# 如果 LightRAG 提供统计接口 + + return stats + + async def _save_to_redis(self) -> None: + """保存记忆到 Redis""" + try: + client = await self._get_redis_client() + + # 保存短期记忆 + key = f"memory:{self.session_id}:short_term" + await client.set(key, json.dumps(self.short_term_memory)) + + # 保存长期记忆 + key = f"memory:{self.session_id}:long_term" + long_term_data = [ + { + "summary": mem.summary(), + "tags": mem.tags(), + "time": mem.time().isoformat() + } + for mem in self.long_term_memory + ] + await client.set(key, json.dumps(long_term_data)) + + except Exception as e: + logger.error(f"保存到 Redis 失败: {e}", exc_info=True) + + async def clear_all(self) -> None: + """清除所有记忆""" + self.short_term_memory.clear() + self.long_term_memory.clear() + self.memory_graph.clear() + + # 清除 LightRAG 数据 + if self.use_lightrag and self.lightrag_manager: + self.lightrag_manager.remove_instance(self.session_id) + # 注意:这里不会删除磁盘上的 LightRAG 数据,需要手动清理 + + # 从 Redis 删除 + try: + client = await self._get_redis_client() + await client.delete( + f"memory:{self.session_id}:short_term", + f"memory:{self.session_id}:long_term" + ) + except Exception as e: + logger.error(f"从 Redis 删除失败: {e}", exc_info=True) + + async def close(self) -> None: + """关闭记忆引擎""" + if self._redis_client: + await self._redis_client.close() + logger.info(f"记忆引擎已关闭: session_id={self.session_id}") +``` + +### 4. 
配置文件更新 + +```python +# app/config.py(更新) +""" +Configuration - 配置管理 +""" +import os +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + """应用配置""" + + # Redis 配置 + redis_host: str = os.getenv("REDIS_HOST", "localhost") + redis_port: int = int(os.getenv("REDIS_PORT", "6379")) + redis_password: str = os.getenv("REDIS_PASSWORD", "") + redis_db: int = int(os.getenv("REDIS_DB", "0")) + + # 服务配置 + service_port: int = int(os.getenv("SERVICE_PORT", "8080")) + log_level: str = os.getenv("LOG_LEVEL", "INFO") + + # 记忆配置 + short_term_memory_size: int = int(os.getenv("SHORT_TERM_MEMORY_SIZE", "20")) + retrieve_top_n: int = int(os.getenv("RETRIEVE_TOP_N", "5")) + summary_max_tags: int = int(os.getenv("SUMMARY_MAX_TAGS", "30")) + + # LightRAG 配置 + use_lightrag: bool = os.getenv("USE_LIGHTRAG", "true").lower() == "true" + openai_api_key: str = os.getenv("OPENAI_API_KEY", "") + openai_api_base: str = os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1") + llm_model: str = os.getenv("LLM_MODEL", "gpt-4o-mini") + embedding_model: str = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small") + lightrag_working_dir: str = os.getenv("LIGHTRAG_WORKING_DIR", "./lightrag_data") + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" +``` + +### 5. 主应用更新 + +```python +# app/main.py(部分更新) + +def get_memory_engine(session_id: str) -> MemoryEngine: + """获取或创建指定 session 的记忆引擎""" + if session_id not in memory_engines: + logger.info(f"创建新的记忆引擎: session_id={session_id}") + memory_engines[session_id] = MemoryEngine( + session_id=session_id, + redis_host=settings.redis_host, + redis_port=settings.redis_port, + redis_password=settings.redis_password, + redis_db=settings.redis_db, + use_lightrag=settings.use_lightrag, # 新增:LightRAG 开关 + ) + return memory_engines[session_id] + +``` + +## 🚀 部署配置 + +### 1. 
环境变量配置 + +```bash +# .env 文件更新 +# Redis 配置 +REDIS_HOST=redis.higress-system.svc.cluster.local +REDIS_PORT=6379 +REDIS_PASSWORD= +REDIS_DB=0 + +# 服务配置 +SERVICE_PORT=8080 +LOG_LEVEL=INFO + +# 记忆配置 +SHORT_TERM_MEMORY_SIZE=20 +RETRIEVE_TOP_N=5 +SUMMARY_MAX_TAGS=30 + +# ==================== LightRAG 配置(新增)==================== +USE_LIGHTRAG=true + +# OpenAI API 配置 +OPENAI_API_KEY=sk-your-api-key +OPENAI_API_BASE=https://api.openai.com/v1 +LLM_MODEL=gpt-4o-mini +EMBEDDING_MODEL=text-embedding-3-small + +# LightRAG 工作目录 +LIGHTRAG_WORKING_DIR=/app/lightrag_data + +# 向量数据库配置(如果使用 Chroma) +# CHROMA_HOST=chroma.higress-system.svc.cluster.local +# CHROMA_PORT=8000 + +# 图数据库配置(如果使用 Neo4j) +# NEO4J_URI=bolt://neo4j.higress-system.svc.cluster.local:7687 +# NEO4J_USER=neo4j +# NEO4J_PASSWORD=password +``` + +### 2. Kubernetes 配置更新 + +```yaml +# deploy/kubernetes-lightrag.yaml +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: memory-service-config + namespace: higress-system +data: + REDIS_HOST: "redis.higress-system.svc.cluster.local" + REDIS_PORT: "6379" + REDIS_DB: "0" + SERVICE_PORT: "8080" + LOG_LEVEL: "INFO" + SHORT_TERM_MEMORY_SIZE: "20" + RETRIEVE_TOP_N: "5" + SUMMARY_MAX_TAGS: "30" + # LightRAG 配置 + USE_LIGHTRAG: "true" + OPENAI_API_BASE: "https://api.openai.com/v1" + LLM_MODEL: "gpt-4o-mini" + EMBEDDING_MODEL: "text-embedding-3-small" + LIGHTRAG_WORKING_DIR: "/app/lightrag_data" + +--- +# OpenAI API Key Secret +apiVersion: v1 +kind: Secret +metadata: + name: openai-secret + namespace: higress-system +type: Opaque +stringData: + api-key: "sk-your-openai-api-key" + +--- +# PersistentVolumeClaim for LightRAG data +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: lightrag-data-pvc + namespace: higress-system +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi # 根据需求调整 + # storageClassName: your-storage-class + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: memory-enhancement-service + namespace: higress-system + 
labels: + app: memory-service +spec: + replicas: 3 + selector: + matchLabels: + app: memory-service + template: + metadata: + labels: + app: memory-service + spec: + containers: + - name: memory-service + image: your-registry/memory-enhancement-service:lightrag-v1.0.0 + imagePullPolicy: Always + ports: + - containerPort: 8080 + name: http + protocol: TCP + envFrom: + - configMapRef: + name: memory-service-config + env: + - name: OPENAI_API_KEY + valueFrom: + secretKeyRef: + name: openai-secret + key: api-key + volumeMounts: + - name: lightrag-data + mountPath: /app/lightrag_data + resources: + requests: + memory: "512Mi" # LightRAG 需要更多内存 + cpu: "500m" + limits: + memory: "2Gi" # 增加内存限制 + cpu: "2000m" + livenessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 15 + periodSeconds: 30 + timeoutSeconds: 5 + readinessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 10 + timeoutSeconds: 3 + volumes: + - name: lightrag-data + persistentVolumeClaim: + claimName: lightrag-data-pvc + +--- +apiVersion: v1 +kind: Service +metadata: + name: memory-service + namespace: higress-system + labels: + app: memory-service +spec: + type: ClusterIP + selector: + app: memory-service + ports: + - port: 8080 + targetPort: 8080 + protocol: TCP + name: http +``` + +### 3. Dockerfile 更新 + +```dockerfile +# Dockerfile(LightRAG 版本) +FROM python:3.11-slim + +WORKDIR /app + +# 安装系统依赖 +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc \ + g++ \ + && rm -rf /var/lib/apt/lists/* + +# 复制依赖文件 +COPY requirements.txt . 
+ +# 安装 Python 依赖 +RUN pip install --no-cache-dir -r requirements.txt + +# 复制应用代码 +COPY app/ ./app/ + +# 创建 LightRAG 数据目录 +RUN mkdir -p /app/lightrag_data && \ + chmod 777 /app/lightrag_data + +# 暴露端口 +EXPOSE 8080 + +# 健康检查 +HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \ + CMD python -c "import requests; requests.get('http://localhost:8080/health')" + +# 启动应用 +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"] +``` + +## 📊 性能优化 + +### 1. 资源配置建议 + +```yaml +# 根据负载调整资源 +resources: + requests: + memory: "512Mi" # 最小内存 + cpu: "500m" # 最小 CPU + limits: + memory: "2Gi" # 最大内存(LightRAG + NetworkX) + cpu: "2000m" # 最大 CPU +``` + +### 2. 缓存策略 + +```python +# 在 LightRAG 配置中启用缓存 +# app/lightrag_config.py +class LightRAGConfig(BaseModel): + + # 缓存配置 + enable_llm_cache: bool = True + cache_dir: str = "./lightrag_cache" +``` + +### 3. 批处理优化 + +```python +# 如果短期内有多个对话,可以批量转换 +async def batch_convert_to_long_term_memory(self): + """批量转换短期记忆""" + if len(self.short_term_memory) < self.short_term_memory_size: + return + + # 将所有短期记忆合并为一个文本 + full_text = "\n\n===对话分隔===\n\n".join([ + f"{msg['role']}: {msg['content']}" + for msg in self.short_term_memory + ]) + + # 一次性插入到 LightRAG + await self.lightrag.ainsert(full_text) + + # 清空短期记忆 + self.short_term_memory.clear() +``` + +## 🧪 测试验证 + +### 1. 
单元测试 + +```python +# tests/test_lightrag_integration.py +import pytest +import asyncio +from app.memory_engine import MemoryEngine + + +@pytest.mark.asyncio +async def test_lightrag_integration(): + """测试 LightRAG 集成""" + engine = MemoryEngine( + session_id="test-session", + use_lightrag=True + ) + + # 添加对话 + await engine.add_conversation( + question="什么是 LightRAG?", + answer="LightRAG 是一个轻量级的 RAG 框架,支持图增强检索。", + metadata={} + ) + + # 强制转换为长期记忆 + engine.short_term_memory_size = 2 + await engine.add_conversation( + question="LightRAG 的优势是什么?", + answer="双层检索架构、图增强生成、增量更新。", + metadata={} + ) + + # 检索记忆 + context, metadata = await engine.retrieve_relevant_context( + query="介绍一下 RAG 框架", + top_k=3 + ) + + assert len(context) > 0 + assert metadata["method"] == "lightrag_hybrid" + + # 清理 + await engine.close() + + +@pytest.mark.asyncio +async def test_fallback_to_networkx(): + """测试回退到 NetworkX""" + engine = MemoryEngine( + session_id="test-session", + use_lightrag=False # 禁用 LightRAG + ) + + # ... 类似的测试逻辑 +``` + +### 2. 性能测试 + +```bash +# 使用 locust 进行压力测试 +# locustfile.py +from locust import HttpUser, task, between + + +class MemoryServiceUser(HttpUser): + wait_time = between(1, 3) + + @task(3) + def enhance_memory(self): + """测试增强记忆接口""" + self.client.post("/api/v1/memory/enhance", json={ + "session_id": "load-test", + "question": "测试问题", + "answer": "测试答案" + }) + + @task(7) + def retrieve_memory(self): + """测试检索接口""" + self.client.post("/api/v1/memory/retrieve", json={ + "session_id": "load-test", + "query": "测试查询", + "top_k": 5 + }) + + +# 运行测试 +# locust -f locustfile.py --host=http://localhost:8080 +``` + +## 🔍 故障排查 + +### 常见问题 + +#### 1. 
LightRAG 初始化失败 + +**症状**:服务启动时报错 `LightRAG initialization failed` + +**解决方案**: +```bash +# 检查 OpenAI API Key +kubectl get secret openai-secret -n higress-system -o jsonpath='{.data.api-key}' | base64 -d + +# 检查日志 +kubectl logs -n higress-system -l app=memory-service | grep -i "lightrag" + +# 验证 API 连接 +curl -X POST https://api.openai.com/v1/chat/completions \ + -H "Authorization: Bearer $OPENAI_API_KEY" \ + -H "Content-Type: application/json" \ + -d '{"model":"gpt-4o-mini","messages":[{"role":"user","content":"test"}]}' +``` + +#### 2. 向量检索性能差 + +**解决方案**: +- 启用向量数据库(Chroma/Qdrant)而不是本地存储 +- 调整 `top_k` 参数 +- 使用更小的 embedding 模型 + +#### 3. 内存占用过高 + +**解决方案**: +```yaml +# 限制每个 session 的数据大小 +# 在 app/lightrag_config.py 中添加 +class LightRAGConfig(BaseModel): + max_knowledge_units: int = 1000 # 最大知识单元数 + cleanup_threshold: int = 1200 # 清理阈值 +``` + +## 📈 监控指标 + +### Prometheus 指标 + +```python +# app/metrics.py +from prometheus_client import Counter, Histogram, Gauge + +# LightRAG 指标 +lightrag_insert_total = Counter( + 'lightrag_insert_total', + 'Total number of LightRAG insertions' +) + +lightrag_query_total = Counter( + 'lightrag_query_total', + 'Total number of LightRAG queries' +) + +lightrag_query_duration = Histogram( + 'lightrag_query_duration_seconds', + 'LightRAG query duration' +) + +lightrag_entities_total = Gauge( + 'lightrag_entities_total', + 'Total number of entities in LightRAG', + ['session_id'] +) +``` + +## 🎯 最佳实践 + +### 1. LLM 选择 + +| 场景 | 推荐模型 | 理由 | +|------|---------|------| +| 生产环境 | gpt-4o-mini | 性价比高,速度快 | +| 高质量要求 | gpt-4o | 效果最好 | +| 本地部署 | qwen2.5-72b | 开源,可控 | +| 低成本 | deepseek-chat | 便宜,效果不错 | + +### 2. 向量数据库选择 + +| 数据库 | 适用场景 | 优势 | +|--------|---------|------| +| Chroma | 开发/小规模 | 简单,易用 | +| Qdrant | 生产/中等规模 | 高性能,功能丰富 | +| Milvus | 大规模/企业 | 可扩展性强 | +| 本地存储 | 测试 | 无需额外部署 | + +### 3. 
数据持久化策略 + +```python +# 定期备份 LightRAG 数据 +async def backup_lightrag_data(session_id: str): + """备份 LightRAG 数据到 S3/OSS""" + import tarfile + import os + + working_dir = f"./lightrag_data/{session_id}" + backup_file = f"/tmp/lightrag-{session_id}-{datetime.now().strftime('%Y%m%d')}.tar.gz" + + # 创建压缩包 + with tarfile.open(backup_file, "w:gz") as tar: + tar.add(working_dir, arcname=session_id) + + # 上传到 S3/OSS + # await upload_to_s3(backup_file) + + os.remove(backup_file) +``` + +## 🚀 生产就绪检查清单 + +- [ ] OpenAI API Key 已配置且有效 +- [ ] PersistentVolume 已创建用于数据持久化 +- [ ] 资源限制已根据负载调整 +- [ ] 监控和告警已配置 +- [ ] 备份策略已实施 +- [ ] 性能测试已完成 +- [ ] 降级策略已验证(LightRAG 不可用时回退到 NetworkX) +- [ ] 日志聚合已配置 +- [ ] 成本预算已评估(LLM API 调用成本) + +## 📚 相关文档 + +- [LightRAG GitHub](https://github.com/HKUDS/LightRAG) +- [LightRAG 论文](https://arxiv.org/abs/2410.05779) +- [OpenAI API 文档](https://platform.openai.com/docs) +- [原有 Memory Service 文档](./README.md) +- [集成指南](./INTEGRATION_GUIDE.md) + +## 🎓 总结 + +本方案提供了完整的 LightRAG 集成方案,包括: + +1. ✅ **渐进式集成**:保留 NetworkX 作为备选,LightRAG 作为主要方案 +2. ✅ **混合检索**:LightRAG 语义检索 + NetworkX 关键词扩展 +3. ✅ **生产就绪**:完整的配置、部署、监控、故障排查方案 +4. ✅ **性能优化**:缓存、批处理、资源配置建议 +5. ✅ **降级策略**:LightRAG 不可用时自动回退到 NetworkX + +通过这个方案,您可以充分利用 LightRAG 的图增强生成能力,同时保持系统的稳定性和可维护性。 diff --git a/memory-networkx-service/LIGHTRAG_QUICKSTART.md b/memory-networkx-service/LIGHTRAG_QUICKSTART.md new file mode 100644 index 0000000000..8907a04a6f --- /dev/null +++ b/memory-networkx-service/LIGHTRAG_QUICKSTART.md @@ -0,0 +1,409 @@ +# LightRAG 集成快速入门指南 + +本指南帮助您快速启动集成了 LightRAG 的 Memory Enhancement Service。 + +## 🎯 30分钟快速部署 + +### 步骤 1:环境准备(5分钟) + +```bash +# 1. 确保已安装 Python 3.11+ +python --version + +# 2. 创建虚拟环境 +cd memory-networkx-service +python -m venv venv +source venv/bin/activate # Windows: venv\Scripts\activate + +# 3. 
安装依赖 +pip install -r requirements-lightrag.txt +``` + +### 步骤 2:配置 OpenAI API(5分钟) + +```bash +# 复制环境变量配置 +cp deploy/env-lightrag.example .env + +# 编辑 .env 文件,填入您的 OpenAI API Key +# 最少需要配置: +# OPENAI_API_KEY=sk-your-api-key +# USE_LIGHTRAG=true +``` + +**重要**:如果没有 OpenAI API Key,可以: +- 使用兼容的 API(如 DeepSeek、Qwen 等) +- 设置 `USE_LIGHTRAG=false` 仅使用 NetworkX 模式 + +### 步骤 3:本地启动服务(5分钟) + +```bash +# 启动服务 +python -m uvicorn app.main:app --reload --host 0.0.0.0 --port 8080 + +# 看到以下输出表示成功: +# INFO: Application startup complete. +# INFO: Uvicorn running on http://0.0.0.0:8080 +``` + +### 步骤 4:测试 LightRAG 集成(10分钟) + +#### 4.1 健康检查 + +```bash +curl http://localhost:8080/health +# 预期输出:{"status":"healthy"} +``` + +#### 4.2 添加知识 + +```bash +# 第一条对话 +curl -X POST http://localhost:8080/api/v1/memory/enhance \ + -H "Content-Type: application/json" \ + -d '{ + "session_id": "test-lightrag", + "question": "什么是 LightRAG?", + "answer": "LightRAG 是一个轻量级的 RAG 框架,它使用双层检索架构,结合了实体级别和主题级别的检索,能够实现更准确的知识检索和图增强生成。" + }' + +# 第二条对话(继续积累) +curl -X POST http://localhost:8080/api/v1/memory/enhance \ + -H "Content-Type: application/json" \ + -d '{ + "session_id": "test-lightrag", + "question": "LightRAG 的优势是什么?", + "answer": "LightRAG 的主要优势包括:1. 双层检索架构提供更精确的检索;2. 图增强生成支持多跳推理;3. 增量更新适合对话场景;4. 高性能的向量检索和图算法。" + }' + +# 继续添加直到触发长期记忆转换(默认需要 20 条对话) +# 或者修改 SHORT_TERM_MEMORY_SIZE=4 加快测试 +``` + +#### 4.3 检索知识 + +```bash +# 语义检索测试 +curl -X POST http://localhost:8080/api/v1/memory/retrieve \ + -H "Content-Type: application/json" \ + -d '{ + "session_id": "test-lightrag", + "query": "介绍一下 RAG 框架的特点", + "top_k": 3 + }' + +# 预期输出(使用 LightRAG): +# { +# "status": "success", +# "context": [ +# "...关于 LightRAG 的相关内容...", +# "...双层检索架构...", +# "...图增强生成..." 
+# ], +# "metadata": { +# "method": "lightrag_hybrid", +# "retrieved_count": 3 +# } +# } +``` + +#### 4.4 查看统计信息 + +```bash +curl http://localhost:8080/api/v1/memory/stats/test-lightrag + +# 预期输出: +# { +# "session_id": "test-lightrag", +# "total_memories": 2, +# "graph_nodes": 15, +# "graph_edges": 30, +# "avg_degree": 4.0, +# "use_lightrag": true, +# "lightrag_enabled": true +# } +``` + +### 步骤 5:验证 LightRAG 数据(5分钟) + +```bash +# 查看 LightRAG 生成的数据 +ls -lh lightrag_data/test-lightrag/ + +# 应该看到: +# - vdb/ (向量数据库) +# - graph_chunk_entity_relation.graphml (知识图谱) +# - full_docs.json (原始文档) +# - text_chunks.json (文本块) +# - entities.json (实体) +# - relationships.json (关系) +``` + +## 🐳 Docker 部署(生产环境) + +### 快速启动 + +```bash +# 1. 构建镜像 +docker build -t memory-service-lightrag:latest \ + -f Dockerfile . + +# 2. 运行容器 +docker run -d \ + --name memory-service \ + -p 8080:8080 \ + -e OPENAI_API_KEY=sk-your-key \ + -e USE_LIGHTRAG=true \ + -v $(pwd)/lightrag_data:/app/lightrag_data \ + memory-service-lightrag:latest + +# 3. 查看日志 +docker logs -f memory-service + +# 4. 测试 +curl http://localhost:8080/health +``` + +## ☸️ Kubernetes 部署 + +```bash +# 1. 创建 Secret(OpenAI API Key) +kubectl create secret generic openai-secret \ + --from-literal=api-key=sk-your-key \ + -n higress-system + +# 2. 部署服务 +kubectl apply -f deploy/kubernetes-lightrag.yaml + +# 3. 检查状态 +kubectl get pods -n higress-system -l app=memory-service +kubectl logs -n higress-system -l app=memory-service --tail=50 + +# 4. 端口转发测试 +kubectl port-forward -n higress-system svc/memory-service 8080:8080 + +# 5. 
测试 +curl http://localhost:8080/health +``` + +## 🔧 配置选项 + +### 基础配置(最小化) + +```bash +# .env +OPENAI_API_KEY=sk-your-key +USE_LIGHTRAG=true +``` + +### 推荐配置(生产环境) + +```bash +# .env +# OpenAI 配置 +OPENAI_API_KEY=sk-your-key +OPENAI_API_BASE=https://api.openai.com/v1 +LLM_MODEL=gpt-4o-mini # 性价比高 +EMBEDDING_MODEL=text-embedding-3-small + +# LightRAG 配置 +USE_LIGHTRAG=true +LIGHTRAG_WORKING_DIR=/data/lightrag +ENABLE_LLM_CACHE=true + +# 记忆配置 +SHORT_TERM_MEMORY_SIZE=20 # 20条对话转长期记忆 +RETRIEVE_TOP_N=5 + +# Redis 配置 +REDIS_HOST=redis +REDIS_PORT=6379 +``` + +### 高性能配置 + +```bash +# 使用更强大的模型 +LLM_MODEL=gpt-4o +EMBEDDING_MODEL=text-embedding-3-large + +# 调整并发 +MAX_ASYNC_REQUESTS=8 + +# 启用向量数据库(Qdrant) +QDRANT_HOST=qdrant +QDRANT_PORT=6333 +``` + +## 📊 验证 LightRAG 效果 + +### 对比测试 + +```python +# test_comparison.py +import asyncio +import requests + +async def test_comparison(): + # 1. 先禁用 LightRAG 测试 + response = requests.post( + "http://localhost:8080/api/v1/memory/retrieve", + json={ + "session_id": "test-session", + "query": "如何实现图增强检索?", + "top_k": 3 + } + ) + networkx_result = response.json() + print("NetworkX 结果:", networkx_result["metadata"]) + + # 2. 
启用 LightRAG 测试 + # 在 .env 中设置 USE_LIGHTRAG=true 并重启服务 + response = requests.post( + "http://localhost:8080/api/v1/memory/retrieve", + json={ + "session_id": "test-session", + "query": "如何实现图增强检索?", + "top_k": 3 + } + ) + lightrag_result = response.json() + print("LightRAG 结果:", lightrag_result["metadata"]) + +asyncio.run(test_comparison()) +``` + +### 查看 LightRAG 生成的知识图谱 + +```python +# view_graph.py +import json +import networkx as nx +import matplotlib.pyplot as plt + +# 读取 LightRAG 生成的图谱 +G = nx.read_graphml("lightrag_data/test-session/graph_chunk_entity_relation.graphml") + +# 打印统计 +print(f"节点数: {G.number_of_nodes()}") +print(f"边数: {G.number_of_edges()}") + +# 可视化(小图) +if G.number_of_nodes() < 100: + plt.figure(figsize=(12, 8)) + nx.draw(G, with_labels=True, node_color='lightblue', + node_size=500, font_size=8, arrows=True) + plt.savefig("knowledge_graph.png") + print("图谱已保存到 knowledge_graph.png") +``` + +## 🚨 常见问题 + +### Q1: LightRAG 初始化失败 + +**错误**:`LightRAG initialization failed` + +**解决**: +```bash +# 检查 API Key +echo $OPENAI_API_KEY + +# 测试 API 连接 +curl -X POST https://api.openai.com/v1/chat/completions \ + -H "Authorization: Bearer $OPENAI_API_KEY" \ + -H "Content-Type: application/json" \ + -d '{"model":"gpt-4o-mini","messages":[{"role":"user","content":"hi"}]}' +``` + +### Q2: 记忆检索没有结果 + +**原因**:短期记忆还未转换为长期记忆 + +**解决**: +```bash +# 方法1:降低转换阈值 +export SHORT_TERM_MEMORY_SIZE=4 + +# 方法2:添加更多对话直到达到阈值(默认20条) +``` + +### Q3: 成本过高 + +**解决**: +```bash +# 使用更便宜的模型 +export LLM_MODEL=gpt-4o-mini # OpenAI 最便宜 +# 或 +export OPENAI_API_BASE=https://api.deepseek.com +export LLM_MODEL=deepseek-chat # 更便宜 + +# 启用缓存减少重复调用 +export ENABLE_LLM_CACHE=true +``` + +### Q4: 想回退到 NetworkX 模式 + +```bash +# 设置环境变量 +export USE_LIGHTRAG=false + +# 重启服务 +# 系统会自动使用 NetworkX 进行检索 +``` + +## 📈 性能基准 + +基于 gpt-4o-mini 模型的性能参考: + +| 操作 | 延迟 | 成本(每次) | 说明 | +|------|------|-------------|------| +| 添加对话 | 50-100ms | $0 | 仅保存到短期记忆 | +| 转换长期记忆 | 2-5s | $0.001-0.003 | 提取实体+生成向量 | +| 语义检索 | 
100-300ms | $0 | 纯向量检索 | +| 混合检索 | 200-500ms | $0.0005 | 向量+LLM生成 | + +**优化建议**: +- 批量转换:将多条对话合并后一次性转换 +- 缓存开启:减少重复的 LLM 调用 +- 向量数据库:使用专业向量数据库提升检索速度 + +## 🎯 下一步 + +1. **集成到 ai-history 插件** + - 参考 [INTEGRATION_GUIDE.md](./INTEGRATION_GUIDE.md) + - 配置 ai-history 插件调用 Memory Service + +2. **性能调优** + - 阅读 [LIGHTRAG_INTEGRATION.md](./LIGHTRAG_INTEGRATION.md) 的性能优化章节 + - 根据负载调整资源配置 + +3. **生产部署** + - 配置持久化存储(PV/PVC) + - 设置监控和告警 + - 实施备份策略 + +4. **进阶功能** + - 使用 Neo4j 作为图数据库 + - 集成 Qdrant 作为向量数据库 + - 添加自定义实体提取规则 + +## 📚 相关文档 + +- [完整集成方案](./LIGHTRAG_INTEGRATION.md) +- [集成指南](./INTEGRATION_GUIDE.md) +- [基础 README](./README.md) +- [LightRAG 官方文档](https://github.com/HKUDS/LightRAG) + +--- + +**提示**:如果您在部署过程中遇到问题,请查看日志: +```bash +# Docker +docker logs -f memory-service + +# Kubernetes +kubectl logs -f -n higress-system -l app=memory-service + +# 本地 +# 日志会输出到终端 +``` diff --git a/memory-networkx-service/README.md b/memory-networkx-service/README.md new file mode 100644 index 0000000000..afff4e8e54 --- /dev/null +++ b/memory-networkx-service/README.md @@ -0,0 +1,303 @@ +# Memory Enhancement Service + +基于 NetworkX 的 AI 对话记忆增强服务,用于与 Higress ai-history 插件集成。 + +## 功能特性 + +- **知识图谱构建**:使用 NetworkX 构建对话关键词的知识图谱 +- **记忆扩散算法**:基于海马体特性的记忆检索算法 +- **动态权重管理**:自动调整记忆权重和图结构 +- **Redis 持久化**:支持将记忆数据持久化到 Redis +- **RESTful API**:提供简单的 HTTP API 接口 + +## 架构说明 + +``` +AI Client → Higress Gateway → ai-history Plugin (WASM Go) + ↓ + Memory Enhancement Service (Python + NetworkX) + ↓ + Redis +``` + +### 工作流程 + +1. **请求阶段**:ai-history 插件从 Memory Service 检索相关记忆并注入到请求中 +2. **响应阶段**:ai-history 插件将新的对话异步发送到 Memory Service +3. **记忆处理**:Memory Service 提取关键词、更新知识图谱、存储记忆 + +## 快速开始 + +### 1. 本地开发 + +```bash +# 安装依赖 +cd memory-networkx-service +pip install -r requirements.txt + +# 启动服务 +python -m uvicorn app.main:app --reload --host 0.0.0.0 --port 8080 +``` + +### 2. Docker 部署 + +```bash +# 构建镜像 +docker build -t memory-enhancement-service:latest . 
+ +# 运行容器 +docker run -d \ + -p 8080:8080 \ + -e REDIS_HOST=redis \ + -e REDIS_PORT=6379 \ + --name memory-service \ + memory-enhancement-service:latest +``` + +### 3. Kubernetes 部署 + +```bash +# 部署服务 +kubectl apply -f deploy/kubernetes.yaml + +# 检查部署状态 +kubectl get pods -n higress-system -l app=memory-service +kubectl logs -n higress-system -l app=memory-service +``` + +## API 接口 + +### 增强记忆 + +**POST** `/api/v1/memory/enhance` + +添加对话到记忆系统。 + +**请求体:** +```json +{ + "session_id": "user-123", + "question": "什么是 NetworkX?", + "answer": "NetworkX 是一个用于创建、操作和研究复杂网络的 Python 库。" +} +``` + +**响应:** +```json +{ + "status": "success", + "message": "Memory enhanced successfully", + "session_id": "user-123" +} +``` + +### 检索记忆 + +**POST** `/api/v1/memory/retrieve` + +检索相关记忆上下文。 + +**请求体:** +```json +{ + "session_id": "user-123", + "query": "如何使用图数据库?", + "top_k": 5 +} +``` + +**响应:** +```json +{ + "status": "success", + "context": [ + "讨论了 NetworkX 的基本用法...", + "介绍了图数据结构的特点..." + ], + "metadata": { + "query_tags": ["图数据库", "使用"], + "expanded_tags": ["networkx", "数据结构", "算法"], + "retrieved_count": 2, + "scores": [0.85, 0.72] + }, + "session_id": "user-123" +} +``` + +### 获取统计信息 + +**GET** `/api/v1/memory/stats/{session_id}` + +获取指定会话的记忆统计信息。 + +**响应:** +```json +{ + "session_id": "user-123", + "total_memories": 15, + "graph_nodes": 120, + "graph_edges": 450, + "avg_degree": 7.5 +} +``` + +### 删除记忆 + +**DELETE** `/api/v1/memory/{session_id}` + +删除指定会话的所有记忆。 + +**响应:** +```json +{ + "status": "success", + "message": "Memory for session user-123 deleted", + "session_id": "user-123" +} +``` + +## ai-history 插件配置 + +在原有 ai-history 插件配置基础上,添加 Memory Service 配置: + +```yaml +apiVersion: extensions.higress.io/v1alpha1 +kind: WasmPlugin +metadata: + name: ai-history + namespace: higress-system +spec: + defaultConfig: + redis: + serviceName: redis.higress-system.svc.cluster.local + servicePort: 6379 + timeout: 2000 + # Memory Enhancement Service 配置 + memoryService: + enabled: true # 
启用记忆增强功能 + serviceName: memory-service.higress-system.svc.cluster.local + servicePort: 8080 + timeout: 5000 # 超时时间(毫秒) + topK: 5 # 检索记忆数量 +``` + +## 环境变量配置 + +| 变量名 | 说明 | 默认值 | +|--------|------|--------| +| `REDIS_HOST` | Redis 主机地址 | `localhost` | +| `REDIS_PORT` | Redis 端口 | `6379` | +| `REDIS_PASSWORD` | Redis 密码 | ` ` | +| `REDIS_DB` | Redis 数据库编号 | `0` | +| `SERVICE_PORT` | 服务端口 | `8080` | +| `LOG_LEVEL` | 日志级别 | `INFO` | +| `SHORT_TERM_MEMORY_SIZE` | 短期记忆大小 | `20` | +| `RETRIEVE_TOP_N` | 默认检索数量 | `5` | +| `SUMMARY_MAX_TAGS` | 最大标签数量 | `30` | + +## 技术栈 + +- **FastAPI**:高性能 Web 框架 +- **NetworkX**:图论和复杂网络分析库 +- **Redis**:内存数据库,用于持久化 +- **Pydantic**:数据验证和配置管理 +- **Uvicorn**:ASGI 服务器 + +## 核心算法 + +### 记忆图谱构建 + +基于共现关系构建关键词的知识图谱: +- 使用 PMI(点互信息)计算边权重 +- 动态调整噪声阈值过滤低权重边 +- 限制每个节点的最大边数避免过拟合 + +### 记忆检索 + +基于海马体特性的记忆扩散算法: +1. 从查询关键词开始扩散 +2. 根据边权重和路径深度动态衰减 +3. 过滤低相关性节点 +4. 返回得分最高的记忆 + +### 相关性计算 + +综合考虑: +- Jaccard 相似度(70%) +- 时间衰减因子(30%) + +## 性能优化 + +- **异步处理**:Memory Service 调用不阻塞主请求流 +- **缓存机制**:使用 LRU 缓存避免重复计算 +- **批处理**:支持批量更新记忆 +- **图剪枝**:定期清理低权重边和孤立节点 + +## 故障排查 + +### 查看日志 + +```bash +# Kubernetes +kubectl logs -n higress-system -l app=memory-service --tail=100 -f + +# Docker +docker logs -f memory-service +``` + +### 常见问题 + +**Q: Memory Service 连接失败?** + +A: 检查以下配置: +1. 服务名称是否正确(包括命名空间) +2. 端口是否匹配 +3. 网络策略是否允许访问 + +**Q: 记忆检索没有结果?** + +A: 可能原因: +1. 短期记忆还未转换为长期记忆(需要积累一定对话数量) +2. 查询关键词与记忆标签匹配度低 +3. 图结构还未建立(刚启动服务) + +**Q: 性能较慢?** + +A: 优化建议: +1. 调整 `SHORT_TERM_MEMORY_SIZE` 减少转换频率 +2. 降低 `RETRIEVE_TOP_N` 减少检索数量 +3. 
增加服务副本数(Kubernetes 中调整 replicas) + +## 开发指南 + +### 项目结构 + +``` +memory-networkx-service/ +├── app/ +│ ├── main.py # FastAPI 应用入口 +│ ├── config.py # 配置管理 +│ ├── memory_engine.py # 记忆引擎核心 +│ ├── memory_graph.py # NetworkX 图结构 +│ ├── memory_item.py # 记忆项数据结构 +│ └── text_processor.py # 文本处理 +├── deploy/ +│ └── kubernetes.yaml # K8s 部署配置 +├── Dockerfile # Docker 镜像 +├── requirements.txt # Python 依赖 +└── README.md # 本文档 +``` + +### 扩展功能 + +可以通过替换 `TextProcessor` 类来集成 LLM API 或专业的中文分词工具(如 jieba)以提升关键词提取质量。 + +## License + +本项目基于 memory-networkx 改造,遵循原项目的许可协议。 + +## 致谢 + +- 基于 [Higress](https://github.com/alibaba/higress) ai-history 插件 +- 使用 [NetworkX](https://networkx.org/) 图论库 +- 参考 Waifu 项目的记忆系统设计 diff --git a/memory-networkx-service/app/__init__.py b/memory-networkx-service/app/__init__.py new file mode 100644 index 0000000000..8a493c79ad --- /dev/null +++ b/memory-networkx-service/app/__init__.py @@ -0,0 +1,4 @@ +""" +Memory Enhancement Service Application +""" +__version__ = "1.0.0" diff --git a/memory-networkx-service/app/config.py b/memory-networkx-service/app/config.py new file mode 100644 index 0000000000..a68893029c --- /dev/null +++ b/memory-networkx-service/app/config.py @@ -0,0 +1,28 @@ +""" +Configuration - 配置管理 +""" +import os +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + """应用配置""" + + # Redis 配置 + redis_host: str = os.getenv("REDIS_HOST", "localhost") + redis_port: int = int(os.getenv("REDIS_PORT", "6379")) + redis_password: str = os.getenv("REDIS_PASSWORD", "") + redis_db: int = int(os.getenv("REDIS_DB", "0")) + + # 服务配置 + service_port: int = int(os.getenv("SERVICE_PORT", "8080")) + log_level: str = os.getenv("LOG_LEVEL", "INFO") + + # 记忆配置 + short_term_memory_size: int = int(os.getenv("SHORT_TERM_MEMORY_SIZE", "20")) + retrieve_top_n: int = int(os.getenv("RETRIEVE_TOP_N", "5")) + summary_max_tags: int = int(os.getenv("SUMMARY_MAX_TAGS", "30")) + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" diff 
--git a/memory-networkx-service/app/lightrag_config.py b/memory-networkx-service/app/lightrag_config.py new file mode 100644 index 0000000000..b66f935ec7 --- /dev/null +++ b/memory-networkx-service/app/lightrag_config.py @@ -0,0 +1,182 @@ +""" +LightRAG 配置和初始化模块 +""" +from typing import Optional, Callable +from pydantic import BaseModel +import os +import logging + +try: + from lightrag import LightRAG, QueryParam + from lightrag.llm import openai_complete_if_cache, openai_embedding + LIGHTRAG_AVAILABLE = True +except ImportError: + LIGHTRAG_AVAILABLE = False + LightRAG = None + QueryParam = None + +logger = logging.getLogger(__name__) + + +class LightRAGConfig(BaseModel): + """LightRAG 配置""" + # LLM 配置 + llm_model: str = "gpt-4o-mini" + llm_api_base: str = "https://api.openai.com/v1" + llm_api_key: str = "" + + # Embedding 配置 + embedding_model: str = "text-embedding-3-small" + embedding_dim: int = 1536 + + # 存储配置 + working_dir: str = "./lightrag_data" + vector_storage: str = "local" # local, chroma, qdrant, weaviate + graph_storage: str = "networkx" # networkx, neo4j + + # 检索配置 + top_k: int = 10 + max_token_for_text_unit: int = 4000 + max_token_for_global_context: int = 4000 + max_token_for_local_context: int = 4000 + + # 缓存配置 + enable_llm_cache: bool = True + cache_dir: str = "./lightrag_cache" + + +class LightRAGManager: + """LightRAG 管理器 - 单例模式""" + + _instance = None + _rag_instances = {} # session_id -> LightRAG 实例 + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + if not hasattr(self, 'initialized'): + if not LIGHTRAG_AVAILABLE: + logger.warning("LightRAG 库未安装,将使用降级模式") + self.config = None + self.initialized = True + return + + self.config = LightRAGConfig( + llm_api_key=os.getenv("OPENAI_API_KEY", ""), + llm_api_base=os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1"), + llm_model=os.getenv("LLM_MODEL", "gpt-4o-mini"), + embedding_model=os.getenv("EMBEDDING_MODEL", 
"text-embedding-3-small"), + working_dir=os.getenv("LIGHTRAG_WORKING_DIR", "./lightrag_data"), + cache_dir=os.getenv("LIGHTRAG_CACHE_DIR", "./lightrag_cache"), + ) + + # 创建必要的目录 + os.makedirs(self.config.working_dir, exist_ok=True) + os.makedirs(self.config.cache_dir, exist_ok=True) + + self.initialized = True + logger.info("LightRAG 管理器已初始化") + + def is_available(self) -> bool: + """检查 LightRAG 是否可用""" + return LIGHTRAG_AVAILABLE and self.config is not None + + def get_rag_instance(self, session_id: str): + """获取或创建指定 session 的 LightRAG 实例""" + if not self.is_available(): + logger.warning("LightRAG 不可用,返回 None") + return None + + if session_id not in self._rag_instances: + working_dir = os.path.join(self.config.working_dir, session_id) + os.makedirs(working_dir, exist_ok=True) + + try: + self._rag_instances[session_id] = LightRAG( + working_dir=working_dir, + llm_model_func=self._create_llm_func(), + embedding_func=self._create_embedding_func(), + # 可以添加更多配置 + # max_async=4, + # max_tokens=self.config.max_token_for_text_unit, + ) + logger.info(f"创建 LightRAG 实例: session_id={session_id}") + except Exception as e: + logger.error(f"创建 LightRAG 实例失败: {e}", exc_info=True) + return None + + return self._rag_instances[session_id] + + def _create_llm_func(self) -> Callable: + """创建 LLM 函数""" + async def llm_func( + prompt: str, + system_prompt: Optional[str] = None, + history_messages: list = None, + **kwargs + ) -> str: + """LLM 调用函数""" + if history_messages is None: + history_messages = [] + + try: + result = await openai_complete_if_cache( + model=self.config.llm_model, + prompt=prompt, + system_prompt=system_prompt, + history_messages=history_messages, + api_key=self.config.llm_api_key, + base_url=self.config.llm_api_base, + **kwargs + ) + return result + except Exception as e: + logger.error(f"LLM 调用失败: {e}", exc_info=True) + raise + + return llm_func + + def _create_embedding_func(self) -> Callable: + """创建 Embedding 函数""" + async def embedding_func(texts: 
list[str]): + """Embedding 生成函数""" + try: + import numpy as np + + result = await openai_embedding( + texts=texts, + model=self.config.embedding_model, + api_key=self.config.llm_api_key, + base_url=self.config.llm_api_base, + ) + + # 确保返回正确的格式 + if isinstance(result, list): + return np.array(result) + return result + except Exception as e: + logger.error(f"Embedding 生成失败: {e}", exc_info=True) + raise + + return embedding_func + + def remove_instance(self, session_id: str): + """移除指定 session 的 RAG 实例""" + if session_id in self._rag_instances: + try: + # LightRAG 实例可能需要清理资源 + del self._rag_instances[session_id] + logger.info(f"移除 LightRAG 实例: session_id={session_id}") + except Exception as e: + logger.error(f"移除 LightRAG 实例失败: {e}") + + def get_stats(self) -> dict: + """获取统计信息""" + return { + "total_instances": len(self._rag_instances), + "active_sessions": list(self._rag_instances.keys()), + "lightrag_available": self.is_available(), + } diff --git a/memory-networkx-service/app/main.py b/memory-networkx-service/app/main.py new file mode 100644 index 0000000000..e7da576959 --- /dev/null +++ b/memory-networkx-service/app/main.py @@ -0,0 +1,274 @@ +""" +Memory Enhancement Service - 基于 NetworkX 的增强记忆服务 +用于与 Higress ai-history 插件集成 +""" +import asyncio +from fastapi import FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel +from typing import List, Optional, Dict, Any +import logging +from datetime import datetime + +from app.memory_engine import MemoryEngine +from app.config import Settings + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# 创建 FastAPI 应用 +app = FastAPI( + title="Memory Enhancement Service", + description="基于 NetworkX 的 AI 对话记忆增强服务", + version="1.0.0" +) + +# CORS 配置 +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + 
    allow_headers=["*"],
)

# Load service settings (Redis connection + memory tuning knobs) from env/.env.
settings = Settings()

# One MemoryEngine per conversation, keyed by session_id.
# NOTE(review): kept in process memory only — presumably restored from Redis
# inside MemoryEngine; with replicas > 1 sessions are per-pod. TODO confirm.
memory_engines: Dict[str, MemoryEngine] = {}


# ========== Request/response models ==========

class MemoryEnhanceRequest(BaseModel):
    """Request body for ingesting one Q/A pair into a session's memory."""
    session_id: str
    question: str
    answer: str
    metadata: Optional[Dict[str, Any]] = None


class MemoryRetrieveRequest(BaseModel):
    """Request body for retrieving relevant memory context."""
    session_id: str
    query: str
    top_k: Optional[int] = 5


class MemoryEnhanceResponse(BaseModel):
    """Response for the enhance endpoint."""
    status: str
    message: str
    session_id: str


class MemoryRetrieveResponse(BaseModel):
    """Response for the retrieve endpoint."""
    status: str
    context: List[str]
    metadata: Dict[str, Any]
    session_id: str


class MemoryStatsResponse(BaseModel):
    """Response for the per-session stats endpoint."""
    session_id: str
    total_memories: int
    graph_nodes: int
    graph_edges: int
    avg_degree: float


# ========== Helpers ==========

def get_memory_engine(session_id: str) -> MemoryEngine:
    """Return the MemoryEngine for ``session_id``, creating it on first use.

    NOTE(review): creation is a side effect — querying stats for an unknown
    session instantiates an empty engine for it.
    """
    if session_id not in memory_engines:
        logger.info(f"创建新的记忆引擎: session_id={session_id}")
        memory_engines[session_id] = MemoryEngine(
            session_id=session_id,
            redis_host=settings.redis_host,
            redis_port=settings.redis_port,
            redis_password=settings.redis_password,
            redis_db=settings.redis_db
        )
    return memory_engines[session_id]


# ========== API endpoints ==========

@app.get("/")
async def root():
    """Service banner / liveness info with current server timestamp."""
    return {
        "service": "Memory Enhancement Service",
        "status": "running",
        "version": "1.0.0",
        "timestamp": datetime.now().isoformat()
    }


@app.get("/health")
async def health_check():
    """Health-check endpoint used by container/K8s probes."""
    return {"status": "healthy"}


@app.post("/api/v1/memory/enhance", response_model=MemoryEnhanceResponse)
async def enhance_memory(request: MemoryEnhanceRequest):
    """
    Ingest a conversation turn and update the session's knowledge graph.

    Args:
        request: carries session_id, question, answer (and optional metadata)

    Returns:
        MemoryEnhanceResponse with status "success" on completion.

    Raises:
        HTTPException: 500 wrapping any failure from the memory engine.
    """
    try:
        logger.info(f"收到增强记忆请求: session_id={request.session_id}")

        # Look up (or lazily create) the per-session engine.
        engine = get_memory_engine(request.session_id)

        # Hand the Q/A pair to the memory system; metadata defaults to {}.
        await engine.add_conversation(
            question=request.question,
            answer=request.answer,
            metadata=request.metadata or {}
        )

        logger.info(f"成功添加记忆: session_id={request.session_id}")

        return MemoryEnhanceResponse(
            status="success",
            message="Memory enhanced successfully",
            session_id=request.session_id
        )

    except Exception as e:
        logger.error(f"增强记忆失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/v1/memory/retrieve", response_model=MemoryRetrieveResponse)
async def retrieve_context(request: MemoryRetrieveRequest):
    """
    Retrieve memory context relevant to a query.

    Args:
        request: carries session_id, query, and optional top_k (defaults to 5;
                 note ``top_k or 5`` also maps an explicit 0 to 5)

    Returns:
        MemoryRetrieveResponse with the retrieved context strings and
        retrieval metadata.

    Raises:
        HTTPException: 500 wrapping any failure from the memory engine.
    """
    try:
        logger.info(f"收到检索请求: session_id={request.session_id}, query={request.query[:50]}...")

        # Look up (or lazily create) the per-session engine.
        engine = get_memory_engine(request.session_id)

        # Ask the engine for the most relevant stored context.
        context, metadata = await engine.retrieve_relevant_context(
            query=request.query,
            top_k=request.top_k or 5
        )

        logger.info(f"成功检索 {len(context)} 条记忆")

        return MemoryRetrieveResponse(
            status="success",
            context=context,
            metadata=metadata,
            session_id=request.session_id
        )

    except Exception as e:
        logger.error(f"检索记忆失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/v1/memory/stats/{session_id}", response_model=MemoryStatsResponse)
async def get_memory_stats(session_id: str):
    """
    Return memory statistics for one session.

    Args:
        session_id: session identifier (path parameter)

    Returns:
        MemoryStatsResponse built from the engine's stats dict, whose keys
        must match the response model's fields.

    Raises:
        HTTPException: 500 wrapping any failure from the memory engine.
    """
    try:
        engine = get_memory_engine(session_id)
        stats = await engine.get_stats()

        return MemoryStatsResponse(
            session_id=session_id,
            **stats
        )

    except Exception as e:
        logger.error(f"获取统计信息失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@app.delete("/api/v1/memory/{session_id}")
async def delete_memory(session_id: str):
    """
    Delete all memories for one session and drop its engine instance.

    Args:
        session_id: session identifier (path parameter)

    Returns:
        A plain dict describing the deletion result.

    Raises:
        HTTPException: 500 wrapping any failure during cleanup.
    """
    try:
if session_id in memory_engines: + await memory_engines[session_id].clear_all() + del memory_engines[session_id] + logger.info(f"已删除会话记忆: session_id={session_id}") + + return { + "status": "success", + "message": f"Memory for session {session_id} deleted", + "session_id": session_id + } + + except Exception as e: + logger.error(f"删除记忆失败: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +@app.on_event("startup") +async def startup_event(): + """应用启动事件""" + logger.info("Memory Enhancement Service 启动中...") + logger.info(f"Redis 配置: {settings.redis_host}:{settings.redis_port}") + + +@app.on_event("shutdown") +async def shutdown_event(): + """应用关闭事件""" + logger.info("Memory Enhancement Service 关闭中...") + # 清理所有记忆引擎 + for session_id, engine in memory_engines.items(): + await engine.close() + logger.info("所有记忆引擎已关闭") + + +if __name__ == "__main__": + import uvicorn + uvicorn.run( + "main:app", + host="0.0.0.0", + port=8080, + reload=True, + log_level="info" + ) diff --git a/memory-networkx-service/app/memory_engine.py b/memory-networkx-service/app/memory_engine.py new file mode 100644 index 0000000000..f180b3ce65 --- /dev/null +++ b/memory-networkx-service/app/memory_engine.py @@ -0,0 +1,341 @@ +""" +Memory Engine - 记忆引擎核心实现 +""" +import asyncio +import json +import logging +from typing import List, Tuple, Dict, Any, Optional +from datetime import datetime +import redis.asyncio as redis +import numpy as np + +from app.memory_graph import MemoryGraph +from app.memory_item import MemoryItem +from app.text_processor import TextProcessor + +logger = logging.getLogger(__name__) + + +class MemoryEngine: + """记忆引擎 - 管理单个会话的记忆""" + + def __init__( + self, + session_id: str, + redis_host: str = "localhost", + redis_port: int = 6379, + redis_password: Optional[str] = None, + redis_db: int = 0 + ): + self.session_id = session_id + self.redis_host = redis_host + self.redis_port = redis_port + self.redis_password = redis_password + self.redis_db = 
redis_db + + # 初始化组件 + self.memory_graph = MemoryGraph() + self.text_processor = TextProcessor() + + # Redis 客户端(延迟初始化) + self._redis_client: Optional[redis.Redis] = None + + # 短期记忆缓存 + self.short_term_memory: List[Dict[str, Any]] = [] + self.short_term_memory_size = 20 + + # 长期记忆列表 + self.long_term_memory: List[MemoryItem] = [] + + # 配置参数 + self.retrieve_top_n = 5 + self.summary_max_tags = 30 + + logger.info(f"记忆引擎已初始化: session_id={session_id}") + + async def _get_redis_client(self) -> redis.Redis: + """获取 Redis 客户端(懒加载)""" + if self._redis_client is None: + self._redis_client = redis.Redis( + host=self.redis_host, + port=self.redis_port, + password=self.redis_password, + db=self.redis_db, + decode_responses=True + ) + # 测试连接 + await self._redis_client.ping() + logger.info(f"Redis 连接成功: {self.redis_host}:{self.redis_port}") + return self._redis_client + + async def add_conversation( + self, + question: str, + answer: str, + metadata: Dict[str, Any] + ) -> None: + """ + 添加对话到记忆系统 + + Args: + question: 用户问题 + answer: AI 回答 + metadata: 元数据 + """ + try: + # 添加到短期记忆 + conversation = { + "role": "user", + "content": question, + "timestamp": datetime.now().isoformat() + } + self.short_term_memory.append(conversation) + + conversation = { + "role": "assistant", + "content": answer, + "timestamp": datetime.now().isoformat() + } + self.short_term_memory.append(conversation) + + # 如果短期记忆超出限制,转换为长期记忆 + if len(self.short_term_memory) >= self.short_term_memory_size: + await self._convert_to_long_term_memory() + + # 保存到 Redis + await self._save_to_redis() + + except Exception as e: + logger.error(f"添加对话失败: {e}", exc_info=True) + raise + + async def _tag_conversations( + self, + conversations: List[Dict[str, Any]], + summary_flag: bool + ) -> Tuple[str, List[str]]: + """ + 标记对话 - 生成摘要和标签 + + 这是原始 memory-networkx 的核心方法,用于: + 1. summary_flag=True: 生成完整摘要(转换长期记忆时) + 2. 
summary_flag=False: 快速提取标签(对话提取时) + + Args: + conversations: 对话列表 + summary_flag: 是否生成完整摘要 + + Returns: + (摘要, 标签列表) + """ + try: + if summary_flag: + # 完整模式:生成摘要和标签 + conversation_text = "\n".join([ + f"{msg['role']}: {msg['content']}" + for msg in conversations[-10:] # 取最近10条 + ]) + summary = await self.text_processor.generate_summary(conversation_text) + tags = await self.text_processor.extract_tags(conversation_text) + else: + # 快速模式:仅提取标签 + conversation_text = "\n".join([ + msg['content'] + for msg in conversations[-5:] # 取最近5条 + ]) + summary = conversation_text[:100] # 简单摘要 + tags = await self.text_processor.extract_tags(conversation_text) + + logger.debug(f"标记对话完成: summary_len={len(summary)}, tags_count={len(tags)}") + return summary, tags + + except Exception as e: + logger.error(f"标记对话失败: {e}", exc_info=True) + return "", [] + + async def _convert_to_long_term_memory(self) -> None: + """将短期记忆转换为长期记忆""" + try: + # 使用 _tag_conversations 生成摘要和标签 + summary, tags = await self._tag_conversations( + self.short_term_memory, + summary_flag=True + ) + + # 添加时间标签 + tags.extend(self._generate_time_tags()) + tags = list(set(tags)) # 去重 + + # 限制标签数量 + if len(tags) > self.summary_max_tags: + tags = tags[:self.summary_max_tags] + + # 添加时间标签 + now = datetime.now() + time_tag = f"DATETIME:{now.strftime('%Y-%m-%d %H:%M:%S')}" + tags.append(time_tag) + + # 创建记忆项 + memory_item = MemoryItem(summary=summary, tags=tags) + self.long_term_memory.append(memory_item) + + # 更新知识图谱 + self.memory_graph.add_memory(memory_item) + + # 清理部分短期记忆 + self.short_term_memory = self.short_term_memory[-10:] + + logger.info(f"已转换长期记忆: tags={len(tags)}, summary_len={len(summary)}") + + except Exception as e: + logger.error(f"转换长期记忆失败: {e}", exc_info=True) + + def _generate_time_tags(self) -> List[str]: + """ + 生成时间标签 + + Returns: + 时间标签列表 ["2024年", "1月", "15日", "上午"] + """ + now = datetime.now() + + period = "上午" + if now.hour >= 12: + period = "下午" + + year_tag = f"{now.year}年" + month_tag = 
f"{now.month}月" + day_tag = f"{now.day}日" + period_tag = period + + return [year_tag, month_tag, day_tag, period_tag] + + async def retrieve_relevant_context( + self, + query: str, + top_k: int = 5 + ) -> Tuple[List[str], Dict[str, Any]]: + """ + 检索相关上下文 + + Args: + query: 查询文本 + top_k: 返回的记忆数量 + + Returns: + (相关记忆列表, 元数据字典) + """ + try: + # 提取查询关键词 + query_tags = await self.text_processor.extract_tags(query) + + if not query_tags: + logger.warning("查询未提取到关键词") + return [], {"query_tags": []} + + # 使用知识图谱扩展关键词 + expanded_tags = self.memory_graph.get_related_keywords(set(query_tags)) + + # 合并查询关键词和扩展关键词 + all_tags = set(query_tags + expanded_tags[:20]) # 限制扩展数量 + + logger.info(f"查询关键词: {query_tags}") + logger.info(f"扩展关键词: {expanded_tags[:10]}") + + # 计算每个记忆的相关性得分 + scored_memories = [] + for memory in self.long_term_memory: + memory_tags = set(memory.tags()) + + # 计算 Jaccard 相似度 + intersection = len(memory_tags & all_tags) + union = len(memory_tags | all_tags) + jaccard = intersection / union if union > 0 else 0 + + # 时间衰减因子 + time_diff = (datetime.now() - memory.time()).total_seconds() + time_decay = np.exp(-time_diff / (7 * 24 * 3600)) # 7天半衰期 + + # 综合得分 + score = jaccard * 0.7 + time_decay * 0.3 + + if score > 0: + scored_memories.append((memory, score)) + + # 排序并取 top_k + scored_memories.sort(key=lambda x: x[1], reverse=True) + top_memories = scored_memories[:top_k] + + # 提取摘要 + context = [memory.summary() for memory, score in top_memories] + + # 构建元数据 + metadata = { + "query_tags": query_tags, + "expanded_tags": expanded_tags[:10], + "retrieved_count": len(context), + "scores": [float(score) for _, score in top_memories] + } + + logger.info(f"检索到 {len(context)} 条相关记忆") + + return context, metadata + + except Exception as e: + logger.error(f"检索上下文失败: {e}", exc_info=True) + return [], {"error": str(e)} + + async def get_stats(self) -> Dict[str, Any]: + """获取记忆统计信息""" + return { + "total_memories": len(self.long_term_memory), + "graph_nodes": 
self.memory_graph.get_node_count(), + "graph_edges": self.memory_graph.get_edge_count(), + "avg_degree": self.memory_graph.get_avg_degree() + } + + async def _save_to_redis(self) -> None: + """保存记忆到 Redis""" + try: + client = await self._get_redis_client() + + # 保存短期记忆 + key = f"memory:{self.session_id}:short_term" + await client.set(key, json.dumps(self.short_term_memory)) + + # 保存长期记忆 + key = f"memory:{self.session_id}:long_term" + long_term_data = [ + { + "summary": mem.summary(), + "tags": mem.tags(), + "time": mem.time().isoformat() + } + for mem in self.long_term_memory + ] + await client.set(key, json.dumps(long_term_data)) + + except Exception as e: + logger.error(f"保存到 Redis 失败: {e}", exc_info=True) + + async def clear_all(self) -> None: + """清除所有记忆""" + self.short_term_memory.clear() + self.long_term_memory.clear() + self.memory_graph.clear() + + # 从 Redis 删除 + try: + client = await self._get_redis_client() + await client.delete( + f"memory:{self.session_id}:short_term", + f"memory:{self.session_id}:long_term" + ) + except Exception as e: + logger.error(f"从 Redis 删除失败: {e}", exc_info=True) + + async def close(self) -> None: + """关闭记忆引擎""" + if self._redis_client: + await self._redis_client.close() + logger.info(f"记忆引擎已关闭: session_id={self.session_id}") diff --git a/memory-networkx-service/app/memory_graph.py b/memory-networkx-service/app/memory_graph.py new file mode 100644 index 0000000000..a84e406519 --- /dev/null +++ b/memory-networkx-service/app/memory_graph.py @@ -0,0 +1,240 @@ +""" +Memory Graph - 基于 NetworkX 的知识图谱 +""" +import networkx as nx +import heapq +from itertools import combinations +import math +import logging +from typing import Set, List +from app.memory_item import MemoryItem + +logger = logging.getLogger(__name__) + + +class MemoryGraph: + """记忆知识图谱""" + + def __init__(self): + # 使用 NetworkX 图表示记忆连接 + self._graph = nx.Graph() + self._base_decay = 0.6 + self._noise_threshold = 0.2 + self._max_edges_per_node = 30 + 
self._need_update_noise = True
+        self._add_cnt = 0
+        self._add_cnt_limit = 1000
+
+    def add_memory(self, memory: MemoryItem):
+        """Add a memory item's tags to the graph and refresh co-occurrence edges."""
+        tags = set(memory.tags())
+
+        # Add keyword nodes.
+        current_node = self._graph.number_of_nodes()
+        self._graph.add_nodes_from(tags)
+        after_node = self._graph.number_of_nodes()
+
+        # Update edges between every pair of tags.
+        self._update_edges(tags)
+
+        # Prune the graph once every `_add_cnt_limit` newly added nodes.
+        self._add_cnt += after_node - current_node
+        if self._add_cnt >= self._add_cnt_limit:
+            self._prune_graph()
+            self._add_cnt = 0
+
+    def get_node_count(self) -> int:
+        """Number of keyword nodes currently in the graph."""
+        return self._graph.number_of_nodes()
+
+    def get_edge_count(self) -> int:
+        """Number of co-occurrence edges currently in the graph."""
+        return self._graph.number_of_edges()
+
+    def get_avg_degree(self) -> float:
+        """Average node degree; 0.0 for an empty graph."""
+        if self._graph.number_of_nodes() == 0:
+            return 0.0
+        return sum(dict(self._graph.degree()).values()) / self._graph.number_of_nodes()
+
+    def _update_edges(self, keywords: Set[str]):
+        """Update co-occurrence counts and recompute PMI-based edge weights.
+
+        Fixes two defects of the earlier version:
+        1. weights were only computed for brand-new edges, so repeated
+           co-occurrence bumped ``cooccurrence`` but never refreshed
+           ``weight`` — now every touched pair is rescored;
+        2. ``math.log2(total_edges)`` was used as a divisor, which raises
+           ZeroDivisionError when the graph holds exactly one edge
+           (``log2(1) == 0``) — now clamped to at least 1.0.
+        """
+        touched_edges = []
+
+        # Count co-occurrences for every keyword pair in this memory.
+        for i, j in combinations(keywords, 2):
+            if self._graph.has_edge(i, j):
+                self._graph[i][j]['cooccurrence'] += 1
+            else:
+                self._graph.add_edge(i, j, cooccurrence=1)
+            touched_edges.append((i, j))
+
+        # Improved PMI weight computation over every touched edge.
+        total_edges = max(1, self._graph.number_of_edges())
+        # log2(total_edges) is 0 for a single-edge graph; clamp so the
+        # normalisation terms below can never divide by zero.
+        log_total = max(1.0, math.log2(total_edges))
+        for i, j in touched_edges:
+            degree_i = max(1, self._graph.degree[i])
+            degree_j = max(1, self._graph.degree[j])
+            cooccurrence = self._graph[i][j]['cooccurrence']
+
+            # Pointwise mutual information.
+            pmi = math.log2((cooccurrence * total_edges) / (degree_i * degree_j))
+
+            # Smoothing and normalisation.
+            smoothing = 0.2
+            norm_factor = log_total + smoothing
+
+            # Blend a frequency factor with the smoothed PMI factor.
+            freq_factor = math.log2(1 + cooccurrence) / log_total
+            pmi_factor = (pmi + smoothing) / norm_factor
+
+            alpha = 0.7
+            weight = alpha * pmi_factor + (1 - alpha) * freq_factor
+            weight = max(0.01, min(1.0, weight))
+
+            self._graph[i][j]['weight'] = weight
+
+        self._need_update_noise = True
+
+    def _prune_graph(self):
+        """Periodically drop low-weight edges, cap per-node degree and
+        remove isolated nodes."""
+        self._update_noise_threshold()
+
+        # Remove edges below the noise threshold.
+        for u, v, data in list(self._graph.edges(data=True)):
+            if data['weight'] < self._noise_threshold:
+                self._graph.remove_edge(u, v)
+
+        # Cap the number of edges kept per node.
+        self._limit_node_edges()
+
+        # Drop nodes left without any edges.
+        isolated_nodes = [node for node, degree in self._graph.degree() if degree == 0]
+        self._graph.remove_nodes_from(isolated_nodes)
+
+        self._need_update_noise = True
+
+        logger.info(f"图裁剪完成: 节点={self._graph.number_of_nodes()}, "
+                    f"边={self._graph.number_of_edges()}, "
+                    f"平均度数={self.get_avg_degree():.2f}")
+
+    def _update_noise_threshold(self):
+        """Recompute the dynamic noise threshold from the edge-weight
+        distribution (lower quartile, bounded via the average degree)."""
+        if not self._need_update_noise:
+            return
+
+        self._need_update_noise = False
+
+        if self._graph.number_of_edges() == 0:
+            self._noise_threshold = 0.2
+            return
+
+        # Quantile-based threshold.
+        weights = [data['weight'] for _, _, data in self._graph.edges(data=True)]
+        if weights:
+            import numpy as np
+            lower_quartile = np.percentile(weights, 25)
+            avg_degree = self.get_avg_degree()
+            max_threshold = 0.4 + 0.1 * math.log2(avg_degree) if avg_degree > 0 else 0.4
+            self._noise_threshold = max(0.1, min(max_threshold, lower_quartile))
+
+    def _limit_node_edges(self):
+        """Keep only the `_max_edges_per_node` strongest edges per node."""
+        for node in self._graph.nodes():
+            edges = list(self._graph.edges(node, data=True))
+            if len(edges) > self._max_edges_per_node:
+                edges = sorted(edges, key=lambda x: -x[2]['weight'])
+                for edge in edges[self._max_edges_per_node:]:
+                    self._graph.remove_edge(edge[0], edge[1])
+
+    def get_related_keywords(self, keywords: Set[str]) -> List[str]:
+        """
+        Hippocampus-inspired spreading activation over the keyword graph.
+
+        Args:
+            keywords: input keyword set
+
+        Returns:
+            related keywords discovered by spreading activation
+        """
+        valid_keywords = {k for k in keywords if self._graph.has_node(k)}
+        if not valid_keywords:
+            logger.info("无效关键词,无法联想")
+            return []
+
+        self._update_noise_threshold()
+
+        # Initialise the priority queue (max-heap via negated strengths).
+        need_search = []
+        for k in valid_keywords:
+            heapq.heappush(need_search, (-1.0, k, 0, k))
+
+        related: dict[str, float] = {}
+        max_strength = {}
+
+        while len(need_search) > 0:
+            neg_strength, curr, depth, from_node = heapq.heappop(need_search)
+            
curr_strength = -neg_strength + + if curr_strength <= self._noise_threshold: + break + + if curr_strength <= max_strength.get(curr, 0): + continue + + max_strength[curr] = curr_strength + related[curr] = curr_strength + + # 获取邻居节点 + neighbors = self._get_valid_neighbors(curr) + if not neighbors: + continue + + # 计算平均权重 + avg_weight = sum(self._get_edge_weight(curr, n) for n in neighbors) / len(neighbors) + + for neighbor in neighbors: + edge_weight = self._get_edge_weight(curr, neighbor) + + # 动态衰减 + weight_ratio = edge_weight / max(0.01, avg_weight) + dynamic_decay = self._base_decay + 0.2 * min(1.0, weight_ratio) + dynamic_decay = min(dynamic_decay, 0.8) + + decay_rate = dynamic_decay ** depth + new_strength = curr_strength * decay_rate * edge_weight + + if new_strength > 0: + heapq.heappush(need_search, (-new_strength, neighbor, depth + 1, curr)) + + # 归一化并排序 + if related: + max_value = max(related.values()) + normalized = {k: v / max_value for k, v in related.items()} + sorted_related = sorted(normalized.items(), key=lambda x: -x[1]) + + # 排除输入标签并返回 + return [k for k, v in sorted_related if k not in valid_keywords] + + return [] + + def _get_valid_neighbors(self, keyword: str) -> List[str]: + """获取有效邻居节点""" + if not self._graph.has_node(keyword): + return [] + + edges = self._graph.edges(keyword, data=True) + sorted_edges = sorted(edges, key=lambda x: -x[2]['weight'])[:self._max_edges_per_node] + sorted_edges = [edge for edge in sorted_edges if edge[2]['weight'] > self._noise_threshold] + + return [edge[1] for edge in sorted_edges] + + def _get_edge_weight(self, key1: str, key2: str) -> float: + """获取边的权重""" + if self._graph.has_edge(key1, key2): + return self._graph[key1][key2]['weight'] + return 0.0 + + def clear(self): + """清除图中的所有数据""" + self._graph.clear() + logger.info("知识图谱已清空") diff --git a/memory-networkx-service/app/memory_item.py b/memory-networkx-service/app/memory_item.py new file mode 100644 index 0000000000..284fdd54ed --- /dev/null +++ 
b/memory-networkx-service/app/memory_item.py @@ -0,0 +1,152 @@ +""" +Memory Item - 记忆项数据结构 +从 memory-networkx/organs/memory_item.py 改造 + +核心功能: +1. 存储记忆摘要和标签 +2. 从标签中提取时间信息 +3. 过滤元标签(包含冒号的特殊标签) +4. 提供标准接口访问记忆数据 +""" +from datetime import datetime +from typing import List + + +# 元标签标记(包含冒号的标签被视为元标签) +META_TAG_MARK = ":" +# 时间元标签前缀 +DATETIME_META = "DATETIME:" +# 备用时间戳(当没有时间标签时使用) +BACKOFF_TIMESTAMP = 1745069038 # 2025-04-19 00:00:00 (UTC+8) + + +class MemoryItem: + """ + 记忆项 - 存储单条长期记忆 + + Attributes: + _summary: 记忆摘要文本 + _tags: 标签列表(包含元标签) + _created_time: 记忆创建时间 + + 元标签类型: + - DATETIME:2024-01-15 10:30:00 # 时间标签 + - PRIORITY:high # 优先级标签 + - PADDING:0 # 填充标签 + """ + + def __init__(self, summary: str, tags: List[str]): + """ + 初始化记忆项 + + Args: + summary: 记忆摘要 + tags: 标签列表(可包含元标签) + """ + self._summary = summary + self._tags = tags + self._created_time = datetime.now() + + # 从标签中提取时间 + self._init_created_time() + # 调整标签(移除非时间的元标签) + self._adjust_tags() + + def _init_created_time(self) -> None: + """ + 从标签中提取创建时间 + + 查找 DATETIME: 开头的标签,解析其时间信息 + 如果没有找到,使用备用时间戳 + """ + for tag in self._tags: + if tag.startswith(DATETIME_META): + time_str = tag.replace(DATETIME_META, "") + try: + self._created_time = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S") + return + except ValueError: + # 时间格式错误,继续查找 + continue + + # 如果没有找到有效的时间标签,使用备用时间戳 + self._created_time = datetime.fromtimestamp(BACKOFF_TIMESTAMP) + + def _adjust_tags(self) -> None: + """ + 调整标签列表,移除非时间的元标签 + + 规则: + - 移除所有包含冒号的标签(元标签) + - 但保留 DATETIME: 时间标签(用于时间提取) + + 注意:这个方法只在内部使用,tags() 方法会进一步过滤 + """ + # 只保留:不包含冒号的标签 或 DATETIME 时间标签 + self._tags = [ + tag for tag in self._tags + if META_TAG_MARK not in tag or tag.startswith(DATETIME_META) + ] + + def tags(self) -> List[str]: + """ + 获取标签列表(不包含任何元标签) + + Returns: + 过滤后的标签列表(不包含 DATETIME 等元标签) + """ + # 过滤掉所有元标签(包含冒号的标签) + return [ + tag for tag in self._tags + if META_TAG_MARK not in tag + ] + + def all_tags(self) -> List[str]: + """ + 获取所有标签(包含元标签) + + Returns: + 
完整的标签列表(包含 DATETIME 等元标签) + """ + return self._tags.copy() + + def summary(self) -> str: + """ + 获取记忆摘要 + + Returns: + 摘要文本 + """ + return self._summary + + def time(self) -> datetime: + """ + 获取记忆创建时间 + + Returns: + 创建时间(datetime 对象) + """ + return self._created_time + + def __repr__(self) -> str: + """ + 字符串表示(用于调试) + + Returns: + 记忆项的字符串表示 + """ + return f"MemoryItem(summary='{self._summary[:50]}...', tags={len(self._tags)}, time={self._created_time.strftime('%Y-%m-%d %H:%M:%S')})" + + def to_dict(self) -> dict: + """ + 转换为字典(用于序列化) + + Returns: + 包含记忆数据的字典 + """ + return { + "summary": self._summary, + "tags": self.all_tags(), + "created_time": self._created_time.isoformat(), + "tags_count": len(self.tags()), + } diff --git a/memory-networkx-service/app/text_processor.py b/memory-networkx-service/app/text_processor.py new file mode 100644 index 0000000000..019ac0c643 --- /dev/null +++ b/memory-networkx-service/app/text_processor.py @@ -0,0 +1,76 @@ +""" +Text Processor - 文本处理器 +用于生成摘要和提取关键词 +""" +import logging +from typing import List +import re + +logger = logging.getLogger(__name__) + + +class TextProcessor: + """文本处理器 - 简化版本,可根据需要集成 LLM""" + + def __init__(self): + # 停用词列表(中文常见停用词) + self.stop_words = { + '的', '了', '在', '是', '我', '你', '他', '她', '它', + '们', '这', '那', '有', '个', '就', '都', '也', '还', + '要', '会', '着', '过', '吗', '吧', '呢', '啊', '哦' + } + + async def generate_summary(self, text: str, max_length: int = 200) -> str: + """ + 生成文本摘要 + + 简化实现:提取前 max_length 个字符 + 生产环境可替换为调用 LLM API + """ + # 清理文本 + cleaned_text = text.strip() + + # 如果文本较短,直接返回 + if len(cleaned_text) <= max_length: + return cleaned_text + + # 截断到指定长度 + summary = cleaned_text[:max_length] + + # 在句号处截断 + last_period = summary.rfind('。') + if last_period > max_length // 2: + summary = summary[:last_period + 1] + + logger.debug(f"生成摘要: {len(summary)} 字符") + return summary + + async def extract_tags(self, text: str, max_tags: int = 30) -> List[str]: + """ + 提取关键词标签 + + 简化实现:基于分词和词频 + 
生产环境可替换为调用 LLM API 或使用 jieba 分词 + """ + # 简单的中文分词(按标点和空格分割) + words = re.findall(r'[\u4e00-\u9fa5]+|[a-zA-Z]+', text) + + # 过滤停用词和短词 + filtered_words = [ + word for word in words + if len(word) >= 2 and word not in self.stop_words + ] + + # 统计词频 + word_freq = {} + for word in filtered_words: + word_freq[word] = word_freq.get(word, 0) + 1 + + # 按词频排序 + sorted_words = sorted(word_freq.items(), key=lambda x: -x[1]) + + # 取前 max_tags 个 + tags = [word for word, freq in sorted_words[:max_tags]] + + logger.debug(f"提取标签: {len(tags)} 个") + return tags diff --git a/memory-networkx-service/deploy/ai-history-plugin-config.yaml b/memory-networkx-service/deploy/ai-history-plugin-config.yaml new file mode 100644 index 0000000000..40cb4227f7 --- /dev/null +++ b/memory-networkx-service/deploy/ai-history-plugin-config.yaml @@ -0,0 +1,34 @@ +apiVersion: extensions.higress.io/v1alpha1 +kind: WasmPlugin +metadata: + name: ai-history + namespace: higress-system +spec: + matchRules: + - config: + # Redis 配置 + redis: + serviceName: redis.higress-system.svc.cluster.local + servicePort: 6379 + timeout: 2000 + database: 0 + + # 缓存配置 + cacheKeyPrefix: "higress-ai-history:" + identityHeader: "Authorization" + fillHistoryCnt: 3 + cacheTTL: 0 # 0 表示永不过期 + + # Memory Enhancement Service 配置(新增) + memoryService: + enabled: true # 启用记忆增强功能 + serviceName: memory-service.higress-system.svc.cluster.local + servicePort: 8080 + timeout: 5000 # 超时时间(毫秒) + topK: 5 # 从记忆库中检索的相关记忆数量 + + ingress: + - higress-gateway + + # 使用 ai-history 插件的 OCI 镜像 + url: oci://higress-registry.cn-hangzhou.cr.aliyuncs.com/plugins/ai-history:1.0.0 diff --git a/memory-networkx-service/deploy/complete-deployment.yaml b/memory-networkx-service/deploy/complete-deployment.yaml new file mode 100644 index 0000000000..02cb2e94bf --- /dev/null +++ b/memory-networkx-service/deploy/complete-deployment.yaml @@ -0,0 +1,161 @@ +# 完整部署配置 - Memory Enhancement Service + ai-history Plugin + +--- +# 1. 
Redis 部署(如果还没有 Redis) +apiVersion: apps/v1 +kind: Deployment +metadata: + name: redis + namespace: higress-system +spec: + replicas: 1 + selector: + matchLabels: + app: redis + template: + metadata: + labels: + app: redis + spec: + containers: + - name: redis + image: redis:7-alpine + ports: + - containerPort: 6379 + resources: + requests: + memory: "128Mi" + cpu: "100m" + limits: + memory: "256Mi" + cpu: "200m" + +--- +apiVersion: v1 +kind: Service +metadata: + name: redis + namespace: higress-system +spec: + selector: + app: redis + ports: + - port: 6379 + targetPort: 6379 + +--- +# 2. Memory Enhancement Service 配置 +apiVersion: v1 +kind: ConfigMap +metadata: + name: memory-service-config + namespace: higress-system +data: + REDIS_HOST: "redis.higress-system.svc.cluster.local" + REDIS_PORT: "6379" + REDIS_DB: "0" + SERVICE_PORT: "8080" + LOG_LEVEL: "INFO" + SHORT_TERM_MEMORY_SIZE: "20" + RETRIEVE_TOP_N: "5" + SUMMARY_MAX_TAGS: "30" + +--- +# 3. Memory Enhancement Service 部署 +apiVersion: apps/v1 +kind: Deployment +metadata: + name: memory-enhancement-service + namespace: higress-system + labels: + app: memory-service +spec: + replicas: 3 + selector: + matchLabels: + app: memory-service + template: + metadata: + labels: + app: memory-service + spec: + containers: + - name: memory-service + image: your-registry/memory-enhancement-service:latest + imagePullPolicy: Always + ports: + - containerPort: 8080 + name: http + protocol: TCP + envFrom: + - configMapRef: + name: memory-service-config + resources: + requests: + memory: "256Mi" + cpu: "200m" + limits: + memory: "512Mi" + cpu: "500m" + livenessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 30 + timeoutSeconds: 3 + readinessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 5 + periodSeconds: 10 + timeoutSeconds: 3 + +--- +apiVersion: v1 +kind: Service +metadata: + name: memory-service + namespace: higress-system + labels: + app: memory-service +spec: + 
type: ClusterIP + selector: + app: memory-service + ports: + - port: 8080 + targetPort: 8080 + protocol: TCP + name: http + +--- +# 4. ai-history 插件配置(启用 Memory Service) +apiVersion: extensions.higress.io/v1alpha1 +kind: WasmPlugin +metadata: + name: ai-history + namespace: higress-system +spec: + matchRules: + - config: + redis: + serviceName: redis.higress-system.svc.cluster.local + servicePort: 6379 + timeout: 2000 + database: 0 + cacheKeyPrefix: "higress-ai-history:" + identityHeader: "Authorization" + fillHistoryCnt: 3 + cacheTTL: 0 + # 启用 Memory Enhancement Service + memoryService: + enabled: true + serviceName: memory-service.higress-system.svc.cluster.local + servicePort: 8080 + timeout: 5000 + topK: 5 + ingress: + - higress-gateway + url: oci://higress-registry.cn-hangzhou.cr.aliyuncs.com/plugins/ai-history:1.0.0 diff --git a/memory-networkx-service/deploy/kubernetes.yaml b/memory-networkx-service/deploy/kubernetes.yaml new file mode 100644 index 0000000000..8ae82efc80 --- /dev/null +++ b/memory-networkx-service/deploy/kubernetes.yaml @@ -0,0 +1,107 @@ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: memory-service-config + namespace: higress-system +data: + REDIS_HOST: "redis.higress-system.svc.cluster.local" + REDIS_PORT: "6379" + REDIS_DB: "0" + SERVICE_PORT: "8080" + LOG_LEVEL: "INFO" + SHORT_TERM_MEMORY_SIZE: "20" + RETRIEVE_TOP_N: "5" + SUMMARY_MAX_TAGS: "30" + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: memory-enhancement-service + namespace: higress-system + labels: + app: memory-service +spec: + replicas: 3 + selector: + matchLabels: + app: memory-service + template: + metadata: + labels: + app: memory-service + spec: + containers: + - name: memory-service + image: your-registry/memory-enhancement-service:latest + imagePullPolicy: Always + ports: + - containerPort: 8080 + name: http + protocol: TCP + env: + - name: REDIS_HOST + valueFrom: + configMapKeyRef: + name: memory-service-config + key: REDIS_HOST + - name: 
REDIS_PORT + valueFrom: + configMapKeyRef: + name: memory-service-config + key: REDIS_PORT + - name: REDIS_DB + valueFrom: + configMapKeyRef: + name: memory-service-config + key: REDIS_DB + - name: SERVICE_PORT + valueFrom: + configMapKeyRef: + name: memory-service-config + key: SERVICE_PORT + - name: LOG_LEVEL + valueFrom: + configMapKeyRef: + name: memory-service-config + key: LOG_LEVEL + resources: + requests: + memory: "256Mi" + cpu: "200m" + limits: + memory: "512Mi" + cpu: "500m" + livenessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 30 + timeoutSeconds: 3 + readinessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 5 + periodSeconds: 10 + timeoutSeconds: 3 + +--- +apiVersion: v1 +kind: Service +metadata: + name: memory-service + namespace: higress-system + labels: + app: memory-service +spec: + type: ClusterIP + selector: + app: memory-service + ports: + - port: 8080 + targetPort: 8080 + protocol: TCP + name: http diff --git a/memory-networkx-service/requirements-lightrag.txt b/memory-networkx-service/requirements-lightrag.txt new file mode 100644 index 0000000000..1c2295ae08 --- /dev/null +++ b/memory-networkx-service/requirements-lightrag.txt @@ -0,0 +1,42 @@ +# Memory Enhancement Service - LightRAG 集成版本依赖 + +# 基础依赖 +fastapi==0.109.0 +uvicorn[standard]==0.27.0 +pydantic==2.5.3 +pydantic-settings==2.1.0 +redis==5.0.1 +networkx==3.2.1 +numpy==1.26.3 +python-multipart==0.0.6 + +# LightRAG 核心依赖 +lightrag-hku==0.0.1 # LightRAG 核心库 +openai==1.12.0 # OpenAI API 客户端 +tiktoken==0.5.2 # Token 计数 +tenacity==8.2.3 # 重试机制 + +# 向量数据库选项(按需选择) +# Chroma(推荐用于开发和小规模部署) +chromadb==0.4.22 + +# Qdrant(推荐用于生产环境) +# qdrant-client==1.7.0 + +# Weaviate +# weaviate-client==3.25.0 + +# 图数据库选项(按需选择) +# Neo4j(如果需要持久化图存储) +# neo4j==5.16.0 + +# 文本处理 +jieba==0.42.1 # 中文分词(可选,用于关键词提取) + +# 监控和日志 +prometheus-client==0.19.0 # Prometheus 指标 + +# 开发依赖 +# pytest==7.4.3 +# pytest-asyncio==0.21.1 +# locust==2.20.0 # 性能测试 diff --git 
a/memory-networkx-service/requirements.txt b/memory-networkx-service/requirements.txt new file mode 100644 index 0000000000..a9051510b5 --- /dev/null +++ b/memory-networkx-service/requirements.txt @@ -0,0 +1,8 @@ +fastapi==0.109.0 +uvicorn[standard]==0.27.0 +pydantic==2.5.3 +pydantic-settings==2.1.0 +redis==5.0.1 +networkx==3.2.1 +numpy==1.26.3 +python-multipart==0.0.6 diff --git a/memory-networkx.md b/memory-networkx.md new file mode 100644 index 0000000000..9948b5cd6e --- /dev/null +++ b/memory-networkx.md @@ -0,0 +1,370 @@ + +## 架构方案概述 + +由于 `ai-history` 是 WASM Go 插件 [1](#1-0) ,而 NetworkX 是 Python 库,您需要采用**外部服务 + 插件集成**的架构: + +```mermaid +graph TB + Client[AI Client] --> Higress[Higress Gateway] + Higress --> AIHistory[ai-history Plugin
WASM Go] + AIHistory --> Redis[(Redis
对话存储)] + AIHistory --> MemoryService[Memory Enhancement Service
Python + NetworkX] + MemoryService --> GraphDB[(Graph Database
Neo4j/NetworkX)] + MemoryService --> Redis + Higress --> AIProxy[ai-proxy Plugin] + AIProxy --> LLM[LLM Provider] +``` + +## 详细实现方案 + +### 1. 扩展 ai-history 插件 + +创建新插件 `ai-history-enhanced`,基于原有 `ai-history` 扩展 [2](#1-1) : + +**关键修改点:** + +- **添加 HTTP 调用**: 在 `onHttpStreamResponseBody` 处理完对话后,调用 Python 服务 +- **配置扩展**: 添加 Memory Service 的服务地址配置 + +```go +type EnhancedConfig struct { + RedisInfo + KVExtractor + // 新增:Memory Enhancement Service 配置 + MemoryServiceName string `yaml:"memoryServiceName" json:"memoryServiceName"` + MemoryServicePort int `yaml:"memoryServicePort" json:"memoryServicePort"` + MemoryTimeout int `yaml:"memoryTimeout" json:"memoryTimeout"` +} +``` + +### 2. Python Memory Enhancement Service + +创建独立的 Python 服务,提供 RESTful API: + +**核心功能模块:** + +#### 2.1 知识图谱构建 (NetworkX) + +```python +import networkx as nx +import redis +from fastapi import FastAPI + +class MemoryGraphService: + def __init__(self): + self.graph = nx.DiGraph() + self.redis_client = redis.Redis(host='redis', port=6379) + + def build_conversation_graph(self, session_id: str): + """从 Redis 读取对话历史,构建知识图谱""" + history_key = f"higress-ai-history:{session_id}" + conversations = self.redis_client.lrange(history_key, 0, -1) + + for conv in conversations: + # 提取实体和关系 + entities = self.extract_entities(conv) + relations = self.extract_relations(conv) + + # 添加到图中 + for entity in entities: + self.graph.add_node(entity['id'], **entity) + for rel in relations: + self.graph.add_edge(rel['from'], rel['to'], **rel) + + def get_relevant_context(self, query: str, session_id: str, top_k: int = 5): + """基于图结构检索相关上下文""" + # 使用 PageRank 或其他图算法找到最相关的节点 + pagerank = nx.pagerank(self.graph) + # 结合语义相似度排序 + relevant_nodes = self.semantic_search(query, pagerank, top_k) + return self.format_context(relevant_nodes) +``` + +#### 2.2 API 接口设计 + +```python +app = FastAPI() +memory_service = MemoryGraphService() + +@app.post("/api/v1/memory/enhance") +async def enhance_memory(request: MemoryRequest): + 
"""接收对话,更新知识图谱""" + session_id = request.session_id + question = request.question + answer = request.answer + + # 更新图谱 + memory_service.add_conversation(session_id, question, answer) + + return {"status": "success"} + +@app.post("/api/v1/memory/retrieve") +async def retrieve_context(request: RetrievalRequest): + """检索增强上下文""" + context = memory_service.get_relevant_context( + query=request.query, + session_id=request.session_id, + top_k=request.top_k + ) + return {"context": context} +``` + +### 3. 集成流程 + +#### 3.1 对话存储流程 + +在 `ai-history-enhanced` 插件的 `onHttpStreamResponseBody` 中 [3](#1-2) : + +```go +func onHttpStreamResponseBody(ctx wrapper.HttpContext, config Config, chunk []byte, isLastChunk bool) { + // 原有逻辑:存储到 Redis + storeToRedis(ctx, config, chunk) + + // 新增:异步调用 Memory Service + if isLastChunk { + go func() { + payload := map[string]interface{}{ + "session_id": getSessionId(ctx), + "question": ctx.GetContext(QuestionContextKey), + "answer": ctx.GetContext(AnswerContentContextKey), + } + + httpCall(config.MemoryServiceName, config.MemoryServicePort, + "/api/v1/memory/enhance", payload) + }() + } +} +``` + +#### 3.2 上下文检索流程 + +在 `onHttpRequestBody` 中 [4](#1-3) ,请求 LLM 前注入增强上下文: + +```go +func onHttpRequestBody(ctx wrapper.HttpContext, config Config, body []byte) { + sessionId := getSessionId(ctx) + question := extractQuestion(body) + + // 调用 Memory Service 获取增强上下文 + enhancedContext := retrieveEnhancedContext(config, sessionId, question) + + // 注入到请求中 + modifiedBody := injectContext(body, enhancedContext) + proxywasm.ReplaceHttpRequestBody(modifiedBody) +} +``` + +### 4. 
部署配置 + +#### 4.1 Python Service 部署 (Kubernetes) + +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: memory-enhancement-service + namespace: higress-system +spec: + replicas: 3 + selector: + matchLabels: + app: memory-service + template: + metadata: + labels: + app: memory-service + spec: + containers: + - name: memory-service + image: your-registry/memory-service:latest + ports: + - containerPort: 8080 + env: + - name: REDIS_HOST + value: "redis.higress-system.svc.cluster.local" + - name: NEO4J_URI + value: "bolt://neo4j:7687" +--- +apiVersion: v1 +kind: Service +metadata: + name: memory-service + namespace: higress-system +spec: + selector: + app: memory-service + ports: + - port: 8080 + targetPort: 8080 +``` + +#### 4.2 插件配置 + +```yaml +apiVersion: extensions.higress.io/v1alpha1 +kind: WasmPlugin +metadata: + name: ai-history-enhanced + namespace: higress-system +spec: + defaultConfig: + redis: + serviceName: redis.higress-system.svc.cluster.local + servicePort: 6379 + timeout: 2000 + memoryServiceName: memory-service.higress-system.svc.cluster.local + memoryServicePort: 8080 + memoryTimeout: 5000 + kvExtractor: + requestBody: "messages.#(role==\"user\").content" + responseBody: "choices.0.message.content" + url: oci://your-registry/ai-history-enhanced:1.0.0 +``` + +### 5. 构建和编译 + +#### 5.1 WASM 插件编译 [5](#1-4) + +```bash +cd plugins/wasm-go/extensions/ai-history-enhanced +GOOS=wasip1 GOARCH=wasm go build -buildmode=c-shared -o main.wasm . +``` + +#### 5.2 Docker 镜像构建 [6](#1-5) + +```dockerfile +# WASM 插件 +FROM scratch +COPY main.wasm plugin.wasm + +# Python 服务 +FROM python:3.11-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install -r requirements.txt +COPY . . +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"] +``` + +## 生产环境考虑 + +### 性能优化 + +1. **异步处理**: Memory Service 调用使用异步,不阻塞主请求流 +2. **缓存策略**: 图谱查询结果缓存到 Redis,TTL 设置为 5 分钟 +3. **批处理**: 积累多个对话后批量更新图谱 + +### 可靠性 + +1. **超时控制**: Memory Service 调用设置 5 秒超时 +2. 
**降级策略**: 如果 Memory Service 不可用,回退到原始 `ai-history` 逻辑 +3. **重试机制**: HTTP 调用失败时重试 3 次 + +### 可观测性 + +1. **日志**: 记录所有 Memory Service 调用和图谱更新 +2. **指标**: 监控图谱大小、查询延迟、命中率 +3. **追踪**: 使用 OpenTelemetry 追踪完整请求链路 + +## Notes + +这个方案将 Higress 的 WASM 插件能力与 Python 生态系统结合,通过外部服务实现复杂的图算法。关键是保持插件轻量,将计算密集型任务交给专门的服务处理。您也可以考虑使用 Neo4j 等专业图数据库替代 NetworkX 的内存图,以获得更好的持久化和查询性能。 + +Wiki pages you might want to explore: +- [Available Plugin Types (alibaba/higress)](/wiki/alibaba/higress#3.3) +- [AI Gateway Features (alibaba/higress)](/wiki/alibaba/higress#4) + +### Citations + +**File:** plugins/wasm-go/extensions/ai-history/main.go (L1-44) +```go +// File generated by hgctl. Modify as required. +// See: https://higress.io/zh-cn/docs/user/wasm-go#2-%E7%BC%96%E5%86%99-maingo-%E6%96%87%E4%BB%B6 + +package main + +import ( + "encoding/json" + "errors" + "fmt" + "net/url" + "strconv" + "strings" + + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" + "github.com/higress-group/wasm-go/pkg/log" + "github.com/higress-group/wasm-go/pkg/wrapper" + "github.com/tidwall/gjson" + "github.com/tidwall/resp" +) + +const ( + QuestionContextKey = "question" + AnswerContentContextKey = "answer" + PartialMessageContextKey = "partialMessage" + ToolCallsContextKey = "toolCalls" + StreamContextKey = "stream" + DefaultCacheKeyPrefix = "higress-ai-history:" + IdentityKey = "identity" + ChatHistories = "chatHistories" +) + +func main() {} + +func init() { + wrapper.SetCtx( + "ai-history", + wrapper.ParseConfigBy(parseConfig), + wrapper.ProcessRequestHeadersBy(onHttpRequestHeaders), + wrapper.ProcessRequestBodyBy(onHttpRequestBody), + wrapper.ProcessResponseHeadersBy(onHttpResponseHeaders), + wrapper.ProcessStreamingResponseBodyBy(onHttpStreamResponseBody), + ) +} +``` + +**File:** plugins/wasm-go/README.md (L9-32) +```markdown +使用以下命令可以快速构建 wasm-go 插件: + +```bash +# NOTE: 如果你想在构建插件的时候设置额外的构建参数 EXTRA_TAGS +# 请更新 extensions/${PLUGIN_NAME} 插件目录对应的 
.buildrc 文件 +$ PLUGIN_NAME=request-block make build +``` + +
+输出结果 +

+DOCKER_BUILDKIT=1 docker build --build-arg PLUGIN_NAME=request-block \
+                               -t request-block:20230223-173305-3b1a471 \
+                               --output extensions/request-block .
+[+] Building 67.7s (12/12) FINISHED
+
+image:            request-block:20230223-173305-3b1a471
+output wasm file: extensions/request-block/plugin.wasm
+
+
+ +该命令最终构建出一个 wasm 文件和一个 Docker image。 +这个本地的 wasm 文件被输出到了指定的插件的目录下,可以直接用于调试。 +你也可以直接使用 `make build-push` 一并构建和推送 image. +``` + +**File:** plugins/wasm-go/README_EN.md (L54-66) +```markdown +### step2. build and push docker image + +A simple Dockerfile: + +```Dockerfile +FROM scratch +COPY main.wasm plugin.wasm +``` + +```bash +docker build -t /request-block:2.0.0 -f . +docker push /request-block:2.0.0 +``` +``` diff --git a/plugins/wasm-go/extensions/ai-history/main.go b/plugins/wasm-go/extensions/ai-history/main.go index f0fabaaa4c..fd84250453 100644 --- a/plugins/wasm-go/extensions/ai-history/main.go +++ b/plugins/wasm-go/extensions/ai-history/main.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "net/http" "net/url" "strconv" "strings" @@ -27,6 +28,7 @@ const ( DefaultCacheKeyPrefix = "higress-ai-history:" IdentityKey = "identity" ChatHistories = "chatHistories" + EnhancedContextKey = "enhancedContext" ) func main() { @@ -88,6 +90,25 @@ type KVExtractor struct { ResponseBody string `required:"false" yaml:"responseBody" json:"responseBody"` } +// Memory Enhancement Service 配置 +type MemoryServiceConfig struct { + // @Title zh-CN 记忆增强服务名称 + // @Description zh-CN Memory Enhancement Service 的服务地址,例如 memory-service.higress-system.svc.cluster.local + ServiceName string `required:"false" yaml:"serviceName" json:"serviceName"` + // @Title zh-CN 记忆增强服务端口 + // @Description zh-CN 默认值为8080 + ServicePort int `required:"false" yaml:"servicePort" json:"servicePort"` + // @Title zh-CN 记忆增强服务超时时间 + // @Description zh-CN 单位为毫秒,默认值为5000 + Timeout int `required:"false" yaml:"timeout" json:"timeout"` + // @Title zh-CN 是否启用记忆增强 + // @Description zh-CN 默认值为false + Enabled bool `required:"false" yaml:"enabled" json:"enabled"` + // @Title zh-CN 检索记忆数量 + // @Description zh-CN 从记忆库中检索的相关记忆数量,默认值为5 + TopK int `required:"false" yaml:"topK" json:"topK"` +} + type PluginConfig struct { // @Title zh-CN Redis 地址信息 // @Description zh-CN 用于存储缓存结果的 Redis 地址 @@ -112,8 +133,11 @@ type 
PluginConfig struct { FillHistoryCnt int `required:"false" yaml:"fillHistoryCnt" json:"fillHistoryCnt"` // @Title zh-CN 缓存的过期时间 // @Description zh-CN 单位是秒,默认值为0,即永不过期 - CacheTTL int `required:"false" yaml:"cacheTTL" json:"cacheTTL"` - redisClient wrapper.RedisClient `yaml:"-" json:"-"` + CacheTTL int `required:"false" yaml:"cacheTTL" json:"cacheTTL"` + // @Title zh-CN 记忆增强服务配置 + // @Description zh-CN 配置 Memory Enhancement Service 以启用基于 NetworkX 的记忆增强功能 + MemoryService MemoryServiceConfig `required:"false" yaml:"memoryService" json:"memoryService"` + redisClient wrapper.RedisClient `yaml:"-" json:"-"` } type ChatHistory struct { @@ -159,6 +183,26 @@ func parseConfig(json gjson.Result, c *PluginConfig, log wrapper.Log) error { c.FillHistoryCnt = 3 } c.CacheTTL = int(json.Get("cacheTTL").Int()) + + // 解析 Memory Service 配置 + c.MemoryService.Enabled = json.Get("memoryService.enabled").Bool() + c.MemoryService.ServiceName = json.Get("memoryService.serviceName").String() + if c.MemoryService.ServiceName == "" { + c.MemoryService.ServiceName = "memory-service.higress-system.svc.cluster.local" + } + c.MemoryService.ServicePort = int(json.Get("memoryService.servicePort").Int()) + if c.MemoryService.ServicePort == 0 { + c.MemoryService.ServicePort = 8080 + } + c.MemoryService.Timeout = int(json.Get("memoryService.timeout").Int()) + if c.MemoryService.Timeout == 0 { + c.MemoryService.Timeout = 5000 + } + c.MemoryService.TopK = int(json.Get("memoryService.topK").Int()) + if c.MemoryService.TopK == 0 { + c.MemoryService.TopK = 5 + } + c.redisClient = wrapper.NewRedisClusterClient(wrapper.FQDNCluster{ FQDN: c.RedisInfo.ServiceName, Port: int64(c.RedisInfo.ServicePort), @@ -204,6 +248,12 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config PluginConfig, body []byte return types.ActionContinue } ctx.SetContext(QuestionContextKey, question) + + // 如果启用了 Memory Service,先检索增强上下文 + if config.MemoryService.Enabled { + retrieveEnhancedContext(ctx, config, identityKey, question, log) 
+ } + err := config.redisClient.Get(config.CacheKeyPrefix+identityKey, func(response resp.Value) { if err := response.Error(); err != nil { log.Errorf("redis get failed, err:%v", err) @@ -249,6 +299,13 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config PluginConfig, body []byte _ = proxywasm.ResumeHttpRequest() return } + + // 如果有增强上下文,注入到 messages 中 + enhancedContext := ctx.GetStringContext(EnhancedContextKey, "") + if enhancedContext != "" { + currMessage = injectEnhancedContext(currMessage, enhancedContext) + } + finalChat := fillHistory(chat, currMessage, fillHistoryCnt) var parameter map[string]any err = json.Unmarshal(body, ¶meter) @@ -481,4 +538,123 @@ func saveChatHistory(ctx wrapper.HttpContext, config PluginConfig, questionI any if config.CacheTTL != 0 { _ = config.redisClient.Expire(config.CacheKeyPrefix+identityKey, config.CacheTTL, nil) } + + // 如果启用了 Memory Service,异步调用增强记忆 + if config.MemoryService.Enabled { + enhanceMemoryAsync(config, identityKey, question, value, log) + } +} + +// retrieveEnhancedContext 从 Memory Service 检索增强上下文 +func retrieveEnhancedContext(ctx wrapper.HttpContext, config PluginConfig, sessionID string, query string, log wrapper.Log) { + reqBody := map[string]interface{}{ + "session_id": sessionID, + "query": query, + "top_k": config.MemoryService.TopK, + } + + reqData, err := json.Marshal(reqBody) + if err != nil { + log.Errorf("marshal retrieve request failed: %v", err) + return + } + + _ = wrapper.DispatchHttpCall( + config.MemoryService.ServiceName, + [][2]string{ + {":method", "POST"}, + {":path", "/api/v1/memory/retrieve"}, + {":authority", config.MemoryService.ServiceName}, + {"content-type", "application/json"}, + }, + reqData, + func(statusCode int, responseHeaders http.Header, responseBody []byte) { + if statusCode != 200 { + log.Warnf("retrieve enhanced context failed, status: %d", statusCode) + return + } + + var resp struct { + Status string `json:"status"` + Context []string `json:"context"` + } + + if err := 
json.Unmarshal(responseBody, &resp); err != nil {
+				log.Errorf("unmarshal retrieve response failed: %v", err)
+				return
+			}
+
+			if len(resp.Context) > 0 {
+				// Merge the retrieved memories into one context string that
+				// injectEnhancedContext later inserts as a system message.
+				enhancedCtx := "\n\n相关记忆:\n" + strings.Join(resp.Context, "\n")
+				ctx.SetContext(EnhancedContextKey, enhancedCtx)
+				log.Infof("retrieved %d enhanced memories", len(resp.Context))
+			}
+		},
+		config.MemoryService.Timeout,
+	)
+}
+
+// enhanceMemoryAsync posts the finished question/answer pair to the Memory
+// Service (/api/v1/memory/enhance) so it can extract keywords and update its
+// graph. Fire-and-forget: any failure is only logged and never affects the
+// user-facing response.
+func enhanceMemoryAsync(config PluginConfig, sessionID string, question string, answer string, log wrapper.Log) {
+	reqBody := map[string]interface{}{
+		"session_id": sessionID,
+		"question":   question,
+		"answer":     answer,
+	}
+
+	reqData, err := json.Marshal(reqBody)
+	if err != nil {
+		log.Errorf("marshal enhance request failed: %v", err)
+		return
+	}
+
+	_ = wrapper.DispatchHttpCall(
+		config.MemoryService.ServiceName,
+		[][2]string{
+			{":method", "POST"},
+			{":path", "/api/v1/memory/enhance"},
+			{":authority", config.MemoryService.ServiceName},
+			{"content-type", "application/json"},
+		},
+		reqData,
+		func(statusCode int, responseHeaders http.Header, responseBody []byte) {
+			if statusCode != 200 {
+				log.Warnf("enhance memory failed, status: %d", statusCode)
+				return
+			}
+			log.Infof("memory enhanced successfully for session: %s", sessionID)
+		},
+		config.MemoryService.Timeout,
+	)
+}
+
+// injectEnhancedContext inserts the retrieved memory context as an extra
+// system message: after any leading system messages, before the first
+// non-system message. Returns messages unchanged when there is no context.
+func injectEnhancedContext(messages []ChatHistory, enhancedContext string) []ChatHistory {
+	if enhancedContext == "" {
+		return messages
+	}
+
+	contextMsg := ChatHistory{
+		Role:    "system",
+		Content: enhancedContext,
+	}
+
+	// Find the first non-system message. Default to len(messages) so that
+	// when every message is a system message (or the list is empty) the
+	// context is appended at the end instead of being inserted in front of
+	// the existing system messages (bug: the previous default of 0 did that).
+	insertPos := len(messages)
+	for i, msg := range messages {
+		if msg.Role != "system" {
+			insertPos = i
+			break
+		}
+	}
+
+	// Build a new slice rather than mutating the caller's backing array.
+	result := make([]ChatHistory, 0, len(messages)+1)
+	result = append(result, messages[:insertPos]...)
+ result = append(result, contextMsg) + result = append(result, messages[insertPos:]...) + + return result } diff --git a/plugins/wasm-go/extensions/ai-proxy/provider/provider.go b/plugins/wasm-go/extensions/ai-proxy/provider/provider.go index 9b169469dd..213579995b 100644 --- a/plugins/wasm-go/extensions/ai-proxy/provider/provider.go +++ b/plugins/wasm-go/extensions/ai-proxy/provider/provider.go @@ -222,6 +222,12 @@ type ProviderConfig struct { // @Title zh-CN Azure OpenAI Service URL // @Description zh-CN 仅适用于Azure OpenAI服务。要请求的OpenAI服务的完整URL,包含api-version等参数 azureServiceUrl string `required:"false" yaml:"azureServiceUrl" json:"azureServiceUrl"` + // @Title zh-CN Azure Deployment映射表 + // @Description zh-CN 仅适用于Azure OpenAI服务。用于将请求中的模型名称映射为Azure deployment名称。支持通过"*"来配置全局映射 + deploymentMapping map[string]string `required:"false" yaml:"deploymentMapping" json:"deploymentMapping"` + // @Title zh-CN Azure默认Deployment名称 + // @Description zh-CN 仅适用于Azure OpenAI服务。当没有配置deploymentMapping或映射失败时使用的默认deployment名称 + defaultDeployment string `required:"false" yaml:"defaultDeployment" json:"defaultDeployment"` // @Title zh-CN 通义千问File ID // @Description zh-CN 仅适用于通义千问服务。上传到Dashscope的文件ID,其内容用于补充AI请求上下文。仅支持qwen-long模型。 qwenFileIds []string `required:"false" yaml:"qwenFileIds" json:"qwenFileIds"` @@ -331,6 +337,11 @@ func (c *ProviderConfig) FromJson(json gjson.Result) { c.openaiCustomUrl = json.Get("openaiCustomUrl").String() c.moonshotFileId = json.Get("moonshotFileId").String() c.azureServiceUrl = json.Get("azureServiceUrl").String() + c.deploymentMapping = make(map[string]string) + for k, v := range json.Get("deploymentMapping").Map() { + c.deploymentMapping[k] = v.String() + } + c.defaultDeployment = json.Get("defaultDeployment").String() c.qwenFileIds = make([]string, 0) for _, fileId := range json.Get("qwenFileIds").Array() { c.qwenFileIds = append(c.qwenFileIds, fileId.String()) @@ -585,6 +596,39 @@ func getMappedModel(model string, modelMapping map[string]string) string 
{
	return model
}

+// getMappedDeployment resolves the Azure deployment name for a request model.
+// Resolution order: exact key match, then the longest matching prefix pattern
+// (keys ending in "*"), then the global "*" wildcard, and finally
+// defaultDeployment. The prefix step selects the longest matching prefix so
+// that overlapping patterns (e.g. "gpt-*" and "gpt-4*") resolve
+// deterministically — plain map iteration order is random in Go, which would
+// otherwise make the chosen deployment vary between requests.
+func getMappedDeployment(model string, deploymentMapping map[string]string, defaultDeployment string) string {
+	if len(deploymentMapping) == 0 {
+		return defaultDeployment
+	}
+
+	// Exact match takes priority.
+	if v, ok := deploymentMapping[model]; ok {
+		log.Debugf("model [%s] is mapped to deployment [%s] explicitly", model, v)
+		return v
+	}
+
+	// Prefix match: among all "prefix*" patterns that match, keep the one
+	// with the longest prefix.
+	bestLen := -1
+	bestPrefix := ""
+	bestDeployment := ""
+	for k, v := range deploymentMapping {
+		if k == wildcard || !strings.HasSuffix(k, wildcard) {
+			continue
+		}
+		prefix := strings.TrimSuffix(k, wildcard)
+		if strings.HasPrefix(model, prefix) && len(prefix) > bestLen {
+			bestLen = len(prefix)
+			bestPrefix = prefix
+			bestDeployment = v
+		}
+	}
+	if bestLen >= 0 {
+		log.Debugf("model [%s] is mapped to deployment [%s] via prefix [%s]", model, bestDeployment, bestPrefix)
+		return bestDeployment
+	}
+
+	// Global wildcard fallback.
+	if v, ok := deploymentMapping[wildcard]; ok {
+		log.Debugf("model [%s] is mapped to deployment [%s] via wildcard", model, v)
+		return v
+	}
+
+	return defaultDeployment
+}
+
 func doGetMappedModel(model string, modelMapping map[string]string) string { if len(modelMapping) == 0 { return ""