Skip to content

Commit 41b805f

Browse files
committed
fix(p2p): broadcast heartbeats and filter private IPs from Kademlia
- Broadcast HeartbeatMessage via P2P to keep validators active - Filter private/local IPs (172.x, 192.168.x, 10.x, 127.x) from Kademlia DHT - Prevents validators from being marked stale after 90s - Fixes 'Active validators: 0' issue
1 parent e068355 commit 41b805f

File tree

2 files changed

+88
-32
lines changed

2 files changed

+88
-32
lines changed

bins/validator-node/src/main.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ use platform_distributed_storage::{
2626
DistributedStoreExt, LocalStorage, LocalStorageBuilder, StorageKey,
2727
};
2828
use platform_p2p_consensus::{
29-
ChainState, ConsensusEngine, EvaluationMessage, EvaluationMetrics, EvaluationRecord, JobRecord,
30-
JobStatus, NetworkEvent, P2PConfig, P2PMessage, P2PNetwork, StateManager, TaskProgressRecord,
31-
ValidatorRecord, ValidatorSet,
29+
ChainState, ConsensusEngine, EvaluationMessage, EvaluationMetrics, EvaluationRecord,
30+
HeartbeatMessage, JobRecord, JobStatus, NetworkEvent, P2PConfig, P2PMessage, P2PNetwork,
31+
StateManager, TaskProgressRecord, ValidatorRecord, ValidatorSet,
3232
};
3333
use std::path::{Path, PathBuf};
3434
use std::sync::Arc;
@@ -579,10 +579,32 @@ async fn main() -> Result<()> {
579579
).await;
580580
}
581581

582-
// Heartbeat
582+
// Heartbeat - broadcast to other validators
583583
_ = heartbeat_interval.tick() => {
584584
let state_hash = state_manager.state_hash();
585585
let sequence = state_manager.sequence();
586+
let our_hotkey = keypair.hotkey();
587+
588+
// Get our stake from validator set
589+
let our_stake = validator_set.stake_for(&our_hotkey);
590+
591+
let heartbeat = P2PMessage::Heartbeat(HeartbeatMessage {
592+
validator: our_hotkey,
593+
state_hash,
594+
sequence,
595+
stake: our_stake,
596+
timestamp: chrono::Utc::now().timestamp_millis(),
597+
signature: vec![], // Will be signed by P2P layer
598+
});
599+
600+
if let Err(e) = p2p_broadcast_tx.send(platform_p2p_consensus::P2PCommand::Broadcast(heartbeat)).await {
601+
warn!("Failed to broadcast heartbeat: {}", e);
602+
}
603+
604+
// Also update validator activity count
605+
validator_set.mark_stale_validators();
606+
debug!("Active validators: {}", validator_set.active_count());
607+
586608
debug!("Heartbeat: sequence={}, state_hash={}", sequence, hex::encode(&state_hash[..8]));
587609
}
588610

crates/p2p-consensus/src/network.rs

Lines changed: 62 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,12 +1016,16 @@ impl P2PNetwork {
10161016
agent = %info.agent_version,
10171017
"Received identify info"
10181018
);
1019-
// Add peer addresses to Kademlia
1019+
// Add peer addresses to Kademlia (filter out private/local addresses)
10201020
for addr in &info.listen_addrs {
1021-
swarm
1022-
.behaviour_mut()
1023-
.kademlia
1024-
.add_address(&peer_id, addr.clone());
1021+
if !is_private_multiaddr(addr) {
1022+
swarm
1023+
.behaviour_mut()
1024+
.kademlia
1025+
.add_address(&peer_id, addr.clone());
1026+
} else {
1027+
debug!(peer = %peer_id, addr = %addr, "Filtered private address from Kademlia");
1028+
}
10251029
}
10261030
if let Err(e) = self
10271031
.event_tx
@@ -1213,30 +1217,57 @@ async fn fetch_public_ip(url: &str) -> Option<String> {
12131217
/// Check if an IP address is public (not private/local/reserved)
12141218
fn is_public_ip(ip: &str) -> bool {
12151219
if let Ok(addr) = ip.parse::<std::net::Ipv4Addr>() {
1216-
let octets = addr.octets();
1217-
// Private ranges: 10.x.x.x, 172.16-31.x.x, 192.168.x.x
1218-
// Loopback: 127.x.x.x
1219-
// Link-local: 169.254.x.x
1220-
if octets[0] == 10 {
1221-
return false;
1222-
}
1223-
if octets[0] == 172 && (16..=31).contains(&octets[1]) {
1224-
return false;
1225-
}
1226-
if octets[0] == 192 && octets[1] == 168 {
1227-
return false;
1228-
}
1229-
if octets[0] == 127 {
1230-
return false;
1231-
}
1232-
if octets[0] == 169 && octets[1] == 254 {
1233-
return false;
1234-
}
1235-
if octets[0] == 0 {
1236-
return false;
1237-
}
1220+
return !is_private_ipv4(&addr);
1221+
}
1222+
false
1223+
}
1224+
1225+
/// Check if an IPv4 address is private/local/reserved
1226+
fn is_private_ipv4(addr: &std::net::Ipv4Addr) -> bool {
1227+
let octets = addr.octets();
1228+
// Private ranges: 10.x.x.x, 172.16-31.x.x, 192.168.x.x
1229+
// Loopback: 127.x.x.x
1230+
// Link-local: 169.254.x.x
1231+
// Reserved: 0.x.x.x
1232+
if octets[0] == 10 {
1233+
return true;
1234+
}
1235+
if octets[0] == 172 && (16..=31).contains(&octets[1]) {
1236+
return true;
1237+
}
1238+
if octets[0] == 192 && octets[1] == 168 {
1239+
return true;
1240+
}
1241+
if octets[0] == 127 {
1242+
return true;
1243+
}
1244+
if octets[0] == 169 && octets[1] == 254 {
12381245
return true;
12391246
}
1247+
if octets[0] == 0 {
1248+
return true;
1249+
}
1250+
false
1251+
}
1252+
1253+
/// Check if a multiaddr contains a private/local IP address
1254+
fn is_private_multiaddr(addr: &Multiaddr) -> bool {
1255+
for proto in addr.iter() {
1256+
match proto {
1257+
libp2p::multiaddr::Protocol::Ip4(ip) => {
1258+
if is_private_ipv4(&ip) {
1259+
return true;
1260+
}
1261+
}
1262+
libp2p::multiaddr::Protocol::Ip6(ip) => {
1263+
// Filter loopback and link-local IPv6
1264+
if ip.is_loopback() || ip.segments()[0] == 0xfe80 {
1265+
return true;
1266+
}
1267+
}
1268+
_ => {}
1269+
}
1270+
}
12401271
false
12411272
}
12421273

@@ -1326,8 +1357,11 @@ impl NetworkRunner {
13261357
"Received identify info"
13271358
);
13281359

1360+
// Filter out private/local addresses before adding to Kademlia
13291361
for addr in &info.listen_addrs {
1330-
behaviour.kademlia.add_address(&peer_id, addr.clone());
1362+
if !is_private_multiaddr(addr) {
1363+
behaviour.kademlia.add_address(&peer_id, addr.clone());
1364+
}
13311365
}
13321366

13331367
if let Err(e) = self

0 commit comments

Comments
 (0)