Skip to content

Commit 317365a

Browse files
committed
fix(p2p): improve validator heartbeat diagnostics and stale threshold
- Add WARN-level logs for heartbeat message rejections (was DEBUG) - Add heartbeat_rejections_total counter metric for monitoring - Make stale threshold configurable via VALIDATOR_STALE_THRESHOLD_MS env var - Change default stale threshold from 90s to 300s (5 minutes) - Add unit test for configurable threshold Root cause: Validators were marked inactive after 90s without visible logs showing why heartbeats were rejected. Now operators can diagnose P2P issues and configure appropriate thresholds for their network conditions. Fixes issue where all validators became inactive simultaneously due to network hiccups or message validation failures going unnoticed.
1 parent 6f22e9e commit 317365a

File tree

2 files changed

+73
-10
lines changed

2 files changed

+73
-10
lines changed

crates/p2p-consensus/src/network.rs

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,8 @@ pub struct P2PNetwork {
725725
seen_nonces: RwLock<HashMap<Hotkey, HashMap<u64, i64>>>,
726726
/// Message timestamps for sliding window rate limiting (hotkey -> recent message timestamps in ms)
727727
message_timestamps: RwLock<HashMap<Hotkey, VecDeque<i64>>>,
728+
/// Counter for tracking heartbeat message rejections
729+
heartbeat_rejections_total: AtomicU64,
728730
}
729731

730732
impl P2PNetwork {
@@ -763,6 +765,7 @@ impl P2PNetwork {
763765
),
764766
seen_nonces: RwLock::new(HashMap::new()),
765767
message_timestamps: RwLock::new(HashMap::new()),
768+
heartbeat_rejections_total: AtomicU64::new(0),
766769
})
767770
}
768771

@@ -797,6 +800,15 @@ impl P2PNetwork {
797800
self.connected_peer_count() >= min_required
798801
}
799802

803+
804+
/// Get the total count of heartbeat message rejections
805+
///
806+
/// This metric tracks how many heartbeat messages have been rejected
807+
/// due to validation failures, rate limiting, or other errors.
808+
pub fn heartbeat_rejections_total(&self) -> u64 {
809+
self.heartbeat_rejections_total.load(Ordering::Relaxed)
810+
}
811+
800812
/// Create gossipsub behaviour
801813
fn create_gossipsub(
802814
&self,
@@ -1481,17 +1493,35 @@ impl P2PNetwork {
14811493
}
14821494
}
14831495
Err(e) => {
1484-
// Try to extract hotkey from message for debugging
1485-
let hotkey = bincode::deserialize::<SignedP2PMessage>(&message.data)
1486-
.ok()
1496+
// Try to extract hotkey from message and check type
1497+
let deserialized = bincode::deserialize::<SignedP2PMessage>(&message.data).ok();
1498+
let hotkey = deserialized
1499+
.as_ref()
14871500
.map(|m| m.signer.to_ss58())
14881501
.unwrap_or_else(|| "unknown".to_string());
1489-
debug!(
1490-
source = %propagation_source,
1491-
hotkey = %hotkey,
1492-
error = %e,
1493-
"Failed to process gossipsub message"
1494-
);
1502+
1503+
// Check if this is a heartbeat message for specialized warning
1504+
let is_heartbeat = deserialized
1505+
.as_ref()
1506+
.map(|m| matches!(m.message, P2PMessage::Heartbeat(_)))
1507+
.unwrap_or(false);
1508+
1509+
if is_heartbeat {
1510+
warn!(
1511+
source = %propagation_source,
1512+
hotkey = %hotkey,
1513+
error = %e,
1514+
"Heartbeat message rejected - validator may become inactive"
1515+
);
1516+
self.heartbeat_rejections_total.fetch_add(1, Ordering::Relaxed);
1517+
} else {
1518+
debug!(
1519+
source = %propagation_source,
1520+
hotkey = %hotkey,
1521+
error = %e,
1522+
"Failed to process gossipsub message"
1523+
);
1524+
}
14951525
}
14961526
},
14971527
SwarmEvent::Behaviour(CombinedEvent::Gossipsub(gossipsub::Event::Subscribed {

crates/p2p-consensus/src/validator.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,22 @@ pub struct ValidatorSet {
107107
impl ValidatorSet {
108108
/// Create a new validator set
109109
pub fn new(local_keypair: Keypair, min_stake: u64) -> Self {
110+
// Read stale threshold from environment, default to 5 minutes
111+
let stale_threshold_ms = std::env::var("VALIDATOR_STALE_THRESHOLD_MS")
112+
.ok()
113+
.and_then(|s| s.parse::<i64>().ok())
114+
.unwrap_or(300_000); // Default: 5 minutes
115+
116+
info!(
117+
threshold_ms = stale_threshold_ms,
118+
"Validator stale threshold initialized (configurable via VALIDATOR_STALE_THRESHOLD_MS)"
119+
);
120+
110121
Self {
111122
validators: RwLock::new(HashMap::new()),
112123
min_stake,
113124
local_keypair,
114-
stale_threshold_ms: 90_000, // 90 seconds (3x heartbeat interval)
125+
stale_threshold_ms,
115126
verified_stakes: RwLock::new(HashMap::new()),
116127
}
117128
}
@@ -624,4 +635,26 @@ mod tests {
624635
assert_eq!(updated.last_sequence, 100);
625636
assert_eq!(updated.stake, 15_000);
626637
}
638+
639+
#[test]
640+
fn test_stale_threshold_from_env() {
641+
// Test env var override
642+
std::env::set_var("VALIDATOR_STALE_THRESHOLD_MS", "600000");
643+
let keypair = create_test_keypair();
644+
let set = ValidatorSet::new(keypair, 1000);
645+
assert_eq!(set.stale_threshold_ms, 600_000);
646+
std::env::remove_var("VALIDATOR_STALE_THRESHOLD_MS");
647+
648+
// Test default
649+
let keypair2 = create_test_keypair();
650+
let set2 = ValidatorSet::new(keypair2, 1000);
651+
assert_eq!(set2.stale_threshold_ms, 300_000);
652+
653+
// Test invalid env var falls back to default
654+
std::env::set_var("VALIDATOR_STALE_THRESHOLD_MS", "invalid");
655+
let keypair3 = create_test_keypair();
656+
let set3 = ValidatorSet::new(keypair3, 1000);
657+
assert_eq!(set3.stale_threshold_ms, 300_000);
658+
std::env::remove_var("VALIDATOR_STALE_THRESHOLD_MS");
659+
}
627660
}

0 commit comments

Comments
 (0)