Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ line-length = 100
src = ["src", "tests"]

[tool.ruff.lint]
isort.known-first-party = ["src"]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
Expand Down Expand Up @@ -241,6 +242,7 @@ exclude = '''
[tool.isort]
profile = "black"
line_length = 100
known_first_party = ["src"]
multi_line_output = 3
include_trailing_comma = true
force_grid_wrap = 0
Expand Down
Binary file added src/__pycache__/__init__.cpython-312.pyc
Binary file not shown.
Binary file added src/agents/__pycache__/__init__.cpython-312.pyc
Binary file not shown.
Binary file not shown.
Binary file not shown.
50 changes: 26 additions & 24 deletions src/agents/parcel_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@
- Optimize via LangGraph + Sentient Foundation models
"""

import uuid
import asyncio
from typing import Optional, Dict, Any
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

from src.payments.x402_client import X402Client
from src.mcp.mcp_tools import MCPToolkit
from src.payments.x402_client import X402Client


@dataclass
class ParcelState:
parcel_id: str
owner_address: str
location: Dict[str, float] # {lat, lng, alt}
metadata: Dict[str, Any] = field(default_factory=dict)
location: dict[str, float] # {lat, lng, alt}
metadata: dict[str, Any] = field(default_factory=dict)
balance_usdx: float = 0.0
active: bool = True
last_updated: str = field(default_factory=lambda: datetime.utcnow().isoformat())
Expand All @@ -34,10 +34,10 @@ class ParcelAgent:

def __init__(
self,
parcel_id: Optional[str] = None,
parcel_id: str | None = None,
owner_address: str = "",
location: Optional[Dict[str, float]] = None,
wallet_private_key: Optional[str] = None,
location: dict[str, float] | None = None,
wallet_private_key: str | None = None,
):
self.parcel_id = parcel_id or str(uuid.uuid4())
self.owner_address = owner_address
Expand All @@ -58,7 +58,7 @@ def update_metadata(self, key: str, value: Any) -> None:
self.state.metadata[key] = value
self.state.last_updated = datetime.utcnow().isoformat()

def get_state(self) -> Dict[str, Any]:
def get_state(self) -> dict[str, Any]:
"""Return the current parcel state as a dict."""
return {
"parcel_id": self.state.parcel_id,
Expand All @@ -72,7 +72,7 @@ def get_state(self) -> Dict[str, Any]:

# ── Communication ─────────────────────────────────────────────────────────

async def send_message(self, target_parcel_id: str, content: Dict[str, Any]) -> Dict:
async def send_message(self, target_parcel_id: str, content: dict[str, Any]) -> dict:
"""Send an MCP message to another parcel agent."""
return await self.mcp.send(
to=target_parcel_id,
Expand All @@ -92,7 +92,7 @@ async def receive_messages(self) -> list:

# ── Trading ───────────────────────────────────────────────────────────────

async def deposit(self, amount_usdx: float) -> Dict:
async def deposit(self, amount_usdx: float) -> dict:
"""Deposit USDx into the parcel wallet via x402."""
result = await self.x402.deposit(amount=amount_usdx)
if result.get("success"):
Expand All @@ -104,8 +104,8 @@ async def trade(
counterparty_id: str,
amount_usdx: float,
trade_type: str = "transfer",
contract_terms: Optional[Dict] = None,
) -> Dict:
contract_terms: dict | None = None,
) -> dict:
"""Execute a USDx trade with another parcel agent."""
if self.state.balance_usdx < amount_usdx:
return {"success": False, "error": "Insufficient USDx balance"}
Expand All @@ -120,9 +120,7 @@ async def trade(
self.state.balance_usdx -= amount_usdx
return result

async def sign_contract(
self, counterparty_id: str, contract: Dict[str, Any]
) -> Dict:
async def sign_contract(self, counterparty_id: str, contract: dict[str, Any]) -> dict:
"""Sign a smart contract with another parcel agent."""
return await self.x402.sign_contract(
contract=contract,
Expand All @@ -132,7 +130,7 @@ async def sign_contract(

# ── Optimization ──────────────────────────────────────────────────────────

async def optimize(self, context: Optional[Dict] = None) -> Dict:
async def optimize(self, context: dict | None = None) -> dict:
"""Run the LangGraph optimization workflow for this parcel."""
from src.graphs.langgraph_workflow import run_parcel_optimization

Expand All @@ -155,19 +153,23 @@ async def run(self, cycles: int = 0) -> None:
if cycles and count >= cycles:
break

async def _handle_message(self, msg: Dict) -> None:
async def _handle_message(self, msg: dict) -> None:
"""Route incoming MCP messages to appropriate handlers."""
msg_type = msg.get("type", "unknown")
# Handle enveloped messages from MCPToolkit/Route.X
data = msg.get("payload", msg)
msg_type = data.get("type", "unknown")
sender = data.get("from", msg.get("from", "unknown"))

if msg_type == "trade_request":
await self.trade(
counterparty_id=msg["from"],
amount_usdx=msg["amount"],
trade_type=msg.get("trade_type", "transfer"),
counterparty_id=sender,
amount_usdx=data["amount"],
trade_type=data.get("trade_type", "transfer"),
)
elif msg_type == "contract_offer":
await self.sign_contract(
counterparty_id=msg["from"],
contract=msg["contract"],
counterparty_id=sender,
contract=data["contract"],
)
elif msg_type == "optimize":
await self.optimize(context=msg.get("context"))
Copy link

Copilot AI Mar 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the optimize message path, the context is read from msg.get("context"), but enveloped Route.X messages put the payload under msg["payload"]. As written, an incoming {payload: {type: "optimize", context: ...}} will ignore the provided context. Use data.get("context") (where data = msg.get("payload", msg)) when invoking optimize().

Suggested change
await self.optimize(context=msg.get("context"))
await self.optimize(context=data.get("context"))

Copilot uses AI. Check for mistakes.
Expand Down
35 changes: 17 additions & 18 deletions src/agents/trade_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
"""

import asyncio
from typing import Dict, List, Optional, Any
from datetime import datetime
from typing import Any


class TradeOffer:
Expand All @@ -23,16 +23,18 @@ def __init__(
self.asset = asset
self.amount_usdx = amount_usdx
self.expires_at = datetime.utcnow().timestamp() + ttl_seconds
self.bids: List[Dict] = []
self.accepted: Optional[Dict] = None
self.bids: list[dict] = []
self.accepted: dict | None = None

def is_expired(self) -> bool:
return datetime.utcnow().timestamp() > self.expires_at

def add_bid(self, bidder_id: str, bid_amount: float) -> None:
self.bids.append({"bidder": bidder_id, "amount": bid_amount, "ts": datetime.utcnow().isoformat()})
self.bids.append(
{"bidder": bidder_id, "amount": bid_amount, "ts": datetime.utcnow().isoformat()}
)

def best_bid(self) -> Optional[Dict]:
def best_bid(self) -> dict | None:
if not self.bids:
return None
return max(self.bids, key=lambda b: b["amount"])
Expand All @@ -43,8 +45,8 @@ class TradeAgent:

def __init__(self, agent_id: str):
self.agent_id = agent_id
self.offers: Dict[str, TradeOffer] = {}
self.trade_history: List[Dict] = []
self.offers: dict[str, TradeOffer] = {}
self.trade_history: list[dict] = []

# ── Offer Management ───────────────────────────────────────────────────

Expand All @@ -60,7 +62,7 @@ def create_offer(
self.offers[offer_id] = offer
return offer

def place_bid(self, offer_id: str, bidder_id: str, bid_amount: float) -> Dict:
def place_bid(self, offer_id: str, bidder_id: str, bid_amount: float) -> dict:
offer = self.offers.get(offer_id)
if not offer:
return {"success": False, "error": "Offer not found"}
Expand All @@ -69,7 +71,7 @@ def place_bid(self, offer_id: str, bidder_id: str, bid_amount: float) -> Dict:
offer.add_bid(bidder_id, bid_amount)
return {"success": True, "offer_id": offer_id, "bid": bid_amount}

def close_offer(self, offer_id: str) -> Dict:
def close_offer(self, offer_id: str) -> dict:
offer = self.offers.get(offer_id)
if not offer:
return {"success": False, "error": "Offer not found"}
Expand All @@ -93,8 +95,8 @@ def close_offer(self, offer_id: str) -> Dict:
async def batch_transfer(
self,
sender_agent: Any,
recipients: List[Dict], # [{"parcel_id": ..., "amount": ...}]
) -> List[Dict]:
recipients: list[dict], # [{"parcel_id": ..., "amount": ...}]
) -> list[dict]:
"""Execute multiple USDx transfers concurrently."""
tasks = [
sender_agent.trade(
Expand All @@ -105,10 +107,7 @@ async def batch_transfer(
for r in recipients
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [
r if isinstance(r, dict) else {"success": False, "error": str(r)}
for r in results
]
return [r if isinstance(r, dict) else {"success": False, "error": str(r)} for r in results]

# ── Contract Templates ─────────────────────────────────────────────────

Expand All @@ -119,7 +118,7 @@ def parcel_lease_contract(
parcel_id: str,
monthly_usdx: float,
duration_months: int,
) -> Dict:
) -> dict:
return {
"type": "parcel_lease",
"version": "1.0",
Expand All @@ -140,7 +139,7 @@ def data_access_contract(
consumer_id: str,
dataset: str,
price_usdx: float,
) -> Dict:
) -> dict:
return {
"type": "data_access",
"version": "1.0",
Expand All @@ -157,7 +156,7 @@ def data_access_contract(

# ── History ───────────────────────────────────────────────────────────────

def get_history(self, limit: int = 50) -> List[Dict]:
def get_history(self, limit: int = 50) -> list[dict]:
return self.trade_history[-limit:]

def volume_usdx(self) -> float:
Expand Down
2 changes: 1 addition & 1 deletion src/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Web4AGI API routers."""

from src.api import parcels, trades, contracts, payments, mcp
from src.api import contracts, mcp, parcels, payments, trades

__all__ = ["parcels", "trades", "contracts", "payments", "mcp"]
Empty file added src/api/contracts/__init__.py
Empty file.
Empty file added src/api/mcp/__init__.py
Empty file.
Empty file added src/api/parcels/__init__.py
Empty file.
Empty file added src/api/payments/__init__.py
Empty file.
Empty file added src/api/trades/__init__.py
Empty file.
1 change: 1 addition & 0 deletions src/graphs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
This package provides workflow orchestration and optimization
using LangGraph for intelligent parcel decision-making.
"""

from src.graphs.langgraph_workflow import run_parcel_optimization

__all__ = ["run_parcel_optimization"]
Binary file added src/graphs/__pycache__/__init__.cpython-312.pyc
Binary file not shown.
Binary file not shown.
47 changes: 29 additions & 18 deletions src/graphs/langgraph_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,23 @@
4. Reflect — score outcome and update memory
"""

from typing import Dict, Any, List, Optional, TypedDict
from datetime import datetime
from typing import Any, TypedDict

try:
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph

LANGGRAPH_AVAILABLE = True
except ImportError:
LANGGRAPH_AVAILABLE = False
StateGraph = None
END = "__end__"

try:
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI

LANGCHAIN_AVAILABLE = True
except ImportError:
LANGCHAIN_AVAILABLE = False
Expand All @@ -34,23 +36,26 @@

# ── State Schema ───────────────────────────────────────────────────────────────


class ParcelOptState(TypedDict):
parcel_state: Dict[str, Any]
context: Dict[str, Any]
assessment: Optional[str]
strategies: List[str]
chosen_strategy: Optional[str]
actions_taken: List[Dict]
reflection: Optional[str]
parcel_state: dict[str, Any]
context: dict[str, Any]
assessment: str | None
strategies: list[str]
chosen_strategy: str | None
actions_taken: list[dict]
reflection: str | None
score: float
iteration: int


# ── Node Functions ─────────────────────────────────────────────────────────────


def _get_llm():
"""Return the configured LLM (Sentient Foundation or OpenAI fallback)."""
import os

sentient_key = os.getenv("SENTIENT_API_KEY")
sentient_url = os.getenv("SENTIENT_BASE_URL", "https://api.sentientfoundation.ai/v1")
openai_key = os.getenv("OPENAI_API_KEY")
Expand Down Expand Up @@ -100,13 +105,17 @@ def plan_node(state: ParcelOptState) -> ParcelOptState:
"""Generate optimization strategies based on the assessment."""
llm = _get_llm()
if llm:
prompt = f"""Assessment: {state['assessment']}
Parcel state: {state['parcel_state']}
prompt = f"""Assessment: {state["assessment"]}
Parcel state: {state["parcel_state"]}

List 3 concrete optimization strategies as a numbered list.
Each strategy should be a single actionable sentence."""
response = llm.invoke([HumanMessage(content=prompt)])
lines = [l.strip() for l in response.content.split("\n") if l.strip() and l[0].isdigit()]
lines = [
line.strip()
for line in response.content.split("\n")
if line.strip() and line[0].isdigit()
]
strategies = lines[:3] if lines else [response.content]
else:
ps = state["parcel_state"]
Expand Down Expand Up @@ -140,8 +149,8 @@ def reflect_node(state: ParcelOptState) -> ParcelOptState:
"""Score the outcome and generate a reflection."""
llm = _get_llm()
if llm:
prompt = f"""Strategy executed: {state['chosen_strategy']}
Actions taken: {state['actions_taken']}
prompt = f"""Strategy executed: {state["chosen_strategy"]}
Actions taken: {state["actions_taken"]}

In 1-2 sentences, reflect on the outcome and assign a score from 0.0 to 1.0.
Respond in format: SCORE: 0.X | REFLECTION: <text>"""
Expand Down Expand Up @@ -175,6 +184,7 @@ def should_continue(state: ParcelOptState) -> str:

# ── Graph Builder ───────────────────────────────────────────────────────────────


def build_optimization_graph():
"""Build and compile the LangGraph optimization workflow."""
if not LANGGRAPH_AVAILABLE:
Expand Down Expand Up @@ -207,10 +217,11 @@ def _get_graph():

# ── Public Entry Point ───────────────────────────────────────────────────────────


async def run_parcel_optimization(
parcel_state: Dict[str, Any],
context: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
parcel_state: dict[str, Any],
context: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Run the optimization workflow for a parcel and return the final state."""
initial: ParcelOptState = {
"parcel_state": parcel_state,
Expand Down
Loading
Loading