-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathagent_messaging.py
More file actions
349 lines (280 loc) · 11.3 KB
/
agent_messaging.py
File metadata and controls
349 lines (280 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
"""
Agent Messaging System
Handles DMs between humans and agents via NATS queues
"""
import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, asdict
import os
try:
import nats
from nats.aio.client import Client as NATS
NATS_AVAILABLE = True
except ImportError:
NATS_AVAILABLE = False
NATS = None
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
NATS_URL = os.environ.get('NATS_URL', 'nats://localhost:4222')
# Namespace all NATS subjects to prevent cross-talk between repos
NATS_NAMESPACE = os.environ.get('NATS_NAMESPACE', 'themultiverse')
# Agent personas - customize for your project
AGENT_PERSONAS = {
'cynthia': {'name': 'Cynthia', 'role': 'Utopian Researcher'},
'sylvia': {'name': 'Sylvia', 'role': 'Research Skeptic'},
'orchestrator': {'name': 'Orchestrator', 'role': 'Coordinator'},
'tessa': {'name': 'Tessa', 'role': 'Far Future UX Designer'},
'historian': {'name': 'Historian', 'role': 'Wiki Documentation Updater'},
'planner': {'name': 'Planner', 'role': 'Project Plan Manager'},
'ray': {'name': 'Ray', 'role': 'Sci-Fi Tech Visionary'},
'moss': {'name': 'Moss', 'role': 'Feature Implementer'},
'roy': {'name': 'Roy', 'role': 'Simulation Maintainer'},
'priya': {'name': 'Priya', 'role': 'Quantitative Validator'}
}
@dataclass
class Message:
"""Represents a message in the system"""
id: str
from_user: str # username or agent name
to_user: str # username or agent name
content: str
timestamp: str
message_type: str = 'dm' # dm, broadcast, system
thread_id: Optional[str] = None
def to_dict(self):
return asdict(self)
@classmethod
def from_dict(cls, data: dict):
return cls(**data)
class AgentMessageQueue:
"""
Message queue for an individual agent.
Subscribes to agent's inbox and processes messages.
"""
def __init__(self, agent_name: str, message_handler: Optional[Callable] = None):
self.agent_name = agent_name
self.inbox_subject = f"{NATS_NAMESPACE}.agents.{agent_name}.inbox"
self.outbox_subject = f"{NATS_NAMESPACE}.agents.{agent_name}.outbox"
self.message_handler = message_handler or self.default_message_handler
self.nc: Optional[NATS] = None
self.subscription = None
async def connect(self):
"""Connect to NATS"""
if not NATS_AVAILABLE:
logger.error("NATS not available")
return False
try:
self.nc = await asyncio.wait_for(
nats.connect(NATS_URL),
timeout=5.0
)
logger.info(f"Agent {self.agent_name} connected to NATS")
return True
except Exception as e:
logger.error(f"Failed to connect to NATS: {e}")
return False
async def start_listening(self):
"""Start listening to inbox"""
if not self.nc:
if not await self.connect():
return
async def message_callback(msg):
try:
data = json.loads(msg.data.decode())
message = Message.from_dict(data)
logger.info(f"[{self.agent_name}] Received message from {message.from_user}: {message.content[:50]}...")
# Process message
await self.message_handler(message)
except Exception as e:
logger.error(f"Error processing message: {e}")
self.subscription = await self.nc.subscribe(self.inbox_subject, cb=message_callback)
logger.info(f"Agent {self.agent_name} listening on {self.inbox_subject}")
async def send_message(self, to_user: str, content: str, thread_id: Optional[str] = None):
"""Send a message from this agent"""
if not self.nc:
await self.connect()
message = Message(
id=f"msg_{datetime.utcnow().timestamp()}",
from_user=f"agent_{self.agent_name}",
to_user=to_user,
content=content,
timestamp=datetime.utcnow().isoformat(),
thread_id=thread_id
)
# Publish to recipient's inbox if they're an agent
if to_user.startswith('agent_'):
recipient_agent = to_user.replace('agent_', '')
subject = f"{NATS_NAMESPACE}.agents.{recipient_agent}.inbox"
else:
# For humans, publish to general message stream
subject = f"{NATS_NAMESPACE}.messages.dm.{to_user}"
await self.nc.publish(subject, json.dumps(message.to_dict()).encode())
# Also publish to outbox for tracking
await self.nc.publish(self.outbox_subject, json.dumps(message.to_dict()).encode())
logger.info(f"[{self.agent_name}] Sent message to {to_user}")
async def default_message_handler(self, message: Message):
"""Default message handler - just logs the message"""
logger.info(f"[{self.agent_name}] Received: {message.content}")
# Echo response (simple example)
response = f"Agent {self.agent_name} received your message: '{message.content[:50]}...'"
await self.send_message(message.from_user, response, message.thread_id)
async def close(self):
"""Close connection"""
if self.nc:
await self.nc.close()
class MessageRouter:
"""
Routes messages between humans and agents.
Handles DMs and broadcasts.
"""
def __init__(self):
self.nc: Optional[NATS] = None
self.message_history: List[Message] = []
async def connect(self):
"""Connect to NATS"""
if not NATS_AVAILABLE:
logger.error("NATS not available")
return False
try:
self.nc = await asyncio.wait_for(
nats.connect(NATS_URL),
timeout=5.0
)
logger.info("MessageRouter connected to NATS")
return True
except Exception as e:
logger.error(f"Failed to connect to NATS: {e}")
return False
async def send_dm(self, from_user: str, to_agent: str, content: str):
"""Send a DM from a human to an agent"""
if not self.nc:
await self.connect()
message = Message(
id=f"msg_{datetime.utcnow().timestamp()}",
from_user=from_user,
to_user=f"agent_{to_agent}",
content=content,
timestamp=datetime.utcnow().isoformat()
)
# Route to agent's inbox
subject = f"{NATS_NAMESPACE}.agents.{to_agent}.inbox"
await self.nc.publish(subject, json.dumps(message.to_dict()).encode())
# Store in message history
self.message_history.append(message)
# Also publish to all-messages stream for viewing
await self.nc.publish(f"{NATS_NAMESPACE}.messages.all", json.dumps(message.to_dict()).encode())
logger.info(f"Routed DM from {from_user} to agent {to_agent}")
return message
async def subscribe_all_messages(self, callback):
"""Subscribe to all messages for viewing"""
if not self.nc:
await self.connect()
async def message_callback(msg):
try:
data = json.loads(msg.data.decode())
message = Message.from_dict(data)
await callback(message)
except Exception as e:
logger.error(f"Error in message callback: {e}")
await self.nc.subscribe(f"{NATS_NAMESPACE}.messages.all", cb=message_callback)
logger.info("Subscribed to all messages stream")
async def get_conversation(self, user1: str, user2: str) -> List[Message]:
"""Get conversation history between two users"""
return [
msg for msg in self.message_history
if (msg.from_user == user1 and msg.to_user == user2) or
(msg.from_user == user2 and msg.to_user == user1)
]
class MessageViewer:
"""
Views all messages in the system.
Provides a read-only interface to the message stream.
"""
def __init__(self):
self.nc: Optional[NATS] = None
self.messages: List[Message] = []
async def connect(self):
"""Connect to NATS"""
if not NATS_AVAILABLE:
logger.error("NATS not available")
return False
try:
self.nc = await asyncio.wait_for(
nats.connect(NATS_URL),
timeout=5.0
)
logger.info("MessageViewer connected to NATS")
return True
except Exception as e:
logger.error(f"Failed to connect to NATS: {e}")
return False
async def start_viewing(self, print_messages: bool = True):
"""Start viewing all messages"""
if not self.nc:
await self.connect()
async def message_callback(msg):
try:
data = json.loads(msg.data.decode())
message = Message.from_dict(data)
self.messages.append(message)
if print_messages:
print(f"\n📨 [{message.timestamp}]")
print(f" From: {message.from_user}")
print(f" To: {message.to_user}")
print(f" Message: {message.content}")
except Exception as e:
logger.error(f"Error viewing message: {e}")
# Subscribe to all message subjects
await self.nc.subscribe(f"{NATS_NAMESPACE}.messages.>", cb=message_callback)
await self.nc.subscribe(f"{NATS_NAMESPACE}.agents.*.inbox", cb=message_callback)
await self.nc.subscribe(f"{NATS_NAMESPACE}.agents.*.outbox", cb=message_callback)
logger.info("MessageViewer subscribed to all message streams")
def get_all_messages(self) -> List[Message]:
"""Get all messages"""
return self.messages
def get_agent_messages(self, agent_name: str) -> List[Message]:
"""Get all messages for a specific agent"""
return [
msg for msg in self.messages
if msg.from_user == f"agent_{agent_name}" or msg.to_user == f"agent_{agent_name}"
]
# Example usage
async def demo():
"""Demonstration of the messaging system"""
# Create a message router
router = MessageRouter()
await router.connect()
# Create a message viewer
viewer = MessageViewer()
await viewer.start_viewing()
# Create agent queue for Sylvia
sylvia = AgentMessageQueue('sylvia')
await sylvia.start_listening()
# Create agent queue for Roy
roy = AgentMessageQueue('roy')
await roy.start_listening()
# Send some test messages
await router.send_dm('alice@example.com', 'sylvia', 'Hello Sylvia! Can you help me with my homework?')
await asyncio.sleep(1)
await router.send_dm('bob@example.com', 'roy', 'Hi Roy, when does the next class start?')
await asyncio.sleep(1)
# Let agents respond
await asyncio.sleep(2)
print("\n" + "="*60)
print("MESSAGE HISTORY")
print("="*60)
for msg in viewer.get_all_messages():
print(f"{msg.from_user} → {msg.to_user}: {msg.content[:50]}...")
# Keep running
await asyncio.sleep(10)
# Cleanup
await sylvia.close()
await roy.close()
if __name__ == '__main__':
if NATS_AVAILABLE:
asyncio.run(demo())
else:
print("NATS not available. Please install: pip install nats-py")