diff --git a/CHANGELOG.md b/CHANGELOG.md index 74af97d..cedf1f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Release Notes +## [2.0.3] - 2025-12-03 +- **Scoring**: Fill missing predictions with 0.5 for miners without predictions +- **Architecture**: Sandbox retry mechanism and error handling with comprehensive log exports + ## [2.0.2] - 2025-11-30 - **Validator**: Implement burn mechanism - UID 239 receives 80% of emissions, remaining miners share 20% diff --git a/neurons/miner/scripts/fetch_agent_logs.py b/neurons/miner/scripts/fetch_agent_logs.py index 5409af1..805dee3 100644 --- a/neurons/miner/scripts/fetch_agent_logs.py +++ b/neurons/miner/scripts/fetch_agent_logs.py @@ -211,7 +211,6 @@ def fetch_logs( # Add all metadata fields meta_table.add_row("Event ID", str(metadata.get("event_id", "N/A"))) - meta_table.add_row("Market Type", str(metadata.get("market_type", "N/A"))) meta_table.add_row("Validator UID", str(metadata.get("vali_uid", "N/A"))) validator_hotkey = str(metadata.get("vali_hotkey", "N/A")) @@ -219,14 +218,27 @@ def fetch_logs( validator_hotkey = validator_hotkey[:16] + "..." + validator_hotkey[-8:] meta_table.add_row("Validator Hotkey", validator_hotkey) - meta_table.add_row( - "Interval Prediction", str(metadata.get("interval_agg_prediction", "N/A")) - ) - meta_table.add_row( - "Interval DateTime", str(metadata.get("interval_datetime", "N/A")) - ) - meta_table.add_row("Submitted At", str(metadata.get("submitted_at", "N/A"))) meta_table.add_row("Version ID", str(metadata.get("version_id", "N/A"))) + meta_table.add_row("Status", str(metadata.get("status", "N/A"))) + meta_table.add_row("Is Final", str(metadata.get("is_final", "N/A"))) + + # Add prediction fields if available + prediction = metadata.get("prediction") + if prediction: + meta_table.add_row( + "Interval Prediction", + str(prediction.get("interval_agg_prediction", "N/A")), + ) + meta_table.add_row( + "Interval DateTime", str(prediction.get("interval_datetime", "N/A")) + ) + meta_table.add_row( + "Submitted At", str(prediction.get("submitted_at", "N/A")) + ) + else: + meta_table.add_row("Interval Prediction", "N/A") + meta_table.add_row("Interval DateTime", "N/A") + meta_table.add_row("Submitted At", "N/A") console.print( Panel.fit(meta_table, title="📊 Run Metadata", border_style="cyan") diff --git a/neurons/validator/alembic/versions/M_2025_12_02__15_12_42_add_agent_run_logs_table.py b/neurons/validator/alembic/versions/M_2025_12_02__15_12_42_add_agent_run_logs_table.py new file mode 100644 index 0000000..dcaa411 --- /dev/null +++ b/neurons/validator/alembic/versions/M_2025_12_02__15_12_42_add_agent_run_logs_table.py @@ -0,0 +1,48 @@ +"""Add agent_run_logs table + +Revision ID: 0fc1f13544dc +Revises: 40606aaa49f9 +Create Date: 2025-12-02 15:12:42.000000 + +""" + +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "0fc1f13544dc" +down_revision: Union[str, None] = "40606aaa49f9" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.execute( + """ + CREATE TABLE IF NOT EXISTS agent_run_logs ( + run_id TEXT PRIMARY KEY, + log_content TEXT NOT NULL, + exported INTEGER DEFAULT 0, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (run_id) REFERENCES agent_runs(run_id) + ) + """ + ) + + op.execute( + """ + CREATE INDEX idx_agent_run_logs_exported ON agent_run_logs(exported) WHERE exported = 0 + """ + ) + + op.execute( + """ + CREATE INDEX idx_agent_run_logs_created_at ON agent_run_logs(created_at) + """ + ) + + +def downgrade() -> None: + pass diff --git a/neurons/validator/alembic/versions/M_2025_12_03__15_45_05_backfill_agent_runs_from_predictions.py b/neurons/validator/alembic/versions/M_2025_12_03__15_45_05_backfill_agent_runs_from_predictions.py new file mode 100644 index 0000000..784e440 --- /dev/null +++ b/neurons/validator/alembic/versions/M_2025_12_03__15_45_05_backfill_agent_runs_from_predictions.py @@ -0,0 +1,54 @@ +"""Backfill agent_runs from predictions + +Revision ID: e330661ab24a +Revises: 40606aaa49f9 +Create Date: 2025-11-28 15:32:05.000000 + +""" + +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "e330661ab24a" +down_revision: Union[str, None] = "0fc1f13544dc" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.execute( + """ + INSERT OR IGNORE INTO agent_runs ( + run_id, + unique_event_id, + agent_version_id, + miner_uid, + miner_hotkey, + status, + exported, + is_final, + created_at, + updated_at + ) + SELECT + run_id, + unique_event_id, + version_id, + miner_uid, + miner_hotkey, + 'SUCCESS', + 0, + 1, + submitted, + submitted + FROM predictions + WHERE run_id IS NOT NULL + AND version_id IS NOT NULL + """ + ) + + +def downgrade() -> None: + pass diff --git a/neurons/validator/db/operations.py b/neurons/validator/db/operations.py index ed74eb1..9218ec8 100644 --- a/neurons/validator/db/operations.py +++ b/neurons/validator/db/operations.py @@ -4,6 +4,17 @@ from pydantic import BaseModel from neurons.validator.db.client import DatabaseClient +from neurons.validator.models.agent_run_logs import ( + AGENT_RUN_LOGS_FIELDS, + AgentRunLogExportedStatus, + AgentRunLogsModel, +) +from neurons.validator.models.agent_runs import ( + AGENT_RUNS_FIELDS, + AgentRunExportedStatus, + AgentRunsModel, + AgentRunStatus, +) from neurons.validator.models.event import EVENTS_FIELDS, EventsModel, EventStatus from neurons.validator.models.miner import MINERS_FIELDS, MinersModel from neurons.validator.models.miner_agent import MINER_AGENTS_FIELDS, MinerAgentsModel @@ -958,28 +969,23 @@ async def mark_scores_as_exported(self, event_id: str) -> list: async def get_last_metagraph_scores(self) -> list: """ - Returns the last known metagraph_score for each miner_uid, miner_hotkey; - We cannot simply take from the last event - could be an old event scored now, so - if the miner registered after the event cutoff, we will have no metagraph_score + Returns metagraph_scores for all miners from the SINGLE latest processed event. + This ensures all miners are scored from the same ranking, preserving winner-take-all. """ rows = await self.__db_client.many( f""" - WITH grouped AS ( - SELECT miner_uid AS g_miner_uid, - miner_hotkey AS g_miner_hotkey, - MAX(ROWID) AS max_rowid + WITH latest_event AS ( + SELECT event_id FROM scores WHERE processed = 1 AND created_at > datetime(CURRENT_TIMESTAMP, '-10 day') - GROUP BY miner_uid, miner_hotkey + ORDER BY ROWID DESC + LIMIT 1 ) SELECT {', '.join(SCORE_FIELDS)} FROM scores s - JOIN grouped - ON s.miner_uid = grouped.g_miner_uid - AND s.miner_hotkey = grouped.g_miner_hotkey - AND s.ROWID = grouped.max_rowid + WHERE s.event_id = (SELECT event_id FROM latest_event) """, use_row_factory=True, ) @@ -1113,3 +1119,206 @@ async def get_active_agents(self, limit: int | None = None) -> list[MinerAgentsM ) return self._parse_rows(model=MinerAgentsModel, rows=rows) + + async def upsert_agent_runs(self, runs: list[AgentRunsModel]) -> None: + if not runs: + return + + fields_to_insert = [ + "run_id", + "unique_event_id", + "agent_version_id", + "miner_uid", + "miner_hotkey", + "status", + "exported", + "is_final", + ] + + run_tuples = [ + ( + run.run_id, + run.unique_event_id, + run.agent_version_id, + run.miner_uid, + run.miner_hotkey, + run.status.value, + 1 if run.exported else 0, + 1 if run.is_final else 0, + ) + for run in runs + ] + + placeholders = ", ".join(["?"] * len(fields_to_insert)) + columns = ", ".join(fields_to_insert) + + await self.__db_client.insert_many( + f""" + INSERT INTO agent_runs ({columns}) + VALUES ({placeholders}) + ON CONFLICT (run_id) + DO UPDATE SET + status = excluded.status, + exported = excluded.exported, + is_final = excluded.is_final, + updated_at = CURRENT_TIMESTAMP + """, + run_tuples, + ) + + async def get_unexported_agent_runs(self, limit: int = 1000) -> list[AgentRunsModel]: + rows = await self.__db_client.many( + f""" + SELECT {', '.join(AGENT_RUNS_FIELDS)} + FROM agent_runs + WHERE exported = ? + ORDER BY created_at ASC + LIMIT ? + """, + [AgentRunExportedStatus.NOT_EXPORTED, limit], + use_row_factory=True, + ) + + return self._parse_rows(model=AgentRunsModel, rows=rows) + + async def mark_agent_runs_as_exported(self, run_ids: list[str]) -> None: + if not run_ids: + return + + placeholders = ", ".join(["?" for _ in run_ids]) + + await self.__db_client.update( + f""" + UPDATE agent_runs + SET + exported = ?, + updated_at = CURRENT_TIMESTAMP + WHERE run_id IN ({placeholders}) + """, + [AgentRunExportedStatus.EXPORTED] + run_ids, + ) + + async def insert_agent_run_log(self, run_id: str, log_content: str) -> None: + truncated_log = log_content[:30000] if len(log_content) > 30000 else log_content + + await self.__db_client.insert_many( + """ + INSERT INTO agent_run_logs (run_id, log_content, exported) + VALUES (?, ?, ?) + ON CONFLICT (run_id) + DO UPDATE SET + log_content = excluded.log_content, + exported = excluded.exported, + updated_at = CURRENT_TIMESTAMP + """, + [(run_id, truncated_log, AgentRunLogExportedStatus.NOT_EXPORTED)], + ) + + async def get_unexported_agent_run_logs(self, limit: int = 100) -> list[AgentRunLogsModel]: + rows = await self.__db_client.many( + f""" + SELECT {', '.join(AGENT_RUN_LOGS_FIELDS)} + FROM agent_run_logs + WHERE exported = ? + ORDER BY created_at ASC + LIMIT ? + """, + [AgentRunLogExportedStatus.NOT_EXPORTED, limit], + use_row_factory=True, + ) + + return self._parse_rows(model=AgentRunLogsModel, rows=rows) + + async def mark_agent_run_logs_as_exported(self, run_ids: list[str]) -> None: + if not run_ids: + return + + placeholders = ", ".join(["?" for _ in run_ids]) + + await self.__db_client.update( + f""" + UPDATE agent_run_logs + SET + exported = ?, + updated_at = CURRENT_TIMESTAMP + WHERE run_id IN ({placeholders}) + """, + [AgentRunLogExportedStatus.EXPORTED] + run_ids, + ) + + async def delete_agent_run_logs(self, batch_size: int) -> Iterable[tuple[int]]: + return await self.__db_client.delete( + """ + WITH logs_to_delete AS ( + SELECT + ROWID + FROM + agent_run_logs + WHERE + exported = ? + AND datetime(created_at) < datetime(CURRENT_TIMESTAMP, '-7 day') + ORDER BY + ROWID ASC + LIMIT ? + ) + DELETE FROM + agent_run_logs + WHERE + ROWID IN ( + SELECT + ROWID + FROM + logs_to_delete + ) + RETURNING + ROWID + """, + [AgentRunLogExportedStatus.EXPORTED, batch_size], + ) + + async def count_runs_for_event_and_agent( + self, + unique_event_id: str, + agent_version_id: str, + status: Optional[AgentRunStatus] = None, + is_final: Optional[bool] = None, + ) -> int: + conditions = [ + "unique_event_id = ?", + "agent_version_id = ?", + ] + params: list = [unique_event_id, agent_version_id] + + if status is not None: + conditions.append("status = ?") + params.append(status.value) + + if is_final is not None: + conditions.append("is_final = ?") + params.append(1 if is_final else 0) + + sql = f""" + SELECT COUNT(*) + FROM agent_runs + WHERE {' AND '.join(conditions)} + """ + + result = await self.__db_client.one(sql, params) + return result[0] if result else 0 + + async def has_final_run( + self, + unique_event_id: str, + agent_version_id: str, + ) -> bool: + sql = """ + SELECT 1 + FROM agent_runs + WHERE unique_event_id = ? + AND agent_version_id = ? + AND is_final = 1 + LIMIT 1 + """ + + result = await self.__db_client.one(sql, [unique_event_id, agent_version_id]) + return result is not None diff --git a/neurons/validator/db/sql/metagraph_score.sql b/neurons/validator/db/sql/metagraph_score.sql index 5a64602..2733a25 100644 --- a/neurons/validator/db/sql/metagraph_score.sql +++ b/neurons/validator/db/sql/metagraph_score.sql @@ -18,6 +18,12 @@ last_n_events AS ( ORDER BY event_min_row DESC LIMIT :n_events ), +current_event_miners AS ( + -- Get all (miner_uid, miner_hotkey) pairs that exist in the current event + SELECT DISTINCT miner_uid, miner_hotkey + FROM scores + WHERE event_id = :event_id +), miner_event_briers AS ( -- Calculate Brier score for each miner's prediction for each event in the window SELECT @@ -28,6 +34,9 @@ miner_event_briers AS ( s.event_score AS brier_score FROM scores s JOIN events e ON s.event_id = e.event_id + JOIN current_event_miners cem + ON s.miner_uid = cem.miner_uid + AND s.miner_hotkey = cem.miner_hotkey WHERE s.event_id IN (SELECT event_id FROM last_n_events) ), avg_brier_per_miner AS ( diff --git a/neurons/validator/db/tests/test_db_operations_part_2.py b/neurons/validator/db/tests/test_db_operations_part_2.py index f23b3bb..9863626 100644 --- a/neurons/validator/db/tests/test_db_operations_part_2.py +++ b/neurons/validator/db/tests/test_db_operations_part_2.py @@ -232,75 +232,62 @@ async def test_mark_peer_scores_as_exported(self, db_operations, db_client): async def test_get_last_metagraph_scores(self, db_operations, db_client): created_at = datetime.now(timezone.utc) - timedelta(days=1) - scores_list = [ + + # Older event with some scores (should be ignored) + older_event_scores = [ ScoresModel( - event_id="expected_event_id_1", + event_id="older_event", miner_uid=3, miner_hotkey="hk3", prediction=0.75, event_score=0.80, - metagraph_score=1.0, - created_at=created_at, - spec_version=1, - processed=True, - ), - ScoresModel( - event_id="expected_event_id_2", - miner_uid=3, - miner_hotkey="hk3", - prediction=0.75, - event_score=0.40, - metagraph_score=0.9, + metagraph_score=0.5, # Different score than latest created_at=created_at, spec_version=1, processed=True, ), + ] + + # Latest event with ALL miners (this is what should be returned) + latest_event_scores = [ ScoresModel( - event_id="expected_event_id_3", + event_id="latest_event", miner_uid=3, miner_hotkey="hk3", prediction=0.75, - event_score=0.60, - metagraph_score=0.835, - created_at=created_at, - spec_version=1, - processed=True, - ), - ScoresModel( - event_id="expected_event_id_2", - miner_uid=4, - miner_hotkey="hk4", - prediction=0.75, - event_score=0.40, - metagraph_score=0.1, + event_score=0.10, + metagraph_score=0.198, # Winner (rank 1) created_at=created_at, spec_version=1, processed=True, ), ScoresModel( - event_id="expected_event_id_1", + event_id="latest_event", miner_uid=4, miner_hotkey="hk4", - prediction=0.75, - event_score=0.40, - metagraph_score=0.165, + prediction=0.50, + event_score=0.25, + metagraph_score=0.001, # Rank 2 created_at=created_at, spec_version=1, processed=True, ), ScoresModel( - event_id="expected_event_id_2", + event_id="latest_event", miner_uid=5, miner_hotkey="hk5", - prediction=0.75, - event_score=-0.40, - metagraph_score=0.0, + prediction=0.50, + event_score=0.25, + metagraph_score=0.001, # Rank 3 created_at=created_at, spec_version=1, processed=True, ), ] - # insert scores + + # Insert older event first, then latest event + scores_list = older_event_scores + latest_event_scores + sql = f""" INSERT INTO scores ({', '.join(SCORE_FIELDS)}) VALUES ({', '.join(['?'] * len(SCORE_FIELDS))}) @@ -313,24 +300,27 @@ async def test_get_last_metagraph_scores(self, db_operations, db_client): inserted_scores = await db_client.many("SELECT * FROM scores") assert len(inserted_scores) == len(scores_list) - # get last metagraph scores + # Get last metagraph scores - should return all miners from latest_event only last_metagraph_scores = await db_operations.get_last_metagraph_scores() + + # Should have 3 scores, all from latest_event assert len(last_metagraph_scores) == 3 - assert last_metagraph_scores[0].event_id == "expected_event_id_3" - assert last_metagraph_scores[0].miner_uid == 3 - assert last_metagraph_scores[0].miner_hotkey == "hk3" - assert last_metagraph_scores[0].metagraph_score == 0.835 + # All scores should be from the same (latest) event + for score in last_metagraph_scores: + assert score.event_id == "latest_event" + + # Verify the miners and their scores + scores_by_uid = {s.miner_uid: s for s in last_metagraph_scores} + + assert scores_by_uid[3].miner_hotkey == "hk3" + assert scores_by_uid[3].metagraph_score == 0.198 # Winner - assert last_metagraph_scores[1].event_id == "expected_event_id_1" - assert last_metagraph_scores[1].miner_uid == 4 - assert last_metagraph_scores[1].miner_hotkey == "hk4" - assert last_metagraph_scores[1].metagraph_score == 0.165 + assert scores_by_uid[4].miner_hotkey == "hk4" + assert scores_by_uid[4].metagraph_score == 0.001 - assert last_metagraph_scores[2].event_id == "expected_event_id_2" - assert last_metagraph_scores[2].miner_uid == 5 - assert last_metagraph_scores[2].miner_hotkey == "hk5" - assert last_metagraph_scores[2].metagraph_score == 0.0 + assert scores_by_uid[5].miner_hotkey == "hk5" + assert scores_by_uid[5].metagraph_score == 0.001 async def test_mark_event_as_discarded(self, db_operations, db_client): now = datetime.now(timezone.utc) diff --git a/neurons/validator/db/tests/test_db_operations_part_4.py b/neurons/validator/db/tests/test_db_operations_part_4.py new file mode 100644 index 0000000..a5deb73 --- /dev/null +++ b/neurons/validator/db/tests/test_db_operations_part_4.py @@ -0,0 +1,1083 @@ +import asyncio +from datetime import datetime, timezone +from unittest.mock import MagicMock +from uuid import uuid4 + +import pytest + +from neurons.validator.db.client import DatabaseClient +from neurons.validator.db.operations import DatabaseOperations +from neurons.validator.db.tests.test_utils import TestDbOperationsBase +from neurons.validator.models.agent_runs import AgentRunsModel, AgentRunStatus +from neurons.validator.models.event import EventsModel, EventStatus +from neurons.validator.models.miner_agent import MinerAgentsModel +from neurons.validator.utils.logger.logger import NuminousLogger + + +class TestDbOperationsPart4(TestDbOperationsBase): + @pytest.fixture + async def db_operations(self, db_client: DatabaseClient): + logger = MagicMock(spec=NuminousLogger) + db_operations = DatabaseOperations(db_client=db_client, logger=logger) + return db_operations + + async def _create_event(self, db_operations: DatabaseOperations, unique_event_id: str) -> None: + """Helper to create an event for FK constraint""" + event = EventsModel( + unique_event_id=unique_event_id, + event_id=f"event_{unique_event_id}", + market_type="test_market", + event_type="test_type", + description="Test event", + outcome=None, + status=EventStatus.PENDING, + metadata="{}", + created_at="2024-01-01T00:00:00+00:00", + cutoff="2024-12-31T23:59:59+00:00", + ) + await db_operations.upsert_events([event]) + + async def _create_miner_agent( + self, db_operations: DatabaseOperations, version_id: str, miner_uid: int = 42 + ) -> None: + """Helper to create a miner agent for FK constraint""" + agent = MinerAgentsModel( + version_id=version_id, + miner_uid=miner_uid, + miner_hotkey="5GTest...", + agent_name="TestAgent", + version_number=1, + file_path=f"/data/agents/{miner_uid}/test.py", + pulled_at=datetime(2024, 1, 1, 10, 0, 0, tzinfo=timezone.utc), + created_at=datetime(2024, 1, 1, 9, 0, 0, tzinfo=timezone.utc), + ) + await db_operations.upsert_miner_agents([agent]) + + async def test_upsert_agent_runs_insert( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test inserting a new agent run""" + # Create FK dependencies + await self._create_event(db_operations, "event_123") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=42) + + run = AgentRunsModel( + run_id=str(uuid4()), + unique_event_id="event_123", + agent_version_id="agent_v1", + miner_uid=42, + miner_hotkey="5GTest...", + status=AgentRunStatus.SUCCESS, + ) + + await db_operations.upsert_agent_runs([run]) + + # Verify insertion + rows = await db_client.many( + "SELECT run_id, status, is_final FROM agent_runs WHERE run_id = ?", + [run.run_id], + ) + + assert len(rows) == 1 + assert rows[0][0] == run.run_id + assert rows[0][1] == AgentRunStatus.SUCCESS.value + assert rows[0][2] == 1 # is_final defaults to True + + async def test_upsert_agent_runs_update( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test updating an existing agent run""" + # Create FK dependencies + await self._create_event(db_operations, "event_123") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=42) + + run_id = str(uuid4()) + + # Insert initial run + initial_run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_123", + agent_version_id="agent_v1", + miner_uid=42, + miner_hotkey="5GTest...", + status=AgentRunStatus.SANDBOX_TIMEOUT, + is_final=False, + ) + + await db_operations.upsert_agent_runs([initial_run]) + + # Update with new status + updated_run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_123", + agent_version_id="agent_v1", + miner_uid=42, + miner_hotkey="5GTest...", + status=AgentRunStatus.SUCCESS, + is_final=True, + ) + + await db_operations.upsert_agent_runs([updated_run]) + + # Verify update + rows = await db_client.many( + "SELECT run_id, status, is_final FROM agent_runs WHERE run_id = ?", + [run_id], + ) + + assert len(rows) == 1 + assert rows[0][0] == run_id + assert rows[0][1] == AgentRunStatus.SUCCESS.value + assert rows[0][2] == 1 # is_final updated to True + + async def test_upsert_agent_runs_empty(self, db_operations: DatabaseOperations): + """Test inserting with empty list does nothing""" + await db_operations.upsert_agent_runs([]) + + async def test_get_unexported_agent_runs_empty(self, db_operations: DatabaseOperations): + """Test getting unexported runs when none exist""" + runs = await db_operations.get_unexported_agent_runs() + assert runs == [] + + async def test_get_unexported_agent_runs( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test getting unexported runs""" + # Create FK dependencies + await self._create_event(db_operations, "event_1") + await self._create_event(db_operations, "event_2") + await self._create_event(db_operations, "event_3") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=1) + await self._create_miner_agent(db_operations, "agent_v2", miner_uid=2) + await self._create_miner_agent(db_operations, "agent_v3", miner_uid=3) + + # Insert exported run + exported_run = AgentRunsModel( + run_id=str(uuid4()), + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="hotkey_1", + status=AgentRunStatus.SUCCESS, + exported=True, + ) + await db_operations.upsert_agent_runs([exported_run]) + + # Insert unexported runs + unexported_run_1 = AgentRunsModel( + run_id=str(uuid4()), + unique_event_id="event_2", + agent_version_id="agent_v2", + miner_uid=2, + miner_hotkey="hotkey_2", + status=AgentRunStatus.SUCCESS, + exported=False, + ) + await db_operations.upsert_agent_runs([unexported_run_1]) + + unexported_run_2 = AgentRunsModel( + run_id=str(uuid4()), + unique_event_id="event_3", + agent_version_id="agent_v3", + miner_uid=3, + miner_hotkey="hotkey_3", + status=AgentRunStatus.INTERNAL_AGENT_ERROR, + exported=False, + ) + await db_operations.upsert_agent_runs([unexported_run_2]) + + # Get unexported runs + runs = await db_operations.get_unexported_agent_runs() + + assert len(runs) == 2 + run_ids = [run.run_id for run in runs] + assert unexported_run_1.run_id in run_ids + assert unexported_run_2.run_id in run_ids + assert exported_run.run_id not in run_ids + + async def test_get_unexported_agent_runs_with_limit(self, db_operations: DatabaseOperations): + """Test getting unexported runs with limit""" + # Create FK dependencies + for i in range(5): + await self._create_event(db_operations, f"event_{i}") + await self._create_miner_agent(db_operations, f"agent_v{i}", miner_uid=i) + + # Insert 5 unexported runs + for i in range(5): + run = AgentRunsModel( + run_id=str(uuid4()), + unique_event_id=f"event_{i}", + agent_version_id=f"agent_v{i}", + miner_uid=i, + miner_hotkey=f"hotkey_{i}", + status=AgentRunStatus.SUCCESS, + exported=False, + ) + await db_operations.upsert_agent_runs([run]) + + # Get only 3 + runs = await db_operations.get_unexported_agent_runs(limit=3) + + assert len(runs) == 3 + + async def test_mark_agent_runs_as_exported( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test marking runs as exported""" + # Create FK dependencies + await self._create_event(db_operations, "event_1") + await self._create_event(db_operations, "event_2") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=1) + await self._create_miner_agent(db_operations, "agent_v2", miner_uid=2) + + # Insert unexported runs + run_1_id = str(uuid4()) + run_2_id = str(uuid4()) + + run_1 = AgentRunsModel( + run_id=run_1_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="hotkey_1", + status=AgentRunStatus.SUCCESS, + exported=False, + ) + await db_operations.upsert_agent_runs([run_1]) + + run_2 = AgentRunsModel( + run_id=run_2_id, + unique_event_id="event_2", + agent_version_id="agent_v2", + miner_uid=2, + miner_hotkey="hotkey_2", + status=AgentRunStatus.SUCCESS, + exported=False, + ) + await db_operations.upsert_agent_runs([run_2]) + + # Mark as exported + await db_operations.mark_agent_runs_as_exported([run_1_id, run_2_id]) + + # Verify they're marked + rows = await db_client.many( + "SELECT run_id, exported FROM agent_runs WHERE run_id IN (?, ?)", + [run_1_id, run_2_id], + ) + + assert len(rows) == 2 + for row in rows: + assert row[1] == 1 # exported = True + + async def test_mark_agent_runs_as_exported_empty_list(self, db_operations: DatabaseOperations): + """Test marking empty list does nothing""" + await db_operations.mark_agent_runs_as_exported([]) + + async def test_retry_scenario( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test multiple runs for same event/miner with is_final flag""" + # Create FK dependencies + event_id = "event_retry_test" + miner_uid = 42 + agent_version_id = "agent_v1" + await self._create_event(db_operations, event_id) + await self._create_miner_agent(db_operations, agent_version_id, miner_uid=miner_uid) + + # First attempt - timeout, not final + run_1 = AgentRunsModel( + run_id=str(uuid4()), + unique_event_id=event_id, + agent_version_id=agent_version_id, + miner_uid=miner_uid, + miner_hotkey="hotkey_42", + status=AgentRunStatus.SANDBOX_TIMEOUT, + is_final=False, + ) + await db_operations.upsert_agent_runs([run_1]) + + # Second attempt - timeout, not final + run_2 = AgentRunsModel( + run_id=str(uuid4()), + unique_event_id=event_id, + agent_version_id=agent_version_id, + miner_uid=miner_uid, + miner_hotkey="hotkey_42", + status=AgentRunStatus.SANDBOX_TIMEOUT, + is_final=False, + ) + await db_operations.upsert_agent_runs([run_2]) + + # Third attempt - success, final + run_3 = AgentRunsModel( + run_id=str(uuid4()), + unique_event_id=event_id, + agent_version_id=agent_version_id, + miner_uid=miner_uid, + miner_hotkey="hotkey_42", + status=AgentRunStatus.SUCCESS, + is_final=True, + ) + await db_operations.upsert_agent_runs([run_3]) + + # Verify all 3 runs exist + all_runs = await db_client.many( + "SELECT run_id, status, is_final FROM agent_runs WHERE unique_event_id = ? ORDER BY created_at", + [event_id], + ) + + assert len(all_runs) == 3 + assert all_runs[0][2] == 0 # is_final = False + assert all_runs[1][2] == 0 # is_final = False + assert all_runs[2][2] == 1 # is_final = True + + # Verify only final run when filtering + final_runs = await db_client.many( + "SELECT run_id FROM agent_runs WHERE unique_event_id = ? AND is_final = 1", + [event_id], + ) + + assert len(final_runs) == 1 + assert final_runs[0][0] == run_3.run_id + + async def test_insert_agent_run_log( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test inserting a new agent run log""" + # Create FK dependencies + await self._create_event(db_operations, "event_123") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=42) + + run_id = str(uuid4()) + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_123", + agent_version_id="agent_v1", + miner_uid=42, + miner_hotkey="5GTest...", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + + # Insert log + log_content = "Agent execution completed successfully\nOutput: prediction=0.75" + await db_operations.insert_agent_run_log(run_id, log_content) + + # Verify insertion + rows = await db_client.many( + "SELECT run_id, log_content, exported FROM agent_run_logs WHERE run_id = ?", + [run_id], + ) + + assert len(rows) == 1 + assert rows[0][0] == run_id + assert rows[0][1] == log_content + assert rows[0][2] == 0 # exported defaults to NOT_EXPORTED + + async def test_insert_agent_run_log_upsert( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test updating an existing log (ON CONFLICT behavior)""" + # Create FK dependencies + await self._create_event(db_operations, "event_123") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=42) + + run_id = str(uuid4()) + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_123", + agent_version_id="agent_v1", + miner_uid=42, + miner_hotkey="5GTest...", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + + # Insert initial log + initial_log = "First log entry" + await db_operations.insert_agent_run_log(run_id, initial_log) + + # Update with new log + updated_log = "Updated log entry with more details" + await db_operations.insert_agent_run_log(run_id, updated_log) + + # Verify only one row exists with updated content + rows = await db_client.many( + "SELECT run_id, log_content FROM agent_run_logs WHERE run_id = ?", + [run_id], + ) + + assert len(rows) == 1 + assert rows[0][0] == run_id + assert rows[0][1] == updated_log + + async def test_insert_agent_run_log_truncation( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test log content truncation at 30,000 chars""" + # Create FK dependencies + await self._create_event(db_operations, "event_123") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=42) + + run_id = str(uuid4()) + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_123", + agent_version_id="agent_v1", + miner_uid=42, + miner_hotkey="5GTest...", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + + # Create log larger than 30,000 chars + large_log = "x" * 35000 + await db_operations.insert_agent_run_log(run_id, large_log) + + # Verify truncation + rows = await db_client.many( + "SELECT run_id, log_content FROM agent_run_logs WHERE run_id = ?", + [run_id], + ) + + assert len(rows) == 1 + assert rows[0][0] == run_id + assert len(rows[0][1]) == 30000 # Truncated + assert rows[0][1] == "x" * 30000 + + async def test_insert_agent_run_log_empty_content( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test inserting log with empty content""" + # Create FK dependencies + await self._create_event(db_operations, "event_123") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=42) + + run_id = str(uuid4()) + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_123", + agent_version_id="agent_v1", + miner_uid=42, + miner_hotkey="5GTest...", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + + # Insert empty log + await db_operations.insert_agent_run_log(run_id, "") + + # Verify insertion + rows = await db_client.many( + "SELECT run_id, log_content FROM agent_run_logs WHERE run_id = ?", + [run_id], + ) + + assert len(rows) == 1 + assert rows[0][0] == run_id + assert rows[0][1] == "" + + async def test_insert_agent_run_log_special_characters( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test inserting log with special characters and unicode""" + # Create FK dependencies + await self._create_event(db_operations, "event_123") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=42) + + run_id = str(uuid4()) + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_123", + agent_version_id="agent_v1", + miner_uid=42, + miner_hotkey="5GTest...", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + + # Log with special characters + special_log = """Line 1: Normal text +Line 2: Special chars: !@#$%^&*() +Line 3: Unicode: 你好世界 🚀 +Line 4: Newlines and tabs\t\n""" + + await db_operations.insert_agent_run_log(run_id, special_log) + + # Verify insertion + rows = await db_client.many( + "SELECT log_content FROM agent_run_logs WHERE run_id = ?", + [run_id], + ) + + assert len(rows) == 1 + assert rows[0][0] == special_log + + async def test_insert_agent_run_log_fk_constraint( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test FK constraint - can't insert log without corresponding agent_run""" + non_existent_run_id = str(uuid4()) + + # Attempt to insert log for non-existent run_id + with pytest.raises(Exception): # Should raise FOREIGN KEY constraint error + await db_operations.insert_agent_run_log(non_existent_run_id, "This should fail") + + async def test_get_unexported_agent_run_logs_empty(self, db_operations: DatabaseOperations): + """Test getting unexported logs when none exist""" + logs = await db_operations.get_unexported_agent_run_logs() + assert logs == [] + + async def test_get_unexported_agent_run_logs( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test getting unexported logs""" + # Create FK dependencies and runs + await self._create_event(db_operations, "event_1") + await self._create_event(db_operations, "event_2") + await self._create_event(db_operations, "event_3") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=1) + await self._create_miner_agent(db_operations, "agent_v2", miner_uid=2) + await self._create_miner_agent(db_operations, "agent_v3", miner_uid=3) + + run_1_id = str(uuid4()) + run_2_id = str(uuid4()) + run_3_id = str(uuid4()) + + # Create runs + run_1 = AgentRunsModel( + run_id=run_1_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="hotkey_1", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run_1]) + + run_2 = AgentRunsModel( + run_id=run_2_id, + unique_event_id="event_2", + agent_version_id="agent_v2", + miner_uid=2, + miner_hotkey="hotkey_2", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run_2]) + + run_3 = AgentRunsModel( + run_id=run_3_id, + unique_event_id="event_3", + agent_version_id="agent_v3", + miner_uid=3, + miner_hotkey="hotkey_3", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run_3]) + + # Insert exported log + await db_operations.insert_agent_run_log(run_1_id, "Exported log content") + await db_client.update( + "UPDATE agent_run_logs SET exported = 1 WHERE run_id = ?", [run_1_id] + ) + + # Insert unexported logs + await db_operations.insert_agent_run_log(run_2_id, "Unexported log 1") + await db_operations.insert_agent_run_log(run_3_id, "Unexported log 2") + + # Get unexported logs + logs = await db_operations.get_unexported_agent_run_logs() + + assert len(logs) == 2 + run_ids = [log.run_id for log in logs] + assert run_2_id in run_ids + assert run_3_id in run_ids + assert run_1_id not in run_ids + + async def test_get_unexported_agent_run_logs_ordering( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test that logs are ordered by created_at ASC (oldest first)""" + # Create FK dependencies + await self._create_event(db_operations, "event_1") + await self._create_event(db_operations, "event_2") + await self._create_event(db_operations, "event_3") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=1) + await self._create_miner_agent(db_operations, "agent_v2", miner_uid=2) + await self._create_miner_agent(db_operations, "agent_v3", miner_uid=3) + + run_ids = [str(uuid4()), str(uuid4()), str(uuid4())] + + # Create runs + for i, run_id in enumerate(run_ids): + run = AgentRunsModel( + run_id=run_id, + unique_event_id=f"event_{i+1}", + agent_version_id=f"agent_v{i+1}", + miner_uid=i + 1, + miner_hotkey=f"hotkey_{i+1}", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + await db_operations.insert_agent_run_log(run_id, f"Log {i+1}") + + # Get logs + logs = await db_operations.get_unexported_agent_run_logs() + + assert len(logs) == 3 + # Should be ordered by created_at (oldest first) + assert logs[0].run_id == run_ids[0] + assert logs[1].run_id == run_ids[1] + assert logs[2].run_id == run_ids[2] + + async def test_get_unexported_agent_run_logs_with_limit( + self, db_operations: DatabaseOperations + ): + """Test getting unexported logs with limit""" + # Create FK dependencies + for i in range(5): + await self._create_event(db_operations, f"event_{i}") + await self._create_miner_agent(db_operations, f"agent_v{i}", miner_uid=i) + + # Create 5 runs and logs + for i in range(5): + run_id = str(uuid4()) + run = AgentRunsModel( + run_id=run_id, + unique_event_id=f"event_{i}", + agent_version_id=f"agent_v{i}", + miner_uid=i, + miner_hotkey=f"hotkey_{i}", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + await db_operations.insert_agent_run_log(run_id, f"Log {i}") + + # Get only 3 + logs = await db_operations.get_unexported_agent_run_logs(limit=3) + + assert len(logs) == 3 + + async def test_get_unexported_agent_run_logs_mixed_exported( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test filtering of mixed exported/unexported logs""" + # Create FK dependencies + for i in range(4): + await self._create_event(db_operations, f"event_{i}") + await self._create_miner_agent(db_operations, f"agent_v{i}", miner_uid=i) + + run_ids = [] + for i in range(4): + run_id = str(uuid4()) + run_ids.append(run_id) + run = AgentRunsModel( + run_id=run_id, + unique_event_id=f"event_{i}", + agent_version_id=f"agent_v{i}", + miner_uid=i, + miner_hotkey=f"hotkey_{i}", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + await db_operations.insert_agent_run_log(run_id, f"Log {i}") + + # Mark some as exported (0 and 2) + await db_client.update( + "UPDATE agent_run_logs SET exported = 1 WHERE run_id IN (?, ?)", + [run_ids[0], run_ids[2]], + ) + + # Get unexported + logs = await db_operations.get_unexported_agent_run_logs() + + assert len(logs) == 2 + returned_run_ids = [log.run_id for log in logs] + assert run_ids[1] in returned_run_ids + assert run_ids[3] in returned_run_ids + assert run_ids[0] not in returned_run_ids + assert run_ids[2] not in returned_run_ids + + async def test_get_unexported_agent_run_logs_returns_model( + self, db_operations: DatabaseOperations + ): + """Test that method returns proper AgentRunLogsModel objects""" + # Create FK dependencies + await self._create_event(db_operations, "event_1") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=1) + + run_id = str(uuid4()) + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="hotkey_1", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + + log_content = "Test log content" + await db_operations.insert_agent_run_log(run_id, log_content) + + # Get logs + logs = await db_operations.get_unexported_agent_run_logs() + + assert len(logs) == 1 + log = logs[0] + + # Verify it's a proper model with all fields + assert log.run_id == run_id + assert log.log_content == log_content + assert log.exported is False + assert log.created_at is not None + assert log.updated_at is not None + + async def test_mark_agent_run_logs_as_exported( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test marking logs as exported""" + # Create FK dependencies + await self._create_event(db_operations, "event_1") + await self._create_event(db_operations, "event_2") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=1) + await self._create_miner_agent(db_operations, "agent_v2", miner_uid=2) + + # Create agent runs + run_1_id = str(uuid4()) + run_2_id = str(uuid4()) + + run_1 = AgentRunsModel( + run_id=run_1_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="hotkey_1", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run_1]) + + run_2 = AgentRunsModel( + run_id=run_2_id, + unique_event_id="event_2", + agent_version_id="agent_v2", + miner_uid=2, + miner_hotkey="hotkey_2", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run_2]) + + # Insert unexported logs + await db_operations.insert_agent_run_log(run_1_id, "Log content 1") + await db_operations.insert_agent_run_log(run_2_id, "Log content 2") + + # Mark as exported + await db_operations.mark_agent_run_logs_as_exported([run_1_id, run_2_id]) + + # Verify they're marked + rows = await db_client.many( + "SELECT run_id, exported FROM agent_run_logs WHERE run_id IN (?, ?)", + [run_1_id, run_2_id], + ) + + assert len(rows) == 2 + for row in rows: + assert row[1] == 1 # exported = True + + async def test_mark_agent_run_logs_as_exported_empty_list( + self, db_operations: DatabaseOperations + ): + """Test marking empty list does nothing""" + await db_operations.mark_agent_run_logs_as_exported([]) + + async def test_mark_agent_run_logs_as_exported_single( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test marking single log as exported""" + # Create FK dependencies + await self._create_event(db_operations, "event_1") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=1) + + # Create agent run + run_id = str(uuid4()) + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="hotkey_1", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + + # Insert log + await db_operations.insert_agent_run_log(run_id, "Log content") + + # Mark as exported + await db_operations.mark_agent_run_logs_as_exported([run_id]) + + # Verify + row = await db_client.one("SELECT exported FROM agent_run_logs WHERE run_id = ?", [run_id]) + assert row[0] == 1 + + async def test_mark_agent_run_logs_as_exported_partial( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test marking only some logs as exported""" + # Create FK dependencies + await self._create_event(db_operations, "event_1") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=1) + + # Create agent runs + run_1_id = str(uuid4()) + run_2_id = str(uuid4()) + run_3_id = str(uuid4()) + + for run_id in [run_1_id, run_2_id, run_3_id]: + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="hotkey_1", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + await db_operations.insert_agent_run_log(run_id, f"Log for {run_id}") + + # Mark only first two as exported + await db_operations.mark_agent_run_logs_as_exported([run_1_id, run_2_id]) + + # Verify only first two are exported + rows = await db_client.many( + "SELECT run_id, exported FROM agent_run_logs ORDER BY run_id", + [], + ) + + assert len(rows) == 3 + exported_run_ids = {row[0] for row in rows if row[1] == 1} + unexported_run_ids = {row[0] for row in rows if row[1] == 0} + + assert exported_run_ids == {run_1_id, run_2_id} + assert unexported_run_ids == {run_3_id} + + async def test_mark_agent_run_logs_as_exported_updates_timestamp( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test marking as exported updates the updated_at timestamp""" + # Create FK dependencies + await self._create_event(db_operations, "event_1") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=1) + + # Create agent run + run_id = str(uuid4()) + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="hotkey_1", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + + # Insert log + await db_operations.insert_agent_run_log(run_id, "Log content") + + # Get initial timestamp + row_before = await db_client.one( + "SELECT updated_at FROM agent_run_logs WHERE run_id = ?", [run_id] + ) + updated_at_before = row_before[0] + + # Wait for timestamp to change (SQLite CURRENT_TIMESTAMP has second precision) + await asyncio.sleep(1.1) + + # Mark as exported + await db_operations.mark_agent_run_logs_as_exported([run_id]) + + # Get new timestamp + row_after = await db_client.one( + "SELECT updated_at FROM agent_run_logs WHERE run_id = ?", [run_id] + ) + updated_at_after = row_after[0] + + # Verify timestamp was updated + assert updated_at_after > updated_at_before + + async def test_mark_agent_run_logs_as_exported_nonexistent( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test marking nonexistent logs does not error""" + # Try to mark logs that don't exist + await db_operations.mark_agent_run_logs_as_exported(["nonexistent_1", "nonexistent_2"]) + + # Verify no logs exist + count = await db_client.one("SELECT COUNT(*) FROM agent_run_logs", []) + assert count[0] == 0 + + async def test_delete_agent_run_logs_old_exported( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test deletion of old exported logs""" + await self._create_event(db_operations, "event_1") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=1) + + run_id_old = str(uuid4()) + run_id_recent = str(uuid4()) + + for run_id in [run_id_old, run_id_recent]: + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="hotkey_1", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + + await db_operations.insert_agent_run_log(run_id_old, "Old log") + await db_operations.insert_agent_run_log(run_id_recent, "Recent log") + + await db_operations.mark_agent_run_logs_as_exported([run_id_old, run_id_recent]) + + await db_client.update( + "UPDATE agent_run_logs SET created_at = datetime('now', '-8 day') WHERE run_id = ?", + [run_id_old], + ) + + deleted = await db_operations.delete_agent_run_logs(batch_size=10) + deleted_rowids = [row[0] for row in deleted] + + assert len(deleted_rowids) == 1 + + remaining = await db_client.many("SELECT run_id FROM agent_run_logs", []) + assert len(remaining) == 1 + assert remaining[0][0] == run_id_recent + + async def test_delete_agent_run_logs_batch_size( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test batch_size limiting""" + await self._create_event(db_operations, "event_1") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=1) + + run_ids = [str(uuid4()) for _ in range(5)] + for run_id in run_ids: + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="hotkey_1", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + await db_operations.insert_agent_run_log(run_id, "Log content") + + await db_operations.mark_agent_run_logs_as_exported(run_ids) + + await db_client.update( + "UPDATE agent_run_logs SET created_at = datetime('now', '-8 day')", + [], + ) + + deleted = await db_operations.delete_agent_run_logs(batch_size=3) + deleted_rowids = [row[0] for row in deleted] + + assert len(deleted_rowids) == 3 + + remaining = await db_client.many("SELECT run_id FROM agent_run_logs", []) + assert len(remaining) == 2 + + async def test_delete_agent_run_logs_no_delete_unexported( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test unexported logs are not deleted""" + await self._create_event(db_operations, "event_1") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=1) + + run_id = str(uuid4()) + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="hotkey_1", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + await db_operations.insert_agent_run_log(run_id, "Log content") + + await db_client.update( + "UPDATE agent_run_logs SET created_at = datetime('now', '-8 day') WHERE run_id = ?", + [run_id], + ) + + deleted = await db_operations.delete_agent_run_logs(batch_size=10) + deleted_rowids = [row[0] for row in deleted] + + assert len(deleted_rowids) == 0 + + remaining = await db_client.many("SELECT run_id FROM agent_run_logs", []) + assert len(remaining) == 1 + + async def test_delete_agent_run_logs_no_delete_recent_exported( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test recent exported logs are not deleted""" + await self._create_event(db_operations, "event_1") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=1) + + run_id = str(uuid4()) + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="hotkey_1", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + await db_operations.insert_agent_run_log(run_id, "Log content") + await db_operations.mark_agent_run_logs_as_exported([run_id]) + + deleted = await db_operations.delete_agent_run_logs(batch_size=10) + deleted_rowids = [row[0] for row in deleted] + + assert len(deleted_rowids) == 0 + + remaining = await db_client.many("SELECT run_id FROM agent_run_logs", []) + assert len(remaining) == 1 + + async def test_delete_agent_run_logs_returns_rowids( + self, db_operations: DatabaseOperations, db_client: DatabaseClient + ): + """Test that deleted ROWIDs are returned""" + await self._create_event(db_operations, "event_1") + await self._create_miner_agent(db_operations, "agent_v1", miner_uid=1) + + run_id = str(uuid4()) + run = AgentRunsModel( + run_id=run_id, + unique_event_id="event_1", + agent_version_id="agent_v1", + miner_uid=1, + miner_hotkey="hotkey_1", + status=AgentRunStatus.SUCCESS, + ) + await db_operations.upsert_agent_runs([run]) + await db_operations.insert_agent_run_log(run_id, "Log content") + await db_operations.mark_agent_run_logs_as_exported([run_id]) + + await db_client.update( + "UPDATE agent_run_logs SET created_at = datetime('now', '-8 day') WHERE run_id = ?", + [run_id], + ) + + deleted = await db_operations.delete_agent_run_logs(batch_size=10) + + assert len(deleted) == 1 + assert isinstance(deleted[0], tuple) + assert isinstance(deleted[0][0], int) diff --git a/neurons/validator/main.py b/neurons/validator/main.py index 8c471c5..74596bd 100644 --- a/neurons/validator/main.py +++ b/neurons/validator/main.py @@ -14,6 +14,7 @@ from neurons.validator.tasks.db_cleaner import DbCleaner from neurons.validator.tasks.db_vacuum import DbVacuum from neurons.validator.tasks.delete_events import DeleteEvents +from neurons.validator.tasks.export_agent_runs import ExportAgentRuns from neurons.validator.tasks.export_predictions import ExportPredictions from neurons.validator.tasks.export_scores import ExportScores from neurons.validator.tasks.metagraph_scoring import MetagraphScoring @@ -168,6 +169,16 @@ async def main(): validator_hotkey=validator_hotkey, ) + export_agent_runs_task = ExportAgentRuns( + interval_seconds=300.0, + batch_size=500, + db_operations=db_operations, + api_client=numinous_api_client, + logger=logger, + validator_uid=validator_uid, + validator_hotkey=validator_hotkey, + ) + set_weights_task = SetWeights( interval_seconds=379.0, db_operations=db_operations, @@ -199,6 +210,7 @@ async def main(): scheduler.add(task=delete_events_task) scheduler.add(task=run_agents_task) scheduler.add(task=export_predictions_task) + scheduler.add(task=export_agent_runs_task) scheduler.add(task=scoring_task) scheduler.add(task=metagraph_scoring_task) scheduler.add(task=export_scores_task) diff --git a/neurons/validator/models/agent_run_logs.py b/neurons/validator/models/agent_run_logs.py new file mode 100644 index 0000000..f53a7ff --- /dev/null +++ b/neurons/validator/models/agent_run_logs.py @@ -0,0 +1,33 @@ +from datetime import datetime +from enum import IntEnum +from typing import Any, Optional + +from pydantic import BaseModel, field_validator + + +class AgentRunLogExportedStatus(IntEnum): + NOT_EXPORTED = 0 + EXPORTED = 1 + + +class AgentRunLogsModel(BaseModel): + run_id: str + log_content: str + exported: Optional[bool] = False + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + model_config = {"arbitrary_types_allowed": True} + + @property + def primary_key(self): + return ["run_id"] + + @field_validator("exported", mode="before") + def parse_exported_as_bool(cls, v: Any) -> bool: + # If the DB returns an integer, convert it to boolean + if isinstance(v, int): + return bool(v) + return v + + +AGENT_RUN_LOGS_FIELDS = AgentRunLogsModel.model_fields.keys() diff --git a/neurons/validator/models/agent_runs.py b/neurons/validator/models/agent_runs.py index 9312b8f..3fca344 100644 --- a/neurons/validator/models/agent_runs.py +++ b/neurons/validator/models/agent_runs.py @@ -6,10 +6,10 @@ class AgentRunStatus(str, Enum): - SUCCESS = "success" - INTERNAL_AGENT_ERROR = "internal_agent_error" - INVALID_SANDBOX_OUTPUT = "invalid_sandbox_output" - SANDBOX_TIMEOUT = "sandbox_timeout" + SUCCESS = "SUCCESS" + INTERNAL_AGENT_ERROR = "INTERNAL_AGENT_ERROR" + INVALID_SANDBOX_OUTPUT = "INVALID_SANDBOX_OUTPUT" + SANDBOX_TIMEOUT = "SANDBOX_TIMEOUT" class AgentRunExportedStatus(IntEnum): @@ -54,4 +54,4 @@ def parse_is_final_as_bool(cls, v: Any) -> bool: return v -RUN_FIELDS = AgentRunsModel.model_fields.keys() +AGENT_RUNS_FIELDS = AgentRunsModel.model_fields.keys() diff --git a/neurons/validator/models/numinous_client.py b/neurons/validator/models/numinous_client.py index 8e7d974..45c32ae 100644 --- a/neurons/validator/models/numinous_client.py +++ b/neurons/validator/models/numinous_client.py @@ -148,6 +148,22 @@ class GatewayCall(BaseModel): run_id: UUID +class AgentRunSubmission(BaseModel): + run_id: UUID + miner_uid: int + miner_hotkey: str + vali_uid: int + vali_hotkey: str + status: str + event_id: str + version_id: UUID + is_final: bool + + +class PostAgentRunsRequestBody(BaseModel): + runs: typing.List[AgentRunSubmission] + + class ChutesInferenceRequest(GatewayCall): model: ChuteModel = Field(..., description="Model to use for inference.") messages: list[Message] = Field(..., description="List of chat messages") diff --git a/neurons/validator/models/tests/test_agent_run_logs_models.py b/neurons/validator/models/tests/test_agent_run_logs_models.py new file mode 100644 index 0000000..5a58c55 --- /dev/null +++ b/neurons/validator/models/tests/test_agent_run_logs_models.py @@ -0,0 +1,207 @@ +from datetime import datetime + +import pytest +from pydantic import ValidationError + +from neurons.validator.models.agent_run_logs import ( + AGENT_RUN_LOGS_FIELDS, + AgentRunLogExportedStatus, + AgentRunLogsModel, +) + + +class TestAgentRunLogsModel: + def test_create_minimal(self): + # Minimal required fields + model = AgentRunLogsModel( + run_id="run_123", + log_content="Agent execution log content here", + ) + + assert model.run_id == "run_123" + assert model.log_content == "Agent execution log content here" + + # Defaults + assert model.exported is False + assert model.created_at is None + assert model.updated_at is None + + def test_create_full_success(self): + created = datetime(2024, 1, 1, 12, 0, 0) + updated = datetime(2024, 1, 1, 12, 30, 0) + + model = AgentRunLogsModel( + run_id="run_abc_123", + log_content="Full log content with details", + exported=True, + created_at=created, + updated_at=updated, + ) + + assert model.run_id == "run_abc_123" + assert model.log_content == "Full log content with details" + assert model.exported is True + assert model.created_at == created + assert model.updated_at == updated + + def test_create_with_large_log_content(self): + # Test with large log content (25KB) + large_log = "x" * 25000 + model = AgentRunLogsModel( + run_id="run_large", + log_content=large_log, + ) + + assert len(model.log_content) == 25000 + assert model.log_content == large_log + + def test_exported_int_to_bool(self): + # exported as integer should convert to bool + model = AgentRunLogsModel( + run_id="run_1", + log_content="Log content", + exported=1, + ) + assert model.exported is True + + model2 = AgentRunLogsModel( + run_id="run_2", + log_content="Log content 2", + exported=0, + ) + assert model2.exported is False + + def test_exported_bool_passthrough(self): + # exported as boolean should pass through + model = AgentRunLogsModel( + run_id="run_1", + log_content="Log content", + exported=True, + ) + assert model.exported is True + + model2 = AgentRunLogsModel( + run_id="run_2", + log_content="Log content 2", + exported=False, + ) + assert model2.exported is False + + def test_invalid_run_id_type(self): + # run_id must be string + with pytest.raises(ValidationError): + AgentRunLogsModel( + run_id=123, + log_content="Log content", + ) + + def test_invalid_log_content_type(self): + # log_content must be string + with pytest.raises(ValidationError): + AgentRunLogsModel( + run_id="run_1", + log_content=123, + ) + + def test_missing_required_run_id(self): + # run_id is required + with pytest.raises(ValidationError): + AgentRunLogsModel( + log_content="Log content", + ) + + def test_missing_required_log_content(self): + # log_content is required + with pytest.raises(ValidationError): + AgentRunLogsModel( + run_id="run_1", + ) + + def test_primary_key_property(self): + model = AgentRunLogsModel( + run_id="run_123", + log_content="Log content", + ) + assert model.primary_key == ["run_id"] + + def test_empty_log_content(self): + # Empty log content should be allowed (though unusual) + model = AgentRunLogsModel( + run_id="run_empty", + log_content="", + ) + assert model.log_content == "" + + def test_log_content_with_special_characters(self): + # Test log content with special characters + special_log = """Line 1: Normal text +Line 2: Special chars: !@#$%^&*() +Line 3: Unicode: 你好世界 🚀 +Line 4: Newlines and tabs\t\n""" + + model = AgentRunLogsModel( + run_id="run_special", + log_content=special_log, + ) + assert model.log_content == special_log + + def test_log_content_with_error_traceback(self): + # Simulate typical error log with traceback + error_log = """Traceback (most recent call last): + File "agent.py", line 42, in agent_main + result = process_event(event_data) + File "agent.py", line 10, in process_event + return data['missing_key'] +KeyError: 'missing_key' +""" + model = AgentRunLogsModel( + run_id="run_error", + log_content=error_log, + ) + assert "KeyError" in model.log_content + assert "Traceback" in model.log_content + + def test_model_serialization(self): + # Test that model can be serialized to dict + model = AgentRunLogsModel( + run_id="run_serialize", + log_content="Serialization test", + exported=True, + ) + + data = model.model_dump() + assert data["run_id"] == "run_serialize" + assert data["log_content"] == "Serialization test" + assert data["exported"] is True + + def test_model_deserialization(self): + # Test that model can be deserialized from dict + data = { + "run_id": "run_deserialize", + "log_content": "Deserialization test", + "exported": False, + "created_at": datetime(2024, 1, 1, 12, 0, 0), + "updated_at": datetime(2024, 1, 1, 12, 30, 0), + } + + model = AgentRunLogsModel(**data) + assert model.run_id == "run_deserialize" + assert model.log_content == "Deserialization test" + assert model.exported is False + + +class TestAgentRunLogExportedStatus: + def test_exported_status_enum_values(self): + assert AgentRunLogExportedStatus.NOT_EXPORTED == 0 + assert AgentRunLogExportedStatus.EXPORTED == 1 + + +class TestAgentRunLogsFields: + def test_fields_constant_exists(self): + # Verify AGENT_RUN_LOGS_FIELDS constant is defined + assert AGENT_RUN_LOGS_FIELDS is not None + + def test_fields_contains_all_model_fields(self): + # Verify all model fields are in the constant + expected_fields = {"run_id", "log_content", "exported", "created_at", "updated_at"} + assert set(AGENT_RUN_LOGS_FIELDS) == expected_fields diff --git a/neurons/validator/models/tests/test_agent_runs_models.py b/neurons/validator/models/tests/test_agent_runs_models.py index 59f2428..1f14820 100644 --- a/neurons/validator/models/tests/test_agent_runs_models.py +++ b/neurons/validator/models/tests/test_agent_runs_models.py @@ -232,10 +232,10 @@ def test_retry_scenario(self): class TestAgentRunStatus: def test_status_enum_values(self): - assert AgentRunStatus.SUCCESS.value == "success" - assert AgentRunStatus.INTERNAL_AGENT_ERROR.value == "internal_agent_error" - assert AgentRunStatus.INVALID_SANDBOX_OUTPUT.value == "invalid_sandbox_output" - assert AgentRunStatus.SANDBOX_TIMEOUT.value == "sandbox_timeout" + assert AgentRunStatus.SUCCESS.value == "SUCCESS" + assert AgentRunStatus.INTERNAL_AGENT_ERROR.value == "INTERNAL_AGENT_ERROR" + assert AgentRunStatus.INVALID_SANDBOX_OUTPUT.value == "INVALID_SANDBOX_OUTPUT" + assert AgentRunStatus.SANDBOX_TIMEOUT.value == "SANDBOX_TIMEOUT" class TestAgentRunExportedStatus: diff --git a/neurons/validator/models/tests/test_numinous_client_agents.py b/neurons/validator/models/tests/test_numinous_client_agents.py index b2127de..f7f2932 100644 --- a/neurons/validator/models/tests/test_numinous_client_agents.py +++ b/neurons/validator/models/tests/test_numinous_client_agents.py @@ -6,9 +6,11 @@ from pydantic import ValidationError from neurons.validator.models.numinous_client import ( + AgentRunSubmission, GetAgentsQueryParams, GetAgentsResponse, MinerAgentWithCode, + PostAgentRunsRequestBody, ) @@ -205,3 +207,157 @@ def test_from_json(self): assert len(response.items) == 2 assert response.items[0].miner_uid == 1 assert response.items[1].miner_uid == 2 + + +class TestAgentRunSubmission: + def test_create_valid_run(self): + run = AgentRunSubmission( + run_id=UUID("123e4567-e89b-12d3-a456-426614174000"), + miner_uid=10, + miner_hotkey="miner_hotkey_1", + vali_uid=5, + vali_hotkey="validator_hotkey", + status="SUCCESS", + event_id="event_123", + version_id=UUID("223e4567-e89b-12d3-a456-426614174001"), + is_final=True, + ) + + assert run.run_id == UUID("123e4567-e89b-12d3-a456-426614174000") + assert run.miner_uid == 10 + assert run.miner_hotkey == "miner_hotkey_1" + assert run.vali_uid == 5 + assert run.vali_hotkey == "validator_hotkey" + assert run.status == "SUCCESS" + assert run.event_id == "event_123" + assert run.version_id == UUID("223e4567-e89b-12d3-a456-426614174001") + assert run.is_final is True + + def test_create_from_json(self): + json_data = { + "run_id": "323e4567-e89b-12d3-a456-426614174002", + "miner_uid": 20, + "miner_hotkey": "miner_hotkey_2", + "vali_uid": 5, + "vali_hotkey": "validator_hotkey", + "status": "SANDBOX_TIMEOUT", + "event_id": "event_456", + "version_id": "423e4567-e89b-12d3-a456-426614174003", + "is_final": False, + } + + run = AgentRunSubmission.model_validate(json_data) + + assert run.run_id == UUID("323e4567-e89b-12d3-a456-426614174002") + assert run.miner_uid == 20 + assert run.status == "SANDBOX_TIMEOUT" + assert run.is_final is False + + def test_invalid_run_id(self): + with pytest.raises(ValidationError): + AgentRunSubmission( + run_id="not-a-uuid", + miner_uid=10, + miner_hotkey="miner_hotkey", + vali_uid=5, + vali_hotkey="validator_hotkey", + status="SUCCESS", + event_id="event_123", + version_id=UUID("223e4567-e89b-12d3-a456-426614174001"), + is_final=True, + ) + + def test_invalid_version_id(self): + with pytest.raises(ValidationError): + AgentRunSubmission( + run_id=UUID("123e4567-e89b-12d3-a456-426614174000"), + miner_uid=10, + miner_hotkey="miner_hotkey", + vali_uid=5, + vali_hotkey="validator_hotkey", + status="SUCCESS", + event_id="event_123", + version_id="not-a-uuid", + is_final=True, + ) + + +class TestPostAgentRunsRequestBody: + def test_empty_runs_list(self): + body = PostAgentRunsRequestBody(runs=[]) + + assert body.runs == [] + + def test_single_run(self): + run = AgentRunSubmission( + run_id=UUID("523e4567-e89b-12d3-a456-426614174004"), + miner_uid=30, + miner_hotkey="miner_hotkey_3", + vali_uid=5, + vali_hotkey="validator_hotkey", + status="INTERNAL_AGENT_ERROR", + event_id="event_789", + version_id=UUID("623e4567-e89b-12d3-a456-426614174005"), + is_final=True, + ) + + body = PostAgentRunsRequestBody(runs=[run]) + + assert len(body.runs) == 1 + assert body.runs[0].run_id == UUID("523e4567-e89b-12d3-a456-426614174004") + assert body.runs[0].status == "INTERNAL_AGENT_ERROR" + + def test_multiple_runs(self): + runs = [ + AgentRunSubmission( + run_id=UUID("723e4567-e89b-12d3-a456-426614174006"), + miner_uid=10, + miner_hotkey="miner_1", + vali_uid=5, + vali_hotkey="validator_hotkey", + status="SUCCESS", + event_id="event_1", + version_id=UUID("823e4567-e89b-12d3-a456-426614174007"), + is_final=True, + ), + AgentRunSubmission( + run_id=UUID("923e4567-e89b-12d3-a456-426614174008"), + miner_uid=20, + miner_hotkey="miner_2", + vali_uid=5, + vali_hotkey="validator_hotkey", + status="SANDBOX_TIMEOUT", + event_id="event_2", + version_id=UUID("a23e4567-e89b-12d3-a456-426614174009"), + is_final=False, + ), + ] + + body = PostAgentRunsRequestBody(runs=runs) + + assert len(body.runs) == 2 + assert body.runs[0].miner_uid == 10 + assert body.runs[1].miner_uid == 20 + + def test_from_json(self): + json_data = { + "runs": [ + { + "run_id": "b23e4567-e89b-12d3-a456-42661417400a", + "miner_uid": 40, + "miner_hotkey": "miner_hotkey_4", + "vali_uid": 5, + "vali_hotkey": "validator_hotkey", + "status": "SUCCESS", + "event_id": "event_abc", + "version_id": "c23e4567-e89b-12d3-a456-42661417400b", + "is_final": True, + } + ] + } + + body = PostAgentRunsRequestBody.model_validate(json_data) + + assert len(body.runs) == 1 + assert body.runs[0].miner_uid == 40 + assert body.runs[0].event_id == "event_abc" diff --git a/neurons/validator/numinous_client/client.py b/neurons/validator/numinous_client/client.py index 48b2bea..eca9b39 100644 --- a/neurons/validator/numinous_client/client.py +++ b/neurons/validator/numinous_client/client.py @@ -17,6 +17,7 @@ GetEventsResolvedResponse, GetEventsResponse, PostAgentLogsRequestBody, + PostAgentRunsRequestBody, PostPredictionsRequestBody, PostScoresRequestBody, ) @@ -231,6 +232,26 @@ async def post_agent_logs(self, body: PostAgentLogsRequestBody): return await response.json() + async def post_agent_runs(self, body: PostAgentRunsRequestBody): + if not isinstance(body, PostAgentRunsRequestBody): + raise ValueError("Invalid parameter") + + assert len(body.runs) > 0 + + data = body.model_dump_json() + + auth_headers = self.make_auth_headers(data=data) + + async with self.create_session( + other_headers={**auth_headers, "Content-Type": "application/json"} + ) as session: + path = "/api/v1/validators/agents/runs" + + async with session.post(path, data=data) as response: + response.raise_for_status() + + return await response.json() + async def get_agents(self, offset: int, limit: int): if offset is None or limit is None: raise ValueError("Invalid parameters") diff --git a/neurons/validator/numinous_client/tests/test_numinous_client.py b/neurons/validator/numinous_client/tests/test_numinous_client.py index 60d01b3..858cc83 100644 --- a/neurons/validator/numinous_client/tests/test_numinous_client.py +++ b/neurons/validator/numinous_client/tests/test_numinous_client.py @@ -22,6 +22,7 @@ GetEventsResolvedResponse, GetEventsResponse, PostAgentLogsRequestBody, + PostAgentRunsRequestBody, PostPredictionsRequestBody, PostScoresRequestBody, ) @@ -685,6 +686,96 @@ async def test_post_agent_logs_error_raised(self, client_test_env: NuminousClien assert e.value.status == status_code + async def test_post_agent_runs(self, client_test_env: NuminousClient): + mock_response_data = {} + + request_body = PostAgentRunsRequestBody.model_validate( + { + "runs": [ + { + "run_id": "123e4567-e89b-12d3-a456-426614174000", + "miner_uid": 10, + "miner_hotkey": "miner_hotkey_1", + "vali_uid": 5, + "vali_hotkey": "validator_hotkey", + "status": "SUCCESS", + "event_id": "event_123", + "version_id": "223e4567-e89b-12d3-a456-426614174001", + "is_final": True, + }, + { + "run_id": "323e4567-e89b-12d3-a456-426614174002", + "miner_uid": 20, + "miner_hotkey": "miner_hotkey_2", + "vali_uid": 5, + "vali_hotkey": "validator_hotkey", + "status": "SANDBOX_TIMEOUT", + "event_id": "event_456", + "version_id": "423e4567-e89b-12d3-a456-426614174003", + "is_final": False, + }, + ] + } + ) + + with aioresponses() as mocked: + url_path = "/api/v1/validators/agents/runs" + + mocked.post( + url_path, + status=204, + body=json.dumps(mock_response_data).encode("utf-8"), + ) + + result = await client_test_env.post_agent_runs(body=request_body) + + mocked.assert_called_with( + url=url_path, method="POST", data=request_body.model_dump_json() + ) + + assert result == mock_response_data + + async def test_post_agent_runs_error_raised(self, client_test_env: NuminousClient): + mock_response_data = {"error": "Failed to process runs"} + + request_body = PostAgentRunsRequestBody.model_validate( + { + "runs": [ + { + "run_id": "523e4567-e89b-12d3-a456-426614174004", + "miner_uid": 30, + "miner_hotkey": "miner_hotkey_3", + "vali_uid": 5, + "vali_hotkey": "validator_hotkey", + "status": "INTERNAL_AGENT_ERROR", + "event_id": "event_789", + "version_id": "623e4567-e89b-12d3-a456-426614174005", + "is_final": True, + } + ] + } + ) + + status_code = 500 + + with aioresponses() as mocked: + url_path = "/api/v1/validators/agents/runs" + + mocked.post( + url_path, + status=status_code, + body=json.dumps(mock_response_data).encode("utf-8"), + ) + + with pytest.raises(ClientResponseError) as e: + await client_test_env.post_agent_runs(body=request_body) + + mocked.assert_called_with( + url=url_path, method="POST", data=request_body.model_dump_json() + ) + + assert e.value.status == status_code + @pytest.mark.parametrize( "offset,limit", [ diff --git a/neurons/validator/sandbox/manager.py b/neurons/validator/sandbox/manager.py index d5389ed..bb2e7a3 100644 --- a/neurons/validator/sandbox/manager.py +++ b/neurons/validator/sandbox/manager.py @@ -14,7 +14,7 @@ from bittensor_wallet import Wallet from neurons.validator.sandbox.agent_models import AgentOutput, AgentRunnerOutput, RunStatus -from neurons.validator.sandbox.models import SandboxResult, SandboxState +from neurons.validator.sandbox.models import SandboxErrorType, SandboxResult, SandboxState from neurons.validator.sandbox.utils.docker import build_docker_image, image_exists from neurons.validator.sandbox.utils.temp import cleanup_temp_dir, create_temp_dir from neurons.validator.utils.logger.logger import NuminousLogger @@ -383,12 +383,15 @@ def _run_sandbox(self, sandbox_id: str) -> None: self.logger.warning("Sandbox not found", extra={"sandbox_id": sandbox_id}) return - def finish_with_error(error_msg: str, result: SandboxResult) -> None: + def finish_with_error( + error_msg: str, result: SandboxResult, error_type: SandboxErrorType + ) -> None: self.logger.warning( "Sandbox failed", extra={"sandbox_id": sandbox_id, "error": error_msg} ) result.status = "error" result.error = error_msg + result.error_type = error_type try: sandbox.on_finish(result.model_dump()) except Exception as e: @@ -454,7 +457,7 @@ def finish_with_error(error_msg: str, result: SandboxResult) -> None: ) result.logs = f"Failed to capture partial logs on timeout: {e}" - finish_with_error("Timeout exceeded", result) + finish_with_error("Timeout exceeded", result, error_type=SandboxErrorType.TIMEOUT) return self.logger.debug("Sandbox finished", extra={"sandbox_id": sandbox_id}) @@ -479,7 +482,9 @@ def finish_with_error(error_msg: str, result: SandboxResult) -> None: except Exception as e: result.traceback = traceback.format_exc() - finish_with_error(f"Container error: {e}", result) + finish_with_error( + f"Container error: {e}", result, error_type=SandboxErrorType.CONTAINER_ERROR + ) return # Read output.json @@ -488,7 +493,11 @@ def finish_with_error(error_msg: str, result: SandboxResult) -> None: output_dict = json.loads(output_path.read_text()) self.logger.debug("Read output.json", extra={"sandbox_id": sandbox_id}) except Exception as e: - finish_with_error(f"Failed to read output.json: {e}", result) + finish_with_error( + f"Failed to read output.json: {e}", + result, + error_type=SandboxErrorType.INVALID_OUTPUT, + ) return # Validate output structure @@ -496,13 +505,21 @@ def finish_with_error(error_msg: str, result: SandboxResult) -> None: output = AgentRunnerOutput(**output_dict) self.logger.debug("Validated output with Pydantic", extra={"sandbox_id": sandbox_id}) except Exception as e: - finish_with_error(f"Invalid output.json structure: {e}", result) + finish_with_error( + f"Invalid output.json structure: {e}", + result, + error_type=SandboxErrorType.INVALID_OUTPUT, + ) return # Handle success or error status if output.status == RunStatus.SUCCESS: if output.output is None: - finish_with_error("output.json has status='success' but no 'output' field", result) + finish_with_error( + "output.json has status='success' but no 'output' field", + result, + error_type=SandboxErrorType.INVALID_OUTPUT, + ) return try: @@ -516,18 +533,30 @@ def finish_with_error(error_msg: str, result: SandboxResult) -> None: }, ) except Exception as e: - finish_with_error(f"Invalid agent output structure: {e}", result) + finish_with_error( + f"Invalid agent output structure: {e}", + result, + error_type=SandboxErrorType.INVALID_OUTPUT, + ) return elif output.status == RunStatus.ERROR: if output.error is None: - finish_with_error("output.json has status='error' but no 'error' field", result) + finish_with_error( + "output.json has status='error' but no 'error' field", + result, + error_type=SandboxErrorType.INVALID_OUTPUT, + ) return result.traceback = output.traceback - finish_with_error(output.error, result) + finish_with_error(output.error, result, error_type=SandboxErrorType.AGENT_ERROR) return else: - finish_with_error(f"Invalid status in output.json: {output.status}", result) + finish_with_error( + f"Invalid status in output.json: {output.status}", + result, + error_type=SandboxErrorType.INVALID_OUTPUT, + ) return # Success - call on_finish diff --git a/neurons/validator/sandbox/models.py b/neurons/validator/sandbox/models.py index 530ebb9..438bbfb 100644 --- a/neurons/validator/sandbox/models.py +++ b/neurons/validator/sandbox/models.py @@ -1,9 +1,17 @@ +from enum import Enum from typing import Any, Callable, Dict, Optional from docker.models.containers import Container from pydantic import BaseModel, ConfigDict, Field +class SandboxErrorType(str, Enum): + TIMEOUT = "timeout" + CONTAINER_ERROR = "container_error" + INVALID_OUTPUT = "invalid_output" + AGENT_ERROR = "agent_error" + + class SandboxState(BaseModel): temp_dir: str = Field(..., description="Path to temporary directory") run_id: str = Field(..., description="Unique run ID for this sandbox") @@ -22,3 +30,7 @@ class SandboxResult(BaseModel): logs: str = Field(default="", description="Container logs") error: Optional[str] = Field(default=None, description="Error message if failed") traceback: Optional[str] = Field(default=None, description="Python traceback if failed") + error_type: Optional[SandboxErrorType] = Field( + default=None, + description="Error category from SandboxErrorType enum", + ) diff --git a/neurons/validator/sandbox/tests/test_manager.py b/neurons/validator/sandbox/tests/test_manager.py index fe7a76b..ddf4c2c 100644 --- a/neurons/validator/sandbox/tests/test_manager.py +++ b/neurons/validator/sandbox/tests/test_manager.py @@ -50,7 +50,7 @@ def test_native_docker_timeout_used( manager.docker_client.containers.run = MagicMock(return_value=mock_container) temp_dir = tempfile.mkdtemp(prefix="test_sandbox_") (Path(temp_dir) / "output.json").write_text( - json.dumps({"status": "success", "output": {"event_id": "test", "prediction": 0.5}}) + json.dumps({"status": "SUCCESS", "output": {"event_id": "test", "prediction": 0.5}}) ) sandbox_id = "sandbox_test" @@ -113,7 +113,7 @@ def test_log_exception_handled_gracefully( manager.docker_client.containers.run = MagicMock(return_value=mock_container) temp_dir = tempfile.mkdtemp(prefix="test_sandbox_") (Path(temp_dir) / "output.json").write_text( - json.dumps({"status": "success", "output": {"event_id": "test", "prediction": 0.5}}) + json.dumps({"status": "SUCCESS", "output": {"event_id": "test", "prediction": 0.5}}) ) manager.sandboxes["sandbox_test"] = SandboxState( diff --git a/neurons/validator/tasks/export_agent_runs.py b/neurons/validator/tasks/export_agent_runs.py new file mode 100644 index 0000000..721929a --- /dev/null +++ b/neurons/validator/tasks/export_agent_runs.py @@ -0,0 +1,109 @@ +from uuid import UUID + +from neurons.validator.db.operations import DatabaseOperations +from neurons.validator.models.agent_runs import AgentRunsModel +from neurons.validator.models.numinous_client import AgentRunSubmission, PostAgentRunsRequestBody +from neurons.validator.numinous_client.client import NuminousClient +from neurons.validator.scheduler.task import AbstractTask +from neurons.validator.utils.logger.logger import NuminousLogger + + +class ExportAgentRuns(AbstractTask): + interval: float + batch_size: int + db_operations: DatabaseOperations + api_client: NuminousClient + logger: NuminousLogger + validator_uid: int + validator_hotkey: str + + def __init__( + self, + interval_seconds: float, + batch_size: int, + db_operations: DatabaseOperations, + api_client: NuminousClient, + logger: NuminousLogger, + validator_uid: int, + validator_hotkey: str, + ): + if not isinstance(interval_seconds, float) or interval_seconds <= 0: + raise ValueError("interval_seconds must be a positive number (float).") + + if not isinstance(db_operations, DatabaseOperations): + raise TypeError("db_operations must be an instance of DatabaseOperations.") + + self.interval = interval_seconds + self.batch_size = batch_size + self.db_operations = db_operations + self.api_client = api_client + self.validator_uid = validator_uid + self.validator_hotkey = validator_hotkey + + self.errors_count = 0 + self.logger = logger + + @property + def name(self) -> str: + return "export-agent-runs" + + @property + def interval_seconds(self) -> float: + return self.interval + + def prepare_runs_payload(self, db_runs: list[AgentRunsModel]) -> PostAgentRunsRequestBody: + runs = [] + + for db_run in db_runs: + run = AgentRunSubmission( + run_id=UUID(db_run.run_id), + miner_uid=db_run.miner_uid, + miner_hotkey=db_run.miner_hotkey, + vali_uid=self.validator_uid, + vali_hotkey=self.validator_hotkey, + status=db_run.status.value, + event_id=db_run.unique_event_id, + version_id=UUID(db_run.agent_version_id), + is_final=db_run.is_final, + ) + runs.append(run) + + return PostAgentRunsRequestBody(runs=runs) + + async def export_runs_to_backend(self, payload: PostAgentRunsRequestBody) -> None: + await self.api_client.post_agent_runs(body=payload) + + self.logger.debug( + "Exported runs to backend", + extra={"n_runs": len(payload.runs)}, + ) + + async def run(self) -> None: + unexported_runs = await self.db_operations.get_unexported_agent_runs(limit=self.batch_size) + + if not unexported_runs: + self.logger.debug("No unexported runs to export") + else: + self.logger.debug( + "Found unexported runs to export", + extra={"n_runs": len(unexported_runs)}, + ) + + payload = self.prepare_runs_payload(db_runs=unexported_runs) + + try: + await self.export_runs_to_backend(payload) + except Exception: + self.errors_count += 1 + self.logger.exception("Failed to export runs to backend") + return + + run_ids = [run.run_id for run in unexported_runs] + await self.db_operations.mark_agent_runs_as_exported(run_ids=run_ids) + + self.logger.debug( + "Export runs task completed", + extra={"errors_count": self.errors_count}, + ) + + self.errors_count = 0 diff --git a/neurons/validator/tasks/run_agents.py b/neurons/validator/tasks/run_agents.py index 6732729..5823519 100644 --- a/neurons/validator/tasks/run_agents.py +++ b/neurons/validator/tasks/run_agents.py @@ -6,11 +6,13 @@ from typing import List, Optional from neurons.validator.db.operations import DatabaseOperations +from neurons.validator.models.agent_runs import AgentRunsModel, AgentRunStatus from neurons.validator.models.miner_agent import MinerAgentsModel from neurons.validator.models.numinous_client import PostAgentLogsRequestBody from neurons.validator.models.prediction import PredictionsModel from neurons.validator.numinous_client.client import NuminousClient from neurons.validator.sandbox import SandboxManager +from neurons.validator.sandbox.models import SandboxErrorType from neurons.validator.scheduler.task import AbstractTask from neurons.validator.utils.common.converters import torch_or_numpy_to_int from neurons.validator.utils.common.interval import ( @@ -21,7 +23,8 @@ from neurons.validator.utils.logger.logger import NuminousLogger TITLE_SEPARATOR = " ==Further Information==: " -MAX_LOG_CHARS = 25_000 # ~25KB limit for logs +MAX_LOG_CHARS = 25_000 +MAX_TIMEOUT_RETRIES = 3 class RunAgents(AbstractTask): @@ -306,9 +309,16 @@ async def post_agent_logs(self, run_id: str, logs: str) -> None: extra={"run_id": run_id, "error": str(e)}, ) - def validate_sandbox_result( - self, result: dict, event_id: str, agent_version_id: str, run_id: str - ) -> Optional[float]: + def _determine_status_and_extract_prediction( + self, + result: Optional[dict], + event_id: str, + agent_version_id: str, + run_id: str, + ) -> tuple[AgentRunStatus, Optional[float]]: + if result is None: + return (AgentRunStatus.SANDBOX_TIMEOUT, None) + if not isinstance(result, dict): self.logger.error( "Invalid result type from sandbox", @@ -318,9 +328,11 @@ def validate_sandbox_result( "result_type": type(result).__name__, }, ) - return + return (AgentRunStatus.INVALID_SANDBOX_OUTPUT, None) + # Handle error status from sandbox if result.get("status") == "error": + error_type = result.get("error_type") self.logger.error( "Agent execution failed", extra={ @@ -328,10 +340,26 @@ def validate_sandbox_result( "agent_version_id": agent_version_id, "run_id": run_id, "error": result.get("error", "Unknown error"), + "error_type": error_type, }, ) - return + if error_type == SandboxErrorType.TIMEOUT: + return (AgentRunStatus.SANDBOX_TIMEOUT, None) + elif error_type == SandboxErrorType.CONTAINER_ERROR: + return (AgentRunStatus.SANDBOX_TIMEOUT, None) + elif error_type == SandboxErrorType.INVALID_OUTPUT: + return (AgentRunStatus.INVALID_SANDBOX_OUTPUT, None) + elif error_type == SandboxErrorType.AGENT_ERROR: + return (AgentRunStatus.INTERNAL_AGENT_ERROR, None) + else: + self.logger.warning( + "Unknown error_type from sandbox, defaulting to INTERNAL_AGENT_ERROR", + extra={"error_type": error_type, "error": result.get("error")}, + ) + return (AgentRunStatus.INTERNAL_AGENT_ERROR, None) + + # Handle success status - validate output structure output = result.get("output") if not isinstance(output, dict): self.logger.error( @@ -342,7 +370,7 @@ def validate_sandbox_result( "result": str(result), }, ) - return + return (AgentRunStatus.INVALID_SANDBOX_OUTPUT, None) if "prediction" not in output: self.logger.error( @@ -353,7 +381,7 @@ def validate_sandbox_result( "output": str(output), }, ) - return + return (AgentRunStatus.INVALID_SANDBOX_OUTPUT, None) prediction_value = output["prediction"] if not isinstance(prediction_value, (int, float)): @@ -366,9 +394,38 @@ def validate_sandbox_result( "type": type(prediction_value).__name__, }, ) - return + return (AgentRunStatus.INVALID_SANDBOX_OUTPUT, None) - return float(prediction_value) + return (AgentRunStatus.SUCCESS, float(prediction_value)) + + async def _create_agent_run( + self, + run_id: str, + event_id: str, + agent: MinerAgentsModel, + status: AgentRunStatus, + ) -> AgentRunsModel: + if status != AgentRunStatus.SANDBOX_TIMEOUT: + is_final = True + else: + timeout_count = await self.db_operations.count_runs_for_event_and_agent( + unique_event_id=event_id, + agent_version_id=agent.version_id, + status=AgentRunStatus.SANDBOX_TIMEOUT, + is_final=False, + ) + is_final = timeout_count >= MAX_TIMEOUT_RETRIES - 1 + + return AgentRunsModel( + run_id=run_id, + unique_event_id=event_id, + agent_version_id=agent.version_id, + miner_uid=agent.miner_uid, + miner_hotkey=agent.miner_hotkey, + status=status, + exported=False, + is_final=is_final, + ) async def execute_agent_for_event( self, @@ -434,24 +491,32 @@ async def execute_agent_for_event( await self.post_agent_logs(run_id, logs) - if result is None: - self.logger.error( - "Sandbox execution failed or timed out", + run_status, prediction_value = self._determine_status_and_extract_prediction( + result, event_id, agent.version_id, run_id + ) + + agent_run = await self._create_agent_run( + run_id=run_id, + event_id=event_id, + agent=agent, + status=run_status, + ) + await self.db_operations.upsert_agent_runs([agent_run]) + + if run_status == AgentRunStatus.SUCCESS and prediction_value is not None: + await self.store_prediction( + event_id, agent, prediction_value, run_id, interval_start_minutes + ) + else: + self.logger.info( + "Agent execution completed with non-success status", extra={ "event_id": event_id, "agent_version_id": agent.version_id, "run_id": run_id, + "status": run_status.value, }, ) - return - - prediction_value = self.validate_sandbox_result(result, event_id, agent.version_id, run_id) - if prediction_value is None: - return - - await self.store_prediction( - event_id, agent, prediction_value, run_id, interval_start_minutes - ) async def execute_all( self, events: List[tuple], agents: List[MinerAgentsModel], interval_start_minutes: int @@ -534,7 +599,21 @@ async def execute_with_semaphore( ) return - # No prediction exists - execute agent + has_final_run = await self.db_operations.has_final_run( + unique_event_id=event_id, + agent_version_id=agent.version_id, + ) + + if has_final_run: + self.logger.debug( + "Skipping execution - final run exists", + extra={ + "event_id": event_id, + "agent_version_id": agent.version_id, + }, + ) + return + await self.execute_agent_for_event( event_id=event_id, agent=agent, diff --git a/neurons/validator/tasks/scoring.py b/neurons/validator/tasks/scoring.py index 67bb2f5..37b002c 100644 --- a/neurons/validator/tasks/scoring.py +++ b/neurons/validator/tasks/scoring.py @@ -302,7 +302,7 @@ def return_empty_scores_df(self, reason: str, event_id: str) -> pd.DataFrame: ] ) - def fill_unresponsive_miners( + def fill_missing_predictions( self, interval_scores: pd.DataFrame, imputed_prediction: float = 0.5 ) -> pd.DataFrame: interval_scores_df = interval_scores.copy() @@ -310,14 +310,9 @@ def fill_unresponsive_miners( ScoreNames.interval_agg_prediction ].astype("Float64") - # for miners with registered_date_minutes < interval_start but no answer: - unresponsive_miners = ( - interval_scores_df[ScoreNames.miner_registered_minutes] - < interval_scores_df[ScoreNames.interval_start] - ) & (interval_scores_df[ScoreNames.interval_agg_prediction].isnull()) - + missing_predictions = interval_scores_df[ScoreNames.interval_agg_prediction].isnull() interval_scores_df.loc[ - unresponsive_miners, ScoreNames.interval_agg_prediction + missing_predictions, ScoreNames.interval_agg_prediction ] = imputed_prediction return interval_scores_df @@ -376,11 +371,8 @@ async def score_event( "No intervals to score - event discarded.", event.event_id ) - # do not score miners which registered after the scoring window started - miners = self.miners_last_reg[ - self.miners_last_reg[ScoreNames.miner_registered_minutes] - <= scoring_window_start_minutes - ].copy() + # Score all miners in the metagraph - those without predictions get 0.5 imputed + miners = self.miners_last_reg.copy() if miners.empty: return self.return_empty_scores_df("No miners to score.", event.event_id) @@ -393,8 +385,8 @@ async def score_event( predictions_df=predictions_df, miners=miners, intervals=intervals ) - # Fill unresponsive miners - interval_scores_df = self.fill_unresponsive_miners( + # Fill missing predictions with 0.5 for miners without predictions + interval_scores_df = self.fill_missing_predictions( interval_scores_df, imputed_prediction=0.5 ) diff --git a/neurons/validator/tasks/tests/test_export_agent_runs.py b/neurons/validator/tasks/tests/test_export_agent_runs.py new file mode 100644 index 0000000..592b901 --- /dev/null +++ b/neurons/validator/tasks/tests/test_export_agent_runs.py @@ -0,0 +1,314 @@ +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock +from uuid import UUID + +import pytest +from bittensor_wallet import Wallet + +from neurons.validator.db.client import DatabaseClient +from neurons.validator.db.operations import DatabaseOperations +from neurons.validator.models.agent_runs import AgentRunsModel, AgentRunStatus +from neurons.validator.models.event import EventsModel, EventStatus +from neurons.validator.models.miner_agent import MinerAgentsModel +from neurons.validator.models.numinous_client import AgentRunSubmission, PostAgentRunsRequestBody +from neurons.validator.numinous_client.client import NuminousClient +from neurons.validator.tasks.export_agent_runs import ExportAgentRuns +from neurons.validator.utils.logger.logger import NuminousLogger + + +class TestExportAgentRuns: + async def _create_event(self, db_operations: DatabaseOperations, unique_event_id: str) -> None: + """Helper to create an event for FK constraint""" + event = EventsModel( + unique_event_id=unique_event_id, + event_id=f"event_{unique_event_id}", + market_type="test_market", + event_type="test_type", + description="Test event", + outcome=None, + status=EventStatus.PENDING, + metadata="{}", + created_at="2024-01-01T00:00:00+00:00", + cutoff="2024-12-31T23:59:59+00:00", + ) + await db_operations.upsert_events([event]) + + async def _create_miner_agent( + self, db_operations: DatabaseOperations, version_id: str, miner_uid: int, miner_hotkey: str + ) -> None: + """Helper to create a miner agent for FK constraint""" + agent = MinerAgentsModel( + version_id=version_id, + miner_uid=miner_uid, + miner_hotkey=miner_hotkey, + agent_name="TestAgent", + version_number=1, + file_path=f"/data/agents/{miner_uid}/test.py", + pulled_at=datetime(2024, 1, 1, 10, 0, 0, tzinfo=timezone.utc), + created_at=datetime(2024, 1, 1, 9, 0, 0, tzinfo=timezone.utc), + ) + await db_operations.upsert_miner_agents([agent]) + + @pytest.fixture + def db_operations(self, db_client: DatabaseClient): + logger = MagicMock(spec=NuminousLogger) + + return DatabaseOperations(db_client=db_client, logger=logger) + + @pytest.fixture + def bt_wallet(self): + hotkey_mock = MagicMock() + hotkey_mock.sign = MagicMock(side_effect=lambda x: x.encode("utf-8")) + hotkey_mock.ss58_address = "validator_hotkey_test" + + bt_wallet = MagicMock(spec=Wallet) + bt_wallet.get_hotkey = MagicMock(return_value=hotkey_mock) + bt_wallet.hotkey.ss58_address = "validator_hotkey_test" + + return bt_wallet + + @pytest.fixture + def export_agent_runs_task( + self, + db_operations: DatabaseOperations, + bt_wallet: Wallet, + ): + api_client = NuminousClient( + env="test", logger=MagicMock(spec=NuminousLogger), bt_wallet=bt_wallet + ) + logger = MagicMock(spec=NuminousLogger) + + return ExportAgentRuns( + interval_seconds=180.0, + batch_size=1000, + db_operations=db_operations, + api_client=api_client, + logger=logger, + validator_uid=5, + validator_hotkey=bt_wallet.hotkey.ss58_address, + ) + + def test_init(self, export_agent_runs_task): + unit = export_agent_runs_task + + assert isinstance(unit, ExportAgentRuns) + assert unit.interval == 180.0 + assert unit.interval_seconds == 180.0 + assert unit.batch_size == 1000 + assert unit.errors_count == 0 + assert unit.validator_uid == 5 + assert unit.validator_hotkey == "validator_hotkey_test" + + def test_prepare_runs_payload_single_run(self, export_agent_runs_task: ExportAgentRuns): + db_runs = [ + AgentRunsModel( + run_id="123e4567-e89b-12d3-a456-426614174000", + unique_event_id="event_123", + agent_version_id="223e4567-e89b-12d3-a456-426614174001", + miner_uid=10, + miner_hotkey="miner_hotkey_1", + status=AgentRunStatus.SUCCESS, + exported=False, + is_final=True, + ) + ] + + payload = export_agent_runs_task.prepare_runs_payload(db_runs) + + assert isinstance(payload, PostAgentRunsRequestBody) + assert len(payload.runs) == 1 + + run = payload.runs[0] + assert isinstance(run, AgentRunSubmission) + assert run.run_id == UUID("123e4567-e89b-12d3-a456-426614174000") + assert run.miner_uid == 10 + assert run.miner_hotkey == "miner_hotkey_1" + assert run.vali_uid == 5 + assert run.vali_hotkey == "validator_hotkey_test" + assert run.status == "SUCCESS" + assert run.event_id == "event_123" + assert run.version_id == UUID("223e4567-e89b-12d3-a456-426614174001") + assert run.is_final is True + + def test_prepare_runs_payload_multiple_runs(self, export_agent_runs_task: ExportAgentRuns): + db_runs = [ + AgentRunsModel( + run_id="323e4567-e89b-12d3-a456-426614174002", + unique_event_id="event_1", + agent_version_id="423e4567-e89b-12d3-a456-426614174003", + miner_uid=1, + miner_hotkey="miner_1", + status=AgentRunStatus.SUCCESS, + exported=False, + is_final=True, + ), + AgentRunsModel( + run_id="523e4567-e89b-12d3-a456-426614174004", + unique_event_id="event_2", + agent_version_id="623e4567-e89b-12d3-a456-426614174005", + miner_uid=2, + miner_hotkey="miner_2", + status=AgentRunStatus.SANDBOX_TIMEOUT, + exported=False, + is_final=False, + ), + AgentRunsModel( + run_id="723e4567-e89b-12d3-a456-426614174006", + unique_event_id="event_3", + agent_version_id="823e4567-e89b-12d3-a456-426614174007", + miner_uid=3, + miner_hotkey="miner_3", + status=AgentRunStatus.INTERNAL_AGENT_ERROR, + exported=False, + is_final=True, + ), + ] + + payload = export_agent_runs_task.prepare_runs_payload(db_runs) + + assert len(payload.runs) == 3 + assert payload.runs[0].run_id == UUID("323e4567-e89b-12d3-a456-426614174002") + assert payload.runs[0].status == "SUCCESS" + assert payload.runs[0].is_final is True + assert payload.runs[1].run_id == UUID("523e4567-e89b-12d3-a456-426614174004") + assert payload.runs[1].status == "SANDBOX_TIMEOUT" + assert payload.runs[1].is_final is False + assert payload.runs[2].run_id == UUID("723e4567-e89b-12d3-a456-426614174006") + assert payload.runs[2].status == "INTERNAL_AGENT_ERROR" + assert payload.runs[2].is_final is True + + async def test_export_runs_to_backend(self, export_agent_runs_task: ExportAgentRuns): + unit = export_agent_runs_task + unit.api_client.post_agent_runs = AsyncMock(return_value=True) + + dummy_payload = PostAgentRunsRequestBody( + runs=[ + AgentRunSubmission( + run_id=UUID("923e4567-e89b-12d3-a456-426614174008"), + miner_uid=1, + miner_hotkey="miner_hotkey", + vali_uid=5, + vali_hotkey="validator_hotkey_test", + status="SUCCESS", + event_id="event_export", + version_id=UUID("a23e4567-e89b-12d3-a456-426614174009"), + is_final=True, + ) + ] + ) + + await export_agent_runs_task.export_runs_to_backend(dummy_payload) + export_agent_runs_task.logger.debug.assert_called_with( + "Exported runs to backend", + extra={"n_runs": 1}, + ) + + assert unit.api_client.post_agent_runs.call_count == 1 + assert unit.api_client.post_agent_runs.call_args.kwargs["body"] == dummy_payload + + async def test_run_no_unexported_runs(self, export_agent_runs_task: ExportAgentRuns): + export_agent_runs_task.api_client = AsyncMock(spec=NuminousClient) + + await export_agent_runs_task.run() + + export_agent_runs_task.logger.debug.assert_any_call("No unexported runs to export") + export_agent_runs_task.api_client.post_agent_runs.assert_not_called() + + async def test_run_with_unexported_runs( + self, + export_agent_runs_task: ExportAgentRuns, + db_operations: DatabaseOperations, + db_client: DatabaseClient, + ): + unit = export_agent_runs_task + unit.api_client.post_agent_runs = AsyncMock(return_value=True) + + await self._create_event(db_operations, "event_1") + await self._create_event(db_operations, "event_2") + await self._create_miner_agent( + db_operations, "c23e4567-e89b-12d3-a456-42661417400b", 10, "miner_hotkey_1" + ) + await self._create_miner_agent( + db_operations, "e23e4567-e89b-12d3-a456-42661417400d", 20, "miner_hotkey_2" + ) + + runs = [ + AgentRunsModel( + run_id="b23e4567-e89b-12d3-a456-42661417400a", + unique_event_id="event_1", + agent_version_id="c23e4567-e89b-12d3-a456-42661417400b", + miner_uid=10, + miner_hotkey="miner_hotkey_1", + status=AgentRunStatus.SUCCESS, + exported=False, + is_final=True, + ), + AgentRunsModel( + run_id="d23e4567-e89b-12d3-a456-42661417400c", + unique_event_id="event_2", + agent_version_id="e23e4567-e89b-12d3-a456-42661417400d", + miner_uid=20, + miner_hotkey="miner_hotkey_2", + status=AgentRunStatus.SANDBOX_TIMEOUT, + exported=False, + is_final=False, + ), + ] + + await db_operations.upsert_agent_runs(runs) + + await unit.run() + + unit.api_client.post_agent_runs.assert_called_once() + call_args = unit.api_client.post_agent_runs.call_args.kwargs + payload = call_args["body"] + + assert len(payload.runs) == 2 + assert payload.runs[0].run_id == UUID("b23e4567-e89b-12d3-a456-42661417400a") + assert payload.runs[0].miner_uid == 10 + assert payload.runs[0].status == "SUCCESS" + assert payload.runs[0].is_final is True + assert payload.runs[1].run_id == UUID("d23e4567-e89b-12d3-a456-42661417400c") + assert payload.runs[1].miner_uid == 20 + assert payload.runs[1].status == "SANDBOX_TIMEOUT" + assert payload.runs[1].is_final is False + + result = await db_client.many("SELECT exported FROM agent_runs ORDER BY run_id") + assert len(result) == 2 + assert result[0][0] == 1 + assert result[1][0] == 1 + + async def test_run_export_exception( + self, + export_agent_runs_task: ExportAgentRuns, + db_operations: DatabaseOperations, + db_client: DatabaseClient, + ): + unit = export_agent_runs_task + unit.api_client.post_agent_runs = AsyncMock(side_effect=Exception("Simulated failure")) + + await self._create_event(db_operations, "event_error") + await self._create_miner_agent( + db_operations, "023e4567-e89b-12d3-a456-42661417400f", 30, "miner_hotkey_3" + ) + + run = AgentRunsModel( + run_id="f23e4567-e89b-12d3-a456-42661417400e", + unique_event_id="event_error", + agent_version_id="023e4567-e89b-12d3-a456-42661417400f", + miner_uid=30, + miner_hotkey="miner_hotkey_3", + status=AgentRunStatus.SUCCESS, + exported=False, + is_final=True, + ) + + await db_operations.upsert_agent_runs([run]) + + await unit.run() + + unit.logger.exception.assert_called_with("Failed to export runs to backend") + + result = await db_client.many("SELECT exported FROM agent_runs") + assert len(result) == 1 + assert result[0][0] == 0 diff --git a/neurons/validator/tasks/tests/test_metagraph_scoring.py b/neurons/validator/tasks/tests/test_metagraph_scoring.py index 29da2a7..7ac9718 100644 --- a/neurons/validator/tasks/tests/test_metagraph_scoring.py +++ b/neurons/validator/tasks/tests/test_metagraph_scoring.py @@ -497,6 +497,8 @@ def test_init(self, metagraph_scoring_task: MetagraphScoring): }, ), # Case 7: 1 miner 3 events, 1 miner 2 events, 1 miner 1 event with very low Brier score. + # Event 3 only considers miners IN event 3 (not miner 5). + # This means miner 4 becomes rank 1 for event 3 (not miner 5 who isn't present). ( [ ScoresModel( @@ -591,6 +593,7 @@ def test_init(self, metagraph_scoring_task: MetagraphScoring): ), ], [ + # Event 1: Only miners 3 and 239 { "event_id": "expected_event_id_1", "processed": 1, @@ -609,28 +612,31 @@ def test_init(self, metagraph_scoring_task: MetagraphScoring): "rank": 2, }, }, + # Event 2: Miners 3, 4, 5, 239 - miner 5 is present and wins { "event_id": "expected_event_id_2", "processed": 1, - "metagraph_score": 0.0004, # Miner 3: avg=0.60, rank 3, gets (1-0.80)*(1-0.99)*(1/3)/(1/2+1/3) ≈ 0.0004 + "metagraph_score": 0.0008, # Miner 3: avg=0.60, rank 3, gets (1-0.80)*(1-0.99)*(1/3)/(1/2+1/3) "other_data": { "average_brier_score": 0.60, "rank": 3, }, }, + # Event 3: Only miners 3, 4, 239 - miner 5 NOT present! + # Miner 4 (avg=0.40) becomes rank 1, miner 3 (avg=0.60) becomes rank 2 { "event_id": "expected_event_id_3", "processed": 1, - "metagraph_score": 0.0004, # Miner 3: avg=0.60, rank 3 + "metagraph_score": 0.002, # Miner 3: avg=0.60, rank 2, only non-winner non-burn, gets all 0.002 "other_data": { "average_brier_score": 0.60, - "rank": 3, + "rank": 2, }, }, { "event_id": "expected_event_id_2", "processed": 1, - "metagraph_score": 0.0006, # Miner 4: avg=0.40, rank 2, gets (1-0.80)*(1-0.99)*(1/2)/(1/2+1/3) ≈ 0.0006 + "metagraph_score": 0.0012, # Miner 4: avg=0.40, rank 2, gets (1-0.80)*(1-0.99)*(1/2)/(1/2+1/3) "other_data": { "average_brier_score": 0.40, "rank": 2, @@ -639,10 +645,10 @@ def test_init(self, metagraph_scoring_task: MetagraphScoring): { "event_id": "expected_event_id_3", "processed": 1, - "metagraph_score": 0.0006, # Miner 4: avg=0.40, rank 2 + "metagraph_score": 0.198, # Miner 4: avg=0.40, rank 1 (winner since miner 5 not in event 3) "other_data": { "average_brier_score": 0.40, - "rank": 2, + "rank": 1, }, }, { @@ -666,10 +672,10 @@ def test_init(self, metagraph_scoring_task: MetagraphScoring): { "event_id": "expected_event_id_3", "processed": 1, - "metagraph_score": 0.8, # UID 239 gets 80% + "metagraph_score": 0.8, # UID 239 gets 80%, rank 3 (only 3 miners in event) "other_data": { "average_brier_score": 1.00, - "rank": 4, + "rank": 3, }, }, ], diff --git a/neurons/validator/tasks/tests/test_run_agents.py b/neurons/validator/tasks/tests/test_run_agents.py index f90f5a0..8876648 100644 --- a/neurons/validator/tasks/tests/test_run_agents.py +++ b/neurons/validator/tasks/tests/test_run_agents.py @@ -1,3 +1,4 @@ +import asyncio from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock @@ -5,9 +6,11 @@ import pytest from neurons.validator.db.operations import DatabaseOperations +from neurons.validator.models.agent_runs import AgentRunsModel, AgentRunStatus from neurons.validator.models.miner_agent import MinerAgentsModel from neurons.validator.numinous_client.client import NuminousClient from neurons.validator.sandbox import SandboxManager +from neurons.validator.sandbox.models import SandboxErrorType from neurons.validator.tasks.run_agents import RunAgents from neurons.validator.utils.if_metagraph import IfMetagraph from neurons.validator.utils.logger.logger import NuminousLogger @@ -548,6 +551,8 @@ async def test_execute_when_prediction_not_exists( # No prediction exists mock_db_operations.get_latest_prediction_for_event_and_miner.return_value = None + mock_db_operations.has_final_run = AsyncMock(return_value=False) + mock_db_operations.count_runs_for_event_and_agent = AsyncMock(return_value=0) task = RunAgents( interval_seconds=600.0, @@ -797,14 +802,14 @@ async def test_run_sandbox_failure( agent_code = "def agent_main(): return {'prediction': 0.75}" def mock_create_sandbox(agent_code, event_data, run_id, on_finish, timeout): - on_finish({"success": False, "error": "Execution failed"}) + on_finish({"SUCCESS": False, "error": "Execution failed"}) return "sandbox_123" mock_sandbox_manager.create_sandbox = mock_create_sandbox result = await task.run_sandbox(agent_code, event_data, "run_123") - assert result["success"] is False + assert result["SUCCESS"] is False assert "error" in result async def test_run_sandbox_timeout( @@ -1111,7 +1116,7 @@ async def test_logs_exported_on_validation_error( task.load_agent_code = AsyncMock(return_value="def agent_main(): pass") invalid_result = { - "status": "success", + "status": "SUCCESS", "output": {"event_id": "event_123"}, "logs": "[AGENT_RUNNER] Starting\n[AGENT_RUNNER] Completed", } @@ -1141,6 +1146,8 @@ async def test_logs_exported_on_result_none( sample_agent, sample_event_tuple, ): + mock_db_operations.count_runs_for_event_and_agent = AsyncMock(return_value=0) + task = RunAgents( interval_seconds=600.0, db_operations=mock_db_operations, @@ -1167,6 +1174,9 @@ async def test_logs_exported_on_result_none( assert "Sandbox timeout - no logs" in logs + +@pytest.mark.asyncio +class TestRunAgentsSyncHour: async def test_run_skips_when_before_sync_hour( self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger ): @@ -1249,3 +1259,834 @@ async def test_run_executes_when_after_sync_hour( mock_metagraph.sync.assert_called_once() mock_db_operations.get_events_to_predict.assert_called_once() + + +class TestRunAgentsDetermineRunStatus: + def test_timeout_at_run_agents_level( + self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger + ): + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + status, prediction = task._determine_status_and_extract_prediction( + result=None, event_id="event1", agent_version_id="agent1", run_id="run1" + ) + assert status == AgentRunStatus.SANDBOX_TIMEOUT + assert prediction is None + + def test_success_with_valid_prediction( + self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger + ): + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + result = {"status": "SUCCESS", "output": {"prediction": 0.75}} + status, prediction = task._determine_status_and_extract_prediction( + result=result, event_id="event1", agent_version_id="agent1", run_id="run1" + ) + assert status == AgentRunStatus.SUCCESS + assert prediction == 0.75 + + def test_success_but_invalid_prediction_type( + self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger + ): + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + result = {"status": "SUCCESS", "output": {"prediction": "invalid"}} + status, prediction = task._determine_status_and_extract_prediction( + result=result, event_id="event1", agent_version_id="agent1", run_id="run1" + ) + assert status == AgentRunStatus.INVALID_SANDBOX_OUTPUT + assert prediction is None + + def test_success_but_missing_prediction_field( + self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger + ): + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + result = {"status": "SUCCESS", "output": {"event_id": "event1"}} + status, prediction = task._determine_status_and_extract_prediction( + result=result, event_id="event1", agent_version_id="agent1", run_id="run1" + ) + assert status == AgentRunStatus.INVALID_SANDBOX_OUTPUT + assert prediction is None + + def test_success_but_invalid_output_format( + self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger + ): + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + result = {"status": "SUCCESS", "output": "not a dict"} + status, prediction = task._determine_status_and_extract_prediction( + result=result, event_id="event1", agent_version_id="agent1", run_id="run1" + ) + assert status == AgentRunStatus.INVALID_SANDBOX_OUTPUT + assert prediction is None + + def test_error_timeout( + self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger + ): + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + result = { + "status": "error", + "error": "Timeout exceeded", + "error_type": SandboxErrorType.TIMEOUT, + } + status, prediction = task._determine_status_and_extract_prediction( + result=result, event_id="event1", agent_version_id="agent1", run_id="run1" + ) + assert status == AgentRunStatus.SANDBOX_TIMEOUT + assert prediction is None + + def test_error_container_error( + self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger + ): + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + result = { + "status": "error", + "error": "Container error: Failed to start", + "error_type": SandboxErrorType.CONTAINER_ERROR, + } + status, prediction = task._determine_status_and_extract_prediction( + result=result, event_id="event1", agent_version_id="agent1", run_id="run1" + ) + assert status == AgentRunStatus.SANDBOX_TIMEOUT + assert prediction is None + + def test_error_invalid_output( + self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger + ): + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + result = { + "status": "error", + "error": "Failed to read output.json", + "error_type": SandboxErrorType.INVALID_OUTPUT, + } + status, prediction = task._determine_status_and_extract_prediction( + result=result, event_id="event1", agent_version_id="agent1", run_id="run1" + ) + assert status == AgentRunStatus.INVALID_SANDBOX_OUTPUT + assert prediction is None + + def test_error_agent_error( + self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger + ): + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + result = { + "status": "error", + "error": "agent_main() must return a dict", + "error_type": SandboxErrorType.AGENT_ERROR, + } + status, prediction = task._determine_status_and_extract_prediction( + result=result, event_id="event1", agent_version_id="agent1", run_id="run1" + ) + assert status == AgentRunStatus.INTERNAL_AGENT_ERROR + assert prediction is None + + def test_error_unknown_type( + self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger + ): + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + result = {"status": "error", "error": "Unknown error", "error_type": "unknown_type"} + status, prediction = task._determine_status_and_extract_prediction( + result=result, event_id="event1", agent_version_id="agent1", run_id="run1" + ) + assert status == AgentRunStatus.INTERNAL_AGENT_ERROR + assert prediction is None + mock_logger.warning.assert_called_once() + + def test_invalid_result_type( + self, mock_db_operations, mock_sandbox_manager, mock_metagraph, mock_api_client, mock_logger + ): + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + result = "not a dict" + status, prediction = task._determine_status_and_extract_prediction( + result=result, event_id="event1", agent_version_id="agent1", run_id="run1" + ) + assert status == AgentRunStatus.INVALID_SANDBOX_OUTPUT + assert prediction is None + + +@pytest.mark.asyncio +class TestRunAgentsCreateAgentRun: + async def test_create_agent_run_success( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + ): + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + run_id = "test-run-123" + event_id = "event-456" + + run = await task._create_agent_run( + run_id=run_id, + event_id=event_id, + agent=sample_agent, + status=AgentRunStatus.SUCCESS, + ) + + assert isinstance(run, AgentRunsModel) + assert run.run_id == run_id + assert run.unique_event_id == event_id + assert run.agent_version_id == sample_agent.version_id + assert run.miner_uid == sample_agent.miner_uid + assert run.miner_hotkey == sample_agent.miner_hotkey + assert run.status == AgentRunStatus.SUCCESS + assert run.exported is False + assert run.is_final is True + + async def test_create_agent_run_internal_agent_error( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + ): + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + run_id = "test-run-789" + event_id = "event-abc" + + run = await task._create_agent_run( + run_id=run_id, + event_id=event_id, + agent=sample_agent, + status=AgentRunStatus.INTERNAL_AGENT_ERROR, + ) + + assert run.status == AgentRunStatus.INTERNAL_AGENT_ERROR + assert run.is_final is True + + async def test_create_agent_run_sandbox_timeout_not_final( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + ): + mock_db_operations.count_runs_for_event_and_agent = AsyncMock(return_value=0) + + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + run = await task._create_agent_run( + run_id="run-timeout", + event_id="event-timeout", + agent=sample_agent, + status=AgentRunStatus.SANDBOX_TIMEOUT, + ) + + assert run.status == AgentRunStatus.SANDBOX_TIMEOUT + assert run.is_final is False + + async def test_create_agent_run_sandbox_timeout_final( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + ): + mock_db_operations.count_runs_for_event_and_agent = AsyncMock(return_value=2) + + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + run = await task._create_agent_run( + run_id="run-timeout-final", + event_id="event-timeout", + agent=sample_agent, + status=AgentRunStatus.SANDBOX_TIMEOUT, + ) + + assert run.status == AgentRunStatus.SANDBOX_TIMEOUT + assert run.is_final is True + + async def test_create_agent_run_invalid_sandbox_output( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + ): + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + ) + + run = await task._create_agent_run( + run_id="run-invalid", + event_id="event-invalid", + agent=sample_agent, + status=AgentRunStatus.INVALID_SANDBOX_OUTPUT, + ) + + assert run.status == AgentRunStatus.INVALID_SANDBOX_OUTPUT + assert run.is_final is True + + +@pytest.mark.asyncio +class TestRunAgentsRunCreation: + async def test_creates_run_on_success( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + sample_event_tuple, + ): + mock_db_operations.upsert_predictions = AsyncMock() + mock_db_operations.upsert_agent_runs = AsyncMock() + + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + timeout_seconds=120, + ) + + task.load_agent_code = AsyncMock(return_value="def agent_main(): pass") + success_result = { + "status": "SUCCESS", + "output": {"event_id": "external_event_123", "prediction": 0.75}, + "logs": "[AGENT_RUNNER] Success", + } + task.run_sandbox = AsyncMock(return_value=success_result) + + await task.execute_agent_for_event( + event_id="event_123", + agent=sample_agent, + event_tuple=sample_event_tuple, + interval_start_minutes=1000, + ) + + # Verify run was created + mock_db_operations.upsert_agent_runs.assert_called_once() + runs = mock_db_operations.upsert_agent_runs.call_args[0][0] + assert len(runs) == 1 + run = runs[0] + assert run.status == AgentRunStatus.SUCCESS + assert run.is_final is True + assert run.exported is False + assert run.unique_event_id == "event_123" + assert run.agent_version_id == sample_agent.version_id + + # Verify prediction was stored + mock_db_operations.upsert_predictions.assert_called_once() + + async def test_creates_run_on_timeout( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + sample_event_tuple, + ): + mock_db_operations.upsert_agent_runs = AsyncMock() + mock_db_operations.count_runs_for_event_and_agent = AsyncMock(return_value=0) + + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + timeout_seconds=1, + ) + + task.load_agent_code = AsyncMock(return_value="def agent_main(): pass") + task.run_sandbox = AsyncMock(return_value=None) # Timeout + + await task.execute_agent_for_event( + event_id="event_timeout", + agent=sample_agent, + event_tuple=sample_event_tuple, + interval_start_minutes=1000, + ) + + mock_db_operations.upsert_agent_runs.assert_called_once() + runs = mock_db_operations.upsert_agent_runs.call_args[0][0] + assert len(runs) == 1 + run = runs[0] + assert run.status == AgentRunStatus.SANDBOX_TIMEOUT + assert run.is_final is False + assert run.exported is False + + async def test_creates_run_on_agent_error( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + sample_event_tuple, + ): + mock_db_operations.upsert_agent_runs = AsyncMock() + + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + timeout_seconds=120, + ) + + task.load_agent_code = AsyncMock(return_value="def agent_main(): pass") + error_result = { + "status": "error", + "error": "agent_main() must return a dict", + "error_type": SandboxErrorType.AGENT_ERROR, + "traceback": "Traceback...", + "logs": "[AGENT_RUNNER] Error", + } + task.run_sandbox = AsyncMock(return_value=error_result) + + await task.execute_agent_for_event( + event_id="event_error", + agent=sample_agent, + event_tuple=sample_event_tuple, + interval_start_minutes=1000, + ) + + mock_db_operations.upsert_agent_runs.assert_called_once() + runs = mock_db_operations.upsert_agent_runs.call_args[0][0] + assert len(runs) == 1 + run = runs[0] + assert run.status == AgentRunStatus.INTERNAL_AGENT_ERROR + assert run.is_final is True + assert run.exported is False + + async def test_creates_run_on_invalid_output( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + sample_event_tuple, + ): + mock_db_operations.upsert_agent_runs = AsyncMock() + + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + timeout_seconds=120, + ) + + task.load_agent_code = AsyncMock(return_value="def agent_main(): pass") + # Success status but missing prediction field + invalid_result = { + "status": "SUCCESS", + "output": {"event_id": "external_event_123"}, # Missing prediction! + "logs": "[AGENT_RUNNER] Success but invalid", + } + task.run_sandbox = AsyncMock(return_value=invalid_result) + + await task.execute_agent_for_event( + event_id="event_invalid", + agent=sample_agent, + event_tuple=sample_event_tuple, + interval_start_minutes=1000, + ) + + mock_db_operations.upsert_agent_runs.assert_called_once() + runs = mock_db_operations.upsert_agent_runs.call_args[0][0] + assert len(runs) == 1 + run = runs[0] + assert run.status == AgentRunStatus.INVALID_SANDBOX_OUTPUT + assert run.is_final is True + assert run.exported is False + + async def test_run_links_to_prediction_on_success( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + sample_event_tuple, + ): + mock_db_operations.upsert_predictions = AsyncMock() + mock_db_operations.upsert_agent_runs = AsyncMock() + + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + timeout_seconds=120, + ) + + task.load_agent_code = AsyncMock(return_value="def agent_main(): pass") + success_result = { + "status": "SUCCESS", + "output": {"event_id": "external_event_123", "prediction": 0.85}, + "logs": "[AGENT_RUNNER] Success", + } + task.run_sandbox = AsyncMock(return_value=success_result) + + await task.execute_agent_for_event( + event_id="event_123", + agent=sample_agent, + event_tuple=sample_event_tuple, + interval_start_minutes=1000, + ) + + # Verify run and prediction share same run_id + runs = mock_db_operations.upsert_agent_runs.call_args[0][0] + predictions = mock_db_operations.upsert_predictions.call_args[0][0] + + assert len(runs) == 1 + assert len(predictions) == 1 + + run_id = runs[0].run_id + prediction_run_id = predictions[0].run_id + + assert run_id == prediction_run_id + assert runs[0].status == AgentRunStatus.SUCCESS + assert predictions[0].latest_prediction == 0.85 + + async def test_no_prediction_stored_on_error( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + sample_event_tuple, + ): + mock_db_operations.upsert_predictions = AsyncMock() + mock_db_operations.upsert_agent_runs = AsyncMock() + + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + timeout_seconds=120, + ) + + task.load_agent_code = AsyncMock(return_value="def agent_main(): pass") + error_result = { + "status": "error", + "error": "Something went wrong", + "error_type": SandboxErrorType.AGENT_ERROR, + "logs": "[AGENT_RUNNER] Error", + } + task.run_sandbox = AsyncMock(return_value=error_result) + + await task.execute_agent_for_event( + event_id="event_error", + agent=sample_agent, + event_tuple=sample_event_tuple, + interval_start_minutes=1000, + ) + + mock_db_operations.upsert_agent_runs.assert_called_once() + mock_db_operations.upsert_predictions.assert_not_called() + + info_calls = [ + call + for call in mock_logger.info.call_args_list + if len(call[0]) > 0 + and "Agent execution completed with non-success status" in call[0][0] + ] + assert len(info_calls) == 1 + + +@pytest.mark.asyncio +class TestRunAgentsMaxRetries: + async def test_first_timeout_allows_execution( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + sample_event_tuple, + ): + mock_db_operations.get_latest_prediction_for_event_and_miner = AsyncMock(return_value=None) + mock_db_operations.has_final_run = AsyncMock(return_value=False) + mock_db_operations.count_runs_for_event_and_agent = AsyncMock(return_value=0) + + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + timeout_seconds=1, + ) + task.execute_agent_for_event = AsyncMock() + + semaphore = asyncio.Semaphore(1) + await task.execute_with_semaphore( + semaphore=semaphore, + event=sample_event_tuple, + agent=sample_agent, + interval_start_minutes=1000, + ) + + mock_db_operations.has_final_run.assert_called_once_with( + unique_event_id="event_123", + agent_version_id="agent_v1", + ) + task.execute_agent_for_event.assert_called_once() + + async def test_second_timeout_allows_execution( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + sample_event_tuple, + ): + mock_db_operations.get_latest_prediction_for_event_and_miner = AsyncMock(return_value=None) + mock_db_operations.has_final_run = AsyncMock(return_value=False) + mock_db_operations.count_runs_for_event_and_agent = AsyncMock(return_value=1) + + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + timeout_seconds=1, + ) + task.execute_agent_for_event = AsyncMock() + + semaphore = asyncio.Semaphore(1) + await task.execute_with_semaphore( + semaphore=semaphore, + event=sample_event_tuple, + agent=sample_agent, + interval_start_minutes=1000, + ) + + task.execute_agent_for_event.assert_called_once() + + async def test_third_timeout_allows_execution_creates_final_run( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + sample_event_tuple, + ): + mock_db_operations.get_latest_prediction_for_event_and_miner = AsyncMock(return_value=None) + mock_db_operations.has_final_run = AsyncMock(return_value=False) + mock_db_operations.count_runs_for_event_and_agent = AsyncMock(return_value=2) + + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + timeout_seconds=1, + ) + task.execute_agent_for_event = AsyncMock() + + semaphore = asyncio.Semaphore(1) + await task.execute_with_semaphore( + semaphore=semaphore, + event=sample_event_tuple, + agent=sample_agent, + interval_start_minutes=1000, + ) + + task.execute_agent_for_event.assert_called_once() + + async def test_fourth_call_skips_when_final_run_exists( + self, + mock_db_operations, + mock_sandbox_manager, + mock_metagraph, + mock_api_client, + mock_logger, + sample_agent, + sample_event_tuple, + ): + mock_db_operations.get_latest_prediction_for_event_and_miner = AsyncMock(return_value=None) + mock_db_operations.has_final_run = AsyncMock(return_value=True) + + task = RunAgents( + interval_seconds=600.0, + db_operations=mock_db_operations, + sandbox_manager=mock_sandbox_manager, + metagraph=mock_metagraph, + api_client=mock_api_client, + logger=mock_logger, + timeout_seconds=1, + ) + task.execute_agent_for_event = AsyncMock() + + semaphore = asyncio.Semaphore(1) + await task.execute_with_semaphore( + semaphore=semaphore, + event=sample_event_tuple, + agent=sample_agent, + interval_start_minutes=1000, + ) + + task.execute_agent_for_event.assert_not_called() + mock_logger.debug.assert_called_with( + "Skipping execution - final run exists", + extra={ + "event_id": "event_123", + "agent_version_id": "agent_v1", + }, + ) diff --git a/neurons/validator/tasks/tests/test_scoring.py b/neurons/validator/tasks/tests/test_scoring.py index c631b02..57a8191 100644 --- a/neurons/validator/tasks/tests/test_scoring.py +++ b/neurons/validator/tasks/tests/test_scoring.py @@ -437,7 +437,8 @@ def test_return_empty_scores_df(self, scoring_task: Scoring): assert ScoreNames.rema_prediction in df.columns assert ScoreNames.rema_peer_score in df.columns - def test_fill_unresponsive_miners_with_unresponsive(self, scoring_task: Scoring): + def test_fill_missing_predictions_with_missing(self, scoring_task: Scoring): + """Test that missing predictions are filled with 0.5 regardless of registration time.""" df = pd.DataFrame( { ScoreNames.miner_uid: [1, 2], @@ -448,9 +449,10 @@ def test_fill_unresponsive_miners_with_unresponsive(self, scoring_task: Scoring) ScoreNames.weight: [1, 1], } ) - result_df = scoring_task.fill_unresponsive_miners(df, imputed_prediction=0.5) + result_df = scoring_task.fill_missing_predictions(df, imputed_prediction=0.5) assert isinstance(result_df, pd.DataFrame) + # Missing prediction should be filled with 0.5 np.testing.assert_allclose( result_df.loc[ result_df[ScoreNames.miner_uid] == 1, ScoreNames.interval_agg_prediction @@ -458,6 +460,7 @@ def test_fill_unresponsive_miners_with_unresponsive(self, scoring_task: Scoring) 0.5, rtol=1e-5, ) + # Existing prediction should be unchanged np.testing.assert_allclose( result_df.loc[ result_df[ScoreNames.miner_uid] == 2, ScoreNames.interval_agg_prediction @@ -466,11 +469,14 @@ def test_fill_unresponsive_miners_with_unresponsive(self, scoring_task: Scoring) rtol=1e-5, ) - def test_fill_unresponsive_miners_with_new_miner(self, scoring_task: Scoring): + def test_fill_missing_predictions_fills_all_regardless_of_registration( + self, scoring_task: Scoring + ): + """Test that ALL missing predictions are filled, even for miners registered after the interval.""" df = pd.DataFrame( { ScoreNames.miner_uid: [1, 2], - ScoreNames.miner_registered_minutes: [50, 150], # after interval + ScoreNames.miner_registered_minutes: [50, 150], # miner 2 registered after interval ScoreNames.interval_start: [100, 100], ScoreNames.interval_agg_prediction: [pd.NA, pd.NA], ScoreNames.interval_idx: [0, 0], @@ -478,10 +484,10 @@ def test_fill_unresponsive_miners_with_new_miner(self, scoring_task: Scoring): } ) - result_df = scoring_task.fill_unresponsive_miners(df, imputed_prediction=0.5) + result_df = scoring_task.fill_missing_predictions(df, imputed_prediction=0.5) assert isinstance(result_df, pd.DataFrame) - # registered before interval + # Miner 1: registered before interval, missing prediction -> filled with 0.5 np.testing.assert_allclose( result_df.loc[ result_df[ScoreNames.miner_uid] == 1, ScoreNames.interval_agg_prediction @@ -489,11 +495,13 @@ def test_fill_unresponsive_miners_with_new_miner(self, scoring_task: Scoring): 0.5, rtol=1e-5, ) - # registered after interval - assert pd.isna( + # Miner 2: registered AFTER interval, missing prediction -> ALSO filled with 0.5 + np.testing.assert_allclose( result_df.loc[ result_df[ScoreNames.miner_uid] == 2, ScoreNames.interval_agg_prediction - ].iloc[0] + ].iloc[0], + 0.5, + rtol=1e-5, ) def test_aggregate_predictions_by_miner(self, scoring_task: Scoring): @@ -575,12 +583,13 @@ async def test_score_event_no_intervals(self, scoring_task: Scoring, db_operatio assert len(updated_events) == 1 assert updated_events[0]["status"] == str(EventStatus.DISCARDED.value) - async def test_score_event_no_miners(self, scoring_task: Scoring): + async def test_score_event_miner_without_prediction_gets_imputed(self, scoring_task: Scoring): + """Test that miners without predictions get 0.5 imputed, regardless of registration time.""" base_time = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) duration_minutes = 3 * AGGREGATION_INTERVAL_LENGTH_MINUTES event = EventsModel( - unique_event_id="evt_no_miners", + unique_event_id="evt_imputation_test", event_id="e2", market_type="dummy", event_type="dummy", @@ -591,7 +600,6 @@ async def test_score_event_no_miners(self, scoring_task: Scoring): cutoff=base_time + timedelta(minutes=duration_minutes), registered_date=base_time, ) - predictions = [] unit = scoring_task @@ -602,20 +610,63 @@ async def test_score_event_no_miners(self, scoring_task: Scoring): SCORING_WINDOW_INTERVALS * AGGREGATION_INTERVAL_LENGTH_MINUTES ) - # Miner registered AFTER scoring window start should be excluded + # Miner 1: has predictions for all intervals (like run_agents does via replication) + # Miner 2: registered after scoring window, no prediction - should get 0.5 imputed + predictions = [ + # Miner 1: prediction in interval 0 (older, gets weight 1.0) + PredictionsModel( + unique_event_id="evt_imputation_test", + miner_hotkey="hotkey1", + miner_uid=1, + latest_prediction=0.8, + interval_start_minutes=scoring_window_start_minutes, + interval_agg_prediction=0.8, + interval_count=1, + submitted=base_time, + exported=False, + ), + # Miner 1: prediction in interval 1 (newer, gets weight 0.0 with 2 intervals) + PredictionsModel( + unique_event_id="evt_imputation_test", + miner_hotkey="hotkey1", + miner_uid=1, + latest_prediction=0.8, + interval_start_minutes=scoring_window_start_minutes + + AGGREGATION_INTERVAL_LENGTH_MINUTES, + interval_agg_prediction=0.8, + interval_count=1, + submitted=base_time, + exported=False, + ), + ] + unit.miners_last_reg = pd.DataFrame( { - ScoreNames.miner_uid: [1], - ScoreNames.miner_hotkey: ["hotkey1"], - ScoreNames.miner_registered_minutes: [scoring_window_start_minutes + 1], + ScoreNames.miner_uid: [1, 2], + ScoreNames.miner_hotkey: ["hotkey1", "hotkey2"], + ScoreNames.miner_registered_minutes: [ + scoring_window_start_minutes - 1, # miner 1: registered before + scoring_window_start_minutes + 1, # miner 2: registered after + ], } ) result = await unit.score_event(event, predictions) - assert result.empty - assert ScoreNames.rema_prediction in result.columns - assert unit.logger.error.call_count >= 1 - assert unit.logger.error.call_args_list[-1].args[0] == "No miners to score." + # Both miners should be scored + assert not result.empty + assert result.shape[0] == 2 + + # Miner 1: has prediction 0.8 in all intervals + miner1 = result[result[ScoreNames.miner_uid] == 1].iloc[0] + np.testing.assert_allclose(miner1[ScoreNames.rema_prediction], 0.8, rtol=1e-5) + # Brier for 0.8 with outcome=1: (0.8 - 1)² = 0.04 + np.testing.assert_allclose(miner1[ScoreNames.rema_peer_score], 0.04, rtol=1e-5) + + # Miner 2: no prediction, should be imputed with 0.5 + miner2 = result[result[ScoreNames.miner_uid] == 2].iloc[0] + np.testing.assert_allclose(miner2[ScoreNames.rema_prediction], 0.5, rtol=1e-5) + # Brier for 0.5 with outcome=1: (0.5 - 1)² = 0.25 + np.testing.assert_allclose(miner2[ScoreNames.rema_peer_score], 0.25, rtol=1e-5) async def test_score_event_no_predictions(self, scoring_task: Scoring): base_time = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) @@ -746,7 +797,10 @@ async def test_score_event_normal(self, scoring_task: Scoring): assert unit.errors_count == 0 assert unit.logger.error.call_count == 0 - async def test_score_event_scoring_window_miner_exclusion(self, scoring_task: Scoring): + async def test_score_event_all_miners_scored_regardless_of_registration( + self, scoring_task: Scoring + ): + """Test that ALL miners are scored regardless of when they registered.""" base_time = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) duration_minutes = 5 * AGGREGATION_INTERVAL_LENGTH_MINUTES @@ -769,26 +823,53 @@ async def test_score_event_scoring_window_miner_exclusion(self, scoring_task: Sc SCORING_WINDOW_INTERVALS * AGGREGATION_INTERVAL_LENGTH_MINUTES ) + # Predictions for both intervals (like run_agents does via replication) predictions = [ + # Miner 1: interval 0 PredictionsModel( unique_event_id="evt_window_exclusion", miner_hotkey="miner_early", miner_uid=1, latest_prediction=0.7, - interval_start_minutes=event_cutoff_start_minutes - - AGGREGATION_INTERVAL_LENGTH_MINUTES, + interval_start_minutes=scoring_window_start_minutes, + interval_agg_prediction=0.7, + interval_count=1, + submitted=base_time, + exported=False, + ), + # Miner 1: interval 1 + PredictionsModel( + unique_event_id="evt_window_exclusion", + miner_hotkey="miner_early", + miner_uid=1, + latest_prediction=0.7, + interval_start_minutes=scoring_window_start_minutes + + AGGREGATION_INTERVAL_LENGTH_MINUTES, interval_agg_prediction=0.7, interval_count=1, submitted=base_time, exported=False, ), + # Miner 2: interval 0 PredictionsModel( unique_event_id="evt_window_exclusion", miner_hotkey="miner_late", miner_uid=2, latest_prediction=0.9, - interval_start_minutes=event_cutoff_start_minutes - - AGGREGATION_INTERVAL_LENGTH_MINUTES, + interval_start_minutes=scoring_window_start_minutes, + interval_agg_prediction=0.9, + interval_count=1, + submitted=base_time, + exported=False, + ), + # Miner 2: interval 1 + PredictionsModel( + unique_event_id="evt_window_exclusion", + miner_hotkey="miner_late", + miner_uid=2, + latest_prediction=0.9, + interval_start_minutes=scoring_window_start_minutes + + AGGREGATION_INTERVAL_LENGTH_MINUTES, interval_agg_prediction=0.9, interval_count=1, submitted=base_time, @@ -798,8 +879,9 @@ async def test_score_event_scoring_window_miner_exclusion(self, scoring_task: Sc unit = scoring_task - # Miner 1: registered before scoring window (INCLUDED) - # Miner 2: registered after scoring window start (EXCLUDED) + # Miner 1: registered before scoring window + # Miner 2: registered after scoring window start + # BOTH should be scored unit.miners_last_reg = pd.DataFrame( { ScoreNames.miner_uid: [1, 2], @@ -815,11 +897,17 @@ async def test_score_event_scoring_window_miner_exclusion(self, scoring_task: Sc result = await unit.score_event(event, predictions) - # Only miner 1 should be scored + # BOTH miners should be scored (new behavior) assert not result.empty - assert result.shape[0] == 1 - assert result.iloc[0][ScoreNames.miner_uid] == 1 - assert result.iloc[0][ScoreNames.miner_hotkey] == "miner_early" + assert result.shape[0] == 2 + + # Miner 1 with prediction 0.7 + miner1_row = result[result[ScoreNames.miner_uid] == 1].iloc[0] + np.testing.assert_allclose(miner1_row[ScoreNames.rema_prediction], 0.7, rtol=1e-5) + + # Miner 2 with prediction 0.9 + miner2_row = result[result[ScoreNames.miner_uid] == 2].iloc[0] + np.testing.assert_allclose(miner2_row[ScoreNames.rema_prediction], 0.9, rtol=1e-5) assert unit.errors_count == 0 @@ -1259,21 +1347,29 @@ async def test_e2e_run( ) # Validate event2 scores (outcome=0) + # With new behavior, ALL miners are scored (miner 2 gets 0.5 imputed) df_actual_ev_2 = pd.DataFrame.from_dict(actual_scores_ev_2, orient="index").reset_index( drop=True ) - assert len(df_actual_ev_2) == 1, "event2 should have 1 miner scored" - assert df_actual_ev_2.iloc[0][ScoreNames.miner_uid] == 1 - assert df_actual_ev_2.iloc[0][ScoreNames.miner_hotkey] == "hotkey1" - - # Miner 1 prediction is 1.0 clipped to 0.99 - rema_pred_ev2 = df_actual_ev_2.iloc[0][ScoreNames.rema_prediction] - assert 0 <= rema_pred_ev2 <= 1, f"event2 rema_prediction {rema_pred_ev2} out of range" - - # Brier score for outcome=0: (pred - 0)² - expected_brier_ev2 = (rema_pred_ev2 - 0.0) ** 2 - actual_brier_ev2 = df_actual_ev_2.iloc[0][ScoreNames.rema_peer_score] - np.testing.assert_allclose(actual_brier_ev2, expected_brier_ev2, rtol=1e-5) + assert len(df_actual_ev_2) == 2, "event2 should have 2 miners scored (new behavior)" + assert set(df_actual_ev_2[ScoreNames.miner_uid].tolist()) == {1, 2} + + # Miner 1: has prediction 1.0 (clipped to 0.99) + miner1_ev2 = df_actual_ev_2[df_actual_ev_2[ScoreNames.miner_uid] == 1].iloc[0] + rema_pred_ev2_m1 = miner1_ev2[ScoreNames.rema_prediction] + assert ( + 0 <= rema_pred_ev2_m1 <= 1 + ), f"event2 miner1 prediction {rema_pred_ev2_m1} out of range" + expected_brier_ev2_m1 = (rema_pred_ev2_m1 - 0.0) ** 2 + np.testing.assert_allclose( + miner1_ev2[ScoreNames.rema_peer_score], expected_brier_ev2_m1, rtol=1e-5 + ) + + # Miner 2: no prediction, imputed 0.5 + miner2_ev2 = df_actual_ev_2[df_actual_ev_2[ScoreNames.miner_uid] == 2].iloc[0] + np.testing.assert_allclose(miner2_ev2[ScoreNames.rema_prediction], 0.5, rtol=1e-5) + # Brier for 0.5 with outcome=0: (0.5 - 0)² = 0.25 + np.testing.assert_allclose(miner2_ev2[ScoreNames.rema_peer_score], 0.25, rtol=1e-5) # Check events are marked as processed events_for_scoring = await db_ops.get_events_for_scoring() diff --git a/neurons/validator/tasks/tests/test_set_weights.py b/neurons/validator/tasks/tests/test_set_weights.py index 3ae8108..4952e03 100644 --- a/neurons/validator/tasks/tests/test_set_weights.py +++ b/neurons/validator/tasks/tests/test_set_weights.py @@ -446,67 +446,23 @@ async def test_run_successful_x(self, set_weights_task: SetWeights, monkeypatch, created_at = datetime.now(timezone.utc) - timedelta(days=1) scores_list = [ ScoresModel( - event_id="expected_event_id_1", + event_id="latest_event", miner_uid=3, miner_hotkey="hk3", prediction=0.75, - event_score=0.80, - metagraph_score=1.0, + event_score=0.10, + metagraph_score=0.835, # Winner gets ~0.835 created_at=created_at, spec_version=1, processed=True, ), ScoresModel( - event_id="expected_event_id_2", - miner_uid=3, - miner_hotkey="hk3", - prediction=0.75, - event_score=0.40, - metagraph_score=0.9, - created_at=created_at, - spec_version=1, - processed=True, - ), - ScoresModel( - event_id="expected_event_id_3", - miner_uid=3, - miner_hotkey="hk3", - prediction=0.75, - event_score=0.60, - metagraph_score=0.835, - created_at=created_at, - spec_version=1, - processed=True, - ), - ScoresModel( - event_id="expected_event_id_2", + event_id="latest_event", miner_uid=4, miner_hotkey="hk4", - prediction=0.75, - event_score=0.40, - metagraph_score=0.1, - created_at=created_at, - spec_version=1, - processed=True, - ), - ScoresModel( - event_id="expected_event_id_1", - miner_uid=4, - miner_hotkey="hk4", - prediction=0.75, - event_score=0.40, - metagraph_score=0.165, - created_at=created_at, - spec_version=1, - processed=True, - ), - ScoresModel( - event_id="expected_event_id_2", - miner_uid=5, - miner_hotkey="hk5", - prediction=0.75, - event_score=-0.40, - metagraph_score=0.0, + prediction=0.50, + event_score=0.25, + metagraph_score=0.165, # Second place created_at=created_at, spec_version=1, processed=True, diff --git a/neurons/validator/tests/test_main.py b/neurons/validator/tests/test_main.py index 3324220..f89cda6 100644 --- a/neurons/validator/tests/test_main.py +++ b/neurons/validator/tests/test_main.py @@ -12,6 +12,7 @@ @patch("neurons.validator.main.ExportScores", spec=True) @patch("neurons.validator.main.MetagraphScoring", spec=True) @patch("neurons.validator.main.Scoring", spec=True) +@patch("neurons.validator.main.ExportAgentRuns", spec=True) @patch("neurons.validator.main.ExportPredictions", spec=True) class TestValidatorMain: @pytest.mark.parametrize( @@ -24,6 +25,7 @@ class TestValidatorMain: def test_main( self, mock_export_predictions, + mock_export_agent_runs, mock_peer_scoring, mock_metagraph_scoring, mock_export_scores, @@ -114,7 +116,7 @@ def test_main( mock_measure_event_loop_lag.assert_awaited_once() # Verify tasks added count - assert mock_scheduler.add.call_count == 13 + assert mock_scheduler.add.call_count == 14 # Verify logging mock_logger.info.assert_called_with( diff --git a/neurons/validator/version.py b/neurons/validator/version.py index 3aeb858..a423ec4 100644 --- a/neurons/validator/version.py +++ b/neurons/validator/version.py @@ -1,4 +1,4 @@ -__version__ = "2.0.2" +__version__ = "2.0.3" version_split = __version__.split(".")