Skip to content

Commit 8abeda7

Browse files
author
EchoBT
committed
feat: add P2P message forwarding to challenge containers
- Forward ChallengeMessage to containers via /p2p/message - Forward AgentSubmission with source code to local container - Add periodic outbox polling task (5s interval) - Broadcast container P2P messages to network
1 parent 106681f commit 8abeda7

File tree

1 file changed

+219
-10
lines changed

1 file changed

+219
-10
lines changed

bins/validator-node/src/main.rs

Lines changed: 219 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ use platform_bittensor::{
1414
use platform_challenge_runtime::{ChallengeRuntime, RuntimeConfig, RuntimeEvent};
1515
use platform_consensus::PBFTEngine;
1616
use platform_core::{
17-
production_sudo_key, ChainState, ChallengeContainerConfig, Hotkey, Keypair, NetworkConfig,
18-
NetworkMessage, SignedNetworkMessage, Stake, SudoAction, ValidatorInfo, SUDO_KEY_SS58,
17+
production_sudo_key, ChainState, ChallengeContainerConfig, ChallengeMessageType,
18+
ChallengeNetworkMessage, Hotkey, Keypair, NetworkConfig, NetworkMessage, SignedNetworkMessage,
19+
Stake, SudoAction, ValidatorInfo, SUDO_KEY_SS58,
1920
};
2021
use platform_epoch::{EpochConfig, EpochPhase, EpochTransition};
2122
use platform_network::{
@@ -1098,6 +1099,97 @@ async fn main() -> Result<()> {
10981099
});
10991100
}
11001101

1102+
// Spawn P2P outbox polling task for challenge containers
1103+
// This polls each container's /p2p/outbox endpoint and broadcasts messages to the network
1104+
if let Some(ref _orch) = challenge_orchestrator {
1105+
let state_for_outbox = chain_state.clone();
1106+
let net_cmd_tx_for_outbox = net_cmd_tx.clone();
1107+
let keypair_for_outbox = keypair.clone();
1108+
1109+
tokio::spawn(async move {
1110+
let client = reqwest::Client::new();
1111+
let poll_interval = std::time::Duration::from_secs(5);
1112+
1113+
loop {
1114+
tokio::time::sleep(poll_interval).await;
1115+
1116+
// Get all challenge containers
1117+
let configs: Vec<ChallengeContainerConfig> = {
1118+
let state = state_for_outbox.read();
1119+
state.challenge_configs.values().cloned().collect()
1120+
};
1121+
1122+
for config in configs {
1123+
let container_name = config.name.to_lowercase().replace([' ', '_'], "-");
1124+
let outbox_url = format!("http://challenge-{}:8080/p2p/outbox", container_name);
1125+
1126+
match client.get(&outbox_url).send().await {
1127+
Ok(resp) if resp.status().is_success() => {
1128+
if let Ok(outbox) = resp.json::<serde_json::Value>().await {
1129+
if let Some(messages) =
1130+
outbox.get("messages").and_then(|m| m.as_array())
1131+
{
1132+
for msg_value in messages {
1133+
// Parse the outbox message
1134+
if let (Some(message), target) = (
1135+
msg_value.get("message"),
1136+
msg_value.get("target").and_then(|t| t.as_str()),
1137+
) {
1138+
// Create ChallengeNetworkMessage to broadcast
1139+
let challenge_msg = ChallengeNetworkMessage {
1140+
challenge_id: config.name.clone(),
1141+
message_type: ChallengeMessageType::Custom(
1142+
"p2p_bridge".to_string(),
1143+
),
1144+
payload: serde_json::to_vec(message)
1145+
.unwrap_or_default(),
1146+
};
1147+
1148+
let network_msg =
1149+
NetworkMessage::ChallengeMessage(challenge_msg);
1150+
if let Ok(signed) = SignedNetworkMessage::new(
1151+
network_msg,
1152+
&keypair_for_outbox,
1153+
) {
1154+
if target.is_some() {
1155+
// TODO: Implement targeted send to specific validator
1156+
debug!("Targeted P2P send not yet implemented");
1157+
} else {
1158+
// Broadcast to all
1159+
if let Err(e) = net_cmd_tx_for_outbox
1160+
.send(NetworkCommand::Broadcast(signed))
1161+
.await
1162+
{
1163+
debug!("Failed to broadcast outbox message: {}", e);
1164+
}
1165+
}
1166+
}
1167+
}
1168+
}
1169+
1170+
let count = messages.len();
1171+
if count > 0 {
1172+
debug!(
1173+
"Broadcast {} P2P messages from container '{}'",
1174+
count, config.name
1175+
);
1176+
}
1177+
}
1178+
}
1179+
}
1180+
Ok(_) => {
1181+
// Container might not be ready yet, silently ignore
1182+
}
1183+
Err(_) => {
1184+
// Container might not be running, silently ignore
1185+
}
1186+
}
1187+
}
1188+
}
1189+
});
1190+
info!("P2P outbox polling task started (interval: 5s)");
1191+
}
1192+
11011193
// Main event loop
11021194
info!("Validator node running. Press Ctrl+C to stop.");
11031195

