-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_server_memory.py
More file actions
88 lines (69 loc) · 3.15 KB
/
test_server_memory.py
File metadata and controls
88 lines (69 loc) · 3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/usr/bin/env python3
"""
Test script to verify memory integration in the server context
"""
import asyncio
import json
import os
import sys
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Add the current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from memory_service import EchoMindMemoryService
async def test_server_memory():
"""Test memory service in server context with environment variables"""
print("🧠 Testing Echo Mind Memory Service in Server Context")
print("=" * 60)
# Check environment variables
openai_key = os.getenv("OPENAI_API_KEY")
print(f"OPENAI_API_KEY present: {'✅ Yes' if openai_key else '❌ No'}")
if openai_key:
print(f"API Key (first 10 chars): {openai_key[:10]}...")
# Initialize memory service
print("\n1. Initializing memory service...")
memory_service = EchoMindMemoryService()
# Get stats
stats = memory_service.get_memory_stats()
print(f" Status: {stats['status']}")
print(f" User ID: {stats.get('user_id', 'N/A')}")
print(f" Persistent Storage: {stats.get('persistent_storage', False)}")
if stats['status'] == 'initialized_with_memos':
print(" ✅ Real MemoryOS is working!")
# Test storing a conversation
print("\n2. Testing conversation storage...")
success = await memory_service.store_conversation_turn(
user_input="Hello, I'm testing the memory system",
assistant_response="Great! I'll remember this conversation in MemoryOS.",
metadata={"test": True}
)
print(f" Storage success: {'✅ Yes' if success else '❌ No'}")
# Test memory search
print("\n3. Testing memory search...")
memories = await memory_service.search_memories("testing memory", limit=2)
print(f" Found {len(memories)} memories")
# Test context building
print("\n4. Testing context building...")
context = await memory_service.build_context_prompt("Tell me about our conversation")
print(f" Context length: {len(context)} characters")
print("\n✅ MemoryOS integration is working perfectly!")
elif stats['status'] == 'initialized_fallback':
print(" ⚠️ Using fallback memory (MemoryOS not available)")
print(" This means:")
print(" - Memory works during session")
print(" - Memory is lost on server restart")
print(" - No persistent storage")
else:
print(" ❌ Memory service failed to initialize")
return stats['status'] == 'initialized_with_memos'
if __name__ == "__main__":
result = asyncio.run(test_server_memory())
if result:
print("\n🎉 Real MemoryOS with persistent storage is active!")
else:
print("\n⚠️ Using fallback memory system")
print("To enable persistent MemoryOS:")
print("1. Ensure OPENAI_API_KEY is set in environment")
print("2. Restart the Echo Mind server")
print("3. MemoryOS will automatically store conversations persistently")