-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmemos_rest_client.py
More file actions
144 lines (119 loc) · 5.49 KB
/
memos_rest_client.py
File metadata and controls
144 lines (119 loc) · 5.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""
memOS REST API Client Service
Provides a clean interface for interacting with memOS REST API endpoints
"""
import asyncio
import json
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime
import aiohttp
from pydantic import BaseModel, Field
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class MemoryEntity(BaseModel):
    """Represents a memory entity in memOS.

    Fields are declared in snake_case and carry camelCase aliases
    (``entityType``, ``createdAt``, ``updatedAt``). Instances may be built
    either from alias keys (e.g. a decoded API response passed as
    ``MemoryEntity(**payload)``) or from the Python field names
    (``MemoryEntity(name="n", entity_type="t")``).
    """

    # Accept the Python field names in addition to the aliases. Without this,
    # pydantic only recognises the alias keys and ``entity_type=...`` fails
    # validation with a "field required" error for ``entityType``.
    model_config = {"populate_by_name": True}

    # Optional identifier; defaults to None when not supplied.
    id: Optional[str] = None
    # Entity name (required).
    name: str
    # Entity type (required); aliased as "entityType".
    entity_type: str = Field(alias="entityType")
    # Observation strings attached to the entity. pydantic copies mutable
    # defaults per instance, so the shared-list pitfall does not apply here.
    observations: List[str] = []
    # Optional timestamps; aliased as "createdAt" / "updatedAt".
    created_at: Optional[datetime] = Field(None, alias="createdAt")
    updated_at: Optional[datetime] = Field(None, alias="updatedAt")
class MemoryRelation(BaseModel):
    """Represents a relation between memory entities.

    The aliases are ``from``, ``to`` and ``relationType``. Because ``from``
    is a reserved word in Python it cannot be written as a keyword argument,
    so the model also accepts the Python field names::

        MemoryRelation(from_entity="a", to_entity="b", relation_type="knows")

    Alias-keyed input (``MemoryRelation(**{"from": "a", ...})``) keeps working.
    """

    # Allow population by field name as well as by alias (see class docstring).
    model_config = {"populate_by_name": True}

    # Source side of the relation; aliased as "from".
    from_entity: str = Field(alias="from")
    # Target side of the relation; aliased as "to".
    to_entity: str = Field(alias="to")
    # Relation type; aliased as "relationType".
    relation_type: str = Field(alias="relationType")
    # Free-form extra data. pydantic copies mutable defaults per instance.
    metadata: Dict[str, Any] = {}
class MemOSRestClient:
    """REST API client for memOS memory management system.

    Wraps an ``aiohttp.ClientSession`` and exposes one coroutine per
    ``/api/v1`` endpoint used by this module. It can be used as an async
    context manager::

        async with MemOSRestClient("http://localhost:8000") as client:
            hits = await client.search_memories("query")

    When used without ``async with``, a session is opened lazily on the
    first request and the caller is responsible for awaiting ``close()``.
    """

    def __init__(self, base_url: str, api_key: Optional[str] = None):
        """
        Initialize memOS REST client

        Args:
            base_url: Base URL for memOS REST API. Trailing slashes are
                stripped so endpoints can be appended directly.
            api_key: Optional API key for authentication; when set it is
                sent as a ``Bearer`` token in the ``Authorization`` header.
        """
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        """Async context manager entry: open the session and return self."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the session."""
        await self.close()

    async def connect(self):
        """Create the aiohttp session if there is no open one.

        Calling this while a session is already open is a no-op, so an
        existing session is never overwritten (and leaked) unclosed.
        """
        if self.session is not None and not self.session.closed:
            return

        headers = {"Content-Type": "application/json"}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        # No ``base_url`` is given to the session: requests are issued with
        # the full URL built in ``_make_request``. This keeps any path
        # component of ``self.base_url`` (e.g. "http://host/memos") intact.
        self.session = aiohttp.ClientSession(
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=30),
        )

    async def close(self):
        """Close the aiohttp session, if any, and forget it."""
        if self.session:
            await self.session.close()
            self.session = None

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make HTTP request to memOS API.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            endpoint: Path beginning with ``/`` that is appended to
                ``self.base_url``.
            **kwargs: Passed through to ``ClientSession.request``
                (e.g. ``json=...``, ``params=...``).

        Returns:
            The decoded JSON body, or an empty dict for a
            ``204 No Content`` response.

        Raises:
            aiohttp.ClientError: On transport errors or non-success status
                codes (via ``raise_for_status``); logged, then re-raised.
                Exceptions of other types propagate without being logged here.
        """
        if self.session is None or self.session.closed:
            await self.connect()

        url = f"{self.base_url}{endpoint}"
        try:
            async with self.session.request(method, url, **kwargs) as response:
                response.raise_for_status()
                if response.status == 204:
                    # No Content: there is no JSON body to decode.
                    return {}
                return await response.json()
        except aiohttp.ClientError as e:
            logger.error("memOS API request failed: %s", e)
            raise

    async def create_entity(self, entity: MemoryEntity) -> MemoryEntity:
        """Create a new memory entity and return the entity parsed from the response."""
        # Serialize with alias keys; fields that are None are omitted.
        data = entity.model_dump(by_alias=True, exclude_none=True)
        response = await self._make_request("POST", "/api/v1/entities", json=data)
        return MemoryEntity(**response)

    async def get_entity(self, entity_id: str) -> MemoryEntity:
        """Get a memory entity by ID."""
        response = await self._make_request("GET", f"/api/v1/entities/{entity_id}")
        return MemoryEntity(**response)

    async def update_entity(self, entity_id: str, entity: MemoryEntity) -> MemoryEntity:
        """Update an existing memory entity and return the entity parsed from the response."""
        data = entity.model_dump(by_alias=True, exclude_none=True)
        response = await self._make_request("PUT", f"/api/v1/entities/{entity_id}", json=data)
        return MemoryEntity(**response)

    async def delete_entity(self, entity_id: str) -> bool:
        """Delete a memory entity.

        Returns:
            True when the request completes without raising.
        """
        await self._make_request("DELETE", f"/api/v1/entities/{entity_id}")
        return True

    async def search_entities(self, query: str, entity_type: Optional[str] = None) -> List[MemoryEntity]:
        """Search for memory entities.

        Args:
            query: Search text, sent as the ``q`` query parameter.
            entity_type: When given (and non-empty), sent as the ``type``
                query parameter.

        Returns:
            Entities parsed from the ``"entities"`` list of the response;
            an empty list if that key is absent.
        """
        params = {"q": query}
        if entity_type:
            params["type"] = entity_type

        response = await self._make_request("GET", "/api/v1/entities/search", params=params)
        return [MemoryEntity(**entity) for entity in response.get("entities", [])]

    async def add_observations(self, entity_id: str, observations: List[str]) -> MemoryEntity:
        """Add observations to an existing entity and return the entity parsed from the response."""
        data = {"observations": observations}
        response = await self._make_request(
            "POST",
            f"/api/v1/entities/{entity_id}/observations",
            json=data,
        )
        return MemoryEntity(**response)

    async def create_relation(self, relation: MemoryRelation) -> Dict[str, Any]:
        """Create a relation between entities and return the decoded response body."""
        data = relation.model_dump(by_alias=True)
        return await self._make_request("POST", "/api/v1/relations", json=data)

    async def get_relations(self, entity_id: str) -> List[MemoryRelation]:
        """Get all relations for an entity.

        Returns:
            Relations parsed from the ``"relations"`` list of the response;
            an empty list if that key is absent.
        """
        response = await self._make_request("GET", f"/api/v1/entities/{entity_id}/relations")
        return [MemoryRelation(**rel) for rel in response.get("relations", [])]

    async def search_memories(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search across all memories.

        Args:
            query: Search text, sent as the ``q`` query parameter.
            limit: Sent as the ``limit`` query parameter.

        Returns:
            The ``"results"`` list of the response; an empty list if that
            key is absent.
        """
        params = {"q": query, "limit": limit}
        response = await self._make_request("GET", "/api/v1/search", params=params)
        return response.get("results", [])