Skip to content

Commit 7263a0e

Browse files
authored
feat(validator-node): wire WASM evaluation into submission processing and gossipsub broadcast (#34)
Integrate WasmChallengeExecutor into the validator node main event loop so that WASM challenge evaluations are actually processed and their results propagated through the P2P network. Key changes in bins/validator-node/src/main.rs: - Add an eval_broadcast_tx/rx channel in the main event loop to forward evaluation results back into the P2P network via gossipsub, ensuring WASM scores participate in consensus/weight aggregation like any other evaluation result. - Handle P2PMessage::Submission in handle_network_event: incoming submissions are deduplicated and stored as EvaluationRecord in the state manager pending_evaluations map for later processing. - Handle P2PMessage::Evaluation in handle_network_event: peer validator evaluations are recorded in state with stake-weighted metadata for consensus aggregation. - Extend process_wasm_evaluations to construct EvaluationMetrics from WASM execution results (normalized score, execution time, memory usage, error info) and broadcast a P2PMessage::Evaluation after each evaluation completes. - Update imports to include EvaluationMessage, EvaluationMetrics, and EvaluationRecord from platform_p2p_consensus. - Thread eval_broadcast_tx through to process_wasm_evaluations so results can be published without blocking the main loop.
1 parent 6ba85e5 commit 7263a0e

File tree

1 file changed

+141
-8
lines changed

1 file changed

+141
-8
lines changed

bins/validator-node/src/main.rs

Lines changed: 141 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ use platform_distributed_storage::{
2525
DistributedStoreExt, LocalStorage, LocalStorageBuilder, StorageKey,
2626
};
2727
use platform_p2p_consensus::{
28-
ChainState, ConsensusEngine, NetworkEvent, P2PConfig, P2PMessage, P2PNetwork, StateManager,
29-
ValidatorRecord, ValidatorSet,
28+
ChainState, ConsensusEngine, EvaluationMessage, EvaluationMetrics, EvaluationRecord,
29+
NetworkEvent, P2PConfig, P2PMessage, P2PNetwork, StateManager, ValidatorRecord, ValidatorSet,
3030
};
3131
use std::path::{Path, PathBuf};
3232
use std::sync::Arc;
@@ -416,6 +416,8 @@ async fn main() -> Result<()> {
416416
let mut checkpoint_interval = tokio::time::interval(Duration::from_secs(300)); // 5 minutes
417417
let mut wasm_eval_interval = tokio::time::interval(Duration::from_secs(5));
418418

419+
let (eval_broadcast_tx, mut eval_broadcast_rx) = tokio::sync::mpsc::channel::<P2PMessage>(256);
420+
419421
loop {
420422
tokio::select! {
421423
// P2P network events
@@ -428,6 +430,16 @@ async fn main() -> Result<()> {
428430
).await;
429431
}
430432

433+
// Outbound evaluation broadcasts
434+
Some(msg) = eval_broadcast_rx.recv() => {
435+
if let Err(e) = event_tx.send(NetworkEvent::Message {
436+
source: network.local_peer_id(),
437+
message: msg,
438+
}).await {
439+
warn!("Failed to forward evaluation broadcast: {}", e);
440+
}
441+
}
442+
431443
// Bittensor block events
432444
Some(event) = async {
433445
match block_rx.as_mut() {
@@ -497,6 +509,7 @@ async fn main() -> Result<()> {
497509
executor,
498510
&state_manager,
499511
&keypair,
512+
&eval_broadcast_tx,
500513
).await;
501514
}
502515
}
@@ -615,7 +628,7 @@ async fn handle_network_event(
615628
event: NetworkEvent,
616629
consensus: &Arc<RwLock<ConsensusEngine>>,
617630
validator_set: &Arc<ValidatorSet>,
618-
_state_manager: &Arc<StateManager>,
631+
state_manager: &Arc<StateManager>,
619632
) {
620633
match event {
621634
NetworkEvent::Message { source, message } => match message {
@@ -691,6 +704,82 @@ async fn handle_network_event(
691704
debug!("Heartbeat update skipped: {}", e);
692705
}
693706
}
707+
P2PMessage::Submission(sub) => {
708+
info!(
709+
submission_id = %sub.submission_id,
710+
challenge_id = %sub.challenge_id,
711+
miner = %sub.miner.to_hex(),
712+
"Received submission from P2P network"
713+
);
714+
let already_exists = state_manager
715+
.read(|state| state.pending_evaluations.contains_key(&sub.submission_id));
716+
if already_exists {
717+
debug!(
718+
submission_id = %sub.submission_id,
719+
"Submission already exists in pending evaluations, skipping"
720+
);
721+
} else {
722+
let record = EvaluationRecord {
723+
submission_id: sub.submission_id.clone(),
724+
challenge_id: sub.challenge_id,
725+
miner: sub.miner,
726+
agent_hash: sub.agent_hash,
727+
evaluations: std::collections::HashMap::new(),
728+
aggregated_score: None,
729+
finalized: false,
730+
created_at: sub.timestamp,
731+
finalized_at: None,
732+
};
733+
state_manager.apply(|state| {
734+
state.add_evaluation(record);
735+
});
736+
info!(
737+
submission_id = %sub.submission_id,
738+
"Submission added to pending evaluations"
739+
);
740+
}
741+
}
742+
P2PMessage::Evaluation(eval) => {
743+
info!(
744+
submission_id = %eval.submission_id,
745+
validator = %eval.validator.to_hex(),
746+
score = eval.score,
747+
"Received evaluation from peer validator"
748+
);
749+
let validator_hotkey = eval.validator.clone();
750+
let stake = validator_set
751+
.get_validator(&validator_hotkey)
752+
.map(|v| v.stake)
753+
.unwrap_or(0);
754+
let validator_eval = platform_p2p_consensus::ValidatorEvaluation {
755+
score: eval.score,
756+
stake,
757+
timestamp: eval.timestamp,
758+
signature: eval.signature.clone(),
759+
};
760+
state_manager.apply(|state| {
761+
if let Err(e) = state.add_validator_evaluation(
762+
&eval.submission_id,
763+
validator_hotkey.clone(),
764+
validator_eval,
765+
&eval.signature,
766+
) {
767+
warn!(
768+
submission_id = %eval.submission_id,
769+
validator = %validator_hotkey.to_hex(),
770+
error = %e,
771+
"Failed to add peer evaluation to state"
772+
);
773+
} else {
774+
debug!(
775+
submission_id = %eval.submission_id,
776+
validator = %validator_hotkey.to_hex(),
777+
score = eval.score,
778+
"Peer evaluation recorded in state"
779+
);
780+
}
781+
});
782+
}
694783
_ => {
695784
debug!("Unhandled P2P message type");
696785
}
@@ -861,6 +950,7 @@ async fn process_wasm_evaluations(
861950
executor: &Arc<WasmChallengeExecutor>,
862951
state_manager: &Arc<StateManager>,
863952
keypair: &Keypair,
953+
eval_broadcast_tx: &tokio::sync::mpsc::Sender<P2PMessage>,
864954
) {
865955
let pending: Vec<(String, ChallengeId, String)> = state_manager.read(|state| {
866956
state
@@ -901,7 +991,7 @@ async fn process_wasm_evaluations(
901991
})
902992
.await;
903993

904-
let score = match result {
994+
let (score, eval_metrics) = match result {
905995
Ok(Ok((score, metrics))) => {
906996
info!(
907997
submission_id = %submission_id,
@@ -913,7 +1003,16 @@ async fn process_wasm_evaluations(
9131003
fuel_consumed = ?metrics.fuel_consumed,
9141004
"WASM evaluation succeeded"
9151005
);
916-
(score as f64) / i64::MAX as f64
1006+
let normalized = (score as f64) / i64::MAX as f64;
1007+
let em = EvaluationMetrics {
1008+
primary_score: normalized,
1009+
secondary_metrics: vec![],
1010+
execution_time_ms: metrics.execution_time_ms as u64,
1011+
memory_usage_bytes: Some(metrics.memory_used_bytes),
1012+
timed_out: false,
1013+
error: None,
1014+
};
1015+
(normalized, em)
9171016
}
9181017
Ok(Err(e)) => {
9191018
warn!(
@@ -922,7 +1021,15 @@ async fn process_wasm_evaluations(
9221021
error = %e,
9231022
"WASM evaluation failed, reporting score 0"
9241023
);
925-
0.0
1024+
let em = EvaluationMetrics {
1025+
primary_score: 0.0,
1026+
secondary_metrics: vec![],
1027+
execution_time_ms: 0,
1028+
memory_usage_bytes: None,
1029+
timed_out: false,
1030+
error: Some(e.to_string()),
1031+
};
1032+
(0.0, em)
9261033
}
9271034
Err(e) => {
9281035
error!(
@@ -931,12 +1038,21 @@ async fn process_wasm_evaluations(
9311038
error = %e,
9321039
"WASM evaluation task panicked, reporting score 0"
9331040
);
934-
0.0
1041+
let em = EvaluationMetrics {
1042+
primary_score: 0.0,
1043+
secondary_metrics: vec![],
1044+
execution_time_ms: 0,
1045+
memory_usage_bytes: None,
1046+
timed_out: false,
1047+
error: Some(e.to_string()),
1048+
};
1049+
(0.0, em)
9351050
}
9361051
};
9371052

9381053
let score_clamped = score.clamp(0.0, 1.0);
9391054
let validator_hotkey = keypair.hotkey();
1055+
let timestamp = chrono::Utc::now().timestamp_millis();
9401056

9411057
#[derive(serde::Serialize)]
9421058
struct EvaluationSigningData<'a> {
@@ -973,7 +1089,7 @@ async fn process_wasm_evaluations(
9731089
let eval = platform_p2p_consensus::ValidatorEvaluation {
9741090
score: score_clamped,
9751091
stake: 0,
976-
timestamp: chrono::Utc::now().timestamp_millis(),
1092+
timestamp,
9771093
signature: signature.clone(),
9781094
};
9791095

@@ -997,5 +1113,22 @@ async fn process_wasm_evaluations(
9971113
);
9981114
}
9991115
});
1116+
1117+
let eval_msg = P2PMessage::Evaluation(EvaluationMessage {
1118+
submission_id: submission_id.clone(),
1119+
challenge_id,
1120+
validator: validator_hotkey,
1121+
score: score_clamped,
1122+
metrics: eval_metrics,
1123+
signature,
1124+
timestamp,
1125+
});
1126+
if let Err(e) = eval_broadcast_tx.send(eval_msg).await {
1127+
warn!(
1128+
submission_id = %submission_id,
1129+
error = %e,
1130+
"Failed to queue evaluation broadcast"
1131+
);
1132+
}
10001133
}
10011134
}

0 commit comments

Comments
 (0)