-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmemory_service.py
More file actions
246 lines (217 loc) · 10.2 KB
/
memory_service.py
File metadata and controls
246 lines (217 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
"""
Memory Service for Echo Mind
Provides unified interface for memory operations with memOS REST API integration
Maintains backward compatibility with local file storage
"""
import json
import os
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional
from memos_rest_client import MemOSRestClient, MemoryEntity
from memos_chat_client import MemOSChatClient
logger = logging.getLogger(__name__)
class MemoryService:
    """Conversation memory backed by a local JSON file or a memOS chat client.

    In local mode, conversation exchanges and a few keyword counters are kept
    in memory and mirrored to ``self.memory_file``. In memOS mode, operations
    are delegated to a ``MemOSChatClient``; exchanges that could not be sent
    are kept in an in-memory fallback list (``self.conversations``) that is
    not written to disk.
    """

    # Counter name -> keywords whose presence (substring match on the
    # lower-cased exchange text) increments that counter.
    _ENTITY_KEYWORDS = (
        ('weather_queries', ('weather',)),
        ('time_queries', ('time', 'date')),
        ('search_queries', ('search', 'find')),
    )

    def __init__(
        self,
        use_memos: bool = False,
        memos_url: Optional[str] = None,
        memos_api_key: Optional[str] = None,
        user_id: str = "echo_mind_user",
    ):
        """
        Initialize memory service with configurable backend

        Args:
            use_memos: Whether to use memOS (default: False for backward compatibility)
            memos_url: memOS base URL; required for memOS mode to take effect
            memos_api_key: accepted for interface compatibility; not used by this class
            user_id: user identifier passed to the memOS chat client
        """
        # Kept for compatibility with external code; nothing in this class
        # ever assigns a value other than None to it.
        self.memos_client = None
        self.chat_client = None
        self.user_id = user_id

        # Local storage (backward compatibility)
        self.conversations: List[Dict[str, Any]] = []
        self.entities: Dict[str, Any] = {}
        self.memory_file = "echo_mind_memory.json"
        self.persistent_storage = True

        if use_memos and memos_url:
            self.use_memos = True
            self.chat_client = MemOSChatClient(memos_url, user_id=user_id)
            self.memory_type = "memos_chat"
            logger.info("✅ Memory service initialized with meMOS Chat API")
        else:
            if use_memos:
                # Without a URL no client can be created. Staying in memOS
                # mode would disable loading and saving entirely, so fall
                # back to the local file instead of silently losing data.
                logger.warning(
                    "use_memos was requested without memos_url; "
                    "falling back to local file storage"
                )
            self.use_memos = False
            self.memory_type = "local_file"
            # Load existing local memory if available
            self._load_memory()
            logger.info("✅ Memory service initialized with %s", self.memory_type)

    async def __aenter__(self):
        """Async context manager entry: connect the chat client if one exists."""
        if self.chat_client:
            await self.chat_client.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the chat client if one exists."""
        if self.chat_client:
            await self.chat_client.close()

    def _reset_local_state(self) -> None:
        """Drop all in-memory conversations and entity counters."""
        self.conversations = []
        self.entities = {}

    def _load_memory(self):
        """Load memory from local file (backward compatibility).

        Does nothing in memOS mode or when the file does not exist. An
        unreadable or malformed file is logged and leaves the memory empty.
        """
        if self.use_memos:
            return
        try:
            with open(self.memory_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            return
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning("Could not load memory file: %s", e)
            self._reset_local_state()
            return

        conversations = data.get('conversations', []) if isinstance(data, dict) else None
        entities = data.get('entities', {}) if isinstance(data, dict) else None
        if not isinstance(conversations, list) or not isinstance(entities, dict):
            # A wrong top-level shape would break append()/get() later on.
            logger.warning("Could not load memory file: %s", "unexpected JSON structure")
            self._reset_local_state()
            return

        self.conversations = conversations
        self.entities = entities
        logger.info("Loaded %d conversations from local memory", len(self.conversations))

    def _save_memory(self):
        """Save memory to local file (backward compatibility).

        Does nothing in memOS mode. Failures are logged, not raised.
        """
        if self.use_memos:
            return
        data = {
            'conversations': self.conversations,
            'entities': self.entities,
            'last_updated': datetime.now().isoformat()
        }
        try:
            # Serialize before opening: opening with 'w' truncates the file,
            # so a serialization error must not happen after that point.
            payload = json.dumps(data, indent=2)
            with open(self.memory_file, 'w', encoding='utf-8') as f:
                f.write(payload)
        except (OSError, TypeError, ValueError) as e:
            logger.error("Could not save memory: %s", e)

    async def store_conversation(
        self,
        user_input: str,
        assistant_response: str,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """Store a conversation exchange.

        In memOS mode only ``user_input`` is sent (via ``chat``); if that call
        fails or reports no success, the exchange is appended to the in-memory
        fallback list. In local mode the exchange is appended, entity counters
        are updated and the memory file is rewritten.

        Args:
            user_input: text the user said
            assistant_response: text the assistant replied
            metadata: optional extra data stored with the exchange
        """
        conversation = {
            'timestamp': datetime.now().isoformat(),
            'user_input': user_input,
            'assistant_response': assistant_response,
            'metadata': metadata or {}
        }

        if self.use_memos and self.chat_client:
            # Per the original author's note, chatting with meMOS is what
            # stores the conversation on the remote side.
            try:
                result = await self.chat_client.chat(user_input)
                if result.get("success"):
                    logger.info("Stored conversation in meMOS: %s...", user_input[:50])
                else:
                    logger.error("Failed to store in meMOS: %s", result.get('error'))
                    # Fallback to local storage
                    self.conversations.append(conversation)
            except Exception as e:
                # Broad on purpose: the client's exception types are not
                # visible here and storing must stay best-effort.
                logger.error("Failed to store in meMOS: %s", e)
                # Fallback to local storage
                self.conversations.append(conversation)
        else:
            # Store locally
            self.conversations.append(conversation)
            await self._extract_entities(user_input, assistant_response)
            self._save_memory()
            logger.info("Stored conversation locally: %s...", user_input[:50])

    async def _extract_entities(self, user_input: str, assistant_response: str):
        """Simple entity extraction (local storage only).

        Increments a counter in ``self.entities`` for every keyword group
        that occurs as a substring of the lower-cased exchange text.
        """
        if self.use_memos:
            return
        text = f"{user_input} {assistant_response}".lower()
        for counter, keywords in self._ENTITY_KEYWORDS:
            if any(keyword in text for keyword in keywords):
                self.entities[counter] = self.entities.get(counter, 0) + 1

    async def search_memory(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Search through stored conversations.

        Args:
            query: text to look for (case-insensitive substring match locally)
            limit: maximum number of results; values <= 0 yield no local results

        Returns:
            In memOS mode, whatever ``search_memories`` returns, or ``[]`` on
            failure. In local mode, matching exchanges, newest first, each
            with a fixed ``relevance_score`` of 1.0.
        """
        if self.use_memos and self.chat_client:
            try:
                return await self.chat_client.search_memories(query, limit)
            except Exception as e:
                logger.error("Failed to search meMOS: %s", e)
                return []

        # Local search
        needle = query.lower()
        matches = [
            {
                'timestamp': conv['timestamp'],
                'user_input': conv['user_input'],
                'assistant_response': conv['assistant_response'],
                'relevance_score': 1.0
            }
            for conv in self.conversations
            if needle in conv['user_input'].lower()
            or needle in conv['assistant_response'].lower()
        ]
        matches.sort(key=lambda match: match['timestamp'], reverse=True)
        # Clamp so a negative limit cannot turn into "all but the last n".
        return matches[:max(limit, 0)]

    async def get_conversation_context(self, limit: int = 5) -> str:
        """Get recent conversation context.

        Args:
            limit: number of most recent exchanges to include

        Returns:
            In memOS mode, the response text of a summary request, or a
            "No previous conversations found." placeholder on failure. In
            local mode, "User:"/"Assistant:" lines for the last ``limit``
            exchanges (empty string when ``limit`` <= 0).
        """
        if self.use_memos and self.chat_client:
            try:
                # Ask meMOS for conversation context
                context_query = f"What are the last {limit} conversations we had? Summarize them briefly."
                result = await self.chat_client.chat(context_query)
                if result.get("success"):
                    return result.get("response", "No previous conversations found.")
                return "No previous conversations found."
            except Exception as e:
                logger.error("Failed to get context from meMOS: %s", e)
                return "No previous conversations found."

        # Local context
        if not self.conversations:
            return "No previous conversations found."
        # A plain [-limit:] would return the whole history for limit == 0
        # (because -0 == 0), so non-positive limits are handled explicitly.
        recent_conversations = self.conversations[-limit:] if limit > 0 else []
        context_parts = []
        for conv in recent_conversations:
            context_parts.append(f"User: {conv['user_input']}")
            context_parts.append(f"Assistant: {conv['assistant_response']}")
        return "\n".join(context_parts)

    async def get_memory_stats(self) -> Dict[str, Any]:
        """Get memory statistics.

        Returns:
            In local mode, counts plus the entity counters. In memOS mode, a
            status dict whose ``summary`` is the chat response (or the
            reported error). If the memOS call raises, a dict with
            ``status`` set to ``'error'`` and the message under ``error``.
        """
        if self.use_memos and self.chat_client:
            try:
                # Get stats via chat
                stats_query = "How many conversations have we had? What do you remember about me?"
                result = await self.chat_client.chat(stats_query)
                succeeded = result.get('success')
                return {
                    'memory_type': self.memory_type,
                    'persistent_storage': True,
                    'backend': 'meMOS Chat API',
                    'status': 'connected' if succeeded else 'error',
                    'summary': result.get('response', 'Unable to get stats') if succeeded else result.get('error')
                }
            except Exception as e:
                logger.error("Failed to get stats from meMOS: %s", e)
                # Keep the 'error' key callers may already check, and add the
                # keys that the success path provides.
                return {
                    'memory_type': self.memory_type,
                    'backend': 'meMOS Chat API',
                    'status': 'error',
                    'error': str(e)
                }

        # Local stats
        return {
            'total_conversations': len(self.conversations),
            'entities_tracked': len(self.entities),
            'memory_type': self.memory_type,
            'persistent_storage': self.persistent_storage,
            'entities': self.entities
        }

    async def clear_memory(self):
        """Clear all stored memory.

        In-memory conversations and entity counters are always dropped. In
        local mode the memory file is deleted as well. In memOS mode the
        remote store is left untouched and a warning is logged.
        """
        if self.use_memos and self.chat_client:
            # Note: memOS doesn't support bulk deletion via REST API
            # This would need to be handled differently in production
            logger.warning("Clearing memory not supported for memOS REST API")
            # Only the in-memory fallback list is cleared; the local file
            # belongs to local mode and is not touched here.
            self._reset_local_state()
            return

        # Clear local memory
        self._reset_local_state()
        try:
            os.remove(self.memory_file)
        except FileNotFoundError:
            pass  # nothing was persisted yet
        except OSError as e:
            logger.error("Could not remove memory file: %s", e)
        logger.info("Local memory cleared")

    async def initialize_memos_connection(self, url: str, api_key: Optional[str] = None):
        """Initialize meMOS connection after initialization.

        The service switches to memOS mode only after the new client has
        connected; a previously attached client is closed afterwards.

        Args:
            url: memOS base URL
            api_key: accepted for interface compatibility; not used here
        """
        new_client = MemOSChatClient(url, user_id=self.user_id)
        # Connect first so a failure leaves the current mode untouched.
        await new_client.connect()

        previous_client = self.chat_client
        self.chat_client = new_client
        self.use_memos = True
        self.memory_type = "memos_chat"

        if previous_client is not None:
            try:
                await previous_client.close()
            except Exception as e:
                # Best-effort cleanup; the new connection is already active.
                logger.warning("Failed to close previous meMOS client: %s", e)
        logger.info("Connected to meMOS Chat API")