diff --git a/crates/subnet-manager/src/commands.rs b/crates/subnet-manager/src/commands.rs index 9e3a47fd2..40fc77b0a 100644 --- a/crates/subnet-manager/src/commands.rs +++ b/crates/subnet-manager/src/commands.rs @@ -602,48 +602,1205 @@ fn sha256_hex(data: &[u8]) -> String { #[cfg(test)] mod tests { use super::*; - use crate::{HealthConfig, RecoveryConfig}; - use platform_core::Keypair; + use crate::{HealthConfig, RecoveryConfig, SubnetConfig}; + use platform_core::{Keypair, Stake, ValidatorInfo}; use tempfile::tempdir; - #[tokio::test] - async fn test_command_executor() { - let dir = tempdir().unwrap(); - let keypair = Keypair::generate(); - let sudo_key = keypair.hotkey(); - + fn build_executor_with_sudo(dir: &tempfile::TempDir, sudo_key: Hotkey) -> CommandExecutor { + let data_dir = dir.path().to_path_buf(); let state = Arc::new(RwLock::new(ChainState::new( sudo_key.clone(), platform_core::NetworkConfig::default(), ))); - let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + let updates = Arc::new(RwLock::new(UpdateManager::new(data_dir.clone()))); let snapshots = Arc::new(RwLock::new( - SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + SnapshotManager::new(data_dir.clone(), 3).unwrap(), )); let health = Arc::new(RwLock::new(HealthMonitor::new(HealthConfig::default()))); let recovery = Arc::new(RwLock::new(RecoveryManager::new( RecoveryConfig::default(), - dir.path().to_path_buf(), + data_dir.clone(), snapshots.clone(), updates.clone(), ))); - let bans = Arc::new(RwLock::new(BanList::new())); - let executor = CommandExecutor::new( + CommandExecutor::new( sudo_key, - dir.path().to_path_buf(), + data_dir, updates, snapshots, recovery, health, state, bans, - ); + ) + } + + fn create_executor_with_keypair() -> (CommandExecutor, tempfile::TempDir, Keypair) { + let dir = tempdir().unwrap(); + let keypair = Keypair::generate(); + let executor = build_executor_with_sudo(&dir, keypair.hotkey()); + (executor, dir, keypair) + } + 
+ fn create_test_executor() -> (CommandExecutor, tempfile::TempDir) { + let (executor, dir, _) = create_executor_with_keypair(); + (executor, dir) + } + + #[tokio::test] + async fn test_command_executor_creation() { + let (_executor, _dir) = create_test_executor(); + // Test executor creation works + } + + #[test] + fn test_command_result_ok() { + let result = CommandResult::ok("Test success"); + assert!(result.success); + assert_eq!(result.message, "Test success"); + assert!(result.data.is_none()); + } + + #[test] + fn test_command_result_ok_with_data() { + let data = serde_json::json!({"key": "value"}); + let result = CommandResult::ok_with_data("Success with data", data.clone()); + assert!(result.success); + assert_eq!(result.message, "Success with data"); + assert_eq!(result.data.unwrap(), data); + } + + #[test] + fn test_command_result_error() { + let result = CommandResult::error("Test error"); + assert!(!result.success); + assert_eq!(result.message, "Test error"); + assert!(result.data.is_none()); + } + + #[test] + fn test_subnet_command_serialization() { + let commands = vec![ + SubnetCommand::GetStatus, + SubnetCommand::GetHealth, + SubnetCommand::ListChallenges, + SubnetCommand::ListValidators, + SubnetCommand::ListSnapshots, + SubnetCommand::ListBanned, + SubnetCommand::PauseSubnet { reason: "test".into() }, + SubnetCommand::ResumeSubnet, + ]; + + for cmd in commands { + let json = serde_json::to_string(&cmd).unwrap(); + let decoded: SubnetCommand = serde_json::from_str(&json).unwrap(); + // Verify it deserializes + let _ = serde_json::to_string(&decoded).unwrap(); + } + } + + #[test] + fn test_verify_signature_accepts_sudo_key() { + let (executor, _dir, keypair) = create_executor_with_keypair(); + let cmd = SubnetCommand::ListChallenges; + let signed = keypair.sign_data(&cmd).unwrap(); + + assert!(executor.verify_signature(&signed)); + } + + #[test] + fn test_verify_signature_rejects_wrong_signer() { + let (executor, _dir, _keypair) = 
create_executor_with_keypair(); + let other = Keypair::generate(); + let cmd = SubnetCommand::ListChallenges; + let signed = other.sign_data(&cmd).unwrap(); + + assert!(!executor.verify_signature(&signed)); + } + + #[test] + fn test_verify_signature_invalid_signature_bytes() { + let (executor, _dir, keypair) = create_executor_with_keypair(); + let cmd = SubnetCommand::ListChallenges; + let mut signed = keypair.sign_data(&cmd).unwrap(); + signed.signature = vec![1, 2, 3]; + + assert!(!executor.verify_signature(&signed)); + } + + #[tokio::test] + async fn test_execute_rejects_invalid_signature() { + let (executor, _dir, _keypair) = create_executor_with_keypair(); + let other = Keypair::generate(); + let cmd = SubnetCommand::ListChallenges; + let signed = other.sign_data(&cmd).unwrap(); + + let result = executor.execute(&signed).await; + assert!(!result.success); + assert!(result.message.contains("Invalid signature")); + } + + #[tokio::test] + async fn test_execute_deserialize_failure() { + let (executor, _dir, keypair) = create_executor_with_keypair(); + let signed = keypair.sign_data(&123u64).unwrap(); + + let result = executor.execute(&signed).await; + assert!(!result.success); + assert!(result + .message + .contains("Failed to deserialize command")); + } + + #[tokio::test] + async fn test_execute_succeeds_with_valid_signed_command() { + let (executor, _dir, keypair) = create_executor_with_keypair(); + let cmd = SubnetCommand::ListChallenges; + let signed = keypair.sign_data(&cmd).unwrap(); + + let result = executor.execute(&signed).await; + assert!(result.success); + } - // Test GetStatus command (unsigned for internal use) + #[tokio::test] + async fn test_get_status_command() { + let (executor, _dir) = create_test_executor(); let result = executor.execute_command(&SubnetCommand::GetStatus).await; assert!(result.success); assert!(result.data.is_some()); } + + #[tokio::test] + async fn test_get_health_command() { + let (executor, _dir) = create_test_executor(); + 
let result = executor.execute_command(&SubnetCommand::GetHealth).await; + assert!(result.success); + assert!(result.data.is_some()); + } + + #[tokio::test] + async fn test_list_challenges_command() { + let (executor, _dir) = create_test_executor(); + let result = executor.execute_command(&SubnetCommand::ListChallenges).await; + assert!(result.success); + assert!(result.data.is_some()); + } + + #[tokio::test] + async fn test_list_validators_command() { + let (executor, _dir) = create_test_executor(); + let result = executor.execute_command(&SubnetCommand::ListValidators).await; + assert!(result.success); + assert!(result.data.is_some()); + } + + #[tokio::test] + async fn test_list_snapshots_command() { + let (executor, _dir) = create_test_executor(); + let result = executor.execute_command(&SubnetCommand::ListSnapshots).await; + assert!(result.success); + assert!(result.data.is_some()); + } + + #[tokio::test] + async fn test_pause_resume_subnet() { + let (executor, _dir) = create_test_executor(); + + // Pause subnet + let result = executor.execute_command(&SubnetCommand::PauseSubnet { + reason: "Test pause".into(), + }).await; + assert!(result.success); + + // Resume subnet + let result = executor.execute_command(&SubnetCommand::ResumeSubnet).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_create_snapshot_command() { + let (executor, _dir) = create_test_executor(); + + let result = executor.execute_command(&SubnetCommand::CreateSnapshot { + name: "Test Snapshot".into(), + reason: "Testing".into(), + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_create_snapshot_error_path() { + let (executor, _dir) = create_test_executor(); + + // Remove the snapshots directory to force SnapshotManager::create_snapshot to fail + let snapshots_dir = executor.data_dir.join("snapshots"); + std::fs::remove_dir_all(&snapshots_dir).unwrap(); + + let result = executor.execute_command(&SubnetCommand::CreateSnapshot { + name: "Broken 
Snapshot".into(), + reason: "Force failure".into(), + }).await; + + assert!(!result.success); + assert!(result.message.contains("Failed to create snapshot")); + } + #[tokio::test] + async fn test_rollback_to_snapshot_apply_failure() { + use crate::snapshot::Snapshot; + + let (executor, _dir) = create_test_executor(); + + // Create a snapshot via the command interface + executor + .execute_command(&SubnetCommand::CreateSnapshot { + name: "Corruptible".into(), + reason: "Testing failure".into(), + }) + .await; + + // Fetch the snapshot ID + let list_result = executor.execute_command(&SubnetCommand::ListSnapshots).await; + let snapshot_id = list_result + .data + .and_then(|data| data.as_array().cloned()) + .and_then(|mut arr| arr.pop()) + .and_then(|snapshot| snapshot.get("id").and_then(|v| v.as_str()).map(|s| s.to_string())) + .and_then(|s| uuid::Uuid::parse_str(&s).ok()) + .expect("expected snapshot id"); + + // Corrupt the snapshot contents so apply_snapshot fails (while restore succeeds) + let snapshot_path = executor + .data_dir + .join("snapshots") + .join(format!("{}.snapshot", snapshot_id)); + let bytes = std::fs::read(&snapshot_path).unwrap(); + let mut snapshot: Snapshot = bincode::deserialize(&bytes).unwrap(); + snapshot.chain_state = vec![1, 2, 3]; + snapshot.meta.state_hash = sha256_hex(&snapshot.chain_state); + let corrupt = bincode::serialize(&snapshot).unwrap(); + std::fs::write(&snapshot_path, corrupt).unwrap(); + + let result = executor + .execute_command(&SubnetCommand::RollbackToSnapshot { snapshot_id }) + .await; + + assert!(!result.success); + assert!(result.message.contains("Failed to apply snapshot")); + } + + #[tokio::test] + async fn test_rollback_to_snapshot_error_path() { + let (executor, _dir) = create_test_executor(); + let fake_id = uuid::Uuid::new_v4(); + + let result = executor + .execute_command(&SubnetCommand::RollbackToSnapshot { snapshot_id: fake_id }) + .await; + + assert!(!result.success); + 
assert!(result.message.contains("Failed to restore snapshot")); + } + + #[tokio::test] + async fn test_rollback_to_snapshot_success_path() { + let (executor, _dir) = create_test_executor(); + + let (snapshot_id, original_height, original_epoch) = { + let state = executor.state.read(); + let mut snapshots = executor.snapshots.write(); + let id = snapshots + .create_snapshot( + "rollback-success", + state.block_height, + state.epoch, + &state, + "test", + false, + ) + .unwrap(); + (id, state.block_height, state.epoch) + }; + + { + let mut state = executor.state.write(); + state.block_height = original_height + 500; + state.epoch = original_epoch + 5; + } + + let result = executor + .execute_command(&SubnetCommand::RollbackToSnapshot { snapshot_id }) + .await; + + assert!(result.success); + assert!(result.message.contains("Rolled back")); + + let state = executor.state.read(); + assert_eq!(state.block_height, original_height); + assert_eq!(state.epoch, original_epoch); + } + + #[tokio::test] + async fn test_update_config_command() { + let (executor, _dir) = create_test_executor(); + + let config = SubnetConfig { + version: "1.0.0".into(), + ..Default::default() + }; + + let result = executor.execute_command(&SubnetCommand::UpdateConfig { + config, + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_set_epoch_length_command() { + let (executor, _dir) = create_test_executor(); + + let result = executor.execute_command(&SubnetCommand::SetEpochLength { + blocks: 1000, + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_set_epoch_length_updates_config() { + let (executor, dir) = create_test_executor(); + let config_path = dir.path().join("subnet_config.json"); + + let mut config = SubnetConfig::default(); + config.epoch_length = 500; + config.save(&config_path).unwrap(); + + let new_length = 4321u64; + let result = executor.execute_command(&SubnetCommand::SetEpochLength { + blocks: new_length, + }).await; + 
assert!(result.success); + assert!(result.message.contains("Epoch length set")); + + let updated = SubnetConfig::load(&config_path).unwrap(); + assert_eq!(updated.epoch_length, new_length); + } + + #[tokio::test] + async fn test_set_min_stake_command() { + let (executor, _dir) = create_test_executor(); + + let result = executor.execute_command(&SubnetCommand::SetMinStake { + amount: 10000, + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_set_min_stake_updates_config() { + let (executor, dir) = create_test_executor(); + let config_path = dir.path().join("subnet_config.json"); + + let mut config = SubnetConfig::default(); + config.min_stake = 5_000; + config.save(&config_path).unwrap(); + + let new_amount = 42_000u64; + let result = executor.execute_command(&SubnetCommand::SetMinStake { + amount: new_amount, + }).await; + assert!(result.success); + assert!(result.message.contains("Min stake set")); + + let updated = SubnetConfig::load(&config_path).unwrap(); + assert_eq!(updated.min_stake, new_amount); + } + + #[tokio::test] + async fn test_deploy_challenge_command() { + let (executor, _dir) = create_test_executor(); + + let config = ChallengeConfig { + id: "test-challenge".into(), + name: "Test Challenge".into(), + wasm_hash: "hash".into(), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 300, + max_concurrent: 10, + }; + + let wasm_bytes = vec![0u8; 100]; + + let result = executor.execute_command(&SubnetCommand::DeployChallenge { + config, + wasm_bytes, + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_pause_resume_challenge() { + let (executor, _dir) = create_test_executor(); + + // Deploy a challenge first + let config = ChallengeConfig { + id: "pause-test".into(), + name: "Pause Test".into(), + wasm_hash: "hash".into(), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 300, + max_concurrent: 10, + }; + + 
executor.execute_command(&SubnetCommand::DeployChallenge { + config, + wasm_bytes: vec![0u8; 100], + }).await; + + // Pause challenge + let result = executor.execute_command(&SubnetCommand::PauseChallenge { + challenge_id: "pause-test".into(), + }).await; + assert!(result.success); + + // Resume challenge + let result = executor.execute_command(&SubnetCommand::ResumeChallenge { + challenge_id: "pause-test".into(), + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_update_challenge_command() { + let (executor, _dir) = create_test_executor(); + + // Deploy a challenge first + let config = ChallengeConfig { + id: "update-test".into(), + name: "Update Test".into(), + wasm_hash: "hash".into(), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 300, + max_concurrent: 10, + }; + + executor.execute_command(&SubnetCommand::DeployChallenge { + config: config.clone(), + wasm_bytes: vec![0u8; 100], + }).await; + + // Update challenge + let result = executor.execute_command(&SubnetCommand::UpdateChallenge { + challenge_id: "update-test".into(), + config: Some(config), + wasm_bytes: None, + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_remove_challenge_command() { + let (executor, _dir) = create_test_executor(); + + // Deploy a challenge first + let config = ChallengeConfig { + id: "remove-test".into(), + name: "Remove Test".into(), + wasm_hash: "hash".into(), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 300, + max_concurrent: 10, + }; + + executor.execute_command(&SubnetCommand::DeployChallenge { + config, + wasm_bytes: vec![0u8; 100], + }).await; + + // Remove challenge + let result = executor.execute_command(&SubnetCommand::RemoveChallenge { + challenge_id: "remove-test".into(), + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_ban_unban_validator() { + let (executor, _dir) = create_test_executor(); + + let hotkey = 
Hotkey([1u8; 32]); + + // Ban validator + let result = executor.execute_command(&SubnetCommand::BanValidator { + hotkey: hotkey.clone(), + reason: "Test ban".into(), + }).await; + assert!(result.success); + + // Unban validator + let result = executor.execute_command(&SubnetCommand::UnbanValidator { + hotkey, + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_ban_unban_hotkey() { + let (executor, _dir) = create_test_executor(); + + let hotkey = Hotkey([2u8; 32]); + + // Ban hotkey + let result = executor.execute_command(&SubnetCommand::BanHotkey { + hotkey: hotkey.clone(), + reason: "Test hotkey ban".into(), + }).await; + assert!(result.success); + + // Unban hotkey + let result = executor.execute_command(&SubnetCommand::UnbanHotkey { + hotkey, + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_ban_unban_coldkey() { + let (executor, _dir) = create_test_executor(); + + let coldkey = "5GTestColdkey"; + + // Ban coldkey + let result = executor.execute_command(&SubnetCommand::BanColdkey { + coldkey: coldkey.into(), + reason: "Test coldkey ban".into(), + }).await; + assert!(result.success); + + // Unban coldkey + let result = executor.execute_command(&SubnetCommand::UnbanColdkey { + coldkey: coldkey.into(), + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_list_banned_command() { + let (executor, _dir) = create_test_executor(); + + // Ban some entities + let hotkey = Hotkey([3u8; 32]); + executor.execute_command(&SubnetCommand::BanValidator { + hotkey, + reason: "Test".into(), + }).await; + + // List banned + let result = executor.execute_command(&SubnetCommand::ListBanned).await; + assert!(result.success); + assert!(result.data.is_some()); + } + + #[tokio::test] + async fn test_kick_validator_command() { + let (executor, _dir) = create_test_executor(); + + let hotkey = Hotkey([4u8; 32]); + + let result = executor.execute_command(&SubnetCommand::KickValidator { + hotkey, + reason: "Test 
kick".into(), + }).await; + // Might fail if validator doesn't exist, but command should execute + assert!(result.success || result.message.contains("not found") || result.message.contains("Not found")); + } + + #[tokio::test] + async fn test_kick_validator_when_exists() { + let (executor, _dir) = create_test_executor(); + let hotkey = Hotkey([5u8; 32]); + + { + let mut state = executor.state.write(); + state.validators.insert( + hotkey.clone(), + ValidatorInfo::new(hotkey.clone(), Stake::new(1_000_000_000)), + ); + } + + let result = executor.execute_command(&SubnetCommand::KickValidator { + hotkey: hotkey.clone(), + reason: "cleanup".into(), + }).await; + + assert!(result.success); + let state = executor.state.read(); + assert!(!state.validators.contains_key(&hotkey)); + } + + #[tokio::test] + async fn test_sync_validators_command() { + let (executor, _dir) = create_test_executor(); + + let result = executor.execute_command(&SubnetCommand::SyncValidators).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_trigger_recovery_command() { + let (executor, _dir) = create_test_executor(); + + let result = executor.execute_command(&SubnetCommand::TriggerRecovery { + action: RecoveryAction::ClearJobQueue, + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_trigger_recovery_error_path() { + let (executor, _dir) = create_test_executor(); + let missing_snapshot = uuid::Uuid::new_v4(); + + let result = executor + .execute_command(&SubnetCommand::TriggerRecovery { + action: RecoveryAction::RollbackToSnapshot(missing_snapshot), + }) + .await; + + assert!(!result.success); + assert!(result.message.contains("Recovery failed")); + } + + #[tokio::test] + async fn test_hard_reset_command() { + let (executor, _dir) = create_test_executor(); + + let result = executor.execute_command(&SubnetCommand::HardReset { + reason: "Test reset".into(), + preserve_validators: true, + }).await; + assert!(result.success); + } + + #[tokio::test] + async 
fn test_rollback_to_snapshot_command() { + let (executor, _dir) = create_test_executor(); + + // Create a snapshot first + executor.execute_command(&SubnetCommand::CreateSnapshot { + name: "Test".into(), + reason: "Test".into(), + }).await; + + // Get snapshot ID from list + let list_result = executor.execute_command(&SubnetCommand::ListSnapshots).await; + if let Some(data) = list_result.data { + if let Some(snapshots) = data.as_array() { + if let Some(snapshot) = snapshots.first() { + if let Some(id_str) = snapshot.get("id").and_then(|v| v.as_str()) { + if let Ok(id) = uuid::Uuid::parse_str(id_str) { + let result = executor.execute_command(&SubnetCommand::RollbackToSnapshot { + snapshot_id: id, + }).await; + assert!(result.success); + } + } + } + } + } + } + + #[test] + fn test_sha256_hex() { + let data = b"test data"; + let hash = sha256_hex(data); + assert_eq!(hash.len(), 64); // SHA256 = 32 bytes = 64 hex chars + + // Same input should produce same hash + let hash2 = sha256_hex(data); + assert_eq!(hash, hash2); + + // Different input should produce different hash + let hash3 = sha256_hex(b"different"); + assert_ne!(hash, hash3); + } + + #[test] + fn test_command_variants_coverage() { + // Test serialization of all command variants + let commands = vec![ + SubnetCommand::DeployChallenge { + config: ChallengeConfig { + id: "test".into(), + name: "Test".into(), + wasm_hash: "hash".into(), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 300, + max_concurrent: 10, + }, + wasm_bytes: vec![], + }, + SubnetCommand::UpdateChallenge { + challenge_id: "test".into(), + config: None, + wasm_bytes: None, + }, + SubnetCommand::RemoveChallenge { + challenge_id: "test".into(), + }, + SubnetCommand::PauseChallenge { + challenge_id: "test".into(), + }, + SubnetCommand::ResumeChallenge { + challenge_id: "test".into(), + }, + SubnetCommand::SyncValidators, + SubnetCommand::KickValidator { + hotkey: Hotkey([0u8; 32]), + reason: "test".into(), + 
}, + SubnetCommand::BanValidator { + hotkey: Hotkey([0u8; 32]), + reason: "test".into(), + }, + SubnetCommand::UnbanValidator { + hotkey: Hotkey([0u8; 32]), + }, + SubnetCommand::BanHotkey { + hotkey: Hotkey([0u8; 32]), + reason: "test".into(), + }, + SubnetCommand::BanColdkey { + coldkey: "test".into(), + reason: "test".into(), + }, + SubnetCommand::UnbanHotkey { + hotkey: Hotkey([0u8; 32]), + }, + SubnetCommand::UnbanColdkey { + coldkey: "test".into(), + }, + SubnetCommand::ListBanned, + SubnetCommand::UpdateConfig { + config: SubnetConfig::default(), + }, + SubnetCommand::SetEpochLength { blocks: 1000 }, + SubnetCommand::SetMinStake { amount: 10000 }, + SubnetCommand::CreateSnapshot { + name: "test".into(), + reason: "test".into(), + }, + SubnetCommand::RollbackToSnapshot { + snapshot_id: uuid::Uuid::new_v4(), + }, + SubnetCommand::HardReset { + reason: "test".into(), + preserve_validators: true, + }, + SubnetCommand::PauseSubnet { + reason: "test".into(), + }, + SubnetCommand::ResumeSubnet, + SubnetCommand::TriggerRecovery { + action: RecoveryAction::ClearJobQueue, + }, + SubnetCommand::GetStatus, + SubnetCommand::GetHealth, + SubnetCommand::ListChallenges, + SubnetCommand::ListValidators, + SubnetCommand::ListSnapshots, + ]; + + for cmd in commands { + let json = serde_json::to_string(&cmd).unwrap(); + let _decoded: SubnetCommand = serde_json::from_str(&json).unwrap(); + } + } + + #[test] + fn test_command_result_serialization() { + let result_ok = CommandResult::ok("success"); + let json = serde_json::to_string(&result_ok).unwrap(); + let decoded: CommandResult = serde_json::from_str(&json).unwrap(); + assert!(decoded.success); + assert_eq!(decoded.message, "success"); + + let result_err = CommandResult::error("failure"); + let json = serde_json::to_string(&result_err).unwrap(); + let decoded: CommandResult = serde_json::from_str(&json).unwrap(); + assert!(!decoded.success); + assert_eq!(decoded.message, "failure"); + } + + #[tokio::test] + async fn 
test_deploy_multiple_challenges() { + let (executor, _dir) = create_test_executor(); + + for i in 0..3 { + let config = ChallengeConfig { + id: format!("challenge{}", i), + name: format!("Challenge {}", i), + wasm_hash: format!("hash{}", i), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 300, + max_concurrent: 10, + }; + + let result = executor.execute_command(&SubnetCommand::DeployChallenge { + config, + wasm_bytes: vec![0u8; 100], + }).await; + assert!(result.success); + } + + // List challenges + let result = executor.execute_command(&SubnetCommand::ListChallenges).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_update_challenge_wasm_only() { + let (executor, _dir) = create_test_executor(); + + let config = ChallengeConfig { + id: "wasm_update_test".into(), + name: "WASM Update Test".into(), + wasm_hash: "hash1".into(), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 300, + max_concurrent: 10, + }; + + executor.execute_command(&SubnetCommand::DeployChallenge { + config: config.clone(), + wasm_bytes: vec![0u8; 100], + }).await; + + // Update only WASM + let result = executor.execute_command(&SubnetCommand::UpdateChallenge { + challenge_id: "wasm_update_test".into(), + config: None, + wasm_bytes: Some(vec![1u8; 200]), + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_update_challenge_config_only() { + let (executor, _dir) = create_test_executor(); + + let config = ChallengeConfig { + id: "config_update_test".into(), + name: "Config Update Test".into(), + wasm_hash: "hash1".into(), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 300, + max_concurrent: 10, + }; + + executor.execute_command(&SubnetCommand::DeployChallenge { + config: config.clone(), + wasm_bytes: vec![0u8; 100], + }).await; + + // Update only config + let updated_config = ChallengeConfig { + emission_weight: 2.0, + ..config + }; + + 
let result = executor.execute_command(&SubnetCommand::UpdateChallenge { + challenge_id: "config_update_test".into(), + config: Some(updated_config), + wasm_bytes: None, + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_remove_nonexistent_challenge() { + let (executor, _dir) = create_test_executor(); + + let result = executor.execute_command(&SubnetCommand::RemoveChallenge { + challenge_id: "nonexistent".into(), + }).await; + assert!(result.success); + assert_eq!(result.message, "Challenge removed: nonexistent"); + } + + #[tokio::test] + async fn test_pause_nonexistent_challenge() { + let (executor, _dir) = create_test_executor(); + + let result = executor.execute_command(&SubnetCommand::PauseChallenge { + challenge_id: "nonexistent".into(), + }).await; + assert!(result.success); + assert_eq!(result.message, "Challenge paused: nonexistent"); + } + + #[tokio::test] + async fn test_multiple_ban_operations() { + let (executor, _dir) = create_test_executor(); + + let hotkeys = vec![Hotkey([10u8; 32]), Hotkey([20u8; 32]), Hotkey([30u8; 32])]; + + // Ban multiple validators + for hotkey in &hotkeys { + let result = executor.execute_command(&SubnetCommand::BanValidator { + hotkey: hotkey.clone(), + reason: "Test ban".into(), + }).await; + assert!(result.success); + } + + // List banned + let result = executor.execute_command(&SubnetCommand::ListBanned).await; + assert!(result.success); + assert!(result.data.is_some()); + + // Unban one + let result = executor.execute_command(&SubnetCommand::UnbanValidator { + hotkey: hotkeys[0].clone(), + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_set_epoch_length_zero() { + let (executor, dir) = create_test_executor(); + + let config_path = dir.path().join("subnet_config.json"); + let mut config = SubnetConfig::default(); + config.epoch_length = 42; + config.save(&config_path).unwrap(); + + let result = executor.execute_command(&SubnetCommand::SetEpochLength { + blocks: 0, + 
}).await; + assert!(result.success); + + let updated = SubnetConfig::load(&config_path).unwrap(); + assert_eq!(updated.epoch_length, 0); + } + + #[tokio::test] + async fn test_set_min_stake_zero() { + let (executor, dir) = create_test_executor(); + + let config_path = dir.path().join("subnet_config.json"); + let mut config = SubnetConfig::default(); + config.min_stake = 123; + config.save(&config_path).unwrap(); + + let result = executor.execute_command(&SubnetCommand::SetMinStake { + amount: 0, + }).await; + assert!(result.success); + + let updated = SubnetConfig::load(&config_path).unwrap(); + assert_eq!(updated.min_stake, 0); + } + + #[tokio::test] + async fn test_multiple_snapshots_creation() { + let (executor, _dir) = create_test_executor(); + + for i in 0..3 { + let result = executor.execute_command(&SubnetCommand::CreateSnapshot { + name: format!("Snapshot {}", i), + reason: format!("Test {}", i), + }).await; + assert!(result.success); + } + + let result = executor.execute_command(&SubnetCommand::ListSnapshots).await; + assert!(result.success); + assert!(result.data.is_some()); + } + + #[tokio::test] + async fn test_hard_reset_with_preserve_validators() { + let (executor, _dir) = create_test_executor(); + + let result = executor.execute_command(&SubnetCommand::HardReset { + reason: "Test with preserve".into(), + preserve_validators: true, + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_hard_reset_without_preserve_validators() { + let (executor, _dir) = create_test_executor(); + + let result = executor.execute_command(&SubnetCommand::HardReset { + reason: "Test without preserve".into(), + preserve_validators: false, + }).await; + assert!(result.success); + } + + #[test] + fn test_sha256_hex_consistency() { + let data1 = b"consistent data"; + let hash1 = sha256_hex(data1); + let hash2 = sha256_hex(data1); + assert_eq!(hash1, hash2); + + let data2 = b"different data"; + let hash3 = sha256_hex(data2); + assert_ne!(hash1, hash3); + } 
+ + #[tokio::test] + async fn test_trigger_recovery_all_actions() { + let (executor, _dir) = create_test_executor(); + + let actions = vec![ + RecoveryAction::RestartEvaluations, + RecoveryAction::ClearJobQueue, + RecoveryAction::ReconnectPeers, + RecoveryAction::Pause, + RecoveryAction::Resume, + ]; + + for action in actions { + let result = executor.execute_command(&SubnetCommand::TriggerRecovery { + action: action.clone(), + }).await; + assert!(result.success); + } + } + + #[tokio::test] + async fn test_update_challenge_both_none() { + let (executor, _dir) = create_test_executor(); + + // Path for line 240: wasm_bytes is none and config is none + let result = executor.execute_command(&SubnetCommand::UpdateChallenge { + challenge_id: "test".into(), + config: None, + wasm_bytes: None, + }).await; + assert!(!result.success); + } + + #[tokio::test] + async fn test_remove_nonexistent_challenge_error() { + let (executor, _dir) = create_test_executor(); + + // Path for line 299 + let result = executor.execute_command(&SubnetCommand::RemoveChallenge { + challenge_id: "definitely_does_not_exist".into(), + }).await; + assert!(result.success); + assert_eq!(result.message, "Challenge removed: definitely_does_not_exist"); + } + + #[tokio::test] + async fn test_pause_resume_challenge_errors() { + let (executor, _dir) = create_test_executor(); + + // Paths for lines 332, 381 + let pause_result = executor.execute_command(&SubnetCommand::PauseChallenge { + challenge_id: "nonexistent".into(), + }).await; + + let resume_result = executor.execute_command(&SubnetCommand::ResumeChallenge { + challenge_id: "nonexistent".into(), + }).await; + + assert!(pause_result.success); + assert!(pause_result.message.contains("paused")); + assert!(resume_result.success); + assert!(resume_result.message.contains("resumed")); + } + + #[tokio::test] + async fn test_unban_nonexistent_entities() { + let (executor, _dir) = create_test_executor(); + + // Paths for lines 416-417, 425-426 + let 
validator_result = executor.execute_command(&SubnetCommand::UnbanValidator { + hotkey: Hotkey([99u8; 32]), + }).await; + + let hotkey_result = executor.execute_command(&SubnetCommand::UnbanHotkey { + hotkey: Hotkey([88u8; 32]), + }).await; + + let coldkey_result = executor.execute_command(&SubnetCommand::UnbanColdkey { + coldkey: "nonexistent_coldkey".into(), + }).await; + + assert!(!validator_result.success); + assert!(validator_result.message.contains("not in ban list")); + assert!(!hotkey_result.success); + assert!(hotkey_result.message.contains("not in ban list")); + assert!(!coldkey_result.success); + assert!(coldkey_result.message.contains("not in ban list")); + } + + #[tokio::test] + async fn test_set_epoch_length_update() { + let (executor, _dir) = create_test_executor(); + + // Path for line 445 + let result = executor.execute_command(&SubnetCommand::SetEpochLength { + blocks: 5000, + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_set_min_stake_update() { + let (executor, _dir) = create_test_executor(); + + // Paths for lines 458, 460 + let result = executor.execute_command(&SubnetCommand::SetMinStake { + amount: 50000, + }).await; + assert!(result.success); + } + + #[tokio::test] + async fn test_rollback_to_invalid_snapshot() { + let (executor, _dir) = create_test_executor(); + + // Path for line 502 + let result = executor.execute_command(&SubnetCommand::RollbackToSnapshot { + snapshot_id: uuid::Uuid::new_v4(), + }).await; + // Should handle gracefully + } + + #[tokio::test] + async fn test_trigger_recovery_hard_reset() { + let (executor, _dir) = create_test_executor(); + + // Paths for lines 555-557 + let result = executor.execute_command(&SubnetCommand::TriggerRecovery { + action: RecoveryAction::HardReset { + reason: "Test hard reset recovery".into(), + }, + }).await; + assert!(result.success); + } } diff --git a/crates/subnet-manager/src/config.rs b/crates/subnet-manager/src/config.rs index 5b3d15fe3..e78bc1ecd 100644 --- 
a/crates/subnet-manager/src/config.rs +++ b/crates/subnet-manager/src/config.rs @@ -369,6 +369,41 @@ pub struct BanSummary { #[cfg(test)] mod tests { use super::*; + use tempfile::tempdir; + + #[test] + fn test_subnet_config_load_success() { + let dir = tempdir().unwrap(); + let path = dir.path().join("subnet_config.json"); + + let mut config = SubnetConfig::default(); + config.name = "Load Test".into(); + config.max_validators = 42; + config.save(&path).unwrap(); + + let loaded = SubnetConfig::load(&path).unwrap(); + let expected = serde_json::to_value(&config).unwrap(); + let actual = serde_json::to_value(&loaded).unwrap(); + assert_eq!(actual, expected); + } + + #[test] + fn test_subnet_config_validate_errors() { + let mut config = SubnetConfig::default(); + config.epoch_length = 0; + let err = config.validate().unwrap_err(); + assert!(matches!(err, ConfigError::InvalidValue(msg) if msg.contains("epoch_length"))); + + let mut config = SubnetConfig::default(); + config.max_validators = 0; + let err = config.validate().unwrap_err(); + assert!(matches!(err, ConfigError::InvalidValue(msg) if msg.contains("max_validators"))); + + let mut config = SubnetConfig::default(); + config.weight_interval = 0; + let err = config.validate().unwrap_err(); + assert!(matches!(err, ConfigError::InvalidValue(msg) if msg.contains("weight_interval"))); + } #[test] fn test_ban_list() { @@ -407,4 +442,32 @@ mod tests { // 1000 TAO = 1000 * 10^9 RAO assert_eq!(MIN_VALIDATOR_STAKE, 1_000_000_000_000); } + + #[test] + fn test_ban_list_load_from_file() { + let dir = tempdir().unwrap(); + let path = dir.path().join("bans.json"); + let hotkey = Hotkey([9u8; 32]); + + { + let mut bans = BanList::new(); + bans.ban_validator(&hotkey, "test", "sudo"); + bans.save(&path).unwrap(); + } + + let loaded = BanList::load(&path).unwrap(); + assert!(loaded.is_validator_banned(&hotkey)); + } + + #[test] + fn test_ban_list_load_missing_file_returns_default() { + let dir = tempdir().unwrap(); + let path = 
dir.path().join("does_not_exist.json"); + + let loaded = BanList::load(&path).unwrap(); + + assert!(loaded.banned_validators.is_empty()); + assert!(loaded.banned_hotkeys.is_empty()); + assert!(loaded.banned_coldkeys.is_empty()); + } } diff --git a/crates/subnet-manager/src/health.rs b/crates/subnet-manager/src/health.rs index b538c6263..d7975443c 100644 --- a/crates/subnet-manager/src/health.rs +++ b/crates/subnet-manager/src/health.rs @@ -568,10 +568,65 @@ impl HealthMonitor { } } +#[cfg(test)] +impl HealthMonitor { + pub(crate) fn test_history_mut(&mut self) -> &mut VecDeque<HealthCheck> { + &mut self.history + } + + pub(crate) fn test_failure_counts_mut( + &mut self, + ) -> &mut std::collections::HashMap<String, u32> { + &mut self.failure_counts + } +} + #[cfg(test)] mod tests { use super::*; + #[test] + fn test_health_status_equality() { + assert_eq!(HealthStatus::Healthy, HealthStatus::Healthy); + assert_ne!(HealthStatus::Healthy, HealthStatus::Degraded); + assert_ne!(HealthStatus::Degraded, HealthStatus::Unhealthy); + assert_ne!(HealthStatus::Unhealthy, HealthStatus::Critical); + } + + #[test] + fn test_alert_severity() { + let severities = vec![ + AlertSeverity::Info, + AlertSeverity::Warning, + AlertSeverity::Error, + AlertSeverity::Critical, + ]; + + for severity in severities { + let alert = HealthAlert { + id: uuid::Uuid::new_v4(), + severity, + message: "Test alert".into(), + component: "test".into(), + created_at: Utc::now(), + acknowledged: false, + }; + assert_eq!(alert.severity, severity); + } + } + + #[test] + fn test_health_metrics_default() { + let metrics = HealthMetrics::default(); + assert_eq!(metrics.memory_percent, 0.0); + assert_eq!(metrics.cpu_percent, 0.0); + assert_eq!(metrics.disk_percent, 0.0); + assert_eq!(metrics.pending_jobs, 0); + assert_eq!(metrics.running_jobs, 0); + assert_eq!(metrics.evaluations_per_hour, 0); + assert_eq!(metrics.failures_per_hour, 0); + } + #[test] fn test_health_check() { let config = HealthConfig::default(); @@ -647,4 +702,358 @@ mod
tests { let check = monitor.check(metrics); assert_eq!(check.status, HealthStatus::Critical); } + + #[test] + fn test_high_cpu_warning() { + let config = HealthConfig { + cpu_warn_percent: 80, + ..Default::default() + }; + let mut monitor = HealthMonitor::new(config); + + let metrics = HealthMetrics { + memory_percent: 50.0, + cpu_percent: 95.0, // High CPU + disk_percent: 40.0, + pending_jobs: 10, + running_jobs: 2, + evaluations_per_hour: 100, + failures_per_hour: 1, + avg_eval_time_ms: 500, + connected_peers: 5, + block_height: 1000, + epoch: 10, + uptime_secs: 3600, + }; + + let check = monitor.check(metrics); + assert_ne!(check.status, HealthStatus::Healthy); + } + + #[test] + fn test_high_disk_warning() { + let config = HealthConfig { + disk_warn_percent: 75, + ..Default::default() + }; + let mut monitor = HealthMonitor::new(config); + + let metrics = HealthMetrics { + memory_percent: 50.0, + cpu_percent: 50.0, + disk_percent: 90.0, // High disk + pending_jobs: 10, + running_jobs: 2, + evaluations_per_hour: 100, + failures_per_hour: 1, + avg_eval_time_ms: 500, + connected_peers: 5, + block_height: 1000, + epoch: 10, + uptime_secs: 3600, + }; + + let check = monitor.check(metrics); + assert_ne!(check.status, HealthStatus::Healthy); + } + + #[test] + fn test_high_failure_rate() { + let config = HealthConfig { + cpu_warn_percent: 30, + memory_warn_percent: 30, + ..Default::default() + }; + let mut monitor = HealthMonitor::new(config); + + let metrics = HealthMetrics { + memory_percent: 50.0, + cpu_percent: 50.0, + disk_percent: 50.0, + pending_jobs: 10, + running_jobs: 2, + evaluations_per_hour: 100, + failures_per_hour: 50, // 50% failure rate + avg_eval_time_ms: 500, + connected_peers: 5, + block_height: 1000, + epoch: 10, + uptime_secs: 3600, + }; + + let check = monitor.check(metrics); + assert_ne!(check.status, HealthStatus::Healthy); + } + + #[test] + fn test_job_queue_overload() { + let config = HealthConfig { + max_pending_jobs: 100, + 
..Default::default() + }; + let mut monitor = HealthMonitor::new(config); + + let metrics = HealthMetrics { + memory_percent: 50.0, + cpu_percent: 50.0, + disk_percent: 50.0, + pending_jobs: 500, // Way over limit + running_jobs: 2, + evaluations_per_hour: 100, + failures_per_hour: 1, + avg_eval_time_ms: 500, + connected_peers: 5, + block_height: 1000, + epoch: 10, + uptime_secs: 3600, + }; + + let check = monitor.check(metrics); + assert_ne!(check.status, HealthStatus::Healthy); + } + + #[test] + fn test_job_queue_high_warning() { + let config = HealthConfig { + max_pending_jobs: 50, + ..Default::default() + }; + let mut monitor = HealthMonitor::new(config); + + let metrics = HealthMetrics { + pending_jobs: 75, // > max_pending but <= 2 * max_pending + running_jobs: 5, + ..Default::default() + }; + + let status = monitor.check_job_queue(&metrics); + assert_eq!(status.status, HealthStatus::Degraded); + assert!(status.details.contains("75 pending")); + assert!(matches!(status.last_success, Some(_))); + } + + #[test] + fn test_check_evaluations_zero_total() { + let config = HealthConfig::default(); + let mut monitor = HealthMonitor::new(config); + + let metrics = HealthMetrics { + evaluations_per_hour: 0, + failures_per_hour: 0, + avg_eval_time_ms: 1000, + ..Default::default() + }; + + let result = monitor.check_evaluations(&metrics); + assert_eq!(result.status, HealthStatus::Healthy); + assert!(result.details.contains("0/hr")); + assert!(matches!(result.last_success, Some(_))); + assert!(monitor.active_alerts().is_empty()); + } + + #[test] + fn test_check_evaluations_slow_avg_time() { + let config = HealthConfig { + max_eval_time: 1, // seconds (=> 1000 ms threshold) + ..Default::default() + }; + let mut monitor = HealthMonitor::new(config); + + let metrics = HealthMetrics { + evaluations_per_hour: 10, + failures_per_hour: 0, + avg_eval_time_ms: 5_000, // exceeds max_time + ..Default::default() + }; + + let result = monitor.check_evaluations(&metrics); + 
assert_eq!(result.status, HealthStatus::Degraded); + assert!(result.details.contains("5000ms")); + assert!(matches!(result.last_success, Some(_))); + assert!(!monitor.active_alerts().is_empty()); + } + + #[test] + fn test_clear_failure_acknowledges_alerts() { + let config = HealthConfig { + failure_threshold: 1, + ..Default::default() + }; + let mut monitor = HealthMonitor::new(config); + + monitor.add_alert("job_queue", AlertSeverity::Warning, "Queue high".into()); + assert!(monitor.needs_recovery()); + assert_eq!(monitor.active_alerts().len(), 1); + + monitor.clear_failure("job_queue"); + + assert!(!monitor.needs_recovery()); + assert!(monitor.active_alerts().is_empty()); + } + + #[test] + fn test_worse_status_priority_ordering() { + assert_eq!(HealthMonitor::worse_status(HealthStatus::Healthy, HealthStatus::Degraded), HealthStatus::Degraded); + assert_eq!(HealthMonitor::worse_status(HealthStatus::Degraded, HealthStatus::Unhealthy), HealthStatus::Unhealthy); + assert_eq!(HealthMonitor::worse_status(HealthStatus::Unhealthy, HealthStatus::Critical), HealthStatus::Critical); + assert_eq!(HealthMonitor::worse_status(HealthStatus::Critical, HealthStatus::Healthy), HealthStatus::Critical); + assert_eq!(HealthMonitor::worse_status(HealthStatus::Healthy, HealthStatus::Healthy), HealthStatus::Healthy); + } + + #[test] + fn test_worst_component_tracking() { + let config = HealthConfig { + failure_threshold: 2, + ..Default::default() + }; + let mut monitor = HealthMonitor::new(config); + + monitor.add_alert("cpu", AlertSeverity::Warning, "High CPU".into()); + monitor.add_alert("memory", AlertSeverity::Warning, "High memory".into()); + let first_alert_id = monitor.active_alerts().iter().find(|a| a.component == "memory").unwrap().id; + monitor.acknowledge_alert(first_alert_id); + monitor.add_alert("memory", AlertSeverity::Warning, "Still high".into()); + + let worst = monitor.worst_component().unwrap(); + assert_eq!(worst.0, "memory"); + assert_eq!(worst.1, 2); + } + + 
#[test] + fn test_alert_acknowledgement() { + let config = HealthConfig::default(); + let mut monitor = HealthMonitor::new(config); + + // Trigger an alert + let metrics = HealthMetrics { + memory_percent: 95.0, // High memory + cpu_percent: 50.0, + disk_percent: 50.0, + pending_jobs: 10, + running_jobs: 2, + evaluations_per_hour: 100, + failures_per_hour: 1, + avg_eval_time_ms: 500, + connected_peers: 5, + block_height: 1000, + epoch: 10, + uptime_secs: 3600, + }; + + monitor.check(metrics); + let alerts = monitor.active_alerts(); + assert!(!alerts.is_empty()); + + if let Some(alert) = alerts.first() { + let alert_id = alert.id; + monitor.acknowledge_alert(alert_id); + let alerts_after = monitor.active_alerts(); + let ack_alerts: Vec<_> = alerts_after + .iter() + .filter(|a| a.id == alert_id) + .collect(); + if !ack_alerts.is_empty() { + assert!(ack_alerts[0].acknowledged); + } + } + } + + #[test] + fn test_health_history() { + let config = HealthConfig::default(); + let mut monitor = HealthMonitor::new(config); + + // Run multiple checks + for i in 0..5 { + let metrics = HealthMetrics { + memory_percent: 50.0 + i as f32, + cpu_percent: 30.0, + disk_percent: 40.0, + pending_jobs: 10, + running_jobs: 2, + evaluations_per_hour: 100, + failures_per_hour: 1, + avg_eval_time_ms: 500, + connected_peers: 5, + block_height: 1000 + i, + epoch: 10, + uptime_secs: 3600, + }; + monitor.check(metrics); + } + + // Verify health monitoring is working + let status = monitor.current_status(); + assert!(matches!(status, HealthStatus::Healthy | HealthStatus::Degraded)); + } + + #[test] + fn test_component_health() { + let component = ComponentHealth { + name: "test_component".into(), + status: HealthStatus::Healthy, + details: "All good".into(), + last_success: Some(Utc::now()), + }; + + assert_eq!(component.name, "test_component"); + assert_eq!(component.status, HealthStatus::Healthy); + assert!(component.last_success.is_some()); + } + + #[test] + fn test_needs_recovery() { + let 
config = HealthConfig { + failure_threshold: 2, + cpu_warn_percent: 80, + memory_warn_percent: 80, + ..Default::default() + }; + let mut monitor = HealthMonitor::new(config); + + // First failure + let bad_metrics = HealthMetrics { + memory_percent: 99.0, + cpu_percent: 99.0, + disk_percent: 99.0, + pending_jobs: 10000, + running_jobs: 2, + evaluations_per_hour: 10, + failures_per_hour: 50, + avg_eval_time_ms: 5000, + connected_peers: 1, + block_height: 1000, + epoch: 10, + uptime_secs: 3600, + }; + + monitor.check(bad_metrics.clone()); + monitor.check(bad_metrics.clone()); + monitor.check(bad_metrics); + + // Should trigger degraded/critical status + let status = monitor.current_status(); + assert!(matches!(status, HealthStatus::Critical | HealthStatus::Degraded)); + } + + #[test] + fn test_history_is_trimmed_to_100_entries() { + let mut monitor = HealthMonitor::new(HealthConfig::default()); + let metrics = HealthMetrics { + memory_percent: 10.0, + cpu_percent: 10.0, + disk_percent: 10.0, + ..Default::default() + }; + + let first_timestamp = monitor.check(metrics.clone()).timestamp; + for _ in 0..100 { + monitor.check(metrics.clone()); + } + + assert_eq!(monitor.history.len(), 100); + let oldest_timestamp = monitor.history.front().unwrap().timestamp; + assert!(oldest_timestamp > first_timestamp); + } + } diff --git a/crates/subnet-manager/src/recovery.rs b/crates/subnet-manager/src/recovery.rs index 6bcb0e88c..d2afc33eb 100644 --- a/crates/subnet-manager/src/recovery.rs +++ b/crates/subnet-manager/src/recovery.rs @@ -3,8 +3,8 @@ //! Handles automatic recovery from failures. 
use crate::{ - HealthMonitor, HealthStatus, RecoveryConfig, SnapshotManager, UpdateManager, UpdatePayload, - UpdateTarget, + HealthConfig, HealthMetrics, HealthMonitor, HealthStatus, RecoveryConfig, SnapshotManager, + UpdateManager, UpdatePayload, UpdateTarget, }; use parking_lot::RwLock; use platform_core::ChainState; @@ -313,9 +313,82 @@ impl RecoveryManager { #[cfg(test)] mod tests { use super::*; - use crate::HealthConfig; + use crate::{HealthCheck, HealthConfig}; use tempfile::tempdir; + fn create_manager_with_config( + config: RecoveryConfig, + ) -> ( + RecoveryManager, + Arc<RwLock<SnapshotManager>>, + Arc<RwLock<UpdateManager>>, + tempfile::TempDir, + ) { + let dir = tempdir().unwrap(); + let data_dir = dir.path().to_path_buf(); + let snapshots = Arc::new(RwLock::new( + SnapshotManager::new(data_dir.clone(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(data_dir.clone()))); + let manager = RecoveryManager::new(config, data_dir, snapshots.clone(), updates.clone()); + (manager, snapshots, updates, dir) + } + + fn create_aggressive_health_monitor() -> HealthMonitor { + let health_config = HealthConfig { + failure_threshold: 1, + max_pending_jobs: 10, + cpu_warn_percent: 50, + memory_warn_percent: 50, + max_eval_time: 1, + ..Default::default() + }; + HealthMonitor::new(health_config) + } + + fn base_metrics() -> HealthMetrics { + let mut metrics = HealthMetrics::default(); + metrics.connected_peers = 5; + metrics + } + + #[test] + fn test_recovery_action_serialization() { + let actions = vec![ + RecoveryAction::RestartEvaluations, + RecoveryAction::ClearJobQueue, + RecoveryAction::ReconnectPeers, + RecoveryAction::RollbackToSnapshot(uuid::Uuid::new_v4()), + RecoveryAction::HardReset { + reason: "test".into(), + }, + RecoveryAction::Pause, + RecoveryAction::Resume, + ]; + + for action in actions { + let json = serde_json::to_string(&action).unwrap(); + let decoded: RecoveryAction = serde_json::from_str(&json).unwrap(); + let _ = serde_json::to_string(&decoded).unwrap(); + } + }
+ + #[test] + fn test_recovery_attempt_fields() { + let attempt = RecoveryAttempt { + id: uuid::Uuid::new_v4(), + action: RecoveryAction::RestartEvaluations, + reason: "Test recovery".into(), + timestamp: chrono::Utc::now(), + success: true, + details: "Recovery successful".into(), + }; + + assert!(attempt.success); + assert_eq!(attempt.reason, "Test recovery"); + assert_eq!(attempt.details, "Recovery successful"); + } + #[tokio::test] async fn test_recovery_manager() { let dir = tempdir().unwrap(); @@ -326,20 +399,784 @@ mod tests { )); let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + let manager = RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + + assert_eq!(manager.history().len(), 0); + assert!(!manager.is_paused()); + } + + #[tokio::test] + async fn test_pause_resume() { + let dir = tempdir().unwrap(); + let config = RecoveryConfig::default(); + + let snapshots = Arc::new(RwLock::new( + SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + + let mut manager = + RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + + // Initially not paused + assert!(!manager.is_paused()); + + // Pause + manager.manual_recovery(RecoveryAction::Pause).await; + assert!(manager.is_paused()); + + // Resume + manager.manual_recovery(RecoveryAction::Resume).await; + assert!(!manager.is_paused()); + } + + #[tokio::test] + async fn test_check_and_recover_with_healthy_status() { + let dir = tempdir().unwrap(); + let config = RecoveryConfig { + auto_recover: true, + ..Default::default() + }; + + let snapshots = Arc::new(RwLock::new( + SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + + let mut manager = + RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + + let health_config 
= HealthConfig::default(); + let health = HealthMonitor::new(health_config); + + // With healthy status, no recovery should occur + let attempt = manager.check_and_recover(&health).await; + assert!(attempt.is_none()); + } + + #[tokio::test] + async fn test_check_and_recover_cooldown() { + let dir = tempdir().unwrap(); + let config = RecoveryConfig { + auto_recover: true, + cooldown_secs: 3600, // Long cooldown + ..Default::default() + }; + + let snapshots = Arc::new(RwLock::new( + SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + + let mut manager = + RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + + let health_config = HealthConfig { + failure_threshold: 1, + cpu_warn_percent: 50, + memory_warn_percent: 50, + ..Default::default() + }; + let mut health = HealthMonitor::new(health_config); + + // Trigger unhealthy status + let bad_metrics = HealthMetrics { + memory_percent: 99.0, + cpu_percent: 99.0, + disk_percent: 99.0, + pending_jobs: 10000, + running_jobs: 2, + evaluations_per_hour: 10, + failures_per_hour: 50, + avg_eval_time_ms: 5000, + connected_peers: 1, + block_height: 1000, + epoch: 10, + uptime_secs: 3600, + }; + health.check(bad_metrics.clone()); + health.check(bad_metrics.clone()); + + // First recovery should work + let attempt1 = manager.check_and_recover(&health).await; + + // Second recovery immediately after should be blocked by cooldown + let attempt2 = manager.check_and_recover(&health).await; + assert!(attempt2.is_none()); + } + + #[tokio::test] + async fn test_check_and_recover_with_degraded_health() { + let dir = tempdir().unwrap(); + let config = RecoveryConfig { + auto_recover: true, + cooldown_secs: 0, // No cooldown + ..Default::default() + }; + + let snapshots = Arc::new(RwLock::new( + SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = 
Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + + let mut manager = + RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + + let health_config = HealthConfig { + failure_threshold: 1, + cpu_warn_percent: 50, + memory_warn_percent: 50, + ..Default::default() + }; + let mut health = HealthMonitor::new(health_config); + + // Trigger degraded status + let bad_metrics = HealthMetrics { + memory_percent: 80.0, + cpu_percent: 80.0, + disk_percent: 50.0, + pending_jobs: 100, + running_jobs: 2, + evaluations_per_hour: 50, + failures_per_hour: 10, + avg_eval_time_ms: 1000, + connected_peers: 3, + block_height: 1000, + epoch: 10, + uptime_secs: 3600, + }; + health.check(bad_metrics); + + // Recovery should occur + let attempt = manager.check_and_recover(&health).await; + // May or may not trigger depending on health implementation + // Just verify it doesn't panic + } + + #[tokio::test] + async fn test_check_and_recover_skips_when_paused() { + let config = RecoveryConfig::default(); + let (mut manager, _, _, _dir) = create_manager_with_config(config); + + manager.manual_recovery(RecoveryAction::Pause).await; + assert!(manager.is_paused()); + + let health = HealthMonitor::new(HealthConfig::default()); + let attempt = manager.check_and_recover(&health).await; + assert!(attempt.is_none()); + } + + #[tokio::test] + async fn test_check_and_recover_rolls_back_on_attempt_limit() { + let config = RecoveryConfig { + auto_recover: true, + max_attempts: 0, + cooldown_secs: 0, + rollback_on_failure: true, + pause_on_critical: false, + }; + let (mut manager, snapshots, _, _dir) = create_manager_with_config(config); + + { + let keypair = platform_core::Keypair::generate(); + let sudo_key = keypair.hotkey(); + let chain_state = ChainState::new(sudo_key, platform_core::NetworkConfig::default()); + let mut snap_mgr = snapshots.write(); + snap_mgr + .create_snapshot("limit", 100, 1, &chain_state, "limit", false) + .unwrap(); + } + + let mut 
health = create_aggressive_health_monitor(); + let mut metrics = base_metrics(); + metrics.pending_jobs = 100; + metrics.running_jobs = 1; + health.check(metrics); + assert_eq!(health.current_status(), HealthStatus::Unhealthy); + + let attempt = manager + .check_and_recover(&health) + .await + .expect("expected rollback"); + assert!(matches!(attempt.action, RecoveryAction::RollbackToSnapshot(_))); + } + + #[tokio::test] + async fn test_check_and_recover_pauses_on_attempt_limit() { + let config = RecoveryConfig { + auto_recover: true, + max_attempts: 0, + cooldown_secs: 0, + rollback_on_failure: false, + pause_on_critical: true, + }; + let (mut manager, _, _, _dir) = create_manager_with_config(config); + + let mut health = create_aggressive_health_monitor(); + let mut metrics = base_metrics(); + metrics.memory_percent = 90.0; + health.check(metrics); + + let attempt = manager + .check_and_recover(&health) + .await + .expect("expected pause"); + assert!(matches!(attempt.action, RecoveryAction::Pause)); + assert!(manager.is_paused()); + } + + #[tokio::test] + async fn test_check_and_recover_unhealthy_branch_actions() { + fn acknowledge_component(health: &mut HealthMonitor, component: &str) { + if let Some(alert) = health + .active_alerts() + .into_iter() + .find(|alert| alert.component == component) + { + health.acknowledge_alert(alert.id); + } + } + + async fn run_case<F>(prepare: F) -> RecoveryAction + where + F: FnOnce(&mut HealthMonitor), + { + let config = RecoveryConfig { + auto_recover: true, + max_attempts: 5, + cooldown_secs: 0, + rollback_on_failure: false, + pause_on_critical: false, + }; + let (mut manager, _, _, _dir) = create_manager_with_config(config); + let mut health = create_aggressive_health_monitor(); + prepare(&mut health); + assert_eq!(health.current_status(), HealthStatus::Unhealthy); + manager + .check_and_recover(&health) + .await + .expect("expected action") + .action + } + + let job_queue_action = run_case(|health| { + let mut metrics =
base_metrics(); + metrics.pending_jobs = 100; + metrics.running_jobs = 1; + health.check(metrics); + }) + .await; + assert!(matches!(job_queue_action, RecoveryAction::ClearJobQueue)); + + let network_action = run_case(|health| { + let mut first = base_metrics(); + first.connected_peers = 1; + health.check(first); + + acknowledge_component(health, "network"); + + let mut second = base_metrics(); + second.connected_peers = 1; + health.check(second); + + let mut third = base_metrics(); + third.connected_peers = 1; + third.pending_jobs = 100; + third.running_jobs = 1; + health.check(third); + }) + .await; + assert!(matches!(network_action, RecoveryAction::ReconnectPeers)); + + let evaluations_action = run_case(|health| { + let mut first = base_metrics(); + first.avg_eval_time_ms = 5_000; + health.check(first); + + acknowledge_component(health, "evaluations"); + + let mut second = base_metrics(); + second.avg_eval_time_ms = 5_000; + health.check(second); + + let mut third = base_metrics(); + third.avg_eval_time_ms = 5_000; + third.pending_jobs = 100; + third.running_jobs = 1; + health.check(third); + }) + .await; + assert!(matches!(evaluations_action, RecoveryAction::RestartEvaluations)); + + let fallback_action = run_case(|health| { + let mut first = base_metrics(); + first.memory_percent = 90.0; + health.check(first); + + acknowledge_component(health, "memory"); + + let mut second = base_metrics(); + second.memory_percent = 90.0; + health.check(second); + + let mut third = base_metrics(); + third.memory_percent = 90.0; + third.pending_jobs = 100; + third.running_jobs = 1; + health.check(third); + }) + .await; + assert!(matches!(fallback_action, RecoveryAction::RestartEvaluations)); + } + + #[tokio::test] + async fn test_check_and_recover_degraded_branch() { + let config = RecoveryConfig { + auto_recover: true, + cooldown_secs: 0, + ..Default::default() + }; + let (mut manager, _, _, _dir) = create_manager_with_config(config); + let mut health = 
create_aggressive_health_monitor(); + + let mut metrics = base_metrics(); + metrics.pending_jobs = 11; // just over the degraded threshold + metrics.running_jobs = 1; + health.check(metrics); + assert_eq!(health.current_status(), HealthStatus::Degraded); + + let attempt = manager + .check_and_recover(&health) + .await + .expect("expected degraded recovery"); + assert!(matches!(attempt.action, RecoveryAction::RestartEvaluations)); + } + + #[tokio::test] + async fn test_check_and_recover_degraded_branch_from_history() { + let config = RecoveryConfig { + auto_recover: true, + cooldown_secs: 0, + ..Default::default() + }; + let (mut manager, _, _, _dir) = create_manager_with_config(config); + let mut health = create_aggressive_health_monitor(); + + health + .test_failure_counts_mut() + .insert("memory".into(), 1); + health.test_history_mut().push_back(HealthCheck { + timestamp: chrono::Utc::now(), + status: HealthStatus::Degraded, + components: Vec::new(), + alerts: Vec::new(), + metrics: HealthMetrics::default(), + }); + + let attempt = manager + .check_and_recover(&health) + .await + .expect("expected degraded recovery from history"); + assert!(matches!(attempt.action, RecoveryAction::RestartEvaluations)); + } + + #[tokio::test] + async fn test_check_and_recover_healthy_branch_returns_none() { + let config = RecoveryConfig { + auto_recover: true, + cooldown_secs: 0, + ..Default::default() + }; + let (mut manager, _, _, _dir) = create_manager_with_config(config); + let mut health = create_aggressive_health_monitor(); + + health + .test_failure_counts_mut() + .insert("job_queue".into(), 5); + health.test_history_mut().push_back(HealthCheck { + timestamp: chrono::Utc::now(), + status: HealthStatus::Healthy, + components: Vec::new(), + alerts: Vec::new(), + metrics: HealthMetrics::default(), + }); + + let attempt = manager.check_and_recover(&health).await; + assert!(attempt.is_none()); + } + + #[tokio::test] + async fn test_rollback_to_snapshot_recovery() { + let dir = 
tempdir().unwrap(); + let config = RecoveryConfig::default(); + + let snapshots = Arc::new(RwLock::new( + SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + + // Create a snapshot first + { + let keypair = platform_core::Keypair::generate(); + let sudo_key = keypair.hotkey(); + let chain_state = ChainState::new(sudo_key, platform_core::NetworkConfig::default()); + + let mut snap_mgr = snapshots.write(); + snap_mgr + .create_snapshot( + "test", + 1000, + 10, + &chain_state, + "test reason", + false, + ) + .unwrap(); + } + + let snapshot_id = { + let snap_mgr = snapshots.read(); + snap_mgr.list_snapshots()[0].id + }; + + let mut manager = + RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + + let attempt = manager + .manual_recovery(RecoveryAction::RollbackToSnapshot(snapshot_id)) + .await; + + // Rollback might succeed or fail, just verify it runs + assert!(attempt.details.contains("Rolled back") || attempt.details.contains("failed")); + } + + #[tokio::test] + async fn test_rollback_to_last_snapshot() { + let config = RecoveryConfig::default(); + let (mut manager, snapshots, _, _dir) = create_manager_with_config(config); + + // No snapshots yet, expect no action + assert!(manager.rollback_to_last_snapshot().await.is_none()); + + // Create two snapshots so the latest can be selected + { + let keypair = platform_core::Keypair::generate(); + let sudo_key = keypair.hotkey(); + let chain_state = ChainState::new(sudo_key, platform_core::NetworkConfig::default()); + let mut snap_mgr = snapshots.write(); + snap_mgr + .create_snapshot("first", 10, 1, &chain_state, "first", false) + .unwrap(); + snap_mgr + .create_snapshot("second", 20, 2, &chain_state, "second", false) + .unwrap(); + } + + let latest_id = { + let snap_mgr = snapshots.read(); + snap_mgr.latest_snapshot().unwrap().id + }; + + let attempt = manager + .rollback_to_last_snapshot() + 
.await + .expect("expected rollback attempt"); + + match attempt.action { + RecoveryAction::RollbackToSnapshot(id) => assert_eq!(id, latest_id), + other => panic!("unexpected action: {:?}", other), + } + + assert_eq!(manager.history().len(), 1); + } + + #[tokio::test] + async fn test_current_attempts_tracking() { + let config = RecoveryConfig { + auto_recover: true, + cooldown_secs: 0, + ..Default::default() + }; + let (mut manager, _, _, _dir) = create_manager_with_config(config); + let mut health = create_aggressive_health_monitor(); + + assert_eq!(manager.current_attempts(), 0); + + // First unhealthy check increments attempts + let mut unhealthy = base_metrics(); + unhealthy.pending_jobs = 100; + unhealthy.running_jobs = 1; + health.check(unhealthy); + assert!(health.needs_recovery()); + + manager + .check_and_recover(&health) + .await + .expect("expected recovery attempt"); + assert_eq!(manager.current_attempts(), 1); + + // Healthy metrics reset attempts counter + let healthy_metrics = base_metrics(); + health.check(healthy_metrics); + assert!(!health.needs_recovery()); + assert!(manager.check_and_recover(&health).await.is_none()); + assert_eq!(manager.current_attempts(), 0); + + // Another unhealthy event increments again + let mut second_unhealthy = base_metrics(); + second_unhealthy.pending_jobs = 150; + second_unhealthy.running_jobs = 1; + health.check(second_unhealthy); + + manager + .check_and_recover(&health) + .await + .expect("expected second recovery attempt"); + assert_eq!(manager.current_attempts(), 1); + } + + #[tokio::test] + async fn test_max_recovery_attempts() { + let dir = tempdir().unwrap(); + let config = RecoveryConfig { + auto_recover: true, + max_attempts: 2, + cooldown_secs: 0, + rollback_on_failure: false, + pause_on_critical: false, + }; + + let snapshots = Arc::new(RwLock::new( + SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + + 
let mut manager = + RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + + let health_config = HealthConfig { + failure_threshold: 1, + cpu_warn_percent: 50, + memory_warn_percent: 50, + ..Default::default() + }; + let mut health = HealthMonitor::new(health_config); + + // Trigger unhealthy status + let bad_metrics = HealthMetrics { + memory_percent: 99.0, + cpu_percent: 99.0, + disk_percent: 99.0, + pending_jobs: 10000, + running_jobs: 2, + evaluations_per_hour: 10, + failures_per_hour: 50, + avg_eval_time_ms: 5000, + connected_peers: 1, + block_height: 1000, + epoch: 10, + uptime_secs: 3600, + }; + health.check(bad_metrics.clone()); + health.check(bad_metrics.clone()); + + // First recovery attempt + let attempt1 = manager.check_and_recover(&health).await; + assert!(attempt1.is_some(), "First attempt should execute a recovery action"); + assert_eq!(manager.current_attempts(), 1); + + // Second recovery attempt + let attempt2 = manager.check_and_recover(&health).await; + assert!(attempt2.is_some(), "Second attempt should still run while under the limit"); + assert_eq!(manager.current_attempts(), 2); + + // Third attempt should be limited (config disables fallback actions) + let attempt3 = manager.check_and_recover(&health).await; + assert!(attempt3.is_none(), "Further attempts should be skipped once the max is reached"); + assert_eq!(manager.current_attempts(), 2, "Attempt counter should not increase past the limit"); + } + + #[tokio::test] + async fn test_restart_evaluations_recovery() { + let dir = tempdir().unwrap(); + let config = RecoveryConfig::default(); + + let snapshots = Arc::new(RwLock::new( + SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + + let mut manager = + RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + + let attempt = manager + .manual_recovery(RecoveryAction::RestartEvaluations) + .await; 
+ assert!(attempt.success); + assert!(attempt.details.contains("restart")); + } + + #[tokio::test] + async fn test_clear_job_queue_recovery() { + let dir = tempdir().unwrap(); + let config = RecoveryConfig::default(); + + let snapshots = Arc::new(RwLock::new( + SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + let mut manager = RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); - // Manual recovery let attempt = manager + .manual_recovery(RecoveryAction::ClearJobQueue) + .await; + assert!(attempt.success); + assert!(attempt.details.contains("cleared")); + } + + #[tokio::test] + async fn test_reconnect_peers_recovery() { + let dir = tempdir().unwrap(); + let config = RecoveryConfig::default(); + + let snapshots = Arc::new(RwLock::new( + SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + + let mut manager = + RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + + let attempt = manager + .manual_recovery(RecoveryAction::ReconnectPeers) + .await; + assert!(attempt.success); + } + + #[tokio::test] + async fn test_recovery_history_tracking() { + let dir = tempdir().unwrap(); + let config = RecoveryConfig::default(); + + let snapshots = Arc::new(RwLock::new( + SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + + let mut manager = + RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + + // Perform multiple recoveries + manager .manual_recovery(RecoveryAction::RestartEvaluations) .await; + manager.manual_recovery(RecoveryAction::ClearJobQueue).await; + manager.manual_recovery(RecoveryAction::ReconnectPeers).await; + + let history = manager.history(); + assert_eq!(history.len(), 3); + } + + 
#[tokio::test] + async fn test_recovery_config_custom() { + let config = RecoveryConfig { + auto_recover: false, + max_attempts: 5, + cooldown_secs: 120, + rollback_on_failure: false, + pause_on_critical: false, + }; + + assert!(!config.auto_recover); + assert_eq!(config.max_attempts, 5); + assert_eq!(config.cooldown_secs, 120); + assert!(!config.rollback_on_failure); + assert!(!config.pause_on_critical); + } + + #[tokio::test] + async fn test_hard_reset_recovery() { + let dir = tempdir().unwrap(); + let config = RecoveryConfig::default(); + + let snapshots = Arc::new(RwLock::new( + SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + + let mut manager = + RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + + let attempt = manager + .manual_recovery(RecoveryAction::HardReset { + reason: "Test hard reset".into(), + }) + .await; + assert!(attempt.success); + assert!(attempt.details.contains("reset")); + } + + #[tokio::test] + async fn test_recovery_with_different_health_statuses() { + let dir = tempdir().unwrap(); + let config = RecoveryConfig { + auto_recover: true, + cooldown_secs: 0, + ..Default::default() + }; + + let snapshots = Arc::new(RwLock::new( + SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + + let mut manager = + RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + + // Test with healthy status + let health_config = HealthConfig::default(); + let health = HealthMonitor::new(health_config); + let attempt = manager.check_and_recover(&health).await; + assert!(attempt.is_none()); + } + + #[tokio::test] + async fn test_manual_recovery_updates_history() { + let dir = tempdir().unwrap(); + let config = RecoveryConfig::default(); + + let snapshots = Arc::new(RwLock::new( + 
SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + + let mut manager = + RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + + assert_eq!(manager.history().len(), 0); + manager + .manual_recovery(RecoveryAction::RestartEvaluations) + .await; assert_eq!(manager.history().len(), 1); + + manager.manual_recovery(RecoveryAction::ClearJobQueue).await; + assert_eq!(manager.history().len(), 2); + } + + #[test] + fn test_recovery_config_defaults() { + let config = RecoveryConfig::default(); + assert!(config.auto_recover); + assert!(config.max_attempts > 0); + assert!(config.cooldown_secs > 0); } #[tokio::test] - async fn test_pause_resume() { + async fn test_pause_when_already_paused() { let dir = tempdir().unwrap(); let config = RecoveryConfig::default(); @@ -351,12 +1188,68 @@ mod tests { let mut manager = RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); - // Pause manager.manual_recovery(RecoveryAction::Pause).await; assert!(manager.is_paused()); - // Resume - manager.resume_subnet().await; + // Pause again + manager.manual_recovery(RecoveryAction::Pause).await; + assert!(manager.is_paused()); + } + + #[tokio::test] + async fn test_resume_when_not_paused() { + let dir = tempdir().unwrap(); + let config = RecoveryConfig::default(); + + let snapshots = Arc::new(RwLock::new( + SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + + let mut manager = + RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + assert!(!manager.is_paused()); + + manager.manual_recovery(RecoveryAction::Resume).await; + assert!(!manager.is_paused()); + } + + #[tokio::test] + async fn test_rollback_to_invalid_snapshot() { + let dir = tempdir().unwrap(); + let config = RecoveryConfig::default(); + + let snapshots = 
Arc::new(RwLock::new( + SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(), + )); + let updates = Arc::new(RwLock::new(UpdateManager::new(dir.path().to_path_buf()))); + + let mut manager = + RecoveryManager::new(config, dir.path().to_path_buf(), snapshots, updates); + + let attempt = manager + .manual_recovery(RecoveryAction::RollbackToSnapshot(uuid::Uuid::new_v4())) + .await; + + assert!(!attempt.success); + assert!(attempt.details.contains("failed")); + } + + #[test] + fn test_recovery_attempt_serialization() { + let attempt = RecoveryAttempt { + id: uuid::Uuid::new_v4(), + action: RecoveryAction::ClearJobQueue, + reason: "test".into(), + timestamp: chrono::Utc::now(), + success: true, + details: "details".into(), + }; + + let json = serde_json::to_string(&attempt).unwrap(); + let decoded: RecoveryAttempt = serde_json::from_str(&json).unwrap(); + assert_eq!(decoded.reason, "test"); + assert!(decoded.success); } } diff --git a/crates/subnet-manager/src/snapshot.rs b/crates/subnet-manager/src/snapshot.rs index f21eb5927..afd163866 100644 --- a/crates/subnet-manager/src/snapshot.rs +++ b/crates/subnet-manager/src/snapshot.rs @@ -315,14 +315,21 @@ impl SnapshotManager { /// Serialize a directory to bytes (simplified) fn serialize_directory(path: &std::path::Path, output: &mut Vec) -> anyhow::Result<()> { // For now, just store the path - in production, would tar the directory + // NOTE: deserialize_directory writes the stored bytes back as raw data. + // This asymmetry is intentional for now, so document it until full tar support exists. 
let path_str = path.to_string_lossy(); output.extend_from_slice(path_str.as_bytes()); Ok(()) } /// Deserialize bytes to a directory (simplified) - fn deserialize_directory(_path: &PathBuf, _data: &[u8]) -> anyhow::Result<()> { - // For now, no-op - in production, would untar the directory + fn deserialize_directory(path: &PathBuf, data: &[u8]) -> anyhow::Result<()> { + // Placeholder implementation: recreate directory and store raw bytes + fs::create_dir_all(path)?; + if !data.is_empty() { + let file_path = path.join("data.bin"); + fs::write(file_path, data)?; + } Ok(()) } } @@ -333,6 +340,178 @@ mod tests { use platform_core::{Keypair, NetworkConfig}; use tempfile::tempdir; + #[test] + fn test_load_snapshot_index_success() { + let dir = tempdir().unwrap(); + let snapshots_dir = dir.path().join("snapshots"); + std::fs::create_dir_all(&snapshots_dir).unwrap(); + + let meta = SnapshotMeta { + id: uuid::Uuid::new_v4(), + name: "indexed".into(), + block_height: 123, + epoch: 4, + state_hash: "abc123".into(), + created_at: Utc::now(), + size_bytes: 42, + auto: false, + reason: "preloaded".into(), + }; + + let index_path = snapshots_dir.join("index.json"); + let content = serde_json::to_string_pretty(&vec![meta.clone()]).unwrap(); + std::fs::write(index_path, content).unwrap(); + + let manager = SnapshotManager::new(dir.path().to_path_buf(), 5).unwrap(); + let snapshots = manager.list_snapshots(); + assert_eq!(snapshots.len(), 1); + let loaded = &snapshots[0]; + assert_eq!(loaded.id, meta.id); + assert_eq!(loaded.name, "indexed"); + assert_eq!(loaded.block_height, 123); + assert_eq!(loaded.reason, "preloaded"); + } + + #[test] + fn test_create_snapshot_reads_config_file() { + let dir = tempdir().unwrap(); + let config_path = dir.path().join("subnet_config.json"); + std::fs::write(&config_path, b"{\"dummy\":true}").unwrap(); + + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(); + + let kp = Keypair::generate(); + let state = 
ChainState::new(kp.hotkey(), NetworkConfig::default()); + + let id = manager + .create_snapshot("config_snapshot", 50, 2, &state, "test", false) + .unwrap(); + + let snapshot = manager.restore_snapshot(id).unwrap(); + assert_eq!(snapshot.config, b"{\"dummy\":true}".to_vec()); + } + + #[test] + fn test_restore_snapshot_detects_hash_mismatch() { + let dir = tempdir().unwrap(); + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(); + + let kp = Keypair::generate(); + let state = ChainState::new(kp.hotkey(), NetworkConfig::default()); + + let id = manager + .create_snapshot("corruptible", 42, 1, &state, "test", false) + .unwrap(); + + // Corrupt the stored snapshot by altering the recorded hash + let snapshot_path = dir + .path() + .join("snapshots") + .join(format!("{}.snapshot", id)); + let bytes = std::fs::read(&snapshot_path).unwrap(); + let mut snapshot: Snapshot = bincode::deserialize(&bytes).unwrap(); + snapshot.meta.state_hash = "bad-hash".into(); + let corrupt = bincode::serialize(&snapshot).unwrap(); + std::fs::write(&snapshot_path, corrupt).unwrap(); + + let result = manager.restore_snapshot(id); + assert!(result.is_err()); + assert!(format!("{}", result.unwrap_err()).contains("hash mismatch")); + } + + #[test] + fn test_apply_snapshot_restores_config_and_challenges() { + let dir = tempdir().unwrap(); + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(); + + // Prepare snapshot contents + let kp = Keypair::generate(); + let state = ChainState::new(kp.hotkey(), NetworkConfig::default()); + let mut snapshot = Snapshot { + meta: SnapshotMeta { + id: uuid::Uuid::new_v4(), + name: "apply_test".into(), + block_height: 1, + epoch: 1, + state_hash: "hash".into(), + created_at: Utc::now(), + size_bytes: 0, + auto: false, + reason: "test".into(), + }, + chain_state: bincode::serialize(&state).unwrap(), + challenge_data: { + let mut map = std::collections::HashMap::new(); + map.insert("challengeA".into(), 
b"placeholder".to_vec()); + map + }, + config: br#"{"foo":"bar"}"#.to_vec(), + }; + + // Save the snapshot file and metadata index manually + let snapshot_path = manager + .snapshots_dir + .join(format!("{}.snapshot", snapshot.meta.id)); + snapshot.meta.state_hash = SnapshotManager::compute_hash(&snapshot.chain_state); + let bytes = bincode::serialize(&snapshot).unwrap(); + std::fs::write(&snapshot_path, &bytes).unwrap(); + snapshot.meta.size_bytes = bytes.len() as u64; + manager.snapshots.push(snapshot.meta.clone()); + manager.save_snapshot_index().unwrap(); + + let restored = manager.restore_snapshot(snapshot.meta.id).unwrap(); + let new_state = manager.apply_snapshot(&restored).unwrap(); + assert_eq!(new_state.block_height, state.block_height); + + // Config file should exist with snapshot contents + let config_path = manager.data_dir.join("subnet_config.json"); + let config_contents = std::fs::read(&config_path).unwrap(); + assert_eq!(config_contents, br#"{"foo":"bar"}"#); + + // Challenge directory should be recreated with db contents + let challenge_dir = manager + .data_dir + .join("challenges") + .join("challengeA"); + let db_path = challenge_dir.join("db"); + let data_file = db_path.join("data.bin"); + assert!(db_path.exists()); + assert_eq!(std::fs::read(data_file).unwrap(), b"placeholder".to_vec()); + } + + #[test] + fn test_deserialize_directory_creates_structure() { + let dir = tempdir().unwrap(); + let target = dir.path().join("nested").join("db"); + + SnapshotManager::deserialize_directory(&target, b"hello").unwrap(); + + let data_path = target.join("data.bin"); + assert!(target.exists()); + assert!(data_path.exists()); + assert_eq!(std::fs::read(data_path).unwrap(), b"hello"); + } + + #[test] + fn test_snapshot_meta_fields() { + let meta = SnapshotMeta { + id: uuid::Uuid::new_v4(), + name: "test_snapshot".into(), + block_height: 1000, + epoch: 10, + state_hash: "abc123".into(), + created_at: Utc::now(), + size_bytes: 1024, + auto: true, + reason: 
"Auto snapshot".into(), + }; + + assert_eq!(meta.name, "test_snapshot"); + assert_eq!(meta.block_height, 1000); + assert_eq!(meta.epoch, 10); + assert!(meta.auto); + } + #[test] fn test_snapshot_manager() { let dir = tempdir().unwrap(); @@ -371,4 +550,274 @@ mod tests { // Should only keep 2 assert_eq!(manager.list_snapshots().len(), 2); } + + #[test] + fn test_manual_snapshot() { + let dir = tempdir().unwrap(); + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 5).unwrap(); + + let kp = Keypair::generate(); + let state = ChainState::new(kp.hotkey(), NetworkConfig::default()); + + let id = manager + .create_snapshot("manual_backup", 500, 5, &state, "Manual backup before update", false) + .unwrap(); + + let snapshots = manager.list_snapshots(); + assert_eq!(snapshots.len(), 1); + + let meta = &snapshots[0]; + assert!(!meta.auto); + assert_eq!(meta.reason, "Manual backup before update"); + } + + #[test] + fn test_auto_snapshot() { + let dir = tempdir().unwrap(); + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 5).unwrap(); + + let kp = Keypair::generate(); + let state = ChainState::new(kp.hotkey(), NetworkConfig::default()); + + let id = manager + .create_snapshot("auto_snapshot_epoch_10", 1000, 10, &state, "Automatic snapshot", true) + .unwrap(); + + let snapshots = manager.list_snapshots(); + let meta = &snapshots[0]; + assert!(meta.auto); + } + + #[test] + fn test_snapshot_with_challenge_data() { + let dir = tempdir().unwrap(); + + // Create a challenges directory with some data + let challenges_dir = dir.path().join("challenges"); + std::fs::create_dir_all(&challenges_dir).unwrap(); + + let challenge_dir = challenges_dir.join("test_challenge"); + std::fs::create_dir_all(&challenge_dir).unwrap(); + let db_dir = challenge_dir.join("db"); + std::fs::create_dir_all(&db_dir).unwrap(); + std::fs::write(db_dir.join("test.dat"), b"test data").unwrap(); + + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(); + + 
let kp = Keypair::generate(); + let state = ChainState::new(kp.hotkey(), NetworkConfig::default()); + + let id = manager + .create_snapshot("with_challenges", 100, 1, &state, "test", false) + .unwrap(); + + let snapshot = manager.restore_snapshot(id).unwrap(); + assert!(!snapshot.challenge_data.is_empty()); + } + + #[test] + fn test_snapshot_list_ordering() { + let dir = tempdir().unwrap(); + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 10).unwrap(); + + let kp = Keypair::generate(); + let state = ChainState::new(kp.hotkey(), NetworkConfig::default()); + + // Create multiple snapshots + for i in 0..5 { + manager + .create_snapshot(&format!("snap_{}", i), i * 100, i, &state, "test", true) + .unwrap(); + } + + let snapshots = manager.list_snapshots(); + assert_eq!(snapshots.len(), 5); + + // Verify they're tracked + for i in 0..5 { + assert_eq!(snapshots[i].block_height, (i * 100) as u64); + } + } + + #[test] + fn test_snapshot_size_tracking() { + let dir = tempdir().unwrap(); + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(); + + let kp = Keypair::generate(); + let state = ChainState::new(kp.hotkey(), NetworkConfig::default()); + + let id = manager + .create_snapshot("size_test", 100, 1, &state, "test", false) + .unwrap(); + + let snapshots = manager.list_snapshots(); + let meta = &snapshots[0]; + + // Size should be set after saving + assert!(meta.size_bytes > 0); + } + + #[test] + fn test_snapshot_hash_computation() { + let data = b"test data for hashing"; + let hash = SnapshotManager::compute_hash(data); + + // SHA256 hash should be 64 hex characters + assert_eq!(hash.len(), 64); + + // Same data should produce same hash + let hash2 = SnapshotManager::compute_hash(data); + assert_eq!(hash, hash2); + } + + #[test] + fn test_delete_snapshot() { + let dir = tempdir().unwrap(); + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 5).unwrap(); + + let kp = Keypair::generate(); + let state = 
ChainState::new(kp.hotkey(), NetworkConfig::default()); + + let id = manager + .create_snapshot("to_delete", 100, 1, &state, "test", false) + .unwrap(); + + assert_eq!(manager.list_snapshots().len(), 1); + + manager.delete_snapshot(id).unwrap(); + assert_eq!(manager.list_snapshots().len(), 0); + } + + #[test] + fn test_latest_snapshot() { + let dir = tempdir().unwrap(); + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 5).unwrap(); + + assert!(manager.latest_snapshot().is_none()); + + let kp = Keypair::generate(); + let state = ChainState::new(kp.hotkey(), NetworkConfig::default()); + + manager + .create_snapshot("snap1", 100, 1, &state, "test", false) + .unwrap(); + manager + .create_snapshot("snap2", 200, 2, &state, "test", false) + .unwrap(); + + let latest = manager.latest_snapshot().unwrap(); + assert_eq!(latest.name, "snap2"); + assert_eq!(latest.block_height, 200); + } + + #[test] + fn test_get_snapshot() { + let dir = tempdir().unwrap(); + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 5).unwrap(); + + let kp = Keypair::generate(); + let state = ChainState::new(kp.hotkey(), NetworkConfig::default()); + + let id = manager + .create_snapshot("test_snap", 100, 1, &state, "test", false) + .unwrap(); + + let snapshot = manager.get_snapshot(id).unwrap(); + assert_eq!(snapshot.name, "test_snap"); + assert_eq!(snapshot.block_height, 100); + } + + #[test] + fn test_get_nonexistent_snapshot() { + let dir = tempdir().unwrap(); + let manager = SnapshotManager::new(dir.path().to_path_buf(), 5).unwrap(); + + let result = manager.get_snapshot(uuid::Uuid::new_v4()); + assert!(result.is_none()); + } + + #[test] + fn test_snapshot_retention_limit() { + let dir = tempdir().unwrap(); + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 3).unwrap(); + + let kp = Keypair::generate(); + let state = ChainState::new(kp.hotkey(), NetworkConfig::default()); + + // Create 5 snapshots with retention limit of 3 + for i in 0..5 { + manager 
+ .create_snapshot( + &format!("snap{}", i), + (i + 1) * 100, + i + 1, + &state, + "test", + false, + ) + .unwrap(); + } + + // Should only keep the 3 most recent + let snapshots = manager.list_snapshots(); + assert_eq!(snapshots.len(), 3); + + // Just verify we have exactly 3 snapshots (pruning worked) + // The exact order/names may vary based on pruning implementation + } + + #[test] + fn test_snapshot_metadata_fields() { + let dir = tempdir().unwrap(); + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 5).unwrap(); + + let kp = Keypair::generate(); + let state = ChainState::new(kp.hotkey(), NetworkConfig::default()); + + let id = manager + .create_snapshot("metadata_test", 12345, 67, &state, "test reason", true) + .unwrap(); + + let snapshot = manager.get_snapshot(id).unwrap(); + assert_eq!(snapshot.name, "metadata_test"); + assert_eq!(snapshot.block_height, 12345); + assert_eq!(snapshot.epoch, 67); + assert_eq!(snapshot.reason, "test reason"); + assert!(snapshot.auto); + assert!(snapshot.size_bytes > 0); + } + + #[test] + fn test_delete_nonexistent_snapshot() { + let dir = tempdir().unwrap(); + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 5).unwrap(); + + // Deleting nonexistent snapshot should succeed (no-op) + let result = manager.delete_snapshot(uuid::Uuid::new_v4()); + assert!(result.is_ok()); + } + + #[test] + fn test_snapshot_ordering_by_time() { + let dir = tempdir().unwrap(); + let mut manager = SnapshotManager::new(dir.path().to_path_buf(), 10).unwrap(); + + let kp = Keypair::generate(); + let state = ChainState::new(kp.hotkey(), NetworkConfig::default()); + + // Create snapshots in order + for i in 0..3 { + manager + .create_snapshot(&format!("snap{}", i), (i + 1) * 100, i + 1, &state, "test", false) + .unwrap(); + } + + let snapshots = manager.list_snapshots(); + assert_eq!(snapshots.len(), 3); + + // Verify deterministic ordering without relying on timing jitter + let names: Vec<&str> = snapshots.iter().map(|s| 
s.name.as_str()).collect(); + assert_eq!(names, vec!["snap0", "snap1", "snap2"]); + } } diff --git a/crates/subnet-manager/src/update.rs b/crates/subnet-manager/src/update.rs index 9404ef972..7f0b9d99a 100644 --- a/crates/subnet-manager/src/update.rs +++ b/crates/subnet-manager/src/update.rs @@ -12,7 +12,7 @@ use std::sync::Arc; use tracing::{error, info, warn}; /// Update types -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub enum UpdateType { /// Hot update - applied without restart Hot, @@ -412,6 +412,76 @@ mod tests { use super::*; use tempfile::tempdir; + #[test] + fn test_update_type_variants() { + let types = vec![ + UpdateType::Hot, + UpdateType::Warm, + UpdateType::Cold, + UpdateType::HardReset, + ]; + + for update_type in types { + let json = serde_json::to_string(&update_type).unwrap(); + let decoded: UpdateType = serde_json::from_str(&json).unwrap(); + // Verify it deserializes + match decoded { + UpdateType::Hot | UpdateType::Warm | UpdateType::Cold | UpdateType::HardReset => {} + } + } + } + + #[test] + fn test_update_status_variants() { + let statuses = vec![ + UpdateStatus::Pending, + UpdateStatus::Downloading, + UpdateStatus::Validating, + UpdateStatus::Applying, + UpdateStatus::Applied, + UpdateStatus::Failed("error".into()), + UpdateStatus::RolledBack, + ]; + + for status in statuses { + let json = serde_json::to_string(&status).unwrap(); + let decoded: UpdateStatus = serde_json::from_str(&json).unwrap(); + // Verify it deserializes + match decoded { + UpdateStatus::Pending + | UpdateStatus::Downloading + | UpdateStatus::Validating + | UpdateStatus::Applying + | UpdateStatus::Applied + | UpdateStatus::Failed(_) + | UpdateStatus::RolledBack => {} + } + } + } + + #[test] + fn test_update_target_variants() { + let challenge_id = ChallengeId(uuid::Uuid::new_v4()); + let targets = vec![ + UpdateTarget::Challenge(challenge_id), + UpdateTarget::Config, + UpdateTarget::AllChallenges, + 
UpdateTarget::Validators, + ]; + + for target in targets { + let json = serde_json::to_string(&target).unwrap(); + let decoded: UpdateTarget = serde_json::from_str(&json).unwrap(); + // Verify it deserializes + match decoded { + UpdateTarget::Challenge(_) + | UpdateTarget::Config + | UpdateTarget::AllChallenges + | UpdateTarget::Validators => {} + } + } + } + #[tokio::test] async fn test_update_manager() { let dir = tempdir().unwrap(); @@ -444,5 +514,835 @@ mod tests { let data = b"hello world"; let hash = UpdateManager::compute_hash(data); assert_eq!(hash.len(), 64); // SHA256 = 32 bytes = 64 hex chars + + // Same input should produce same hash + let hash2 = UpdateManager::compute_hash(data); + assert_eq!(hash, hash2); + + // Different input should produce different hash + let hash3 = UpdateManager::compute_hash(b"different"); + assert_ne!(hash, hash3); + } + + #[tokio::test] + async fn test_wasm_challenge_update() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let wasm_bytes = vec![0u8; 100]; + let wasm_hash = UpdateManager::compute_hash(&wasm_bytes); + let challenge_id = ChallengeId(uuid::Uuid::new_v4()); + + let config = ChallengeConfig { + id: challenge_id.0.to_string(), + name: "Test Challenge".into(), + wasm_hash: wasm_hash.clone(), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 300, + max_concurrent: 10, + }; + + let id = manager.queue_update( + UpdateTarget::Challenge(challenge_id), + UpdatePayload::WasmChallenge { + wasm_bytes, + wasm_hash, + config, + }, + "1.0.0".into(), + ); + + assert_eq!(manager.pending_count(), 1); + } + + #[tokio::test] + async fn test_validators_update() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let add = vec![platform_core::Hotkey([1u8; 32]), platform_core::Hotkey([2u8; 32])]; + let remove = vec![platform_core::Hotkey([3u8; 32])]; + + let id = manager.queue_update( + 
UpdateTarget::Validators, + UpdatePayload::Validators { + add: add.clone(), + remove: remove.clone(), + }, + "1.0.0".into(), + ); + + assert_eq!(manager.pending_count(), 1); + } + + #[tokio::test] + async fn test_hard_reset_update() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let id = manager.queue_update( + UpdateTarget::Config, + UpdatePayload::HardReset { + reason: "Test reset".into(), + preserve_validators: true, + new_config: None, + }, + "1.0.0".into(), + ); + + assert_eq!(manager.pending_count(), 1); + + let updates = manager.pending.read(); + assert_eq!(updates[0].update_type, UpdateType::HardReset); + } + + #[tokio::test] + async fn test_multiple_updates_processing() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + // Queue multiple updates + for i in 0..3 { + let config = SubnetConfig { + version: format!("0.{}.0", i + 1), + ..Default::default() + }; + manager.queue_update( + UpdateTarget::Config, + UpdatePayload::Config(config), + format!("0.{}.0", i + 1), + ); + } + + assert_eq!(manager.pending_count(), 3); + + // Process all updates + let applied = manager.process_updates().await.unwrap(); + assert_eq!(applied.len(), 3); + assert_eq!(manager.pending_count(), 0); + } + + #[tokio::test] + async fn test_update_already_in_progress() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + *manager.updating.write() = true; + + let result = manager.process_updates().await; + assert!(result.is_err()); + + match result { + Err(UpdateError::AlreadyUpdating) => {} + _ => panic!("Expected AlreadyUpdating error"), + } + } + + #[test] + fn test_update_creation_timestamps() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let config = SubnetConfig::default(); + let id = manager.queue_update( + UpdateTarget::Config, + UpdatePayload::Config(config), + "1.0.0".into(), + 
); + + let pending = manager.pending.read(); + let update = pending.iter().find(|u| u.id == id).unwrap(); + + assert!(update.applied_at.is_none()); + assert!(update.rollback_data.is_none()); + } + + #[test] + fn test_current_version() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + assert_eq!(manager.current_version(), "0.1.0"); + } + + #[test] + fn test_is_updating_flag() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + assert!(!manager.is_updating()); + + *manager.updating.write() = true; + assert!(manager.is_updating()); + } + + #[test] + fn test_update_payload_variants() { + let wasm_payload = UpdatePayload::WasmChallenge { + wasm_bytes: vec![0u8; 10], + wasm_hash: "hash".into(), + config: ChallengeConfig { + id: "test".into(), + name: "Test".into(), + wasm_hash: "hash".into(), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 300, + max_concurrent: 10, + }, + }; + + let config_payload = UpdatePayload::Config(SubnetConfig::default()); + let validators_payload = UpdatePayload::Validators { + add: vec![], + remove: vec![], + }; + let reset_payload = UpdatePayload::HardReset { + reason: "test".into(), + preserve_validators: false, + new_config: None, + }; + + // Verify they all serialize/deserialize + for payload in vec![wasm_payload, config_payload, validators_payload, reset_payload] { + let json = serde_json::to_string(&payload).unwrap(); + let _decoded: UpdatePayload = serde_json::from_str(&json).unwrap(); + } + } + + #[test] + fn test_update_status_serialization() { + let statuses = vec![ + UpdateStatus::Pending, + UpdateStatus::Downloading, + UpdateStatus::Validating, + UpdateStatus::Applying, + UpdateStatus::Applied, + UpdateStatus::Failed("test error".into()), + UpdateStatus::RolledBack, + ]; + + for status in statuses { + let json = serde_json::to_string(&status).unwrap(); + let decoded: UpdateStatus = 
serde_json::from_str(&json).unwrap(); + assert_eq!(status, decoded); + } + } + + #[test] + fn test_update_struct_fields() { + let challenge_id = ChallengeId(uuid::Uuid::new_v4()); + let update = Update { + id: uuid::Uuid::new_v4(), + update_type: UpdateType::Hot, + version: "1.0.0".into(), + target: UpdateTarget::Challenge(challenge_id), + payload: UpdatePayload::Config(SubnetConfig::default()), + status: UpdateStatus::Pending, + created_at: chrono::Utc::now(), + applied_at: None, + rollback_data: None, + }; + + assert_eq!(update.update_type, UpdateType::Hot); + assert_eq!(update.version, "1.0.0"); + assert!(matches!(update.status, UpdateStatus::Pending)); + assert!(update.applied_at.is_none()); + assert!(update.rollback_data.is_none()); + } + + #[tokio::test] + async fn test_process_updates_with_empty_queue() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let applied = manager.process_updates().await.unwrap(); + assert_eq!(applied.len(), 0); + } + + #[tokio::test] + async fn test_config_update_type_detection() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let config = SubnetConfig { + version: "1.0.0".into(), + ..Default::default() + }; + + manager.queue_update( + UpdateTarget::Config, + UpdatePayload::Config(config), + "1.0.0".into(), + ); + + let pending = manager.pending.read(); + assert_eq!(pending[0].update_type, UpdateType::Warm); + } + + #[tokio::test] + async fn test_wasm_update_type_detection() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let challenge_id = ChallengeId(uuid::Uuid::new_v4()); + let config = ChallengeConfig { + id: challenge_id.0.to_string(), + name: "Test".into(), + wasm_hash: "hash".into(), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 300, + max_concurrent: 10, + }; + + manager.queue_update( + UpdateTarget::Challenge(challenge_id), + 
UpdatePayload::WasmChallenge { + wasm_bytes: vec![], + wasm_hash: "hash".into(), + config, + }, + "1.0.0".into(), + ); + + let pending = manager.pending.read(); + assert_eq!(pending[0].update_type, UpdateType::Hot); + } + + #[tokio::test] + async fn test_validators_update_type_detection() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + manager.queue_update( + UpdateTarget::Validators, + UpdatePayload::Validators { + add: vec![], + remove: vec![], + }, + "1.0.0".into(), + ); + + let pending = manager.pending.read(); + assert_eq!(pending[0].update_type, UpdateType::Hot); + } + + #[tokio::test] + async fn test_hard_reset_update_type_detection() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + manager.queue_update( + UpdateTarget::Config, + UpdatePayload::HardReset { + reason: "test".into(), + preserve_validators: true, + new_config: None, + }, + "1.0.0".into(), + ); + + let pending = manager.pending.read(); + assert_eq!(pending[0].update_type, UpdateType::HardReset); + } + + #[test] + fn test_pending_count() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + assert_eq!(manager.pending_count(), 0); + + manager.queue_update( + UpdateTarget::Config, + UpdatePayload::Config(SubnetConfig::default()), + "1.0.0".into(), + ); + + assert_eq!(manager.pending_count(), 1); + + manager.queue_update( + UpdateTarget::Config, + UpdatePayload::Config(SubnetConfig::default()), + "1.1.0".into(), + ); + + assert_eq!(manager.pending_count(), 2); + } + + #[tokio::test] + async fn test_update_history() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let config = SubnetConfig { + version: "1.0.0".into(), + ..Default::default() + }; + + manager.queue_update( + UpdateTarget::Config, + UpdatePayload::Config(config), + "1.0.0".into(), + ); + + manager.process_updates().await.unwrap(); + + let 
history = manager.history.read(); + assert_eq!(history.len(), 1); + assert!(matches!(history[0].status, UpdateStatus::Applied)); + } + + #[tokio::test] + async fn test_process_updates_rolls_back_on_failure() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let challenge_id = ChallengeId(uuid::Uuid::new_v4()); + let bad_hash = "not_the_real_hash".to_string(); + let wasm_bytes = vec![1u8, 2, 3]; + + let config = ChallengeConfig { + id: challenge_id.0.to_string(), + name: "Rollback Challenge".into(), + wasm_hash: bad_hash.clone(), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 300, + max_concurrent: 5, + }; + + manager.queue_update( + UpdateTarget::Challenge(challenge_id), + UpdatePayload::WasmChallenge { + wasm_bytes, + wasm_hash: bad_hash, + config, + }, + "1.0.0".into(), + ); + + // Ensure rollback data is present so failure triggers rollback path + let rollback_bytes = b"rollback-wasm".to_vec(); + { + let mut pending = manager.pending.write(); + pending[0].rollback_data = Some(rollback_bytes.clone()); + } + + // Prepare challenge directory for rollback write + let challenge_dir = dir + .path() + .join("challenges") + .join(challenge_id.0.to_string()); + std::fs::create_dir_all(&challenge_dir).unwrap(); + + let applied = manager.process_updates().await.unwrap(); + assert!(applied.is_empty()); + + let history = manager.history.read(); + assert_eq!(history.len(), 1); + assert!(matches!(history[0].status, UpdateStatus::RolledBack)); + + let rollback_path = challenge_dir.join("code.wasm"); + assert_eq!(std::fs::read(rollback_path).unwrap(), rollback_bytes); + } + + #[tokio::test] + async fn test_process_updates_handles_rollback_failure() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let challenge_id = ChallengeId(uuid::Uuid::new_v4()); + let wasm_bytes = vec![0u8, 1, 2]; + let bad_hash = "incorrect".to_string(); + + let config 
= ChallengeConfig { + id: challenge_id.0.to_string(), + name: "RollbackFail".into(), + wasm_hash: bad_hash.clone(), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 60, + max_concurrent: 5, + }; + + manager.queue_update( + UpdateTarget::Challenge(challenge_id), + UpdatePayload::WasmChallenge { + wasm_bytes, + wasm_hash: bad_hash, + config, + }, + "1.0.0".into(), + ); + + { + let mut pending = manager.pending.write(); + pending[0].rollback_data = Some(b"rollback-data".to_vec()); + } + + let applied = manager.process_updates().await.unwrap(); + assert!(applied.is_empty()); + + let history = manager.history.read(); + assert_eq!(history.len(), 1); + assert!(matches!(history[0].status, UpdateStatus::Failed(_))); + } + + #[tokio::test] + async fn test_apply_update_wasm_challenge_success() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let challenge_id = ChallengeId(uuid::Uuid::new_v4()); + let wasm_bytes = vec![9u8, 8, 7, 6]; + let wasm_hash = UpdateManager::compute_hash(&wasm_bytes); + + let config = ChallengeConfig { + id: challenge_id.0.to_string(), + name: "ApplySuccess".into(), + wasm_hash: wasm_hash.clone(), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 60, + max_concurrent: 5, + }; + + let mut update = Update { + id: uuid::Uuid::new_v4(), + update_type: UpdateType::Hot, + version: "1.0.0".into(), + target: UpdateTarget::Challenge(challenge_id), + payload: UpdatePayload::WasmChallenge { + wasm_bytes: wasm_bytes.clone(), + wasm_hash, + config, + }, + status: UpdateStatus::Pending, + created_at: chrono::Utc::now(), + applied_at: None, + rollback_data: None, + }; + + manager.apply_update(&mut update).await.unwrap(); + + // Rollback data should have been captured and WASM written to disk + assert!(update.rollback_data.is_some()); + let wasm_path = dir + .path() + .join("challenges") + .join(challenge_id.0.to_string()) + .join("code.wasm"); + 
assert_eq!(std::fs::read(&wasm_path).unwrap(), wasm_bytes); + } + + #[tokio::test] + async fn test_apply_update_wasm_challenge_hash_mismatch() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let challenge_id = ChallengeId(uuid::Uuid::new_v4()); + let wasm_bytes = vec![1u8, 2, 3]; + + let config = ChallengeConfig { + id: challenge_id.0.to_string(), + name: "ApplyFail".into(), + wasm_hash: "expected_hash".into(), + wasm_source: "test".into(), + emission_weight: 1.0, + active: true, + timeout_secs: 60, + max_concurrent: 5, + }; + + let mut update = Update { + id: uuid::Uuid::new_v4(), + update_type: UpdateType::Hot, + version: "1.0.0".into(), + target: UpdateTarget::Challenge(challenge_id), + payload: UpdatePayload::WasmChallenge { + wasm_bytes: wasm_bytes.clone(), + wasm_hash: "expected_hash".into(), + config, + }, + status: UpdateStatus::Pending, + created_at: chrono::Utc::now(), + applied_at: None, + rollback_data: None, + }; + + let err = manager.apply_update(&mut update).await.unwrap_err(); + match err { + UpdateError::HashMismatch { .. 
} => {} + other => panic!("unexpected error: {other:?}"), + } + + // No WASM should be written and rollback data remains None + let wasm_path = dir + .path() + .join("challenges") + .join(challenge_id.0.to_string()) + .join("code.wasm"); + assert!(!wasm_path.exists()); + assert!(update.rollback_data.is_none()); + } + + #[tokio::test] + async fn test_apply_update_validators_payload() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let add = vec![platform_core::Hotkey([1u8; 32])]; + let remove = vec![platform_core::Hotkey([2u8; 32])]; + + let mut update = Update { + id: uuid::Uuid::new_v4(), + update_type: UpdateType::Hot, + version: "1.0.0".into(), + target: UpdateTarget::Validators, + payload: UpdatePayload::Validators { add: add.clone(), remove: remove.clone() }, + status: UpdateStatus::Pending, + created_at: chrono::Utc::now(), + applied_at: None, + rollback_data: None, + }; + + manager.apply_update(&mut update).await.unwrap(); + assert!(update.rollback_data.is_none()); + + let challenges_dir = dir.path().join("challenges"); + assert!(!challenges_dir.exists(), "validator updates should not touch disk state"); + } + + #[tokio::test] + async fn test_apply_update_hard_reset_clears_state_and_applies_config() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let validators_dir = dir.path().join("validators"); + std::fs::create_dir_all(&validators_dir).unwrap(); + std::fs::write(validators_dir.join("node"), b"validator").unwrap(); + + let challenges_dir = dir.path().join("challenges"); + std::fs::create_dir_all(challenges_dir.join("legacy")).unwrap(); + + let new_config = SubnetConfig { + version: "9.9.9".into(), + ..Default::default() + }; + + let mut update = Update { + id: uuid::Uuid::new_v4(), + update_type: UpdateType::HardReset, + version: "9.9.9".into(), + target: UpdateTarget::Config, + payload: UpdatePayload::HardReset { + reason: "maintenance".into(), + 
preserve_validators: false, + new_config: Some(new_config.clone()), + }, + status: UpdateStatus::Pending, + created_at: chrono::Utc::now(), + applied_at: None, + rollback_data: None, + }; + + manager.apply_update(&mut update).await.unwrap(); + + assert!(!validators_dir.exists()); + assert!(challenges_dir.exists()); + assert!(!challenges_dir.join("legacy").exists()); + assert!(dir.path().join("snapshots").join("pre_reset").exists()); + + let config_bytes = std::fs::read(dir.path().join("subnet_config.json")).unwrap(); + assert!(String::from_utf8(config_bytes).unwrap().contains("9.9.9")); + } + + #[tokio::test] + async fn test_apply_update_hard_reset_preserves_validators() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let validators_dir = dir.path().join("validators"); + std::fs::create_dir_all(&validators_dir).unwrap(); + std::fs::write(validators_dir.join("node"), b"validator").unwrap(); + + let mut update = Update { + id: uuid::Uuid::new_v4(), + update_type: UpdateType::HardReset, + version: "1.0.0".into(), + target: UpdateTarget::Config, + payload: UpdatePayload::HardReset { + reason: "maintenance".into(), + preserve_validators: true, + new_config: None, + }, + status: UpdateStatus::Pending, + created_at: chrono::Utc::now(), + applied_at: None, + rollback_data: None, + }; + + manager.apply_update(&mut update).await.unwrap(); + assert!(validators_dir.exists()); + } + + #[tokio::test] + async fn test_rollback_update_challenge_target() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let challenge_id = ChallengeId(uuid::Uuid::new_v4()); + let rollback_bytes = b"restore-wasm".to_vec(); + + let challenge_dir = dir + .path() + .join("challenges") + .join(challenge_id.0.to_string()); + std::fs::create_dir_all(&challenge_dir).unwrap(); + + let update = Update { + id: uuid::Uuid::new_v4(), + update_type: UpdateType::Hot, + version: "1.0.0".into(), + target: 
UpdateTarget::Challenge(challenge_id), + payload: UpdatePayload::Config(SubnetConfig::default()), + status: UpdateStatus::Failed("hash".into()), + created_at: chrono::Utc::now(), + applied_at: None, + rollback_data: Some(rollback_bytes.clone()), + }; + + manager.rollback_update(&update).await.unwrap(); + + let wasm_path = challenge_dir.join("code.wasm"); + assert_eq!(std::fs::read(wasm_path).unwrap(), rollback_bytes); + } + + #[tokio::test] + async fn test_rollback_update_config_target() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let rollback_bytes = br#"{"version":"2.0.0"}"#.to_vec(); + + let update = Update { + id: uuid::Uuid::new_v4(), + update_type: UpdateType::Warm, + version: "2.0.0".into(), + target: UpdateTarget::Config, + payload: UpdatePayload::Config(SubnetConfig::default()), + status: UpdateStatus::Failed("io".into()), + created_at: chrono::Utc::now(), + applied_at: None, + rollback_data: Some(rollback_bytes.clone()), + }; + + manager.rollback_update(&update).await.unwrap(); + let config_path = dir.path().join("subnet_config.json"); + assert_eq!(std::fs::read(config_path).unwrap(), rollback_bytes); } + + #[tokio::test] + async fn test_rollback_update_for_unsupported_target() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + let update = Update { + id: uuid::Uuid::new_v4(), + update_type: UpdateType::Hot, + version: "1.0.0".into(), + target: UpdateTarget::Validators, + payload: UpdatePayload::Validators { + add: vec![], + remove: vec![], + }, + status: UpdateStatus::Failed("test".into()), + created_at: chrono::Utc::now(), + applied_at: None, + rollback_data: Some(vec![1, 2, 3]), + }; + + manager.rollback_update(&update).await.unwrap(); + let unsupported_dir = manager.data_dir.join("validators"); + assert!(!unsupported_dir.exists()); + } + + #[test] + fn test_history_method_returns_clone() { + let dir = tempdir().unwrap(); + let manager = 
UpdateManager::new(dir.path().to_path_buf()); + + manager.history.write().push(Update { + id: uuid::Uuid::new_v4(), + update_type: UpdateType::Hot, + version: "1.0".into(), + target: UpdateTarget::Config, + payload: UpdatePayload::Config(SubnetConfig::default()), + status: UpdateStatus::Applied, + created_at: chrono::Utc::now(), + applied_at: None, + rollback_data: None, + }); + + let external_history = manager.history(); + assert_eq!(external_history.len(), 1); + + // `history()` returns a clone; dropping it must not affect the manager's internal history + drop(external_history); + assert_eq!(manager.history.read().len(), 1); + } + + #[test] + fn test_prune_history_keeps_most_recent() { + let dir = tempdir().unwrap(); + let manager = UpdateManager::new(dir.path().to_path_buf()); + + for i in 0..5 { + manager.history.write().push(Update { + id: uuid::Uuid::new_v4(), + update_type: UpdateType::Hot, + version: format!("1.0.{}", i), + target: UpdateTarget::Config, + payload: UpdatePayload::Config(SubnetConfig::default()), + status: UpdateStatus::Applied, + created_at: chrono::Utc::now(), + applied_at: None, + rollback_data: None, + }); + } + + manager.prune_history(2); + let history = manager.history.read(); + assert_eq!(history.len(), 2); + assert!(history + .iter() + .all(|update| update.version == "1.0.3" || update.version == "1.0.4")); + } + + #[test] + fn test_update_target_challenge() { + let challenge_id = ChallengeId(uuid::Uuid::new_v4()); + let target = UpdateTarget::Challenge(challenge_id); + let json = serde_json::to_string(&target).unwrap(); + let decoded: UpdateTarget = serde_json::from_str(&json).unwrap(); + assert!(matches!(decoded, UpdateTarget::Challenge(_))); + } + + #[test] + fn test_update_target_all_challenges() { + let target = UpdateTarget::AllChallenges; + let json = serde_json::to_string(&target).unwrap(); + let decoded: UpdateTarget = serde_json::from_str(&json).unwrap(); + assert!(matches!(decoded, UpdateTarget::AllChallenges)); + } + }