Skip to content

Commit 6a3d17e

Browse files
committed
feat: pre-compute weights for RPC at 70s and submit on-chain at 90s after boot
- At 70s: compute WASM weights and expose via subnet_getWeights RPC so other validators without P2P can fetch weights immediately - At 90s: submit weights on-chain via CRv4 commit - Both wait for blockchain sync, active challenges, and WASM readiness - Prevents waiting up to ~72min for next epoch boundary after restart
1 parent 7774a3f commit 6a3d17e

File tree

2 files changed

+143
-2
lines changed

2 files changed

+143
-2
lines changed

bins/validator-node/src/main.rs

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -970,6 +970,12 @@ async fn main() -> Result<()> {
970970
let mut stale_job_interval = tokio::time::interval(Duration::from_secs(120));
971971
let mut weight_check_interval = tokio::time::interval(Duration::from_secs(30));
972972
let mut last_weight_submission_epoch: u64 = 0;
973+
let startup_rpc_precompute_delay = tokio::time::sleep(Duration::from_secs(70));
974+
tokio::pin!(startup_rpc_precompute_delay);
975+
let mut startup_rpc_precomputed = false;
976+
let startup_weight_delay = tokio::time::sleep(Duration::from_secs(90));
977+
tokio::pin!(startup_weight_delay);
978+
let mut startup_weights_submitted = false;
973979
let mut challenge_sync_interval = tokio::time::interval(Duration::from_secs(30)); // Check every 30s
974980
let mut last_sync_block: u64 = 0;
975981
let sync_block_interval: u64 = 1; // Call WASM sync every block, WASM decides frequency internally
@@ -1536,6 +1542,105 @@ async fn main() -> Result<()> {
15361542
// No-op: weights are submitted in CommitWindowOpen via CRv4
15371543
}
15381544

1545+
// Pre-compute weights for RPC 70s after boot so other validators
1546+
// without P2P can fetch via subnet_getWeights before on-chain submission.
1547+
_ = &mut startup_rpc_precompute_delay, if !startup_rpc_precomputed => {
1548+
startup_rpc_precomputed = true;
1549+
let current_block = state_manager.apply(|state| state.bittensor_block);
1550+
if current_block == 0 {
1551+
warn!("RPC pre-compute skipped: blockchain not yet synced");
1552+
} else if let Some(ref executor) = wasm_executor {
1553+
let challenges: Vec<(String, u8, f64)> = {
1554+
let cs = chain_state.read();
1555+
cs.wasm_challenge_configs.iter()
1556+
.filter(|(_, cfg)| cfg.is_active)
1557+
.map(|(id, cfg)| (id.to_string(), cfg.config.mechanism_id, cfg.config.emission_weight))
1558+
.collect()
1559+
};
1560+
if challenges.is_empty() {
1561+
warn!("RPC pre-compute skipped: no active challenges loaded");
1562+
} else {
1563+
let tempo = 360u64;
1564+
let netuid_plus_one = (netuid as u64).saturating_add(1);
1565+
let epoch = current_block.saturating_add(netuid_plus_one) / (tempo + 1);
1566+
let mut precomputed: Vec<(u8, Vec<u16>, Vec<u16>)> = Vec::new();
1567+
for (cid, mid, ew) in &challenges {
1568+
let ew = ew.clamp(0.0, 1.0);
1569+
if ew < 0.001 { continue; }
1570+
if let Ok(assignments) = executor.execute_get_weights_with_block(cid, current_block, epoch) {
1571+
if assignments.is_empty() { continue; }
1572+
let total: f64 = assignments.iter().map(|a| a.weight).sum();
1573+
if total <= 0.0 { continue; }
1574+
let mut uids = Vec::new();
1575+
let mut vals = Vec::new();
1576+
let mut assigned = 0.0f64;
1577+
for a in &assignments {
1578+
if let Some(uid) = subtensor_client.as_ref().and_then(|c| c.get_uid_for_hotkey(&a.hotkey)) {
1579+
let scaled = (a.weight / total) * ew;
1580+
let v = (scaled * 65535.0).round() as u16;
1581+
if v > 0 { uids.push(uid); vals.push(v); assigned += scaled; }
1582+
}
1583+
}
1584+
let burn = 1.0 - assigned;
1585+
if burn > 0.001 {
1586+
let bv = (burn * 65535.0).round() as u16;
1587+
if let Some(pos) = uids.iter().position(|&u| u == 0) {
1588+
vals[pos] = vals[pos].saturating_add(bv);
1589+
} else { uids.push(0); vals.push(bv); }
1590+
}
1591+
if !uids.is_empty() {
1592+
let mx = *vals.iter().max().unwrap() as f64;
1593+
if mx > 0.0 && mx < 65535.0 {
1594+
vals = vals.iter().map(|v| ((*v as f64 / mx) * 65535.0).round() as u16).collect();
1595+
}
1596+
precomputed.push((*mid, uids, vals));
1597+
}
1598+
}
1599+
}
1600+
if !precomputed.is_empty() {
1601+
info!("RPC pre-compute: {} mechanism entries available via subnet_getWeights (70s after boot)", precomputed.len());
1602+
let mut cs = chain_state.write();
1603+
cs.last_computed_weights = precomputed;
1604+
} else {
1605+
warn!("RPC pre-compute: no weights computed (WASM returned empty)");
1606+
}
1607+
}
1608+
}
1609+
}
1610+
1611+
// Submit weights on-chain 90s after boot (after RPC pre-compute at 70s).
1612+
_ = &mut startup_weight_delay, if !startup_weights_submitted => {
1613+
startup_weights_submitted = true;
1614+
let current_block = state_manager.apply(|state| state.bittensor_block);
1615+
if current_block == 0 {
1616+
warn!("Startup weight submission skipped: blockchain not yet synced");
1617+
} else if subtensor.is_none() || subtensor_signer.is_none() {
1618+
warn!("Startup weight submission skipped: subtensor not connected");
1619+
} else if wasm_executor.is_none() {
1620+
warn!("Startup weight submission skipped: WASM executor not ready");
1621+
} else {
1622+
let has_challenges = {
1623+
let cs = chain_state.read();
1624+
cs.wasm_challenge_configs.iter().any(|(_, cfg)| cfg.is_active)
1625+
};
1626+
if !has_challenges {
1627+
warn!("Startup weight submission skipped: no active challenges loaded");
1628+
} else {
1629+
let tempo = 360u64;
1630+
let netuid_plus_one = (netuid as u64).saturating_add(1);
1631+
let epoch = current_block.saturating_add(netuid_plus_one) / (tempo + 1);
1632+
info!("Startup weight submission: epoch {} block {} (90s after boot)", epoch, current_block);
1633+
handle_block_event(
1634+
BlockSyncEvent::CommitWindowOpen { epoch, block: current_block },
1635+
&subtensor, &subtensor_signer, &subtensor_client,
1636+
&state_manager, netuid, version_key, &wasm_executor,
1637+
&keypair, &chain_state, &storage,
1638+
&mut last_weight_submission_epoch,
1639+
).await;
1640+
}
1641+
}
1642+
}
1643+
15391644
// Periodic checkpoint
15401645
_ = checkpoint_interval.tick() => {
15411646
if let Some(handler) = shutdown_handler.as_mut() {

crates/bittensor-integration/src/block_sync.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ impl BlockSync {
175175
// Process events in background
176176
tokio::spawn(async move {
177177
let mut was_disconnected = false;
178+
let mut first_block_seen = false;
178179

179180
loop {
180181
if !*running.read().await {
@@ -190,6 +191,7 @@ impl BlockSync {
190191
&current_epoch,
191192
&current_phase,
192193
&mut was_disconnected,
194+
&mut first_block_seen,
193195
)
194196
.await;
195197

@@ -218,6 +220,7 @@ impl BlockSync {
218220
current_epoch: &Arc<RwLock<u64>>,
219221
current_phase: &Arc<RwLock<EpochPhase>>,
220222
was_disconnected: &mut bool,
223+
first_block_seen: &mut bool,
221224
) -> bool {
222225
match event {
223226
BlockEvent::NewBlock {
@@ -228,6 +231,24 @@ impl BlockSync {
228231
*current_epoch.write().await = epoch_info.epoch_number;
229232
*current_phase.write().await = epoch_info.phase;
230233

234+
// On first block after startup, trigger immediate weight submission
235+
// so the validator doesn't have to wait for the next epoch boundary.
236+
if !*first_block_seen {
237+
*first_block_seen = true;
238+
if epoch_info.commit_reveal_enabled {
239+
info!(
240+
"First block after startup: triggering immediate weight submission (epoch={}, block={})",
241+
epoch_info.epoch_number, block_number
242+
);
243+
let _ = event_tx
244+
.send(BlockSyncEvent::CommitWindowOpen {
245+
epoch: epoch_info.epoch_number,
246+
block: block_number,
247+
})
248+
.await;
249+
}
250+
}
251+
231252
if let Err(e) = event_tx
232253
.send(BlockSyncEvent::NewBlock {
233254
block_number,
@@ -416,6 +437,7 @@ mod tests {
416437
let current_epoch = Arc::new(RwLock::new(0));
417438
let current_phase = Arc::new(RwLock::new(EpochPhase::Evaluation));
418439
let mut was_disconnected = true;
440+
let mut first_block_seen = false;
419441

420442
let epoch_info = sample_epoch_info(123, 9, EpochPhase::CommitWindow);
421443

@@ -429,6 +451,7 @@ mod tests {
429451
&current_epoch,
430452
&current_phase,
431453
&mut was_disconnected,
454+
&mut first_block_seen,
432455
)
433456
.await;
434457

@@ -440,10 +463,13 @@ mod tests {
440463
EpochPhase::CommitWindow
441464
));
442465

466+
// First event: CommitWindowOpen (triggered on first block after startup)
443467
let first = rx.recv().await.unwrap();
444-
assert!(matches!(first, BlockSyncEvent::NewBlock { .. }));
468+
assert!(matches!(first, BlockSyncEvent::CommitWindowOpen { .. }));
445469
let second = rx.recv().await.unwrap();
446-
assert!(matches!(second, BlockSyncEvent::Reconnected));
470+
assert!(matches!(second, BlockSyncEvent::NewBlock { .. }));
471+
let third = rx.recv().await.unwrap();
472+
assert!(matches!(third, BlockSyncEvent::Reconnected));
447473
assert!(!was_disconnected);
448474
}
449475

@@ -454,6 +480,7 @@ mod tests {
454480
let current_epoch = Arc::new(RwLock::new(0));
455481
let current_phase = Arc::new(RwLock::new(EpochPhase::Evaluation));
456482
let mut was_disconnected = false;
483+
let mut first_block_seen = false;
457484

458485
let should_break = BlockSync::handle_block_event(
459486
BlockEvent::PhaseChange {
@@ -467,6 +494,7 @@ mod tests {
467494
&current_epoch,
468495
&current_phase,
469496
&mut was_disconnected,
497+
&mut first_block_seen,
470498
)
471499
.await;
472500

@@ -493,6 +521,7 @@ mod tests {
493521
let current_epoch = Arc::new(RwLock::new(0));
494522
let current_phase = Arc::new(RwLock::new(EpochPhase::Evaluation));
495523
let mut was_disconnected = false;
524+
let mut first_block_seen = false;
496525

497526
let should_break = BlockSync::handle_block_event(
498527
BlockEvent::Stopped,
@@ -501,6 +530,7 @@ mod tests {
501530
&current_epoch,
502531
&current_phase,
503532
&mut was_disconnected,
533+
&mut first_block_seen,
504534
)
505535
.await;
506536

@@ -515,6 +545,7 @@ mod tests {
515545
let current_epoch = Arc::new(RwLock::new(0));
516546
let current_phase = Arc::new(RwLock::new(EpochPhase::Evaluation));
517547
let mut was_disconnected = false;
548+
let mut first_block_seen = false;
518549

519550
BlockSync::handle_block_event(
520551
BlockEvent::EpochTransition(EpochTransition::NewEpoch {
@@ -527,6 +558,7 @@ mod tests {
527558
&current_epoch,
528559
&current_phase,
529560
&mut was_disconnected,
561+
&mut first_block_seen,
530562
)
531563
.await;
532564

@@ -548,6 +580,7 @@ mod tests {
548580
let current_epoch = Arc::new(RwLock::new(0));
549581
let current_phase = Arc::new(RwLock::new(EpochPhase::Evaluation));
550582
let mut was_disconnected = false;
583+
let mut first_block_seen = false;
551584

552585
BlockSync::handle_block_event(
553586
BlockEvent::PhaseChange {
@@ -561,6 +594,7 @@ mod tests {
561594
&current_epoch,
562595
&current_phase,
563596
&mut was_disconnected,
597+
&mut first_block_seen,
564598
)
565599
.await;
566600

@@ -583,6 +617,7 @@ mod tests {
583617
let current_epoch = Arc::new(RwLock::new(0));
584618
let current_phase = Arc::new(RwLock::new(EpochPhase::Evaluation));
585619
let mut was_disconnected = false;
620+
let mut first_block_seen = false;
586621

587622
let should_break = BlockSync::handle_block_event(
588623
BlockEvent::ConnectionError("network wobble".into()),
@@ -591,6 +626,7 @@ mod tests {
591626
&current_epoch,
592627
&current_phase,
593628
&mut was_disconnected,
629+
&mut first_block_seen,
594630
)
595631
.await;
596632

0 commit comments

Comments
 (0)