Skip to content

Commit f3d52d0

Browse files
committed
feat(validator-node): handle ChallengeStopped WebSocket event
When platform-server broadcasts challenge_stopped event via WebSocket, validator-node now: - Receives the event - Stops the local challenge container via orchestrator - Removes the challenge from the URL map This allows csudo stop challenge to propagate to all connected validators.
1 parent 54057fd commit f3d52d0

File tree

1 file changed

+61
-2
lines changed

1 file changed

+61
-2
lines changed

bins/validator-node/src/main.rs

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,12 +198,27 @@ pub struct ChallengeInfo {
198198
pub enum WsEvent {
199199
#[serde(rename = "challenge_event")]
200200
ChallengeEvent(ChallengeCustomEvent),
201+
#[serde(rename = "challenge_stopped")]
202+
ChallengeStopped(ChallengeStoppedEvent),
203+
#[serde(rename = "challenge_started")]
204+
ChallengeStarted(ChallengeStartedEvent),
201205
#[serde(rename = "ping")]
202206
Ping,
203207
#[serde(other)]
204208
Other,
205209
}
206210

211+
#[derive(Debug, Clone, serde::Deserialize)]
212+
pub struct ChallengeStoppedEvent {
213+
pub id: String,
214+
}
215+
216+
#[derive(Debug, Clone, serde::Deserialize)]
217+
pub struct ChallengeStartedEvent {
218+
pub id: String,
219+
pub endpoint: String,
220+
}
221+
207222
/// Custom event from a challenge
208223
#[derive(Debug, Clone, serde::Deserialize)]
209224
pub struct ChallengeCustomEvent {
@@ -416,11 +431,19 @@ async fn main() -> Result<()> {
416431

417432
// Start WebSocket listener for platform-server events
418433
// This listens for new_submission events and triggers local evaluation
434+
// Also handles challenge_stopped events to stop local containers
419435
let ws_platform_url = args.platform_server.clone();
420436
let ws_validator_hotkey = keypair.ss58_address();
421437
let ws_challenge_urls = challenge_urls.clone();
438+
let ws_orchestrator = orchestrator.clone();
422439
tokio::spawn(async move {
423-
start_websocket_listener(ws_platform_url, ws_validator_hotkey, ws_challenge_urls).await;
440+
start_websocket_listener(
441+
ws_platform_url,
442+
ws_validator_hotkey,
443+
ws_challenge_urls,
444+
ws_orchestrator,
445+
)
446+
.await;
424447
});
425448

426449
// RPC server
@@ -717,10 +740,12 @@ async fn handle_block_event(
717740

718741
/// Start WebSocket listener for platform-server events
719742
/// Listens for challenge events and triggers evaluations
743+
/// Also handles challenge_stopped events to stop local containers
720744
pub async fn start_websocket_listener(
721745
platform_url: String,
722746
validator_hotkey: String,
723747
challenge_urls: Arc<RwLock<HashMap<String, String>>>,
748+
orchestrator: Option<Arc<ChallengeOrchestrator>>,
724749
) {
725750
// Convert HTTP URL to WebSocket URL
726751
let ws_url = platform_url
@@ -731,7 +756,14 @@ pub async fn start_websocket_listener(
731756
info!("Starting WebSocket listener: {}", ws_url);
732757

733758
loop {
734-
match connect_to_websocket(&ws_url, &validator_hotkey, challenge_urls.clone()).await {
759+
match connect_to_websocket(
760+
&ws_url,
761+
&validator_hotkey,
762+
challenge_urls.clone(),
763+
orchestrator.clone(),
764+
)
765+
.await
766+
{
735767
Ok(()) => {
736768
info!("WebSocket connection closed, reconnecting in 5s...");
737769
}
@@ -749,6 +781,7 @@ async fn connect_to_websocket(
749781
ws_url: &str,
750782
validator_hotkey: &str,
751783
challenge_urls: Arc<RwLock<HashMap<String, String>>>,
784+
orchestrator: Option<Arc<ChallengeOrchestrator>>,
752785
) -> Result<()> {
753786
let (ws_stream, _) = connect_async(ws_url).await?;
754787
let (mut write, mut read) = ws_stream.split();
@@ -773,6 +806,32 @@ async fn connect_to_websocket(
773806
Ok(WsEvent::ChallengeEvent(event)) => {
774807
handle_challenge_event(event, validator_hotkey, challenge_urls.clone()).await;
775808
}
809+
Ok(WsEvent::ChallengeStopped(event)) => {
810+
info!("Received challenge_stopped event for: {}", event.id);
811+
if let Some(ref orch) = orchestrator {
812+
// Get the ChallengeId from challenge name
813+
let challenge_uuid =
814+
uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_DNS, event.id.as_bytes());
815+
let challenge_id = platform_core::ChallengeId(challenge_uuid);
816+
match orch.remove_challenge(challenge_id).await {
817+
Ok(_) => info!("Challenge container stopped: {}", event.id),
818+
Err(e) => {
819+
warn!("Failed to stop challenge container {}: {}", event.id, e)
820+
}
821+
}
822+
// Remove from URL map
823+
challenge_urls.write().remove(&event.id);
824+
} else {
825+
warn!("No orchestrator available to stop challenge: {}", event.id);
826+
}
827+
}
828+
Ok(WsEvent::ChallengeStarted(event)) => {
829+
info!(
830+
"Received challenge_started event for: {} at {}",
831+
event.id, event.endpoint
832+
);
833+
// Could auto-start container here if needed
834+
}
776835
Ok(WsEvent::Ping) => {
777836
debug!("Received ping from server");
778837
}

0 commit comments

Comments
 (0)