@@ -8,8 +8,8 @@ use clap::Parser;
88use distributed_db:: { ConsensusStatus , DBSyncEvent , DBSyncManager , DBSyncMessage , DistributedDB } ;
99use parking_lot:: RwLock ;
1010use platform_bittensor:: {
11- signer_from_seed, BittensorClient , BlockSync , BlockSyncConfig , BlockSyncEvent , ExtrinsicWait ,
12- Subtensor ,
11+ signer_from_seed, sync_metagraph , BittensorClient , BlockSync , BlockSyncConfig , BlockSyncEvent ,
12+ ExtrinsicWait , Subtensor ,
1313} ;
1414use platform_challenge_runtime:: { ChallengeRuntime , RuntimeConfig , RuntimeEvent } ;
1515use platform_consensus:: PBFTEngine ;
@@ -727,6 +727,80 @@ async fn main() -> Result<()> {
727727 warn ! ( "Failed to create block sync client: {}" , e) ;
728728 }
729729 }
730+
731+ // Initial metagraph sync to populate validators from Bittensor
732+ // IMPORTANT: Must complete before accepting peer connections to validate their stake
733+ info ! ( "Waiting for metagraph sync to load validators (netuid={})..." , args. netuid) ;
734+
735+ let max_retries = 3 ;
736+ let mut sync_success = false ;
737+
738+ for attempt in 1 ..=max_retries {
739+ info ! ( "Metagraph sync attempt {}/{}..." , attempt, max_retries) ;
740+
741+ match BittensorClient :: new ( & args. subtensor_endpoint ) . await {
742+ Ok ( metagraph_client) => {
743+ match sync_metagraph ( & metagraph_client, args. netuid ) . await {
744+ Ok ( metagraph) => {
745+ let mut added = 0 ;
746+ let mut state = chain_state. write ( ) ;
747+
748+ for neuron in metagraph. neurons . values ( ) {
749+ // Convert AccountId32 hotkey to our Hotkey type
750+ let hotkey_bytes: & [ u8 ; 32 ] = neuron. hotkey . as_ref ( ) ;
751+ let hotkey = Hotkey ( * hotkey_bytes) ;
752+
753+ // Get stake (convert from u128 to u64, saturating)
754+ let stake_rao = neuron. stake . min ( u64:: MAX as u128 ) as u64 ;
755+
756+ // Skip if below minimum stake
757+ if stake_rao < MIN_STAKE_RAO {
758+ continue ;
759+ }
760+
761+ // Add validator if not already present
762+ if state. get_validator ( & hotkey) . is_none ( ) {
763+ let info = ValidatorInfo :: new ( hotkey. clone ( ) , Stake :: new ( stake_rao) ) ;
764+ if state. add_validator ( info) . is_ok ( ) {
765+ added += 1 ;
766+ }
767+ } else if let Some ( v) = state. validators . get_mut ( & hotkey) {
768+ // Update stake for existing validator
769+ v. stake = Stake :: new ( stake_rao) ;
770+ }
771+
772+ // Cache stake in protection for quick validation
773+ protection. validate_stake ( & hotkey. to_hex ( ) , stake_rao) ;
774+ }
775+
776+ info ! ( "Metagraph sync complete: {} neurons, {} validators with sufficient stake (min {} TAO)" ,
777+ metagraph. n, added, MIN_STAKE_TAO ) ;
778+ info ! ( "Validator identity verification ready - will accept messages from {} known validators" ,
779+ state. validators. len( ) ) ;
780+ sync_success = true ;
781+ break ;
782+ }
783+ Err ( e) => {
784+ warn ! ( "Metagraph sync attempt {} failed: {}" , attempt, e) ;
785+ }
786+ }
787+ }
788+ Err ( e) => {
789+ warn ! ( "Failed to connect for metagraph sync (attempt {}): {}" , attempt, e) ;
790+ }
791+ }
792+
793+ if attempt < max_retries {
794+ info ! ( "Retrying metagraph sync in 5 seconds..." ) ;
795+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 5 ) ) . await ;
796+ }
797+ }
798+
799+ if !sync_success {
800+ warn ! ( "CRITICAL: Metagraph sync failed after {} attempts!" , max_retries) ;
801+ warn ! ( "Validator will only recognize itself and sudo. Other validators may be rejected." ) ;
802+ warn ! ( "Periodic sync will retry every 10 minutes." ) ;
803+ }
730804 }
731805 Err ( e) => {
732806 warn ! ( "Failed to connect to Subtensor: {} (continuing without)" , e) ;
@@ -1173,6 +1247,53 @@ async fn main() -> Result<()> {
11731247 msg_value. get ( "message" ) ,
11741248 msg_value. get ( "target" ) . and_then ( |t| t. as_str ( ) ) ,
11751249 ) {
1250+ // Check if this is a DecryptApiKeyRequest - handle locally
1251+ if let Ok ( platform_challenge_sdk:: ChallengeP2PMessage :: DecryptApiKeyRequest ( req) ) =
1252+ serde_json:: from_value :: < platform_challenge_sdk:: ChallengeP2PMessage > ( message. clone ( ) ) {
1253+ // Decrypt the API key locally and send response back to container
1254+ let response = match platform_challenge_sdk:: decrypt_api_key (
1255+ & req. encrypted_key ,
1256+ & keypair_for_outbox. seed ( ) ,
1257+ ) {
1258+ Ok ( api_key) => {
1259+ info ! ( "Decrypted API key for agent {} (request {})" , & req. agent_hash[ ..16 . min( req. agent_hash. len( ) ) ] , & req. request_id[ ..8 ] ) ;
1260+ platform_challenge_sdk:: DecryptApiKeyResponse {
1261+ challenge_id : req. challenge_id . clone ( ) ,
1262+ agent_hash : req. agent_hash . clone ( ) ,
1263+ request_id : req. request_id . clone ( ) ,
1264+ success : true ,
1265+ api_key : Some ( api_key) ,
1266+ error : None ,
1267+ }
1268+ }
1269+ Err ( e) => {
1270+ warn ! ( "Failed to decrypt API key for agent {}: {}" , & req. agent_hash[ ..16 . min( req. agent_hash. len( ) ) ] , e) ;
1271+ platform_challenge_sdk:: DecryptApiKeyResponse {
1272+ challenge_id : req. challenge_id . clone ( ) ,
1273+ agent_hash : req. agent_hash . clone ( ) ,
1274+ request_id : req. request_id . clone ( ) ,
1275+ success : false ,
1276+ api_key : None ,
1277+ error : Some ( e. to_string ( ) ) ,
1278+ }
1279+ }
1280+ } ;
1281+
1282+ // Send response back to container via P2P message endpoint
1283+ let p2p_response = platform_challenge_sdk:: ChallengeP2PMessage :: DecryptApiKeyResponse ( response) ;
1284+ let p2p_endpoint = format ! ( "http://challenge-{}:8080/p2p/message" , container_name) ;
1285+ let req_body = serde_json:: json!( {
1286+ "from_hotkey" : keypair_for_outbox. hotkey( ) . to_hex( ) ,
1287+ "message" : p2p_response
1288+ } ) ;
1289+
1290+ if let Err ( e) = client. post ( & p2p_endpoint) . json ( & req_body) . send ( ) . await {
1291+ debug ! ( "Failed to send decrypt response to container: {}" , e) ;
1292+ }
1293+ continue ; // Don't broadcast this message
1294+ }
1295+ }
1296+
11761297 // Create ChallengeNetworkMessage to broadcast
11771298 let challenge_msg = ChallengeNetworkMessage {
11781299 challenge_id : config. name . clone ( ) ,
@@ -1239,7 +1360,7 @@ async fn main() -> Result<()> {
12391360 let challenge_routes_for_p2p = rpc_handler. as_ref ( ) . map ( |h| h. challenge_routes . clone ( ) ) ;
12401361 // Get distributed_db for P2P message handling
12411362 let db_for_p2p = Some ( distributed_db. clone ( ) ) ;
1242- let _storage = Arc :: new ( storage) ; // Keep reference but don't persist state
1363+ let storage = Arc :: new ( storage) ; // Keep reference for state persistence
12431364 let runtime_for_blocks = challenge_runtime. clone ( ) ;
12441365 let subtensor_clone = subtensor. clone ( ) ;
12451366 let subtensor_signer_clone = subtensor_signer. clone ( ) ;
@@ -1248,6 +1369,9 @@ async fn main() -> Result<()> {
12481369 let mut block_counter = 0u64 ;
12491370 let use_bittensor_blocks = block_sync_rx. is_some ( ) ;
12501371 let netuid = args. netuid ;
1372+ let subtensor_endpoint = args. subtensor_endpoint . clone ( ) ;
1373+ let mut last_metagraph_sync = std:: time:: Instant :: now ( ) ;
1374+ let metagraph_sync_interval = std:: time:: Duration :: from_secs ( 600 ) ; // Sync metagraph every 10 minutes
12511375
12521376 // Fetch mechanism count from Bittensor and submit initial weights
12531377 // This prevents vtrust penalty from not having set weights yet
@@ -1765,15 +1889,78 @@ async fn main() -> Result<()> {
17651889 error!( "Timeout check error: {}" , e) ;
17661890 }
17671891
1768- // Note: State is not persisted locally - it comes from Bittensor chain
1769- // Challenges and weights are managed via SudoAction and Bittensor
1892+ // Persist state periodically (challenge_configs, etc.)
1893+ {
1894+ let state = chain_state. read( ) ;
1895+ if let Err ( e) = storage. save_state( & state) {
1896+ warn!( "Failed to persist state: {}" , e) ;
1897+ }
1898+ }
17701899
17711900 // Cleanup expired protection entries
17721901 protection. cleanup( ) ;
17731902
17741903 // Cleanup stale hotkey connections (no heartbeat for 5 minutes)
17751904 protection. cleanup_stale_hotkeys( std:: time:: Duration :: from_secs( 300 ) ) ;
17761905
1906+ // Periodic metagraph sync to update validators from Bittensor
1907+ if use_bittensor_blocks && last_metagraph_sync. elapsed( ) >= metagraph_sync_interval {
1908+ last_metagraph_sync = std:: time:: Instant :: now( ) ;
1909+ let endpoint = subtensor_endpoint. clone( ) ;
1910+ let chain_state_for_sync = chain_state. clone( ) ;
1911+ let protection_for_sync = protection. clone( ) ;
1912+
1913+ tokio:: spawn( async move {
1914+ match BittensorClient :: new( & endpoint) . await {
1915+ Ok ( client) => {
1916+ match sync_metagraph( & client, netuid) . await {
1917+ Ok ( metagraph) => {
1918+ let mut added = 0 ;
1919+ let mut updated = 0 ;
1920+ let mut state = chain_state_for_sync. write( ) ;
1921+
1922+ for neuron in metagraph. neurons. values( ) {
1923+ let hotkey_bytes: & [ u8 ; 32 ] = neuron. hotkey. as_ref( ) ;
1924+ let hotkey = Hotkey ( * hotkey_bytes) ;
1925+ let stake_rao = neuron. stake. min( u64 :: MAX as u128 ) as u64 ;
1926+
1927+ if stake_rao < MIN_STAKE_RAO {
1928+ continue ;
1929+ }
1930+
1931+ if state. get_validator( & hotkey) . is_none( ) {
1932+ let info = ValidatorInfo :: new( hotkey. clone( ) , Stake :: new( stake_rao) ) ;
1933+ if state. add_validator( info) . is_ok( ) {
1934+ added += 1 ;
1935+ }
1936+ } else if let Some ( v) = state. validators. get_mut( & hotkey) {
1937+ if v. stake. 0 != stake_rao {
1938+ v. stake = Stake :: new( stake_rao) ;
1939+ updated += 1 ;
1940+ }
1941+ }
1942+
1943+ // Update stake cache
1944+ protection_for_sync. validate_stake( & hotkey. to_hex( ) , stake_rao) ;
1945+ }
1946+
1947+ if added > 0 || updated > 0 {
1948+ info!( "Metagraph periodic sync: {} added, {} updated (total: {} validators)" ,
1949+ added, updated, state. validators. len( ) ) ;
1950+ }
1951+ }
1952+ Err ( e) => {
1953+ warn!( "Periodic metagraph sync failed: {}" , e) ;
1954+ }
1955+ }
1956+ }
1957+ Err ( e) => {
1958+ debug!( "Failed to connect for metagraph sync: {}" , e) ;
1959+ }
1960+ }
1961+ } ) ;
1962+ }
1963+
17771964 // Log protection stats periodically
17781965 let prot_stats = protection. stats( ) ;
17791966 let connected_validators = protection. connected_validator_count( ) ;
@@ -2047,11 +2234,12 @@ async fn handle_message(
20472234 }
20482235 }
20492236 NetworkMessage :: Proposal ( proposal) => {
2050- // Check if this is a Sudo AddChallenge proposal and auto-register routes
2237+ // Check if this is a Sudo AddChallenge proposal and handle it
20512238 if let platform_core:: ProposalAction :: Sudo ( platform_core:: SudoAction :: AddChallenge {
20522239 ref config,
20532240 } ) = proposal. action
20542241 {
2242+ // Auto-register routes
20552243 if let Some ( routes_map) = challenge_routes {
20562244 use platform_challenge_sdk:: ChallengeRoute ;
20572245 let default_routes = vec ! [
@@ -2070,6 +2258,16 @@ async fn handle_message(
20702258 config. name
20712259 ) ;
20722260 }
2261+
2262+ // Start container via orchestrator (same as SudoAction handler)
2263+ if let Some ( orchestrator) = challenge_orchestrator {
2264+ info ! ( "Starting challenge container '{}' from P2P Proposal" , config. name) ;
2265+ if let Err ( e) = orchestrator. add_challenge ( config. clone ( ) ) . await {
2266+ error ! ( "Failed to start challenge container from P2P: {}" , e) ;
2267+ } else {
2268+ info ! ( "Challenge container '{}' started from P2P" , config. name) ;
2269+ }
2270+ }
20732271 }
20742272
20752273 if let Err ( e) = consensus. handle_proposal ( proposal, & signer) . await {
0 commit comments