Skip to content

Commit a85c51b

Browse files
committed
feat(p2p): add peer authentication via PeerAnnounce and Kademlia discovery
- Enable Kademlia server mode for DHT participation - Add periodic peer discovery via get_closest_peers queries - Implement PeerAnnounce authentication handshake: - Peers must send signed PeerAnnounce within 30s of connection - Signature verified using sr25519 (hotkey signs peer_id + timestamp) - Validator stake verified via ValidatorSet - Unauthenticated peers are disconnected after timeout - Add pending_auth tracking for connection authentication state
1 parent 72de513 commit a85c51b

File tree

1 file changed

+190
-2
lines changed

1 file changed

+190
-2
lines changed

crates/p2p-consensus/src/network.rs

Lines changed: 190 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ const RATE_LIMIT_WINDOW_MS: i64 = 1000;
190190
/// Nonce expiry time in milliseconds (5 minutes)
191191
const NONCE_EXPIRY_MS: i64 = 5 * 60 * 1000;
192192

193+
/// Authentication timeout in seconds - peers must send PeerAnnounce within this time
194+
const AUTH_TIMEOUT_SECS: u64 = 30;
195+
193196
/// P2P network node
194197
pub struct P2PNetwork {
195198
/// Local keypair
@@ -215,6 +218,9 @@ pub struct P2PNetwork {
215218
seen_nonces: RwLock<HashMap<Hotkey, HashMap<u64, i64>>>,
216219
/// Message timestamps for sliding window rate limiting (hotkey -> recent message timestamps in ms)
217220
message_timestamps: RwLock<HashMap<Hotkey, VecDeque<i64>>>,
221+
/// Peers pending authentication (peer_id -> connection timestamp)
222+
/// Peers must send a valid PeerAnnounce within AUTH_TIMEOUT_SECS or be disconnected
223+
pending_auth: RwLock<HashMap<PeerId, std::time::Instant>>,
218224
}
219225

220226
impl P2PNetwork {
@@ -247,6 +253,7 @@ impl P2PNetwork {
247253
nonce: RwLock::new(0),
248254
seen_nonces: RwLock::new(HashMap::new()),
249255
message_timestamps: RwLock::new(HashMap::new()),
256+
pending_auth: RwLock::new(HashMap::new()),
250257
})
251258
}
252259

@@ -780,6 +787,14 @@ impl P2PNetwork {
780787
"P2P network running"
781788
);
782789

790+
// Peer discovery interval - query DHT for closest peers periodically
791+
let mut discovery_interval = tokio::time::interval(Duration::from_secs(30));
792+
discovery_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
793+
794+
// Auth cleanup interval - disconnect peers that haven't authenticated
795+
let mut auth_cleanup_interval = tokio::time::interval(Duration::from_secs(5));
796+
auth_cleanup_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
797+
783798
// Main event loop
784799
loop {
785800
tokio::select! {
@@ -816,6 +831,33 @@ impl P2PNetwork {
816831
}
817832
}
818833
}
834+
_ = discovery_interval.tick() => {
835+
// Query for peers closest to our own ID to discover new peers
836+
let random_peer = PeerId::random();
837+
swarm.behaviour_mut().kademlia.get_closest_peers(random_peer);
838+
debug!("Started Kademlia peer discovery query");
839+
}
840+
_ = auth_cleanup_interval.tick() => {
841+
// Disconnect peers that haven't authenticated within timeout
842+
let now = std::time::Instant::now();
843+
let timeout = Duration::from_secs(AUTH_TIMEOUT_SECS);
844+
let mut to_disconnect = Vec::new();
845+
846+
{
847+
let pending = self.pending_auth.read();
848+
for (peer_id, connected_at) in pending.iter() {
849+
if now.duration_since(*connected_at) > timeout {
850+
to_disconnect.push(*peer_id);
851+
}
852+
}
853+
}
854+
855+
for peer_id in to_disconnect {
856+
warn!(peer = %peer_id, "Disconnecting unauthenticated peer (timeout)");
857+
self.pending_auth.write().remove(&peer_id);
858+
let _ = swarm.disconnect_peer_id(peer_id);
859+
}
860+
}
819861
}
820862
}
821863

@@ -846,9 +888,11 @@ impl P2PNetwork {
846888
)
847889
.map_err(|e| NetworkError::Gossipsub(e.to_string()))?;
848890

