# Reconstructed from a mangled unified diff of pyth_observer/__init__.py.
# These are the external-data fetchers of the Observer class.  The patch
# duplicated the same try / histogram / success-error-counter instrumentation
# four times; it is factored into one private helper so every fetcher records
# API metrics identically.  Public method signatures are unchanged.

async def _instrumented_call(self, service: str, endpoint: str, coro_factory):
    """Await ``coro_factory()`` while recording API request metrics.

    Observes the call duration into ``metrics.api_request_duration`` under
    (service, endpoint) and increments ``metrics.api_request_total`` with
    status="success" or status="error".  Exceptions are re-raised unchanged
    so callers keep their existing error handling.
    """
    try:
        with metrics.time_operation(
            metrics.api_request_duration, service=service, endpoint=endpoint
        ):
            result = await coro_factory()
        metrics.api_request_total.labels(
            service=service, endpoint=endpoint, status="success"
        ).inc()
        return result
    except Exception:
        metrics.api_request_total.labels(
            service=service, endpoint=endpoint, status="error"
        ).inc()
        raise

async def get_pyth_products(self) -> List[PythProductAccount]:
    """Fetch all Pyth product accounts (rate-limited by self.pyth_throttler).

    NOTE(review): throttler acquisition is outside the instrumented section,
    so only upstream failures (not throttling errors) count as API errors.
    """
    logger.debug("Fetching Pyth product accounts...")
    async with self.pyth_throttler:
        return await self._instrumented_call(
            "pyth", "products", self.pyth_client.refresh_products
        )

async def get_pyth_prices(
    self, product: PythProductAccount
) -> Dict[PythPriceType, PythPriceAccount]:
    """Fetch the price accounts of one product (rate-limited)."""
    logger.debug("Fetching Pyth price accounts...")
    async with self.pyth_throttler:
        return await self._instrumented_call(
            "pyth", "prices", product.refresh_prices
        )

async def get_crosschain_prices(self) -> Dict[str, CrosschainPrice]:
    """Fetch cross-chain prices via the crosschain observer, with metrics."""
    return await self._instrumented_call(
        "crosschain", "prices", self.crosschain.get_crosschain_prices
    )
# Reconstructed from a mangled unified diff of pyth_observer/dispatch.py.
# check_price_feed / check_publisher run every enabled check for one state and
# return the failed ones.  The patch contained two near-identical copies of
# the loop + success-rate bookkeeping; both now delegate to one shared helper.
# Public method signatures are unchanged.

def _run_enabled_checks(self, state, check_classes, check_kind: str) -> List[Check]:
    """Run every enabled check in ``check_classes`` against ``state``.

    Each check execution is timed via ``metrics.check_execution_duration``
    (labelled by the check class name).  The pass ratio is published to
    ``metrics.check_success_rate`` under (check_kind, state.symbol) — only
    when at least one check was enabled, matching the patch's behaviour.

    Returns the list of Check instances whose ``run()`` returned falsy.
    """
    failed_checks: List[Check] = []
    total_checks = 0
    passed_checks = 0

    for check_class in check_classes:
        config = self.load_config(check_class.__name__, state.symbol)
        if not config["enable"]:
            continue

        total_checks += 1
        check = check_class(state, config)

        with metrics.time_operation(
            metrics.check_execution_duration, check_type=check_class.__name__
        ):
            check_passed = check.run()

        if check_passed:
            passed_checks += 1
        else:
            failed_checks.append(check)

    if total_checks > 0:
        metrics.check_success_rate.labels(
            check_type=check_kind, symbol=state.symbol
        ).set(passed_checks / total_checks)

    return failed_checks

def check_price_feed(self, state: PriceFeedState) -> List[Check]:
    """Run all enabled price-feed checks; return the ones that failed."""
    return self._run_enabled_checks(state, PRICE_FEED_CHECKS, "price_feed")

def check_publisher(self, state: PublisherState) -> List[Check]:
    """Run all enabled publisher checks; return the ones that failed."""
    return self._run_enabled_checks(state, PUBLISHER_CHECKS, "publisher")
"""Prometheus metrics for the Pyth Observer (pyth_observer/metrics.py).

Reconstructed from a whitespace-mangled unified diff; every metric name,
label set and public method signature from the patch is preserved.  Internal
fixes applied:

* ``time_operation`` measures with ``time.perf_counter()`` (monotonic)
  instead of ``time.time()``, so a wall-clock adjustment cannot produce a
  negative or wildly wrong duration observation.
* ``update_alert_metrics`` remembers every alert type it has exported and
  zeroes ``alerts_active`` for types whose alerts have all been resolved;
  previously such a gauge was stuck at its last non-zero value forever.
* The cross-chain age computation guards ``snapshot_time`` as well as
  ``publish_time`` (the original would raise TypeError on a missing
  snapshot_time).
"""

