Skip to content

Commit a964ad0

Browse files
committed
fix: auto-reconnect Subtensor RPC on transport errors + cache last-good weights on dedup collision
- Subtensor RPC: detect dead WebSocket connections ("connection closed", "restart required") during weight submission, then automatically reconnect and retry. Also reconnect on BlockSyncEvent::Reconnected.
- WASM get_weights dedup guard: return the cached last-good weights instead of an empty Vec when a concurrent call is blocked. This prevents the false 100% burn that occurs when multiple callers (RPC, CommitWindowOpen, epoch transition) race on the same challenge's get_weights.
1 parent efeb73e commit a964ad0

File tree

2 files changed

+126
-10
lines changed

2 files changed

+126
-10
lines changed

bins/validator-node/src/main.rs

Lines changed: 106 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -517,8 +517,10 @@ async fn main() -> Result<()> {
517517
Arc::new(RwLock::new(std::collections::HashMap::new()));
518518

519519
// Connect to Bittensor
520-
let subtensor: Option<Arc<Subtensor>>;
520+
let mut subtensor: Option<Arc<Subtensor>>;
521521
let subtensor_signer: Option<Arc<BittensorSigner>>;
522+
let subtensor_endpoint_for_reconnect = args.subtensor_endpoint.clone();
523+
let subtensor_state_path_for_reconnect: Option<std::path::PathBuf>;
522524
let mut subtensor_client: Option<SubtensorClient>;
523525
let bittensor_client_for_metagraph: Option<Arc<BittensorClient>>;
524526
let mut block_rx: Option<tokio::sync::mpsc::Receiver<BlockSyncEvent>> = None;
@@ -531,6 +533,7 @@ async fn main() -> Result<()> {
531533
info!("Running in bootnode mode (read-only Bittensor access)");
532534
subtensor = None;
533535
subtensor_signer = None;
536+
subtensor_state_path_for_reconnect = None;
534537

535538
// Create SubtensorClient for metagraph only
536539
let mut client = SubtensorClient::new(platform_bittensor::BittensorConfig {
@@ -583,6 +586,7 @@ async fn main() -> Result<()> {
583586
} else {
584587
// Full validator mode: requires signing key
585588
let state_path = data_dir.join("subtensor_state.json");
589+
subtensor_state_path_for_reconnect = Some(state_path.clone());
586590
match Subtensor::with_persistence(&args.subtensor_endpoint, state_path).await {
587591
Ok(st) => {
588592
let secret = args
@@ -682,6 +686,7 @@ async fn main() -> Result<()> {
682686
info!("Bittensor disabled");
683687
subtensor = None;
684688
subtensor_signer = None;
689+
subtensor_state_path_for_reconnect = None;
685690
subtensor_client = None;
686691
bittensor_client_for_metagraph = None;
687692
}
@@ -1298,7 +1303,7 @@ async fn main() -> Result<()> {
12981303

12991304
handle_block_event(
13001305
event,
1301-
&subtensor,
1306+
&mut subtensor,
13021307
&subtensor_signer,
13031308
&subtensor_client,
13041309
&state_manager,
@@ -1309,6 +1314,8 @@ async fn main() -> Result<()> {
13091314
&chain_state,
13101315
&storage,
13111316
&mut last_weight_submission_epoch,
1317+
&subtensor_endpoint_for_reconnect,
1318+
&subtensor_state_path_for_reconnect,
13121319
).await;
13131320

13141321
// On first block after startup, compute weights immediately so
@@ -1339,7 +1346,7 @@ async fn main() -> Result<()> {
13391346
);
13401347
handle_block_event(
13411348
BlockSyncEvent::CommitWindowOpen { epoch, block: block_number },
1342-
&subtensor,
1349+
&mut subtensor,
13431350
&subtensor_signer,
13441351
&subtensor_client,
13451352
&state_manager,
@@ -1350,6 +1357,8 @@ async fn main() -> Result<()> {
13501357
&chain_state,
13511358
&storage,
13521359
&mut last_weight_submission_epoch,
1360+
&subtensor_endpoint_for_reconnect,
1361+
&subtensor_state_path_for_reconnect,
13531362
).await;
13541363
}
13551364
}
@@ -4706,9 +4715,42 @@ fn compute_weights_for_rpc(
47064715
}
47074716
}
47084717

4718+
async fn try_reconnect_subtensor(
4719+
subtensor: &mut Option<Arc<Subtensor>>,
4720+
endpoint: &str,
4721+
state_path: &Option<std::path::PathBuf>,
4722+
) -> bool {
4723+
info!("Attempting Subtensor RPC reconnection to {}", endpoint);
4724+
let result = if let Some(sp) = state_path {
4725+
Subtensor::with_persistence(endpoint, sp.clone()).await
4726+
} else {
4727+
Subtensor::new(endpoint).await
4728+
};
4729+
match result {
4730+
Ok(st) => {
4731+
*subtensor = Some(Arc::new(st));
4732+
info!("Subtensor RPC reconnected successfully");
4733+
true
4734+
}
4735+
Err(e) => {
4736+
error!("Subtensor RPC reconnection failed: {}", e);
4737+
false
4738+
}
4739+
}
4740+
}
4741+
4742+
/// Returns `true` when `err_str` contains one of the known transport-failure markers.
///
/// Matching is ASCII case-insensitive: the same failure can be reported with
/// different capitalisation depending on which layer formats it (for example
/// "connection closed" vs "Connection closed by peer", or "Broken pipe" vs
/// "broken pipe"), and missing a variant means no reconnect is attempted.
///
/// An empty string, or a message containing none of the markers, yields `false`.
fn is_transport_error(err_str: &str) -> bool {
    // Markers are stored lowercase; the input is lowercased once before matching.
    const TRANSPORT_ERROR_MARKERS: [&str; 6] = [
        "connection closed",
        "restart required",
        "background task",
        "transport",
        "connection refused",
        "broken pipe",
    ];

    let normalized = err_str.to_ascii_lowercase();
    TRANSPORT_ERROR_MARKERS
        .iter()
        .any(|marker| normalized.contains(marker))
}
4750+
47094751
async fn handle_block_event(
47104752
event: BlockSyncEvent,
4711-
subtensor: &Option<Arc<Subtensor>>,
4753+
subtensor: &mut Option<Arc<Subtensor>>,
47124754
signer: &Option<Arc<BittensorSigner>>,
47134755
client: &Option<SubtensorClient>,
47144756
state_manager: &Arc<StateManager>,
@@ -4719,6 +4761,8 @@ async fn handle_block_event(
47194761
chain_state: &Arc<RwLock<platform_core::ChainState>>,
47204762
storage: &Arc<TrackedStorage>,
47214763
last_weight_submission_epoch: &mut u64,
4764+
subtensor_endpoint: &str,
4765+
subtensor_state_path: &Option<std::path::PathBuf>,
47224766
) {
47234767
match event {
47244768
BlockSyncEvent::NewBlock { block_number, .. } => {
@@ -5149,7 +5193,10 @@ async fn handle_block_event(
51495193
cs.last_computed_weights = weights_to_submit.clone();
51505194
}
51515195

5152-
for (mechanism_id, uids, weights) in weights_to_submit {
5196+
let mut needs_reconnect = false;
5197+
let mut failed_submissions: Vec<(u8, Vec<u16>, Vec<u16>)> = Vec::new();
5198+
5199+
for (mechanism_id, uids, weights) in &weights_to_submit {
51535200
info!(
51545201
"Submitting weights for mechanism {} ({} UIDs)",
51555202
mechanism_id,
@@ -5159,9 +5206,9 @@ async fn handle_block_event(
51595206
.set_mechanism_weights(
51605207
sig,
51615208
netuid,
5162-
mechanism_id,
5163-
&uids,
5164-
&weights,
5209+
*mechanism_id,
5210+
uids,
5211+
weights,
51655212
version_key,
51665213
ExtrinsicWait::Finalized,
51675214
)
@@ -5185,6 +5232,13 @@ async fn handle_block_event(
51855232
mechanism_id, err_str
51865233
);
51875234
*last_weight_submission_epoch = epoch;
5235+
} else if is_transport_error(&err_str) {
5236+
error!(
5237+
"Mechanism {} weight submission failed (transport error): {}",
5238+
mechanism_id, e
5239+
);
5240+
needs_reconnect = true;
5241+
failed_submissions.push((*mechanism_id, uids.clone(), weights.clone()));
51885242
} else {
51895243
error!(
51905244
"Mechanism {} weight submission failed: {}",
@@ -5194,6 +5248,48 @@ async fn handle_block_event(
51945248
}
51955249
}
51965250
}
5251+
5252+
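// NOTE(review): this drop only frees the weight batch (failed entries were cloned above); it does not itself end any borrow of `subtensor` - confirm the `st` borrow is no longer used before the reconnect below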
5253+
drop(weights_to_submit);
5254+
5255+
if needs_reconnect && !failed_submissions.is_empty() {
5256+
if try_reconnect_subtensor(subtensor, subtensor_endpoint, subtensor_state_path).await {
5257+
if let (Some(new_st), Some(sig)) = (subtensor.as_ref(), signer.as_ref()) {
5258+
for (mechanism_id, uids, weights) in &failed_submissions {
5259+
info!("Retrying weight submission after reconnect for mechanism {}", mechanism_id);
5260+
match new_st
5261+
.set_mechanism_weights(
5262+
sig,
5263+
netuid,
5264+
*mechanism_id,
5265+
uids,
5266+
weights,
5267+
version_key,
5268+
ExtrinsicWait::Finalized,
5269+
)
5270+
.await
5271+
{
5272+
Ok(resp) if resp.success => {
5273+
info!(
5274+
"Mechanism {} weights submitted after reconnect: {:?}",
5275+
mechanism_id, resp.tx_hash
5276+
);
5277+
*last_weight_submission_epoch = epoch;
5278+
}
5279+
Ok(resp) => {
5280+
warn!("Mechanism {} issue after reconnect: {}", mechanism_id, resp.message);
5281+
}
5282+
Err(e2) => {
5283+
error!(
5284+
"Mechanism {} weight submission failed even after reconnect: {}",
5285+
mechanism_id, e2
5286+
);
5287+
}
5288+
}
5289+
}
5290+
}
5291+
}
5292+
}
51975293
} else {
51985294
warn!("No Subtensor/signer - cannot submit weights");
51995295
}
@@ -5239,7 +5335,8 @@ async fn handle_block_event(
52395335
warn!("Bittensor disconnected: {}", reason);
52405336
}
52415337
BlockSyncEvent::Reconnected => {
5242-
info!("Bittensor reconnected");
5338+
info!("Bittensor block sync reconnected - also reconnecting Subtensor weight submission client");
5339+
try_reconnect_subtensor(subtensor, subtensor_endpoint, subtensor_state_path).await;
52435340
}
52445341
}
52455342
}

bins/validator-node/src/wasm_executor.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ pub struct WasmChallengeExecutor {
145145
module_versions: RwLock<HashMap<String, u64>>,
146146
persistent_instances: RwLock<HashMap<String, Arc<Mutex<PersistentInstance>>>>,
147147
dedup_state: RwLock<HashMap<String, Arc<DedupState>>>,
148+
/// Cache of last successful get_weights results per challenge.
149+
/// Used to return stale-but-valid data when the dedup guard blocks a concurrent call,
150+
/// instead of returning an empty Vec that the caller treats as "0 entries → 100% burn".
151+
last_good_weights: RwLock<HashMap<String, Vec<platform_challenge_sdk::WeightAssignment>>>,
148152
}
149153

150154
impl WasmChallengeExecutor {
@@ -173,6 +177,7 @@ impl WasmChallengeExecutor {
173177
module_versions: RwLock::new(HashMap::new()),
174178
persistent_instances: RwLock::new(HashMap::new()),
175179
dedup_state: RwLock::new(HashMap::new()),
180+
last_good_weights: RwLock::new(HashMap::new()),
176181
})
177182
}
178183

@@ -1086,7 +1091,16 @@ impl WasmChallengeExecutor {
10861091
let _guard = match dedup.try_acquire(DedupFlags::GET_WEIGHTS) {
10871092
Some(g) => g,
10881093
None => {
1089-
debug!(module = module_path, "get_weights skipped: already running");
1094+
let cache = self.last_good_weights.read();
1095+
if let Some(cached) = cache.get(module_path) {
1096+
info!(
1097+
module = module_path,
1098+
weight_count = cached.len(),
1099+
"get_weights dedup collision: returning cached last-good weights"
1100+
);
1101+
return Ok(cached.clone());
1102+
}
1103+
debug!(module = module_path, "get_weights skipped: already running, no cache available");
10901104
return Ok(Vec::new());
10911105
}
10921106
};
@@ -1153,6 +1167,11 @@ impl WasmChallengeExecutor {
11531167
"WASM get_weights completed"
11541168
);
11551169

1170+
if !weights.is_empty() {
1171+
let mut cache = self.last_good_weights.write();
1172+
cache.insert(module_path.to_string(), weights.clone());
1173+
}
1174+
11561175
Ok(weights)
11571176
}
11581177

0 commit comments

Comments
 (0)