-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcrewai_tool.py
More file actions
134 lines (106 loc) · 4.72 KB
/
crewai_tool.py
File metadata and controls
134 lines (106 loc) · 4.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""
CrewAI tool for Headless Oracle V5.

Usage:
    from integrations.crewai_tool import MarketStatusTool
    from crewai import Agent, Task, Crew

    oracle_tool = MarketStatusTool(api_key="ok_live_...")

    trader_agent = Agent(
        role="Trading Agent",
        goal="Execute trades only when markets are open and verified",
        tools=[oracle_tool],
        backstory="A cautious trading agent that verifies market status before any action.",
    )

Requires:
    pip install "headless-oracle[crewai]"
"""
from __future__ import annotations
from typing import Optional, Type
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from headless_oracle import OracleClient, verify
class _MicInput(BaseModel):
    # Argument schema for MarketStatusTool: a single exchange MIC code.
    # Documented with comments rather than a docstring so the schema
    # generated from this model is unchanged.
    mic: str = Field(
        "XNYS",  # default exchange when the caller supplies none
        description=(
            "Exchange MIC code. "
            "One of: XNYS, XNAS, XLON, XJPX, XPAR, XHKG, XSES."
        ),
    )
class MarketStatusTool(BaseTool):
    """
    CrewAI tool that fetches a cryptographically signed market status receipt
    from Headless Oracle V5 and verifies the Ed25519 signature.

    Returns OPEN, CLOSED, HALTED, or UNKNOWN. UNKNOWN must be treated as CLOSED.

    Failures while fetching or verifying the receipt are reported as text
    that instructs the agent to treat the market as CLOSED.

    Attributes:
        api_key: Oracle API key. When unset or empty the client's
            ``get_demo`` call is used instead of ``get_status``.
        public_key: Optional key forwarded to ``verify``.
        base_url: Base URL handed to ``OracleClient``.
    """

    name: str = "get_market_status"
    description: str = (
        "Prevent your code from executing trades against closed, halted, or holiday-closed markets. "
        "Fetch the current open/closed status of a stock exchange from Headless Oracle. "
        "Provides a cryptographically signed, tamper-proof receipt. "
        "Input is a MIC code (e.g. XNYS = NYSE, XNAS = NASDAQ, XLON = LSE). "
        "UNKNOWN must always be treated as CLOSED — halt all trade execution."
    )
    args_schema: Type[BaseModel] = _MicInput
    api_key: Optional[str] = None
    public_key: Optional[str] = None
    base_url: str = "https://headlessoracle.com"

    def _run(self, mic: str = "XNYS") -> str:
        """Fetch, verify, and summarise the status receipt for one exchange.

        Args:
            mic: Exchange MIC code. Surrounding whitespace is removed and the
                code is upper-cased before it is sent.

        Returns:
            A multi-line human-readable summary of the verified receipt, or a
            single ``ORACLE_ERROR`` / ``SIGNATURE_INVALID`` message telling
            the agent to treat the market as CLOSED.
        """
        # Agents frequently pass " xnys " style input; normalise it.
        mic = mic.strip().upper()

        # Tool boundary: any client failure (construction, request, teardown)
        # becomes a fail-closed message instead of an exception.
        try:
            with OracleClient(api_key=self.api_key, base_url=self.base_url) as client:
                receipt = client.get_status(mic) if self.api_key else client.get_demo(mic)
        except Exception as e:
            return (
                f"ORACLE_ERROR: could not fetch market status for {mic}. "
                f"Error: {e}. Treat as CLOSED and halt all execution."
            )

        # A verifier that raises is treated the same as a failed verification.
        try:
            result = verify(receipt, public_key=self.public_key)
        except Exception as e:
            return (
                f"SIGNATURE_INVALID: receipt for {mic} could not be verified "
                f"(reason: {e}). Treat as CLOSED and halt all execution."
            )
        if not result.valid:
            return (
                f"SIGNATURE_INVALID: receipt for {mic} failed verification "
                f"(reason: {result.reason}). Treat as CLOSED and halt all execution."
            )

        # A receipt without a status carries no usable answer: fail closed.
        status = receipt.get("status") or "UNKNOWN"
        lines = [
            f"Exchange: {mic}",
            f"Status: {status}",
            f"Safe to trade: {'YES' if status == 'OPEN' else 'NO'}",
            f"Source: {receipt.get('source', 'n/a')}",
            f"Issued at: {receipt.get('issued_at', 'n/a')}",
            f"Expires at: {receipt.get('expires_at', 'n/a')}",
        ]
        if receipt.get("reason"):
            lines.append(f"Override reason: {receipt['reason']}")
        if status == "UNKNOWN":
            lines.append("WARNING: UNKNOWN status — treat as CLOSED, halt all execution.")
        return "\n".join(lines)
class _BatchInput(BaseModel):
    # Argument schema for BatchMarketStatusTool: one required string holding
    # the comma-separated MIC codes. Documented with comments rather than a
    # docstring so the schema generated from this model is unchanged.
    mics: str = Field(
        ...,  # required, no default
        description="Comma-separated MIC codes to check (e.g. 'XNYS,XNAS,XLON').",
    )
class BatchMarketStatusTool(BaseTool):
    """
    CrewAI tool that checks multiple exchanges in a single authenticated request.
    Useful for multi-market trading agents that need consistent point-in-time status.

    Each returned receipt is verified independently. A receipt that fails
    verification, and any requested exchange for which no receipt came back,
    is reported as UNKNOWN with ``safe_to_trade=NO``.

    Attributes:
        api_key: Oracle API key passed to ``OracleClient``.
        public_key: Optional key forwarded to ``verify``.
        base_url: Base URL handed to ``OracleClient``.
    """

    name: str = "get_batch_market_status"
    description: str = (
        "Fetch signed market status for multiple exchanges in one request. "
        "Input: comma-separated MIC codes (e.g. 'XNYS,XNAS,XLON'). "
        "Returns independent signed receipts for each exchange. "
        "UNKNOWN for any exchange means halt execution for that exchange."
    )
    args_schema: Type[BaseModel] = _BatchInput
    api_key: str = ""
    public_key: Optional[str] = None
    base_url: str = "https://headlessoracle.com"

    def _run(self, mics: str) -> str:
        """Fetch and verify status receipts for several exchanges.

        Args:
            mics: Comma-separated MIC codes. Blank entries are ignored and
                codes are stripped and upper-cased before being sent.

        Returns:
            One ``MIC: STATUS (safe_to_trade=YES|NO)`` line per exchange, or a
            single ``BATCH_ERROR`` message telling the agent to treat all
            exchanges as CLOSED.
        """
        # Drop blanks so input such as "XNYS,,XNAS," or "" never sends an
        # empty MIC code to the API.
        mic_list = [m.strip().upper() for m in mics.split(",") if m.strip()]
        if not mic_list:
            return "BATCH_ERROR: no MIC codes supplied. Treat all exchanges as CLOSED."

        # Tool boundary: any client failure becomes a fail-closed message.
        try:
            with OracleClient(api_key=self.api_key, base_url=self.base_url) as client:
                data = client.get_batch(mic_list)
        except Exception as e:
            return f"BATCH_ERROR: {e}. Treat all exchanges as CLOSED."

        lines = []
        reported = set()
        # `or []` also covers a response whose "receipts" value is null.
        for receipt in data.get("receipts") or []:
            # A verifier that raises counts as a failed verification; the
            # exchange is then surfaced as UNKNOWN rather than hidden.
            try:
                verified = verify(receipt, public_key=self.public_key).valid
            except Exception:
                verified = False
            status = (receipt.get("status") or "UNKNOWN") if verified else "UNKNOWN"
            safe = "YES" if status == "OPEN" else "NO"
            receipt_mic = receipt.get("mic", "?")
            reported.add(str(receipt_mic).upper())
            lines.append(f"{receipt_mic}: {status} (safe_to_trade={safe})")

        # Fail closed on omissions: a requested exchange with no receipt must
        # not vanish from the report.
        for mic in mic_list:
            if mic not in reported:
                reported.add(mic)  # one line even if the MIC was requested twice
                lines.append(f"{mic}: UNKNOWN (safe_to_trade=NO)")

        return "\n".join(lines)