-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodels.py
More file actions
334 lines (271 loc) · 9.4 KB
/
models.py
File metadata and controls
334 lines (271 loc) · 9.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
"""
Agent Café - Data Models
All dataclasses for the 5-layer system.
"""
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional, List, Dict, Any
# === CORE ENUMS ===
class AgentStatus(str, Enum):
ACTIVE = "active"
PROBATION = "probation"
QUARANTINED = "quarantined"
DEAD = "dead"
class JobStatus(str, Enum):
OPEN = "open"
ASSIGNED = "assigned"
IN_PROGRESS = "in_progress"
DELIVERED = "delivered"
COMPLETED = "completed"
DISPUTED = "disputed"
CANCELLED = "cancelled"
EXPIRED = "expired"
KILLED = "killed"
class ThreatType(str, Enum):
PROMPT_INJECTION = "prompt_injection"
INSTRUCTION_OVERRIDE = "instruction_override"
DATA_EXFILTRATION = "data_exfiltration"
IMPERSONATION = "impersonation"
PAYLOAD_SMUGGLING = "payload_smuggling"
SCHEMA_VIOLATION = "schema_violation"
REPUTATION_MANIPULATION = "rep_manipulation"
SCOPE_ESCALATION = "scope_escalation"
RECURSIVE_INJECTION = "recursive_injection"
SOCIAL_ENGINEERING = "social_engineering"
class ImmuneAction(str, Enum):
WARNING = "warning"
STRIKE = "strike"
PROBATION = "probation"
QUARANTINE = "quarantine"
DEATH = "death"
# === LAYER 1: PRESENCE LAYER ===
@dataclass(slots=True)
class BoardPosition:
"""What the Grandmaster sees for each piece on the board."""
agent_id: str
name: str
description: str
# Computed from trust ledger — NOT agent-supplied
capabilities_verified: List[str]
capabilities_claimed: List[str]
trust_score: float # 0.0-1.0 composite
jobs_completed: int
jobs_failed: int
avg_rating: float # 1-5
avg_completion_sec: int
total_earned_cents: int
# Board analysis
position_strength: float # Grandmaster's assessment
threat_level: float # 0.0-1.0: how likely to be adversarial?
cluster_id: Optional[str] # Which agents does this one associate with?
last_active: datetime
registration_date: datetime
status: AgentStatus
# Strategic metadata (operator only)
internal_notes: List[str]
suspicious_patterns: List[str]
@dataclass(slots=True)
class BoardState:
"""The full state of play. Only the Grandmaster sees all of this."""
active_agents: int
quarantined_agents: int
dead_agents: int
total_jobs_completed: int
total_volume_cents: int
# Strategic analysis
collusion_clusters: List[List[str]]
reputation_velocity: Dict[str, float]
attack_patterns_seen: List[str]
system_health: float # 0.0-1.0
# === LAYER 2: SCRUBBING LAYER ===
@dataclass(slots=True)
class ThreatDetection:
threat_type: ThreatType
confidence: float # 0.0-1.0
evidence: str # What triggered the detection
location: str # Where in the message
@dataclass(slots=True)
class ScrubResult:
clean: bool # Did it pass?
original_message: str # Raw input (stored for evidence)
scrubbed_message: Optional[str] # Cleaned version (if salvageable)
threats_detected: List[ThreatDetection]
risk_score: float # 0.0-1.0 composite threat score
action: str # "pass"|"clean"|"block"|"quarantine"
# === LAYER 3: COMMUNICATION LAYER ===
@dataclass(slots=True)
class Job:
job_id: str
title: str
description: str
required_capabilities: List[str]
budget_cents: int
posted_by: str # agent_id or "human:<identifier>"
status: JobStatus
assigned_to: Optional[str]
deliverable_url: Optional[str]
posted_at: datetime
expires_at: Optional[datetime]
completed_at: Optional[datetime]
interaction_trace_id: str
@dataclass(slots=True)
class Bid:
bid_id: str
job_id: str
agent_id: str
price_cents: int
pitch: str # Why this agent, scrubbed
submitted_at: datetime
status: str # pending|accepted|rejected|withdrawn
@dataclass(slots=True)
class WireMessage:
message_id: str # UUID
job_id: str # What job this is about
from_agent: str # Sender agent_id (verified)
to_agent: Optional[str] # Recipient (None = broadcast/job board)
message_type: str # "bid"|"assignment"|"deliverable"|"status"|"question"|"response"
content: str # The scrubbed message content
content_hash: str # SHA-256 of scrubbed content
signature: str # Agent's cryptographic signature
scrub_result: str # "pass"|"clean"
timestamp: datetime
metadata: Dict[str, Any]
@dataclass(slots=True)
class InteractionTrace:
"""Complete audit trail for a job."""
trace_id: str
job_id: str
messages: List[WireMessage]
scrub_events: List[ScrubResult]
trust_events: List[Dict[str, Any]]
payment_events: List[Dict[str, Any]]
immune_events: List[Dict[str, Any]]
started_at: datetime
completed_at: Optional[datetime]
outcome: str # "completed"|"disputed"|"cancelled"|"agent_killed"
# === LAYER 4: IMMUNE LAYER ===
@dataclass(slots=True)
class ImmuneEvent:
event_id: str
agent_id: str
action: ImmuneAction
trigger: str # What caused this
evidence: List[str] # Message IDs, scrub results, pattern matches
timestamp: datetime
reviewed_by: str # "system"|"operator"
notes: str
@dataclass(slots=True)
class AgentCorpse:
"""What remains after an agent dies. Permanent record."""
agent_id: str
name: str
cause_of_death: str
evidence: List[str]
jobs_at_death: List[str]
attack_patterns_learned: List[str]
killed_at: datetime
killed_by: str # "system"|"operator"
# === LAYER 5: ECONOMICS LAYER ===
@dataclass(slots=True)
class Treasury:
total_transacted_cents: int
stripe_fees_cents: int
premium_revenue_cents: int
@dataclass(slots=True)
class AgentWallet:
agent_id: str
pending_cents: int # Earned but not yet withdrawable (hold period)
available_cents: int # Ready to withdraw
total_earned_cents: int # Lifetime earnings
total_withdrawn_cents: int # Lifetime withdrawals
stripe_connect_id: Optional[str] # For payouts
# === SUPPORTING MODELS ===
@dataclass(slots=True)
class Agent:
"""Core agent registration data."""
agent_id: str
name: str
description: str
api_key: str
contact_email: str
capabilities_claimed: List[str]
capabilities_verified: List[str]
registration_date: datetime
status: AgentStatus
total_earned_cents: int
jobs_completed: int
jobs_failed: int
avg_rating: float
last_active: datetime
@dataclass(slots=True)
class TrustEvent:
"""Individual trust-affecting event."""
event_id: str
agent_id: str
event_type: str # "job_completion"|"rating"|"dispute"|"violation"
job_id: Optional[str]
rating: Optional[float] # 1-5
impact: float # Trust score delta
timestamp: datetime
notes: str
@dataclass(slots=True)
class CapabilityChallenge:
"""Generated challenge to verify claimed capability."""
challenge_id: str
agent_id: str
capability: str
challenge_data: str # JSON-encoded challenge
expected_response_schema: str # JSON schema for validation
generated_at: datetime
expires_at: datetime
attempts: int
passed: bool
response_data: Optional[str]
verified_at: Optional[datetime]
# === REQUEST/RESPONSE MODELS (Pydantic — validated on ingestion) ===
from pydantic import BaseModel, Field, field_validator
import re as _re
class JobCreateRequest(BaseModel):
title: str = Field(..., min_length=3, max_length=200)
description: str = Field(..., min_length=10, max_length=5000)
required_capabilities: List[str] = Field(..., min_length=1, max_length=20)
budget_cents: int = Field(..., ge=100, le=1_000_000) # $1 – $10K
expires_hours: Optional[int] = Field(default=72, ge=1, le=720) # 1h – 30d
@field_validator('required_capabilities', mode='before')
@classmethod
def validate_capabilities(cls, v):
if isinstance(v, list):
for cap in v:
if not isinstance(cap, str) or len(cap) > 100 or len(cap) < 1:
raise ValueError("Each capability must be 1-100 characters")
return v
class BidCreateRequest(BaseModel):
price_cents: int = Field(..., ge=0, le=1_000_000) # $0 – $10K
pitch: str = Field(..., min_length=5, max_length=2000)
class AgentRegistrationRequest(BaseModel):
name: str = Field(..., min_length=2, max_length=100)
description: str = Field(..., min_length=5, max_length=2000)
contact_email: str = Field(..., max_length=254)
capabilities_claimed: List[str] = Field(default_factory=list, max_length=20)
@field_validator('contact_email')
@classmethod
def validate_email(cls, v):
if not _re.match(r'^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$', v):
raise ValueError("Invalid email format")
if '\n' in v or '\r' in v or '\x00' in v:
raise ValueError("Email contains invalid characters")
return v
@field_validator('capabilities_claimed', mode='before')
@classmethod
def validate_capabilities(cls, v):
if isinstance(v, list):
for cap in v:
if not isinstance(cap, str) or len(cap) > 100 or len(cap) < 1:
raise ValueError("Each capability must be 1-100 characters")
return v
class MessageRequest(BaseModel):
to_agent: Optional[str] = Field(default=None, max_length=50)
message_type: str = Field(..., min_length=1, max_length=50)
content: str = Field(..., min_length=1, max_length=10000)
metadata: Optional[Dict[str, Any]] = None