Skip to content

Commit f170f90

Browse files
committed
feat: add challenge sync mechanism
- Add sync() method to Challenge trait for periodic state sync - Add WasmSyncResult type for consensus - Add ChallengeSyncProposal/ChallengeSyncVote P2P messages - Add execute_sync() in wasm_executor - Add sync ticker every 60 blocks in validator main loop - Validators sync all active challenges and broadcast proposals
1 parent ef00542 commit f170f90

File tree

7 files changed

+249
-10
lines changed

7 files changed

+249
-10
lines changed

bins/validator-node/src/main.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,9 @@ async fn main() -> Result<()> {
912912
let mut stale_job_interval = tokio::time::interval(Duration::from_secs(120));
913913
let mut weight_check_interval = tokio::time::interval(Duration::from_secs(30));
914914
let mut last_weight_submission_epoch: u64 = 0; // Local tracking of weight submissions
915+
let mut challenge_sync_interval = tokio::time::interval(Duration::from_secs(60)); // Check every minute
916+
let mut last_sync_block: u64 = 0; // Last block where sync was triggered
917+
let sync_block_interval: u64 = 60; // Sync every 60 blocks
915918

916919
// Clone p2p_cmd_tx for use in the loop
917920
let p2p_broadcast_tx = p2p_cmd_tx.clone();
@@ -1497,6 +1500,78 @@ async fn main() -> Result<()> {
14971500
}
14981501
}
14991502

