Skip to content

Commit 332f75d

Browse files
committed
fix: auto-recover from corrupted distributed-db and state
- Add is_corruption_error() helper to detect truncated/corrupted files - Auto-delete and recreate distributed-db on corruption - Auto-delete and recreate chain state on corruption - Log warnings when recovery is triggered - Prevents crash loop on corrupted data
1 parent 7080d56 commit 332f75d

File tree

1 file changed

+110
-38
lines changed

1 file changed

+110
-38
lines changed

bins/validator-node/src/main.rs

Lines changed: 110 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -289,13 +289,53 @@ async fn main() -> Result<()> {
289289
};
290290
info!("Using data directory: {:?}", data_dir);
291291

292-
// Open storage
293-
let storage = Storage::open(&data_dir)?;
292+
// Helper to detect corruption errors (truncated files, incomplete writes)
293+
fn is_corruption_error<E: std::fmt::Display>(err: &E) -> bool {
294+
let err_str = err.to_string().to_lowercase();
295+
err_str.contains("unexpected end of file")
296+
|| err_str.contains("io error")
297+
|| err_str.contains("serialization error")
298+
|| err_str.contains("corrupt")
299+
|| err_str.contains("invalid data")
300+
}
301+
302+
// Open storage with corruption recovery
303+
let storage = match Storage::open(&data_dir) {
304+
Ok(s) => s,
305+
Err(e) if is_corruption_error(&e) => {
306+
warn!("Storage corruption detected: {}. Attempting recovery...", e);
307+
// Delete corrupted state file
308+
let state_path = data_dir.join("state");
309+
if state_path.exists() {
310+
warn!("Removing corrupted state directory: {:?}", state_path);
311+
std::fs::remove_dir_all(&state_path)?;
312+
}
313+
// Retry opening
314+
Storage::open(&data_dir)?
315+
}
316+
Err(e) => return Err(e.into()),
317+
};
294318

