Skip to content

Commit 0f4eb83

Browse files
author
EchoBT
committed
feat: add keypair to RpcHandler for webhook P2P broadcast
- Add keypair field to RpcHandler for signing TaskProgress messages - Add set_keypair() method to configure keypair from validator-node - Update webhook_progress_handler to sign and broadcast via P2P - Set keypair in validator-node main.rs after RPC server setup - Add warn import to server.rs for error logging
1 parent 5317b8d commit 0f4eb83

File tree

3 files changed

+150
-1
lines changed

3 files changed

+150
-1
lines changed

bins/validator-node/src/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,9 @@ async fn main() -> Result<()> {
589589
tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
590590
rpc_handler.set_broadcast_tx(rpc_broadcast_tx.clone());
591591

592+
// Set keypair for signing P2P messages (for webhook progress broadcasts)
593+
rpc_handler.set_keypair(keypair.clone());
594+
592595
// Also set the agent broadcast channel (used by route handler for /submit)
593596
*agent_broadcast_tx.write() = Some(rpc_broadcast_tx);
594597

crates/rpc-server/src/jsonrpc.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ pub struct RpcHandler {
187187
pub route_handler: Arc<RwLock<Option<ChallengeRouteHandler>>>,
188188
/// Channel to send signed messages for P2P broadcast
189189
pub broadcast_tx: Arc<RwLock<Option<tokio::sync::mpsc::UnboundedSender<Vec<u8>>>>>,
190+
/// Keypair for signing P2P messages (optional, set by validator)
191+
pub keypair: Arc<RwLock<Option<platform_core::Keypair>>>,
190192
}
191193

192194
impl RpcHandler {
@@ -201,9 +203,15 @@ impl RpcHandler {
201203
challenge_routes: Arc::new(RwLock::new(HashMap::new())),
202204
route_handler: Arc::new(RwLock::new(None)),
203205
broadcast_tx: Arc::new(RwLock::new(None)),
206+
keypair: Arc::new(RwLock::new(None)),
204207
}
205208
}
206209

210+
/// Set the keypair for signing P2P messages
211+
pub fn set_keypair(&self, keypair: platform_core::Keypair) {
212+
*self.keypair.write() = Some(keypair);
213+
}
214+
207215
/// Set the broadcast channel for P2P message sending
208216
pub fn set_broadcast_tx(&self, tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>) {
209217
*self.broadcast_tx.write() = Some(tx);

crates/rpc-server/src/server.rs

Lines changed: 139 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::net::SocketAddr;
1919
use std::sync::Arc;
2020
use tower_http::cors::{Any, CorsLayer};
2121
use tower_http::trace::TraceLayer;
22-
use tracing::{debug, info, trace};
22+
use tracing::{debug, info, trace, warn};
2323

2424
/// RPC server configuration
2525
#[derive(Clone, Debug)]
@@ -170,6 +170,14 @@ impl RpcServer {
170170
},
171171
)
172172
})
173+
// Webhook endpoint for progress callbacks from challenge containers
174+
.route("/webhook/progress", {
175+
let handler = rpc_handler.clone();
176+
post(move |body: Json<Value>| {
177+
let handler = handler.clone();
178+
async move { webhook_progress_handler(handler, body.0).await }
179+
})
180+
})
173181
.with_state(self.state.clone())
174182
.layer(TraceLayer::new_for_http());
175183

@@ -400,6 +408,136 @@ fn handle_single_request(body: Value, handler: &RpcHandler) -> (StatusCode, Json
400408
(status, Json(response))
401409
}
402410

