From ee52a53ef740d4baea2668a011c8b1270315e242 Mon Sep 17 00:00:00 2001 From: Dawe000 Date: Wed, 4 Mar 2026 10:57:59 +0000 Subject: [PATCH 1/3] Added telemetry and tests to mirror the agent0-ts Todo: Fix subgraph --- .env.example | 20 +++ agent0_sdk/core/sdk.py | 312 ++++++++++++++++++++++++++--------- agent0_sdk/core/telemetry.py | 91 ++++++++++ pyproject.toml | 1 + tests/config.py | 12 ++ tests/test_telemetry_sdk.py | 202 +++++++++++++++++++++++ 6 files changed, 562 insertions(+), 76 deletions(-) create mode 100644 .env.example create mode 100644 agent0_sdk/core/telemetry.py create mode 100644 tests/test_telemetry_sdk.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..662651e --- /dev/null +++ b/.env.example @@ -0,0 +1,20 @@ +# Chain / RPC +CHAIN_ID=11155111 +RPC_URL= + +# Agent (feedback tests) +AGENT_PRIVATE_KEY= +PINATA_JWT= +AGENT_ID=11155111:374 +CLIENT_PRIVATE_KEY= + +# Subgraph +SUBGRAPH_URL= + +# Telemetry (after running dashboard seed seed-telemetry-test-user.sql) +AGENT0_API_KEY=ag0_live_telemetrytest_12345678901234567890 +AGENT0_TELEMETRY_ENDPOINT=http://127.0.0.1:54321/functions/v1/ingest-telemetry + +# Supabase (for telemetry DB assertions; use local anon + service_role from supabase start) +SUPABASE_URL=http://127.0.0.1:54321 +SUPABASE_SERVICE_ROLE_KEY= diff --git a/agent0_sdk/core/sdk.py b/agent0_sdk/core/sdk.py index df76ab1..979db0b 100644 --- a/agent0_sdk/core/sdk.py +++ b/agent0_sdk/core/sdk.py @@ -29,6 +29,11 @@ from .feedback_manager import FeedbackManager from .transaction_handle import TransactionHandle from .subgraph_client import SubgraphClient +from .telemetry import ( + TelemetryClient, + DEFAULT_TELEMETRY_ENDPOINT, + categorize_error, +) class SDK: @@ -52,12 +57,16 @@ def __init__( pinataJwt: Optional[str] = None, # Subgraph configuration subgraphOverrides: Optional[Dict[ChainId, str]] = None, # Override subgraph URLs per chain + subgraphUrl: Optional[str] = None, # Single default subgraph URL for current chain (e.g. for tests) + # Telemetry + api_key: Optional[str] = None, + telemetry_endpoint: Optional[str] = None, ): """Initialize the SDK.""" self.chainId = chainId self.rpcUrl = rpcUrl self.signer = signer - + # Initialize Web3 client (with or without signer for read-only operations) if signer: if isinstance(signer, str): @@ -67,33 +76,28 @@ def __init__( else: # Read-only mode - no signer self.web3_client = Web3Client(rpcUrl) - + # Registry addresses self.registry_overrides = registryOverrides or {} self._registries = self._resolve_registries() - + # Initialize contract instances self._identity_registry = None self._reputation_registry = None self._validation_registry = None - + # Resolve subgraph URL (with fallback chain) - self._subgraph_urls = {} + self._subgraph_urls: Dict[ChainId, str] = {} if subgraphOverrides: self._subgraph_urls.update(subgraphOverrides) - + if subgraphUrl: + self._subgraph_urls[chainId] = subgraphUrl + # Get subgraph URL for current chain - resolved_subgraph_url = None - - # Priority 1: Chain-specific override - if chainId in self._subgraph_urls: - resolved_subgraph_url = self._subgraph_urls[chainId] - # Priority 2: Default for chain - elif chainId in DEFAULT_SUBGRAPH_URLS: - resolved_subgraph_url = DEFAULT_SUBGRAPH_URLS[chainId] - else: - # No subgraph available - subgraph_client will be None - resolved_subgraph_url = None + resolved_subgraph_url = ( + self._subgraph_urls.get(chainId) + or DEFAULT_SUBGRAPH_URLS.get(chainId) + ) # Initialize subgraph client if URL available if resolved_subgraph_url: @@ -128,6 +132,20 @@ def __init__( indexer=self.indexer # Pass indexer for unified search interface ) + # Telemetry (fire-and-forget when api_key is set) + if api_key and api_key.strip(): + self._telemetry = TelemetryClient( + api_key=api_key.strip(), + endpoint=telemetry_endpoint or DEFAULT_TELEMETRY_ENDPOINT, + ) + else: + self._telemetry = None + + def _emit_telemetry_event(self, event: Dict[str, Any]) -> None: + """Emit a single telemetry event. No-op if telemetry is disabled.""" + if self._telemetry: + self._telemetry.emit([event]) + def _resolve_registries(self) -> Dict[str, Address]: """Resolve registry addresses for current chain.""" # Start with defaults @@ -287,46 +305,66 @@ def loadAgent(self, agentId: AgentId) -> Agent: In that case we return a partially-hydrated Agent with an empty registration file so the caller can resume publishing and set the URI later. """ - # Convert agentId to string if it's an integer - agentId = str(agentId) - - # Parse agent ID - if ":" in agentId: - chain_id, token_id = agentId.split(":", 1) - if int(chain_id) != self.chainId: - raise ValueError(f"Agent {agentId} is not on current chain {self.chainId}") - else: - token_id = agentId - - # Get token URI from contract + start_ms = int(time.time() * 1000) + agent_id_str = str(agentId) try: - agent_uri = self.web3_client.call_contract( - self.identity_registry, "tokenURI", int(token_id) # tokenURI is ERC-721 standard, but represents agentURI - ) - except Exception as e: - raise ValueError(f"Failed to load agent {agentId}: {e}") - - # Load registration file (or fall back to a minimal file if agent URI is missing) - registration_file = self._load_registration_file(agent_uri) - registration_file.agentId = agentId - registration_file.agentURI = agent_uri if agent_uri else None + # Convert agentId to string if it's an integer + agentId = agent_id_str - if not agent_uri or not str(agent_uri).strip(): - logger.warning( - f"Agent {agentId} has no agentURI set on-chain yet. " - "Returning a partial agent; update info and call registerIPFS() to publish and set URI." - ) - - # Store registry address for proper JSON generation - registry_address = self._registries.get("IDENTITY") - if registry_address: - registration_file._registry_address = registry_address - registration_file._chain_id = self.chainId - - # Hydrate on-chain data - self._hydrate_agent_data(registration_file, int(token_id)) - - return Agent(sdk=self, registration_file=registration_file) + # Parse agent ID + if ":" in agentId: + chain_id, token_id = agentId.split(":", 1) + if int(chain_id) != self.chainId: + raise ValueError(f"Agent {agentId} is not on current chain {self.chainId}") + else: + token_id = agentId + + # Get token URI from contract + try: + agent_uri = self.web3_client.call_contract( + self.identity_registry, "tokenURI", int(token_id) # tokenURI is ERC-721 standard, but represents agentURI + ) + except Exception as e: + raise ValueError(f"Failed to load agent {agentId}: {e}") + + # Load registration file (or fall back to a minimal file if agent URI is missing) + registration_file = self._load_registration_file(agent_uri) + registration_file.agentId = agentId + registration_file.agentURI = agent_uri if agent_uri else None + + if not agent_uri or not str(agent_uri).strip(): + logger.warning( + f"Agent {agentId} has no agentURI set on-chain yet. " + "Returning a partial agent; update info and call registerIPFS() to publish and set URI." + ) + + # Store registry address for proper JSON generation + registry_address = self._registries.get("IDENTITY") + if registry_address: + registration_file._registry_address = registry_address + registration_file._chain_id = self.chainId + + # Hydrate on-chain data + self._hydrate_agent_data(registration_file, int(token_id)) + + self._emit_telemetry_event({ + "eventType": "agent.loaded", + "success": True, + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": {"chainId": self.chainId, "agentId": agent_id_str}, + }) + return Agent(sdk=self, registration_file=registration_file) + except Exception as e: + self._emit_telemetry_event({ + "eventType": "agent.loaded", + "success": False, + "errorType": categorize_error(e), + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": {"chainId": self.chainId, "agentId": agent_id_str}, + }) + raise def _load_registration_file(self, uri: str) -> RegistrationFile: """Load registration file from URI. @@ -456,7 +494,36 @@ def refreshIndex( def getAgent(self, agentId: AgentId) -> AgentSummary: """Get agent summary from index.""" - return self.indexer.get_agent(agentId) + start_ms = int(time.time() * 1000) + agent_id_str = str(agentId) + try: + result = self.indexer.get_agent(agentId) + self._emit_telemetry_event({ + "eventType": "agent.fetched", + "success": True, + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": { + "chainId": self.chainId, + "agentId": agent_id_str, + "found": result is not None, + }, + }) + return result + except Exception as e: + self._emit_telemetry_event({ + "eventType": "agent.fetched", + "success": False, + "errorType": categorize_error(e), + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": { + "chainId": self.chainId, + "agentId": agent_id_str, + "found": False, + }, + }) + raise def searchAgents( self, @@ -492,10 +559,39 @@ def searchAgents( options = SearchOptions(**options) # Do not force a default sort here; the indexer chooses keyword-aware defaults. - out = self.indexer.search_agents(filters, options) - if isinstance(out, dict): - return out.get("items") or [] - return out or [] + start_ms = int(time.time() * 1000) + try: + out = self.indexer.search_agents(filters, options) + items = out.get("items") if isinstance(out, dict) else out + items = items or [] + results = [ + getattr(a, "agentId", a.get("agentId") if isinstance(a, dict) else None) + for a in (items[:100]) + ] + payload = { + "chainId": self.chainId, + "results": results, + } + self._emit_telemetry_event({ + "eventType": "search.query", + "success": True, + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": payload, + }) + if isinstance(out, dict): + return out.get("items") or [] + return out or [] + except Exception as e: + self._emit_telemetry_event({ + "eventType": "search.query", + "success": False, + "errorType": categorize_error(e), + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": {"chainId": self.chainId, "results": []}, + }) + raise # Feedback methods are defined later in this class (single authoritative API). @@ -583,19 +679,59 @@ def searchFeedback( "(agentId/agents/reviewers/tags/capabilities/skills/tasks/names/minValue/maxValue)." ) - return self.feedback_manager.searchFeedback( - agentId=agentId, - agents=agents, - clientAddresses=reviewers, - tags=tags, - capabilities=capabilities, - skills=skills, - tasks=tasks, - names=names, - minValue=minValue, - maxValue=maxValue, - include_revoked=include_revoked, - ) + start_ms = int(time.time() * 1000) + agent_count = len(agents) if agents else (1 if agentId else 0) + try: + result = self.feedback_manager.searchFeedback( + agentId=agentId, + agents=agents, + clientAddresses=reviewers, + tags=tags, + capabilities=capabilities, + skills=skills, + tasks=tasks, + names=names, + minValue=minValue, + maxValue=maxValue, + include_revoked=include_revoked, + ) + self._emit_telemetry_event({ + "eventType": "feedback.searched", + "success": True, + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": { + "chainId": self.chainId, + "agentId": agentId, + "agentCount": agent_count, + "tags": tags, + "reviewers": reviewers, + "capabilities": capabilities, + "skills": skills, + "tasks": tasks, + "names": names, + "minValue": minValue, + "maxValue": maxValue, + "includeRevoked": include_revoked, + "resultCount": len(result), + "isZeroResults": len(result) == 0, + }, + }) + return result + except Exception as e: + self._emit_telemetry_event({ + "eventType": "feedback.searched", + "success": False, + "errorType": categorize_error(e), + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": { + "chainId": self.chainId, + "agentId": agentId, + "agentCount": agent_count, + }, + }) + raise def revokeFeedback( self, @@ -622,9 +758,33 @@ def getReputationSummary( agentId: "AgentId", ) -> Dict[str, Any]: """Get reputation summary for an agent.""" - return self.feedback_manager.getReputationSummary( - agentId - ) + start_ms = int(time.time() * 1000) + agent_id_str = str(agentId) + try: + result = self.feedback_manager.getReputationSummary(agentId) + self._emit_telemetry_event({ + "eventType": "reputation.summary.fetched", + "success": True, + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": { + "chainId": self.chainId, + "agentId": agent_id_str, + "count": result.get("count", 0), + "averageValue": result.get("averageValue", 0.0), + }, + }) + return result + except Exception as e: + self._emit_telemetry_event({ + "eventType": "reputation.summary.fetched", + "success": False, + "errorType": categorize_error(e), + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": {"chainId": self.chainId, "agentId": agent_id_str}, + }) + raise def transferAgent( self, diff --git a/agent0_sdk/core/telemetry.py b/agent0_sdk/core/telemetry.py new file mode 100644 index 0000000..16cc73f --- /dev/null +++ b/agent0_sdk/core/telemetry.py @@ -0,0 +1,91 @@ +""" +Telemetry client for SDK events (Telemetry-Events-Specs-v2). +Fire-and-forget; never blocks or raises. +""" + +from __future__ import annotations + +import json +import threading +import time +from typing import Any, Dict, List, Optional + +import requests + +DEFAULT_TELEMETRY_ENDPOINT = ( + "https://pepzouxscqxcejwjcbro.supabase.co/functions/v1/ingest-telemetry" +) + +TelemetryErrorType = str # one of the literals below + + +def categorize_error(error: Optional[BaseException]) -> str: + """Map an exception to a telemetry error type string.""" + if error is None: + return "UNKNOWN" + msg = str(error) + code = "" + if isinstance(error, BaseException) and hasattr(error, "code"): + code = str(getattr(error, "code", "")) + if code == "NETWORK_ERROR" or ( + "fetch" in msg.lower() + or "network" in msg.lower() + or "econnrefused" in msg.lower() + or "enotfound" in msg.lower() + ): + return "NETWORK_ERROR" + if "CALL_EXCEPTION" in code or "contract" in code or "revert" in code or "execution reverted" in code: + return "CONTRACT_ERROR" + if "revert" in msg.lower() or "contract" in msg.lower(): + return "CONTRACT_ERROR" + if "validation" in msg.lower() or "invalid" in msg.lower() or "bad request" in msg.lower() or "VALIDATION" in code: + return "VALIDATION_ERROR" + if "timeout" in msg.lower() or "timed out" in msg.lower() or "ETIMEDOUT" in msg.lower(): + return "TIMEOUT" + if "not found" in msg.lower() or "404" in msg or "NOT_FOUND" in msg: + return "NOT_FOUND" + if "unauthorized" in msg.lower() or "403" in msg or "permission" in msg.lower() or "UNAUTHORIZED" in msg: + return "UNAUTHORIZED" + if "rate limit" in msg.lower() or "429" in msg or "RATE_LIMITED" in msg: + return "RATE_LIMITED" + if "ipfs" in msg.lower() or "IPFS_ERROR" in msg or "pinata" in msg.lower() or "pin.fs" in msg.lower(): + return "IPFS_ERROR" + if "subgraph" in msg.lower() or "graphql" in msg.lower() or "SUBGRAPH_ERROR" in msg: + return "SUBGRAPH_ERROR" + return "UNKNOWN" + + +def _send_request(endpoint: str, api_key: str, events: List[Dict[str, Any]]) -> None: + """Run in background; never raise.""" + try: + body = json.dumps({"events": events}) + requests.post( + endpoint, + data=body, + headers={ + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}", + }, + timeout=10, + ) + except Exception: + pass # Silently ignore telemetry failures + + +class TelemetryClient: + """Fire-and-forget telemetry client. Never blocks or raises.""" + + def __init__(self, api_key: str, endpoint: Optional[str] = None) -> None: + self._api_key = api_key + self._endpoint = endpoint or DEFAULT_TELEMETRY_ENDPOINT + + def emit(self, events: List[Dict[str, Any]]) -> None: + """Emit events (fire-and-forget). Never raises; failures are ignored.""" + if not events: + return + thread = threading.Thread( + target=_send_request, + args=(self._endpoint, self._api_key, events), + daemon=True, + ) + thread.start() diff --git a/pyproject.toml b/pyproject.toml index 9f55b43..bd84ba7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,7 @@ test = [ "pytest-asyncio>=0.21.0", "pytest-cov>=4.0.0", "pytest-mock>=3.10.0", + "supabase>=2.0.0", ] [project.urls] diff --git a/tests/config.py b/tests/config.py index fead183..678fc95 100644 --- a/tests/config.py +++ b/tests/config.py @@ -38,6 +38,14 @@ # Client Private Key (for feedback tests - different wallet from agent) CLIENT_PRIVATE_KEY = os.getenv("CLIENT_PRIVATE_KEY", "") +# Telemetry (for SDK integration tests; after running dashboard seed seed-telemetry-test-user.sql) +AGENT0_API_KEY = os.getenv("AGENT0_API_KEY", "") +AGENT0_TELEMETRY_ENDPOINT = os.getenv("AGENT0_TELEMETRY_ENDPOINT", "") + +# Supabase (for telemetry DB assertions; use local from supabase start) +SUPABASE_URL = os.getenv("SUPABASE_URL", "") +SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY", "") + def print_config(): """Print current configuration (hiding sensitive values).""" @@ -49,5 +57,9 @@ def print_config(): print(f" PINATA_JWT: {'***' if PINATA_JWT else 'NOT SET'}") print(f" SUBGRAPH_URL: {SUBGRAPH_URL[:50]}...") print(f" AGENT_ID: {AGENT_ID}") + print(f" AGENT0_API_KEY: {'***' if AGENT0_API_KEY else 'NOT SET'}") + print(f" AGENT0_TELEMETRY_ENDPOINT: {AGENT0_TELEMETRY_ENDPOINT or '(default)'}") + print(f" SUPABASE_URL: {'***' if SUPABASE_URL else 'NOT SET'}") + print(f" SUPABASE_SERVICE_ROLE_KEY: {'***' if SUPABASE_SERVICE_ROLE_KEY else 'NOT SET'}") print() diff --git a/tests/test_telemetry_sdk.py b/tests/test_telemetry_sdk.py new file mode 100644 index 0000000..780994c --- /dev/null +++ b/tests/test_telemetry_sdk.py @@ -0,0 +1,202 @@ +""" +Integration tests: SDK telemetry (Telemetry-Events-Specs-v2). + +Set in .env: AGENT0_API_KEY, optionally AGENT0_TELEMETRY_ENDPOINT (defaults to prod). +Run: pytest tests/test_telemetry_sdk.py -v + +Tests that require local Supabase (SUPABASE_URL + SUPABASE_SERVICE_ROLE_KEY) and +ingest-telemetry running (e.g. in agent0-dashboard: npx supabase functions serve): + - test_search_agents_returns_and_emits_telemetry (DB check: search.query) + - test_get_agent_returns_and_emits_telemetry (DB check: agent.fetched) + - test_load_agent_returns_and_emits_telemetry (DB check: agent.loaded, only when agent URI is HTTP/IPFS) + - test_search_feedback_emits_telemetry (DB check: feedback.searched) + - test_get_reputation_summary_emits_telemetry (DB check: reputation.summary.fetched) + - test_telemetry_events_written_to_database_spec_coverage +Apply seed-telemetry-test-user.sql so the test API key exists. + +Spec coverage (read-only, no signer): + search.query, agent.fetched, agent.loaded, feedback.searched, reputation.summary.fetched +Write/lifecycle events (agent.registered, feedback.given, etc.) require signer/agent and are not covered here. +""" + +import time +from datetime import datetime, timezone, timedelta + +import pytest + +from tests.config import ( + CHAIN_ID, + RPC_URL, + SUBGRAPH_URL, + AGENT_ID, + AGENT0_API_KEY, + AGENT0_TELEMETRY_ENDPOINT, + SUPABASE_URL, + SUPABASE_SERVICE_ROLE_KEY, + print_config, +) +from agent0_sdk.core.sdk import SDK +from agent0_sdk.core.models import SearchOptions + +HAS_API_KEY = bool(AGENT0_API_KEY and AGENT0_API_KEY.strip()) +HAS_SUPABASE = bool( + SUPABASE_URL and SUPABASE_URL.strip() + and SUPABASE_SERVICE_ROLE_KEY + and SUPABASE_SERVICE_ROLE_KEY.strip() +) + + +def assert_event_in_db(event_type: str, since: str) -> None: + """Assert at least one telemetry event of type exists in DB after since. Only runs when HAS_SUPABASE.""" + if not HAS_SUPABASE or not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY: + return + time.sleep(6) + try: + from supabase import create_client + except ImportError: + pytest.skip("supabase package required for DB assertions (pip install supabase)") + client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY) + resp = ( + client.table("telemetry_events") + .select("event_type") + .gte("timestamp", since) + .eq("event_type", event_type) + .limit(1) + .execute() + ) + if resp.data is None or len(resp.data) == 0: + raise AssertionError( + f'No telemetry event "{event_type}" found. ' + "Ensure Supabase and ingest-telemetry are running (e.g. in agent0-dashboard: npx supabase functions serve)." + ) + + +@pytest.mark.skipif(not HAS_API_KEY, reason="AGENT0_API_KEY not set") +class TestSDKWithTelemetry: + """SDK with api_key + telemetry_endpoint.""" + + def setup_method(self): + print_config() + self.sdk = SDK( + chainId=CHAIN_ID, + rpcUrl=RPC_URL, + subgraphUrl=SUBGRAPH_URL, + api_key=AGENT0_API_KEY, + telemetry_endpoint=AGENT0_TELEMETRY_ENDPOINT or None, + ) + + def test_search_agents_returns_and_emits_telemetry(self): + since = datetime.now(timezone.utc).isoformat() + result = self.sdk.searchAgents({}, SearchOptions(sort=["updatedAt:desc"])) + assert isinstance(result, list) + if result: + assert hasattr(result[0], "chainId") or "chainId" in getattr(result[0], "__dict__", {}) + assert hasattr(result[0], "agentId") or "agentId" in getattr(result[0], "__dict__", {}) + assert_event_in_db("search.query", since) + + def test_get_agent_returns_and_emits_telemetry(self): + since = datetime.now(timezone.utc).isoformat() + try: + agent = self.sdk.getAgent(AGENT_ID) + if agent: + assert getattr(agent, "agentId", None) == AGENT_ID or agent.get("agentId") == AGENT_ID + except Exception: + pass # getAgent may raise if not found + assert_event_in_db("agent.fetched", since) + + def test_load_agent_returns_and_emits_telemetry(self): + since = datetime.now(timezone.utc).isoformat() + emitted = False + try: + agent = self.sdk.loadAgent(AGENT_ID) + assert agent is not None + assert getattr(agent, "agentId", None) == AGENT_ID + emitted = True + except Exception as e: + msg = str(e) + if "Unsupported URI scheme" in msg or "Data URI" in msg: + return # Test agent may use data: URI; SDK only loads HTTP/IPFS + raise + if emitted: + assert_event_in_db("agent.loaded", since) + + def test_search_feedback_emits_telemetry(self): + since = datetime.now(timezone.utc).isoformat() + result = self.sdk.searchFeedback(agentId=AGENT_ID) + assert isinstance(result, list) + assert_event_in_db("feedback.searched", since) + + def test_get_reputation_summary_emits_telemetry(self): + since = datetime.now(timezone.utc).isoformat() + summary = self.sdk.getReputationSummary(AGENT_ID) + assert summary is not None + assert "count" in summary + assert "averageValue" in summary + assert_event_in_db("reputation.summary.fetched", since) + + @pytest.mark.skipif(not HAS_SUPABASE, reason="SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY required") + def test_telemetry_events_written_to_database_spec_coverage(self): + since_iso = (datetime.now(timezone.utc) - timedelta(seconds=120)).isoformat() + self.sdk.searchAgents({}, SearchOptions(sort=["updatedAt:desc"])) + self.sdk.getAgent(AGENT_ID) + load_agent_emitted = False + try: + self.sdk.loadAgent(AGENT_ID) + load_agent_emitted = True + except Exception as e: + if "Unsupported URI scheme" not in str(e) and "Data URI" not in str(e): + raise + self.sdk.searchFeedback(agentId=AGENT_ID) + self.sdk.getReputationSummary(AGENT_ID) + time.sleep(6) + + expected_types = [ + "search.query", + "agent.fetched", + "feedback.searched", + "reputation.summary.fetched", + ] + if load_agent_emitted: + expected_types.append("agent.loaded") + + try: + from supabase import create_client + except ImportError: + pytest.skip("supabase package required for DB assertions") + client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY) + resp = ( + client.table("telemetry_events") + .select("event_type, payload, timestamp") + .gte("timestamp", since_iso) + .in_("event_type", expected_types) + .order("timestamp", desc=True) + .execute() + ) + assert resp.data is not None + if len(resp.data) == 0: + raise AssertionError( + "No telemetry events found. Ensure Edge Functions are served " + "(e.g. in agent0-dashboard run: npx supabase functions serve) and " + "ingest-telemetry is reachable at AGENT0_TELEMETRY_ENDPOINT." + ) + types = [r["event_type"] for r in resp.data] + for t in expected_types: + assert t in types, f"Expected event type {t} in {types}" + + search_evt = next((e for e in resp.data if e.get("event_type") == "search.query"), None) + if search_evt and search_evt.get("payload") and isinstance(search_evt["payload"], dict): + assert "results" in search_evt["payload"] + assert isinstance(search_evt["payload"]["results"], list) + + +class TestSDKWithoutApiKey: + """SDK without api_key (no telemetry).""" + + def test_constructs_and_search_agents_works(self): + sdk = SDK( + chainId=CHAIN_ID, + rpcUrl=RPC_URL, + subgraphUrl=SUBGRAPH_URL, + ) + result = sdk.searchAgents({}, SearchOptions(sort=["updatedAt:desc"])) + assert isinstance(result, list) From 7bce54299e6452040aee9b96f1454c06ebc1d15f Mon Sep 17 00:00:00 2001 From: Dawe000 Date: Sun, 8 Mar 2026 00:37:40 +0000 Subject: [PATCH 2/3] ignore venv --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 6e3d9eb..a2426ba 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ wheels/ *.egg # Virtual environments +.venv/ venv/ env/ ENV/ From 25fcfb6b66a8aba99fb77e485d33bfcbd06b396a Mon Sep 17 00:00:00 2001 From: Dawe000 Date: Sun, 8 Mar 2026 01:16:08 +0000 Subject: [PATCH 3/3] Updated telemetry for spec aligned feedback --- agent0_sdk/core/sdk.py | 173 ++++++++++++++++++++++++++++++++---- tests/test_telemetry_sdk.py | 7 +- 2 files changed, 161 insertions(+), 19 deletions(-) diff --git a/agent0_sdk/core/sdk.py b/agent0_sdk/core/sdk.py index 04f4d0e..1cd1620 100644 --- a/agent0_sdk/core/sdk.py +++ b/agent0_sdk/core/sdk.py @@ -631,14 +631,63 @@ def giveFeedback( - If feedbackFile is None: submit on-chain only (no upload even if IPFS is configured). - If feedbackFile is provided: requires IPFS configured; uploads and commits URI/hash on-chain. """ - return self.feedback_manager.giveFeedback( - agentId=agentId, - value=value, - tag1=tag1, - tag2=tag2, - endpoint=endpoint, - feedbackFile=feedbackFile, - ) + start_ms = int(time.time() * 1000) + value_num = int(value) if isinstance(value, (int, float)) else int(str(value), 10) + agent_id_str = str(agentId) + try: + result = self.feedback_manager.giveFeedback( + agentId=agentId, + value=value, + tag1=tag1, + tag2=tag2, + endpoint=endpoint, + feedbackFile=feedbackFile, + ) + payload = { + "chainId": self.chainId, + "agentId": agent_id_str, + "value": value_num, + "tag1": tag1, + "tag2": tag2, + "hasEndpoint": bool(endpoint), + "endpoint": endpoint, + "hasOffchainFile": feedbackFile is not None, + "hasText": bool(feedbackFile.get("text")) if feedbackFile else False, + "hasContext": bool(feedbackFile.get("context")) if feedbackFile else False, + "hasProofOfPayment": bool(feedbackFile.get("proofOfPayment")) if feedbackFile else False, + "mcpTool": feedbackFile.get("mcpTool") if feedbackFile else None, + "mcpPrompt": feedbackFile.get("mcpPrompt") if feedbackFile else None, + "mcpResource": feedbackFile.get("mcpResource") if feedbackFile else None, + "a2aSkills": feedbackFile.get("a2aSkills") if feedbackFile else None, + "a2aContextId": feedbackFile.get("a2aContextId") if feedbackFile else None, + "a2aTaskId": feedbackFile.get("a2aTaskId") if feedbackFile else None, + "oasfSkills": feedbackFile.get("oasfSkills") if feedbackFile else None, + "oasfDomains": feedbackFile.get("oasfDomains") if feedbackFile else None, + } + self._emit_telemetry_event({ + "eventType": "feedback.given", + "success": True, + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": payload, + }) + return result + except Exception as e: + self._emit_telemetry_event({ + "eventType": "feedback.given", + "success": False, + "errorType": categorize_error(e), + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": { + "chainId": self.chainId, + "agentId": agent_id_str, + "value": value_num, + "tag1": tag1, + "tag2": tag2, + }, + }) + raise def getFeedback( self, @@ -647,9 +696,39 @@ def getFeedback( feedbackIndex: int, ) -> "Feedback": """Get feedback (maps 8004 endpoint).""" - return self.feedback_manager.getFeedback( - agentId, clientAddress, feedbackIndex - ) + start_ms = int(time.time() * 1000) + try: + result = self.feedback_manager.getFeedback( + agentId, clientAddress, feedbackIndex + ) + self._emit_telemetry_event({ + "eventType": "feedback.fetched", + "success": True, + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": { + "chainId": self.chainId, + "agentId": str(agentId), + "feedbackIndex": feedbackIndex, + "found": True, + }, + }) + return result + except Exception as e: + self._emit_telemetry_event({ + "eventType": "feedback.fetched", + "success": False, + "errorType": categorize_error(e), + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": { + "chainId": self.chainId, + "agentId": str(agentId), + "feedbackIndex": feedbackIndex, + "found": False, + }, + }) + raise def searchFeedback( self, @@ -752,8 +831,37 @@ def revokeFeedback( feedbackIndex: int, ) -> "TransactionHandle[Feedback]": """Revoke feedback (submitted-by-default).""" - return self.feedback_manager.revokeFeedback(agentId, feedbackIndex) - + start_ms = int(time.time() * 1000) + agent_id_str = str(agentId) + try: + result = self.feedback_manager.revokeFeedback(agentId, feedbackIndex) + self._emit_telemetry_event({ + "eventType": "feedback.revoked", + "success": True, + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": { + "chainId": self.chainId, + "agentId": agent_id_str, + "feedbackIndex": feedbackIndex, + }, + }) + return result + except Exception as e: + self._emit_telemetry_event({ + "eventType": "feedback.revoked", + "success": False, + "errorType": categorize_error(e), + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": { + "chainId": self.chainId, + "agentId": agent_id_str, + "feedbackIndex": feedbackIndex, + }, + }) + raise + def appendResponse( self, agentId: "AgentId", @@ -762,9 +870,42 @@ def appendResponse( response: Dict[str, Any], ) -> "TransactionHandle[Feedback]": """Append a response/follow-up to existing feedback (submitted-by-default).""" - return self.feedback_manager.appendResponse( - agentId, clientAddress, feedbackIndex, response - ) + start_ms = int(time.time() * 1000) + agent_id_str = str(agentId) + response_uri = response.get("uri") if response else None + try: + result = self.feedback_manager.appendResponse( + agentId, clientAddress, feedbackIndex, response + ) + self._emit_telemetry_event({ + "eventType": "feedback.response.appended", + "success": True, + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": { + "chainId": self.chainId, + "agentId": agent_id_str, + "clientAddress": clientAddress, + "feedbackIndex": feedbackIndex, + "responseUri": response_uri, + }, + }) + return result + except Exception as e: + self._emit_telemetry_event({ + "eventType": "feedback.response.appended", + "success": False, + "errorType": categorize_error(e), + "durationMs": int(time.time() * 1000) - start_ms, + "timestamp": int(time.time() * 1000), + "payload": { + "chainId": self.chainId, + "agentId": agent_id_str, + "clientAddress": clientAddress, + "feedbackIndex": feedbackIndex, + }, + }) + raise def getReputationSummary( self, diff --git a/tests/test_telemetry_sdk.py b/tests/test_telemetry_sdk.py index 780994c..d2bb9cd 100644 --- a/tests/test_telemetry_sdk.py +++ b/tests/test_telemetry_sdk.py @@ -114,8 +114,8 @@ def test_load_agent_returns_and_emits_telemetry(self): emitted = True except Exception as e: msg = str(e) - if "Unsupported URI scheme" in msg or "Data URI" in msg: - return # Test agent may use data: URI; SDK only loads HTTP/IPFS + if "Unsupported URI scheme" in msg or "Data URI" in msg or "Invalid base64 payload in data URI" in msg: + return # Test agent may use data: URI (or malformed); skip DB assertion raise if emitted: assert_event_in_db("agent.loaded", since) @@ -144,7 +144,8 @@ def test_telemetry_events_written_to_database_spec_coverage(self): self.sdk.loadAgent(AGENT_ID) load_agent_emitted = True except Exception as e: - if "Unsupported URI scheme" not in str(e) and "Data URI" not in str(e): + msg = str(e) + if "Unsupported URI scheme" not in msg and "Data URI" not in msg and "Invalid base64 payload in data URI" not in msg: raise self.sdk.searchFeedback(agentId=AGENT_ID) self.sdk.getReputationSummary(AGENT_ID)