Skip to content

Commit aec5fe7

Browse files
committed
fix: prevent weight divergence between bootstrap and non-bootstrap validators
Five fixes for P2P state consistency:
1. Remove the optimistic local write from propose_write (the proposer must not read uncommitted data).
2. Content-based state hashing (hash actual state content, not just entry counts).
3. Fix merge_from to update existing entries, not only insert missing ones.
4. Force a metagraph refresh before weight submission to avoid stale hotkey-to-UID mappings.
5. Post-bootstrap reconciliation via the improved hash and merge.
1 parent e47198f commit aec5fe7

File tree

4 files changed

+103
-35
lines changed

4 files changed

+103
-35
lines changed

bins/validator-node/src/challenge_storage.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -105,19 +105,11 @@ impl StorageBackend for ChallengeStorageBackend {
105105
signature,
106106
});
107107

108-
// Write locally first so WASM can read-your-own-writes during sync
109-
let storage_key = build_challenge_storage_key(challenge_id, key);
110-
if let Err(e) = tokio::task::block_in_place(|| {
111-
tokio::runtime::Handle::current().block_on(self.storage.put(
112-
storage_key,
113-
value.to_vec(),
114-
DPutOptions::default(),
115-
))
116-
}) {
117-
tracing::warn!(error = %e, "Failed to write locally before P2P broadcast");
118-
}
108+
// DO NOT write locally before consensus - this causes state divergence.
109+
// The proposer's get_weights would read uncommitted data that other validators
110+
// don't have yet. All nodes (including proposer) write only after P2P consensus.
119111