1503+
// Challenge sync ticker - every 60 blocks, sync all challenges
1504+
_ = challenge_sync_interval.tick() => {
1505+
if !is_bootnode {
1506+
let current_block = state_manager.apply(|state| state.bittensor_block);
1507+
1508+
// Check if we should sync (every 60 blocks)
1509+
if current_block / sync_block_interval > last_sync_block / sync_block_interval {
1510+
last_sync_block = current_block;
1511+
1512+
// Get all active challenges
1513+
let challenges: Vec<_> = {
1514+
let cs = chain_state.read();
1515+
cs.wasm_challenge_configs
1516+
.iter()
1517+
.filter(|(_, cfg)| cfg.is_active)
1518+
.map(|(id, _)| *id)
1519+
.collect()
1520+
};
1521+
1522+
for challenge_id in challenges {
1523+
let challenge_id_str = challenge_id.to_string();
1524+
let module_path = format!("{}.wasm", challenge_id_str);
1525+
1526+
if let Some(ref executor) = wasm_executor {
1527+
match executor.execute_sync(&module_path) {
1528+
Ok(sync_result) => {
1529+
info!(
1530+
challenge_id = %challenge_id,
1531+
block = current_block,
1532+
total_users = sync_result.total_users,
1533+
"Challenge sync completed, broadcasting proposal"
1534+
);
1535+
1536+
// Broadcast sync proposal
1537+
let timestamp = chrono::Utc::now().timestamp_millis();
1538+
let proposal_data = bincode::serialize(&(
1539+
&challenge_id,
1540+
&sync_result.leaderboard_hash,
1541+
current_block,
1542+
timestamp
1543+
)).unwrap_or_default();
1544+
let signature = keypair.sign_bytes(&proposal_data).unwrap_or_default();
1545+
1546+
let proposal_msg = P2PMessage::ChallengeSyncProposal(
1547+
platform_p2p_consensus::ChallengeSyncProposalMessage {
1548+
challenge_id,
1549+
sync_result_hash: sync_result.leaderboard_hash,
1550+
proposer: keypair.hotkey(),
1551+
block_number: current_block,
1552+
timestamp,
1553+
signature,
1554+
}
1555+
);
1556+
1557+
if let Err(e) = p2p_broadcast_tx.send(platform_p2p_consensus::P2PCommand::Broadcast(proposal_msg)).await {
1558+
warn!(error = %e, "Failed to broadcast sync proposal");
1559+
}
1560+
}
1561+
Err(e) => {
1562+
debug!(
1563+
challenge_id = %challenge_id,
1564+
error = %e,
1565+
"Failed to execute sync (WASM may not support sync)"
1566+
);
1567+
}
1568+
}
1569+
}
1570+
}
1571+
}
1572+
}
1573+
}
1574+
15001575
// Ctrl+C
15011576
_ = tokio::signal::ctrl_c() => {
15021577
info!("Received shutdown signal, persisting state...");
@@ -3002,6 +3077,21 @@ async fn handle_network_event(
30023077
}
30033078
}
30043079
}
3080+
P2PMessage::ChallengeSyncProposal(proposal) => {
3081+
info!(
3082+
challenge_id = %proposal.challenge_id,
3083+
block = proposal.block_number,
3084+
proposer = %proposal.proposer.to_ss58(),
3085+
"Received challenge sync proposal (consensus voting not yet implemented)"
3086+
);
3087+
}
3088+
P2PMessage::ChallengeSyncVote(vote) => {
3089+
debug!(
3090+
challenge_id = %vote.challenge_id,
3091+
voter = %vote.voter.to_ss58(),
3092+
"Received challenge sync vote (consensus voting not yet implemented)"
3093+
);
3094+
}
30053095
},
30063096
NetworkEvent::PeerConnected(peer_id) => {
30073097
info!("Peer connected: {}", peer_id);

bins/validator-node/src/wasm_executor.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -985,6 +985,72 @@ impl WasmChallengeExecutor {
985985
Ok(weights)
986986
}
987987

988+
/// Execute sync on a WASM challenge module.
989+
/// Returns WasmSyncResult with leaderboard hash and stats for consensus.
990+
pub fn execute_sync(
991+
&self,
992+
module_path: &str,
993+
) -> Result<platform_challenge_sdk_wasm::WasmSyncResult> {
994+
let start = Instant::now();
995+
996+
let module = self
997+
.load_module(module_path)
998+
.context("Failed to load WASM module")?;
999+
1000+
let instance_config = InstanceConfig {
1001+
challenge_id: module_path.to_string(),
1002+
validator_id: "validator".to_string(),
1003+
storage_host_config: StorageHostConfig {
1004+
allow_direct_writes: true,
1005+
require_consensus: false,
1006+
..self.config.storage_host_config.clone()
1007+
},
1008+
storage_backend: Arc::clone(&self.config.storage_backend),
1009+
consensus_policy: ConsensusPolicy::read_only(),
1010+
..Default::default()
1011+
};
1012+
1013+
let mut instance = self
1014+
.runtime
1015+
.instantiate(&module, instance_config, None)
1016+
.map_err(|e| anyhow::anyhow!("WASM instantiation failed: {}", e))?;
1017+
1018+
let result = instance
1019+
.call_return_i64("sync")
1020+
.map_err(|e| anyhow::anyhow!("WASM sync call failed: {}", e))?;
1021+
1022+
let out_len = (result >> 32) as i32;
1023+
let out_ptr = (result & 0xFFFF_FFFF) as i32;
1024+
1025+
if out_ptr <= 0 || out_len <= 0 {
1026+
return Ok(platform_challenge_sdk_wasm::WasmSyncResult {
1027+
leaderboard_hash: [0u8; 32],
1028+
total_users: 0,
1029+
total_valid_issues: 0,
1030+
total_invalid_issues: 0,
1031+
total_pending_issues: 0,
1032+
sync_timestamp: 0,
1033+
});
1034+
}
1035+
1036+
let result_data = instance
1037+
.read_memory(out_ptr as usize, out_len as usize)
1038+
.map_err(|e| anyhow::anyhow!("failed to read WASM memory for sync output: {}", e))?;
1039+
1040+
let sync_result: platform_challenge_sdk_wasm::WasmSyncResult =
1041+
bincode::deserialize(&result_data)
1042+
.context("Failed to deserialize sync output as WasmSyncResult")?;
1043+
1044+
info!(
1045+
module = module_path,
1046+
total_users = sync_result.total_users,
1047+
execution_time_ms = start.elapsed().as_millis() as u64,
1048+
"WASM sync completed"
1049+
);
1050+
1051+
Ok(sync_result)
1052+
}
1053+
9881054
#[allow(dead_code)]
9891055
pub fn execute_validate_storage_write(
9901056
&self,

crates/challenge-sdk-wasm/src/lib.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ pub use types::{
1313
};
1414
pub use types::{ContainerRunRequest, ContainerRunResponse};
1515
pub use types::{EvaluationInput, EvaluationOutput};
16-
pub use types::{WasmRouteDefinition, WasmRouteRequest, WasmRouteResponse, WeightEntry};
16+
pub use types::{
17+
WasmRouteDefinition, WasmRouteRequest, WasmRouteResponse, WasmSyncResult, WeightEntry,
18+
};
1719

1820
pub trait Challenge {
1921
fn name(&self) -> &'static str;
@@ -62,6 +64,13 @@ pub trait Challenge {
6264
fn validate_storage_write(&self, _key: &[u8], _value: &[u8]) -> bool {
6365
true
6466
}
67+
68+
/// Perform a periodic sync operation and return serialized sync result.
69+
/// Called by validators at regular intervals for consensus-based state sync.
70+
/// The default implementation returns an empty vector.
71+
fn sync(&self) -> alloc::vec::Vec<u8> {
72+
alloc::vec::Vec::new()
73+
}
6574
}
6675

6776
/// Pack a pointer and length into a single i64 value.
@@ -311,5 +320,21 @@ macro_rules! register_challenge {
311320
0
312321
}
313322
}
323+
324+
#[no_mangle]
325+
pub extern "C" fn sync() -> i64 {
326+
let output = <$ty as $crate::Challenge>::sync(&_CHALLENGE);
327+
if output.is_empty() {
328+
return $crate::pack_ptr_len(0, 0);
329+
}
330+
let ptr = $crate::alloc_impl::sdk_alloc(output.len());
331+
if ptr.is_null() {
332+
return $crate::pack_ptr_len(0, 0);
333+
}
334+
unsafe {
335+
core::ptr::copy_nonoverlapping(output.as_ptr(), ptr, output.len());
336+
}
337+
$crate::pack_ptr_len(ptr as i32, output.len() as i32)
338+
}
314339
};
315340
}

