diff --git a/.gitignore b/.gitignore index b1e4fe7d..d248da77 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ venv/ .idea/ state.json +tmp/ app.config.js diff --git a/desearch/__init__.py b/desearch/__init__.py index a8a8be84..209d0431 100644 --- a/desearch/__init__.py +++ b/desearch/__init__.py @@ -33,7 +33,6 @@ print("__version__", __version__) import os -from enum import Enum from openai import AsyncOpenAI @@ -43,44 +42,13 @@ client = AsyncOpenAI(timeout=90.0) + +MIN_TOTAL_STAKE = 10000 +MIN_ALPHA_STAKE = 20 + # Blacklist variables -ALLOW_NON_REGISTERED = False -PROMPT_BLACKLIST_STAKE = 20000 -TWITTER_SCRAPPER_BLACKLIST_STAKE = 20000 -ISALIVE_BLACKLIST_STAKE = min(PROMPT_BLACKLIST_STAKE, TWITTER_SCRAPPER_BLACKLIST_STAKE) MIN_REQUEST_PERIOD = 2 MAX_REQUESTS = 30 -# must have the test_key whitelisted to avoid a global blacklist -testnet_key = ["5EhEZN6soubtKJm8RN7ANx9FGZ2JezxBUFxr45cdsHtDp3Uk"] -test_key = ["5DcRHcCwD33YsHfj4PX5j2evWLniR1wSWeNmpf5RXaspQT6t"] - -valid_validators = [ - "5FFApaS75bv5pJHfAp2FVLBj9ZaXuFDjEypsaBNc1wCfe52v", - "5EhvL1FVkQPpMjZX4MAADcW42i3xPSF1KiCpuaxTYVr28sux", - "5CXRfP2ekFhe62r7q3vppRajJmGhTi7vwvb2yr79jveZ282w", - "5CaNj3BarTHotEK1n513aoTtFeXcjf6uvKzAyzNuv9cirUoW", - "5HK5tp6t2S59DywmHRWPBVJeJ86T61KjurYqeooqj8sREpeN", - "5DvTpiniW9s3APmHRYn8FroUWyfnLtrsid5Mtn5EwMXHN2ed", - "5G3f8VDTT1ydirT3QffnV2TMrNMR2MkQfGUubQNqZcGSj82T", - "5Dz8ShM6rtPw1GBAaqxjycT9LF1TC3iDpzpUH9gKr85Nizo6", - "5Hddm3iBFD2GLT5ik7LZnT3XJUnRnN8PoeCFgGQgawUVKNm8", - "5HNQURvmjjYhTSksi8Wfsw676b4owGwfLR2BFAQzG7H3HhYf", - "5HEo565WAy4Dbq3Sv271SAi7syBSofyfhhwRNjFNSM2gP9M2", - "5F4tQyWrhfGVcNhoqeiNsR6KjD4wMZ2kfhLj4oHYuyHbZAc3", - "5H66kJAzBCv2DC9poHATLQqyt3ag8FLSbHf6rMqTiRcS52rc", - "5HbLYXUBy1snPR8nfioQ7GoA9x76EELzEq9j7F32vWUQHm1x", - "5FKstHjZkh4v3qAMSBa1oJcHCLjxYZ8SNTSz1opTv4hR7gVB", - "5DXTJSPVvf1sow1MU4npJPewEAwhPRb6CWsk4RX9RFt2PRbj", - "5EsrMfo7UcPs6AqAotU47VmYGfLHntS9JzhEwbY2EJMcWQxQ", # server - "5Dd8gaRNdhm1YP7G1hcB1N842ecAUQmbLjCRLqH5ycaTGrWv", - 
"5DnXm2tBGAD57ySJv5SfpTfLcsQbSKKp6xZKFWABw3cYUgqg", - "5GVpVH7DjYmQY7ckznVnrHncU9knzYJvhY3TfbFY7sPboJB2", - "5Fq5v71D4LX8Db1xsmRSy6udQThcZ8sFDqxQFwnUZ1BuqY5A", - "5ChuGqW2cxc5AZJ29z6vyTkTncg75L9ovfp8QN8eB8niSD75", - "5F27Eqz2PhyMtGMEce898x31DokNqRVxkm5AhDDe6rDGNvoY", -] - -WHITELISTED_KEYS = testnet_key + test_key + valid_validators BLACKLISTED_KEYS = [ "5G1NjW9YhXLadMWajvTkfcJy6up3yH2q1YzMXDTi6ijanChe", diff --git a/neurons/miners/miner.py b/neurons/miners/miner.py index a405b45a..4d080ac4 100644 --- a/neurons/miners/miner.py +++ b/neurons/miners/miner.py @@ -159,7 +159,7 @@ async def _twitter_urls_search( async def _web_search(self, synapse: WebSearchSynapse) -> WebSearchSynapse: return await self.web_search(synapse) - def base_blacklist(self, synapse, blacklist_amt=20000) -> Tuple[bool, str]: + def base_blacklist(self, synapse) -> Tuple[bool, str]: try: hotkey = synapse.dendrite.hotkey synapse_type = type(synapse).__name__ @@ -167,37 +167,34 @@ def base_blacklist(self, synapse, blacklist_amt=20000) -> Tuple[bool, str]: if hotkey in desearch.BLACKLISTED_KEYS: return True, f"Blacklisted a {synapse_type} request from {hotkey}" - # if hotkey in desearch.WHITELISTED_KEYS: - # return False, f"accepting {synapse_type} request from {hotkey}" - - # if hotkey not in desearch.valid_validators: - # return ( - # True, - # f"Blacklisted a {synapse_type} request from a non-valid hotkey: {hotkey}", - # ) - uid = None - axon = None for _uid, _axon in enumerate(self.metagraph.axons): if _axon.hotkey == hotkey: uid = _uid - axon = _axon break - if uid is None and desearch.ALLOW_NON_REGISTERED == False: + if uid is None: return ( True, f"Blacklisted a non registered hotkey's {synapse_type} request from {hotkey}", ) if self.config.subtensor.network == "finney": - # check the stake - tao = self.metagraph.neurons[uid].stake.tao - # metagraph.neurons[uid].S - if tao < blacklist_amt: + alpha_stake = float(self.metagraph.alpha_stake[uid].item()) + total_stake = 
float(self.metagraph.total_stake[uid].item()) + + if ( + alpha_stake < desearch.MIN_ALPHA_STAKE + or total_stake < desearch.MIN_TOTAL_STAKE + ): return ( True, - f"Blacklisted a low stake {synapse_type} request: {tao} < {blacklist_amt} from {hotkey}", + ( + f"Blacklisted a low stake {synapse_type} request: " + f"alpha_stake={alpha_stake} < {desearch.MIN_ALPHA_STAKE} " + f"or total_stake={total_stake} < {desearch.MIN_TOTAL_STAKE} " + f"from {hotkey}" + ), ) time_window = desearch.MIN_REQUEST_PERIOD * 60 @@ -228,50 +225,40 @@ def base_blacklist(self, synapse, blacklist_amt=20000) -> Tuple[bool, str]: bt.logging.error(f"errror in blacklist {traceback.format_exc()}") def blacklist_is_alive(self, synapse: IsAlive) -> Tuple[bool, str]: - blacklist = self.base_blacklist(synapse, desearch.ISALIVE_BLACKLIST_STAKE) + blacklist = self.base_blacklist(synapse) bt.logging.debug(blacklist[1]) return blacklist def blacklist_smart_scraper( self, synapse: ScraperStreamingSynapse ) -> Tuple[bool, str]: - blacklist = self.base_blacklist( - synapse, desearch.TWITTER_SCRAPPER_BLACKLIST_STAKE - ) + blacklist = self.base_blacklist(synapse) bt.logging.info(blacklist[1]) return blacklist def blacklist_twitter_search( self, synapse: TwitterSearchSynapse ) -> Tuple[bool, str]: - blacklist = self.base_blacklist( - synapse, desearch.TWITTER_SCRAPPER_BLACKLIST_STAKE - ) + blacklist = self.base_blacklist(synapse) bt.logging.info(blacklist[1]) return blacklist def blacklist_twitter_id_search( self, synapse: TwitterIDSearchSynapse ) -> Tuple[bool, str]: - blacklist = self.base_blacklist( - synapse, desearch.TWITTER_SCRAPPER_BLACKLIST_STAKE - ) + blacklist = self.base_blacklist(synapse) bt.logging.info(blacklist[1]) return blacklist def blacklist_twitter_urls_search( self, synapse: TwitterURLsSearchSynapse ) -> Tuple[bool, str]: - blacklist = self.base_blacklist( - synapse, desearch.TWITTER_SCRAPPER_BLACKLIST_STAKE - ) + blacklist = self.base_blacklist(synapse) bt.logging.info(blacklist[1]) return 
blacklist def blacklist_web_search(self, synapse: WebSearchSynapse) -> Tuple[bool, str]: - blacklist = self.base_blacklist( - synapse, desearch.TWITTER_SCRAPPER_BLACKLIST_STAKE - ) + blacklist = self.base_blacklist(synapse) bt.logging.info(blacklist[1]) return blacklist diff --git a/neurons/validators/api/__init__.py b/neurons/validators/api/__init__.py new file mode 100644 index 00000000..7034fb19 --- /dev/null +++ b/neurons/validators/api/__init__.py @@ -0,0 +1,3 @@ +from neurons.validators.api.app import app + +__all__ = ["app"] diff --git a/neurons/validators/api.py b/neurons/validators/api/app.py similarity index 99% rename from neurons/validators/api.py rename to neurons/validators/api/app.py index 2cfb1f17..8ab2aaf8 100644 --- a/neurons/validators/api.py +++ b/neurons/validators/api/app.py @@ -26,9 +26,9 @@ TwitterScraperTweet, WebSearchResultList, ) +from neurons.validators.api.validator_api import ValidatorAPI +from neurons.validators.api.validator_service_client import ValidatorServiceClient from neurons.validators.env import EXPECTED_ACCESS_KEY, PORT -from neurons.validators.validator_api import ValidatorAPI -from neurons.validators.validator_service_client import ValidatorServiceClient async def get_validator_config(): @@ -526,9 +526,7 @@ async def get_tweets_by_urls( bt.logging.info(f"Fetching tweets for URLs: {urls}") - results = await api.x_scraper_validator.x_posts_by_urls( - urls, uid=request.uid - ) + results = await api.x_scraper_validator.x_posts_by_urls(urls, uid=request.uid) except Exception as e: bt.logging.error(f"Error fetching tweets by URLs: {e}") raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") diff --git a/neurons/validators/validator_api.py b/neurons/validators/api/validator_api.py similarity index 88% rename from neurons/validators/validator_api.py rename to neurons/validators/api/validator_api.py index 028eb35a..c6cf8ebb 100644 --- a/neurons/validators/validator_api.py +++ 
b/neurons/validators/api/validator_api.py @@ -4,11 +4,13 @@ import bittensor as bt from desearch.redis.redis_client import close_redis, initialize_redis -from neurons.validators.advanced_scraper_validator import AdvancedScraperValidator +from neurons.validators.api.validator_service_client import ValidatorServiceClient +from neurons.validators.scrapers.advanced_scraper_validator import ( + AdvancedScraperValidator, +) +from neurons.validators.scrapers.web_scraper_validator import WebScraperValidator +from neurons.validators.scrapers.x_scraper_validator import XScraperValidator from neurons.validators.utility_api_client import UtilityAPIClient -from neurons.validators.validator_service_client import ValidatorServiceClient -from neurons.validators.web_scraper_validator import WebScraperValidator -from neurons.validators.x_scraper_validator import XScraperValidator class ValidatorAPI: diff --git a/neurons/validators/validator_service_client.py b/neurons/validators/api/validator_service_client.py similarity index 100% rename from neurons/validators/validator_service_client.py rename to neurons/validators/api/validator_service_client.py diff --git a/neurons/validators/proxy/dependencies.py b/neurons/validators/proxy/dependencies.py deleted file mode 100644 index 065123a2..00000000 --- a/neurons/validators/proxy/dependencies.py +++ /dev/null @@ -1,14 +0,0 @@ -import fastapi -import fastapi.security - - -AUTHORIZATION = fastapi.security.APIKeyHeader(name="Authorization", auto_error=True) - - -async def get_token(authorization: str = fastapi.Depends(AUTHORIZATION)) -> None: - """ - Require a token to call this endpoint. - - It'll already be injected by the token_middleware, but _enforce_ it - """ - return ... 
diff --git a/neurons/validators/proxy/uid_manager.py b/neurons/validators/proxy/uid_manager.py index b796a4d1..deecfd56 100644 --- a/neurons/validators/proxy/uid_manager.py +++ b/neurons/validators/proxy/uid_manager.py @@ -3,7 +3,7 @@ from bittensor.core.metagraph import AsyncMetagraph -from neurons.validators.weights import EMISSION_CONTROL_HOTKEY +from neurons.validators.service.weights import EMISSION_CONTROL_HOTKEY class UIDManager: diff --git a/neurons/validators/query_scheduler.py b/neurons/validators/query_scheduler.py deleted file mode 100644 index 7617ce5d..00000000 --- a/neurons/validators/query_scheduler.py +++ /dev/null @@ -1,270 +0,0 @@ -import asyncio -import time -from datetime import datetime, timedelta, timezone -from typing import Dict, Optional - -import bittensor as bt -import torch - -from neurons.validators.scoring_store import ScoringStore -from neurons.validators.utility_api_client import UtilityAPIClient - - -class QueryScheduler: - """ - Background scheduler that drives scoring queries from the utility API. - - Lifecycle per UTC hour: - 1. Poll utility API for (uid, search_type, question) items. - 2. Dispatch each as a fire-and-forget scoring query to the target miner. - 3. Save the miner's response in ScoringStore. - 4. On hour boundary → score the previous hour's responses (if not first epoch). - - The API returns 404 when all questions for the current hour are served; - we sleep until the next UTC hour begins and then resume polling. 
- """ - - def __init__( - self, - neuron, - utility_api: UtilityAPIClient, - scoring_store: ScoringStore, - validators: Dict, # {"ai_search": ..., "x_search": ..., "web_search": ...} - ): - self.neuron = neuron - self.utility_api = utility_api - self.scoring_store = scoring_store - self.validators = validators - - self.current_time_range: Optional[datetime] = None - self.min_request_interval_seconds = 4.1 - self._next_poll_at = 0.0 - - # Skip scoring on the first epoch boundary (incomplete responses) - self.is_first_epoch = True - - def _build_query(self, question_query: str, params: dict) -> dict: - """Format a query dict for the appropriate validator's send_scoring_query().""" - return {"query": question_query, **params} - - def _extract_prompt(self, response) -> str: - if isinstance(response, dict): - for key in ("prompt", "query", "content", "id"): - value = response.get(key) - if value: - return str(value) - urls = response.get("urls") - if urls: - return ", ".join(str(url) for url in urls) - return "" - - for key in ("prompt", "query", "content", "id"): - value = getattr(response, key, None) - if value: - return str(value) - - urls = getattr(response, "urls", None) - if urls: - return ", ".join(str(url) for url in urls) - - return "" - - async def _send_and_save( - self, - search_type: str, - uid: int, - query: dict, - time_range_start: datetime, - scoring_seed: Optional[int] = None, - ) -> None: - """Send one scoring query to a specific miner and persist the response.""" - try: - validator = self.validators[search_type] - response = await validator.send_scoring_query(query, uid=uid) - if response is not None: - await self.scoring_store.save_response( - time_range_start, - uid, - search_type, - response, - scoring_seed=scoring_seed, - ) - bt.logging.debug( - f"[QueryScheduler] Saved response uid={uid} type={search_type}" - ) - except Exception as e: - bt.logging.error( - f"[QueryScheduler] Scoring query failed uid={uid} type={search_type}: {e}" - ) - - async 
def _score_search_type( - self, - search_type: str, - items: list, - time_range_start: datetime, - ) -> None: - """Score one search type for a completed epoch.""" - validator = self.validators.get(search_type) - if validator is None or not items: - return - - uids = torch.tensor([item["uid"] for item in items]) - responses = [item["response"] for item in items] - scoring_seeds = [item.get("scoring_seed") for item in items] - prompts = [self._extract_prompt(response) for response in responses] - event = {} - - bt.logging.info( - f"[QueryScheduler] Scoring {search_type}: {len(items)} responses" - ) - - await validator.compute_rewards_and_penalties( - event=event, - prompts=prompts, - responses=responses, - uids=uids, - start_time=time.time(), - scoring_epoch_start=time_range_start, - scoring_seeds=scoring_seeds, - ) - - async def score_epoch(self, time_range_start: datetime) -> None: - """Load all responses for a completed hour and run reward/penalty computation.""" - try: - bt.logging.info( - f"[QueryScheduler] Scoring epoch {time_range_start.isoformat()}" - ) - all_responses = await self.scoring_store.get_all_for_range(time_range_start) - - if not all_responses: - bt.logging.warning( - f"[QueryScheduler] No responses for epoch " - f"{time_range_start.isoformat()}, skipping scoring." - ) - return - - score_tasks = [ - self._score_search_type(search_type, items, time_range_start) - for search_type, items in all_responses.items() - if self.validators.get(search_type) is not None and items - ] - - if not score_tasks: - bt.logging.warning( - f"[QueryScheduler] No scoreable responses for epoch " - f"{time_range_start.isoformat()}, skipping scoring." 
- ) - return - - results = await asyncio.gather(*score_tasks, return_exceptions=True) - - for result in results: - if isinstance(result, Exception): - bt.logging.error( - f"[QueryScheduler] Error scoring epoch task: {result}" - ) - - except Exception as e: - bt.logging.error(f"[QueryScheduler] Error in score_epoch: {e}") - - async def _wait_for_poll_window(self) -> None: - delay = self._next_poll_at - time.monotonic() - if delay > 0: - await asyncio.sleep(delay) - - def _schedule_next_poll(self, request_started_at: float) -> None: - self._next_poll_at = request_started_at + self.min_request_interval_seconds - - def _seconds_until_next_hour(self) -> float: - now = datetime.now(timezone.utc) - next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) - return max((next_hour - now).total_seconds() + 1, 1) - - async def run(self) -> None: - """Entry point — run as a long-lived asyncio task.""" - bt.logging.info("[QueryScheduler] Starting") - - while True: - request_started_at: Optional[float] = None - try: - await self._wait_for_poll_window() - request_started_at = time.monotonic() - item = await self.utility_api.fetch_next_question() - self._schedule_next_poll(request_started_at) - - time_range_start = item["time_range_start"] - if isinstance(time_range_start, str): - time_range_start = datetime.fromisoformat( - time_range_start.replace("Z", "+00:00") - ) - - uid: int = item["uid"] - search_type: str = item["search_type"] - question_query: str = item["question"]["query"] - params: dict = item["question"].get("params", {}) - scoring_seed: Optional[int] = item.get("scoring_seed") - - # Detect hour boundary - if ( - self.current_time_range is not None - and time_range_start != self.current_time_range - ): - bt.logging.info( - "[QueryScheduler] Hour boundary detected " - f"previous={self.current_time_range.isoformat()} " - f"next={time_range_start.isoformat()} " - f"is_first_epoch={self.is_first_epoch}" - ) - if not self.is_first_epoch: - 
asyncio.create_task(self.score_epoch(self.current_time_range)) - else: - bt.logging.info( - "[QueryScheduler] First epoch boundary — " - "skipping scoring (incomplete data)." - ) - self.is_first_epoch = False - - self.current_time_range = time_range_start - - # Dispatch scoring query in background - query = self._build_query(question_query, params) - - asyncio.create_task( - self._send_and_save( - search_type, - uid, - query, - time_range_start, - scoring_seed=scoring_seed, - ) - ) - - except Exception as e: - status = getattr(e, "status", None) - - if status == 404: - # All questions for this hour have been served - sleep_seconds = self._seconds_until_next_hour() - self._next_poll_at = 0.0 - bt.logging.info( - "[QueryScheduler] All questions served for current hour. " - f"Waiting {sleep_seconds:.1f}s for next UTC hour..." - ) - await asyncio.sleep(sleep_seconds) - elif status == 429: - # Unexpected rate limit — re-align to the server window. - if request_started_at is not None: - self._schedule_next_poll(request_started_at) - else: - self._next_poll_at = time.monotonic() + ( - self.min_request_interval_seconds - ) - await self._wait_for_poll_window() - else: - bt.logging.error( - "[QueryScheduler] Unexpected error while polling " - f"dataset/next current_time_range=" - f"{self.current_time_range.isoformat() if self.current_time_range else None} " - f"status={status}: {e}" - ) - await asyncio.sleep(5) diff --git a/neurons/validators/reward/search_content_relevance.py b/neurons/validators/reward/search_content_relevance.py index 2cccb484..b206aba4 100644 --- a/neurons/validators/reward/search_content_relevance.py +++ b/neurons/validators/reward/search_content_relevance.py @@ -10,7 +10,7 @@ from desearch.utils import clean_text from neurons.validators.base_validator import AbstractNeuron from neurons.validators.reward.reward_llm import RewardLLM -from neurons.validators.scrapingdog_scraper import scrape_links_with_retries +from 
neurons.validators.scrapers.scrapingdog_scraper import scrape_links_with_retries from neurons.validators.utils.prompts import ( SearchSummaryRelevancePrompt, ) diff --git a/neurons/validators/reward/web_basic_search_content_relevance.py b/neurons/validators/reward/web_basic_search_content_relevance.py index 8ae4033d..690af5d0 100644 --- a/neurons/validators/reward/web_basic_search_content_relevance.py +++ b/neurons/validators/reward/web_basic_search_content_relevance.py @@ -12,7 +12,7 @@ from desearch.protocol import WebSearchSynapse, WebSearchValidatorResult from desearch.utils import is_valid_web_search_result from neurons.validators.base_validator import AbstractNeuron -from neurons.validators.scrapingdog_scraper import ( +from neurons.validators.scrapers.scrapingdog_scraper import ( scrape_links_with_retries, ) diff --git a/neurons/validators/scoring_store.py b/neurons/validators/scoring_store.py deleted file mode 100644 index 370254fe..00000000 --- a/neurons/validators/scoring_store.py +++ /dev/null @@ -1,92 +0,0 @@ -from datetime import datetime -from typing import Any, Dict, List - -import jsonpickle - -from desearch.redis.redis_client import redis_client - -EXPIRY = 2 * 3600 # 2 hours - -SEARCH_TYPES = ["ai_search", "x_search", "web_search"] - - -class ScoringStore: - """ - Redis-backed store for scoring query responses. - - Organizes responses by (time_range_start, search_type) using Redis hashes. - Key format: scoring:{unix_ts}:{search_type} - Field: {uid} → jsonpickle-encoded response - - Responses expire after 2 hours. 
- """ - - KEY_PREFIX = "scoring" - - def _key(self, time_range_start: datetime, search_type: str) -> str: - unix_ts = int(time_range_start.timestamp()) - return f"{self.KEY_PREFIX}:{unix_ts}:{search_type}" - - async def save_response( - self, - time_range_start: datetime, - uid: int, - search_type: str, - response: Any, - scoring_seed: int | None = None, - ) -> None: - """Save a single miner response (with optional scoring seed) for later scoring.""" - - key = self._key(time_range_start, search_type) - data = jsonpickle.encode({"response": response, "scoring_seed": scoring_seed}) - pipeline = redis_client.pipeline() - pipeline.hset(key, str(uid), data) - pipeline.expire(key, EXPIRY) - await pipeline.execute() - - async def get_all_for_range( - self, time_range_start: datetime - ) -> Dict[str, List[Dict]]: - """ - Load all saved responses for a completed epoch. - - Returns: - { - "ai_search": [{"uid": int, "response": ...}, ...], - "x_search": [...], - "web_search": [...], - } - """ - - pipeline = redis_client.pipeline() - - for st in SEARCH_TYPES: - pipeline.hgetall(self._key(time_range_start, st)) - - raw_results = await pipeline.execute() - - result: Dict[str, List[Dict]] = {} - - for st, raw in zip(SEARCH_TYPES, raw_results): - items = [] - - for uid_str, encoded in raw.items(): - data = jsonpickle.decode(encoded) - if isinstance(data, dict) and "response" in data: - response = data["response"] - scoring_seed = data.get("scoring_seed") - else: - response = data - scoring_seed = None - items.append( - { - "uid": int(uid_str), - "response": response, - "scoring_seed": scoring_seed, - } - ) - - if items: - result[st] = items - - return result diff --git a/neurons/validators/scrapers/__init__.py b/neurons/validators/scrapers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/neurons/validators/advanced_scraper_validator.py b/neurons/validators/scrapers/advanced_scraper_validator.py similarity index 53% rename from 
neurons/validators/advanced_scraper_validator.py rename to neurons/validators/scrapers/advanced_scraper_validator.py index 2ea761e7..7dbb3326 100644 --- a/neurons/validators/advanced_scraper_validator.py +++ b/neurons/validators/scrapers/advanced_scraper_validator.py @@ -4,7 +4,6 @@ import bittensor as bt import torch -import wandb from desearch.dataset.date_filters import ( DateFilter, DateFilterType, @@ -21,7 +20,6 @@ from neurons.validators.base_validator import AbstractNeuron from neurons.validators.miner_response_logger import ( build_log_entry, - build_reward_payload, submit_logs_best_effort, ) from neurons.validators.penalty.miner_score_penalty import MinerScorePenaltyModel @@ -36,12 +34,15 @@ from neurons.validators.reward.twitter_content_relevance import ( TwitterContentRelevanceModel, ) +from neurons.validators.scrapers.base_scraper_validator import BaseScraperValidator -class AdvancedScraperValidator: - def __init__(self, neuron: AbstractNeuron): - self.neuron = neuron +class AdvancedScraperValidator(BaseScraperValidator): + search_type = "ai_search" + wandb_modality = "twitter_scrapper" + wandb_reward_keys = ["twitter_reward", "search_reward", "summary_reward"] + def __init__(self, neuron: AbstractNeuron): self.tools = [ ["Twitter Search", "Reddit Search"], ["Twitter Search", "Web Search"], @@ -89,59 +90,58 @@ def __init__(self, neuron: AbstractNeuron): # Init device. 
bt.logging.debug("loading", "device") bt.logging.debug( - "self.neuron.config.neuron.device = ", str(self.neuron.config.neuron.device) + "self.neuron.config.neuron.device = ", str(neuron.config.neuron.device) ) self.twitter_content_weight = 0.40 self.web_search_weight = 0.30 self.summary_relevance_weight = 0.30 - self.reward_weights = torch.tensor( + self.reward_llm = RewardLLM(neuron.config.neuron.scoring_model) + + reward_weights = torch.tensor( [ self.twitter_content_weight, self.web_search_weight, self.summary_relevance_weight, ], dtype=torch.float32, - ).to(self.neuron.config.neuron.device) - - if self.reward_weights.sum() != 1: - message = ( - f"Reward function weights do not sum to 1 (Current sum: {self.reward_weights.sum()}.)" - f"Check your reward config file at `reward/config.py` or ensure that all your cli reward flags sum to 1." - ) - bt.logging.error(message) - raise Exception(message) - - self.reward_llm = RewardLLM(self.neuron.config.neuron.scoring_model) + ) - self.reward_functions = [ + reward_functions = [ TwitterContentRelevanceModel( - device=self.neuron.config.neuron.device, + device=neuron.config.neuron.device, scoring_type=RewardScoringType.summary_relevance_score_template, llm_reward=self.reward_llm, - neuron=self.neuron, + neuron=neuron, ), WebSearchContentRelevanceModel( - device=self.neuron.config.neuron.device, + device=neuron.config.neuron.device, scoring_type=RewardScoringType.search_relevance_score_template, llm_reward=self.reward_llm, - neuron=self.neuron, + neuron=neuron, ), SummaryRelevanceRewardModel( - device=self.neuron.config.neuron.device, + device=neuron.config.neuron.device, scoring_type=RewardScoringType.summary_relevance_score_template, llm_reward=self.reward_llm, - neuron=self.neuron, + neuron=neuron, ), ] - self.penalty_functions = [ - StreamingPenaltyModel(max_penalty=1, neuron=self.neuron), - TimeoutPenaltyModel(max_penalty=1, neuron=self.neuron), - MinerScorePenaltyModel(max_penalty=1, neuron=self.neuron), + 
penalty_functions = [ + StreamingPenaltyModel(max_penalty=1, neuron=neuron), + TimeoutPenaltyModel(max_penalty=1, neuron=neuron), + MinerScorePenaltyModel(max_penalty=1, neuron=neuron), ] + super().__init__( + neuron=neuron, + reward_weights=reward_weights, + reward_functions=reward_functions, + penalty_functions=penalty_functions, + ) + async def call_miner( self, prompt: str, @@ -208,216 +208,31 @@ async def call_miner( return async_response, uids, start_time, axon - async def compute_rewards_and_penalties( - self, - event, - prompts: List[str], - responses, - uids, - start_time, - result_type: Optional[ResultType] = None, - scoring_epoch_start=None, - scoring_seeds=None, - ): - try: - if not len(uids): - bt.logging.warning("No UIDs provided for logging event.") - return - - # Attach scoring seeds to response objects so reward models can use - # them for deterministic random sampling across all validators. - if scoring_seeds: - for response, seed in zip(responses, scoring_seeds): - response.scoring_seed = seed - - bt.logging.info("Computing rewards and penalties") - - rewards = torch.zeros(len(responses), dtype=torch.float32).to( - self.neuron.config.neuron.device - ) - - all_rewards = [] - all_original_rewards = [] - val_score_responses_list = [] - - if result_type is None: - result_type = ResultType.LINKS_WITH_FINAL_SUMMARY - - for weight_i, reward_fn_i in zip( - self.reward_weights, self.reward_functions - ): - start_time = time.time() - ( - reward_i_normalized, - reward_event, - val_score_responses, - original_rewards, - ) = await reward_fn_i.apply(responses, uids) - - all_rewards.append(reward_i_normalized) - all_original_rewards.append(original_rewards) - val_score_responses_list.append(val_score_responses) - - rewards += weight_i * reward_i_normalized.to( - self.neuron.config.neuron.device - ) - if not self.neuron.config.neuron.disable_log_rewards: - event = {**event, **reward_event} - execution_time = time.time() - start_time - 
bt.logging.trace(str(reward_fn_i.name), reward_i_normalized.tolist()) - bt.logging.info( - f"Applied reward function: {reward_fn_i.name} in {execution_time / 60:.2f} minutes" - ) - - val_scores = [] - for val_score_responses, reward_function in zip( - val_score_responses_list, self.reward_functions - ): - if reward_function.name in [ - RewardModelType.twitter_content_relevance.value, - RewardModelType.search_content_relevance.value, - ]: - val_scores.append(val_score_responses) - - for penalty_fn_i in self.penalty_functions: - ( - raw_penalty_i, - adjusted_penalty_i, - applied_penalty_i, - ) = await penalty_fn_i.apply_penalties(responses, uids, val_scores) - penalty_start_time = time.time() - rewards *= applied_penalty_i.to(self.neuron.config.neuron.device) - penalty_execution_time = time.time() - penalty_start_time - if not self.neuron.config.neuron.disable_log_rewards: - event[penalty_fn_i.name + "_raw"] = raw_penalty_i.tolist() - event[penalty_fn_i.name + "_adjusted"] = adjusted_penalty_i.tolist() - event[penalty_fn_i.name + "_applied"] = applied_penalty_i.tolist() - bt.logging.trace(str(penalty_fn_i.name), applied_penalty_i.tolist()) - bt.logging.info( - f"Applied penalty function: {penalty_fn_i.name} in {penalty_execution_time:.2f} seconds" - ) - - await self.neuron.update_moving_averaged_scores(uids, rewards) - self.log_event(prompts, event, start_time, uids, rewards) - - scores = torch.zeros(len(self.neuron.metagraph.hotkeys)) - uid_scores_dict = {} - wandb_data = { - "modality": "twitter_scrapper", - "prompts": {}, - "responses": {}, - "scores": {}, - "timestamps": {}, - "summary_reward": {}, - "twitter_reward": {}, - "search_reward": {}, - } - bt.logging.info( - f"======================== Reward ===========================" - ) - # Initialize an empty list to accumulate log messages - log_messages = [] - for uid_tensor, reward, response in zip(uids, rewards.tolist(), responses): - uid = uid_tensor.item() - completion_length = ( - len(response.completion) 
if response.completion is not None else 0 - ) - # Accumulate log messages instead of logging them immediately - log_messages.append( - f"UID: {uid}, R: {round(reward, 3)}, C: {completion_length}" - ) - bt.logging.trace(f"{response.completion}") - - # Log the accumulated messages in groups of three - for i in range(0, len(log_messages), 3): - bt.logging.info(" | ".join(log_messages[i : i + 3])) - - bt.logging.info( - f"======================== Reward ===========================" - ) - - twitter_rewards = all_rewards[0] - search_rewards = all_rewards[1] - summary_rewards = all_rewards[2] - zipped_rewards = zip( - uids, - rewards.tolist(), - responses, - summary_rewards, - twitter_rewards, - search_rewards, - ) - - for ( - uid_tensor, - reward, - response, - summary_reward, - twitter_reward, - search_reward, - ) in zipped_rewards: - uid = uid_tensor.item() # Convert tensor to int - uid_scores_dict[uid] = reward - scores[uid] = reward # Now 'uid' is an int, which is a valid key type - wandb_data["scores"][uid] = reward - wandb_data["responses"][uid] = response.completion - wandb_data["prompts"][uid] = response.prompt - wandb_data["summary_reward"][uid] = summary_reward - wandb_data["twitter_reward"][uid] = twitter_reward - wandb_data["search_reward"][uid] = search_reward - - if self.neuron.config.wandb_on: - wandb.log(wandb_data) - - scoring_logs = [] - response_count = len(responses) - - for index, (uid_tensor, response, reward) in enumerate( - zip(uids, responses, rewards.tolist()) - ): - uid = uid_tensor.item() - reward_payload = build_reward_payload( - search_type="ai_search", - response_count=response_count, - index=index, - uid=uid, - total_reward=reward, - all_rewards=all_rewards, - all_original_rewards=all_original_rewards, - validator_scores=val_score_responses_list, - event=event, - ) - scoring_logs.append( - build_log_entry( - owner=self.neuron, - search_type="ai_search", - query_kind="scoring", - response=response, - miner_uid=uid, - total_reward=reward, - 
reward_payload=reward_payload, - scoring_epoch_start=scoring_epoch_start, - ) - ) - - submit_logs_best_effort(self.neuron, scoring_logs) - - return rewards, uids, val_score_responses_list, event, all_original_rewards - except Exception as e: - bt.logging.error(f"Error in compute_rewards_and_penalties: {e}") - raise e - - def log_event(self, prompts: List[str], event, start_time, uids, rewards): - event.update( - { - "step_length": time.time() - start_time, - "prompts": prompts, - "uids": uids.tolist(), - "rewards": rewards.tolist(), - } + def get_penalty_additional_params(self, val_score_responses_list): + val_scores = [] + for val_score_responses, reward_function in zip( + val_score_responses_list, self.reward_functions + ): + if reward_function.name in [ + RewardModelType.twitter_content_relevance.value, + RewardModelType.search_content_relevance.value, + ]: + val_scores.append(val_score_responses) + return val_scores + + def build_uid_log_message(self, uid, reward, response): + completion_length = ( + len(response.completion) if response.completion is not None else 0 ) - - bt.logging.debug("Run Task event:", event) + bt.logging.trace(f"{response.completion}") + return f"UID: {uid}, R: {round(reward, 3)}, C: {completion_length}" + + def populate_wandb_uid_data(self, wandb_data, uid, reward, response, reward_values): + wandb_data["scores"][uid] = reward + wandb_data["responses"][uid] = response.completion + wandb_data["prompts"][uid] = response.prompt + for key, value in zip(self.wandb_reward_keys, reward_values): + wandb_data[key][uid] = value async def send_scoring_query( self, diff --git a/neurons/validators/web_scraper_validator.py b/neurons/validators/scrapers/base_scraper_validator.py similarity index 56% rename from neurons/validators/web_scraper_validator.py rename to neurons/validators/scrapers/base_scraper_validator.py index 4d024642..5088e417 100644 --- a/neurons/validators/web_scraper_validator.py +++ 
b/neurons/validators/scrapers/base_scraper_validator.py @@ -1,46 +1,34 @@ import time -from typing import Any, Dict, List, Optional +from typing import List import bittensor as bt import torch - import wandb -from desearch.protocol import ( - WebSearchSynapse, -) + from neurons.validators.base_validator import AbstractNeuron from neurons.validators.miner_response_logger import ( build_log_entry, build_reward_payload, submit_logs_best_effort, ) -from neurons.validators.penalty.timeout_penalty import TimeoutPenaltyModel -from neurons.validators.reward import RewardScoringType -from neurons.validators.reward.web_basic_search_content_relevance import ( - WebBasicSearchContentRelevanceModel, -) - -class WebScraperValidator: - def __init__(self, neuron: AbstractNeuron): - self.neuron = neuron - self.timeout = 180 - self.max_execution_time = 10 - # Init device. - bt.logging.debug("loading", "device") - bt.logging.debug( - "self.neuron.config.neuron.device = ", str(self.neuron.config.neuron.device) - ) +class BaseScraperValidator: + # Subclasses must set these + search_type: str = "" + wandb_modality: str = "" + wandb_reward_keys: List[str] = [] - self.web_content_weight = 1.0 + def __init__( + self, + neuron: AbstractNeuron, + reward_weights: torch.Tensor, + reward_functions: list, + penalty_functions: list, + ): + self.neuron = neuron - self.reward_weights = torch.tensor( - [ - self.web_content_weight, - ], - dtype=torch.float32, - ).to(self.neuron.config.neuron.device) + self.reward_weights = reward_weights.to(self.neuron.config.neuron.device) if self.reward_weights.sum() != 1: message = ( @@ -50,42 +38,41 @@ def __init__(self, neuron: AbstractNeuron): bt.logging.error(message) raise Exception(message) - self.reward_functions = [ - WebBasicSearchContentRelevanceModel( - device=self.neuron.config.neuron.device, - scoring_type=RewardScoringType.search_relevance_score_template, - neuron=self.neuron, - ), - ] - - self.penalty_functions = [ - 
TimeoutPenaltyModel(max_penalty=1, neuron=self.neuron), - ] - - async def call_miner( - self, - prompt: str, - params: Dict[str, Any], - uid: Optional[int] = None, - ): - uid, axon = await self.neuron.get_random_miner(uid=uid) - - synapse = WebSearchSynapse( - **params, - query=prompt, - max_execution_time=self.max_execution_time, - ) - - dendrite = next(self.neuron.dendrites) - - response = await dendrite.call( - target_axon=axon, - synapse=synapse.model_copy(), - timeout=self.max_execution_time + 5, - deserialize=False, - ) - - return response, uid, axon + self.reward_functions = reward_functions + self.penalty_functions = penalty_functions + + def get_penalty_additional_params(self, val_score_responses_list): + """Override in subclasses that need to pass additional params to penalties (e.g. val_scores).""" + return None + + def build_uid_log_message(self, uid, reward, response): + """Override in subclasses that need custom per-UID log formatting.""" + return f"UID: {uid}, R: {round(reward, 3)}" + + def build_wandb_data(self, uids, rewards, responses, all_rewards): + """Build W&B logging data. Override for custom reward key mapping.""" + wandb_data = { + "modality": self.wandb_modality, + "prompts": {}, + "responses": {}, + "scores": {}, + "timestamps": {}, + } + for key in self.wandb_reward_keys: + wandb_data[key] = {} + return wandb_data + + def populate_wandb_uid_data(self, wandb_data, uid, reward, response, reward_values): + """Populate per-UID wandb data. 
Override for custom prompt/reward extraction.""" + wandb_data["scores"][uid] = reward + if hasattr(response, "query"): + wandb_data["prompts"][uid] = response.query + elif hasattr(response, "id"): + wandb_data["prompts"][uid] = response.id + elif hasattr(response, "urls"): + wandb_data["prompts"][uid] = response.urls + for key, value in zip(self.wandb_reward_keys, reward_values): + wandb_data[key][uid] = value async def compute_rewards_and_penalties( self, @@ -94,6 +81,7 @@ async def compute_rewards_and_penalties( responses, uids, start_time, + result_type=None, scoring_epoch_start=None, scoring_seeds=None, ): @@ -118,8 +106,6 @@ async def compute_rewards_and_penalties( all_original_rewards = [] val_score_responses_list = [] - bt.logging.trace(f"Received responses: {responses}") - for weight_i, reward_fn_i in zip( self.reward_weights, self.reward_functions ): @@ -146,12 +132,18 @@ async def compute_rewards_and_penalties( f"Applied reward function: {reward_fn_i.name} in {execution_time / 60:.2f} minutes" ) + penalty_additional_params = self.get_penalty_additional_params( + val_score_responses_list + ) + for penalty_fn_i in self.penalty_functions: ( raw_penalty_i, adjusted_penalty_i, applied_penalty_i, - ) = await penalty_fn_i.apply_penalties(responses, uids) + ) = await penalty_fn_i.apply_penalties( + responses, uids, penalty_additional_params + ) penalty_start_time = time.time() rewards *= applied_penalty_i.to(self.neuron.config.neuron.device) penalty_execution_time = time.time() - penalty_start_time @@ -169,14 +161,8 @@ async def compute_rewards_and_penalties( scores = torch.zeros(len(self.neuron.metagraph.hotkeys)) uid_scores_dict = {} - wandb_data = { - "modality": "web_scrapper", - "prompts": {}, - "responses": {}, - "scores": {}, - "timestamps": {}, - "search_reward": {}, - } + wandb_data = self.build_wandb_data(uids, rewards, responses, all_rewards) + bt.logging.info( f"======================== Reward ===========================" ) @@ -184,9 +170,7 @@ async 
def compute_rewards_and_penalties( log_messages = [] for uid_tensor, reward, response in zip(uids, rewards.tolist(), responses): uid = uid_tensor.item() - - # Accumulate log messages instead of logging them immediately - log_messages.append(f"UID: {uid}, R: {round(reward, 3)}") + log_messages.append(self.build_uid_log_message(uid, reward, response)) # Log the accumulated messages in groups of three for i in range(0, len(log_messages), 3): @@ -195,23 +179,30 @@ async def compute_rewards_and_penalties( bt.logging.info( f"======================== Reward ===========================" ) - bt.logging.info(f"this is a all reward {all_rewards} ") - search_rewards = all_rewards[0] - zipped_rewards = zip(uids, rewards.tolist(), responses, search_rewards) + # Build per-uid reward values for wandb + reward_values_per_uid = ( + list( + zip( + *[ + r.tolist() if hasattr(r, "tolist") else r + for r in all_rewards + ] + ) + ) + if all_rewards + else [() for _ in uids] + ) - for uid_tensor, reward, response, search_reward in zipped_rewards: - uid = uid_tensor.item() # Convert tensor to int + for uid_tensor, reward, response, reward_values in zip( + uids, rewards.tolist(), responses, reward_values_per_uid + ): + uid = uid_tensor.item() uid_scores_dict[uid] = reward - scores[uid] = reward # Now 'uid' is an int, which is a valid key type - wandb_data["scores"][uid] = reward - if hasattr(response, "query"): - wandb_data["prompts"][uid] = response.query - elif hasattr(response, "id"): - wandb_data["prompts"][uid] = response.id - elif hasattr(response, "urls"): - wandb_data["prompts"][uid] = response.urls - wandb_data["search_reward"][uid] = search_reward + scores[uid] = reward + self.populate_wandb_uid_data( + wandb_data, uid, reward, response, reward_values + ) if self.neuron.config.wandb_on: wandb.log(wandb_data) @@ -224,7 +215,7 @@ async def compute_rewards_and_penalties( ): uid = uid_tensor.item() reward_payload = build_reward_payload( - search_type="web_search", + 
search_type=self.search_type, response_count=response_count, index=index, uid=uid, @@ -237,7 +228,7 @@ async def compute_rewards_and_penalties( scoring_logs.append( build_log_entry( owner=self.neuron, - search_type="web_search", + search_type=self.search_type, query_kind="scoring", response=response, miner_uid=uid, @@ -265,56 +256,3 @@ def log_event(self, prompts: List[str], event, start_time, uids, rewards): ) bt.logging.debug("Run Task event:", event) - - async def send_scoring_query( - self, - query: dict, - uid: int, - ) -> Optional[object]: - """ - Send a scoring query to a specific miner and return the full synapse. - Called by QueryScheduler; awaits the full response without streaming. - """ - prompt = query.get("query", "") - params = {k: v for k, v in query.items() if k != "query"} - - response, _, _ = await self.call_miner(prompt=prompt, params=params, uid=uid) - return response - - async def organic( - self, - query, - uid: Optional[int] = None, - ): - """Receives question from user and returns the response from the miners.""" - - try: - prompt = query.get("query", "") - params = {key: value for key, value in query.items() if key != "query"} - - response, selected_uid, axon = await self.call_miner( - prompt=prompt, params=params, uid=uid - ) - - if response: - submit_logs_best_effort( - self.neuron, - [ - build_log_entry( - owner=self.neuron, - search_type="web_search", - query_kind="organic", - response=response, - miner_uid=selected_uid, - miner_hotkey=getattr(axon, "hotkey", None), - miner_coldkey=getattr(axon, "coldkey", None), - ) - ], - ) - yield response - else: - bt.logging.warning("Invalid response for UID: Unknown") - - except Exception as e: - bt.logging.error(f"Error in organic: {e}") - raise e diff --git a/neurons/validators/scrapingdog_scraper.py b/neurons/validators/scrapers/scrapingdog_scraper.py similarity index 99% rename from neurons/validators/scrapingdog_scraper.py rename to neurons/validators/scrapers/scrapingdog_scraper.py index 
13860f2b..05f88c51 100644 --- a/neurons/validators/scrapingdog_scraper.py +++ b/neurons/validators/scrapers/scrapingdog_scraper.py @@ -343,8 +343,7 @@ async def scrape_links_with_retries( ) except Exception as exc: bt.logging.warning( - "Apify fallback is unavailable for validator web scraping: " - f"{exc}" + f"Apify fallback is unavailable for validator web scraping: {exc}" ) return [], list(dict.fromkeys(urls)) diff --git a/neurons/validators/scrapers/web_scraper_validator.py b/neurons/validators/scrapers/web_scraper_validator.py new file mode 100644 index 00000000..5a5edc9c --- /dev/null +++ b/neurons/validators/scrapers/web_scraper_validator.py @@ -0,0 +1,141 @@ +from typing import Any, Dict, Optional + +import bittensor as bt +import torch + +from desearch.protocol import ( + WebSearchSynapse, +) +from neurons.validators.base_validator import AbstractNeuron +from neurons.validators.miner_response_logger import ( + build_log_entry, + submit_logs_best_effort, +) +from neurons.validators.penalty.timeout_penalty import TimeoutPenaltyModel +from neurons.validators.reward import RewardScoringType +from neurons.validators.reward.web_basic_search_content_relevance import ( + WebBasicSearchContentRelevanceModel, +) +from neurons.validators.scrapers.base_scraper_validator import BaseScraperValidator + + +class WebScraperValidator(BaseScraperValidator): + search_type = "web_search" + wandb_modality = "web_scrapper" + wandb_reward_keys = ["search_reward"] + + def __init__(self, neuron: AbstractNeuron): + self.timeout = 180 + self.max_execution_time = 10 + + # Init device. 
+ bt.logging.debug("loading", "device") + bt.logging.debug( + "self.neuron.config.neuron.device = ", str(neuron.config.neuron.device) + ) + + self.web_content_weight = 1.0 + + reward_weights = torch.tensor( + [ + self.web_content_weight, + ], + dtype=torch.float32, + ) + + reward_functions = [ + WebBasicSearchContentRelevanceModel( + device=neuron.config.neuron.device, + scoring_type=RewardScoringType.search_relevance_score_template, + neuron=neuron, + ), + ] + + penalty_functions = [ + TimeoutPenaltyModel(max_penalty=1, neuron=neuron), + ] + + super().__init__( + neuron=neuron, + reward_weights=reward_weights, + reward_functions=reward_functions, + penalty_functions=penalty_functions, + ) + + async def call_miner( + self, + prompt: str, + params: Dict[str, Any], + uid: Optional[int] = None, + ): + uid, axon = await self.neuron.get_random_miner(uid=uid) + + synapse = WebSearchSynapse( + **params, + query=prompt, + max_execution_time=self.max_execution_time, + ) + + dendrite = next(self.neuron.dendrites) + + response = await dendrite.call( + target_axon=axon, + synapse=synapse.model_copy(), + timeout=self.max_execution_time + 5, + deserialize=False, + ) + + return response, uid, axon + + async def send_scoring_query( + self, + query: dict, + uid: int, + ) -> Optional[object]: + """ + Send a scoring query to a specific miner and return the full synapse. + Called by QueryScheduler; awaits the full response without streaming. 
+ """ + prompt = query.get("query", "") + params = {k: v for k, v in query.items() if k != "query"} + + response, _, _ = await self.call_miner(prompt=prompt, params=params, uid=uid) + return response + + async def organic( + self, + query, + uid: Optional[int] = None, + ): + """Receives question from user and returns the response from the miners.""" + + try: + prompt = query.get("query", "") + params = {key: value for key, value in query.items() if key != "query"} + + response, selected_uid, axon = await self.call_miner( + prompt=prompt, params=params, uid=uid + ) + + if response: + submit_logs_best_effort( + self.neuron, + [ + build_log_entry( + owner=self.neuron, + search_type="web_search", + query_kind="organic", + response=response, + miner_uid=selected_uid, + miner_hotkey=getattr(axon, "hotkey", None), + miner_coldkey=getattr(axon, "coldkey", None), + ) + ], + ) + yield response + else: + bt.logging.warning("Invalid response for UID: Unknown") + + except Exception as e: + bt.logging.error(f"Error in organic: {e}") + raise e diff --git a/neurons/validators/scrapers/x_scraper_validator.py b/neurons/validators/scrapers/x_scraper_validator.py new file mode 100644 index 00000000..0d2f3303 --- /dev/null +++ b/neurons/validators/scrapers/x_scraper_validator.py @@ -0,0 +1,247 @@ +from typing import Any, Dict, List, Optional + +import bittensor as bt +import torch + +from desearch.protocol import ( + TwitterIDSearchSynapse, + TwitterSearchSynapse, + TwitterURLsSearchSynapse, +) +from neurons.validators.base_validator import AbstractNeuron +from neurons.validators.miner_response_logger import ( + build_log_entry, + submit_logs_best_effort, +) +from neurons.validators.penalty.timeout_penalty import TimeoutPenaltyModel +from neurons.validators.penalty.twitter_count_penalty import TwitterCountPenaltyModel +from neurons.validators.reward import RewardScoringType +from neurons.validators.reward.twitter_basic_search_content_relevance import ( + 
TwitterBasicSearchContentRelevanceModel, +) +from neurons.validators.scrapers.base_scraper_validator import BaseScraperValidator + + +class XScraperValidator(BaseScraperValidator): + search_type = "x_search" + wandb_modality = "twitter_scrapper" + wandb_reward_keys = ["twitter_reward"] + + def __init__(self, neuron: AbstractNeuron): + self.timeout = 180 + self.max_execution_time = 10 + + # Init device. + bt.logging.debug("loading", "device") + bt.logging.debug( + "self.neuron.config.neuron.device = ", str(neuron.config.neuron.device) + ) + + self.twitter_content_weight = 1.0 + + reward_weights = torch.tensor( + [ + self.twitter_content_weight, + ], + dtype=torch.float32, + ) + + reward_functions = [ + TwitterBasicSearchContentRelevanceModel( + device=neuron.config.neuron.device, + scoring_type=RewardScoringType.search_relevance_score_template, + neuron=neuron, + ), + ] + + penalty_functions = [ + TimeoutPenaltyModel(max_penalty=1, neuron=neuron), + TwitterCountPenaltyModel(max_penalty=1, neuron=neuron), + ] + + super().__init__( + neuron=neuron, + reward_weights=reward_weights, + reward_functions=reward_functions, + penalty_functions=penalty_functions, + ) + + def calc_max_execution_time(self, count): + if not count or count <= 20: + return self.max_execution_time + + return self.max_execution_time + int((count - 20) / 20) * 5 + + async def call_miner( + self, + prompt: str, + params: Dict[str, Any], + uid: Optional[int] = None, + ): + uid, axon = await self.neuron.get_random_miner(uid=uid) + + synapse = TwitterSearchSynapse( + **params, + query=prompt, + max_execution_time=self.calc_max_execution_time(params.get("count")), + ) + + dendrite = next(self.neuron.dendrites) + + response = await dendrite.call( + target_axon=axon, + synapse=synapse.model_copy(), + timeout=synapse.max_execution_time + 5, + deserialize=False, + ) + + return response, uid, axon + + async def send_scoring_query( + self, + query: dict, + uid: int, + ) -> Optional[object]: + """ + Send a 
scoring query to a specific miner and return the full synapse.
+        Called by QueryScheduler; awaits the full response without streaming.
+        """
+        prompt = query.get("query", "")
+        params = {k: v for k, v in query.items() if k != "query"}
+
+        response, _, _ = await self.call_miner(prompt=prompt, params=params, uid=uid)
+        return response
+
+    async def x_search(
+        self,
+        query,
+        uid: Optional[int] = None,
+    ):
+        """Receives question from user and returns the response from the miners."""
+
+        try:
+            prompt = query.get("query", "")
+            params = {key: value for key, value in query.items() if key != "query"}
+
+            response, selected_uid, axon = await self.call_miner(
+                prompt=prompt, params=params, uid=uid
+            )
+
+            if response:
+                self._save_organic_log(
+                    response=response,
+                    miner_uid=selected_uid,
+                    axon=axon,
+                    search_type="x_search",
+                )
+                yield response
+            else:
+                bt.logging.warning("Invalid response for UID: Unknown")
+
+        except Exception as e:
+            bt.logging.error(f"Error in x_search: {e}")
+            raise e
+
+    async def x_post_by_id(
+        self,
+        tweet_id: str,
+        uid: Optional[int] = None,
+    ):
+        """
+        Perform a Twitter search using a specific tweet ID.
+        """
+
+        try:
+            uid, axon = await self.neuron.get_random_miner(uid=uid)
+
+            synapse = TwitterIDSearchSynapse(
+                id=tweet_id,
+                max_execution_time=self.max_execution_time,
+                validator_tweets=[],
+                results=[],
+            )
+
+            timeout = self.max_execution_time + 5
+
+            dendrite = next(self.neuron.dendrites)
+
+            synapse: TwitterIDSearchSynapse = await dendrite.call(
+                target_axon=axon,
+                synapse=synapse,
+                timeout=timeout,
+                deserialize=False,
+            )
+
+            self._save_organic_log(
+                response=synapse,
+                miner_uid=uid,
+                axon=axon,
+                search_type="x_post_by_id",
+            )
+
+            return synapse.results
+        except Exception as e:
+            bt.logging.error(f"Error in ID search: {e}")
+            raise e
+
+    async def x_posts_by_urls(
+        self,
+        urls: List[str],
+        uid: Optional[int] = None,
+    ):
+        """
+        Perform a Twitter search using multiple tweet URLs.
+ """ + + try: + bt.logging.debug("run_task", "twitter urls search") + + uid, axon = await self.neuron.get_random_miner(uid=uid) + + synapse = TwitterURLsSearchSynapse( + urls=urls, + max_execution_time=self.calc_max_execution_time(len(urls)), + validator_tweets=[], + results=[], + ) + + timeout = synapse.max_execution_time + 5 + + dendrite = next(self.neuron.dendrites) + + synapse: TwitterURLsSearchSynapse = await dendrite.call( + target_axon=axon, + synapse=synapse, + timeout=timeout, + deserialize=False, + ) + + self._save_organic_log( + response=synapse, + miner_uid=uid, + axon=axon, + search_type="x_posts_by_urls", + ) + + return synapse.results + except Exception as e: + bt.logging.error(f"Error in URLs search: {e}") + raise e + + def _save_organic_log( + self, response, miner_uid: int, axon, search_type: str + ) -> None: + submit_logs_best_effort( + self.neuron, + [ + build_log_entry( + owner=self.neuron, + search_type=search_type, + query_kind="organic", + response=response, + miner_uid=miner_uid, + miner_hotkey=getattr(axon, "hotkey", None), + miner_coldkey=getattr(axon, "coldkey", None), + ) + ], + ) diff --git a/neurons/validators/service/__init__.py b/neurons/validators/service/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/neurons/validators/service/query_scheduler.py b/neurons/validators/service/query_scheduler.py new file mode 100644 index 00000000..9ec0745d --- /dev/null +++ b/neurons/validators/service/query_scheduler.py @@ -0,0 +1,412 @@ +import asyncio +import time +from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING, Dict, Optional + +import bittensor as bt +import torch + +from neurons.validators.service.scoring_dataset import ( + SCORING_CONCURRENCY, + ScoringAssignment, + build_question_pool, + build_scoring_assignments, + current_scoring_window, + filter_scoring_assignments, +) +from neurons.validators.service.seed_commitment import ( + DEFAULT_BUCKET_COMMITMENT_REVEAL_DELAY_SECONDS, + 
WindowBucketState, + WindowSeedState, + build_window_bucket_state, + build_window_seed_state, + load_window_bucket_state, + load_window_seed_state, +) + +if TYPE_CHECKING: + from neurons.validators.service.scoring_store import ScoringStore + +SCORING_STORAGE_PUBLISH_OFFSET = timedelta(minutes=30) + + +class QueryScheduler: + """ + Background scheduler that drives scoring queries from a local Hugging Face dataset. + + Lifecycle per UTC hour: + 1. Load or reuse the local dataset question pool. + 2. Publish this validator's seed commitment. + 3. After the reveal delay, fetch current validator commitments and derive a + shared combined seed. + 4. Split miner UIDs deterministically across committed validators. + 5. Between `HH:00` and `HH:30`, execute this validator's owned ai/x/web + scoring queries and save the responses in public object storage. + 6. At `HH:30`, publish this validator's bucket locator, wait for bucket + commitments, then read the expected stored responses and score them. + + The Utility API remains available for log submission and backward compatibility, + but scheduling no longer depends on `/dataset/next`. 
+ """ + + def __init__( + self, + neuron, + scoring_store: "ScoringStore", + validators: Dict, # {"ai_search": ..., "x_search": ..., "web_search": ...} + ): + self.neuron = neuron + self.scoring_store = scoring_store + self.search_validators = validators + self.question_pool = build_question_pool() + self.scoring_concurrency = SCORING_CONCURRENCY + + self.current_time_range: Optional[datetime] = None + self.window_assignments: dict[datetime, tuple[ScoringAssignment, ...]] = {} + self.window_bucket_states: dict[datetime, WindowBucketState] = {} + + def _build_query(self, question_query: str, params: dict) -> dict: + return {"query": question_query, **params} + + def _extract_prompt(self, response) -> str: + if isinstance(response, dict): + for key in ("prompt", "query", "content", "id"): + value = response.get(key) + if value: + return str(value) + urls = response.get("urls") + if urls: + return ", ".join(str(url) for url in urls) + return "" + + for key in ("prompt", "query", "content", "id"): + value = getattr(response, key, None) + if value: + return str(value) + + urls = getattr(response, "urls", None) + if urls: + return ", ".join(str(url) for url in urls) + + return "" + + async def _send_and_save(self, assignment: ScoringAssignment) -> None: + try: + validator = self.search_validators[assignment.search_type] + query = self._build_query( + assignment.question.query, + assignment.question.params, + ) + response = await validator.send_scoring_query(query, uid=assignment.uid) + if response is not None: + await self.scoring_store.save_response(assignment, response) + bt.logging.debug( + "[QueryScheduler] Saved response " + f"uid={assignment.uid} type={assignment.search_type}" + ) + except Exception as e: + bt.logging.error( + "[QueryScheduler] Scoring query failed " + f"uid={assignment.uid} type={assignment.search_type}: {e}" + ) + + async def _run_assignment( + self, + assignment: ScoringAssignment, + semaphore: asyncio.Semaphore, + ) -> None: + async with 
semaphore: + await self._send_and_save(assignment) + + async def _score_search_type( + self, + search_type: str, + items: list, + time_range_start: datetime, + ) -> None: + validator = self.search_validators.get(search_type) + if validator is None or not items: + return + + uids = torch.tensor([item["uid"] for item in items]) + responses = [item["response"] for item in items] + scoring_seeds = [item.get("scoring_seed") for item in items] + prompts = [self._extract_prompt(response) for response in responses] + event = {} + + bt.logging.info( + f"[QueryScheduler] Scoring {search_type}: {len(items)} responses" + ) + + await validator.compute_rewards_and_penalties( + event=event, + prompts=prompts, + responses=responses, + uids=uids, + start_time=time.time(), + scoring_epoch_start=time_range_start, + scoring_seeds=scoring_seeds, + ) + + async def score_epoch(self, time_range_start: datetime) -> None: + try: + bt.logging.info( + f"[QueryScheduler] Scoring epoch {time_range_start.isoformat()}" + ) + assignments = await self._get_window_assignments(time_range_start) + all_responses = await self.scoring_store.get_all_for_assignments( + assignments, + bucket_locators=( + await self._get_window_bucket_state(time_range_start) + ).bucket_locators, + ) + + if not all_responses: + bt.logging.warning( + "[QueryScheduler] No responses for epoch " + f"{time_range_start.isoformat()}, skipping scoring." + ) + return + + score_tasks = [ + self._score_search_type(search_type, items, time_range_start) + for search_type, items in all_responses.items() + if self.search_validators.get(search_type) is not None and items + ] + + if not score_tasks: + bt.logging.warning( + "[QueryScheduler] No scoreable responses for epoch " + f"{time_range_start.isoformat()}, skipping scoring." 
+ ) + return + + results = await asyncio.gather(*score_tasks, return_exceptions=True) + + for result in results: + if isinstance(result, Exception): + bt.logging.error( + f"[QueryScheduler] Error scoring epoch task: {result}" + ) + + except Exception as e: + bt.logging.error(f"[QueryScheduler] Error in score_epoch: {e}") + + def _collect_miner_uids(self) -> list[int]: + neurons = getattr(self.neuron.metagraph, "neurons", None) + if neurons is not None: + return sorted( + int(neuron.uid) + for neuron in neurons + if not getattr(neuron, "validator_permit", False) + ) + + metagraph_uids = self.neuron.metagraph.uids + if hasattr(metagraph_uids, "tolist"): + return [int(uid) for uid in metagraph_uids.tolist()] + + return [int(uid) for uid in metagraph_uids] + + def _store_window_assignments( + self, + time_range_start: datetime, + assignments: list[ScoringAssignment], + ) -> None: + self.window_assignments[time_range_start] = tuple(assignments) + + cutoff = time_range_start - timedelta(hours=2) + self.window_assignments = { + current_window: items + for current_window, items in self.window_assignments.items() + if current_window >= cutoff + } + self.window_bucket_states = { + current_window: items + for current_window, items in self.window_bucket_states.items() + if current_window >= cutoff + } + + def _build_assignments_for_window( + self, + *, + time_range_start: datetime, + window_state: WindowSeedState, + ) -> list[ScoringAssignment]: + if window_state.validator_count == 0: + return [] + + assignments = build_scoring_assignments( + time_range_start=time_range_start, + miner_uids=self._collect_miner_uids(), + validators=window_state.committed_validators, + question_pool=self.question_pool, + combined_seed=window_state.combined_seed, + ) + self._store_window_assignments(time_range_start, assignments) + return assignments + + async def _get_window_assignments( + self, + time_range_start: datetime, + ) -> list[ScoringAssignment]: + cached_assignments = 
self.window_assignments.get(time_range_start) + if cached_assignments is not None: + return list(cached_assignments) + + active_validators = await self.neuron.get_validators() + window_state = await load_window_seed_state( + subtensor=self.neuron.subtensor, + netuid=self.neuron.config.netuid, + validators=active_validators, + time_range_start=time_range_start, + ) + + assignments = self._build_assignments_for_window( + time_range_start=time_range_start, + window_state=window_state, + ) + + if assignments: + bt.logging.info( + "[QueryScheduler] Rebuilt missing window assignments " + f"time_range={time_range_start.isoformat()} " + f"validators={window_state.validator_count} " + f"assignments_total={len(assignments)}" + ) + + return assignments + + def _store_window_bucket_state( + self, + time_range_start: datetime, + bucket_state: WindowBucketState, + ) -> None: + self.window_bucket_states[time_range_start] = bucket_state + + async def _get_window_bucket_state( + self, + time_range_start: datetime, + ) -> WindowBucketState: + cached_bucket_state = self.window_bucket_states.get(time_range_start) + if cached_bucket_state is not None: + return cached_bucket_state + + active_validators = await self.neuron.get_validators() + bucket_state = await load_window_bucket_state( + subtensor=self.neuron.subtensor, + netuid=self.neuron.config.netuid, + validators=active_validators, + time_range_start=time_range_start, + ) + self._store_window_bucket_state(time_range_start, bucket_state) + return bucket_state + + async def _sleep_until_score_phase(self, time_range_start: datetime) -> None: + score_phase_start = time_range_start + SCORING_STORAGE_PUBLISH_OFFSET + delay = (score_phase_start - datetime.now(timezone.utc)).total_seconds() + if delay > 0: + await asyncio.sleep(delay) + + async def _run_window(self, time_range_start: datetime) -> None: + active_validators = await self.neuron.get_validators() + score_phase_start = time_range_start + SCORING_STORAGE_PUBLISH_OFFSET + + 
window_state = await build_window_seed_state( + subtensor=self.neuron.subtensor, + wallet=self.neuron.wallet, + netuid=self.neuron.config.netuid, + uid=self.neuron.uid, + validators=active_validators, + time_range_start=time_range_start, + ) + + if window_state.validator_count == 0: + bt.logging.warning( + "[QueryScheduler] No validator commitments available for " + f"{time_range_start.isoformat()}, skipping window." + ) + return + + assignments = self._build_assignments_for_window( + time_range_start=time_range_start, + window_state=window_state, + ) + local_assignments = filter_scoring_assignments( + assignments, + validator_uid=self.neuron.uid, + ) + + bt.logging.info( + "[QueryScheduler] Built local scoring plan " + f"time_range={time_range_start.isoformat()} " + f"validators={window_state.validator_count} " + f"assignments_total={len(assignments)} " + f"local_assignments={len(local_assignments)} " + f"combined_seed={window_state.combined_seed}" + ) + + if local_assignments and datetime.now(timezone.utc) < score_phase_start: + semaphore = asyncio.Semaphore(self.scoring_concurrency) + + tasks = [ + asyncio.create_task(self._run_assignment(assignment, semaphore)) + for assignment in local_assignments + ] + + results = await asyncio.gather(*tasks, return_exceptions=True) + + for result in results: + if isinstance(result, Exception): + bt.logging.error(f"[QueryScheduler] Window task failed: {result}") + elif local_assignments: + bt.logging.warning( + "[QueryScheduler] Query phase already ended for " + f"{time_range_start.isoformat()}, skipping late local scoring queries." 
+ ) + + await self._sleep_until_score_phase(time_range_start) + + bucket_state = await build_window_bucket_state( + subtensor=self.neuron.subtensor, + wallet=self.neuron.wallet, + netuid=self.neuron.config.netuid, + uid=self.neuron.uid, + validators=active_validators, + time_range_start=time_range_start, + bucket_locator=self.scoring_store.bucket_locator, + publish_offset=SCORING_STORAGE_PUBLISH_OFFSET, + reveal_delay_seconds=DEFAULT_BUCKET_COMMITMENT_REVEAL_DELAY_SECONDS, + ) + self._store_window_bucket_state(time_range_start, bucket_state) + + if bucket_state.validator_count == 0: + bt.logging.warning( + "[QueryScheduler] No validator bucket commitments available for " + f"{time_range_start.isoformat()}, skipping scoring." + ) + return + + await self.score_epoch(time_range_start) + + async def _sleep_until_next_window(self, time_range_start: datetime) -> None: + next_window_start = time_range_start + timedelta(hours=1) + delay = (next_window_start - datetime.now(timezone.utc)).total_seconds() + if delay > 0: + await asyncio.sleep(delay) + + async def run(self) -> None: + bt.logging.info("[QueryScheduler] Starting") + await self.question_pool.initialize() + + while True: + try: + window_start = current_scoring_window() + self.current_time_range = window_start + await self._run_window(window_start) + await self._sleep_until_next_window(window_start) + except Exception as e: + bt.logging.error( + "[QueryScheduler] Unexpected scheduler error " + f"time_range={self.current_time_range.isoformat() if self.current_time_range else None}: {e}" + ) + await asyncio.sleep(5) diff --git a/neurons/validators/service/scoring_dataset.py b/neurons/validators/service/scoring_dataset.py new file mode 100644 index 00000000..e5f087b6 --- /dev/null +++ b/neurons/validators/service/scoring_dataset.py @@ -0,0 +1,402 @@ +import asyncio +import hashlib +import random +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from typing import Any, 
Iterable, Sequence + +SCORING_DATASET_NAME = "sentence-transformers/natural-questions" +SCORING_DATASET_CONFIG = "pair" +SCORING_DATASET_SPLIT = "train" +SCORING_DATASET_COLUMN = "query" +SCORING_DATASET_FALLBACK_COLUMNS: tuple[str, ...] = ("question",) +SCORING_DATASET_MAX_ROWS: int | None = None +SCORING_CONCURRENCY = 10 + +SCORING_SEARCH_TYPES: tuple[str, ...] = ("ai_search", "x_search", "web_search") + +AI_SEARCH_TOOLS: list[list[str]] = [ + ["Twitter Search", "Reddit Search"], + ["Twitter Search", "Web Search"], + ["Twitter Search", "Web Search"], + ["Twitter Search", "Web Search"], + ["Twitter Search", "Web Search"], + ["Twitter Search", "Hacker News Search"], + ["Twitter Search", "Hacker News Search"], + ["Twitter Search", "Youtube Search"], + ["Twitter Search", "Youtube Search"], + ["Twitter Search", "Youtube Search"], + ["Twitter Search", "Web Search"], + ["Twitter Search", "Reddit Search"], + ["Twitter Search", "Reddit Search"], + ["Twitter Search", "Hacker News Search"], + ["Twitter Search", "ArXiv Search"], + ["Twitter Search", "ArXiv Search"], + ["Twitter Search", "Wikipedia Search"], + ["Twitter Search", "Wikipedia Search"], + ["Twitter Search", "Web Search"], + ["Twitter Search", "Web Search"], + ["Twitter Search", "Web Search"], + ["Web Search"], + ["Reddit Search"], + ["Hacker News Search"], + ["Youtube Search"], + ["ArXiv Search"], + ["Wikipedia Search"], + ["Twitter Search", "Youtube Search", "ArXiv Search", "Wikipedia Search"], + ["Twitter Search", "Web Search", "Reddit Search", "Hacker News Search"], + [ + "Twitter Search", + "Web Search", + "Reddit Search", + "Hacker News Search", + "Youtube Search", + "ArXiv Search", + "Wikipedia Search", + ], +] + +AI_SEARCH_DATE_FILTERS: list[str] = ( + ["PAST_24_HOURS"] * 4 + + ["PAST_2_DAYS"] * 5 + + ["PAST_WEEK"] * 5 + + ["PAST_2_WEEKS"] * 5 + + ["PAST_MONTH"] + + ["PAST_YEAR"] +) + +X_SEARCH_PARAM_FIELDS: tuple[str, ...] 
= ( + "sort", + "is_quote", + "is_video", + "is_image", + "min_retweets", + "min_replies", + "min_likes", + "date_range", +) + +THREE_YEARS_IN_DAYS = 3 * 365 + + +def current_scoring_window() -> datetime: + now = datetime.now(timezone.utc) + return now.replace(minute=0, second=0, microsecond=0) + + +def derive_deterministic_int(*parts: Any, bits: int = 63) -> int: + payload = "|".join(str(part) for part in parts).encode("utf-8") + digest = hashlib.sha256(payload).digest() + value = int.from_bytes(digest[:8], "big") + mask = (1 << bits) - 1 + return value & mask + + +def normalize_questions(rows: Iterable[Any]) -> list[str]: + unique_questions: list[str] = [] + seen: set[str] = set() + + for row in rows: + if row is None: + continue + + question = str(row).strip() + if not question or question in seen: + continue + + seen.add(question) + unique_questions.append(question) + + return unique_questions + + +def generate_ai_search_params(rng: random.Random) -> dict[str, Any]: + return { + "tools": list(rng.choice(AI_SEARCH_TOOLS)), + "date_filter_type": rng.choice(AI_SEARCH_DATE_FILTERS), + } + + +def generate_x_search_params( + rng: random.Random, + *, + base_time: datetime, +) -> dict[str, Any]: + selected_field = rng.choice(X_SEARCH_PARAM_FIELDS) + params: dict[str, Any] = {} + + if selected_field == "sort": + params["sort"] = "Latest" + elif selected_field == "date_range": + end_date = base_time - timedelta(days=rng.randint(0, THREE_YEARS_IN_DAYS)) + start_date = end_date - timedelta(days=rng.randint(7, 14)) + params["start_date"] = start_date.strftime("%Y-%m-%d_%H:%M:%S_UTC") + params["end_date"] = end_date.strftime("%Y-%m-%d_%H:%M:%S_UTC") + elif selected_field == "is_video": + params["is_video"] = rng.choice([True, False]) + elif selected_field == "is_image": + params["is_image"] = rng.choice([True, False]) + elif selected_field == "is_quote": + params["is_quote"] = rng.choice([True, False]) + elif selected_field == "min_likes": + params["min_likes"] = 
rng.randint(5, 100) + elif selected_field == "min_replies": + params["min_replies"] = rng.randint(5, 20) + elif selected_field == "min_retweets": + params["min_retweets"] = rng.randint(5, 20) + + return params + + +def generate_params_for( + search_type: str, + *, + window_seed: int, + time_range_start: datetime, + uid: int | None = None, +) -> dict[str, Any]: + if search_type == "ai_search": + rng = random.Random( + derive_deterministic_int( + window_seed, time_range_start.isoformat(), search_type + ) + ) + return generate_ai_search_params(rng) + + if search_type == "x_search": + rng = random.Random( + derive_deterministic_int( + window_seed, + time_range_start.isoformat(), + search_type, + uid, + ) + ) + return generate_x_search_params( + rng, + base_time=time_range_start.astimezone(timezone.utc), + ) + + return {} + + +@dataclass(frozen=True) +class ScoringQuestion: + query: str + params: dict[str, Any] = field(default_factory=dict) + + +@dataclass(frozen=True) +class ScoringAssignment: + time_range_start: datetime + uid: int + search_type: str + validator_uid: int + validator_hotkey: str + question: ScoringQuestion + scoring_seed: int + + +class HuggingFaceQuestionPool: + def __init__( + self, + *, + dataset_name: str, + dataset_config: str | None, + split: str, + question_column: str, + fallback_question_columns: Sequence[str] = ("question",), + max_rows: int | None = None, + ): + self.dataset_name = dataset_name + self.dataset_config = dataset_config + self.split = split + self.question_column = question_column + self.fallback_question_columns = tuple(fallback_question_columns) + self.max_rows = max_rows + self._questions: list[str] = [] + + @property + def questions(self) -> list[str]: + return list(self._questions) + + async def initialize(self) -> None: + if self._questions: + return + + self._questions = await asyncio.to_thread(self._load_questions) + + def _load_questions(self) -> list[str]: + try: + from datasets import load_dataset + except ImportError 
as exc: + raise RuntimeError( + "The 'datasets' package is required for validator-local scoring " + "datasets. Install it before running the validator." + ) from exc + + load_args: list[str] = [self.dataset_name] + if self.dataset_config: + load_args.append(self.dataset_config) + + dataset = load_dataset(*load_args, split=self.split) + + candidate_columns = (self.question_column, *self.fallback_question_columns) + selected_column = next( + (column for column in candidate_columns if column in dataset.column_names), + None, + ) + if selected_column is None: + raise ValueError( + "Unable to find a question column in the dataset. " + f"Tried {candidate_columns}, available columns are {dataset.column_names}." + ) + + rows = dataset[selected_column] + if self.max_rows is not None: + rows = rows[: self.max_rows] + + questions = normalize_questions(rows) + if not questions: + raise ValueError( + f"Dataset {self.dataset_name!r} did not yield any usable questions." + ) + + return questions + + def get_questions_for(self, search_type: str) -> list[str]: + if search_type not in SCORING_SEARCH_TYPES: + raise ValueError(f"Unsupported search type: {search_type}") + if not self._questions: + raise RuntimeError("Question pool is not initialized yet.") + return self.questions + + +def build_question_pool() -> HuggingFaceQuestionPool: + return HuggingFaceQuestionPool( + dataset_name=SCORING_DATASET_NAME, + dataset_config=SCORING_DATASET_CONFIG, + split=SCORING_DATASET_SPLIT, + question_column=SCORING_DATASET_COLUMN, + fallback_question_columns=SCORING_DATASET_FALLBACK_COLUMNS, + max_rows=SCORING_DATASET_MAX_ROWS, + ) + + +def build_scoring_assignments( + *, + time_range_start: datetime, + miner_uids: Sequence[int], + validators: Sequence[Any], + question_pool: HuggingFaceQuestionPool, + combined_seed: int, +) -> list[ScoringAssignment]: + if not miner_uids or not validators: + return [] + + assignments: list[ScoringAssignment] = [] + ordered_miner_uids = sorted(int(uid) for uid in 
miner_uids) + + validator_ownership = build_validator_ownership( + time_range_start=time_range_start, + miner_uids=ordered_miner_uids, + validators=validators, + combined_seed=combined_seed, + ) + + for search_type in SCORING_SEARCH_TYPES: + questions = question_pool.get_questions_for(search_type) + question_order = list(range(len(questions))) + random.Random( + derive_deterministic_int( + combined_seed, + time_range_start.isoformat(), + search_type, + "question-order", + ) + ).shuffle(question_order) + + for index, uid in enumerate(ordered_miner_uids): + owner_validator = validator_ownership[uid] + question_index = question_order[index % len(question_order)] + query = questions[question_index] + params = generate_params_for( + search_type, + window_seed=combined_seed, + time_range_start=time_range_start, + uid=uid, + ) + scoring_seed = derive_deterministic_int( + combined_seed, + time_range_start.isoformat(), + search_type, + uid, + "scoring-seed", + ) + assignments.append( + ScoringAssignment( + time_range_start=time_range_start, + uid=uid, + search_type=search_type, + validator_uid=int(owner_validator.uid), + validator_hotkey=owner_validator.hotkey, + question=ScoringQuestion(query=query, params=params), + scoring_seed=scoring_seed, + ) + ) + + task_order_rng = random.Random( + derive_deterministic_int( + combined_seed, time_range_start.isoformat(), "task-order" + ) + ) + task_order_rng.shuffle(assignments) + return assignments + + +def build_validator_ownership( + *, + time_range_start: datetime, + miner_uids: Sequence[int], + validators: Sequence[Any], + combined_seed: int, +) -> dict[int, Any]: + if not miner_uids or not validators: + return {} + + ordered_validators = sorted( + validators, key=lambda item: (int(item.uid), item.hotkey) + ) + + validator_order = list(ordered_validators) + + random.Random( + derive_deterministic_int( + combined_seed, + time_range_start.isoformat(), + "validator-order", + ) + ).shuffle(validator_order) + + miner_order = 
sorted(int(uid) for uid in miner_uids) + random.Random( + derive_deterministic_int( + combined_seed, + time_range_start.isoformat(), + "miner-order", + ) + ).shuffle(miner_order) + + ownership: dict[int, Any] = {} + for index, uid in enumerate(miner_order): + ownership[uid] = validator_order[index % len(validator_order)] + + return ownership + + +def filter_scoring_assignments( + assignments: Sequence[ScoringAssignment], + *, + validator_uid: int, +) -> list[ScoringAssignment]: + return [item for item in assignments if item.validator_uid == int(validator_uid)] diff --git a/neurons/validators/service/scoring_store.py b/neurons/validators/service/scoring_store.py new file mode 100644 index 00000000..50bb2a8d --- /dev/null +++ b/neurons/validators/service/scoring_store.py @@ -0,0 +1,260 @@ +import asyncio +import hashlib +import json +from datetime import datetime, timezone +from typing import Any, Dict, List, Mapping, Sequence + +import bittensor as bt +import jsonpickle + +from neurons.validators.service.scoring_dataset import ( + SCORING_SEARCH_TYPES, + ScoringAssignment, +) +from neurons.validators.storage import ( + ObjectStorage, + StorageObject, + build_validator_bucket_name, +) + +STORAGE_VERSION = 1 +READ_CONCURRENCY = 20 + + +class ScoringStore: + def __init__( + self, + *, + object_storage: ObjectStorage, + netuid: int, + validator_uid: int, + validator_hotkey: str, + ): + self.object_storage = object_storage + self.netuid = int(netuid) + self.validator_uid = int(validator_uid) + self.validator_hotkey = validator_hotkey + self.bucket_name = build_validator_bucket_name( + netuid=self.netuid, + validator_uid=self.validator_uid, + hotkey=self.validator_hotkey, + ) + self.bucket_locator = self.object_storage.build_bucket_locator( + bucket=self.bucket_name + ) + self.bucket_ref = self.object_storage.resolve_bucket_locator( + self.bucket_locator + ) + + def _bucket_name_for(self, validator_uid: int, validator_hotkey: str) -> str: + return 
build_validator_bucket_name( + netuid=self.netuid, + validator_uid=validator_uid, + hotkey=validator_hotkey, + ) + + def _assignment_payload(self, assignment: ScoringAssignment) -> dict[str, Any]: + return { + "time_range_start": assignment.time_range_start.astimezone( + timezone.utc + ).isoformat(), + "uid": int(assignment.uid), + "search_type": assignment.search_type, + "validator_uid": int(assignment.validator_uid), + "validator_hotkey": assignment.validator_hotkey, + "query": assignment.question.query, + "params": assignment.question.params, + "scoring_seed": int(assignment.scoring_seed), + } + + def _location_for_assignment(self, assignment: ScoringAssignment) -> StorageObject: + return self._location_for_assignment_with_bucket( + assignment, + bucket=self._bucket_name_for( + assignment.validator_uid, + assignment.validator_hotkey, + ), + ) + + def _location_for_assignment_with_bucket( + self, + assignment: ScoringAssignment, + *, + bucket: str, + ) -> StorageObject: + query_hash = hashlib.sha256( + json.dumps( + { + "query": assignment.question.query, + "params": assignment.question.params, + }, + sort_keys=True, + ).encode("utf-8") + ).hexdigest()[:16] + window_ts = int( + assignment.time_range_start.astimezone(timezone.utc).timestamp() + ) + return StorageObject( + bucket=bucket, + key=( + f"windows/{window_ts}/{assignment.search_type}/" + f"uid-{assignment.uid}-{assignment.scoring_seed}-{query_hash}.json" + ), + ) + + async def save_response( + self, + assignment: ScoringAssignment, + response: Any, + ) -> None: + if assignment.validator_uid != self.validator_uid: + raise ValueError( + "Cannot save a response for another validator bucket: " + f"expected uid={self.validator_uid}, got uid={assignment.validator_uid}." + ) + if assignment.validator_hotkey != self.validator_hotkey: + raise ValueError( + "Cannot save a response for another validator hotkey: " + f"expected hotkey={self.validator_hotkey}, " + f"got hotkey={assignment.validator_hotkey}." 
+ ) + + location = self._location_for_assignment_with_bucket( + assignment, + bucket=self.bucket_ref, + ) + payload = { + "version": STORAGE_VERSION, + "assignment": self._assignment_payload(assignment), + "response": jsonpickle.encode(response), + } + + await self.object_storage.put_object( + bucket=location.bucket, + key=location.key, + data=json.dumps(payload, sort_keys=True).encode("utf-8"), + content_type="application/json", + ) + + def _decode_response_payload( + self, + assignment: ScoringAssignment, + raw_payload: bytes | None, + ) -> dict[str, Any] | None: + if raw_payload is None: + return None + + try: + payload = json.loads(raw_payload.decode("utf-8")) + except (TypeError, ValueError) as exc: + bt.logging.warning( + "[ScoringStore] Failed to decode stored response " + f"uid={assignment.uid} type={assignment.search_type}: {exc}" + ) + return None + + if payload.get("version") != STORAGE_VERSION: + bt.logging.warning( + "[ScoringStore] Ignoring stored response with unexpected version " + f"uid={assignment.uid} type={assignment.search_type}" + ) + return None + + if payload.get("assignment") != self._assignment_payload(assignment): + bt.logging.warning( + "[ScoringStore] Ignoring stored response with mismatched assignment " + f"uid={assignment.uid} type={assignment.search_type}" + ) + return None + + try: + response = jsonpickle.decode(payload["response"]) + except Exception as exc: + bt.logging.warning( + "[ScoringStore] Failed to deserialize stored response " + f"uid={assignment.uid} type={assignment.search_type}: {exc}" + ) + return None + + return { + "uid": assignment.uid, + "response": response, + "scoring_seed": assignment.scoring_seed, + } + + async def _load_assignment( + self, + assignment: ScoringAssignment, + bucket_locators: Mapping[tuple[int, str], str], + semaphore: asyncio.Semaphore, + ) -> tuple[str, dict[str, Any] | None]: + locator = bucket_locators.get( + (assignment.validator_uid, assignment.validator_hotkey) + ) + if not locator: + 
bt.logging.warning( + "[ScoringStore] Missing bucket locator for validator " + f"uid={assignment.validator_uid} hotkey={assignment.validator_hotkey}" + ) + return assignment.search_type, None + + try: + bucket_ref = self.object_storage.resolve_bucket_locator(locator) + except Exception as exc: + bt.logging.warning( + "[ScoringStore] Invalid bucket locator for validator " + f"uid={assignment.validator_uid} hotkey={assignment.validator_hotkey}: {exc}" + ) + return assignment.search_type, None + + location = self._location_for_assignment_with_bucket( + assignment, + bucket=bucket_ref, + ) + async with semaphore: + raw_payload = await self.object_storage.get_object( + bucket=location.bucket, + key=location.key, + ) + + if raw_payload is None: + bt.logging.warning( + "[ScoringStore] Missing stored response " + f"uid={assignment.uid} type={assignment.search_type} " + f"validator={assignment.validator_uid}" + ) + + return assignment.search_type, self._decode_response_payload( + assignment, + raw_payload, + ) + + async def get_all_for_assignments( + self, + assignments: Sequence[ScoringAssignment], + *, + bucket_locators: Mapping[tuple[int, str], str], + ) -> Dict[str, List[Dict]]: + if not assignments: + return {} + + semaphore = asyncio.Semaphore(READ_CONCURRENCY) + tasks = [ + asyncio.create_task( + self._load_assignment(assignment, bucket_locators, semaphore) + ) + for assignment in assignments + ] + + results = await asyncio.gather(*tasks) + + grouped: Dict[str, List[Dict]] = {key: [] for key in SCORING_SEARCH_TYPES} + for search_type, item in results: + if item is not None: + grouped[search_type].append(item) + + return { + search_type: sorted(items, key=lambda item: item["uid"]) + for search_type, items in grouped.items() + if items + } diff --git a/neurons/validators/service/seed_commitment.py b/neurons/validators/service/seed_commitment.py new file mode 100644 index 00000000..43277637 --- /dev/null +++ b/neurons/validators/service/seed_commitment.py @@ -0,0 +1,491 
@@
+import asyncio
+import base64
+import logging
+import random
+import re
+from dataclasses import dataclass
+from datetime import datetime, timedelta, timezone
+from typing import Iterable
+
+from neurons.validators.service.scoring_dataset import derive_deterministic_int
+
+logger = logging.getLogger(__name__)
+
+MAX_SEED_COMMITMENT_BYTES = 128
+DEFAULT_SEED_COMMITMENT_REVEAL_DELAY_SECONDS = 30
+DEFAULT_BUCKET_COMMITMENT_REVEAL_DELAY_SECONDS = 30
+SEED_ENVELOPE_PATTERN = re.compile(r"<seed:([^>]*)>")
+BUCKET_ENVELOPE_PATTERN = re.compile(r"<bucket:([^>]*)>")
+
+
+@dataclass(frozen=True)
+class SeedCommitment:
+    time_range_start: datetime
+    seed: int
+
+
+@dataclass(frozen=True)
+class BucketCommitment:
+    time_range_start: datetime
+    bucket_locator: str
+
+
+@dataclass(frozen=True)
+class CommittedValidator:
+    uid: int
+    hotkey: str
+    seed: int
+
+
+@dataclass(frozen=True)
+class CommittedValidatorBucket:
+    uid: int
+    hotkey: str
+    bucket_locator: str
+
+
+@dataclass(frozen=True)
+class WindowSeedState:
+    committed_validators: tuple[CommittedValidator, ...]
+    combined_seed: int
+
+    @property
+    def validator_count(self) -> int:
+        return len(self.committed_validators)
+
+
+@dataclass(frozen=True)
+class WindowBucketState:
+    committed_buckets: tuple[CommittedValidatorBucket, ...]
+
+    @property
+    def validator_count(self) -> int:
+        return len(self.committed_buckets)
+
+    @property
+    def bucket_locators(self) -> dict[tuple[int, str], str]:
+        return {
+            (bucket.uid, bucket.hotkey): bucket.bucket_locator
+            for bucket in self.committed_buckets
+        }
+
+
+def extract_seed_payload(raw_commitment: str) -> tuple[str, str]:
+    match = SEED_ENVELOPE_PATTERN.search(raw_commitment)
+    if not match:
+        return raw_commitment, ""
+
+    seed_payload = match.group(1)
+    other_data = SEED_ENVELOPE_PATTERN.sub("", raw_commitment).strip()
+    return other_data, seed_payload
+
+
+def merge_seed_commitment(raw_commitment: str, payload: str) -> str:
+    if not raw_commitment:
+        return f"<seed:{payload}>"
+
+    other_data, _ = extract_seed_payload(raw_commitment)
+    envelope = f"<seed:{payload}>"
+    return f"{other_data}{envelope}" if other_data else envelope
+
+
+def format_seed_payload(time_range_start: datetime, seed: int) -> str:
+    timestamp = int(time_range_start.astimezone(timezone.utc).timestamp())
+    return f"{timestamp}:{seed:x}"
+
+
+def parse_seed_payload(payload: str) -> SeedCommitment | None:
+    if not payload:
+        return None
+
+    try:
+        timestamp_raw, seed_raw = payload.split(":", maxsplit=1)
+        time_range_start = datetime.fromtimestamp(int(timestamp_raw), tz=timezone.utc)
+        seed = int(seed_raw, 16)
+        return SeedCommitment(time_range_start=time_range_start, seed=seed)
+    except (TypeError, ValueError):
+        logger.debug("Failed to parse seed commitment payload %r", payload)
+        return None
+
+
+def parse_seed_commitment(raw_commitment: str) -> SeedCommitment | None:
+    _, payload = extract_seed_payload(raw_commitment)
+    return parse_seed_payload(payload)
+
+
+def extract_bucket_payload(raw_commitment: str) -> tuple[str, str]:
+    match = BUCKET_ENVELOPE_PATTERN.search(raw_commitment)
+    if not match:
+        return raw_commitment, ""
+
+    bucket_payload = match.group(1)
+    other_data = BUCKET_ENVELOPE_PATTERN.sub("", raw_commitment).strip()
+    return other_data, bucket_payload
+
+
+def
merge_bucket_commitment(raw_commitment: str, payload: str) -> str:
+    if not raw_commitment:
+        return f"<bucket:{payload}>"
+
+    other_data, _ = extract_bucket_payload(raw_commitment)
+    envelope = f"<bucket:{payload}>"
+    return f"{other_data}{envelope}" if other_data else envelope
+
+
+def _encode_bucket_locator(locator: str) -> str:
+    return base64.urlsafe_b64encode(locator.encode("utf-8")).decode("ascii").rstrip("=")
+
+
+def _decode_bucket_locator(encoded_locator: str) -> str:
+    padding = "=" * (-len(encoded_locator) % 4)
+    return base64.urlsafe_b64decode(
+        f"{encoded_locator}{padding}".encode("ascii")
+    ).decode("utf-8")
+
+
+def format_bucket_payload(time_range_start: datetime, bucket_locator: str) -> str:
+    timestamp = int(time_range_start.astimezone(timezone.utc).timestamp())
+    return f"{timestamp}:{_encode_bucket_locator(bucket_locator)}"
+
+
+def parse_bucket_payload(payload: str) -> BucketCommitment | None:
+    if not payload:
+        return None
+
+    try:
+        timestamp_raw, locator_raw = payload.split(":", maxsplit=1)
+        time_range_start = datetime.fromtimestamp(int(timestamp_raw), tz=timezone.utc)
+        bucket_locator = _decode_bucket_locator(locator_raw)
+        return BucketCommitment(
+            time_range_start=time_range_start,
+            bucket_locator=bucket_locator,
+        )
+    except (TypeError, ValueError):
+        logger.debug("Failed to parse bucket commitment payload %r", payload)
+        return None
+
+
+def parse_bucket_commitment(raw_commitment: str) -> BucketCommitment | None:
+    _, payload = extract_bucket_payload(raw_commitment)
+    return parse_bucket_payload(payload)
+
+
+def combine_validator_seeds(
+    validators: Iterable[CommittedValidator],
+    *,
+    time_range_start: datetime,
+) -> int:
+    ordered_fragments = [
+        f"{validator.uid}:{validator.hotkey}:{validator.seed}"
+        for validator in sorted(validators, key=lambda item: (item.uid, item.hotkey))
+    ]
+    return derive_deterministic_int(time_range_start.isoformat(), *ordered_fragments)
+
+
+async def publish_seed_commitment(
+    *,
+    subtensor,
+    wallet,
+    netuid: int,
+    uid: int,
+    
time_range_start: datetime, + reveal_delay_seconds: int = DEFAULT_SEED_COMMITMENT_REVEAL_DELAY_SECONDS, +) -> SeedCommitment | None: + current_raw_commitment = await subtensor.get_commitment(netuid, uid) + + current_commitment = parse_seed_commitment(current_raw_commitment or "") + + if current_commitment and current_commitment.time_range_start == time_range_start: + logger.info( + "Reusing existing seed commitment for window %s", + time_range_start.isoformat(), + ) + return current_commitment + + reveal_deadline = time_range_start + timedelta(seconds=reveal_delay_seconds) + + if datetime.now(timezone.utc) >= reveal_deadline: + logger.warning( + "Skipping seed commitment for %s because the reveal deadline passed.", + time_range_start.isoformat(), + ) + return None + + local_seed = random.SystemRandom().getrandbits(63) + payload = format_seed_payload(time_range_start, local_seed) + merged_commitment = merge_seed_commitment(current_raw_commitment or "", payload) + + commitment_bytes = len(merged_commitment.encode("utf-8")) + + if commitment_bytes > MAX_SEED_COMMITMENT_BYTES: + raise ValueError( + f"Seed commitment exceeds {MAX_SEED_COMMITMENT_BYTES} bytes: " + f"{commitment_bytes}" + ) + + await subtensor.set_commitment( + wallet=wallet, + netuid=netuid, + data=merged_commitment, + ) + + logger.info( + "Published seed commitment for window %s", + time_range_start.isoformat(), + ) + + return SeedCommitment(time_range_start=time_range_start, seed=local_seed) + + +async def wait_for_commitment_reveal( + *, + time_range_start: datetime, + reveal_delay_seconds: int = DEFAULT_SEED_COMMITMENT_REVEAL_DELAY_SECONDS, +) -> None: + reveal_deadline = time_range_start + timedelta(seconds=reveal_delay_seconds) + remaining = (reveal_deadline - datetime.now(timezone.utc)).total_seconds() + if remaining > 0: + logger.info( + "Waiting %.2fs for validator seed commitments for %s", + remaining, + time_range_start.isoformat(), + ) + await asyncio.sleep(remaining) + + +async def 
publish_bucket_commitment( + *, + subtensor, + wallet, + netuid: int, + uid: int, + time_range_start: datetime, + bucket_locator: str, +) -> BucketCommitment | None: + current_raw_commitment = await subtensor.get_commitment(netuid, uid) + current_commitment = parse_bucket_commitment(current_raw_commitment or "") + + if ( + current_commitment + and current_commitment.time_range_start == time_range_start + and current_commitment.bucket_locator == bucket_locator + ): + logger.info( + "Reusing existing bucket commitment for window %s", + time_range_start.isoformat(), + ) + return current_commitment + + payload = format_bucket_payload(time_range_start, bucket_locator) + merged_commitment = merge_bucket_commitment(current_raw_commitment or "", payload) + + commitment_bytes = len(merged_commitment.encode("utf-8")) + if commitment_bytes > MAX_SEED_COMMITMENT_BYTES: + raise ValueError( + f"Bucket commitment exceeds {MAX_SEED_COMMITMENT_BYTES} bytes: " + f"{commitment_bytes}" + ) + + await subtensor.set_commitment( + wallet=wallet, + netuid=netuid, + data=merged_commitment, + ) + + logger.info( + "Published bucket commitment for window %s", + time_range_start.isoformat(), + ) + + return BucketCommitment( + time_range_start=time_range_start, + bucket_locator=bucket_locator, + ) + + +async def wait_for_bucket_commitment_reveal( + *, + time_range_start: datetime, + publish_offset: timedelta, + reveal_delay_seconds: int = DEFAULT_BUCKET_COMMITMENT_REVEAL_DELAY_SECONDS, +) -> None: + reveal_deadline = ( + time_range_start + publish_offset + timedelta(seconds=reveal_delay_seconds) + ) + remaining = (reveal_deadline - datetime.now(timezone.utc)).total_seconds() + if remaining > 0: + logger.info( + "Waiting %.2fs for validator bucket commitments for %s", + remaining, + time_range_start.isoformat(), + ) + await asyncio.sleep(remaining) + + +async def load_committed_validators( + *, + subtensor, + netuid: int, + validators, + time_range_start: datetime, +) -> list[CommittedValidator]: 
+ raw_commitments = await subtensor.get_all_commitments(netuid) + + committed_validators: list[CommittedValidator] = [] + for validator in sorted(validators, key=lambda item: (item.uid, item.hotkey)): + commitment = parse_seed_commitment(raw_commitments.get(validator.hotkey, "")) + if commitment is None or commitment.time_range_start != time_range_start: + continue + + committed_validators.append( + CommittedValidator( + uid=int(validator.uid), + hotkey=validator.hotkey, + seed=commitment.seed, + ) + ) + + logger.info( + "Loaded %s committed validators for window %s", + len(committed_validators), + time_range_start.isoformat(), + ) + return committed_validators + + +async def load_committed_validator_buckets( + *, + subtensor, + netuid: int, + validators, + time_range_start: datetime, +) -> list[CommittedValidatorBucket]: + raw_commitments = await subtensor.get_all_commitments(netuid) + + committed_buckets: list[CommittedValidatorBucket] = [] + for validator in sorted(validators, key=lambda item: (item.uid, item.hotkey)): + commitment = parse_bucket_commitment(raw_commitments.get(validator.hotkey, "")) + if commitment is None or commitment.time_range_start != time_range_start: + continue + + committed_buckets.append( + CommittedValidatorBucket( + uid=int(validator.uid), + hotkey=validator.hotkey, + bucket_locator=commitment.bucket_locator, + ) + ) + + logger.info( + "Loaded %s committed validator buckets for window %s", + len(committed_buckets), + time_range_start.isoformat(), + ) + return committed_buckets + + +async def build_window_seed_state( + *, + subtensor, + wallet, + netuid: int, + uid: int, + validators, + time_range_start: datetime, + reveal_delay_seconds: int = DEFAULT_SEED_COMMITMENT_REVEAL_DELAY_SECONDS, +) -> WindowSeedState: + await publish_seed_commitment( + subtensor=subtensor, + wallet=wallet, + netuid=netuid, + uid=uid, + time_range_start=time_range_start, + reveal_delay_seconds=reveal_delay_seconds, + ) + + await wait_for_commitment_reveal( + 
time_range_start=time_range_start, + reveal_delay_seconds=reveal_delay_seconds, + ) + + return await load_window_seed_state( + subtensor=subtensor, + netuid=netuid, + validators=validators, + time_range_start=time_range_start, + ) + + +async def load_window_seed_state( + *, + subtensor, + netuid: int, + validators, + time_range_start: datetime, +) -> WindowSeedState: + committed_validators = await load_committed_validators( + subtensor=subtensor, + netuid=netuid, + validators=validators, + time_range_start=time_range_start, + ) + + return WindowSeedState( + committed_validators=tuple(committed_validators), + combined_seed=combine_validator_seeds( + committed_validators, + time_range_start=time_range_start, + ), + ) + + +async def build_window_bucket_state( + *, + subtensor, + wallet, + netuid: int, + uid: int, + validators, + time_range_start: datetime, + bucket_locator: str, + publish_offset: timedelta, + reveal_delay_seconds: int = DEFAULT_BUCKET_COMMITMENT_REVEAL_DELAY_SECONDS, +) -> WindowBucketState: + await publish_bucket_commitment( + subtensor=subtensor, + wallet=wallet, + netuid=netuid, + uid=uid, + time_range_start=time_range_start, + bucket_locator=bucket_locator, + ) + + await wait_for_bucket_commitment_reveal( + time_range_start=time_range_start, + publish_offset=publish_offset, + reveal_delay_seconds=reveal_delay_seconds, + ) + + return await load_window_bucket_state( + subtensor=subtensor, + netuid=netuid, + validators=validators, + time_range_start=time_range_start, + ) + + +async def load_window_bucket_state( + *, + subtensor, + netuid: int, + validators, + time_range_start: datetime, +) -> WindowBucketState: + committed_buckets = await load_committed_validator_buckets( + subtensor=subtensor, + netuid=netuid, + validators=validators, + time_range_start=time_range_start, + ) + + return WindowBucketState(committed_buckets=tuple(committed_buckets)) diff --git a/neurons/validators/validator.py b/neurons/validators/service/validator.py similarity index 
88% rename from neurons/validators/validator.py rename to neurons/validators/service/validator.py index 33e9f44d..63ff52b1 100644 --- a/neurons/validators/validator.py +++ b/neurons/validators/service/validator.py @@ -9,6 +9,7 @@ import torch from bittensor.core.metagraph import AsyncMetagraph +from desearch import MIN_ALPHA_STAKE, MIN_TOTAL_STAKE from desearch.protocol import IsAlive from desearch.redis.redis_client import close_redis, initialize_redis from desearch.redis.utils import ( @@ -16,16 +17,19 @@ save_moving_averaged_scores, ) from desearch.utils import resync_metagraph -from neurons.validators.advanced_scraper_validator import AdvancedScraperValidator from neurons.validators.base_validator import AbstractNeuron from neurons.validators.config import add_args, check_config, config from neurons.validators.proxy.uid_manager import UIDManager -from neurons.validators.query_scheduler import QueryScheduler -from neurons.validators.scoring_store import ScoringStore +from neurons.validators.scrapers.advanced_scraper_validator import ( + AdvancedScraperValidator, +) +from neurons.validators.scrapers.web_scraper_validator import WebScraperValidator +from neurons.validators.scrapers.x_scraper_validator import XScraperValidator +from neurons.validators.service.query_scheduler import QueryScheduler +from neurons.validators.service.scoring_store import ScoringStore +from neurons.validators.service.weights import init_wandb, set_weights +from neurons.validators.storage import build_object_storage from neurons.validators.utility_api_client import UtilityAPIClient -from neurons.validators.web_scraper_validator import WebScraperValidator -from neurons.validators.weights import init_wandb, set_weights -from neurons.validators.x_scraper_validator import XScraperValidator class Neuron(AbstractNeuron): @@ -140,6 +144,34 @@ def _build_validator_identity(self) -> dict: return identity + async def get_validators(self): + validators = [] + + for neuron in self.metagraph.neurons: 
+ if not neuron.validator_permit: + continue + + alpha_stake = float(getattr(neuron.stake, "tao", neuron.stake)) + total_stake = float(self.metagraph.total_stake[neuron.uid]) + + print( + f"Neuron {neuron.uid}: ALPHA STAKE: {alpha_stake}, TOTAL STAKE: {total_stake}" + ) + + if alpha_stake < MIN_ALPHA_STAKE or total_stake < MIN_TOTAL_STAKE: + continue + + validators.append(neuron) + + bt.logging.info( + "Active validators for local scoring: " + f"count={len(validators)} " + f"min_alpha_stake={MIN_ALPHA_STAKE} " + f"min_total_stake={MIN_TOTAL_STAKE}" + ) + + return validators + async def sync_available_uids(self): start_time = time.time() @@ -347,7 +379,12 @@ async def start(self): ) bt.logging.debug(str(self.moving_averaged_scores)) - scoring_store = ScoringStore() + scoring_store = ScoringStore( + object_storage=build_object_storage(), + netuid=self.config.netuid, + validator_uid=self.uid, + validator_hotkey=self.wallet.hotkey.ss58_address, + ) utility_api = UtilityAPIClient( base_url=self.config.neuron.utility_api_url, @@ -363,7 +400,6 @@ async def start(self): query_scheduler = QueryScheduler( neuron=self, - utility_api=utility_api, scoring_store=scoring_store, validators=validators, ) diff --git a/neurons/validators/validator_service.py b/neurons/validators/service/validator_service.py similarity index 96% rename from neurons/validators/validator_service.py rename to neurons/validators/service/validator_service.py index 7706add6..c157ec50 100644 --- a/neurons/validators/validator_service.py +++ b/neurons/validators/service/validator_service.py @@ -11,7 +11,7 @@ from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware -from neurons.validators.validator import Neuron +from neurons.validators.service.validator import Neuron neuron = Neuron() diff --git a/neurons/validators/weights.py b/neurons/validators/service/weights.py similarity index 100% rename from neurons/validators/weights.py rename to 
neurons/validators/service/weights.py index 7fc1aac4..4947a5cb 100644 --- a/neurons/validators/weights.py +++ b/neurons/validators/service/weights.py @@ -22,10 +22,10 @@ import bittensor as bt import torch +import wandb from bittensor.utils.weight_utils import process_weights import desearch -import wandb ENABLE_EMISSION_CONTROL = True EMISSION_CONTROL_HOTKEY = "5CUu1QhvrfyMDBELUPJLt4c7uJFbi7TKqDHkS1Zz41oD4dyP" diff --git a/neurons/validators/storage/__init__.py b/neurons/validators/storage/__init__.py new file mode 100644 index 00000000..fc9498f4 --- /dev/null +++ b/neurons/validators/storage/__init__.py @@ -0,0 +1,17 @@ +from neurons.validators.storage.base import ( + ObjectStorage, + StorageObject, + build_validator_bucket_name, +) +from neurons.validators.storage.factory import build_object_storage +from neurons.validators.storage.huggingface import HuggingFaceObjectStorage +from neurons.validators.storage.local import LocalObjectStorage + +__all__ = [ + "HuggingFaceObjectStorage", + "LocalObjectStorage", + "ObjectStorage", + "StorageObject", + "build_object_storage", + "build_validator_bucket_name", +] diff --git a/neurons/validators/storage/base.py b/neurons/validators/storage/base.py new file mode 100644 index 00000000..11f9a25d --- /dev/null +++ b/neurons/validators/storage/base.py @@ -0,0 +1,59 @@ +import hashlib +from abc import ABC, abstractmethod +from dataclasses import dataclass +import re + + +BUCKET_COMPONENT_PATTERN = re.compile(r"[^a-z0-9-]+") + + +@dataclass(frozen=True) +class StorageObject: + bucket: str + key: str + + +class ObjectStorage(ABC): + @property + @abstractmethod + def provider_name(self) -> str: + raise NotImplementedError + + @abstractmethod + async def put_object( + self, + *, + bucket: str, + key: str, + data: bytes, + content_type: str | None = None, + ) -> None: + raise NotImplementedError + + @abstractmethod + async def get_object(self, *, bucket: str, key: str) -> bytes | None: + raise NotImplementedError + + @abstractmethod 
+ def build_bucket_locator(self, *, bucket: str) -> str: + raise NotImplementedError + + @abstractmethod + def resolve_bucket_locator(self, locator: str) -> str: + raise NotImplementedError + + +def sanitize_bucket_component(value: str, *, max_length: int = 24) -> str: + cleaned = BUCKET_COMPONENT_PATTERN.sub("-", value.lower()).strip("-") + if not cleaned: + return "unknown" + return cleaned[:max_length] + + +def build_validator_bucket_name(*, netuid: int, validator_uid: int, hotkey: str) -> str: + hotkey_prefix = sanitize_bucket_component(hotkey[:8]) + hotkey_hash = hashlib.sha256(hotkey.encode("utf-8")).hexdigest()[:12] + return ( + f"sn{int(netuid)}-validator-{int(validator_uid)}-" + f"{hotkey_prefix}-{hotkey_hash}" + ) diff --git a/neurons/validators/storage/factory.py b/neurons/validators/storage/factory.py new file mode 100644 index 00000000..3e6960dc --- /dev/null +++ b/neurons/validators/storage/factory.py @@ -0,0 +1,42 @@ +import os +from pathlib import Path + +from neurons.validators.storage.base import ObjectStorage +from neurons.validators.storage.huggingface import HuggingFaceObjectStorage +from neurons.validators.storage.local import LocalObjectStorage + + +DEFAULT_STORAGE_PROVIDER = "local" +DEFAULT_LOCAL_STORAGE_PATH = ( + Path(__file__).resolve().parents[3] / "tmp" / "validator-storage" +) + + +def build_object_storage() -> ObjectStorage: + provider = ( + os.environ.get("VALIDATOR_STORAGE_PROVIDER", DEFAULT_STORAGE_PROVIDER) + .strip() + .lower() + ) + + if provider == "local": + root_path = os.environ.get("VALIDATOR_STORAGE_LOCAL_PATH") + return LocalObjectStorage(root_path or DEFAULT_LOCAL_STORAGE_PATH) + + if provider in {"hf", "huggingface"}: + namespace = os.environ.get("VALIDATOR_STORAGE_HF_NAMESPACE") + if not namespace: + raise RuntimeError( + "VALIDATOR_STORAGE_HF_NAMESPACE must be set when " + "VALIDATOR_STORAGE_PROVIDER=huggingface." 
+ ) + + token = ( + os.environ.get("VALIDATOR_STORAGE_HF_TOKEN") + or os.environ.get("HF_TOKEN") + or os.environ.get("HUGGINGFACE_HUB_TOKEN") + ) + + return HuggingFaceObjectStorage(namespace=namespace, token=token) + + raise ValueError(f"Unsupported validator storage provider: {provider}") diff --git a/neurons/validators/storage/huggingface.py b/neurons/validators/storage/huggingface.py new file mode 100644 index 00000000..79d05ca1 --- /dev/null +++ b/neurons/validators/storage/huggingface.py @@ -0,0 +1,103 @@ +import asyncio +from io import BytesIO +from pathlib import Path + +from neurons.validators.storage.base import ObjectStorage + + +MISSING_HF_EXCEPTIONS = { + "EntryNotFoundError", + "LocalEntryNotFoundError", + "RepositoryNotFoundError", + "RevisionNotFoundError", +} + + +class HuggingFaceObjectStorage(ObjectStorage): + def __init__( + self, + *, + namespace: str, + token: str | None = None, + repo_type: str = "dataset", + ): + from huggingface_hub import HfApi + + self.namespace = namespace + self.token = token + self.repo_type = repo_type + self.api = HfApi(token=token) + self._ensured_repos: set[str] = set() + + @property + def provider_name(self) -> str: + return "hf" + + def _ensure_repo(self, repo_id: str) -> None: + if repo_id in self._ensured_repos: + return + + self.api.create_repo( + repo_id=repo_id, + repo_type=self.repo_type, + private=False, + exist_ok=True, + token=self.token, + ) + self._ensured_repos.add(repo_id) + + async def put_object( + self, + *, + bucket: str, + key: str, + data: bytes, + content_type: str | None = None, + ) -> None: + del content_type + + repo_id = bucket + await asyncio.to_thread(self._ensure_repo, repo_id) + + await asyncio.to_thread( + self.api.upload_file, + path_or_fileobj=BytesIO(data), + path_in_repo=key, + repo_id=repo_id, + repo_type=self.repo_type, + commit_message=f"Upload {key}", + token=self.token, + ) + + async def get_object(self, *, bucket: str, key: str) -> bytes | None: + from huggingface_hub import 
hf_hub_download + + repo_id = bucket + + try: + file_path = await asyncio.to_thread( + hf_hub_download, + repo_id=repo_id, + filename=key, + repo_type=self.repo_type, + token=self.token, + force_download=True, + ) + except Exception as exc: + if exc.__class__.__name__ in MISSING_HF_EXCEPTIONS: + return None + raise + + return await asyncio.to_thread(Path(file_path).read_bytes) + + def build_bucket_locator(self, *, bucket: str) -> str: + return f"{self.provider_name}:{self.namespace}/{bucket}" + + def resolve_bucket_locator(self, locator: str) -> str: + prefix = f"{self.provider_name}:" + if not locator.startswith(prefix): + raise ValueError(f"Unsupported Hugging Face bucket locator: {locator}") + repo_id = locator[len(prefix) :] + if "/" not in repo_id: + raise ValueError(f"Invalid Hugging Face repo locator: {locator}") + return repo_id diff --git a/neurons/validators/storage/local.py b/neurons/validators/storage/local.py new file mode 100644 index 00000000..7245eaf4 --- /dev/null +++ b/neurons/validators/storage/local.py @@ -0,0 +1,50 @@ +import asyncio +from pathlib import Path + +from neurons.validators.storage.base import ObjectStorage + + +class LocalObjectStorage(ObjectStorage): + def __init__(self, root_path: str | Path): + self.root_path = Path(root_path) + + @property + def provider_name(self) -> str: + return "local" + + def _resolve_path(self, bucket: str, key: str) -> Path: + root_path = self.root_path.resolve() + file_path = (root_path / bucket / key).resolve() + if root_path != file_path and root_path not in file_path.parents: + raise ValueError(f"Storage path escapes local root: {file_path}") + return file_path + + async def put_object( + self, + *, + bucket: str, + key: str, + data: bytes, + content_type: str | None = None, + ) -> None: + del content_type + + file_path = self._resolve_path(bucket, key) + await asyncio.to_thread(file_path.parent.mkdir, parents=True, exist_ok=True) + await asyncio.to_thread(file_path.write_bytes, data) + + async def 
get_object(self, *, bucket: str, key: str) -> bytes | None: + file_path = self._resolve_path(bucket, key) + if not file_path.exists(): + return None + + return await asyncio.to_thread(file_path.read_bytes) + + def build_bucket_locator(self, *, bucket: str) -> str: + return f"{self.provider_name}:{bucket}" + + def resolve_bucket_locator(self, locator: str) -> str: + prefix = f"{self.provider_name}:" + if not locator.startswith(prefix): + raise ValueError(f"Unsupported local bucket locator: {locator}") + return locator[len(prefix) :] diff --git a/neurons/validators/utility_api_client.py b/neurons/validators/utility_api_client.py index 29c7f874..3dc27970 100644 --- a/neurons/validators/utility_api_client.py +++ b/neurons/validators/utility_api_client.py @@ -6,7 +6,7 @@ class UtilityAPIClient: """ - Client for the Utility API that provides scoring questions. + Client for the Utility API that provides endpoints for validators to save logs Authenticates each request by signing the current timestamp with the validator's hotkey. @@ -50,36 +50,6 @@ async def _raise_for_status_with_context( response.raise_for_status() - async def fetch_next_question(self) -> dict: - """ - Fetch one (question, search_type, uid) from the utility API. - - Returns: - { - "time_range_start": str, - "uid": int, - "search_type": str, # e.g. 
"ai_search", "x_search" - "question": {"query": str} - } - - Raises: - aiohttp.ClientResponseError: on 4xx/5xx responses (including 404 when - all questions for this scoring window are served, - or 429 for rate limiting) - """ - - async with self._session.get( - f"{self.base_url}/dataset/next", - headers=self._auth_headers(), - timeout=aiohttp.ClientTimeout(total=30), - ) as response: - await self._raise_for_status_with_context( - response, - context="fetch_next_question", - skip_logging_statuses={404, 429}, - ) - return await response.json() - async def save_logs(self, logs: list[dict]) -> dict: async with self._session.post( f"{self.base_url}/logs", diff --git a/neurons/validators/x_scraper_validator.py b/neurons/validators/x_scraper_validator.py deleted file mode 100644 index df49b2e6..00000000 --- a/neurons/validators/x_scraper_validator.py +++ /dev/null @@ -1,426 +0,0 @@ -import time -from typing import Any, Dict, List, Optional - -import bittensor as bt -import torch - -import wandb -from desearch.protocol import ( - TwitterIDSearchSynapse, - TwitterSearchSynapse, - TwitterURLsSearchSynapse, -) -from neurons.validators.base_validator import AbstractNeuron -from neurons.validators.miner_response_logger import ( - build_log_entry, - build_reward_payload, - submit_logs_best_effort, -) -from neurons.validators.penalty.timeout_penalty import TimeoutPenaltyModel -from neurons.validators.penalty.twitter_count_penalty import TwitterCountPenaltyModel -from neurons.validators.reward import RewardScoringType -from neurons.validators.reward.twitter_basic_search_content_relevance import ( - TwitterBasicSearchContentRelevanceModel, -) - - -class XScraperValidator: - def __init__(self, neuron: AbstractNeuron): - self.neuron = neuron - self.timeout = 180 - self.max_execution_time = 10 - - # Init device. 
- bt.logging.debug("loading", "device") - bt.logging.debug( - "self.neuron.config.neuron.device = ", str(self.neuron.config.neuron.device) - ) - - self.twitter_content_weight = 1.0 - - self.reward_weights = torch.tensor( - [ - self.twitter_content_weight, - ], - dtype=torch.float32, - ).to(self.neuron.config.neuron.device) - - if self.reward_weights.sum() != 1: - message = ( - f"Reward function weights do not sum to 1 (Current sum: {self.reward_weights.sum()}.)" - f"Check your reward config file at `reward/config.py` or ensure that all your cli reward flags sum to 1." - ) - bt.logging.error(message) - raise Exception(message) - - self.reward_functions = [ - TwitterBasicSearchContentRelevanceModel( - device=self.neuron.config.neuron.device, - scoring_type=RewardScoringType.search_relevance_score_template, - neuron=self.neuron, - ), - ] - - self.penalty_functions = [ - TimeoutPenaltyModel(max_penalty=1, neuron=self.neuron), - TwitterCountPenaltyModel(max_penalty=1, neuron=self.neuron), - ] - - def calc_max_execution_time(self, count): - if not count or count <= 20: - return self.max_execution_time - - return self.max_execution_time + int((count - 20) / 20) * 5 - - async def call_miner( - self, - prompt: str, - params: Dict[str, Any], - uid: Optional[int] = None, - ): - uid, axon = await self.neuron.get_random_miner(uid=uid) - - synapse = TwitterSearchSynapse( - **params, - query=prompt, - max_execution_time=self.calc_max_execution_time(params.get("count")), - ) - - dendrite = next(self.neuron.dendrites) - - response = await dendrite.call( - target_axon=axon, - synapse=synapse.model_copy(), - timeout=synapse.max_execution_time + 5, - deserialize=False, - ) - - return response, uid, axon - - async def compute_rewards_and_penalties( - self, - event, - prompts: List[str], - responses, - uids, - start_time, - scoring_epoch_start=None, - scoring_seeds=None, - ): - try: - if not len(uids): - bt.logging.warning("No UIDs provided for logging event.") - return - - # Attach 
scoring seeds to response objects so reward models can use - # them for deterministic random sampling across all validators. - if scoring_seeds: - for response, seed in zip(responses, scoring_seeds): - response.scoring_seed = seed - - bt.logging.info("Computing rewards and penalties") - - rewards = torch.zeros(len(responses), dtype=torch.float32).to( - self.neuron.config.neuron.device - ) - - all_rewards = [] - all_original_rewards = [] - val_score_responses_list = [] - - bt.logging.trace(f"Received responses: {responses}") - - for weight_i, reward_fn_i in zip( - self.reward_weights, self.reward_functions - ): - start_time = time.time() - ( - reward_i_normalized, - reward_event, - val_score_responses, - original_rewards, - ) = await reward_fn_i.apply(responses, uids) - - all_rewards.append(reward_i_normalized) - all_original_rewards.append(original_rewards) - val_score_responses_list.append(val_score_responses) - - rewards += weight_i * reward_i_normalized.to( - self.neuron.config.neuron.device - ) - if not self.neuron.config.neuron.disable_log_rewards: - event = {**event, **reward_event} - execution_time = time.time() - start_time - bt.logging.trace(str(reward_fn_i.name), reward_i_normalized.tolist()) - bt.logging.info( - f"Applied reward function: {reward_fn_i.name} in {execution_time / 60:.2f} minutes" - ) - - for penalty_fn_i in self.penalty_functions: - ( - raw_penalty_i, - adjusted_penalty_i, - applied_penalty_i, - ) = await penalty_fn_i.apply_penalties(responses, uids) - penalty_start_time = time.time() - rewards *= applied_penalty_i.to(self.neuron.config.neuron.device) - penalty_execution_time = time.time() - penalty_start_time - if not self.neuron.config.neuron.disable_log_rewards: - event[penalty_fn_i.name + "_raw"] = raw_penalty_i.tolist() - event[penalty_fn_i.name + "_adjusted"] = adjusted_penalty_i.tolist() - event[penalty_fn_i.name + "_applied"] = applied_penalty_i.tolist() - bt.logging.trace(str(penalty_fn_i.name), applied_penalty_i.tolist()) - 
bt.logging.info( - f"Applied penalty function: {penalty_fn_i.name} in {penalty_execution_time:.2f} seconds" - ) - - await self.neuron.update_moving_averaged_scores(uids, rewards) - self.log_event(prompts, event, start_time, uids, rewards) - - scores = torch.zeros(len(self.neuron.metagraph.hotkeys)) - uid_scores_dict = {} - wandb_data = { - "modality": "twitter_scrapper", - "prompts": {}, - "responses": {}, - "scores": {}, - "timestamps": {}, - "twitter_reward": {}, - } - bt.logging.info( - f"======================== Reward ===========================" - ) - # Initialize an empty list to accumulate log messages - log_messages = [] - for uid_tensor, reward, response in zip(uids, rewards.tolist(), responses): - uid = uid_tensor.item() - - # Accumulate log messages instead of logging them immediately - log_messages.append(f"UID: {uid}, R: {round(reward, 3)}") - - # Log the accumulated messages in groups of three - for i in range(0, len(log_messages), 3): - bt.logging.info(" | ".join(log_messages[i : i + 3])) - - bt.logging.info( - f"======================== Reward ===========================" - ) - bt.logging.info(f"this is a all reward {all_rewards} ") - - twitter_rewards = all_rewards[0] - zipped_rewards = zip(uids, rewards.tolist(), responses, twitter_rewards) - - for uid_tensor, reward, response, twitter_reward in zipped_rewards: - uid = uid_tensor.item() # Convert tensor to int - uid_scores_dict[uid] = reward - scores[uid] = reward # Now 'uid' is an int, which is a valid key type - wandb_data["scores"][uid] = reward - if hasattr(response, "query"): - wandb_data["prompts"][uid] = response.query - elif hasattr(response, "id"): - wandb_data["prompts"][uid] = response.id - elif hasattr(response, "urls"): - wandb_data["prompts"][uid] = response.urls - wandb_data["twitter_reward"][uid] = twitter_reward - - if self.neuron.config.wandb_on: - wandb.log(wandb_data) - - scoring_logs = [] - response_count = len(responses) - - for index, (uid_tensor, response, reward) in 
enumerate( - zip(uids, responses, rewards.tolist()) - ): - uid = uid_tensor.item() - reward_payload = build_reward_payload( - search_type="x_search", - response_count=response_count, - index=index, - uid=uid, - total_reward=reward, - all_rewards=all_rewards, - all_original_rewards=all_original_rewards, - validator_scores=val_score_responses_list, - event=event, - ) - scoring_logs.append( - build_log_entry( - owner=self.neuron, - search_type="x_search", - query_kind="scoring", - response=response, - miner_uid=uid, - total_reward=reward, - reward_payload=reward_payload, - scoring_epoch_start=scoring_epoch_start, - ) - ) - - submit_logs_best_effort(self.neuron, scoring_logs) - - return rewards, uids, val_score_responses_list, event, all_original_rewards - except Exception as e: - bt.logging.error(f"Error in compute_rewards_and_penalties: {e}") - raise e - - def log_event(self, prompts: List[str], event, start_time, uids, rewards): - event.update( - { - "step_length": time.time() - start_time, - "prompts": prompts, - "uids": uids.tolist(), - "rewards": rewards.tolist(), - } - ) - - bt.logging.debug("Run Task event:", event) - - async def send_scoring_query( - self, - query: dict, - uid: int, - ) -> Optional[object]: - """ - Send a scoring query to a specific miner and return the full synapse. - Called by QueryScheduler; awaits the full response without streaming. 
- """ - prompt = query.get("query", "") - params = {k: v for k, v in query.items() if k != "query"} - - response, _, _ = await self.call_miner(prompt=prompt, params=params, uid=uid) - return response - - async def x_search( - self, - query, - uid: Optional[int] = None, - ): - """Receives question from user and returns the response from the miners.""" - - try: - prompt = query.get("query", "") - params = {key: value for key, value in query.items() if key != "query"} - - response, selected_uid, axon = await self.call_miner( - prompt=prompt, params=params, uid=uid - ) - - if response: - self._save_organic_log( - response=response, - miner_uid=selected_uid, - axon=axon, - search_type="x_search", - ) - yield response - else: - bt.logging.warning("Invalid response for UID: Unknown") - - except Exception as e: - bt.logging.error(f"Error in organic: {e}") - raise e - - async def x_post_by_id( - self, - tweet_id: str, - uid: Optional[int] = None, - ): - """ - Perform a Twitter search using a specific tweet ID. - """ - - try: - uid, axon = await self.neuron.get_random_miner(uid=uid) - - synapse = TwitterIDSearchSynapse( - id=tweet_id, - max_execution_time=self.max_execution_time, - validator_tweets=[], - results=[], - ) - - timeout = self.max_execution_time + 5 - - dendrite = next(self.neuron.dendrites) - - synapse: TwitterIDSearchSynapse = await dendrite.call( - target_axon=axon, - synapse=synapse, - timeout=timeout, - deserialize=False, - ) - - self._save_organic_log( - response=synapse, - miner_uid=uid, - axon=axon, - search_type="x_post_by_id", - ) - - return synapse.results - except Exception as e: - bt.logging.error(f"Error in ID search: {e}") - raise e - - async def x_posts_by_urls( - self, - urls: List[str], - uid: Optional[int] = None, - ): - """ - Perform a Twitter search using multiple tweet URLs. 
- """ - - try: - bt.logging.debug("run_task", "twitter urls search") - - uid, axon = await self.neuron.get_random_miner(uid=uid) - - synapse = TwitterURLsSearchSynapse( - urls=urls, - max_execution_time=self.calc_max_execution_time(len(urls)), - validator_tweets=[], - results=[], - ) - - timeout = synapse.max_execution_time + 5 - - dendrite = next(self.neuron.dendrites) - - synapse: TwitterURLsSearchSynapse = await dendrite.call( - target_axon=axon, - synapse=synapse, - timeout=timeout, - deserialize=False, - ) - - self._save_organic_log( - response=synapse, - miner_uid=uid, - axon=axon, - search_type="x_posts_by_urls", - ) - - return synapse.results - except Exception as e: - bt.logging.error(f"Error in URLs search: {e}") - raise e - - def _save_organic_log( - self, response, miner_uid: int, axon, search_type: str - ) -> None: - submit_logs_best_effort( - self.neuron, - [ - build_log_entry( - owner=self.neuron, - search_type=search_type, - query_kind="organic", - response=response, - miner_uid=miner_uid, - miner_hotkey=getattr(axon, "hotkey", None), - miner_coldkey=getattr(axon, "coldkey", None), - ) - ], - ) diff --git a/requirements.txt b/requirements.txt index facb85c3..79d07971 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,4 +16,6 @@ tiktoken==0.12.0 loguru==0.7.3 substrate-interface==1.7.11 redis[hiredis]==5.2.1 -jsonpickle==4.0.2 \ No newline at end of file +jsonpickle==4.0.2 +datasets==4.5.0 +huggingface_hub==1.4.1 diff --git a/run.sh b/run.sh index 9c3efec3..bfbf1102 100755 --- a/run.sh +++ b/run.sh @@ -14,7 +14,7 @@ check_root() { check_root # Initialize variables -validator_script="neurons/validators/validator_service.py" +validator_script="neurons/validators/service/validator_service.py" autoRunLoc=$(readlink -f "$0") api_proc_name="desearch_api_process" validator_proc_name="desearch_validator_process" @@ -364,4 +364,4 @@ if [ "$?" -eq 1 ]; then done else echo "Missing package 'jq'. Please install it for your system first." 
-fi \ No newline at end of file +fi diff --git a/tests/validators/test_api.py b/tests/validators/test_api.py index 7d84f6c8..e131ccb5 100644 --- a/tests/validators/test_api.py +++ b/tests/validators/test_api.py @@ -40,13 +40,13 @@ async def mock_async_generator(items): for item in items: yield item - @patch("neurons.validators.api.neu") - def test_search(self, mock_neu): + @patch("neurons.validators.api.app.api") + def test_search(self, mock_api): mock_organic = AsyncMock() mock_organic.return_value = self.mock_async_generator(["chunk1", "chunk2"]) - mock_neu.advanced_scraper_validator.organic = mock_organic + mock_api.advanced_scraper_validator.organic = mock_organic payload = { "prompt": "What is blockchain?", diff --git a/tests/validators/test_miner_response_logging.py b/tests/validators/test_miner_response_logging.py index 5e323e4d..05a7eab4 100644 --- a/tests/validators/test_miner_response_logging.py +++ b/tests/validators/test_miner_response_logging.py @@ -4,10 +4,12 @@ import pytest import torch -from neurons.validators.advanced_scraper_validator import AdvancedScraperValidator +from neurons.validators.scrapers.advanced_scraper_validator import ( + AdvancedScraperValidator, +) from neurons.validators.miner_response_logger import build_log_entry, submit_logs -from neurons.validators.web_scraper_validator import WebScraperValidator -from neurons.validators.x_scraper_validator import XScraperValidator +from neurons.validators.scrapers.web_scraper_validator import WebScraperValidator +from neurons.validators.scrapers.x_scraper_validator import XScraperValidator def _fake_owner(): @@ -48,11 +50,11 @@ async def test_x_search_logs_selected_uid(): with ( patch( - "neurons.validators.x_scraper_validator.build_log_entry", + "neurons.validators.scrapers.x_scraper_validator.build_log_entry", return_value={"ok": True}, ) as build_log_entry, patch( - "neurons.validators.x_scraper_validator.submit_logs_best_effort" + 
"neurons.validators.scrapers.x_scraper_validator.submit_logs_best_effort" ) as submit_logs_best_effort, ): items = [item async for item in validator.x_search({"query": "bittensor"})] @@ -78,11 +80,11 @@ async def test_web_organic_logs_selected_uid(): with ( patch( - "neurons.validators.web_scraper_validator.build_log_entry", + "neurons.validators.scrapers.web_scraper_validator.build_log_entry", return_value={"ok": True}, ) as build_log_entry, patch( - "neurons.validators.web_scraper_validator.submit_logs_best_effort" + "neurons.validators.scrapers.web_scraper_validator.submit_logs_best_effort" ) as submit_logs_best_effort, ): items = [item async for item in validator.organic({"query": "tao"})] @@ -117,11 +119,11 @@ async def fake_stream(): with ( patch( - "neurons.validators.advanced_scraper_validator.build_log_entry", + "neurons.validators.scrapers.advanced_scraper_validator.build_log_entry", return_value={"ok": True}, ) as build_log_entry, patch( - "neurons.validators.advanced_scraper_validator.submit_logs_best_effort" + "neurons.validators.scrapers.advanced_scraper_validator.submit_logs_best_effort" ) as submit_logs_best_effort, ): chunks = [ @@ -200,11 +202,11 @@ async def test_x_post_by_id_logs_organic(): with ( patch( - "neurons.validators.x_scraper_validator.build_log_entry", + "neurons.validators.scrapers.x_scraper_validator.build_log_entry", return_value={"ok": True}, ) as build_log_entry, patch( - "neurons.validators.x_scraper_validator.submit_logs_best_effort" + "neurons.validators.scrapers.x_scraper_validator.submit_logs_best_effort" ) as submit_logs_best_effort, ): results = await validator.x_post_by_id("123", uid=42) @@ -235,11 +237,11 @@ async def test_x_posts_by_urls_logs_organic(): with ( patch( - "neurons.validators.x_scraper_validator.build_log_entry", + "neurons.validators.scrapers.x_scraper_validator.build_log_entry", return_value={"ok": True}, ) as build_log_entry, patch( - "neurons.validators.x_scraper_validator.submit_logs_best_effort" + 
"neurons.validators.scrapers.x_scraper_validator.submit_logs_best_effort" ) as submit_logs_best_effort, ): results = await validator.x_posts_by_urls( diff --git a/tests/validators/test_query_scheduler.py b/tests/validators/test_query_scheduler.py index ee265310..5972a423 100644 --- a/tests/validators/test_query_scheduler.py +++ b/tests/validators/test_query_scheduler.py @@ -4,13 +4,18 @@ import pytest -from neurons.validators.query_scheduler import QueryScheduler +from neurons.validators.service.query_scheduler import QueryScheduler +from neurons.validators.service.scoring_dataset import ( + ScoringAssignment, + ScoringQuestion, +) +from neurons.validators.service.seed_commitment import WindowBucketState @pytest.mark.asyncio async def test_score_epoch_extracts_prompts_from_responses_and_passes_epoch_start(): scoring_store = SimpleNamespace( - get_all_for_range=AsyncMock( + get_all_for_assignments=AsyncMock( return_value={ "web_search": [ { @@ -28,14 +33,47 @@ async def test_score_epoch_extracts_prompts_from_responses_and_passes_epoch_star validator = SimpleNamespace(compute_rewards_and_penalties=AsyncMock()) scheduler = QueryScheduler( neuron=SimpleNamespace(), - utility_api=SimpleNamespace(), scoring_store=scoring_store, validators={"web_search": validator}, ) epoch_start = datetime(2026, 3, 14, 10, 0, tzinfo=timezone.utc) + scheduler.window_assignments[epoch_start] = ( + ScoringAssignment( + time_range_start=epoch_start, + uid=11, + search_type="web_search", + validator_uid=1, + validator_hotkey="validator-a", + question=ScoringQuestion(query="what is bittensor"), + scoring_seed=101, + ), + ScoringAssignment( + time_range_start=epoch_start, + uid=12, + search_type="web_search", + validator_uid=1, + validator_hotkey="validator-a", + question=ScoringQuestion(query="what is tao"), + scoring_seed=202, + ), + ) + scheduler.window_bucket_states[epoch_start] = WindowBucketState( + committed_buckets=( + SimpleNamespace( + uid=1, + hotkey="validator-a", + 
bucket_locator="local:validator-a", + ), + ) + ) await scheduler.score_epoch(epoch_start) + scoring_store.get_all_for_assignments.assert_awaited_once() + assert ( + scoring_store.get_all_for_assignments.await_args.kwargs["bucket_locators"] + == {(1, "validator-a"): "local:validator-a"} + ) validator.compute_rewards_and_penalties.assert_awaited_once() kwargs = validator.compute_rewards_and_penalties.await_args.kwargs assert kwargs["scoring_epoch_start"] == epoch_start diff --git a/tests/validators/test_scoring_dataset.py b/tests/validators/test_scoring_dataset.py new file mode 100644 index 00000000..235fb74e --- /dev/null +++ b/tests/validators/test_scoring_dataset.py @@ -0,0 +1,135 @@ +from collections import Counter +from datetime import datetime, timezone + +from neurons.validators.service.scoring_dataset import ( + HuggingFaceQuestionPool, + build_scoring_assignments, + build_validator_ownership, + filter_scoring_assignments, +) +from neurons.validators.service.seed_commitment import CommittedValidator + + +def build_validators(): + return [ + CommittedValidator(uid=1, hotkey="validator-a", seed=111), + CommittedValidator(uid=2, hotkey="validator-b", seed=222), + ] + + +def build_question_pool(): + pool = HuggingFaceQuestionPool( + dataset_name="dummy", + dataset_config=None, + split="train", + question_column="query", + ) + pool._questions = [ + "what is bittensor", + "what is tao", + "what is subnet 22", + "how does desearch work", + ] + return pool + + +def test_build_scoring_assignments_is_deterministic(): + time_range_start = datetime(2026, 4, 6, 10, 0, tzinfo=timezone.utc) + pool = build_question_pool() + + assignments_a = build_scoring_assignments( + time_range_start=time_range_start, + miner_uids=[3, 7, 11], + validators=build_validators(), + question_pool=pool, + combined_seed=123456, + ) + assignments_b = build_scoring_assignments( + time_range_start=time_range_start, + miner_uids=[3, 7, 11], + validators=build_validators(), + question_pool=pool, + 
combined_seed=123456, + ) + + assert assignments_a == assignments_b + assert len(assignments_a) == 9 + assert sorted(item.uid for item in assignments_a).count(3) == 3 + assert sorted(item.uid for item in assignments_a).count(7) == 3 + assert sorted(item.uid for item in assignments_a).count(11) == 3 + assert sorted(item.search_type for item in assignments_a) == [ + "ai_search", + "ai_search", + "ai_search", + "web_search", + "web_search", + "web_search", + "x_search", + "x_search", + "x_search", + ] + + owners_by_miner = { + uid: { + (item.validator_uid, item.validator_hotkey) + for item in assignments_a + if item.uid == uid + } + for uid in [3, 7, 11] + } + assert all(len(owners) == 1 for owners in owners_by_miner.values()) + + local_assignments = filter_scoring_assignments(assignments_a, validator_uid=1) + assert all(item.validator_uid == 1 for item in local_assignments) + + +def test_build_validator_ownership_is_stable_for_same_scoring_window(): + time_range_start = datetime(2026, 4, 6, 10, 0, tzinfo=timezone.utc) + miner_uids = [3, 7, 11, 13, 17, 19] + validators = build_validators() + + ownership_a = build_validator_ownership( + time_range_start=time_range_start, + miner_uids=miner_uids, + validators=validators, + combined_seed=123456, + ) + ownership_b = build_validator_ownership( + time_range_start=time_range_start, + miner_uids=list(reversed(miner_uids)), + validators=list(reversed(validators)), + combined_seed=123456, + ) + + assert ownership_a == ownership_b + assert set(ownership_a) == set(miner_uids) + + owned_counts = Counter(owner.uid for owner in ownership_a.values()) + assert set(owned_counts) == {1, 2} + assert max(owned_counts.values()) - min(owned_counts.values()) <= 1 + + +def test_two_validators_split_miner_uids_without_overlap(): + time_range_start = datetime(2026, 4, 6, 10, 0, tzinfo=timezone.utc) + miner_uids = [3, 7, 11, 13, 17, 19] + assignments = build_scoring_assignments( + time_range_start=time_range_start, + miner_uids=miner_uids, + 
validators=build_validators(), + question_pool=build_question_pool(), + combined_seed=123456, + ) + + validator_1_assignments = filter_scoring_assignments(assignments, validator_uid=1) + validator_2_assignments = filter_scoring_assignments(assignments, validator_uid=2) + + validator_1_miner_uids = {item.uid for item in validator_1_assignments} + validator_2_miner_uids = {item.uid for item in validator_2_assignments} + + assert validator_1_miner_uids + assert validator_2_miner_uids + assert validator_1_miner_uids.isdisjoint(validator_2_miner_uids) + assert validator_1_miner_uids | validator_2_miner_uids == set(miner_uids) + + assert len(validator_1_assignments) == len(validator_1_miner_uids) * 3 + assert len(validator_2_assignments) == len(validator_2_miner_uids) * 3 diff --git a/tests/validators/test_scoring_store.py b/tests/validators/test_scoring_store.py new file mode 100644 index 00000000..00b630e9 --- /dev/null +++ b/tests/validators/test_scoring_store.py @@ -0,0 +1,92 @@ +import json +from datetime import datetime, timezone + +import pytest + +from neurons.validators.service.scoring_dataset import ( + ScoringAssignment, + ScoringQuestion, +) +from neurons.validators.service.scoring_store import ScoringStore +from neurons.validators.storage.local import LocalObjectStorage + + +def build_assignment(epoch_start: datetime) -> ScoringAssignment: + return ScoringAssignment( + time_range_start=epoch_start, + uid=11, + search_type="web_search", + validator_uid=5, + validator_hotkey="validator-a", + question=ScoringQuestion( + query="what is bittensor", + params={"country": "us"}, + ), + scoring_seed=123, + ) + + +@pytest.mark.asyncio +async def test_scoring_store_round_trip_local_storage(tmp_path): + store = ScoringStore( + object_storage=LocalObjectStorage(tmp_path), + netuid=22, + validator_uid=5, + validator_hotkey="validator-a", + ) + epoch_start = datetime(2026, 4, 6, 10, 0, tzinfo=timezone.utc) + assignment = build_assignment(epoch_start) + response = {"query": 
"what is bittensor", "results": ["tao"]} + + await store.save_response(assignment, response) + + all_responses = await store.get_all_for_assignments( + [assignment], + bucket_locators={(5, "validator-a"): store.bucket_locator}, + ) + + assert all_responses == { + "web_search": [ + { + "uid": 11, + "response": response, + "scoring_seed": 123, + } + ] + } + + +@pytest.mark.asyncio +async def test_scoring_store_ignores_mismatched_assignment_payload(tmp_path): + storage = LocalObjectStorage(tmp_path) + store = ScoringStore( + object_storage=storage, + netuid=22, + validator_uid=5, + validator_hotkey="validator-a", + ) + epoch_start = datetime(2026, 4, 6, 10, 0, tzinfo=timezone.utc) + assignment = build_assignment(epoch_start) + location = store._location_for_assignment(assignment) + payload = { + "version": 1, + "assignment": { + **store._assignment_payload(assignment), + "query": "different question", + }, + "response": json.dumps({"query": "different question"}), + } + + await storage.put_object( + bucket=location.bucket, + key=location.key, + data=json.dumps(payload).encode("utf-8"), + ) + + assert ( + await store.get_all_for_assignments( + [assignment], + bucket_locators={(5, "validator-a"): store.bucket_locator}, + ) + == {} + ) diff --git a/tests/validators/test_seed_commitment.py b/tests/validators/test_seed_commitment.py new file mode 100644 index 00000000..2469e6a1 --- /dev/null +++ b/tests/validators/test_seed_commitment.py @@ -0,0 +1,65 @@ +from datetime import datetime, timezone + +from neurons.validators.service.seed_commitment import ( + CommittedValidator, + combine_validator_seeds, + format_bucket_payload, + format_seed_payload, + merge_bucket_commitment, + merge_seed_commitment, + parse_bucket_commitment, + parse_seed_commitment, +) + + +def test_seed_commitment_round_trip_preserves_other_data(): + time_range_start = datetime(2026, 4, 6, 10, 0, tzinfo=timezone.utc) + payload = format_seed_payload(time_range_start, 0xABC123) + merged = 
merge_seed_commitment("https://example.com/data", payload) + + parsed = parse_seed_commitment(merged) + + assert merged.startswith("https://example.com/data") + assert parsed is not None + assert parsed.time_range_start == time_range_start + assert parsed.seed == 0xABC123 + + +def test_combine_validator_seeds_is_stable_for_same_validators(): + time_range_start = datetime(2026, 4, 6, 11, 0, tzinfo=timezone.utc) + validators = [ + CommittedValidator( + uid=5, + hotkey="validator-b", + seed=222, + ), + CommittedValidator( + uid=1, + hotkey="validator-a", + seed=111, + ), + ] + + combined_a = combine_validator_seeds( + validators, + time_range_start=time_range_start, + ) + combined_b = combine_validator_seeds( + list(reversed(validators)), + time_range_start=time_range_start, + ) + + assert combined_a == combined_b + + +def test_bucket_commitment_round_trip_preserves_other_data(): + time_range_start = datetime(2026, 4, 6, 10, 30, tzinfo=timezone.utc) + payload = format_bucket_payload(time_range_start, "hf:namespace/repo-name") + merged = merge_bucket_commitment("https://example.com/data", payload) + + parsed = parse_bucket_commitment(merged) + + assert merged.startswith("https://example.com/data") + assert parsed is not None + assert parsed.time_range_start == time_range_start + assert parsed.bucket_locator == "hf:namespace/repo-name" diff --git a/tests/validators/test_weights.py b/tests/validators/test_weights.py index 19f4c1d4..b80b8e2b 100644 --- a/tests/validators/test_weights.py +++ b/tests/validators/test_weights.py @@ -1,6 +1,6 @@ from unittest.mock import Mock, patch import unittest -from neurons.validators.weights import burn_weights +from neurons.validators.service.weights import burn_weights from desearch.bittensor.metagraph import generateMockNeurons import torch @@ -11,7 +11,7 @@ def setUp(self): self.neuron.metagraph.neurons = generateMockNeurons(4) self.neuron.metagraph.uids = torch.tensor([0, 1, 2, 3]) - 
@patch("neurons.validators.weights.EMISSION_CONTROL_HOTKEY", "hotkey1") + @patch("neurons.validators.service.weights.EMISSION_CONTROL_HOTKEY", "hotkey1") def test_burn_weights(self): weights = burn_weights(self.neuron, torch.tensor([0, 1, 1, 1])) self.assertEqual(weights[0], torch.tensor([0]))