120-
// Broadcast via P2P so other validators also apply the write
112+
// Broadcast via P2P so all validators apply the write after consensus
121113
tracing::debug!(
122114
proposal_id = %hex::encode(&proposal_id[..8]),
123115
challenge_id = %challenge_id,

bins/validator-node/src/main.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,6 +1025,23 @@ async fn main() -> Result<()> {
10251025
None => std::future::pending().await,
10261026
}
10271027
} => {
1028+
// Force metagraph refresh before weight submission to avoid stale hotkey->UID mappings
1029+
if matches!(event, BlockSyncEvent::CommitWindowOpen { .. }) {
1030+
if let Some(bittensor_client) = bittensor_client_for_metagraph.as_ref() {
1031+
match sync_metagraph(bittensor_client, netuid).await {
1032+
Ok(mg) => {
1033+
info!("Pre-commit metagraph refresh: {} neurons", mg.n);
1034+
update_validator_set_from_metagraph(&mg, &validator_set, &chain_state, &valid_voters, &state_root_consensus, &state_manager);
1035+
if let Some(sc) = subtensor_client.as_mut() {
1036+
sc.set_metagraph(mg);
1037+
}
1038+
}
1039+
Err(e) => {
1040+
warn!("Pre-commit metagraph refresh failed: {}. Using cached metagraph.", e);
1041+
}
1042+
}
1043+
}
1044+
}
10281045
handle_block_event(
10291046
event,
10301047
&subtensor,

crates/core/src/state.rs

Lines changed: 62 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -298,28 +298,62 @@ impl ChainState {
298298
}
299299

300300
/// Update the state hash
301+
///
302+
/// Hashes actual content (not just counts) so divergent state is detectable.
301303
pub fn update_hash(&mut self) {
304+
#[derive(Serialize)]
305+
struct ChallengeHashEntry {
306+
id: String,
307+
emission_weight: u64, // f64 as bits for deterministic hashing
308+
mechanism_id: u8,
309+
is_active: bool,
310+
}
311+
302312
#[derive(Serialize)]
303313
struct HashInput<'a> {
304314
block_height: BlockHeight,
305315
sudo_key: &'a Hotkey,
306-
validator_count: usize,
307-
challenge_count: usize,
308-
wasm_challenge_count: usize,
309-
pending_jobs: usize,
310316
mutation_sequence: u64,
311-
route_count: usize,
317+
validators: Vec<String>,
318+
challenge_ids: Vec<String>,
319+
wasm_challenges: Vec<ChallengeHashEntry>,
320+
mechanism_configs: Vec<(u8, String)>,
312321
}
313322

323+
let mut validators: Vec<String> = self.validators.keys().map(|h| h.to_ss58()).collect();
324+
validators.sort();
325+
326+
let mut challenge_ids: Vec<String> =
327+
self.challenges.keys().map(|c| c.to_string()).collect();
328+
challenge_ids.sort();
329+
330+
let mut wasm_challenges: Vec<ChallengeHashEntry> = self
331+
.wasm_challenge_configs
332+
.iter()
333+
.map(|(id, cfg)| ChallengeHashEntry {
334+
id: id.to_string(),
335+
emission_weight: cfg.config.emission_weight.to_bits(),
336+
mechanism_id: cfg.config.mechanism_id,
337+
is_active: cfg.is_active,
338+
})
339+
.collect();
340+
wasm_challenges.sort_by(|a, b| a.id.cmp(&b.id));
341+
342+
let mut mechanism_configs: Vec<(u8, String)> = self
343+
.mechanism_configs
344+
.iter()
345+
.map(|(k, v)| (*k, format!("{:?}", v)))
346+
.collect();
347+
mechanism_configs.sort_by_key(|(k, _)| *k);
348+
314349
let input = HashInput {
315350
block_height: self.block_height,
316351
sudo_key: &self.sudo_key,
317-
validator_count: self.validators.len(),
318-
challenge_count: self.challenges.len(),
319-
wasm_challenge_count: self.wasm_challenge_configs.len(),
320-
pending_jobs: self.pending_jobs.len(),
321352
mutation_sequence: self.mutation_sequence,
322-
route_count: self.challenge_routes.len(),
353+
validators,
354+
challenge_ids,
355+
wasm_challenges,
356+
mechanism_configs,
323357
};
324358

325359
self.state_hash = hash_data(&input).unwrap_or([0u8; 32]);
@@ -640,18 +674,30 @@ impl ChainState {
640674
return false;
641675
}
642676

677+
// Merge WASM challenge configs: update existing entries too (not just insert)
678+
// so that sudo actions (SetEmission, SetMechanism) propagate correctly.
643679
for (id, config) in &peer_state.wasm_challenge_configs {
644-
self.wasm_challenge_configs
645-
.entry(*id)
646-
.or_insert_with(|| config.clone());
680+
self.wasm_challenge_configs.insert(*id, config.clone());
647681
}
648682

649683
for (id, routes) in &peer_state.challenge_routes {
650-
self.challenge_routes
651-
.entry(*id)
652-
.or_insert_with(|| routes.clone());
684+
self.challenge_routes.insert(*id, routes.clone());
685+
}
686+
687+
// Merge mechanism configs
688+
for (mid, mconfig) in &peer_state.mechanism_configs {
689+
self.mechanism_configs.insert(*mid, mconfig.clone());
690+
}
691+
692+
// Merge challenge weight allocations
693+
for (id, alloc) in &peer_state.challenge_weights {
694+
self.challenge_weights.insert(*id, alloc.clone());
653695
}
654696

697+
// Sync paused state from higher-sequence peer
698+
self.paused = peer_state.paused;
699+
self.pause_reason = peer_state.pause_reason.clone();
700+
655701
self.mutation_sequence = peer_state.mutation_sequence;
656702
self.update_hash();
657703
true

crates/p2p-consensus/src/state.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -465,24 +465,37 @@ impl ChainState {
465465
pub fn update_hash(&mut self) {
466466
self.last_updated = chrono::Utc::now().timestamp_millis();
467467

468-
// Create a deterministic hash input
468+
// Hash actual content (not just counts) so divergent state is detectable
469469
#[derive(Serialize)]
470470
struct HashInput {
471471
sequence: SequenceNumber,
472472
epoch: u64,
473-
validator_count: usize,
474-
challenge_count: usize,
475-
pending_count: usize,
476473
netuid: u16,
474+
validators: Vec<(String, u64)>,
475+
challenges: Vec<String>,
476+
pending_ids: Vec<String>,
477477
}
478478

479+
let mut validators: Vec<(String, u64)> = self
480+
.validators
481+
.iter()
482+
.map(|(h, s)| (h.to_ss58(), *s))
483+
.collect();
484+
validators.sort_by(|a, b| a.0.cmp(&b.0));
485+
486+
let mut challenges: Vec<String> = self.challenges.keys().map(|c| c.to_string()).collect();
487+
challenges.sort();
488+
489+
let mut pending_ids: Vec<String> = self.pending_evaluations.keys().cloned().collect();
490+
pending_ids.sort();
491+
479492
let input = HashInput {
480493
sequence: self.sequence,
481494
epoch: self.epoch,
482-
validator_count: self.validators.len(),
483-
challenge_count: self.challenges.len(),
484-
pending_count: self.pending_evaluations.len(),
485495
netuid: self.netuid,
496+
validators,
497+
challenges,
498+
pending_ids,
486499
};
487500

488501
self.state_hash = hash_data(&input).unwrap_or([0u8; 32]);

0 commit comments

Comments (0)