Skip to content

Commit 6eec9d7

Browse files
feat(p2p): implement full libp2p swarm with gossipsub and kademlia
- Add CombinedBehaviour with NetworkBehaviour derive macro - Implement P2PNetwork::run() with complete event loop - Spawn network task in validator-node main - Set default P2P port to 8090 and hardcode bootstrap peers - Add crash loop prevention in entrypoint.sh (30s cooldown) - Fix entrypoint argument parsing with bash arrays Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
1 parent f7b3bec commit 6eec9d7

File tree

10 files changed

+595
-214
lines changed

10 files changed

+595
-214
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bins/validator-node/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,5 @@ sp-core = { workspace = true }
4545
uuid = { workspace = true }
4646
chrono = { workspace = true }
4747
bincode = { workspace = true }
48-
sha2 = { workspace = true }
48+
sha2 = { workspace = true }
49+
rand = { workspace = true }

bins/validator-node/src/main.rs

Lines changed: 159 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ struct Args {
181181
#[arg(long)]
182182
no_bittensor: bool,
183183

184+
/// Run as bootnode (read-only Bittensor access, no signing required)
185+
#[arg(long)]
186+
bootnode: bool,
187+
184188
/// Directory where WASM challenge modules are stored
185189
#[arg(long, env = "WASM_MODULE_DIR", default_value = "./wasm_modules")]
186190
wasm_module_dir: PathBuf,
@@ -212,6 +216,7 @@ impl std::fmt::Debug for Args {
212216
.field("netuid", &self.netuid)
213217
.field("version_key", &self.version_key)
214218
.field("no_bittensor", &self.no_bittensor)
219+
.field("bootnode", &self.bootnode)
215220
.field("wasm_module_dir", &self.wasm_module_dir)
216221
.field("wasm_max_memory", &self.wasm_max_memory)
217222
.field("wasm_enable_fuel", &self.wasm_enable_fuel)
@@ -296,6 +301,18 @@ async fn main() -> Result<()> {
296301
network.local_peer_id()
297302
);
298303

304+
// Create command channel for P2P network
305+
let (p2p_cmd_tx, p2p_cmd_rx) =
306+
tokio::sync::mpsc::channel::<platform_p2p_consensus::P2PCommand>(256);
307+
308+
// Spawn P2P network task
309+
let network_clone = network.clone();
310+
tokio::spawn(async move {
311+
if let Err(e) = network_clone.run(p2p_cmd_rx).await {
312+
error!("P2P network error: {}", e);
313+
}
314+
});
315+
299316
// Initialize consensus engine
300317
let consensus = Arc::new(RwLock::new(ConsensusEngine::new(
301318
keypair.clone(),
@@ -313,82 +330,122 @@ async fn main() -> Result<()> {
313330
if !args.no_bittensor {
314331
info!("Connecting to Bittensor: {}", args.subtensor_endpoint);
315332

316-
let state_path = data_dir.join("subtensor_state.json");
317-
match Subtensor::with_persistence(&args.subtensor_endpoint, state_path).await {
318-
Ok(st) => {
319-
let secret = args
320-
.secret_key
321-
.as_ref()
322-
.ok_or_else(|| anyhow::anyhow!("VALIDATOR_SECRET_KEY required"))?;
323-
324-
let signer = signer_from_seed(secret).map_err(|e| {
325-
anyhow::anyhow!(
326-
"Failed to create Bittensor signer from secret key: {}. \
327-
A valid signer is required for weight submission. \
328-
Use --no-bittensor flag if running without Bittensor.",
329-
e
330-
)
331-
})?;
332-
info!("Bittensor signer initialized: {}", signer.account_id());
333-
subtensor_signer = Some(Arc::new(signer));
334-
335-
subtensor = Some(Arc::new(st));
336-
337-
// Create SubtensorClient for metagraph
338-
let mut client = SubtensorClient::new(platform_bittensor::BittensorConfig {
339-
endpoint: args.subtensor_endpoint.clone(),
340-
netuid: args.netuid,
341-
..Default::default()
342-
});
343-
344-
let bittensor_client =
345-
Arc::new(BittensorClient::new(&args.subtensor_endpoint).await?);
346-
match sync_metagraph(&bittensor_client, args.netuid).await {
347-
Ok(mg) => {
348-
info!("Metagraph synced: {} neurons", mg.n);
349-
350-
// Update validator set from metagraph
351-
update_validator_set_from_metagraph(&mg, &validator_set);
352-
info!(
353-
"Validator set: {} active validators",
354-
validator_set.active_count()
355-
);
333+
if args.bootnode {
334+
// Bootnode mode: read-only access to metagraph, no signing required
335+
info!("Running in bootnode mode (read-only Bittensor access)");
336+
subtensor = None;
337+
subtensor_signer = None;
338+
339+
// Create SubtensorClient for metagraph only
340+
let mut client = SubtensorClient::new(platform_bittensor::BittensorConfig {
341+
endpoint: args.subtensor_endpoint.clone(),
342+
netuid: args.netuid,
343+
..Default::default()
344+
});
356345

357-
client.set_metagraph(mg);
346+
match BittensorClient::new(&args.subtensor_endpoint).await {
347+
Ok(bittensor_client) => {
348+
let bittensor_client = Arc::new(bittensor_client);
349+
match sync_metagraph(&bittensor_client, args.netuid).await {
350+
Ok(mg) => {
351+
info!("Metagraph synced: {} neurons", mg.n);
352+
update_validator_set_from_metagraph(&mg, &validator_set);
353+
info!(
354+
"Validator set: {} active validators",
355+
validator_set.active_count()
356+
);
357+
client.set_metagraph(mg);
358+
}
359+
Err(e) => warn!("Metagraph sync failed: {}", e),
358360
}
359-
Err(e) => warn!("Metagraph sync failed: {}", e),
361+
subtensor_client = Some(client);
362+
bittensor_client_for_metagraph = Some(bittensor_client);
360363
}
364+
Err(e) => {
365+
error!("Bittensor client connection failed: {}", e);
366+
subtensor_client = None;
367+
bittensor_client_for_metagraph = None;
368+
}
369+
}
370+
} else {
371+
// Full validator mode: requires signing key
372+
let state_path = data_dir.join("subtensor_state.json");
373+
match Subtensor::with_persistence(&args.subtensor_endpoint, state_path).await {
374+
Ok(st) => {
375+
let secret = args
376+
.secret_key
377+
.as_ref()
378+
.ok_or_else(|| anyhow::anyhow!("VALIDATOR_SECRET_KEY required"))?;
379+
380+
let signer = signer_from_seed(secret).map_err(|e| {
381+
anyhow::anyhow!(
382+
"Failed to create Bittensor signer from secret key: {}. \
383+
A valid signer is required for weight submission. \
384+
Use --no-bittensor flag if running without Bittensor.",
385+
e
386+
)
387+
})?;
388+
info!("Bittensor signer initialized: {}", signer.account_id());
389+
subtensor_signer = Some(Arc::new(signer));
390+
391+
subtensor = Some(Arc::new(st));
392+
393+
// Create SubtensorClient for metagraph
394+
let mut client = SubtensorClient::new(platform_bittensor::BittensorConfig {
395+
endpoint: args.subtensor_endpoint.clone(),
396+
netuid: args.netuid,
397+
..Default::default()
398+
});
361399

362-
subtensor_client = Some(client);
363-
364-
// Store bittensor client for metagraph refreshes
365-
bittensor_client_for_metagraph = Some(bittensor_client.clone());
400+
let bittensor_client =
401+
Arc::new(BittensorClient::new(&args.subtensor_endpoint).await?);
402+
match sync_metagraph(&bittensor_client, args.netuid).await {
403+
Ok(mg) => {
404+
info!("Metagraph synced: {} neurons", mg.n);
366405

367-
// Block sync
368-
let mut sync = BlockSync::new(BlockSyncConfig {
369-
netuid: args.netuid,
370-
..Default::default()
371-
});
372-
let rx = sync.take_event_receiver();
406+
// Update validator set from metagraph
407+
update_validator_set_from_metagraph(&mg, &validator_set);
408+
info!(
409+
"Validator set: {} active validators",
410+
validator_set.active_count()
411+
);
373412

374-
if let Err(e) = sync.connect(bittensor_client).await {
375-
warn!("Block sync failed: {}", e);
376-
} else {
377-
tokio::spawn(async move {
378-
if let Err(e) = sync.start().await {
379-
error!("Block sync error: {}", e);
413+
client.set_metagraph(mg);
380414
}
415+
Err(e) => warn!("Metagraph sync failed: {}", e),
416+
}
417+
418+
subtensor_client = Some(client);
419+
420+
// Store bittensor client for metagraph refreshes
421+
bittensor_client_for_metagraph = Some(bittensor_client.clone());
422+
423+
// Block sync
424+
let mut sync = BlockSync::new(BlockSyncConfig {
425+
netuid: args.netuid,
426+
..Default::default()
381427
});
382-
block_rx = rx;
383-
info!("Block sync started");
428+
let rx = sync.take_event_receiver();
429+
430+
if let Err(e) = sync.connect(bittensor_client).await {
431+
warn!("Block sync failed: {}", e);
432+
} else {
433+
tokio::spawn(async move {
434+
if let Err(e) = sync.start().await {
435+
error!("Block sync error: {}", e);
436+
}
437+
});
438+
block_rx = rx;
439+
info!("Block sync started");
440+
}
441+
}
442+
Err(e) => {
443+
error!("Subtensor connection failed: {}", e);
444+
subtensor = None;
445+
subtensor_signer = None;
446+
subtensor_client = None;
447+
bittensor_client_for_metagraph = None;
384448
}
385-
}
386-
Err(e) => {
387-
error!("Subtensor connection failed: {}", e);
388-
subtensor = None;
389-
subtensor_signer = None;
390-
subtensor_client = None;
391-
bittensor_client_for_metagraph = None;
392449
}
393450
}
394451
} else {
@@ -465,7 +522,8 @@ async fn main() -> Result<()> {
465522
let mut wasm_eval_interval = tokio::time::interval(Duration::from_secs(5));
466523
let mut stale_job_interval = tokio::time::interval(Duration::from_secs(120));
467524

468-
let (eval_broadcast_tx, mut eval_broadcast_rx) = tokio::sync::mpsc::channel::<P2PMessage>(256);
525+
// Clone p2p_cmd_tx for use in the loop
526+
let p2p_broadcast_tx = p2p_cmd_tx.clone();
469527

470528
loop {
471529
tokio::select! {
@@ -480,16 +538,6 @@ async fn main() -> Result<()> {
480538
).await;
481539
}
482540

483-
// Outbound evaluation broadcasts
484-
Some(msg) = eval_broadcast_rx.recv() => {
485-
if let Err(e) = event_tx.send(NetworkEvent::Message {
486-
source: network.local_peer_id(),
487-
message: msg,
488-
}).await {
489-
warn!("Failed to forward evaluation broadcast: {}", e);
490-
}
491-
}
492-
493541
// Bittensor block events
494542
Some(event) = async {
495543
match block_rx.as_mut() {
@@ -561,7 +609,7 @@ async fn main() -> Result<()> {
561609
executor,
562610
&state_manager,
563611
&keypair,
564-
&eval_broadcast_tx,
612+
&p2p_broadcast_tx,
565613
).await;
566614
}
567615
}
@@ -607,25 +655,36 @@ async fn main() -> Result<()> {
607655
}
608656

609657
fn load_keypair(args: &Args) -> Result<Keypair> {
610-
let secret = args
611-
.secret_key
612-
.as_ref()
613-
.ok_or_else(|| anyhow::anyhow!("VALIDATOR_SECRET_KEY required"))?
614-
.trim();
615-
616-
let hex = secret.strip_prefix("0x").unwrap_or(secret);
617-
618-
if hex.len() == 64 {
619-
if let Ok(bytes) = hex::decode(hex) {
620-
if bytes.len() == 32 {
621-
let mut arr = [0u8; 32];
622-
arr.copy_from_slice(&bytes);
623-
return Ok(Keypair::from_seed(&arr)?);
658+
match args.secret_key.as_ref() {
659+
Some(secret) => {
660+
let secret = secret.trim();
661+
let hex = secret.strip_prefix("0x").unwrap_or(secret);
662+
663+
if hex.len() == 64 {
664+
if let Ok(bytes) = hex::decode(hex) {
665+
if bytes.len() == 32 {
666+
let mut arr = [0u8; 32];
667+
arr.copy_from_slice(&bytes);
668+
return Ok(Keypair::from_seed(&arr)?);
669+
}
670+
}
671+
}
672+
673+
Ok(Keypair::from_mnemonic(secret)?)
674+
}
675+
None => {
676+
if args.bootnode {
677+
// Bootnode mode without secret key - generate random keypair
678+
// Note: This means the PeerId will change on each restart.
679+
// For a stable PeerId, provide BOOTNODE_SECRET_KEY.
680+
warn!("No secret key provided in bootnode mode, generating random keypair (PeerId will change on restart)");
681+
let seed: [u8; 32] = rand::random();
682+
Ok(Keypair::from_seed(&seed)?)
683+
} else {
684+
Err(anyhow::anyhow!("VALIDATOR_SECRET_KEY required"))
624685
}
625686
}
626687
}
627-
628-
Ok(Keypair::from_mnemonic(secret)?)
629688
}
630689

631690
/// Load persisted state from distributed storage
@@ -1250,7 +1309,7 @@ async fn process_wasm_evaluations(
12501309
executor: &Arc<WasmChallengeExecutor>,
12511310
state_manager: &Arc<StateManager>,
12521311
keypair: &Keypair,
1253-
eval_broadcast_tx: &tokio::sync::mpsc::Sender<P2PMessage>,
1312+
p2p_cmd_tx: &tokio::sync::mpsc::Sender<platform_p2p_consensus::P2PCommand>,
12541313
) {
12551314
let pending: Vec<(String, ChallengeId, String)> = state_manager.read(|state| {
12561315
state
@@ -1432,7 +1491,10 @@ async fn process_wasm_evaluations(
14321491
signature,
14331492
timestamp,
14341493
});
1435-
if let Err(e) = eval_broadcast_tx.send(eval_msg).await {
1494+
if let Err(e) = p2p_cmd_tx
1495+
.send(platform_p2p_consensus::P2PCommand::Broadcast(eval_msg))
1496+
.await
1497+
{
14361498
warn!(
14371499
submission_id = %submission_id,
14381500
error = %e,

crates/p2p-consensus/src/config.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
1313
/// Configure your bootstrap peers via the BOOTSTRAP_PEERS environment variable
1414
/// or add them here for your deployment.
1515
pub const DEFAULT_BOOTSTRAP_NODES: &[&str] = &[
16-
// Add your bootstrap peers here
16+
"/dns4/bootnode.platform.network/tcp/8090/p2p/12D3KooWSpDLH6kBTHuHhJCmS4vZMhuTZa1T35qJYd9NgSVdzGnP",
1717
];
1818

1919
/// P2P network configuration
@@ -43,11 +43,17 @@ pub struct P2PConfig {
4343
pub is_bootnode: bool,
4444
}
4545

46+
/// Default P2P port
47+
pub const DEFAULT_P2P_PORT: u16 = 8090;
48+
4649
impl Default for P2PConfig {
4750
fn default() -> Self {
4851
Self {
49-
listen_addrs: vec!["/ip4/0.0.0.0/tcp/8090".to_string()],
50-
bootstrap_peers: vec![],
52+
listen_addrs: vec![format!("/ip4/0.0.0.0/tcp/{}", DEFAULT_P2P_PORT)],
53+
bootstrap_peers: DEFAULT_BOOTSTRAP_NODES
54+
.iter()
55+
.map(|s| s.to_string())
56+
.collect(),
5157
consensus_topic: "platform/consensus/1.0.0".to_string(),
5258
challenge_topic: "platform/challenge/1.0.0".to_string(),
5359
netuid: 100,
@@ -113,8 +119,8 @@ impl P2PConfig {
113119
pub fn production() -> Self {
114120
Self {
115121
listen_addrs: vec![
116-
"/ip4/0.0.0.0/tcp/8090".to_string(),
117-
"/ip6/::/tcp/8090".to_string(),
122+
format!("/ip4/0.0.0.0/tcp/{}", DEFAULT_P2P_PORT),
123+
format!("/ip6/::/tcp/{}", DEFAULT_P2P_PORT),
118124
],
119125
bootstrap_peers: DEFAULT_BOOTSTRAP_NODES
120126
.iter()

0 commit comments

Comments
 (0)