From ee6a9fe99932a7c52d2682fb8d90ebbc05ec2931 Mon Sep 17 00:00:00 2001 From: GitHub Actions Date: Mon, 8 Dec 2025 20:51:12 +0000 Subject: [PATCH] Release 2025-12-08-20-51 --- CHANGELOG.md | 6 + docker-compose.prd.yaml | 2 +- docker-compose.validator.yaml | 2 +- neurons/validator/db/operations.py | 30 ++ neurons/validator/main.py | 11 + neurons/validator/models/chutes.py | 12 + neurons/validator/models/numinous_client.py | 12 + .../validator/models/tests/test_weights.py | 42 +++ neurons/validator/models/weights.py | 15 + neurons/validator/numinous_client/client.py | 14 + .../tests/test_numinous_client.py | 79 +++++ .../validator/scheduler/tasks_scheduler.py | 5 +- .../scheduler/tests/test_tasks_scheduler.py | 52 ++- neurons/validator/tasks/db_cleaner.py | 20 ++ .../validator/tasks/export_agent_run_logs.py | 98 ++++++ neurons/validator/tasks/run_agents.py | 41 +-- neurons/validator/tasks/set_weights.py | 110 ++++-- .../validator/tasks/tests/test_db_cleaner.py | 14 +- .../tasks/tests/test_export_agent_run_logs.py | 322 ++++++++++++++++++ .../validator/tasks/tests/test_run_agents.py | 122 ++++--- .../validator/tasks/tests/test_set_weights.py | 197 +++++++---- neurons/validator/tests/test_main.py | 4 +- neurons/validator/version.py | 2 +- 23 files changed, 1024 insertions(+), 188 deletions(-) create mode 100644 neurons/validator/models/tests/test_weights.py create mode 100644 neurons/validator/models/weights.py create mode 100644 neurons/validator/tasks/export_agent_run_logs.py create mode 100644 neurons/validator/tasks/tests/test_export_agent_run_logs.py diff --git a/CHANGELOG.md b/CHANGELOG.md index cedf1f4..6255531 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Release Notes +## [2.0.4] - 2025-12-08 +- **Weights**: SetWeights task now fetches latest weights from backend API +- **Database**: Migration to backfill agent runs from validator predictions +- **Tasks**: Run agents task stores execution logs; db cleaner removes exported logs +- **Scheduler**: Added timeout handling for scheduled tasks + ## [2.0.3] - 2025-12-03 - **Scoring**: Fill missing predictions with 0.5 for miners without predictions - **Architecture**: Sandbox retry mechanism and error handling with comprehensive log exports diff --git a/docker-compose.prd.yaml b/docker-compose.prd.yaml index c5d0b20..b5848a2 100644 --- a/docker-compose.prd.yaml +++ b/docker-compose.prd.yaml @@ -38,7 +38,7 @@ services: --wallet.hotkey ifhkey --db.directory /root/infinite_games/database --numinous.env prod - --sandbox.max_concurrent 25 + --sandbox.max_concurrent 50 --sandbox.timeout_seconds 150 --validator.sync_hour 0 --logging.debug" diff --git a/docker-compose.validator.yaml b/docker-compose.validator.yaml index 4bb3f52..f460625 100644 --- a/docker-compose.validator.yaml +++ b/docker-compose.validator.yaml @@ -37,7 +37,7 @@ services: --wallet.hotkey ${WALLET_HOTKEY} --db.directory /root/infinite_games/database --numinous.env prod - --sandbox.max_concurrent 25 + --sandbox.max_concurrent 50 --sandbox.timeout_seconds 150 --logging.debug" diff --git a/neurons/validator/db/operations.py b/neurons/validator/db/operations.py index 9218ec8..dbd18da 100644 --- a/neurons/validator/db/operations.py +++ b/neurons/validator/db/operations.py @@ -1276,6 +1276,36 @@ async def delete_agent_run_logs(self, batch_size: int) -> Iterable[tuple[int]]: [AgentRunLogExportedStatus.EXPORTED, batch_size], ) + async def delete_agent_runs(self, batch_size: int) -> Iterable[tuple[int]]: + return await self.__db_client.delete( + """ + WITH runs_to_delete AS 
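+                -- exported agent runs older than 7 days, oldest ROWIDs first, capped at batch_size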
( + SELECT + ROWID + FROM + agent_runs + WHERE + exported = ? + AND datetime(created_at) < datetime(CURRENT_TIMESTAMP, '-7 day') + ORDER BY + ROWID ASC + LIMIT ? + ) + DELETE FROM + agent_runs + WHERE + ROWID IN ( + SELECT + ROWID + FROM + runs_to_delete + ) + RETURNING + ROWID + """, + [AgentRunExportedStatus.EXPORTED, batch_size], + ) + async def count_runs_for_event_and_agent( self, unique_event_id: str, diff --git a/neurons/validator/main.py b/neurons/validator/main.py index 74596bd..ccf1cdf 100644 --- a/neurons/validator/main.py +++ b/neurons/validator/main.py @@ -14,6 +14,7 @@ from neurons.validator.tasks.db_cleaner import DbCleaner from neurons.validator.tasks.db_vacuum import DbVacuum from neurons.validator.tasks.delete_events import DeleteEvents +from neurons.validator.tasks.export_agent_run_logs import ExportAgentRunLogs from neurons.validator.tasks.export_agent_runs import ExportAgentRuns from neurons.validator.tasks.export_predictions import ExportPredictions from neurons.validator.tasks.export_scores import ExportScores @@ -179,6 +180,14 @@ async def main(): validator_hotkey=validator_hotkey, ) + export_agent_run_logs_task = ExportAgentRunLogs( + interval_seconds=600.0, + batch_size=500, + db_operations=db_operations, + api_client=numinous_api_client, + logger=logger, + ) + set_weights_task = SetWeights( interval_seconds=379.0, db_operations=db_operations, @@ -187,6 +196,7 @@ async def main(): netuid=bt_netuid, subtensor=bt_subtensor, wallet=bt_wallet, + api_client=numinous_api_client, ) db_cleaner_task = DbCleaner( @@ -211,6 +221,7 @@ async def main(): scheduler.add(task=run_agents_task) scheduler.add(task=export_predictions_task) scheduler.add(task=export_agent_runs_task) + scheduler.add(task=export_agent_run_logs_task) scheduler.add(task=scoring_task) scheduler.add(task=metagraph_scoring_task) scheduler.add(task=export_scores_task) diff --git a/neurons/validator/models/chutes.py b/neurons/validator/models/chutes.py index 2e43c38..af45f62 100644 --- a/neurons/validator/models/chutes.py +++ b/neurons/validator/models/chutes.py @@ -15,6 +15,8 @@ class ChuteModel(StrEnum): DEEPSEEK_V3_1 = "deepseek-ai/DeepSeek-V3.1" DEEPSEEK_TNG_R1T2_CHIMERA = "tngtech/DeepSeek-TNG-R1T2-Chimera" DEEPSEEK_V3_2_EXP = "deepseek-ai/DeepSeek-V3.2-Exp" + DEEPSEEK_V3_2 = "deepseek-ai/DeepSeek-V3.2" + DEEPSEEK_V3_2_SPECIALE = "deepseek-ai/DeepSeek-V3.2-Speciale" # Gemma models GEMMA_3_4B_IT = "unsloth/gemma-3-4b-it" @@ -171,6 +173,16 @@ def calculate_cost(self, completion: ChutesCompletion) -> float: input_cost=0.25, output_cost=0.35, ), + ChuteModel.DEEPSEEK_V3_2: Chute( + name=ChuteModel.DEEPSEEK_V3_2, + input_cost=0.27, + output_cost=0.41, + ), + ChuteModel.DEEPSEEK_V3_2_SPECIALE: Chute( + name=ChuteModel.DEEPSEEK_V3_2_SPECIALE, + input_cost=0.27, + output_cost=0.41, + ), ChuteModel.GLM_4_6: Chute( name=ChuteModel.GLM_4_6, input_cost=0.4, diff --git a/neurons/validator/models/numinous_client.py b/neurons/validator/models/numinous_client.py index 45c32ae..28aa808 100644 --- a/neurons/validator/models/numinous_client.py +++ b/neurons/validator/models/numinous_client.py @@ -239,3 +239,15 @@ class GatewayDesearchWebSearchResponse(WebSearchResponse, GatewayCallResponse): class GatewayDesearchWebCrawlResponse(WebCrawlResponse, GatewayCallResponse): pass + + +class MinerWeight(BaseModel): + miner_uid: int + miner_hotkey: str + aggregated_weight: float + + +class GetWeightsResponse(BaseModel): + aggregated_at: datetime + weights: typing.List[MinerWeight] + count: int diff --git 
a/neurons/validator/models/tests/test_weights.py b/neurons/validator/models/tests/test_weights.py new file mode 100644 index 0000000..715cd9b --- /dev/null +++ b/neurons/validator/models/tests/test_weights.py @@ -0,0 +1,42 @@ +from datetime import datetime, timezone + +import pytest + +from neurons.validator.models.weights import WeightsModel + + +class TestWeightsModel: + def test_weights_model_creation(self): + weights = WeightsModel( + miner_uid=1, + miner_hotkey="5C4hrfjw9nL7fKRAn3RRKqUWewVE5bGVU7VKF5Ut3N7TkPJJ", + metagraph_score=0.835, + aggregated_at=datetime(2025, 1, 30, 12, 0, 0, tzinfo=timezone.utc), + ) + + assert weights.miner_uid == 1 + assert weights.miner_hotkey == "5C4hrfjw9nL7fKRAn3RRKqUWewVE5bGVU7VKF5Ut3N7TkPJJ" + assert weights.metagraph_score == 0.835 + assert weights.aggregated_at == datetime(2025, 1, 30, 12, 0, 0, tzinfo=timezone.utc) + + def test_weights_model_without_aggregated_at(self): + weights = WeightsModel( + miner_uid=2, + miner_hotkey="5Dpqn...", + metagraph_score=0.165, + ) + + assert weights.aggregated_at is None + + def test_weights_model_primary_key(self): + weights = WeightsModel( + miner_uid=3, + miner_hotkey="test_hotkey", + metagraph_score=0.5, + ) + + assert weights.primary_key == ["miner_uid", "miner_hotkey"] + + def test_weights_model_validation(self): + with pytest.raises(ValueError): + WeightsModel(miner_uid=1, miner_hotkey="test") diff --git a/neurons/validator/models/weights.py b/neurons/validator/models/weights.py new file mode 100644 index 0000000..fa9be65 --- /dev/null +++ b/neurons/validator/models/weights.py @@ -0,0 +1,15 @@ +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel + + +class WeightsModel(BaseModel): + miner_uid: int + miner_hotkey: str + metagraph_score: float + aggregated_at: Optional[datetime] = None + + @property + def primary_key(self): + return ["miner_uid", "miner_hotkey"] diff --git a/neurons/validator/numinous_client/client.py b/neurons/validator/numinous_client/client.py index eca9b39..ca20f0f 100644 --- a/neurons/validator/numinous_client/client.py +++ b/neurons/validator/numinous_client/client.py @@ -16,6 +16,7 @@ GetEventsDeletedResponse, GetEventsResolvedResponse, GetEventsResponse, + GetWeightsResponse, PostAgentLogsRequestBody, PostAgentRunsRequestBody, PostPredictionsRequestBody, @@ -309,3 +310,16 @@ async def desearch_ai_search(self, body: dict | DesearchAISearchRequest): data = await response.json() return AISearchResponse.model_validate(data) + + async def get_weights(self): + auth_headers = self.make_auth_headers(data="") + + async with self.create_session(other_headers=auth_headers) as session: + path = "/api/v1/validators/weights" + + async with session.get(path) as response: + response.raise_for_status() + + data = await response.json() + + return GetWeightsResponse.model_validate(data) diff --git a/neurons/validator/numinous_client/tests/test_numinous_client.py b/neurons/validator/numinous_client/tests/test_numinous_client.py index 858cc83..386ac3e 100644 --- a/neurons/validator/numinous_client/tests/test_numinous_client.py +++ b/neurons/validator/numinous_client/tests/test_numinous_client.py @@ -21,6 +21,7 @@ GetEventsDeletedResponse, GetEventsResolvedResponse, GetEventsResponse, + GetWeightsResponse, PostAgentLogsRequestBody, PostAgentRunsRequestBody, PostPredictionsRequestBody, @@ -1085,3 +1086,81 @@ async def test_desearch_ai_search_error_raised(self, client_test_env: NuminousCl ) assert e.value.status == status_code + + async def 
test_get_weights_response(self, client_test_env: NuminousClient): + mock_response_data = { + "aggregated_at": "2025-01-30T12:00:00Z", + "weights": [ + { + "miner_uid": 0, + "miner_hotkey": "5C4hrfjw9DjXZTzV3MwzrrAr9P1MJhSrvWGWqi1eSuyUpnhM", + "aggregated_weight": 0.5, + }, + { + "miner_uid": 1, + "miner_hotkey": "5Dpqn31QEwkqXoMJQF2xvPn9Rh6dDo9CKk1aq3PJxJZgP5Wf", + "aggregated_weight": 0.3, + }, + { + "miner_uid": 2, + "miner_hotkey": "5F3sa2TJAWMqDhXG6jhV4N8ko9SxwGy8TpaNS1repo5EYjQX", + "aggregated_weight": 0.2, + }, + ], + "count": 3, + } + + with aioresponses() as mocked: + mocked.get( + "/api/v1/validators/weights", + status=200, + body=json.dumps(mock_response_data).encode("utf-8"), + ) + + result = await client_test_env.get_weights() + + mocked.assert_called_once() + + assert result == GetWeightsResponse.model_validate(mock_response_data) + assert len(result.weights) == 3 + assert result.count == 3 + assert result.weights[0].miner_uid == 0 + assert result.weights[0].aggregated_weight == 0.5 + + async def test_get_weights_error_503_raised(self, client_test_env: NuminousClient): + mock_response_data = {"message": "No weights available yet"} + status_code = 503 + + with aioresponses() as mocked: + url_path = "/api/v1/validators/weights" + mocked.get( + url_path, + status=status_code, + body=json.dumps(mock_response_data).encode("utf-8"), + ) + + with pytest.raises(ClientResponseError) as e: + await client_test_env.get_weights() + + mocked.assert_called_with(url_path) + + assert e.value.status == status_code + + async def test_get_weights_error_500_raised(self, client_test_env: NuminousClient): + mock_response_data = {"message": "Internal server error"} + status_code = 500 + + with aioresponses() as mocked: + url_path = "/api/v1/validators/weights" + mocked.get( + url_path, + status=status_code, + body=json.dumps(mock_response_data).encode("utf-8"), + ) + + with pytest.raises(ClientResponseError) as e: + await client_test_env.get_weights() + + mocked.assert_called_with(url_path) + + assert e.value.status == status_code diff --git a/neurons/validator/scheduler/tasks_scheduler.py b/neurons/validator/scheduler/tasks_scheduler.py index 715dbb3..1e6bb89 100644 --- a/neurons/validator/scheduler/tasks_scheduler.py +++ b/neurons/validator/scheduler/tasks_scheduler.py @@ -12,6 +12,7 @@ class TasksScheduler: __tasks: list[AbstractTask] __logger: NuminousLogger + DEFAULT_TASK_TIMEOUT_SECONDS = 60 * 60 * 24 # 24 hours def __init__(self, logger: NuminousLogger): # Validate logger @@ -37,8 +38,8 @@ async def __schedule_task(self, task: AbstractTask): self.__logger.info("Task started", extra={"task_name": task.name}) try: - # Execute the task's run async function - await task.run() + # Execute the task's run async function with a default timeout + await asyncio.wait_for(task.run(), timeout=self.DEFAULT_TASK_TIMEOUT_SECONDS) elapsed_time_ms = round((time.time() - start_time) * 1000) diff --git a/neurons/validator/scheduler/tests/test_tasks_scheduler.py b/neurons/validator/scheduler/tests/test_tasks_scheduler.py index be68e9d..be8e0fc 100644 --- a/neurons/validator/scheduler/tests/test_tasks_scheduler.py +++ b/neurons/validator/scheduler/tests/test_tasks_scheduler.py @@ -1,4 +1,5 @@ import asyncio +import sys from unittest.mock import MagicMock import pytest @@ -172,14 +173,57 @@ async def run(self): assert logger.info.call_count == 2 # task started logs assert logger.exception.call_count == 2 # task errored logs + async def test_task_timeout(self, scheduler, await_start_with_timeout, logger): + 
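+        # Shrink the class-level timeout so the hanging task trips it quickly; restored in the finally block below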
original_timeout = TasksScheduler.DEFAULT_TASK_TIMEOUT_SECONDS + TasksScheduler.DEFAULT_TASK_TIMEOUT_SECONDS = 0.1 + + try: + + class HangingTask(AbstractTask): + @property + def name(self): + return "Hanging Task" + + @property + def interval_seconds(self): + return 10.0 + + async def run(self): + # Intentionally sleep longer than the scheduler timeout to trigger a timeout + await asyncio.sleep(10) + + captured_excs = [] + + def capture_exception(message, *args, **kwargs): + captured_excs.append(sys.exc_info()[1]) + + logger.exception.side_effect = capture_exception + + task = HangingTask() + scheduler.add(task) + + # Run the scheduler briefly to allow at least one timeout to occur + await await_start_with_timeout(start_future=scheduler.start(), timeout=0.2) + + # Task should return to idle even after the timeout + assert task.status == "idle" + + # Ensure an exception was logged and it originated from a timeout + assert len(captured_excs) >= 1 + + assert any(isinstance(e, asyncio.TimeoutError) for e in captured_excs) + + assert any( + call.args and call.args[0] == "Task errored" for call in logger.exception.mock_calls + ) + finally: + # Restore timeout + TasksScheduler.DEFAULT_TASK_TIMEOUT_SECONDS = original_timeout + async def test_task_with_invalid_status(self, scheduler, await_start_with_timeout): # Verify the task is not executed because its status is not "unscheduled" runs = 0 - async def dummy_task(): - nonlocal runs - runs += 1 - class TestTask(AbstractTask): @property def name(self): diff --git a/neurons/validator/tasks/db_cleaner.py b/neurons/validator/tasks/db_cleaner.py index a01a193..c95d6d1 100644 --- a/neurons/validator/tasks/db_cleaner.py +++ b/neurons/validator/tasks/db_cleaner.py @@ -79,6 +79,26 @@ async def run(self): await asyncio.sleep(1) + # Delete agent run logs + deleted_agent_run_logs = await self.db_operations.delete_agent_run_logs(self.batch_size) + + if len(deleted_agent_run_logs) > 0: + self.logger.debug( + "Agent run logs deleted", extra={"deleted_count": len(deleted_agent_run_logs)} + ) + + await asyncio.sleep(1) + + # Delete agent runs + deleted_agent_runs = await self.db_operations.delete_agent_runs(self.batch_size) + + if len(deleted_agent_runs) > 0: + self.logger.debug( + "Agent runs deleted", extra={"deleted_count": len(deleted_agent_runs)} + ) + + await asyncio.sleep(1) + # Delete events deleted_events = await self.db_operations.delete_events_hard_delete(self.batch_size) diff --git a/neurons/validator/tasks/export_agent_run_logs.py b/neurons/validator/tasks/export_agent_run_logs.py new file mode 100644 index 0000000..d506262 --- /dev/null +++ b/neurons/validator/tasks/export_agent_run_logs.py @@ -0,0 +1,98 @@ +from uuid import UUID + +from neurons.validator.db.operations import DatabaseOperations +from neurons.validator.models.agent_run_logs import AgentRunLogsModel +from neurons.validator.models.numinous_client import PostAgentLogsRequestBody +from neurons.validator.numinous_client.client import NuminousClient +from neurons.validator.scheduler.task import AbstractTask +from neurons.validator.utils.logger.logger import NuminousLogger + + +class ExportAgentRunLogs(AbstractTask): + interval: float + batch_size: int + db_operations: DatabaseOperations + api_client: NuminousClient + logger: NuminousLogger + + def __init__( + self, + interval_seconds: float, + batch_size: int, + db_operations: DatabaseOperations, + api_client: NuminousClient, + logger: NuminousLogger, + ): + if not isinstance(interval_seconds, float) or interval_seconds <= 0: + raise 
ValueError("interval_seconds must be a positive number (float).") + + if not isinstance(db_operations, DatabaseOperations): + raise TypeError("db_operations must be an instance of DatabaseOperations.") + + self.interval = interval_seconds + self.batch_size = batch_size + self.db_operations = db_operations + self.api_client = api_client + + self.errors_count = 0 + self.logger = logger + + @property + def name(self) -> str: + return "export-agent-run-logs" + + @property + def interval_seconds(self) -> float: + return self.interval + + async def export_log_to_backend(self, log: AgentRunLogsModel) -> None: + payload = PostAgentLogsRequestBody( + run_id=UUID(log.run_id), + log_content=log.log_content, + ) + + await self.api_client.post_agent_logs(body=payload) + + async def run(self) -> None: + unexported_logs = await self.db_operations.get_unexported_agent_run_logs( + limit=self.batch_size + ) + + if not unexported_logs: + self.logger.debug("No unexported logs to export") + else: + self.logger.debug( + "Found unexported logs to export", + extra={"n_logs": len(unexported_logs)}, + ) + + successfully_exported_run_ids = [] + + for log in unexported_logs: + try: + await self.export_log_to_backend(log) + successfully_exported_run_ids.append(log.run_id) + except Exception: + self.errors_count += 1 + self.logger.warning( + "Failed to export log to backend", + extra={"run_id": log.run_id}, + exc_info=True, + ) + + if successfully_exported_run_ids: + await self.db_operations.mark_agent_run_logs_as_exported( + run_ids=successfully_exported_run_ids + ) + + self.logger.debug( + "Marked logs as exported", + extra={"n_logs": len(successfully_exported_run_ids)}, + ) + + self.logger.debug( + "Export logs task completed", + extra={"errors_count": self.errors_count}, + ) + + self.errors_count = 0 diff --git a/neurons/validator/tasks/run_agents.py b/neurons/validator/tasks/run_agents.py index 5823519..2031a66 100644 --- a/neurons/validator/tasks/run_agents.py +++ b/neurons/validator/tasks/run_agents.py @@ -1,14 +1,13 @@ import asyncio import json -import uuid from datetime import datetime from pathlib import Path from typing import List, Optional +from uuid import uuid4 from neurons.validator.db.operations import DatabaseOperations from neurons.validator.models.agent_runs import AgentRunsModel, AgentRunStatus from neurons.validator.models.miner_agent import MinerAgentsModel -from neurons.validator.models.numinous_client import PostAgentLogsRequestBody from neurons.validator.models.prediction import PredictionsModel from neurons.validator.numinous_client.client import NuminousClient from neurons.validator.sandbox import SandboxManager @@ -290,25 +289,6 @@ def _build_error_logs(self, logs: str, error_msg: str, traceback: Optional[str] logs += f"\nTraceback:\n{traceback}" return logs - async def post_agent_logs(self, run_id: str, logs: str) -> None: - try: - original_length = len(logs) - if original_length > MAX_LOG_CHARS: - truncation_msg = ( - f"[LOG TRUNCATED: Original {original_length:,} chars, " - f"showing last {MAX_LOG_CHARS:,} chars]\n\n" - ) - logs = truncation_msg + logs[-MAX_LOG_CHARS:] - - body = PostAgentLogsRequestBody(run_id=uuid.UUID(run_id), log_content=logs) - await self.api_client.post_agent_logs(body) - - except Exception as e: - self.logger.warning( - "Failed to post agent logs", - extra={"run_id": run_id, "error": str(e)}, - ) - def _determine_status_and_extract_prediction( self, result: Optional[dict], @@ -445,7 +425,7 @@ async def execute_agent_for_event( metadata, ) = event_tuple - run_id = 
str(uuid.uuid4()) + run_id = str(uuid4()) self.logger.info( "Executing agent for event", extra={ @@ -489,7 +469,13 @@ async def execute_agent_for_event( logs, result.get("error", "Unknown error"), result.get("traceback") ) - await self.post_agent_logs(run_id, logs) + original_length = len(logs) + if original_length > MAX_LOG_CHARS: + truncation_msg = ( + f"[LOG TRUNCATED: Original {original_length:,} chars, " + f"showing last {MAX_LOG_CHARS:,} chars]\n\n" + ) + logs = truncation_msg + logs[-MAX_LOG_CHARS:] run_status, prediction_value = self._determine_status_and_extract_prediction( result, event_id, agent.version_id, run_id @@ -503,6 +489,15 @@ async def execute_agent_for_event( ) await self.db_operations.upsert_agent_runs([agent_run]) + try: + await self.db_operations.insert_agent_run_log(run_id, logs) + except Exception as e: + self.logger.error( + "Failed to store agent run log", + extra={"run_id": run_id, "error": str(e), "log_content": logs}, + exc_info=True, + ) + if run_status == AgentRunStatus.SUCCESS and prediction_value is not None: await self.store_prediction( event_id, agent, prediction_value, run_id, interval_start_minutes diff --git a/neurons/validator/tasks/set_weights.py b/neurons/validator/tasks/set_weights.py index d06fa5a..8a96be5 100644 --- a/neurons/validator/tasks/set_weights.py +++ b/neurons/validator/tasks/set_weights.py @@ -3,6 +3,7 @@ from dataclasses import dataclass from datetime import datetime, timezone +import aiohttp import pandas as pd import torch from bittensor import AsyncSubtensor @@ -10,6 +11,9 @@ from bittensor_wallet.wallet import Wallet from neurons.validator.db.operations import DatabaseOperations +from neurons.validator.models.numinous_client import GetWeightsResponse +from neurons.validator.models.weights import WeightsModel +from neurons.validator.numinous_client.client import NuminousClient from neurons.validator.scheduler.task import AbstractTask from neurons.validator.utils.common.converters import ( pydantic_models_to_dataframe, @@ -51,6 +55,7 @@ def __init__( netuid: int, subtensor: AsyncSubtensor, wallet: Wallet, # type: ignore + api_client: NuminousClient, ): if not isinstance(interval_seconds, float) or interval_seconds <= 0: raise ValueError("interval_seconds must be a positive number (float).") @@ -58,6 +63,9 @@ def __init__( if not isinstance(db_operations, DatabaseOperations): raise TypeError("db_operations must be an instance of DatabaseOperations.") + if not isinstance(api_client, NuminousClient): + raise TypeError("api_client must be an instance of NuminousClient.") + self.interval = interval_seconds self.db_operations = db_operations self.logger = logger @@ -66,6 +74,7 @@ def __init__( self.netuid = netuid self.subtensor = subtensor self.wallet = wallet + self.api_client = api_client self.current_hotkeys = None self.n_hotkeys = None @@ -134,32 +143,24 @@ async def time_to_set_weights(self): return True - def filter_last_scores(self, last_metagraph_scores) -> pd.DataFrame: - # this is re-normalizing the weights for the current miners - - filtered_scores = pydantic_models_to_dataframe(last_metagraph_scores) - # merge the current metagraph with the last metagraph scores - filtered_scores = pd.merge( + def merge_weights_with_metagraph(self, weights_from_api) -> pd.DataFrame: + merged_weights = pydantic_models_to_dataframe(weights_from_api) + merged_weights = pd.merge( self.current_miners_df, - filtered_scores, + merged_weights, on=[SWNames.miner_uid, SWNames.miner_hotkey], how="left", ) - # some stats for logging stats = { - 
"len_last_metagraph_scores": len(last_metagraph_scores), - "len_filtered_scores": len(filtered_scores), + "len_weights_from_api": len(weights_from_api), + "len_merged_weights": len(merged_weights), "len_current_miners": len(self.current_miners_df), - "len_valid_meta_scores": len(filtered_scores.dropna(subset=[SWNames.metagraph_score])), - "len_valid_event_scores": len(filtered_scores.dropna(subset=[SWNames.event_score])), - "distinct_events": len(filtered_scores[SWNames.event_id].unique()), - "distinct_spec_version": len(filtered_scores[SWNames.spec_version_name].unique()), - "distinct_created_at": len(filtered_scores[SWNames.created_at].unique()), + "len_non_zero_weights": (merged_weights[SWNames.metagraph_score].notna()).sum(), } - self.logger.debug("Stats for filter last scores", extra=stats) + self.logger.debug("Merged API weights with current metagraph", extra=stats) - filtered_scores = filtered_scores[ + merged_weights = merged_weights[ [SWNames.miner_uid, SWNames.miner_hotkey, SWNames.metagraph_score] ] data_types = { @@ -167,12 +168,12 @@ def filter_last_scores(self, last_metagraph_scores) -> pd.DataFrame: SWNames.miner_hotkey: "str", SWNames.metagraph_score: "float", } - filtered_scores = filtered_scores.astype(data_types) - filtered_scores[SWNames.metagraph_score] = filtered_scores[SWNames.metagraph_score].fillna( + merged_weights = merged_weights.astype(data_types) + merged_weights[SWNames.metagraph_score] = merged_weights[SWNames.metagraph_score].fillna( 0.0 ) - return filtered_scores + return merged_weights def check_scores_sanity(self, filtered_scores: pd.DataFrame) -> bool: # Do some sanity checks before and throw assert exceptions if there are issues @@ -347,6 +348,30 @@ def get_owner_neuron(self): return {"uid": owner_uid, "hotkey": owner_hotkey} + def _convert_api_weights_to_weights( + self, api_response: GetWeightsResponse + ) -> list[WeightsModel]: + weights = [] + for weight in api_response.weights: + weights.append( + WeightsModel( + miner_uid=weight.miner_uid, + miner_hotkey=weight.miner_hotkey, + metagraph_score=weight.aggregated_weight, + aggregated_at=api_response.aggregated_at, + ) + ) + + self.logger.debug( + "Converted API response to weights", + extra={ + "num_weights": len(weights), + "aggregated_at": api_response.aggregated_at.isoformat(), + }, + ) + + return weights + async def run(self): await self.metagraph_lite_sync() @@ -354,16 +379,49 @@ async def run(self): if not can_set_weights: return - last_metagraph_scores = await self.db_operations.get_last_metagraph_scores() + try: + api_response = await self.api_client.get_weights() + + self.logger.info( + "Fetched centralized weights from API", + extra={ + "aggregated_at": api_response.aggregated_at.isoformat(), + "num_weights": len(api_response.weights), + "count": api_response.count, + }, + ) + + weights_from_api = self._convert_api_weights_to_weights(api_response) + + except aiohttp.ClientResponseError as e: + if e.status == 503: + self.logger.warning( + "Backend has no weights available yet (503). 
Skipping set_weights this round.", + extra={"status": e.status}, + ) + return + else: + self.logger.error( + "Failed to fetch weights from backend API", + extra={"status": e.status, "message": str(e)}, + ) + raise + + except Exception as e: + self.logger.exception( + "Unexpected error fetching weights from API", + extra={"error_type": type(e).__name__}, + ) + raise - if not last_metagraph_scores: - raise ValueError("Failed to get the last metagraph scores.") + if not weights_from_api: + raise ValueError("Failed to get weights from API (empty response).") - filtered_scores = self.filter_last_scores(last_metagraph_scores) + merged_weights = self.merge_weights_with_metagraph(weights_from_api) - self.check_scores_sanity(filtered_scores) + self.check_scores_sanity(merged_weights) - normalized_scores = self.renormalize_weights(filtered_scores) + normalized_scores = self.renormalize_weights(merged_weights) uids, weights = await self.preprocess_weights(normalized_scores) diff --git a/neurons/validator/tasks/tests/test_db_cleaner.py b/neurons/validator/tasks/tests/test_db_cleaner.py index ecf2e90..2036e65 100644 --- a/neurons/validator/tasks/tests/test_db_cleaner.py +++ b/neurons/validator/tasks/tests/test_db_cleaner.py @@ -20,6 +20,8 @@ async def test_db_cleaner_run(self, db_operations_mock: AsyncMock): db_operations_mock.delete_predictions = AsyncMock(return_value=[1, 2]) db_operations_mock.delete_scores = AsyncMock(return_value=[3]) db_operations_mock.delete_reasonings = AsyncMock(return_value=[5, 6, 7]) + db_operations_mock.delete_agent_run_logs = AsyncMock(return_value=[8, 9]) + db_operations_mock.delete_agent_runs = AsyncMock(return_value=[10, 11, 12]) db_operations_mock.delete_events_hard_delete = AsyncMock(return_value=(3, 4, 5, 7, 8)) logger_mock = MagicMock(spec=NuminousLogger) batch_size = 100 @@ -38,6 +40,8 @@ async def test_db_cleaner_run(self, db_operations_mock: AsyncMock): db_operations_mock.delete_predictions.assert_awaited_once_with(batch_size) db_operations_mock.delete_scores.assert_awaited_once_with(batch_size) db_operations_mock.delete_reasonings.assert_awaited_once_with(batch_size) + db_operations_mock.delete_agent_run_logs.assert_awaited_once_with(batch_size) + db_operations_mock.delete_agent_runs.assert_awaited_once_with(batch_size) db_operations_mock.delete_events_hard_delete.assert_awaited_once_with(batch_size) logger_mock.debug.assert_has_calls( @@ -45,11 +49,13 @@ async def test_db_cleaner_run(self, db_operations_mock: AsyncMock): call("Predictions deleted", extra={"deleted_count": 2}), call("Scores deleted", extra={"deleted_count": 1}), call("Reasonings deleted", extra={"deleted_count": 3}), + call("Agent run logs deleted", extra={"deleted_count": 2}), + call("Agent runs deleted", extra={"deleted_count": 3}), call("Events hard deleted", extra={"deleted_count": 5}), ] ) - assert sleep_mock.call_args_list == [call(1), call(1), call(1)] + assert sleep_mock.call_args_list == [call(1), call(1), call(1), call(1), call(1)] async def test_db_cleaner_run_no_deletions(self, db_operations_mock: AsyncMock): # Prepare mocks @@ -57,6 +63,8 @@ async def test_db_cleaner_run_no_deletions(self, db_operations_mock: AsyncMock): db_operations_mock.delete_predictions = AsyncMock(return_value=[]) db_operations_mock.delete_scores = AsyncMock(return_value=[]) db_operations_mock.delete_reasonings = AsyncMock(return_value=[]) + db_operations_mock.delete_agent_run_logs = AsyncMock(return_value=[]) + db_operations_mock.delete_agent_runs = AsyncMock(return_value=[]) 
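+        # delete_events_hard_delete likewise returns no rows, so no debug logs should fire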
db_operations_mock.delete_events_hard_delete = AsyncMock(return_value=[]) logger_mock = MagicMock(spec=NuminousLogger) batch_size = 100 @@ -75,8 +83,10 @@ async def test_db_cleaner_run_no_deletions(self, db_operations_mock: AsyncMock): db_operations_mock.delete_predictions.assert_awaited_once_with(batch_size) db_operations_mock.delete_scores.assert_awaited_once_with(batch_size) db_operations_mock.delete_reasonings.assert_awaited_once_with(batch_size) + db_operations_mock.delete_agent_run_logs.assert_awaited_once_with(batch_size) + db_operations_mock.delete_agent_runs.assert_awaited_once_with(batch_size) db_operations_mock.delete_events_hard_delete.assert_awaited_once_with(batch_size) logger_mock.debug.assert_not_called() # Sleep called between each delete - assert sleep_mock.call_args_list == [call(1), call(1), call(1)] + assert sleep_mock.call_args_list == [call(1), call(1), call(1), call(1), call(1)] diff --git a/neurons/validator/tasks/tests/test_export_agent_run_logs.py b/neurons/validator/tasks/tests/test_export_agent_run_logs.py new file mode 100644 index 0000000..f8bf5de --- /dev/null +++ b/neurons/validator/tasks/tests/test_export_agent_run_logs.py @@ -0,0 +1,322 @@ +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock +from uuid import UUID, uuid4 + +import pytest +from bittensor_wallet import Wallet + +from neurons.validator.db.client import DatabaseClient +from neurons.validator.db.operations import DatabaseOperations +from neurons.validator.models.agent_runs import AgentRunsModel, AgentRunStatus +from neurons.validator.models.event import EventsModel, EventStatus +from neurons.validator.models.miner_agent import MinerAgentsModel +from neurons.validator.models.numinous_client import PostAgentLogsRequestBody +from neurons.validator.numinous_client.client import NuminousClient +from neurons.validator.tasks.export_agent_run_logs import ExportAgentRunLogs +from neurons.validator.utils.logger.logger import NuminousLogger + + +class TestExportAgentRunLogs: + async def _create_event(self, db_operations: DatabaseOperations, unique_event_id: str) -> None: + event = EventsModel( + unique_event_id=unique_event_id, + event_id=f"event_{unique_event_id}", + market_type="test_market", + event_type="test_type", + description="Test event", + outcome=None, + status=EventStatus.PENDING, + metadata="{}", + created_at="2024-01-01T00:00:00+00:00", + cutoff="2024-12-31T23:59:59+00:00", + ) + await db_operations.upsert_events([event]) + + async def _create_miner_agent( + self, db_operations: DatabaseOperations, version_id: str, miner_uid: int, miner_hotkey: str + ) -> None: + agent = MinerAgentsModel( + version_id=version_id, + miner_uid=miner_uid, + miner_hotkey=miner_hotkey, + agent_name="TestAgent", + version_number=1, + file_path=f"/data/agents/{miner_uid}/test.py", + pulled_at=datetime(2024, 1, 1, 10, 0, 0, tzinfo=timezone.utc), + created_at=datetime(2024, 1, 1, 9, 0, 0, tzinfo=timezone.utc), + ) + await db_operations.upsert_miner_agents([agent]) + + @pytest.fixture + def db_operations(self, db_client: DatabaseClient): + logger = MagicMock(spec=NuminousLogger) + return DatabaseOperations(db_client=db_client, logger=logger) + + @pytest.fixture + def bt_wallet(self): + hotkey_mock = MagicMock() + hotkey_mock.sign = MagicMock(side_effect=lambda x: x.encode("utf-8")) + hotkey_mock.ss58_address = "validator_hotkey_test" + + bt_wallet = MagicMock(spec=Wallet) + bt_wallet.get_hotkey = MagicMock(return_value=hotkey_mock) + bt_wallet.hotkey.ss58_address = 
"validator_hotkey_test" + + return bt_wallet + + @pytest.fixture + def export_agent_run_logs_task( + self, + db_operations: DatabaseOperations, + bt_wallet: Wallet, + ): + api_client = NuminousClient( + env="test", logger=MagicMock(spec=NuminousLogger), bt_wallet=bt_wallet + ) + logger = MagicMock(spec=NuminousLogger) + + return ExportAgentRunLogs( + interval_seconds=180.0, + batch_size=100, + db_operations=db_operations, + api_client=api_client, + logger=logger, + ) + + def test_init(self, export_agent_run_logs_task): + unit = export_agent_run_logs_task + + assert isinstance(unit, ExportAgentRunLogs) + assert unit.interval == 180.0 + assert unit.interval_seconds == 180.0 + assert unit.batch_size == 100 + assert unit.errors_count == 0 + + async def test_export_log_to_backend(self, export_agent_run_logs_task: ExportAgentRunLogs): + unit = export_agent_run_logs_task + unit.api_client.post_agent_logs = AsyncMock(return_value=True) + + from neurons.validator.models.agent_run_logs import AgentRunLogsModel + + log = AgentRunLogsModel( + run_id="123e4567-e89b-12d3-a456-426614174000", + log_content="Test log content", + exported=False, + ) + + await unit.export_log_to_backend(log) + + assert unit.api_client.post_agent_logs.call_count == 1 + call_args = unit.api_client.post_agent_logs.call_args.kwargs + payload = call_args["body"] + + assert isinstance(payload, PostAgentLogsRequestBody) + assert payload.run_id == UUID("123e4567-e89b-12d3-a456-426614174000") + assert payload.log_content == "Test log content" + + async def test_run_no_unexported_logs(self, export_agent_run_logs_task: ExportAgentRunLogs): + export_agent_run_logs_task.api_client = AsyncMock(spec=NuminousClient) + + await export_agent_run_logs_task.run() + + export_agent_run_logs_task.logger.debug.assert_any_call("No unexported logs to export") + export_agent_run_logs_task.api_client.post_agent_logs.assert_not_called() + + async def test_run_with_single_unexported_log( + self, + export_agent_run_logs_task: ExportAgentRunLogs, + db_operations: DatabaseOperations, + db_client: DatabaseClient, + ): + unit = export_agent_run_logs_task + unit.api_client.post_agent_logs = AsyncMock(return_value=True) + + await self._create_event(db_operations, "event_1") + await self._create_miner_agent(db_operations, "agent_v1", 1, "miner_hotkey_1") + + run_id = str(uuid4()) + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="miner_hotkey_1", + status=AgentRunStatus.SUCCESS, + exported=False, + is_final=True, + ) + await db_operations.upsert_agent_runs([run]) + await db_operations.insert_agent_run_log(run_id, "Log content for run") + + await unit.run() + + assert unit.api_client.post_agent_logs.call_count == 1 + + result = await db_client.many( + "SELECT exported FROM agent_run_logs WHERE run_id = ?", [run_id] + ) + assert len(result) == 1 + assert result[0][0] == 1 + + async def test_run_with_multiple_unexported_logs( + self, + export_agent_run_logs_task: ExportAgentRunLogs, + db_operations: DatabaseOperations, + db_client: DatabaseClient, + ): + unit = export_agent_run_logs_task + unit.api_client.post_agent_logs = AsyncMock(return_value=True) + + await self._create_event(db_operations, "event_1") + await self._create_miner_agent(db_operations, "agent_v1", 1, "miner_hotkey_1") + + run_ids = [str(uuid4()) for _ in range(3)] + for run_id in run_ids: + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + 
miner_hotkey="miner_hotkey_1", + status=AgentRunStatus.SUCCESS, + exported=False, + is_final=True, + ) + await db_operations.upsert_agent_runs([run]) + await db_operations.insert_agent_run_log(run_id, f"Log for {run_id}") + + await unit.run() + + assert unit.api_client.post_agent_logs.call_count == 3 + + result = await db_client.many("SELECT exported FROM agent_run_logs ORDER BY run_id") + assert len(result) == 3 + assert all(row[0] == 1 for row in result) + + async def test_run_partial_export_failure( + self, + export_agent_run_logs_task: ExportAgentRunLogs, + db_operations: DatabaseOperations, + db_client: DatabaseClient, + ): + unit = export_agent_run_logs_task + + await self._create_event(db_operations, "event_1") + await self._create_miner_agent(db_operations, "agent_v1", 1, "miner_hotkey_1") + + run_id_success = str(uuid4()) + run_id_fail = str(uuid4()) + + for run_id in [run_id_success, run_id_fail]: + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="miner_hotkey_1", + status=AgentRunStatus.SUCCESS, + exported=False, + is_final=True, + ) + await db_operations.upsert_agent_runs([run]) + await db_operations.insert_agent_run_log(run_id, f"Log for {run_id}") + + def side_effect(body): + if body.run_id == UUID(run_id_fail): + raise Exception("Simulated failure") + return True + + unit.api_client.post_agent_logs = AsyncMock(side_effect=side_effect) + + await unit.run() + + assert unit.api_client.post_agent_logs.call_count == 2 + unit.logger.warning.assert_called_once() + + result_success = await db_client.one( + "SELECT exported FROM agent_run_logs WHERE run_id = ?", [run_id_success] + ) + assert result_success[0] == 1 + + result_fail = await db_client.one( + "SELECT exported FROM agent_run_logs WHERE run_id = ?", [run_id_fail] + ) + assert result_fail[0] == 0 + + async def test_run_all_exports_fail( + self, + export_agent_run_logs_task: ExportAgentRunLogs, + db_operations: DatabaseOperations, + db_client: DatabaseClient, + ): + unit = export_agent_run_logs_task + unit.api_client.post_agent_logs = AsyncMock(side_effect=Exception("All fail")) + + await self._create_event(db_operations, "event_1") + await self._create_miner_agent(db_operations, "agent_v1", 1, "miner_hotkey_1") + + run_ids = [str(uuid4()) for _ in range(2)] + for run_id in run_ids: + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="miner_hotkey_1", + status=AgentRunStatus.SUCCESS, + exported=False, + is_final=True, + ) + await db_operations.upsert_agent_runs([run]) + await db_operations.insert_agent_run_log(run_id, f"Log for {run_id}") + + await unit.run() + + assert unit.api_client.post_agent_logs.call_count == 2 + assert unit.logger.warning.call_count == 2 + + result = await db_client.many("SELECT exported FROM agent_run_logs ORDER BY run_id") + assert len(result) == 2 + assert all(row[0] == 0 for row in result) + + async def test_run_respects_batch_size( + self, + export_agent_run_logs_task: ExportAgentRunLogs, + db_operations: DatabaseOperations, + db_client: DatabaseClient, + ): + unit = export_agent_run_logs_task + unit.batch_size = 2 + unit.api_client.post_agent_logs = AsyncMock(return_value=True) + + await self._create_event(db_operations, "event_1") + await self._create_miner_agent(db_operations, "agent_v1", 1, "miner_hotkey_1") + + run_ids = [str(uuid4()) for _ in range(5)] + for run_id in run_ids: + run = AgentRunsModel( + run_id=run_id, + 
unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="miner_hotkey_1", + status=AgentRunStatus.SUCCESS, + exported=False, + is_final=True, + ) + await db_operations.upsert_agent_runs([run]) + await db_operations.insert_agent_run_log(run_id, f"Log for {run_id}") + + await unit.run() + + assert unit.api_client.post_agent_logs.call_count == 2 + + exported_count = await db_client.one( + "SELECT COUNT(*) FROM agent_run_logs WHERE exported = 1", [] + ) + assert exported_count[0] == 2 + + unexported_count = await db_client.one( + "SELECT COUNT(*) FROM agent_run_logs WHERE exported = 0", [] + ) + assert unexported_count[0] == 3 diff --git a/neurons/validator/tasks/tests/test_run_agents.py b/neurons/validator/tasks/tests/test_run_agents.py index 8876648..fda9ebb 100644 --- a/neurons/validator/tasks/tests/test_run_agents.py +++ b/neurons/validator/tasks/tests/test_run_agents.py @@ -1,6 +1,6 @@ import asyncio from datetime import datetime, timezone -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, patch import numpy as np import pytest @@ -242,9 +242,19 @@ def test_invalid_timeout_zero( @pytest.mark.asyncio class TestRunAgentsRun: + @patch("neurons.validator.tasks.run_agents.datetime") async def test_no_events( - self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger + self, + mock_datetime, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, ): + # Mock datetime.utcnow() to return hour >= 4 to pass sync_hour check + mock_datetime.utcnow.return_value = datetime(2025, 12, 3, 10, 0, 0) + mock_db_operations.get_events_to_predict.return_value = [] task = RunAgents( @@ -262,9 +272,19 @@ async def test_no_events( mock_logger.debug.assert_called_with("No events to predict") mock_db_operations.get_active_agents.assert_not_called() + @patch("neurons.validator.tasks.run_agents.datetime") async def test_no_agents( - self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger + self, + mock_datetime, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, ): + # Mock datetime.utcnow() to return hour >= 4 to pass sync_hour check + mock_datetime.utcnow.return_value = datetime(2025, 12, 3, 10, 0, 0) + mock_db_operations.get_events_to_predict.return_value = [ ( "event_1", @@ -479,8 +499,10 @@ def test_parse_event_description_without_separator( @pytest.mark.asyncio class TestRunAgentsIdempotency: + @patch("neurons.validator.tasks.run_agents.datetime") async def test_skip_when_prediction_exists( self, + mock_datetime, mock_db_operations, mock_sandbox_manager, mock_metagraph, @@ -489,6 +511,9 @@ async def test_skip_when_prediction_exists( sample_event_tuple, sample_agent, ): + # Mock datetime.utcnow() to return hour >= 4 to pass sync_hour check + mock_datetime.utcnow.return_value = datetime(2025, 12, 3, 10, 0, 0) + from neurons.validator.models.prediction import PredictionsModel from neurons.validator.utils.common.interval import get_interval_start_minutes @@ -531,8 +556,10 @@ async def test_skip_when_prediction_exists( extra={"event_id": "event_123", "agent_version_id": "agent_v1", "miner_uid": 42}, ) + @patch("neurons.validator.tasks.run_agents.datetime") async def test_execute_when_prediction_not_exists( self, + mock_datetime, mock_db_operations, mock_sandbox_manager, mock_metagraph, @@ -541,6 +568,9 @@ async def test_execute_when_prediction_not_exists( sample_event_tuple, 
sample_agent, ): + # Mock datetime.utcnow() to return hour >= 4 to pass sync_hour check + mock_datetime.utcnow.return_value = datetime(2025, 12, 3, 10, 0, 0) + mock_db_operations.get_events_to_predict.return_value = [sample_event_tuple] mock_db_operations.get_active_agents.return_value = [sample_agent] @@ -574,8 +604,10 @@ async def test_execute_when_prediction_not_exists( assert call_args["agent"] == sample_agent assert call_args["event_tuple"] == sample_event_tuple + @patch("neurons.validator.tasks.run_agents.datetime") async def test_replicate_when_prediction_exists_in_different_interval( self, + mock_datetime, mock_db_operations, mock_sandbox_manager, mock_metagraph, @@ -584,6 +616,9 @@ async def test_replicate_when_prediction_exists_in_different_interval( sample_event_tuple, sample_agent, ): + # Mock datetime.utcnow() to return hour >= 4 to pass sync_hour check + mock_datetime.utcnow.return_value = datetime(2025, 12, 3, 10, 0, 0) + from neurons.validator.models.prediction import PredictionsModel mock_db_operations.get_events_to_predict.return_value = [sample_event_tuple] @@ -955,55 +990,6 @@ async def test_store_prediction_handles_failure( assert "Failed to store prediction" in call_args[0][0] -@pytest.mark.asyncio -class TestRunAgentsPostLogs: - async def test_post_agent_logs_success( - self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger - ): - task = RunAgents( - interval_seconds=600.0, - db_operations=mock_db_operations, - sandbox_manager=mock_sandbox_manager, - metagraph=mock_metagraph, - api_client=mock_api_client, - logger=mock_logger, - ) - - run_id = "123e4567-e89b-12d3-a456-426614174000" - logs = "Agent execution log:\nStep 1: Initialize\nStep 2: Process\nStep 3: Complete" - - await task.post_agent_logs(run_id, logs) - - mock_api_client.post_agent_logs.assert_called_once() - call_args = mock_api_client.post_agent_logs.call_args[0][0] - assert str(call_args.run_id) == run_id - assert call_args.log_content == logs - - async def test_post_agent_logs_truncates_long_logs( - self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger - ): - task = RunAgents( - interval_seconds=600.0, - db_operations=mock_db_operations, - sandbox_manager=mock_sandbox_manager, - metagraph=mock_metagraph, - api_client=mock_api_client, - logger=mock_logger, - ) - - run_id = "223e4567-e89b-12d3-a456-426614174001" - # > MAX_LOG_CHARS (25,000) - long_logs = "x" * 30000 - - await task.post_agent_logs(run_id, long_logs) - - mock_api_client.post_agent_logs.assert_called_once() - call_args = mock_api_client.post_agent_logs.call_args[0][0] - assert str(call_args.run_id) == run_id - assert "LOG TRUNCATED" in call_args.log_content - assert len(call_args.log_content) < 30000 - - @pytest.mark.asyncio class TestRunAgentsErrorLogging: async def test_logs_exported_on_agent_execution_error( @@ -1042,10 +1028,12 @@ async def test_logs_exported_on_agent_execution_error( interval_start_minutes=1000, ) - mock_api_client.post_agent_logs.assert_called_once() - body = mock_api_client.post_agent_logs.call_args[0][0] - logs = body.log_content + mock_db_operations.insert_agent_run_log.assert_called_once() + call_args = mock_db_operations.insert_agent_run_log.call_args + run_id = call_args[0][0] + logs = call_args[0][1] + assert run_id is not None assert "[AGENT_RUNNER] Starting" in logs assert "ERROR DETAILS" in logs assert "agent_main() must return a dict" in logs @@ -1086,10 +1074,12 @@ async def test_logs_exported_on_timeout( 
interval_start_minutes=1000, ) - mock_api_client.post_agent_logs.assert_called_once() - body = mock_api_client.post_agent_logs.call_args[0][0] - logs = body.log_content + mock_db_operations.insert_agent_run_log.assert_called_once() + call_args = mock_db_operations.insert_agent_run_log.call_args + run_id = call_args[0][0] + logs = call_args[0][1] + assert run_id is not None assert "[AGENT_RUNNER] Starting" in logs assert "TIMEOUT" in logs assert "Execution exceeded timeout limit" in logs @@ -1129,10 +1119,12 @@ async def test_logs_exported_on_validation_error( interval_start_minutes=1000, ) - mock_api_client.post_agent_logs.assert_called_once() - body = mock_api_client.post_agent_logs.call_args[0][0] - logs = body.log_content + mock_db_operations.insert_agent_run_log.assert_called_once() + call_args = mock_db_operations.insert_agent_run_log.call_args + run_id = call_args[0][0] + logs = call_args[0][1] + assert run_id is not None assert "[AGENT_RUNNER] Starting" in logs assert "[AGENT_RUNNER] Completed" in logs @@ -1168,10 +1160,12 @@ async def test_logs_exported_on_result_none( interval_start_minutes=1000, ) - mock_api_client.post_agent_logs.assert_called_once() - body = mock_api_client.post_agent_logs.call_args[0][0] - logs = body.log_content + mock_db_operations.insert_agent_run_log.assert_called_once() + call_args = mock_db_operations.insert_agent_run_log.call_args + run_id = call_args[0][0] + logs = call_args[0][1] + assert run_id is not None assert "Sandbox timeout - no logs" in logs diff --git a/neurons/validator/tasks/tests/test_set_weights.py b/neurons/validator/tasks/tests/test_set_weights.py index 4952e03..adcba34 100644 --- a/neurons/validator/tasks/tests/test_set_weights.py +++ b/neurons/validator/tasks/tests/test_set_weights.py @@ -11,7 +11,8 @@ from neurons.validator.db.client import DatabaseClient from neurons.validator.db.operations import DatabaseOperations -from neurons.validator.models.score import SCORE_FIELDS, ScoresModel +from neurons.validator.models.score import ScoresModel +from neurons.validator.models.weights import WeightsModel from neurons.validator.tasks.set_weights import SetWeights, SWNames from neurons.validator.utils.common.interval import BLOCK_DURATION from neurons.validator.utils.if_metagraph import IfMetagraph @@ -44,6 +45,8 @@ def set_weights_task( db_operations: DatabaseOperations, bt_wallet: Wallet, # type: ignore ): + from neurons.validator.numinous_client.client import NuminousClient + metagraph = MagicMock(spec=IfMetagraph) metagraph.sync = AsyncMock() subtensor = MagicMock(spec=bt.AsyncSubtensor) @@ -59,6 +62,8 @@ def set_weights_task( subtensor.weights_rate_limit = AsyncMock(return_value=100) # Set weights rate limit subtensor.network = "mock" + api_client = MagicMock(spec=NuminousClient) + logger = MagicMock(spec=NuminousLogger) with freeze_time("2024-12-27 07:00:00"): @@ -70,6 +75,7 @@ def set_weights_task( netuid=155, subtensor=subtensor, wallet=bt_wallet, + api_client=api_client, ) def test_init(self, set_weights_task: SetWeights): @@ -84,6 +90,7 @@ def test_init(self, set_weights_task: SetWeights): assert unit.netuid == 155 assert unit.subtensor is not None assert unit.wallet is not None + assert unit.api_client is not None assert unit.spec_version == spec_version @@ -122,13 +129,12 @@ async def test_time_to_set_weights(self, set_weights_task: SetWeights, delta, ex assert result is expected - async def test_filter_last_scores(self, set_weights_task: SetWeights): + async def test_merge_weights_with_metagraph(self, set_weights_task: 
SetWeights): unit = set_weights_task - # Need to call sync to load instance data await unit.metagraph_lite_sync() - last_metagraph_scores = [ + weights_from_api = [ ScoresModel( miner_uid=1, miner_hotkey="hotkey1", @@ -161,32 +167,28 @@ async def test_filter_last_scores(self, set_weights_task: SetWeights): ), ] - filtered_scores = unit.filter_last_scores(last_metagraph_scores) + merged_weights = unit.merge_weights_with_metagraph(weights_from_api) - assert len(filtered_scores) == 3 - assert filtered_scores.loc[0].miner_uid == 1 - assert filtered_scores.loc[1].miner_uid == 2 - assert filtered_scores.loc[2].miner_uid == 3 - assert filtered_scores.loc[0].miner_hotkey == "hotkey1" - assert filtered_scores.loc[1].miner_hotkey == "hotkey2" - assert filtered_scores.loc[2].miner_hotkey == "hotkey3" - assert filtered_scores.loc[0].metagraph_score == 0.8 - assert filtered_scores.loc[1].metagraph_score == 0.0 - assert filtered_scores.loc[2].metagraph_score == 0.0 + assert len(merged_weights) == 3 + assert merged_weights.loc[0].miner_uid == 1 + assert merged_weights.loc[1].miner_uid == 2 + assert merged_weights.loc[2].miner_uid == 3 + assert merged_weights.loc[0].miner_hotkey == "hotkey1" + assert merged_weights.loc[1].miner_hotkey == "hotkey2" + assert merged_weights.loc[2].miner_hotkey == "hotkey3" + assert merged_weights.loc[0].metagraph_score == 0.8 + assert merged_weights.loc[1].metagraph_score == 0.0 + assert merged_weights.loc[2].metagraph_score == 0.0 expected_stats = { - "len_last_metagraph_scores": 3, - "len_filtered_scores": 3, + "len_weights_from_api": 3, + "len_merged_weights": 3, "len_current_miners": 3, - "len_valid_meta_scores": 1, - "len_valid_event_scores": 2, - "distinct_events": 3, # null counts as distinct - "distinct_spec_version": 2, - "distinct_created_at": 3, + "len_non_zero_weights": 1, } assert unit.logger.debug.call_count == 1 - assert unit.logger.debug.call_args[0][0] == "Stats for filter last scores" + assert unit.logger.debug.call_args[0][0] == "Merged API weights with current metagraph" assert unit.logger.debug.call_args[1]["extra"] == expected_stats @pytest.mark.parametrize( @@ -435,8 +437,37 @@ def test_get_owner_neuron_not_found(self, set_weights_task: SetWeights): with pytest.raises(AssertionError, match="Owner uid not found in metagraph uids"): unit.get_owner_neuron() + def test_convert_api_weights_to_weights(self, set_weights_task: SetWeights): + from neurons.validator.models.numinous_client import GetWeightsResponse, MinerWeight + + unit = set_weights_task + + api_response = GetWeightsResponse( + aggregated_at=datetime(2025, 1, 30, 12, 0, 0, tzinfo=timezone.utc), + weights=[ + MinerWeight(miner_uid=1, miner_hotkey="hk1", aggregated_weight=0.6), + MinerWeight(miner_uid=2, miner_hotkey="hk2", aggregated_weight=0.4), + ], + count=2, + ) + + result = unit._convert_api_weights_to_weights(api_response) + + assert len(result) == 2 + assert isinstance(result[0], WeightsModel) + assert result[0].miner_uid == 1 + assert result[0].miner_hotkey == "hk1" + assert result[0].metagraph_score == 0.6 + assert result[0].aggregated_at == datetime(2025, 1, 30, 12, 0, 0, tzinfo=timezone.utc) + + assert result[1].miner_uid == 2 + assert result[1].miner_hotkey == "hk2" + assert result[1].metagraph_score == 0.4 + @pytest.mark.asyncio async def test_run_successful_x(self, set_weights_task: SetWeights, monkeypatch, db_client): + from neurons.validator.models.numinous_client import GetWeightsResponse, MinerWeight + unit = set_weights_task unit.metagraph.owner_hotkey = "hk3" 
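+        # hk3 doubles as the subnet owner hotkey for the owner-neuron lookup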
unit.metagraph.uids = torch.tensor([0, 1, 2, 3, 4], dtype=torch.int32) @@ -444,49 +475,22 @@ async def test_run_successful_x(self, set_weights_task: SetWeights, monkeypatch, unit.last_set_weights_at = time.time() - 101 * BLOCK_DURATION created_at = datetime.now(timezone.utc) - timedelta(days=1) - scores_list = [ - ScoresModel( - event_id="latest_event", - miner_uid=3, - miner_hotkey="hk3", - prediction=0.75, - event_score=0.10, - metagraph_score=0.835, # Winner gets ~0.835 - created_at=created_at, - spec_version=1, - processed=True, - ), - ScoresModel( - event_id="latest_event", - miner_uid=4, - miner_hotkey="hk4", - prediction=0.50, - event_score=0.25, - metagraph_score=0.165, # Second place - created_at=created_at, - spec_version=1, - processed=True, - ), - ] - # insert scores - sql = f""" - INSERT INTO scores ({', '.join(SCORE_FIELDS)}) - VALUES ({', '.join(['?'] * len(SCORE_FIELDS))}) - """ - score_tuples = [ - tuple(getattr(score, field) for field in SCORE_FIELDS) for score in scores_list - ] - await db_client.insert_many(sql, score_tuples) - inserted_scores = await db_client.many("SELECT * FROM scores") - assert len(inserted_scores) == len(scores_list) + mock_api_response = GetWeightsResponse( + aggregated_at=created_at, + weights=[ + MinerWeight(miner_uid=3, miner_hotkey="hk3", aggregated_weight=0.835), + MinerWeight(miner_uid=4, miner_hotkey="hk4", aggregated_weight=0.165), + ], + count=2, + ) + unit.api_client.get_weights = AsyncMock(return_value=mock_api_response) unit.subtensor.set_weights = AsyncMock(return_value=(True, "Success")) - # run the task await unit.run() - assert unit.logger.debug.call_count == 4 + assert unit.logger.debug.call_count == 5 assert unit.logger.warning.call_count == 0 assert unit.logger.error.call_count == 0 @@ -494,9 +498,10 @@ async def test_run_successful_x(self, set_weights_task: SetWeights, monkeypatch, assert debug_calls[0][0][0] == "Attempting to set the weights - enough blocks passed." assert debug_calls[0][1]["extra"]["blocks_since_last_attempt"] >= 100 - assert debug_calls[1][0][0] == "Stats for filter last scores" - assert debug_calls[2][0][0] == "Top 5 and bottom 5 miners by raw_weights" - assert debug_calls[3][0][0] == "Weights set successfully." + assert debug_calls[1][0][0] == "Converted API response to weights" + assert debug_calls[2][0][0] == "Merged API weights with current metagraph" + assert debug_calls[3][0][0] == "Top 5 and bottom 5 miners by raw_weights" + assert debug_calls[4][0][0] == "Weights set successfully." 
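+        # Only the two miners present in the mocked API response are submitted on-chain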
assert unit.subtensor.set_weights.call_count == 1 assert unit.subtensor.set_weights.call_args.kwargs["uids"].tolist() == [3, 4] @@ -504,3 +509,69 @@ async def test_run_successful_x(self, set_weights_task: SetWeights, monkeypatch, unit.subtensor.set_weights.call_args.kwargs["weights"], torch.tensor([0.835, 0.165], dtype=torch.float), ) + + async def test_run_with_api_weights(self, set_weights_task: SetWeights): + from neurons.validator.models.numinous_client import GetWeightsResponse, MinerWeight + + unit = set_weights_task + + mock_api_response = GetWeightsResponse( + aggregated_at=datetime(2025, 1, 30, 12, 0, 0, tzinfo=timezone.utc), + weights=[ + MinerWeight(miner_uid=1, miner_hotkey="hotkey1", aggregated_weight=0.5), + MinerWeight(miner_uid=2, miner_hotkey="hotkey2", aggregated_weight=0.3), + MinerWeight(miner_uid=3, miner_hotkey="hotkey3", aggregated_weight=0.2), + ], + count=3, + ) + + unit.api_client.get_weights = AsyncMock(return_value=mock_api_response) + + unit.subtensor.weights_rate_limit = AsyncMock(return_value=0) + unit.subtensor.min_allowed_weights = AsyncMock(return_value=1) + unit.subtensor.max_weight_limit = AsyncMock(return_value=10) + unit.subtensor.set_weights = AsyncMock(return_value=(True, "Success")) + + await unit.run() + + unit.api_client.get_weights.assert_called_once() + unit.subtensor.set_weights.assert_called_once() + + async def test_run_handles_503_gracefully(self, set_weights_task: SetWeights): + import aiohttp + + unit = set_weights_task + + error = aiohttp.ClientResponseError( + request_info=MagicMock(), + history=(), + status=503, + message="Service Unavailable", + ) + unit.api_client.get_weights = AsyncMock(side_effect=error) + + unit.subtensor.weights_rate_limit = AsyncMock(return_value=0) + unit.subtensor.set_weights = AsyncMock() + + await unit.run() + + unit.subtensor.set_weights.assert_not_called() + unit.logger.warning.assert_called_once() + + async def test_run_raises_on_other_http_errors(self, set_weights_task: SetWeights): + import aiohttp + + unit = set_weights_task + + error = aiohttp.ClientResponseError( + request_info=MagicMock(), + history=(), + status=500, + message="Internal Server Error", + ) + unit.api_client.get_weights = AsyncMock(side_effect=error) + + unit.subtensor.weights_rate_limit = AsyncMock(return_value=0) + + with pytest.raises(aiohttp.ClientResponseError): + await unit.run() diff --git a/neurons/validator/tests/test_main.py b/neurons/validator/tests/test_main.py index f89cda6..f1c2f3e 100644 --- a/neurons/validator/tests/test_main.py +++ b/neurons/validator/tests/test_main.py @@ -12,6 +12,7 @@ @patch("neurons.validator.main.ExportScores", spec=True) @patch("neurons.validator.main.MetagraphScoring", spec=True) @patch("neurons.validator.main.Scoring", spec=True) +@patch("neurons.validator.main.ExportAgentRunLogs", spec=True) @patch("neurons.validator.main.ExportAgentRuns", spec=True) @patch("neurons.validator.main.ExportPredictions", spec=True) class TestValidatorMain: @@ -26,6 +27,7 @@ def test_main( self, mock_export_predictions, mock_export_agent_runs, + mock_export_agent_run_logs, mock_peer_scoring, mock_metagraph_scoring, mock_export_scores, @@ -116,7 +118,7 @@ def test_main( mock_measure_event_loop_lag.assert_awaited_once() # Verify tasks added count - assert mock_scheduler.add.call_count == 14 + assert mock_scheduler.add.call_count == 15 # Verify logging mock_logger.info.assert_called_with( diff --git a/neurons/validator/version.py b/neurons/validator/version.py index a423ec4..eabb4bb 100644 --- 
a/neurons/validator/version.py +++ b/neurons/validator/version.py @@ -1,4 +1,4 @@ -__version__ = "2.0.3" +__version__ = "2.0.4" version_split = __version__.split(".")