411+
/// Handler for webhook progress callbacks from challenge containers
412+
/// Broadcasts TaskProgressMessage via P2P to other validators
413+
async fn webhook_progress_handler(handler: Arc<RpcHandler>, body: Value) -> impl IntoResponse {
414+
use platform_core::{Keypair, NetworkMessage, SignedNetworkMessage, TaskProgressMessage};
415+
416+
// Parse the progress data
417+
let msg_type = body.get("type").and_then(|v| v.as_str()).unwrap_or("");
418+
419+
match msg_type {
420+
"task_progress" => {
421+
// Extract task progress data
422+
let progress = TaskProgressMessage {
423+
challenge_id: body
424+
.get("challenge_id")
425+
.and_then(|v| v.as_str())
426+
.unwrap_or("")
427+
.to_string(),
428+
agent_hash: body
429+
.get("agent_hash")
430+
.and_then(|v| v.as_str())
431+
.unwrap_or("")
432+
.to_string(),
433+
evaluation_id: body
434+
.get("evaluation_id")
435+
.and_then(|v| v.as_str())
436+
.unwrap_or("")
437+
.to_string(),
438+
task_id: body
439+
.get("task_id")
440+
.and_then(|v| v.as_str())
441+
.unwrap_or("")
442+
.to_string(),
443+
task_index: body.get("task_index").and_then(|v| v.as_u64()).unwrap_or(0) as u32,
444+
total_tasks: body
445+
.get("total_tasks")
446+
.and_then(|v| v.as_u64())
447+
.unwrap_or(0) as u32,
448+
passed: body
449+
.get("passed")
450+
.and_then(|v| v.as_bool())
451+
.unwrap_or(false),
452+
score: body.get("score").and_then(|v| v.as_f64()).unwrap_or(0.0),
453+
execution_time_ms: body
454+
.get("execution_time_ms")
455+
.and_then(|v| v.as_u64())
456+
.unwrap_or(0),
457+
cost_usd: body.get("cost_usd").and_then(|v| v.as_f64()).unwrap_or(0.0),
458+
error: body.get("error").and_then(|v| v.as_str()).map(String::from),
459+
validator_hotkey: body
460+
.get("validator_hotkey")
461+
.and_then(|v| v.as_str())
462+
.unwrap_or("")
463+
.to_string(),
464+
timestamp: chrono::Utc::now().timestamp() as u64,
465+
};
466+
467+
info!(
468+
"Webhook received task progress: [{}/{}] agent={} task={} passed={}",
469+
progress.task_index,
470+
progress.total_tasks,
471+
&progress.agent_hash[..16.min(progress.agent_hash.len())],
472+
progress.task_id,
473+
progress.passed
474+
);
475+
476+
// Broadcast via P2P if we have a broadcast channel and keypair
477+
let broadcast_tx = handler.broadcast_tx.read();
478+
let keypair = handler.keypair.read();
479+
480+
if let (Some(tx), Some(kp)) = (broadcast_tx.as_ref(), keypair.as_ref()) {
481+
let network_msg = NetworkMessage::TaskProgress(progress.clone());
482+
483+
match SignedNetworkMessage::new(network_msg, kp) {
484+
Ok(signed) => {
485+
if let Ok(bytes) = bincode::serialize(&signed) {
486+
if tx.send(bytes).is_ok() {
487+
debug!("TaskProgress broadcast via P2P: task={}", progress.task_id);
488+
}
489+
}
490+
}
491+
Err(e) => {
492+
warn!("Failed to sign TaskProgress message: {}", e);
493+
}
494+
}
495+
} else if broadcast_tx.is_none() {
496+
debug!("No broadcast channel available for TaskProgress");
497+
} else if keypair.is_none() {
498+
debug!("No keypair available for signing TaskProgress");
499+
}
500+
501+
(
502+
StatusCode::OK,
503+
Json(serde_json::json!({
504+
"success": true,
505+
"message": "Task progress received"
506+
})),
507+
)
508+
}
509+
"evaluation_complete" => {
510+
info!(
511+
"Webhook received evaluation complete: agent={} score={:.2}",
512+
body.get("agent_hash")
513+
.and_then(|v| v.as_str())
514+
.unwrap_or("unknown"),
515+
body.get("final_score")
516+
.and_then(|v| v.as_f64())
517+
.unwrap_or(0.0)
518+
);
519+
520+
(
521+
StatusCode::OK,
522+
Json(serde_json::json!({
523+
"success": true,
524+
"message": "Evaluation complete received"
525+
})),
526+
)
527+
}
528+
_ => {
529+
warn!("Unknown webhook type: {}", msg_type);
530+
(
531+
StatusCode::BAD_REQUEST,
532+
Json(serde_json::json!({
533+
"success": false,
534+
"error": format!("Unknown webhook type: {}", msg_type)
535+
})),
536+
)
537+
}
538+
}
539+
}
540+
403541
/// Quick helper to create and start a server
404542
pub async fn start_rpc_server(
405543
addr: &str,

0 commit comments

Comments
 (0)