44//! Uses libp2p for gossipsub consensus and Kademlia DHT for storage.
55//! Submits weights to Bittensor at epoch boundaries.
66
7+ mod wasm_executor;
8+
79use anyhow:: Result ;
810use bittensor_rs:: chain:: { signer_from_seed, BittensorSigner , ExtrinsicWait } ;
911use clap:: Parser ;
@@ -17,7 +19,7 @@ use platform_core::{
1719 CheckpointData , CheckpointManager , CompletedEvaluationState , PendingEvaluationState ,
1820 WeightVoteState ,
1921 } ,
20- Hotkey , Keypair , SUDO_KEY_SS58 ,
22+ ChallengeId , Hotkey , Keypair , SUDO_KEY_SS58 ,
2123} ;
2224use platform_distributed_storage:: {
2325 DistributedStoreExt , LocalStorage , LocalStorageBuilder , StorageKey ,
@@ -30,6 +32,7 @@ use std::path::{Path, PathBuf};
3032use std:: sync:: Arc ;
3133use std:: time:: Duration ;
3234use tracing:: { debug, error, info, warn} ;
35+ use wasm_executor:: { WasmChallengeExecutor , WasmExecutorConfig } ;
3336
3437/// Storage key for persisted chain state
3538const STATE_STORAGE_KEY : & str = "chain_state" ;
@@ -155,6 +158,22 @@ struct Args {
155158 /// Disable Bittensor (for testing)
156159 #[ arg( long) ]
157160 no_bittensor : bool ,
161+
162+ /// Directory where WASM challenge modules are stored
163+ #[ arg( long, env = "WASM_MODULE_DIR" , default_value = "./wasm_modules" ) ]
164+ wasm_module_dir : PathBuf ,
165+
166+ /// Maximum memory for WASM execution in bytes (default: 512MB)
167+ #[ arg( long, env = "WASM_MAX_MEMORY" , default_value = "536870912" ) ]
168+ wasm_max_memory : u64 ,
169+
170+ /// Enable fuel metering for WASM execution
171+ #[ arg( long, env = "WASM_ENABLE_FUEL" ) ]
172+ wasm_enable_fuel : bool ,
173+
174+ /// Fuel limit per WASM execution (requires --wasm-enable-fuel)
175+ #[ arg( long, env = "WASM_FUEL_LIMIT" ) ]
176+ wasm_fuel_limit : Option < u64 > ,
158177}
159178
160179// ==================== Main ====================
@@ -338,6 +357,38 @@ async fn main() -> Result<()> {
338357 bittensor_client_for_metagraph = None ;
339358 }
340359
360+ // Initialize WASM challenge executor
361+ let wasm_module_dir = if args. wasm_module_dir . is_relative ( ) {
362+ data_dir. join ( & args. wasm_module_dir )
363+ } else {
364+ args. wasm_module_dir . clone ( )
365+ } ;
366+ std:: fs:: create_dir_all ( & wasm_module_dir) ?;
367+
368+ let wasm_executor = match WasmChallengeExecutor :: new ( WasmExecutorConfig {
369+ module_dir : wasm_module_dir. clone ( ) ,
370+ max_memory_bytes : args. wasm_max_memory ,
371+ enable_fuel : args. wasm_enable_fuel ,
372+ fuel_limit : args. wasm_fuel_limit ,
373+ } ) {
374+ Ok ( executor) => {
375+ info ! (
376+ module_dir = %wasm_module_dir. display( ) ,
377+ max_memory = args. wasm_max_memory,
378+ fuel_enabled = args. wasm_enable_fuel,
379+ "WASM challenge executor ready"
380+ ) ;
381+ Some ( Arc :: new ( executor) )
382+ }
383+ Err ( e) => {
384+ error ! (
385+ "Failed to initialize WASM executor: {}. WASM evaluations disabled." ,
386+ e
387+ ) ;
388+ None
389+ }
390+ } ;
391+
341392 // Initialize shutdown handler for graceful checkpoint persistence
342393 let mut shutdown_handler =
343394 match ShutdownHandler :: new ( & data_dir, state_manager. clone ( ) , args. netuid ) {
@@ -363,6 +414,7 @@ async fn main() -> Result<()> {
363414 let mut stale_check_interval = tokio:: time:: interval ( Duration :: from_secs ( 60 ) ) ;
364415 let mut state_persist_interval = tokio:: time:: interval ( Duration :: from_secs ( 60 ) ) ;
365416 let mut checkpoint_interval = tokio:: time:: interval ( Duration :: from_secs ( 300 ) ) ; // 5 minutes
417+ let mut wasm_eval_interval = tokio:: time:: interval ( Duration :: from_secs ( 5 ) ) ;
366418
367419 loop {
368420 tokio:: select! {
@@ -438,6 +490,17 @@ async fn main() -> Result<()> {
438490 debug!( "Active validators: {}" , validator_set. active_count( ) ) ;
439491 }
440492
493+ // WASM evaluation check
494+ _ = wasm_eval_interval. tick( ) => {
495+ if let Some ( ref executor) = wasm_executor {
496+ process_wasm_evaluations(
497+ executor,
498+ & state_manager,
499+ & keypair,
500+ ) . await ;
501+ }
502+ }
503+
441504 // Periodic checkpoint
442505 _ = checkpoint_interval. tick( ) => {
443506 if let Some ( handler) = shutdown_handler. as_mut( ) {
@@ -793,3 +856,146 @@ async fn handle_block_event(
793856 }
794857 }
795858}
859+
860+ async fn process_wasm_evaluations (
861+ executor : & Arc < WasmChallengeExecutor > ,
862+ state_manager : & Arc < StateManager > ,
863+ keypair : & Keypair ,
864+ ) {
865+ let pending: Vec < ( String , ChallengeId , String ) > = state_manager. read ( |state| {
866+ state
867+ . pending_evaluations
868+ . iter ( )
869+ . filter ( |( _, record) | {
870+ !record. finalized && !record. evaluations . contains_key ( & keypair. hotkey ( ) )
871+ } )
872+ . map ( |( id, record) | ( id. clone ( ) , record. challenge_id , record. agent_hash . clone ( ) ) )
873+ . collect ( )
874+ } ) ;
875+
876+ if pending. is_empty ( ) {
877+ return ;
878+ }
879+
880+ for ( submission_id, challenge_id, _agent_hash) in pending {
881+ let module_filename = format ! ( "{}.wasm" , challenge_id) ;
882+
883+ if !executor. module_exists ( & module_filename) {
884+ debug ! (
885+ submission_id = %submission_id,
886+ challenge_id = %challenge_id,
887+ "No WASM module found for challenge, skipping WASM evaluation"
888+ ) ;
889+ continue ;
890+ }
891+
892+ let network_policy = wasm_runtime_interface:: NetworkPolicy :: default ( ) ;
893+
894+ let input_data = submission_id. as_bytes ( ) . to_vec ( ) ;
895+
896+ let executor = Arc :: clone ( executor) ;
897+ let module_filename_clone = module_filename. clone ( ) ;
898+
899+ let result = tokio:: task:: spawn_blocking ( move || {
900+ executor. execute_evaluation ( & module_filename_clone, & network_policy, & input_data)
901+ } )
902+ . await ;
903+
904+ let score = match result {
905+ Ok ( Ok ( ( score, metrics) ) ) => {
906+ info ! (
907+ submission_id = %submission_id,
908+ challenge_id = %challenge_id,
909+ score,
910+ execution_time_ms = metrics. execution_time_ms,
911+ memory_bytes = metrics. memory_used_bytes,
912+ network_requests = metrics. network_requests_made,
913+ fuel_consumed = ?metrics. fuel_consumed,
914+ "WASM evaluation succeeded"
915+ ) ;
916+ ( score as f64 ) / i64:: MAX as f64
917+ }
918+ Ok ( Err ( e) ) => {
919+ warn ! (
920+ submission_id = %submission_id,
921+ challenge_id = %challenge_id,
922+ error = %e,
923+ "WASM evaluation failed, reporting score 0"
924+ ) ;
925+ 0.0
926+ }
927+ Err ( e) => {
928+ error ! (
929+ submission_id = %submission_id,
930+ challenge_id = %challenge_id,
931+ error = %e,
932+ "WASM evaluation task panicked, reporting score 0"
933+ ) ;
934+ 0.0
935+ }
936+ } ;
937+
938+ let score_clamped = score. clamp ( 0.0 , 1.0 ) ;
939+ let validator_hotkey = keypair. hotkey ( ) ;
940+
941+ #[ derive( serde:: Serialize ) ]
942+ struct EvaluationSigningData < ' a > {
943+ submission_id : & ' a str ,
944+ score : f64 ,
945+ }
946+ let signing_data = EvaluationSigningData {
947+ submission_id : & submission_id,
948+ score : score_clamped,
949+ } ;
950+ let signing_bytes = match bincode:: serialize ( & signing_data) {
951+ Ok ( bytes) => bytes,
952+ Err ( e) => {
953+ error ! (
954+ submission_id = %submission_id,
955+ error = %e,
956+ "Failed to serialize evaluation signing data"
957+ ) ;
958+ continue ;
959+ }
960+ } ;
961+ let signature = match keypair. sign_bytes ( & signing_bytes) {
962+ Ok ( sig) => sig,
963+ Err ( e) => {
964+ error ! (
965+ submission_id = %submission_id,
966+ error = %e,
967+ "Failed to sign evaluation"
968+ ) ;
969+ continue ;
970+ }
971+ } ;
972+
973+ let eval = platform_p2p_consensus:: ValidatorEvaluation {
974+ score : score_clamped,
975+ stake : 0 ,
976+ timestamp : chrono:: Utc :: now ( ) . timestamp_millis ( ) ,
977+ signature : signature. clone ( ) ,
978+ } ;
979+
980+ state_manager. apply ( |state| {
981+ if let Err ( e) = state. add_validator_evaluation (
982+ & submission_id,
983+ validator_hotkey. clone ( ) ,
984+ eval,
985+ & signature,
986+ ) {
987+ warn ! (
988+ submission_id = %submission_id,
989+ error = %e,
990+ "Failed to add WASM evaluation to state"
991+ ) ;
992+ } else {
993+ debug ! (
994+ submission_id = %submission_id,
995+ score = score_clamped,
996+ "WASM evaluation recorded in state"
997+ ) ;
998+ }
999+ } ) ;
1000+ }
1001+ }
0 commit comments