849-
// Create kademlia
891+
// Create kademlia with server mode for peer discovery
850892
let store = MemoryStore::new(self.local_peer_id);
851-
let kademlia = kad::Behaviour::new(self.local_peer_id, store);
893+
let mut kademlia = kad::Behaviour::new(self.local_peer_id, store);
894+
// Set to server mode so we respond to queries and participate in DHT
895+
kademlia.set_mode(Some(kad::Mode::Server));
852896

853897
// Create identify
854898
let identify_config =
@@ -880,6 +924,92 @@ impl P2PNetwork {
880924
Ok(swarm)
881925
}
882926

927+
/// Send our PeerAnnounce message to authenticate with peers
928+
fn send_peer_announce(&self, swarm: &mut Swarm<CombinedBehaviour>) -> Result<(), NetworkError> {
929+
use crate::messages::PeerAnnounceMessage;
930+
931+
let timestamp = chrono::Utc::now().timestamp_millis();
932+
let peer_id_str = self.local_peer_id.to_string();
933+
934+
// Create signing data: peer_id + timestamp
935+
let signing_data = format!("{}:{}", peer_id_str, timestamp);
936+
let signature = self.keypair.sign_bytes(signing_data.as_bytes())
937+
.map_err(|e| NetworkError::Gossipsub(format!("Failed to sign PeerAnnounce: {}", e)))?;
938+
939+
let announce = PeerAnnounceMessage {
940+
validator: self.keypair.hotkey(),
941+
addresses: self.config.listen_addrs.clone(),
942+
peer_id: peer_id_str,
943+
protocol_version: "1.0.0".to_string(),
944+
timestamp,
945+
signature,
946+
};
947+
948+
let msg = P2PMessage::PeerAnnounce(announce);
949+
self.do_broadcast(swarm, msg)?;
950+
951+
debug!("Sent PeerAnnounce for authentication");
952+
Ok(())
953+
}
954+
955+
/// Handle incoming PeerAnnounce and authenticate the peer
956+
fn handle_peer_announce(
957+
&self,
958+
source: PeerId,
959+
announce: &crate::messages::PeerAnnounceMessage,
960+
swarm: &mut Swarm<CombinedBehaviour>,
961+
) -> bool {
962+
// Verify timestamp is recent (within 5 minutes)
963+
let now = chrono::Utc::now().timestamp_millis();
964+
if (now - announce.timestamp).abs() > 5 * 60 * 1000 {
965+
warn!(peer = %source, "PeerAnnounce timestamp too old, rejecting");
966+
let _ = swarm.disconnect_peer_id(source);
967+
return false;
968+
}
969+
970+
// Verify the peer_id in the message matches the source
971+
if announce.peer_id != source.to_string() {
972+
warn!(
973+
peer = %source,
974+
claimed_peer_id = %announce.peer_id,
975+
"PeerAnnounce peer_id mismatch, rejecting"
976+
);
977+
let _ = swarm.disconnect_peer_id(source);
978+
return false;
979+
}
980+
981+
// Verify signature
982+
let signing_data = format!("{}:{}", announce.peer_id, announce.timestamp);
983+
if !verify_hotkey_signature(&announce.validator, signing_data.as_bytes(), &announce.signature) {
984+
warn!(peer = %source, "PeerAnnounce signature invalid, rejecting");
985+
let _ = swarm.disconnect_peer_id(source);
986+
return false;
987+
}
988+
989+
// Verify the hotkey is a registered validator with sufficient stake
990+
if !self.validator_set.is_validator(&announce.validator) {
991+
warn!(
992+
peer = %source,
993+
hotkey = %announce.validator.to_hex(),
994+
"PeerAnnounce from non-validator, rejecting"
995+
);
996+
let _ = swarm.disconnect_peer_id(source);
997+
return false;
998+
}
999+
1000+
// Authentication successful - add to peer mapping and remove from pending
1001+
self.peer_mapping.insert(source, announce.validator.clone());
1002+
self.pending_auth.write().remove(&source);
1003+
1004+
info!(
1005+
peer = %source,
1006+
validator = %announce.validator.to_hex(),
1007+
"Peer authenticated successfully"
1008+
);
1009+
1010+
true
1011+
}
1012+
8831013
/// Handle swarm events
8841014
async fn handle_swarm_event(
8851015
&self,
@@ -894,6 +1024,17 @@ impl P2PNetwork {
8941024
})) => {
8951025
match self.handle_gossipsub_message(propagation_source, &message.data) {
8961026
Ok(msg) => {
1027+
// Handle PeerAnnounce specially for authentication
1028+
if let P2PMessage::PeerAnnounce(ref announce) = msg {
1029+
if self.handle_peer_announce(propagation_source, announce, swarm) {
1030+
debug!(
1031+
source = %propagation_source,
1032+
validator = %announce.validator.to_hex(),
1033+
"Peer authenticated via PeerAnnounce"
1034+
);
1035+
}
1036+
}
1037+
8971038
debug!(
8981039
source = %propagation_source,
8991040
msg_type = %msg.type_name(),
@@ -937,6 +1078,30 @@ impl P2PNetwork {
9371078
})) => {
9381079
info!("Kademlia bootstrap completed");
9391080
}
1081+
SwarmEvent::Behaviour(CombinedEvent::Kademlia(kad::Event::OutboundQueryProgressed {
1082+
result: kad::QueryResult::GetClosestPeers(Ok(ok)),
1083+
..
1084+
})) => {
1085+
// Connect to discovered peers
1086+
let connected_peers: std::collections::HashSet<_> = swarm.connected_peers().cloned().collect();
1087+
for peer_info in ok.peers {
1088+
let peer_id = peer_info.peer_id;
1089+
if peer_id != self.local_peer_id && !connected_peers.contains(&peer_id) {
1090+
if let Some(addr) = peer_info.addrs.first() {
1091+
info!(peer = %peer_id, addr = %addr, "Discovered new peer via Kademlia, dialing");
1092+
if let Err(e) = swarm.dial(addr.clone()) {
1093+
debug!(peer = %peer_id, error = ?e, "Failed to dial discovered peer");
1094+
}
1095+
}
1096+
}
1097+
}
1098+
}
1099+
SwarmEvent::Behaviour(CombinedEvent::Kademlia(kad::Event::OutboundQueryProgressed {
1100+
result: kad::QueryResult::GetClosestPeers(Err(e)),
1101+
..
1102+
})) => {
1103+
debug!(error = ?e, "Kademlia get_closest_peers query failed");
1104+
}
9401105
SwarmEvent::Behaviour(CombinedEvent::Identify(identify::Event::Received {
9411106
peer_id,
9421107
info,
@@ -971,6 +1136,15 @@ impl P2PNetwork {
9711136
peer_id, endpoint, ..
9721137
} => {
9731138
info!(peer = %peer_id, endpoint = ?endpoint, "Connection established");
1139+
1140+
// Add to pending auth - peer must send PeerAnnounce within timeout
1141+
self.pending_auth.write().insert(peer_id, std::time::Instant::now());
1142+
1143+
// Send our PeerAnnounce to authenticate ourselves
1144+
if let Err(e) = self.send_peer_announce(swarm) {
1145+
warn!(error = %e, "Failed to send PeerAnnounce");
1146+
}
1147+
9741148
if let Err(e) = self
9751149
.event_tx
9761150
.send(NetworkEvent::PeerConnected(peer_id))
@@ -982,6 +1156,7 @@ impl P2PNetwork {
9821156
SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
9831157
info!(peer = %peer_id, cause = ?cause, "Connection closed");
9841158
self.peer_mapping.remove_peer(&peer_id);
1159+
self.pending_auth.write().remove(&peer_id);
9851160
if let Err(e) = self
9861161
.event_tx
9871162
.send(NetworkEvent::PeerDisconnected(peer_id))
@@ -1072,6 +1247,19 @@ fn validate_weight_vote_hash(message: &WeightVoteMessage) -> Result<(), NetworkE
10721247
Ok(())
10731248
}
10741249

1250+
/// Verify a signature from a hotkey
1251+
fn verify_hotkey_signature(hotkey: &Hotkey, message: &[u8], signature: &[u8]) -> bool {
1252+
use platform_core::SignedMessage;
1253+
1254+
let signed = SignedMessage {
1255+
message: message.to_vec(),
1256+
signature: signature.to_vec(),
1257+
signer: hotkey.clone(),
1258+
};
1259+
1260+
signed.verify().unwrap_or(false)
1261+
}
1262+
10751263
/// Extract peer ID from multiaddr if present
10761264
fn extract_peer_id(addr: &Multiaddr) -> Option<PeerId> {
10771265
addr.iter().find_map(|p| {

0 commit comments

Comments
 (0)