-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmemos_chat_client.py
More file actions
136 lines (112 loc) · 4.25 KB
/
memos_chat_client.py
File metadata and controls
136 lines (112 loc) · 4.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""
meMOS Chat API Client
Simplified client for interacting with meMOS chat endpoint
"""
import asyncio
import json
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime
import aiohttp
logger = logging.getLogger(__name__)
class MemOSChatClient:
"""Chat-focused client for meMOS memory management system"""
def __init__(self, base_url: str, user_id: str = "echo_mind_user"):
"""
Initialize meMOS chat client
Args:
base_url: Base URL for meMOS REST API
user_id: User ID for memory isolation
"""
self.base_url = base_url.rstrip('/')
self.user_id = user_id
self.session = None
async def __aenter__(self):
"""Async context manager entry"""
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit"""
await self.close()
async def connect(self):
"""Create aiohttp session"""
self.session = aiohttp.ClientSession(
headers={"Content-Type": "application/json"},
timeout=aiohttp.ClientTimeout(total=30)
)
async def close(self):
"""Close aiohttp session"""
if self.session:
await self.session.close()
self.session = None
async def chat(self, query: str) -> Dict[str, Any]:
"""
Send a chat query to meMOS
Args:
query: The user's message/query
Returns:
Response from meMOS including the AI response
"""
if not self.session:
await self.connect()
url = f"{self.base_url}/chat"
data = {
"query": query,
"user_id": self.user_id
}
try:
async with self.session.post(url, json=data) as response:
result = await response.json()
if response.status == 200 and result.get("code") == 200:
return {
"success": True,
"response": result.get("data", ""),
"timestamp": datetime.now().isoformat()
}
else:
logger.error(f"meMOS chat error: {result}")
return {
"success": False,
"error": result.get("message", "Unknown error"),
"timestamp": datetime.now().isoformat()
}
except aiohttp.ClientError as e:
logger.error(f"meMOS API request failed: {e}")
return {
"success": False,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
async def search_memories(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
"""
Search memories through chat interface
Args:
query: Search query
limit: Maximum results to return
Returns:
List of relevant memories
"""
# Use chat endpoint to search memories by asking directly
search_query = f"Based on our previous conversations, tell me what you remember about: {query}"
result = await self.chat(search_query)
if result.get("success"):
# Parse the response to extract memory-like information
return [{
"content": result.get("response", ""),
"timestamp": result.get("timestamp"),
"relevance": 1.0
}]
return []
async def store_memory(self, user_input: str, assistant_response: str) -> bool:
"""
Store a conversation in memory via chat
Args:
user_input: The user's message
assistant_response: The assistant's response
Returns:
Success status
"""
# The chat endpoint automatically stores conversations
# We just need to send the user input and get a response
result = await self.chat(user_input)
return result.get("success", False)