@@ -1979,9 +2071,77 @@ async fn handle_message(
19792071
"Challenge message from {:?}: challenge={}, type={:?}",
19802072
signer, challenge_msg.challenge_id, challenge_msg.message_type
19812073
);
1982-
// Challenge messages are handled by the challenge runtime/containers
1983-
// The validator just routes them. For now, log and ignore.
1984-
// In production, this would be forwarded to the challenge container via HTTP
2074+
2075+
// Forward challenge message to the appropriate container via HTTP
2076+
let challenge_id = challenge_msg.challenge_id.clone();
2077+
let container_name = challenge_id.to_lowercase().replace([' ', '_'], "-");
2078+
let from_hotkey = signer.to_hex();
2079+
let msg_payload = challenge_msg.payload.clone();
2080+
2081+
tokio::spawn(async move {
2082+
let client = reqwest::Client::new();
2083+
let p2p_endpoint = format!("http://challenge-{}:8080/p2p/message", container_name);
2084+
2085+
// Convert ChallengeMessageType to ChallengeP2PMessage for the container
2086+
let p2p_message = match challenge_msg.message_type {
2087+
platform_core::ChallengeMessageType::EvaluationResult => {
2088+
// Parse payload as evaluation result
2089+
if let Ok(eval) = serde_json::from_slice::<
2090+
platform_challenge_sdk::EvaluationResultMessage,
2091+
>(&msg_payload)
2092+
{
2093+
Some(
2094+
platform_challenge_sdk::ChallengeP2PMessage::EvaluationResult(eval),
2095+
)
2096+
} else {
2097+
warn!("Failed to parse EvaluationResult payload");
2098+
None
2099+
}
2100+
}
2101+
platform_core::ChallengeMessageType::WeightResult => {
2102+
if let Ok(weights) = serde_json::from_slice::<
2103+
platform_challenge_sdk::WeightResultMessage,
2104+
>(&msg_payload)
2105+
{
2106+
Some(platform_challenge_sdk::ChallengeP2PMessage::WeightResult(
2107+
weights,
2108+
))
2109+
} else {
2110+
warn!("Failed to parse WeightResult payload");
2111+
None
2112+
}
2113+
}
2114+
_ => {
2115+
debug!(
2116+
"Unhandled challenge message type: {:?}",
2117+
challenge_msg.message_type
2118+
);
2119+
None
2120+
}
2121+
};
2122+
2123+
if let Some(message) = p2p_message {
2124+
let req_body = serde_json::json!({
2125+
"from_hotkey": from_hotkey,
2126+
"message": message
2127+
});
2128+
2129+
match client.post(&p2p_endpoint).json(&req_body).send().await {
2130+
Ok(resp) if resp.status().is_success() => {
2131+
debug!(
2132+
"Forwarded challenge message to container {}",
2133+
container_name
2134+
);
2135+
}
2136+
Ok(resp) => {
2137+
debug!("Container {} returned {}", container_name, resp.status());
2138+
}
2139+
Err(e) => {
2140+
debug!("Failed to forward to container {}: {}", container_name, e);
2141+
}
2142+
}
2143+
}
2144+
});
19852145
}
19862146
NetworkMessage::AgentSubmission(submission) => {
19872147
info!(
@@ -2019,13 +2179,15 @@ async fn handle_message(
20192179
};
20202180

20212181
if let Some(config) = challenge_config {
2022-
let container_name = config.name.to_lowercase().replace(' ', "-");
2182+
let container_name = config.name.to_lowercase().replace([' ', '_'], "-");
20232183
let agent_hash = submission.agent_hash.clone();
20242184
let agent_hash_for_log = agent_hash.clone();
20252185
let challenge_name = submission.challenge_id.clone();
20262186
let miner = submission.miner_hotkey.clone();
2187+
let miner_for_log = miner.clone();
2188+
let source_code = submission.source_code.clone();
2189+
let miner_signature = submission.miner_signature.clone();
20272190
let obfuscated_hash = submission.obfuscated_hash.clone().unwrap_or_else(|| {
2028-
// Generate obfuscated hash from agent hash if not provided
20292191
use sha2::{Digest, Sha256};
20302192
let mut hasher = Sha256::new();
20312193
hasher.update(agent_hash.as_bytes());
@@ -2034,13 +2196,60 @@ async fn handle_message(
20342196
});
20352197
let validator_hotkey = signer.to_hex();
20362198

2037-
// Sign consensus for this agent (allows evaluation to proceed)
2199+
// Forward agent submission to container and sign consensus
20382200
tokio::spawn(async move {
20392201
let client = reqwest::Client::new();
2202+
2203+
// Step 1: If we have source code, submit the agent to our local container
2204+
// This ensures the container has the agent registered for evaluation
2205+
if let Some(ref code) = source_code {
2206+
let submit_endpoint =
2207+
format!("http://challenge-{}:8080/submit", container_name);
2208+
2209+
let submit_payload = serde_json::json!({
2210+
"source_code": code,
2211+
"miner_hotkey": miner,
2212+
"signature": hex::encode(&miner_signature),
2213+
"stake": 1000000000000u64,
2214+
"from_p2p": true,
2215+
});
2216+
2217+
match client
2218+
.post(&submit_endpoint)
2219+
.json(&submit_payload)
2220+
.send()
2221+
.await
2222+
{
2223+
Ok(resp) if resp.status().is_success() => {
2224+
info!(
2225+
"Agent {} registered in local container via P2P",
2226+
&agent_hash[..16.min(agent_hash.len())]
2227+
);
2228+
}
2229+
Ok(resp) => {
2230+
// 409 Conflict means agent already exists, which is fine
2231+
if resp.status().as_u16() != 409 {
2232+
debug!(
2233+
"Submit to container returned {}: agent {}",
2234+
resp.status(),
2235+
&agent_hash[..16.min(agent_hash.len())]
2236+
);
2237+
}
2238+
}
2239+
Err(e) => {
2240+
warn!(
2241+
"Failed to submit agent {} to local container: {}",
2242+
&agent_hash[..16.min(agent_hash.len())],
2243+
e
2244+
);
2245+
}
2246+
}
2247+
}
2248+
2249+
// Step 2: Sign consensus for this agent (allows evaluation to proceed)
20402250
let consensus_endpoint =
20412251
format!("http://challenge-{}:8080/consensus/sign", container_name);
20422252

2043-
// Sign the consensus
20442253
let sign_payload = serde_json::json!({
20452254
"agent_hash": agent_hash,
20462255
"validator_hotkey": validator_hotkey,
@@ -2089,7 +2298,7 @@ async fn handle_message(
20892298
"Agent {} received via P2P (challenge: {}, miner: {})",
20902299
&agent_hash_for_log[..16.min(agent_hash_for_log.len())],
20912300
challenge_name,
2092-
miner
2301+
miner_for_log
20932302
);
20942303
} else {
20952304
warn!(

0 commit comments

Comments
 (0)