import time
from contextlib import contextmanager
from typing import Any, Dict, Optional

from prometheus_client import (
    REGISTRY,
    CollectorRegistry,
    Counter,
    Gauge,
    Histogram,
    Info,
)


class PythObserverMetrics:
    """All Prometheus metrics exported by the observer, in one place."""

    def __init__(self, registry: CollectorRegistry = REGISTRY):
        self.registry = registry
        # Alert types ever reported via update_alert_metrics; lets us zero
        # gauges for types that currently have no active alerts.
        self._seen_alert_types: set = set()

        self.observer_info = Info(
            "pyth_observer_info",
            "Information about the Pyth Observer instance",
            registry=registry,
        )

        self.check_execution_duration = Histogram(
            "pyth_observer_check_execution_duration_seconds",
            "Time spent executing checks",
            ["check_type"],
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0],
            registry=registry,
        )

        self.loop_errors_total = Counter(
            "pyth_observer_loop_errors_total",
            "Total number of errors in observation loop",
            ["error_type"],
            registry=registry,
        )

        self.price_feeds_processed = Gauge(
            "pyth_observer_price_feeds_processed_total",
            "Total number of price feeds processed in last cycle",
            registry=registry,
        )

        self.price_feed_status = Gauge(
            "pyth_observer_price_feed_status",
            "Status of price feeds (1=trading, 0=not trading)",
            ["symbol", "asset_type"],
            registry=registry,
        )

        self.price_feed_staleness = Gauge(
            "pyth_observer_price_feed_staleness_slots",
            "Number of slots since last price update",
            ["symbol", "asset_type"],
            registry=registry,
        )

        self.price_feed_confidence_interval = Gauge(
            "pyth_observer_price_feed_confidence_interval",
            "Price feed confidence interval",
            ["symbol", "asset_type"],
            registry=registry,
        )

        self.check_success_rate = Gauge(
            "pyth_observer_check_success_rate",
            "Success rate of checks (0-1)",
            ["check_type", "symbol"],
            registry=registry,
        )

        self.price_deviation_from_coingecko = Gauge(
            "pyth_observer_price_deviation_from_coingecko_percent",
            "Price deviation from CoinGecko as percentage",
            ["symbol"],
            registry=registry,
        )

        self.coingecko_price_age = Gauge(
            "pyth_observer_coingecko_price_age_seconds",
            "Age of CoinGecko price data in seconds",
            ["symbol"],
            registry=registry,
        )

        self.publishers_active = Gauge(
            "pyth_observer_publishers_active_total",
            "Number of active publishers for a symbol",
            ["symbol", "asset_type"],
            registry=registry,
        )

        self.alerts_active = Gauge(
            "pyth_observer_alerts_active_total",
            "Number of currently active alerts",
            ["alert_type"],
            registry=registry,
        )

        self.alerts_sent_total = Counter(
            "pyth_observer_alerts_sent_total",
            "Total number of alerts sent",
            ["alert_type", "channel"],
            registry=registry,
        )

        self.api_request_duration = Histogram(
            "pyth_observer_api_request_duration_seconds",
            "Duration of external API requests",
            ["service", "endpoint"],
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
            registry=registry,
        )

        self.api_request_total = Counter(
            "pyth_observer_api_requests_total",
            "Total number of API requests",
            ["service", "endpoint", "status"],
            registry=registry,
        )

        self.api_rate_limit_hits = Counter(
            "pyth_observer_api_rate_limit_hits_total",
            "Number of times rate limits were hit",
            ["service"],
            registry=registry,
        )

        self.crosschain_price_age = Gauge(
            "pyth_observer_crosschain_price_age_seconds",
            "Age of cross-chain price data in seconds",
            ["symbol"],
            registry=registry,
        )

        self.latest_block_slot = Gauge(
            "pyth_observer_latest_block_slot",
            "Latest Solana block slot observed",
            registry=registry,
        )

        self.network_connection_status = Gauge(
            "pyth_observer_network_connection_status",
            "Network connection status (1=connected, 0=disconnected)",
            ["network", "endpoint_type"],
            registry=registry,
        )

    def set_observer_info(self, network: str, config: Dict[str, Any]):
        """Set static information about the observer instance."""
        global_checks = config.get("checks", {}).get("global", {})
        enabled = sum(
            1 for cfg in global_checks.values() if cfg.get("enable", False)
        )
        self.observer_info.info(
            {
                "network": network,
                "checks_enabled": str(enabled),
                "event_handlers": ",".join(config.get("events", [])),
            }
        )

    @contextmanager
    def time_operation(self, metric: Histogram, **labels):
        """Observe the elapsed seconds of the managed block into ``metric``.

        Uses a monotonic clock so wall-clock jumps cannot skew durations.
        The observation is recorded even when the body raises; the exception
        still propagates.
        """
        start = time.perf_counter()
        try:
            yield
        finally:
            metric.labels(**labels).observe(time.perf_counter() - start)

    def update_price_feed_metrics(self, state):
        """Export per-feed gauges from one PriceFeedState."""
        labels = {"symbol": state.symbol, "asset_type": state.asset_type}

        # Imported lazily so this module stays importable without pythclient
        # (e.g. in tests that only touch counters).
        from pythclient.pythaccounts import PythPriceStatus

        status_value = 1 if state.status == PythPriceStatus.TRADING else 0
        self.price_feed_status.labels(**labels).set(status_value)

        # Slots elapsed since the feed last traded, relative to the block
        # slot at which the price account was fetched.
        staleness = state.latest_block_slot - state.latest_trading_slot
        self.price_feed_staleness.labels(**labels).set(staleness)

        self.price_feed_confidence_interval.labels(**labels).set(
            state.confidence_interval_aggregate
        )

        # A missing (None) or zero CoinGecko price is skipped — this also
        # guards the division below.
        if state.coingecko_price:
            deviation = (
                abs(state.price_aggregate - state.coingecko_price)
                / state.coingecko_price
                * 100
            )
            self.price_deviation_from_coingecko.labels(symbol=state.symbol).set(
                deviation
            )

        if state.coingecko_update:
            # coingecko_update is compared against wall-clock time, so
            # time.time() (not perf_counter) is correct here — it is
            # presumably a Unix timestamp; confirm against the fetcher.
            self.coingecko_price_age.labels(symbol=state.symbol).set(
                time.time() - state.coingecko_update
            )

        crosschain = state.crosschain_price
        # Guard both timestamps: checking publish_time alone would still
        # raise TypeError if snapshot_time were absent.
        if (
            crosschain
            and crosschain.get("publish_time")
            and crosschain.get("snapshot_time") is not None
        ):
            self.crosschain_price_age.labels(symbol=state.symbol).set(
                crosschain["snapshot_time"] - crosschain["publish_time"]
            )

        self.latest_block_slot.set(state.latest_block_slot)

    def record_api_request(
        self,
        service: str,
        endpoint: str,
        duration: float,
        status_code: int,
        rate_limited: bool = False,
    ):
        """Record one external API request from an already-measured duration."""
        status = "success" if 200 <= status_code < 300 else "error"

        self.api_request_duration.labels(service=service, endpoint=endpoint).observe(
            duration
        )
        self.api_request_total.labels(
            service=service, endpoint=endpoint, status=status
        ).inc()

        if rate_limited:
            self.api_rate_limit_hits.labels(service=service).inc()

    def update_alert_metrics(
        self, active_alerts: Dict[str, Any], sent_alert: Optional[str] = None
    ):
        """Refresh ``alerts_active`` per type; optionally count one sent alert."""
        alert_counts: Dict[str, int] = {}
        for alert_info in active_alerts.values():
            alert_type = alert_info.get("type", "unknown")
            alert_counts[alert_type] = alert_counts.get(alert_type, 0) + 1

        # Types with no active alerts are explicitly zeroed so resolved alert
        # types do not keep reporting a stale non-zero count.
        self._seen_alert_types.update(alert_counts)
        for alert_type in self._seen_alert_types:
            self.alerts_active.labels(alert_type=alert_type).set(
                alert_counts.get(alert_type, 0)
            )

        if sent_alert:
            # Alert identifiers start with "<type>-", per the split below.
            self.alerts_sent_total.labels(
                alert_type=sent_alert.split("-")[0], channel="configured"
            ).inc()

    def set_network_status(self, network: str, endpoint_type: str, connected: bool):
        """Set the connection gauge for one (network, endpoint_type) pair."""
        self.network_connection_status.labels(
            network=network, endpoint_type=endpoint_type
        ).set(1 if connected else 0)


# Module-level singleton shared by the observer and dispatcher.
metrics = PythObserverMetrics()