Skip to content

Commit d37cc4a

Browse files
committed
fix: challenge sync bugs - propagate name via P2P, fix module_path on receivers
- Add name field to RpcP2PCommand::BroadcastChallengeUpdate and ChallengeUpdateMessage - Propagate challenge name from sudo CLI through entire P2P pipeline - Fix empty module_path on P2P-receiving validators (use UUID like originator) - Fix startup reload for challenges with empty persisted module_path - Skip invalidate_cache after wasm_upload (module just compiled fresh) - Use JoinHandle to wait for WASM compilation threads instead of sleep(2s)
1 parent 290685b commit d37cc4a

File tree

3 files changed

+41
-16
lines changed

3 files changed

+41
-16
lines changed

bins/validator-node/src/main.rs

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -715,9 +715,9 @@ async fn main() -> Result<()> {
715715
"Reloading {} WASM modules from persisted state",
716716
challenges.len()
717717
);
718+
let mut handles = Vec::new();
718719
for (challenge_id, module_path) in &challenges {
719720
let challenge_id_str = challenge_id.to_string();
720-
// Try to load WASM bytes from distributed storage
721721
let wasm_key =
722722
platform_distributed_storage::StorageKey::new("wasm", &challenge_id_str);
723723
match storage
@@ -730,12 +730,14 @@ async fn main() -> Result<()> {
730730
Ok(Some(stored)) => {
731731
let exec = executor.clone();
732732
let cid = *challenge_id;
733-
let cid_str = challenge_id_str.clone();
734-
let mp = module_path.clone();
733+
let mp = if module_path.is_empty() {
734+
challenge_id_str.clone()
735+
} else {
736+
module_path.clone()
737+
};
735738
let cs = chain_state.clone();
736739
let wasm_bytes = stored.data;
737-
std::thread::spawn(move || {
738-
// Cache the compiled module
740+
let handle = std::thread::spawn(move || {
739741
match exec.execute_get_routes_from_bytes(
740742
&mp,
741743
&wasm_bytes,
@@ -776,6 +778,7 @@ async fn main() -> Result<()> {
776778
}
777779
}
778780
});
781+
handles.push((challenge_id_str, handle));
779782
}
780783
Ok(None) => {
781784
warn!(
@@ -792,8 +795,13 @@ async fn main() -> Result<()> {
792795
}
793796
}
794797
}
795-
// Give threads time to compile
796-
tokio::time::sleep(Duration::from_secs(2)).await;
798+
// Wait for all WASM compilation threads to finish
799+
for (cid, handle) in handles {
800+
if let Err(e) = handle.join() {
801+
warn!(challenge_id = %cid, "WASM compilation thread panicked: {:?}", e);
802+
}
803+
}
804+
info!("All WASM modules reloaded");
797805
}
798806
}
799807

@@ -884,6 +892,7 @@ async fn main() -> Result<()> {
884892
updater: keypair.hotkey(),
885893
update_type: "sudo_action".to_string(),
886894
data: action_bytes,
895+
name: None,
887896
timestamp,
888897
signature: signature.signature.to_vec(),
889898
};
@@ -895,11 +904,12 @@ async fn main() -> Result<()> {
895904
info!("Sudo action broadcast to P2P network");
896905
}
897906
}
898-
platform_rpc::RpcP2PCommand::BroadcastChallengeUpdate { challenge_id, update_type, data } => {
907+
platform_rpc::RpcP2PCommand::BroadcastChallengeUpdate { challenge_id, update_type, data, name } => {
899908
info!(
900909
challenge_id = %challenge_id,
901910
update_type = %update_type,
902911
data_bytes = data.len(),
912+
name = ?name,
903913
"Broadcasting ChallengeUpdate from RPC"
904914
);
905915

@@ -913,6 +923,7 @@ async fn main() -> Result<()> {
913923
updater: keypair.hotkey(),
914924
update_type: update_type.clone(),
915925
data: data.clone(),
926+
name: name.clone(),
916927
timestamp,
917928
signature: signature.signature.to_vec(),
918929
};
@@ -960,14 +971,14 @@ async fn main() -> Result<()> {
960971

961972
// Sync to ChainState for RPC
962973
{
974+
let challenge_name = name.as_deref().unwrap_or(&challenge_id_str).to_string();
963975
let mut cs = chain_state.write();
964976
let wasm_config = platform_core::WasmChallengeConfig {
965977
challenge_id,
966-
name: challenge_id_str.clone(),
978+
name: challenge_name,
967979
description: String::new(),
968980
owner: keypair.hotkey(),
969981
module: platform_core::WasmModuleMetadata {
970-
// Use challenge_id as module_path for cache lookup
971982
module_path: challenge_id_str.clone(),
972983
code_hash: hex::encode(metadata.value_hash),
973984
version: metadata.version.to_string(),
@@ -1717,13 +1728,20 @@ async fn handle_network_event(
17171728
size_bytes = update.data.len(),
17181729
"Stored WASM module in distributed storage"
17191730
);
1731+
// Use name from P2P message, fallback to UUID
1732+
let challenge_name = update
1733+
.name
1734+
.as_deref()
1735+
.unwrap_or(&challenge_id_str)
1736+
.to_string();
1737+
17201738
// Register challenge in state if not exists
17211739
state_manager.apply(|state| {
17221740
if state.get_challenge(&update.challenge_id).is_none() {
17231741
let challenge_config =
17241742
platform_p2p_consensus::ChallengeConfig {
17251743
id: update.challenge_id,
1726-
name: challenge_id_str.clone(),
1744+
name: challenge_name.clone(),
17271745
weight: 100, // Default weight
17281746
is_active: true,
17291747
creator: update.updater.clone(),
@@ -1740,11 +1758,11 @@ async fn handle_network_event(
17401758
let mut cs = chain_state.write();
17411759
let wasm_config = platform_core::WasmChallengeConfig {
17421760
challenge_id: update.challenge_id,
1743-
name: challenge_id_str.clone(),
1761+
name: challenge_name,
17441762
description: String::new(),
17451763
owner: update.updater.clone(),
17461764
module: platform_core::WasmModuleMetadata {
1747-
module_path: String::new(),
1765+
module_path: challenge_id_str.clone(),
17481766
code_hash: hex::encode(metadata.value_hash),
17491767
version: metadata.version.to_string(),
17501768
..Default::default()
@@ -1876,9 +1894,11 @@ async fn handle_network_event(
18761894
}
18771895
}
18781896

1879-
// Invalidate WASM cache
1880-
if let Some(ref executor) = wasm_executor_ref {
1881-
executor.invalidate_cache(&challenge_id_str);
1897+
// Invalidate WASM cache (skip for wasm_upload - module was just compiled fresh)
1898+
if update.update_type != "wasm_upload" {
1899+
if let Some(ref executor) = wasm_executor_ref {
1900+
executor.invalidate_cache(&challenge_id_str);
1901+
}
18821902
}
18831903
} else {
18841904
warn!(

crates/p2p-consensus/src/messages.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,9 @@ pub struct ChallengeUpdateMessage {
587587
pub update_type: String,
588588
/// Serialized update data
589589
pub data: Vec<u8>,
590+
/// Challenge name (for wasm_upload)
591+
#[serde(default)]
592+
pub name: Option<String>,
590593
/// Update timestamp
591594
pub timestamp: i64,
592595
/// Updater's signature

crates/rpc-server/src/server.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pub enum RpcP2PCommand {
5959
challenge_id: ChallengeId,
6060
update_type: String,
6161
data: Vec<u8>,
62+
name: Option<String>,
6263
},
6364
/// Broadcast a SudoAction (config/emission/weight changes)
6465
BroadcastSudoAction { action: platform_core::SudoAction },
@@ -1092,6 +1093,7 @@ async fn sudo_challenge_handler(
10921093
challenge_id,
10931094
update_type: request.action.clone(),
10941095
data,
1096+
name: request.name.clone(),
10951097
};
10961098

10971099
if let Err(e) = tx.send(cmd).await {

0 commit comments

Comments
 (0)