295-
// Open distributed database for decentralized storage
319+
// Open distributed database with corruption recovery
296320
let db_path = data_dir.join("distributed-db");
297321
info!("Opening distributed database at {:?}", db_path);
298-
let distributed_db = Arc::new(DistributedDB::open(&db_path, keypair.hotkey())?);
322+
let distributed_db = match DistributedDB::open(&db_path, keypair.hotkey()) {
323+
Ok(db) => Arc::new(db),
324+
Err(e) if is_corruption_error(&e) => {
325+
warn!(
326+
"Distributed DB corruption detected: {}. Attempting recovery...",
327+
e
328+
);
329+
// Delete corrupted database
330+
if db_path.exists() {
331+
warn!("Removing corrupted distributed-db: {:?}", db_path);
332+
std::fs::remove_dir_all(&db_path)?;
333+
}
334+
// Retry opening (will create fresh)
335+
Arc::new(DistributedDB::open(&db_path, keypair.hotkey())?)
336+
}
337+
Err(e) => return Err(e.into()),
338+
};
299339
info!(
300340
"Distributed DB initialized - state root: {}",
301341
hex::encode(&distributed_db.state_root()[..8])
@@ -314,44 +354,76 @@ async fn main() -> Result<()> {
314354
let min_stake_tao = args.min_stake;
315355
let min_stake_rao = (args.min_stake * 1_000_000_000.0) as u64;
316356

317-
// Load or create chain state
318-
let chain_state = if let Some(mut state) = storage.load_state()? {
319-
info!("Loaded existing state at block {}", state.block_height);
320-
// IMPORTANT: Sync the loaded state's min_stake with CLI argument
321-
// This ensures add_validator uses the same threshold as metagraph sync
322-
let old_min_stake = state.config.min_stake.0;
323-
state.config.min_stake = Stake::new(min_stake_rao);
324-
if old_min_stake != min_stake_rao {
325-
info!(
326-
"Updated state config min_stake: {} -> {} RAO ({} TAO)",
327-
old_min_stake, min_stake_rao, min_stake_tao
328-
);
357+
// Load or create chain state (with corruption recovery)
358+
let chain_state = match storage.load_state() {
359+
Ok(Some(mut state)) => {
360+
info!("Loaded existing state at block {}", state.block_height);
361+
// IMPORTANT: Sync the loaded state's min_stake with CLI argument
362+
// This ensures add_validator uses the same threshold as metagraph sync
363+
let old_min_stake = state.config.min_stake.0;
364+
state.config.min_stake = Stake::new(min_stake_rao);
365+
if old_min_stake != min_stake_rao {
366+
info!(
367+
"Updated state config min_stake: {} -> {} RAO ({} TAO)",
368+
old_min_stake, min_stake_rao, min_stake_tao
369+
);
370+
}
371+
Arc::new(RwLock::new(state))
329372
}
330-
Arc::new(RwLock::new(state))
331-
} else {
332-
info!("Creating new chain state");
373+
Ok(None) => {
374+
info!("Creating new chain state");
333375

334-
// Determine sudo key - use production key by default
335-
let sudo_key = if let Some(sudo_hex) = &args.sudo_key {
336-
info!("Using custom sudo key");
337-
Hotkey::from_hex(sudo_hex).ok_or_else(|| anyhow::anyhow!("Invalid sudo key"))?
338-
} else {
339-
// Production sudo key: 5GziQCcRpN8NCJktX343brnfuVe3w6gUYieeStXPD1Dag2At
340-
info!("Using production sudo key: {}", SUDO_KEY_SS58);
341-
production_sudo_key()
342-
};
376+
// Determine sudo key - use production key by default
377+
let sudo_key = if let Some(sudo_hex) = &args.sudo_key {
378+
info!("Using custom sudo key");
379+
Hotkey::from_hex(sudo_hex).ok_or_else(|| anyhow::anyhow!("Invalid sudo key"))?
380+
} else {
381+
// Production sudo key: 5GziQCcRpN8NCJktX343brnfuVe3w6gUYieeStXPD1Dag2At
382+
info!("Using production sudo key: {}", SUDO_KEY_SS58);
383+
production_sudo_key()
384+
};
343385

344-
// Select network configuration based on Bittensor connection
345-
// Use min_stake from CLI argument for consistency
346-
let mut config = if args.no_bittensor {
347-
NetworkConfig::default()
348-
} else {
349-
NetworkConfig::production()
350-
};
351-
config.min_stake = Stake::new(min_stake_rao);
386+
// Select network configuration based on Bittensor connection
387+
// Use min_stake from CLI argument for consistency
388+
let mut config = if args.no_bittensor {
389+
NetworkConfig::default()
390+
} else {
391+
NetworkConfig::production()
392+
};
393+
config.min_stake = Stake::new(min_stake_rao);
352394

353-
let state = ChainState::new(sudo_key, config);
354-
Arc::new(RwLock::new(state))
395+
let state = ChainState::new(sudo_key, config);
396+
Arc::new(RwLock::new(state))
397+
}
398+
Err(e) if is_corruption_error(&e) => {
399+
warn!("Chain state corruption detected: {}. Creating fresh state...", e);
400+
// Delete corrupted state
401+
let state_path = data_dir.join("state");
402+
if state_path.exists() {
403+
warn!("Removing corrupted state: {:?}", state_path);
404+
std::fs::remove_dir_all(&state_path)?;
405+
}
406+
407+
// Create fresh state
408+
let sudo_key = if let Some(sudo_hex) = &args.sudo_key {
409+
info!("Using custom sudo key");
410+
Hotkey::from_hex(sudo_hex).ok_or_else(|| anyhow::anyhow!("Invalid sudo key"))?
411+
} else {
412+
info!("Using production sudo key: {}", SUDO_KEY_SS58);
413+
production_sudo_key()
414+
};
415+
416+
let mut config = if args.no_bittensor {
417+
NetworkConfig::default()
418+
} else {
419+
NetworkConfig::production()
420+
};
421+
config.min_stake = Stake::new(min_stake_rao);
422+
423+
let state = ChainState::new(sudo_key, config);
424+
Arc::new(RwLock::new(state))
425+
}
426+
Err(e) => return Err(e.into()),
355427
};
356428

357429
// Set OWNER_HOTKEY env var for challenge containers

0 commit comments

Comments
 (0)