crates/challenge-sdk-wasm/src/types.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,3 +199,16 @@ pub struct WeightEntry {
199199
pub uid: u16,
200200
pub weight: u16,
201201
}
202+
203+
/// Result of a sync operation for consensus.
204+
///
205+
/// Returned by [`Challenge::sync`] for validators to reach consensus on state.
206+
#[derive(Clone, Debug, Serialize, Deserialize)]
207+
pub struct WasmSyncResult {
208+
pub leaderboard_hash: [u8; 32],
209+
pub total_users: u32,
210+
pub total_valid_issues: u32,
211+
pub total_invalid_issues: u32,
212+
pub total_pending_issues: u32,
213+
pub sync_timestamp: i64,
214+
}

crates/p2p-consensus/src/lib.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,16 @@ pub mod validator;
4545
pub use config::{P2PConfig, DEFAULT_BOOTSTRAP_NODES, DEFAULT_P2P_PORT};
4646
pub use consensus::{ConsensusDecision, ConsensusEngine, ConsensusError, ConsensusPhase};
4747
pub use messages::{
48-
ChallengeUpdateMessage, CommitMessage, ConsensusProposal, CoreStateRequestMessage,
49-
CoreStateResponseMessage, DataRequestMessage, DataResponseMessage, EvaluationMessage,
50-
EvaluationMetrics, HeartbeatMessage, JobAssignmentMessage, JobClaimMessage,
51-
LeaderboardRequestMessage, LeaderboardResponseMessage, MerkleNode, MerkleProof, NewViewMessage,
52-
P2PMessage, PeerAnnounceMessage, PrePrepare, PrepareMessage, PreparedProof, ProposalContent,
53-
RoundId, SequenceNumber, SignedP2PMessage, StateChangeType, StateMutationProposalMessage,
54-
StateMutationType, StateMutationVoteMessage, StateRequest, StateResponse,
55-
StorageProposalMessage, StorageRootSyncMessage, StorageVoteMessage, SubmissionMessage,
56-
TaskProgressMessage, TaskResultMessage, ViewChangeMessage, ViewNumber, WeightVoteMessage,
48+
ChallengeSyncProposalMessage, ChallengeSyncVoteMessage, ChallengeUpdateMessage, CommitMessage,
49+
ConsensusProposal, CoreStateRequestMessage, CoreStateResponseMessage, DataRequestMessage,
50+
DataResponseMessage, EvaluationMessage, EvaluationMetrics, HeartbeatMessage,
51+
JobAssignmentMessage, JobClaimMessage, LeaderboardRequestMessage, LeaderboardResponseMessage,
52+
MerkleNode, MerkleProof, NewViewMessage, P2PMessage, PeerAnnounceMessage, PrePrepare,
53+
PrepareMessage, PreparedProof, ProposalContent, RoundId, SequenceNumber, SignedP2PMessage,
54+
StateChangeType, StateMutationProposalMessage, StateMutationType, StateMutationVoteMessage,
55+
StateRequest, StateResponse, StorageProposalMessage, StorageRootSyncMessage,
56+
StorageVoteMessage, SubmissionMessage, TaskProgressMessage, TaskResultMessage,
57+
ViewChangeMessage, ViewNumber, WeightVoteMessage,
5758
};
5859
pub use network::{
5960
CombinedBehaviour, CombinedEvent, NetworkBehaviour, NetworkError, NetworkEvent, NetworkRunner,

crates/p2p-consensus/src/messages.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ pub enum P2PMessage {
7474
CoreStateRequest(CoreStateRequestMessage),
7575
/// Core state sync response
7676
CoreStateResponse(CoreStateResponseMessage),
77+
/// Challenge sync proposal for periodic consensus
78+
ChallengeSyncProposal(ChallengeSyncProposalMessage),
79+
/// Vote on a challenge sync proposal
80+
ChallengeSyncVote(ChallengeSyncVoteMessage),
7781
}
7882

7983
impl P2PMessage {
@@ -134,6 +138,8 @@ impl P2PMessage {
134138
P2PMessage::StateMutationVote(_) => "StateMutationVote",
135139
P2PMessage::CoreStateRequest(_) => "CoreStateRequest",
136140
P2PMessage::CoreStateResponse(_) => "CoreStateResponse",
141+
P2PMessage::ChallengeSyncProposal(_) => "ChallengeSyncProposal",
142+
P2PMessage::ChallengeSyncVote(_) => "ChallengeSyncVote",
137143
}
138144
}
139145
}
@@ -983,3 +989,39 @@ pub struct StorageRootSyncMessage {
983989
/// Signature
984990
pub signature: Vec<u8>,
985991
}
992+
993+
/// Challenge sync proposal for periodic consensus-based state sync
994+
#[derive(Clone, Debug, Serialize, Deserialize)]
995+
pub struct ChallengeSyncProposalMessage {
996+
/// Challenge being synced
997+
pub challenge_id: ChallengeId,
998+
/// Hash of the sync result for consensus comparison
999+
pub sync_result_hash: [u8; 32],
1000+
/// Proposer validator
1001+
pub proposer: Hotkey,
1002+
/// Block number at which sync was triggered
1003+
pub block_number: u64,
1004+
/// Timestamp
1005+
pub timestamp: i64,
1006+
/// Signature
1007+
pub signature: Vec<u8>,
1008+
}
1009+
1010+
/// Vote on a challenge sync proposal
1011+
#[derive(Clone, Debug, Serialize, Deserialize)]
1012+
pub struct ChallengeSyncVoteMessage {
1013+
/// Challenge being synced
1014+
pub challenge_id: ChallengeId,
1015+
/// Hash being voted on
1016+
pub sync_result_hash: [u8; 32],
1017+
/// Block number
1018+
pub block_number: u64,
1019+
/// Voter validator
1020+
pub voter: Hotkey,
1021+
/// Whether the voter agrees with the hash
1022+
pub approve: bool,
1023+
/// Timestamp
1024+
pub timestamp: i64,
1025+
/// Signature
1026+
pub signature: Vec<u8>,
1027+
}

crates/p2p-consensus/src/network.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,6 +1158,8 @@ fn expected_signer(message: &P2PMessage) -> Option<&Hotkey> {
11581158
P2PMessage::StateMutationVote(msg) => Some(&msg.voter),
11591159
P2PMessage::CoreStateRequest(msg) => Some(&msg.requester),
11601160
P2PMessage::CoreStateResponse(msg) => Some(&msg.responder),
1161+
P2PMessage::ChallengeSyncProposal(msg) => Some(&msg.proposer),
1162+
P2PMessage::ChallengeSyncVote(msg) => Some(&msg.voter),
11611163
}
11621164
}
11631165

0 commit comments

Comments
 (0)