-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patha2a_protocol.py
More file actions
38 lines (35 loc) · 991 Bytes
/
a2a_protocol.py
File metadata and controls
38 lines (35 loc) · 991 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
"""
Agent2Agent Message Protocol
Standardized format for agent communication
"""
import uuid
from datetime import datetime
from typing import TypedDict, Literal, Optional
class AgentMessage(TypedDict):
    """Envelope shared by every message passed from one agent to another.

    All eight keys are required (the TypedDict is total); only
    ``correlation_id`` is allowed to hold ``None``.
    """

    # Routing: who sent the message and who should receive it.
    from_agent: str
    to_agent: str

    # Kind of message; restricted to these four literal values.
    message_type: Literal["request", "response", "notification", "error"]

    # What is being asked for / reported, plus its free-form payload.
    task: str
    data: dict

    # Creation time as a string and the identifier of this message.
    timestamp: str
    message_id: str

    # NOTE(review): presumably the message_id of a related earlier message
    # (e.g. the request a response answers) -- confirm with callers.
    correlation_id: Optional[str]
def create_message(
    from_agent: str,
    to_agent: str,
    message_type: Literal["request", "response", "notification", "error"],
    task: str,
    data: dict,
    correlation_id: Optional[str] = None,
) -> AgentMessage:
    """Build a new message, stamping it with a creation time and a unique id.

    Args:
        from_agent: Identifier of the sending agent.
        to_agent: Identifier of the receiving agent.
        message_type: One of "request", "response", "notification", "error".
            The value is stored as given; it is not validated at runtime.
        task: Name/description of the task the message concerns.
        data: Payload. Stored by reference, not copied.
        correlation_id: Optional id tying this message to another one;
            defaults to ``None``.

    Returns:
        A dict with all ``AgentMessage`` keys filled in. ``timestamp`` is the
        ISO-8601 form of the naive local time at creation, and ``message_id``
        is ``"msg_<epoch seconds>_<random hex>"`` derived from that same
        instant.
    """
    # Read the clock exactly once so that `timestamp` and `message_id`
    # describe the same instant.
    created_at = datetime.now()

    # A clock reading alone is not unique: two messages created within the
    # same clock tick would share an id. The random suffix removes that
    # collision while keeping the original "msg_<epoch>" prefix.
    message_id = f"msg_{created_at.timestamp()}_{uuid.uuid4().hex}"

    return {
        "from_agent": from_agent,
        "to_agent": to_agent,
        "message_type": message_type,
        "task": task,
        "data": data,
        "timestamp": created_at.isoformat(),
        "message_id": message_id,
        "correlation_id": correlation_id,
    }