174 changes: 127 additions & 47 deletions pyth_observer/__init__.py
@@ -24,6 +24,7 @@
from pyth_observer.crosschain import CrosschainPrice
from pyth_observer.crosschain import CrosschainPriceObserver as Crosschain
from pyth_observer.dispatch import Dispatch
from pyth_observer.metrics import metrics
from pyth_observer.models import Publisher

PYTHTEST_HTTP_ENDPOINT = "https://api.pythtest.pyth.network/"
@@ -71,7 +72,14 @@ def __init__(
self.crosschain_throttler = Throttler(rate_limit=1, period=1)
self.coingecko_mapping = coingecko_mapping

metrics.set_observer_info(
network=config["network"]["name"],
config=config,
)

async def run(self):
# global states
states = []
while True:
try:
Comment on lines +81 to 84
Copilot AI Oct 2, 2025

The states list is initialized inside the while loop scope but declared outside it. This will cause the list to accumulate data across iterations, leading to memory growth and incorrect metrics. Move this initialization inside the while loop.

Suggested change
# global states
states = []
while True:
try:
while True:
try:
states = []

Contributor Author

no it won't. L107 will clear it
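For readers following this thread: whether the list grows depends on whether it is rebound inside the loop. Below is a minimal sketch of the pattern the author describes, assuming the re-initialization sits at the file's line 107, outside the lines shown in this hunk; the names are illustrative, not the actual Observer code.

import asyncio

async def run_loop():
    states = []  # bound once before the loop, as in the diff above
    while True:
        try:
            states = []  # rebinding each iteration discards the previous list
            states.extend(["feed-state", "publisher-state"])
            print(len(states))  # stays at 2 on every pass; no accumulation
        except Exception:
            pass
        await asyncio.sleep(5)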

logger.info("Running checks")
@@ -82,6 +90,9 @@ async def run(self):

health_server.observer_ready = True

processed_feeds = 0
active_publishers_by_symbol = {}

for product in products:
# Skip tombstone accounts with blank metadata
if "base" not in product.attrs:
@@ -121,80 +132,136 @@ async def run(self):
if not price_account.aggregate_price_info:
raise RuntimeError("Aggregate price info is missing")

states.append(
PriceFeedState(
symbol=product.attrs["symbol"],
asset_type=product.attrs["asset_type"],
schedule=MarketSchedule(product.attrs["schedule"]),
public_key=price_account.key,
status=price_account.aggregate_price_status,
# this is the solana block slot when price account was fetched
latest_block_slot=latest_block_slot,
latest_trading_slot=price_account.last_slot,
price_aggregate=price_account.aggregate_price_info.price,
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
coingecko_price=coingecko_prices.get(
product.attrs["base"]
),
coingecko_update=coingecko_updates.get(
product.attrs["base"]
),
crosschain_price=crosschain_price,
)
price_feed_state = PriceFeedState(
symbol=product.attrs["symbol"],
asset_type=product.attrs["asset_type"],
schedule=MarketSchedule(product.attrs["schedule"]),
public_key=price_account.key,
status=price_account.aggregate_price_status,
# this is the solana block slot when price account was fetched
latest_block_slot=latest_block_slot,
latest_trading_slot=price_account.last_slot,
price_aggregate=price_account.aggregate_price_info.price,
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
coingecko_price=coingecko_prices.get(product.attrs["base"]),
coingecko_update=coingecko_updates.get(
product.attrs["base"]
),
crosschain_price=crosschain_price,
)

states.append(price_feed_state)
processed_feeds += 1

metrics.update_price_feed_metrics(price_feed_state)

symbol = product.attrs["symbol"]
if symbol not in active_publishers_by_symbol:
active_publishers_by_symbol[symbol] = {
"count": 0,
"asset_type": product.attrs["asset_type"],
}

for component in price_account.price_components:
pub = self.publishers.get(component.publisher_key.key, None)
publisher_name = (
(pub.name if pub else "")
+ f" ({component.publisher_key.key})"
).strip()
states.append(
PublisherState(
publisher_name=publisher_name,
symbol=product.attrs["symbol"],
asset_type=product.attrs["asset_type"],
schedule=MarketSchedule(product.attrs["schedule"]),
public_key=component.publisher_key,
confidence_interval=component.latest_price_info.confidence_interval,
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
price=component.latest_price_info.price,
price_aggregate=price_account.aggregate_price_info.price,
slot=component.latest_price_info.pub_slot,
aggregate_slot=price_account.last_slot,
# this is the solana block slot when price account was fetched
latest_block_slot=latest_block_slot,
status=component.latest_price_info.price_status,
aggregate_status=price_account.aggregate_price_status,
)

publisher_state = PublisherState(
publisher_name=publisher_name,
symbol=product.attrs["symbol"],
asset_type=product.attrs["asset_type"],
schedule=MarketSchedule(product.attrs["schedule"]),
public_key=component.publisher_key,
confidence_interval=component.latest_price_info.confidence_interval,
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
price=component.latest_price_info.price,
price_aggregate=price_account.aggregate_price_info.price,
slot=component.latest_price_info.pub_slot,
aggregate_slot=price_account.last_slot,
# this is the solana block slot when price account was fetched
latest_block_slot=latest_block_slot,
status=component.latest_price_info.price_status,
aggregate_status=price_account.aggregate_price_status,
)

await self.dispatch.run(states)
states.append(publisher_state)
active_publishers_by_symbol[symbol]["count"] += 1

metrics.price_feeds_processed.set(processed_feeds)

for symbol, info in active_publishers_by_symbol.items():
metrics.publishers_active.labels(
symbol=symbol, asset_type=info["asset_type"]
).set(info["count"])

await self.dispatch.run(states)

except Exception as e:
logger.error(f"Error in run loop: {e}")
health_server.observer_ready = False

logger.debug("Sleeping...")
metrics.loop_errors_total.labels(error_type=type(e).__name__).inc()
await asyncio.sleep(5)

async def get_pyth_products(self) -> List[PythProductAccount]:
logger.debug("Fetching Pyth product accounts...")

async with self.pyth_throttler:
return await self.pyth_client.refresh_products()
try:
async with self.pyth_throttler:
with metrics.time_operation(
metrics.api_request_duration, service="pyth", endpoint="products"
):
result = await self.pyth_client.refresh_products()
metrics.api_request_total.labels(
service="pyth", endpoint="products", status="success"
).inc()
return result
except Exception:
metrics.api_request_total.labels(
service="pyth", endpoint="products", status="error"
).inc()
raise

async def get_pyth_prices(
self, product: PythProductAccount
) -> Dict[PythPriceType, PythPriceAccount]:
logger.debug("Fetching Pyth price accounts...")

async with self.pyth_throttler:
return await product.refresh_prices()
try:
async with self.pyth_throttler:
with metrics.time_operation(
metrics.api_request_duration, service="pyth", endpoint="prices"
):
result = await product.refresh_prices()
metrics.api_request_total.labels(
service="pyth", endpoint="prices", status="success"
).inc()
return result
except Exception:
metrics.api_request_total.labels(
service="pyth", endpoint="prices", status="error"
).inc()
raise

async def get_coingecko_prices(self):
logger.debug("Fetching CoinGecko prices...")

data = await get_coingecko_prices(self.coingecko_mapping)
try:
with metrics.time_operation(
metrics.api_request_duration, service="coingecko", endpoint="prices"
):
data = await get_coingecko_prices(self.coingecko_mapping)
metrics.api_request_total.labels(
service="coingecko", endpoint="prices", status="success"
).inc()
except Exception:
metrics.api_request_total.labels(
service="coingecko", endpoint="prices", status="error"
).inc()
raise

prices: Dict[str, float] = {}
updates: Dict[str, int] = {} # Unix timestamps

Expand All @@ -205,4 +272,17 @@ async def get_coingecko_prices(self):
return (prices, updates)

async def get_crosschain_prices(self) -> Dict[str, CrosschainPrice]:
return await self.crosschain.get_crosschain_prices()
try:
with metrics.time_operation(
metrics.api_request_duration, service="crosschain", endpoint="prices"
):
result = await self.crosschain.get_crosschain_prices()
metrics.api_request_total.labels(
service="crosschain", endpoint="prices", status="success"
).inc()
return result
except Exception:
metrics.api_request_total.labels(
service="crosschain", endpoint="prices", status="error"
).inc()
raise
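The hunks above lean on a shared pyth_observer.metrics module that this view does not show. A minimal sketch of what those helpers could look like, assuming prometheus_client; the metric names, label sets, and the time_operation helper are inferred from the call sites above, not taken from the real module.

from contextlib import contextmanager

from prometheus_client import Counter, Gauge, Histogram, Info


class Metrics:
    def __init__(self):
        self.observer_info = Info("observer", "Observer network and configuration info")
        # Counter labelled by service/endpoint/status, bumped once per API call
        self.api_request_total = Counter(
            "api_request_total",
            "Total outbound API requests",
            ["service", "endpoint", "status"],
        )
        # Histogram fed by the time_operation context manager below
        self.api_request_duration = Histogram(
            "api_request_duration_seconds",
            "Outbound API request duration",
            ["service", "endpoint"],
        )
        self.price_feeds_processed = Gauge(
            "price_feeds_processed", "Price feeds processed in the last loop"
        )
        self.publishers_active = Gauge(
            "publishers_active",
            "Active publishers per symbol",
            ["symbol", "asset_type"],
        )
        self.loop_errors_total = Counter(
            "loop_errors_total", "Run-loop errors by exception type", ["error_type"]
        )

    def set_observer_info(self, network, config):
        # config is accepted to mirror the call site; Info values must be strings
        self.observer_info.info({"network": network})

    @contextmanager
    def time_operation(self, histogram, **labels):
        # Time the wrapped block and record it on the labelled histogram
        with histogram.labels(**labels).time():
            yield


metrics = Metrics()

Keeping a single module-level instance matters here: prometheus_client registers each metric name in its default registry on construction, so instantiating Metrics twice would raise a duplicate-timeseries error.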
78 changes: 50 additions & 28 deletions pyth_observer/dispatch.py
@@ -6,7 +6,6 @@
from typing import Any, Awaitable, Dict, List

from loguru import logger
from prometheus_client import Gauge

from pyth_observer.check import Check, State
from pyth_observer.check.price_feed import PRICE_FEED_CHECKS, PriceFeedState
@@ -15,6 +14,7 @@
from pyth_observer.event import LogEvent # Used dynamically
from pyth_observer.event import TelegramEvent # Used dynamically
from pyth_observer.event import Context, Event, ZendutyEvent
from pyth_observer.metrics import metrics
from pyth_observer.zenduty import send_zenduty_alert

assert DatadogEvent
@@ -32,16 +32,6 @@ class Dispatch:
def __init__(self, config, publishers):
self.config = config
self.publishers = publishers
self.price_feed_check_gauge = Gauge(
"price_feed_check_failed",
"Price feed check failure status",
["check", "symbol"],
)
self.publisher_check_gauge = Gauge(
"publisher_check_failed",
"Publisher check failure status",
["check", "symbol", "publisher"],
)
if "ZendutyEvent" in self.config["events"]:
self.open_alerts_file = os.environ["OPEN_ALERTS_FILE"]
self.open_alerts = self.load_alerts()
@@ -98,48 +88,70 @@ async def run(self, states: List[State]):
sent_events.append(event.send())

await asyncio.gather(*sent_events)

metrics.update_alert_metrics(self.open_alerts)

if "ZendutyEvent" in self.config["events"]:
await self.process_zenduty_events(current_time)

def check_price_feed(self, state: PriceFeedState) -> List[Check]:
failed_checks: List[Check] = []
total_checks = 0
passed_checks = 0

for check_class in PRICE_FEED_CHECKS:
config = self.load_config(check_class.__name__, state.symbol)
check = check_class(state, config)
gauge = self.price_feed_check_gauge.labels(
check=check_class.__name__,
symbol=state.symbol,
)

if config["enable"]:
if check.run():
gauge.set(0)
total_checks += 1
check = check_class(state, config)

with metrics.time_operation(
metrics.check_execution_duration, check_type=check_class.__name__
):
check_passed = check.run()

if check_passed:
passed_checks += 1
else:
failed_checks.append(check)
gauge.set(1)

if total_checks > 0:
success_rate = passed_checks / total_checks
metrics.check_success_rate.labels(
check_type="price_feed", symbol=state.symbol
).set(success_rate)

return failed_checks

def check_publisher(self, state: PublisherState) -> List[Check]:
failed_checks: List[Check] = []
total_checks = 0
passed_checks = 0

for check_class in PUBLISHER_CHECKS:
config = self.load_config(check_class.__name__, state.symbol)
check = check_class(state, config)
gauge = self.publisher_check_gauge.labels(
check=check_class.__name__,
symbol=state.symbol,
publisher=self.publishers.get(state.public_key, state.public_key),
)

if config["enable"]:
if check.run():
gauge.set(0)
total_checks += 1
check = check_class(state, config)

with metrics.time_operation(
metrics.check_execution_duration, check_type=check_class.__name__
):
check_passed = check.run()

if check_passed:
passed_checks += 1
else:
gauge.set(1)
failed_checks.append(check)

if total_checks > 0:
success_rate = passed_checks / total_checks
metrics.check_success_rate.labels(
check_type="publisher", symbol=state.symbol
).set(success_rate)

return failed_checks

def load_config(self, check_name: str, symbol: str) -> Dict[str, Any]:
@@ -187,12 +199,16 @@ async def process_zenduty_events(self, current_time):
):
logger.debug(f"Resolving Zenduty alert {identifier}")
resolved = True

if info["sent"]:
response = await send_zenduty_alert(
identifier, identifier, resolved=True
)
if response and 200 <= response.status < 300:
to_remove.append(identifier)
metrics.alerts_sent_total.labels(
alert_type=info["type"], channel="zenduty"
).inc()
else:
to_remove.append(identifier)
# Raise alert if failed > $threshold times within the last 5m window
Expand All @@ -216,6 +232,10 @@ async def process_zenduty_events(self, current_time):
event = self.delayed_events.get(key)
if event:
to_alert.append(event.send())
metrics.alerts_sent_total.labels(
alert_type=info["type"],
channel=event_type.lower().replace("event", ""),
).inc()
Comment on lines +235 to +238
Copilot AI Oct 2, 2025

The variable event_type is not defined in this scope. This will cause a NameError when this code path is executed.

Contributor Author

umm i think it is tho
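
For context on the exchange above: the label computation only raises if nothing in an enclosing scope binds event_type, and this hunk does not show the surrounding lines. A plausible shape that would bind it, offered as a guess at the structure rather than the actual code:

delayed_events = {}  # stand-in for self.delayed_events
identifier = "ExampleCheck-Crypto.BTC/USD"  # stand-in alert identifier

for event_type in ("DatadogEvent", "TelegramEvent", "ZendutyEvent"):
    key = f"{event_type}-{identifier}"
    event = delayed_events.get(key)
    if event:
        # event_type is bound by this loop, so the
        # .lower().replace("event", "") call in the diff would not raise
        channel = event_type.lower().replace("event", "")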


# Send the alerts that were delayed due to thresholds
await asyncio.gather(*to_alert)
Expand All @@ -229,5 +249,7 @@ async def process_zenduty_events(self, current_time):
if self.delayed_events.get(key):
del self.delayed_events[key]

metrics.update_alert_metrics(self.open_alerts)

with open(self.open_alerts_file, "w") as file:
json.dump(self.open_alerts, file)
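
alerts_sent_total and update_alert_metrics come from the same metrics module. A sketch of what they might look like; the gauge name and the shape of the open_alerts entries are inferred from how this file uses them.

from prometheus_client import Counter, Gauge

alerts_sent_total = Counter(
    "alerts_sent_total",
    "Alerts sent to a notification channel",
    ["alert_type", "channel"],
)
open_alerts_gauge = Gauge("open_alerts", "Currently open alerts", ["alert_type"])


def update_alert_metrics(open_alerts):
    # Recompute per-type counts from the open-alerts dict on every call;
    # entries are assumed to carry a "type" field, as the labels above suggest
    counts = {}
    for info in open_alerts.values():
        counts[info["type"]] = counts.get(info["type"], 0) + 1
    for alert_type, count in counts.items():
        open_alerts_gauge.labels(alert_type=alert_type).set(count)

A real implementation would also need to clear label children whose count drops to zero, since Prometheus gauges keep stale label values until they are explicitly removed.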