From 8c3623975b99be57fa43ea385640ea1c8cf252d8 Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Wed, 21 Jan 2026 06:41:33 -0800 Subject: [PATCH 01/17] run-manager: wip automatic run selection --- Cargo.lock | 1 + tools/rust-tools/run-manager/Cargo.toml | 1 + .../src/docker/coordinator_client.rs | 76 ++++++++++++++++++- .../run-manager/src/docker/manager.rs | 32 +++++++- .../rust-tools/run-manager/src/docker/mod.rs | 1 + 5 files changed, 107 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 24ee94226..866adf6e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8112,6 +8112,7 @@ dependencies = [ "serde", "serde_json", "serial_test", + "solana-account-decoder-client-types", "solana-client", "time", "tokio", diff --git a/tools/rust-tools/run-manager/Cargo.toml b/tools/rust-tools/run-manager/Cargo.toml index e47a1931f..d6b133134 100644 --- a/tools/rust-tools/run-manager/Cargo.toml +++ b/tools/rust-tools/run-manager/Cargo.toml @@ -35,6 +35,7 @@ rand.workspace = true rand_chacha.workspace = true time.workspace = true solana-client = "=2.1.4" +solana-account-decoder-client-types = "=2.1.4" [dev-dependencies] serial_test = "3.0" diff --git a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs index 729a38786..c2322708a 100644 --- a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs +++ b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs @@ -1,16 +1,28 @@ use anchor_client::solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; -use anchor_lang::AccountDeserialize; +use anchor_lang::{AccountDeserialize, Discriminator, Space}; use anyhow::{Context, Result}; +use psyche_coordinator::RunState; use psyche_solana_coordinator::{ CoordinatorInstance, coordinator_account_from_bytes, find_coordinator_instance, }; +use solana_account_decoder_client_types::UiAccountEncoding; use solana_client::rpc_client::RpcClient; +use 
solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; +use solana_client::rpc_filter::RpcFilterType; use tracing::info; +/// Information about a discovered run +#[derive(Debug, Clone)] +pub struct RunInfo { + pub run_id: String, + pub instance_pubkey: Pubkey, + pub coordinator_account: Pubkey, + pub run_state: RunState, +} + /// Coordinator client for querying Solana pub struct CoordinatorClient { rpc_client: RpcClient, - #[allow(dead_code)] program_id: Pubkey, } @@ -82,4 +94,64 @@ impl CoordinatorClient { Ok(image_name) } + + /// Fetch all available runs from the coordinator program + pub fn get_all_runs(&self) -> Result> { + // Get Anchor discriminator for CoordinatorInstance (first 8 bytes) + let discriminator = CoordinatorInstance::DISCRIMINATOR; + + // Fetch all accounts and filter client-side by discriminator + // (getProgramAccounts with memcmp filter has encoding issues on some RPC nodes) + let accounts = self + .rpc_client + .get_program_accounts_with_config( + &self.program_id, + RpcProgramAccountsConfig { + filters: Some(vec![RpcFilterType::DataSize( + CoordinatorInstance::INIT_SPACE as u64 + 8, // +8 for discriminator + )]), + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + commitment: Some(CommitmentConfig::confirmed()), + ..Default::default() + }, + ..Default::default() + }, + ) + .map_err(|e| { + anyhow::anyhow!( + "Failed to fetch program accounts from coordinator program {}: {}", + self.program_id, + e + ) + })?; + + let mut runs = Vec::new(); + for (pubkey, account) in accounts { + // Check discriminator matches CoordinatorInstance + if account.data.len() < 8 || &account.data[..8] != discriminator { + continue; + } + + if let Ok(instance) = CoordinatorInstance::try_deserialize(&mut account.data.as_slice()) + { + // Fetch run state from coordinator account + let run_state = match self.rpc_client.get_account(&instance.coordinator_account) { + Ok(coord_account) => 
coordinator_account_from_bytes(&coord_account.data) + .map(|acc| acc.state.coordinator.run_state) + .unwrap_or(RunState::Uninitialized), + Err(_) => RunState::Uninitialized, + }; + + runs.push(RunInfo { + run_id: instance.run_id.clone(), + instance_pubkey: pubkey, + coordinator_account: instance.coordinator_account, + run_state, + }); + } + } + + Ok(runs) + } } diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs b/tools/rust-tools/run-manager/src/docker/manager.rs index cf20f480a..cbe3178b5 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -64,13 +64,41 @@ impl RunManager { info!("Using coordinator program ID: {}", coordinator_program_id); - let run_id = get_env_var("RUN_ID")?; let rpc = get_env_var("RPC")?; - let scratch_dir = std::env::var("SCRATCH_DIR").ok(); let coordinator_client = CoordinatorClient::new(rpc, coordinator_program_id); + // Try to get RUN_ID from env, or discover available runs + let run_id = match std::env::var("RUN_ID") { + Ok(id) => { + info!("Using RUN_ID from environment: {}", id); + id + } + Err(_) => { + info!("RUN_ID not set, discovering available runs..."); + let runs = coordinator_client.get_all_runs()?; + + if runs.is_empty() { + bail!("No runs found on coordinator program"); + } + + // Log all discovered runs + info!("Discovered {} run(s):", runs.len()); + for run in &runs { + info!(" - {} (state: {})", run.run_id, run.run_state); + } + + // Select first available run + let selected = &runs[0]; + info!( + "Selected run: {} (state: {})", + selected.run_id, selected.run_state + ); + selected.run_id.clone() + } + }; + Ok(Self { wallet_key, run_id, diff --git a/tools/rust-tools/run-manager/src/docker/mod.rs b/tools/rust-tools/run-manager/src/docker/mod.rs index b3221d7eb..41f2aae7e 100644 --- a/tools/rust-tools/run-manager/src/docker/mod.rs +++ b/tools/rust-tools/run-manager/src/docker/mod.rs @@ -2,4 +2,5 @@ pub mod coordinator_client; pub mod manager; 
// Re-exports +pub use coordinator_client::RunInfo; pub use manager::{Entrypoint, RunManager}; From 02f03a84ef503ae47a63630d6d843dd2787759e6 Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Thu, 22 Jan 2026 11:25:45 -0800 Subject: [PATCH 02/17] run-manager: filter halted runs from joining --- .../run-manager/src/docker/manager.rs | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs b/tools/rust-tools/run-manager/src/docker/manager.rs index cbe3178b5..568bd7a5c 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -7,9 +7,11 @@ use std::process::{Command, Stdio}; use tokio::signal; use tracing::{error, info, warn}; +use crate::docker::RunInfo; use crate::docker::coordinator_client::CoordinatorClient; use crate::get_env_var; use crate::load_and_apply_env_file; +use psyche_coordinator::RunState; const RETRY_DELAY_SECS: u64 = 5; const VERSION_MISMATCH_EXIT_CODE: i32 = 10; @@ -78,7 +80,6 @@ impl RunManager { Err(_) => { info!("RUN_ID not set, discovering available runs..."); let runs = coordinator_client.get_all_runs()?; - if runs.is_empty() { bail!("No runs found on coordinator program"); } @@ -89,8 +90,7 @@ impl RunManager { info!(" - {} (state: {})", run.run_id, run.run_state); } - // Select first available run - let selected = &runs[0]; + let selected = select_best_run(&runs)?; info!( "Selected run: {} (state: {})", selected.run_id, selected.run_state @@ -348,3 +348,27 @@ impl RunManager { } } } + +fn select_best_run(runs: &[RunInfo]) -> Result<&RunInfo> { + // Avoid joining runs that are halted + let joinable: Vec<_> = runs + .iter() + .filter(|run| { + !matches!( + run.run_state, + RunState::Uninitialized | RunState::Finished | RunState::Paused + ) + }) + .collect(); + + if joinable.is_empty() { + bail!( + "No joinable runs found. 
All {} run(s) are in unjoinable states.", + runs.len() + ); + } + + // For now let's just return the first joinable run, later we can define + // a more sophisticated priority system if needed. + Ok(joinable[0]) +} From 308a0fa55acbcb4f321644246165814f50086ed5 Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Thu, 22 Jan 2026 11:43:56 -0800 Subject: [PATCH 03/17] run-manager: more refactor --- .../src/docker/coordinator_client.rs | 73 ++++++++++--------- .../run-manager/src/docker/manager.rs | 52 +++++++------ 2 files changed, 67 insertions(+), 58 deletions(-) diff --git a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs index c2322708a..bed5839e6 100644 --- a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs +++ b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs @@ -1,5 +1,5 @@ use anchor_client::solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; -use anchor_lang::{AccountDeserialize, Discriminator, Space}; +use anchor_lang::AccountDeserialize; use anyhow::{Context, Result}; use psyche_coordinator::RunState; use psyche_solana_coordinator::{ @@ -8,10 +8,8 @@ use psyche_solana_coordinator::{ use solana_account_decoder_client_types::UiAccountEncoding; use solana_client::rpc_client::RpcClient; use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; -use solana_client::rpc_filter::RpcFilterType; -use tracing::info; +use tracing::{info, warn}; -/// Information about a discovered run #[derive(Debug, Clone)] pub struct RunInfo { pub run_id: String, @@ -52,6 +50,20 @@ impl CoordinatorClient { Ok(instance) } + fn fetch_run_state(&self, coordinator_account: &Pubkey) -> Result { + // Fetch the raw Solana account data from the blockchain + let solana_account = self + .rpc_client + .get_account(coordinator_account) + .with_context(|| format!("Failed to fetch coordinator account {}", coordinator_account))?; + + // Deserialize the 
account data into a CoordinatorAccount struct + let coordinator = coordinator_account_from_bytes(&solana_account.data) + .with_context(|| format!("Failed to deserialize coordinator account {}", coordinator_account))?; + + Ok(coordinator.state.coordinator.run_state) + } + pub fn get_docker_tag_for_run(&self, run_id: &str, local_docker: bool) -> Result { info!("Querying coordinator for Run ID: {}", run_id); @@ -95,21 +107,13 @@ impl CoordinatorClient { Ok(image_name) } - /// Fetch all available runs from the coordinator program pub fn get_all_runs(&self) -> Result> { - // Get Anchor discriminator for CoordinatorInstance (first 8 bytes) - let discriminator = CoordinatorInstance::DISCRIMINATOR; - - // Fetch all accounts and filter client-side by discriminator - // (getProgramAccounts with memcmp filter has encoding issues on some RPC nodes) + // Fetch all CoordinatorInstance accounts that are owned by the program let accounts = self .rpc_client .get_program_accounts_with_config( &self.program_id, RpcProgramAccountsConfig { - filters: Some(vec![RpcFilterType::DataSize( - CoordinatorInstance::INIT_SPACE as u64 + 8, // +8 for discriminator - )]), account_config: RpcAccountInfoConfig { encoding: Some(UiAccountEncoding::Base64), commitment: Some(CommitmentConfig::confirmed()), @@ -128,27 +132,28 @@ impl CoordinatorClient { let mut runs = Vec::new(); for (pubkey, account) in accounts { - // Check discriminator matches CoordinatorInstance - if account.data.len() < 8 || &account.data[..8] != discriminator { - continue; - } - - if let Ok(instance) = CoordinatorInstance::try_deserialize(&mut account.data.as_slice()) - { - // Fetch run state from coordinator account - let run_state = match self.rpc_client.get_account(&instance.coordinator_account) { - Ok(coord_account) => coordinator_account_from_bytes(&coord_account.data) - .map(|acc| acc.state.coordinator.run_state) - .unwrap_or(RunState::Uninitialized), - Err(_) => RunState::Uninitialized, - }; - - runs.push(RunInfo { - 
run_id: instance.run_id.clone(), - instance_pubkey: pubkey, - coordinator_account: instance.coordinator_account, - run_state, - }); + match CoordinatorInstance::try_deserialize(&mut account.data.as_slice()) { + Ok(instance) => { + if let Ok(run_state) = self.fetch_run_state(&instance.coordinator_account) { + runs.push(RunInfo { + run_id: instance.run_id.clone(), + instance_pubkey: pubkey, + coordinator_account: instance.coordinator_account, + run_state, + }); + } else { + warn!( + "Skipping run {} (instance: {}) - could not fetch coordinator state", + instance.run_id, pubkey + ); + } + } + Err(e) => { + warn!( + "Failed to deserialize CoordinatorInstance at {}: {}", + pubkey, e + ); + } } } diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs b/tools/rust-tools/run-manager/src/docker/manager.rs index 568bd7a5c..c94d2bc5a 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -72,32 +72,36 @@ impl RunManager { let coordinator_client = CoordinatorClient::new(rpc, coordinator_program_id); // Try to get RUN_ID from env, or discover available runs - let run_id = match std::env::var("RUN_ID") { - Ok(id) => { - info!("Using RUN_ID from environment: {}", id); - id - } - Err(_) => { - info!("RUN_ID not set, discovering available runs..."); - let runs = coordinator_client.get_all_runs()?; - if runs.is_empty() { - bail!("No runs found on coordinator program"); - } + if let Ok(run_id) = std::env::var("RUN_ID") { + info!("Using RUN_ID from environment: {}", run_id); + return Ok(Self { + wallet_key, + run_id, + coordinator_client, + env_file, + local_docker, + scratch_dir, + }); + } + + info!("RUN_ID not set, discovering available runs..."); + let runs = coordinator_client.get_all_runs()?; + if runs.is_empty() { + bail!("No runs found on coordinator program"); + } - // Log all discovered runs - info!("Discovered {} run(s):", runs.len()); - for run in &runs { - info!(" - {} (state: {})", run.run_id, 
run.run_state); - } + // Log all discovered runs + info!("Discovered {} run(s):", runs.len()); + for run in &runs { + info!(" - {} (state: {})", run.run_id, run.run_state); + } - let selected = select_best_run(&runs)?; - info!( - "Selected run: {} (state: {})", - selected.run_id, selected.run_state - ); - selected.run_id.clone() - } - }; + let selected = select_best_run(&runs)?; + info!( + "Selected run: {} (state: {})", + selected.run_id, selected.run_state + ); + let run_id = selected.run_id.clone(); Ok(Self { wallet_key, From 9be6a25929a86f6ee32de641a39be2e5bf5c3fd2 Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Fri, 23 Jan 2026 16:49:56 -0300 Subject: [PATCH 04/17] run-manager change warn! to debug! to reduce verbosity in some places --- .../src/docker/coordinator_client.rs | 32 +++++++++++-------- .../run-manager/src/docker/manager.rs | 2 +- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs index bed5839e6..628c2b82f 100644 --- a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs +++ b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs @@ -8,7 +8,7 @@ use psyche_solana_coordinator::{ use solana_account_decoder_client_types::UiAccountEncoding; use solana_client::rpc_client::RpcClient; use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; -use tracing::{info, warn}; +use tracing::{debug, info}; #[derive(Debug, Clone)] pub struct RunInfo { @@ -55,11 +55,21 @@ impl CoordinatorClient { let solana_account = self .rpc_client .get_account(coordinator_account) - .with_context(|| format!("Failed to fetch coordinator account {}", coordinator_account))?; + .with_context(|| { + format!( + "Failed to fetch coordinator account {}", + coordinator_account + ) + })?; // Deserialize the account data into a CoordinatorAccount struct - let coordinator = 
coordinator_account_from_bytes(&solana_account.data) - .with_context(|| format!("Failed to deserialize coordinator account {}", coordinator_account))?; + let coordinator = + coordinator_account_from_bytes(&solana_account.data).with_context(|| { + format!( + "Failed to deserialize coordinator account {}", + coordinator_account + ) + })?; Ok(coordinator.state.coordinator.run_state) } @@ -70,13 +80,9 @@ impl CoordinatorClient { let instance = self.fetch_coordinator_data(run_id)?; // Fetch the coordinator account to get the client version - let coordinator_account_data = self - .rpc_client - .get_account(&instance.coordinator_account) - .context("RPC error: failed to get coordinator account")?; - - let coordinator_account = coordinator_account_from_bytes(&coordinator_account_data.data) - .context("Failed to deserialize CoordinatorAccount")?; + let coordinator_account_data = + self.rpc_client.get_account(&instance.coordinator_account)?; + let coordinator_account = coordinator_account_from_bytes(&coordinator_account_data.data)?; let client_version = String::from(&coordinator_account.state.client_version); @@ -142,14 +148,14 @@ impl CoordinatorClient { run_state, }); } else { - warn!( + debug!( "Skipping run {} (instance: {}) - could not fetch coordinator state", instance.run_id, pubkey ); } } Err(e) => { - warn!( + debug!( "Failed to deserialize CoordinatorInstance at {}: {}", pubkey, e ); diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs b/tools/rust-tools/run-manager/src/docker/manager.rs index c94d2bc5a..f5b125866 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -83,7 +83,7 @@ impl RunManager { scratch_dir, }); } - + info!("RUN_ID not set, discovering available runs..."); let runs = coordinator_client.get_all_runs()?; if runs.is_empty() { From 49a6f18b5f95980ef6c2dec28a970905018d1d8c Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Fri, 23 Jan 2026 17:14:13 -0300 Subject: 
[PATCH 05/17] run-manager: refactor halted() to simplify code --- shared/coordinator/src/coordinator.rs | 14 ++++++++++---- tools/rust-tools/run-manager/src/docker/manager.rs | 11 +---------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/shared/coordinator/src/coordinator.rs b/shared/coordinator/src/coordinator.rs index 78f054262..73f21d2df 100644 --- a/shared/coordinator/src/coordinator.rs +++ b/shared/coordinator/src/coordinator.rs @@ -53,6 +53,15 @@ pub enum RunState { Paused = 7, } +impl RunState { + pub fn halted(&self) -> bool { + matches!( + self, + RunState::Uninitialized | RunState::Finished | RunState::Paused + ) + } +} + #[derive( Clone, Copy, @@ -811,10 +820,7 @@ impl Coordinator { } pub fn halted(&self) -> bool { - matches!( - self.run_state, - RunState::Uninitialized | RunState::Finished | RunState::Paused - ) + self.run_state.halted() } pub fn get_client_at_historical_index( diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs b/tools/rust-tools/run-manager/src/docker/manager.rs index f5b125866..53de2c6ac 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -11,7 +11,6 @@ use crate::docker::RunInfo; use crate::docker::coordinator_client::CoordinatorClient; use crate::get_env_var; use crate::load_and_apply_env_file; -use psyche_coordinator::RunState; const RETRY_DELAY_SECS: u64 = 5; const VERSION_MISMATCH_EXIT_CODE: i32 = 10; @@ -355,15 +354,7 @@ impl RunManager { fn select_best_run(runs: &[RunInfo]) -> Result<&RunInfo> { // Avoid joining runs that are halted - let joinable: Vec<_> = runs - .iter() - .filter(|run| { - !matches!( - run.run_state, - RunState::Uninitialized | RunState::Finished | RunState::Paused - ) - }) - .collect(); + let joinable: Vec<_> = runs.iter().filter(|run| !run.run_state.halted()).collect(); if joinable.is_empty() { bail!( From e57f68f30a0b37e20ee0d3b66239c523122d84b0 Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Fri, 
23 Jan 2026 17:58:16 -0300 Subject: [PATCH 06/17] docs: update run-manager related docs to talk about automatic run selection --- psyche-book/src/enduser/join-run.md | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/psyche-book/src/enduser/join-run.md b/psyche-book/src/enduser/join-run.md index b79678bcd..897ac6588 100644 --- a/psyche-book/src/enduser/join-run.md +++ b/psyche-book/src/enduser/join-run.md @@ -64,7 +64,8 @@ WALLET_PATH=/path/to/your/keypair.json RPC=https://your-primary-rpc-provider.com WS_RPC=wss://your-primary-rpc-provider.com -# Required: Which run id to join +# Optional: Which run id to join +# If not set, the client will automatically discover and join an available run RUN_ID=your_run_id_here # Recommended: Fallback RPC Endpoints (for reliability) @@ -78,6 +79,20 @@ Then, you can start training through the run manager running: ./run-manager --env-file /path/to/your/.env ``` +### Automatic Run Selection + +If you don't specify a `RUN_ID` in your `.env` file, the run-manager will automatically query the Solana coordinator to find a suitable run to join. +This makes it easier to join training without needing to know the specific run ID in advance. The run-manager will display which run it selected in the logs: + +``` +INFO RUN_ID not set, discovering available runs... +INFO Discovered 3 run(s): +INFO - run_abc123 (state: Training) +INFO - run_def456 (state: Warmup) +INFO - run_xyz789 (state: Finished) +INFO Selected run: run_abc123 (state: Training) +``` + After the initial setup, you'll see the Psyche client logs streaming in real-time. These logs show training progress, network status, and other important information. To stop the client, press `Ctrl+C` in the terminal. 
From 19155a26642362c0dfa3d3f24eb7922b4d1f9a05 Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Mon, 26 Jan 2026 07:15:20 -0800 Subject: [PATCH 07/17] Revert "run-manager: refactor halted() to simplify code" This reverts commit 49a6f18b5f95980ef6c2dec28a970905018d1d8c. --- shared/coordinator/src/coordinator.rs | 14 ++++---------- tools/rust-tools/run-manager/src/docker/manager.rs | 11 ++++++++++- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/shared/coordinator/src/coordinator.rs b/shared/coordinator/src/coordinator.rs index 73f21d2df..78f054262 100644 --- a/shared/coordinator/src/coordinator.rs +++ b/shared/coordinator/src/coordinator.rs @@ -53,15 +53,6 @@ pub enum RunState { Paused = 7, } -impl RunState { - pub fn halted(&self) -> bool { - matches!( - self, - RunState::Uninitialized | RunState::Finished | RunState::Paused - ) - } -} - #[derive( Clone, Copy, @@ -820,7 +811,10 @@ impl Coordinator { } pub fn halted(&self) -> bool { - self.run_state.halted() + matches!( + self.run_state, + RunState::Uninitialized | RunState::Finished | RunState::Paused + ) } pub fn get_client_at_historical_index( diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs b/tools/rust-tools/run-manager/src/docker/manager.rs index 53de2c6ac..f5b125866 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -11,6 +11,7 @@ use crate::docker::RunInfo; use crate::docker::coordinator_client::CoordinatorClient; use crate::get_env_var; use crate::load_and_apply_env_file; +use psyche_coordinator::RunState; const RETRY_DELAY_SECS: u64 = 5; const VERSION_MISMATCH_EXIT_CODE: i32 = 10; @@ -354,7 +355,15 @@ impl RunManager { fn select_best_run(runs: &[RunInfo]) -> Result<&RunInfo> { // Avoid joining runs that are halted - let joinable: Vec<_> = runs.iter().filter(|run| !run.run_state.halted()).collect(); + let joinable: Vec<_> = runs + .iter() + .filter(|run| { + !matches!( + run.run_state, + 
RunState::Uninitialized | RunState::Finished | RunState::Paused + ) + }) + .collect(); if joinable.is_empty() { bail!( From 67d3f0739c229daa693ca1d1139a46cb5889304f Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Tue, 27 Jan 2026 11:39:01 -0800 Subject: [PATCH 08/17] run-manager: prioritize joining waiting for compute runs --- tools/rust-tools/run-manager/src/docker/manager.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs b/tools/rust-tools/run-manager/src/docker/manager.rs index f5b125866..9e621e171 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -355,7 +355,7 @@ impl RunManager { fn select_best_run(runs: &[RunInfo]) -> Result<&RunInfo> { // Avoid joining runs that are halted - let joinable: Vec<_> = runs + let mut joinable: Vec<_> = runs .iter() .filter(|run| { !matches!( @@ -372,7 +372,11 @@ fn select_best_run(runs: &[RunInfo]) -> Result<&RunInfo> { ); } - // For now let's just return the first joinable run, later we can define - // a more sophisticated priority system if needed. 
+ // Prioritize joining runs waiting for compute and then just pick the first one + // available from those for now + joinable.sort_by_key(|run| match run.run_state { + RunState::WaitingForMembers => 0, + _ => 1, + }); Ok(joinable[0]) } From f4d091086cbc7d24bfdd9757ed15057b6861c8ae Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Tue, 27 Jan 2026 17:30:54 -0300 Subject: [PATCH 09/17] run-manager: handle authorization in run selection --- .../solana-authorizer/Cargo.lock | 2 +- .../solana-coordinator/Cargo.lock | 8 +-- .../solana-mining-pool/Cargo.lock | 2 +- .../decentralized/solana-treasurer/Cargo.lock | 10 ++-- .../src/docker/coordinator_client.rs | 54 ++++++++++++++++- .../run-manager/src/docker/manager.rs | 60 ++++++++++++++++--- 6 files changed, 116 insertions(+), 20 deletions(-) diff --git a/architectures/decentralized/solana-authorizer/Cargo.lock b/architectures/decentralized/solana-authorizer/Cargo.lock index c1a36b8a6..7eb386dad 100644 --- a/architectures/decentralized/solana-authorizer/Cargo.lock +++ b/architectures/decentralized/solana-authorizer/Cargo.lock @@ -1389,7 +1389,7 @@ dependencies = [ [[package]] name = "psyche-solana-authorizer" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", diff --git a/architectures/decentralized/solana-coordinator/Cargo.lock b/architectures/decentralized/solana-coordinator/Cargo.lock index 22d64fadc..72a38df53 100644 --- a/architectures/decentralized/solana-coordinator/Cargo.lock +++ b/architectures/decentralized/solana-coordinator/Cargo.lock @@ -1602,7 +1602,7 @@ dependencies = [ [[package]] name = "psyche-coordinator" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "async-trait", @@ -1616,7 +1616,7 @@ dependencies = [ [[package]] name = "psyche-core" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-lang-idl", @@ -1635,7 +1635,7 @@ dependencies = [ [[package]] name = "psyche-solana-authorizer" -version = "0.1.0" +version = "0.2.0" 
dependencies = [ "anchor-lang", "anchor-spl", @@ -1643,7 +1643,7 @@ dependencies = [ [[package]] name = "psyche-solana-coordinator" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "bytemuck", diff --git a/architectures/decentralized/solana-mining-pool/Cargo.lock b/architectures/decentralized/solana-mining-pool/Cargo.lock index 06fb31df5..225d03bf9 100644 --- a/architectures/decentralized/solana-mining-pool/Cargo.lock +++ b/architectures/decentralized/solana-mining-pool/Cargo.lock @@ -1389,7 +1389,7 @@ dependencies = [ [[package]] name = "psyche-solana-mining-pool" -version = "0.1.1" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", diff --git a/architectures/decentralized/solana-treasurer/Cargo.lock b/architectures/decentralized/solana-treasurer/Cargo.lock index 5d56eb74d..9be05b852 100644 --- a/architectures/decentralized/solana-treasurer/Cargo.lock +++ b/architectures/decentralized/solana-treasurer/Cargo.lock @@ -1602,7 +1602,7 @@ dependencies = [ [[package]] name = "psyche-coordinator" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "async-trait", @@ -1616,7 +1616,7 @@ dependencies = [ [[package]] name = "psyche-core" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-lang-idl", @@ -1635,7 +1635,7 @@ dependencies = [ [[package]] name = "psyche-solana-authorizer" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", @@ -1643,7 +1643,7 @@ dependencies = [ [[package]] name = "psyche-solana-coordinator" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "bytemuck", @@ -1656,7 +1656,7 @@ dependencies = [ [[package]] name = "psyche-solana-treasurer" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", diff --git a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs index 628c2b82f..7ae9f5719 100644 --- 
a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs +++ b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs @@ -1,14 +1,18 @@ -use anchor_client::solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; +use anchor_client::solana_sdk::{ + commitment_config::CommitmentConfig, pubkey::Pubkey, system_program, +}; use anchor_lang::AccountDeserialize; use anyhow::{Context, Result}; use psyche_coordinator::RunState; +use psyche_solana_authorizer::state::Authorization; use psyche_solana_coordinator::{ CoordinatorInstance, coordinator_account_from_bytes, find_coordinator_instance, + logic::JOIN_RUN_AUTHORIZATION_SCOPE, }; use solana_account_decoder_client_types::UiAccountEncoding; use solana_client::rpc_client::RpcClient; use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; #[derive(Debug, Clone)] pub struct RunInfo { @@ -165,4 +169,50 @@ impl CoordinatorClient { Ok(runs) } + + /// Check if a user is authorized to join a specific run. + /// + /// This checks both permissionless authorization (grantee = system_program::ID) + /// and user-specific authorization (grantee = user_pubkey). + pub fn can_user_join_run(&self, run: &RunInfo, user_pubkey: &Pubkey) -> Result { + // Fetch the CoordinatorInstance to get join_authority + let instance = self.fetch_coordinator_data(&run.run_id)?; + let join_authority = instance.join_authority; + + // Try permissionless authorization first (grantee = system_program::ID) + if self.check_authorization_for_grantee(&join_authority, &system_program::ID, user_pubkey) { + return Ok(true); + } + + // Try user-specific authorization (grantee = user_pubkey) + Ok(self.check_authorization_for_grantee(&join_authority, user_pubkey, user_pubkey)) + } + + /// Check if an authorization exists and is valid for a specific grantee. 
+ fn check_authorization_for_grantee( + &self, + join_authority: &Pubkey, + grantee: &Pubkey, + user_pubkey: &Pubkey, + ) -> bool { + let auth_pda = psyche_solana_authorizer::find_authorization( + join_authority, + grantee, + JOIN_RUN_AUTHORIZATION_SCOPE, + ); + + let Ok(account) = self.rpc_client.get_account(&auth_pda) else { + return false; + }; + + let Ok(authorization) = Authorization::try_deserialize(&mut account.data.as_slice()) else { + warn!( + "Failed to deserialize authorization at {}: invalid data", + auth_pda + ); + return false; + }; + + authorization.is_valid_for(join_authority, user_pubkey, JOIN_RUN_AUTHORIZATION_SCOPE) + } } diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs b/tools/rust-tools/run-manager/src/docker/manager.rs index 9e621e171..1dcbe703b 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -1,7 +1,8 @@ use anchor_client::solana_sdk::pubkey::Pubkey; +use anchor_client::solana_sdk::signature::{EncodableKey, Keypair, Signer}; use anyhow::{Context, Result, anyhow, bail}; use std::fs; -use std::io::{BufRead, BufReader}; +use std::io::{BufRead, BufReader, Cursor}; use std::path::PathBuf; use std::process::{Command, Stdio}; use tokio::signal; @@ -96,12 +97,16 @@ impl RunManager { info!(" - {} (state: {})", run.run_id, run.run_state); } - let selected = select_best_run(&runs)?; + // Parse wallet key to get user's pubkey for authorization checks + let user_pubkey = parse_wallet_pubkey(&wallet_key)?; + info!("User pubkey: {}", user_pubkey); + + let run_id = select_best_run(&runs, &user_pubkey, &coordinator_client)?; + let selected = runs.iter().find(|r| r.run_id == run_id).unwrap(); info!( "Selected run: {} (state: {})", selected.run_id, selected.run_state ); - let run_id = selected.run_id.clone(); Ok(Self { wallet_key, @@ -353,9 +358,26 @@ impl RunManager { } } -fn select_best_run(runs: &[RunInfo]) -> Result<&RunInfo> { +/// Parse wallet key string to extract 
the user's pubkey. +fn parse_wallet_pubkey(wallet_key: &str) -> Result { + let keypair = if wallet_key.starts_with('[') { + // Assume Keypair::read format (JSON array of bytes) + Keypair::read(&mut Cursor::new(wallet_key)) + .map_err(|e| anyhow!("Failed to parse wallet key: {}", e))? + } else { + // Assume base58 encoded private key + Keypair::from_base58_string(wallet_key) + }; + Ok(keypair.pubkey()) +} + +fn select_best_run( + runs: &[RunInfo], + user_pubkey: &Pubkey, + coordinator_client: &CoordinatorClient, +) -> Result { // Avoid joining runs that are halted - let mut joinable: Vec<_> = runs + let joinable: Vec<_> = runs .iter() .filter(|run| { !matches!( @@ -372,11 +394,35 @@ fn select_best_run(runs: &[RunInfo]) -> Result<&RunInfo> { ); } + // Filter out runs the user is not authorized to join + let mut authorized: Vec<_> = Vec::new(); + for run in joinable { + match coordinator_client.can_user_join_run(run, user_pubkey) { + Ok(true) => authorized.push(run), + Ok(false) => { + info!( + "Skipping run {} - not authorized (user: {})", + run.run_id, user_pubkey + ); + } + Err(e) => { + warn!( + "Skipping run {} - failed to check authorization: {}", + run.run_id, e + ); + } + } + } + + if authorized.is_empty() { + bail!("No authorized runs found for user {}.", user_pubkey); + } + // Prioritize joining runs waiting for compute and then just pick the first one // available from those for now - joinable.sort_by_key(|run| match run.run_state { + authorized.sort_by_key(|run| match run.run_state { RunState::WaitingForMembers => 0, _ => 1, }); - Ok(joinable[0]) + Ok(authorized[0].run_id.clone()) } From 85f35e4f184ac5c9733d0dd6076753658d5854c2 Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Tue, 27 Jan 2026 18:54:00 -0300 Subject: [PATCH 10/17] run-manager: fix empty run_id thing --- .../run-manager/src/docker/manager.rs | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs 
b/tools/rust-tools/run-manager/src/docker/manager.rs index 1dcbe703b..5d9a16448 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -74,15 +74,17 @@ impl RunManager { // Try to get RUN_ID from env, or discover available runs if let Ok(run_id) = std::env::var("RUN_ID") { - info!("Using RUN_ID from environment: {}", run_id); - return Ok(Self { - wallet_key, - run_id, - coordinator_client, - env_file, - local_docker, - scratch_dir, - }); + if !run_id.is_empty() { + info!("Using RUN_ID from environment: {}", run_id); + return Ok(Self { + wallet_key, + run_id, + coordinator_client, + env_file, + local_docker, + scratch_dir, + }); + } } info!("RUN_ID not set, discovering available runs..."); @@ -196,6 +198,8 @@ impl RunManager { .arg(format!("RAW_WALLET_PRIVATE_KEY={}", &self.wallet_key)) .arg("--env") .arg(format!("CLIENT_VERSION={}", client_version)) + .arg("--env") + .arg(format!("RUN_ID={}", &self.run_id)) .arg("--env-file") .arg(&self.env_file); From eedb03bb6164fb87cf20f427858e8043255206ba Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Wed, 28 Jan 2026 15:35:10 -0300 Subject: [PATCH 11/17] dev: fix just commands for creating authorized test runs --- architectures/decentralized/justfile | 8 ++++---- scripts/setup-and-deploy-solana-test.sh | 16 +++++++++++----- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/architectures/decentralized/justfile b/architectures/decentralized/justfile index 77ba7d4ed..e1eddf893 100644 --- a/architectures/decentralized/justfile +++ b/architectures/decentralized/justfile @@ -25,16 +25,16 @@ setup-solana-localnet-light-test-run-treasurer run_id="test" *args='': RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml PERMISSIONLESS=true ./scripts/setup-and-deploy-solana-test.sh --treasurer {{ args }} setup-solana-localnet-permissioned-test-run run_id="test" *args='': - RUN_ID={{ run_id }} ./scripts/deploy-solana-test.sh {{ args }} + 
RUN_ID={{ run_id }} ./scripts/setup-and-deploy-solana-test.sh {{ args }} setup-solana-localnet-permissioned-light-test-run run_id="test" *args='': - RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml ./scripts/deploy-solana-test.sh {{ args }} + RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml ./scripts/setup-and-deploy-solana-test.sh {{ args }} setup-solana-localnet-permissioned-test-run-treasurer run_id="test" *args='': - RUN_ID={{ run_id }} ./scripts/deploy-solana-test.sh --treasurer {{ args }} + RUN_ID={{ run_id }} ./scripts/setup-and-deploy-solana-test.sh --treasurer {{ args }} setup-solana-localnet-permissioned-light-test-run-treasurer run_id="test" *args='': - RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml ./scripts/deploy-solana-test.sh --treasurer {{ args }} + RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml ./scripts/setup-and-deploy-solana-test.sh --treasurer {{ args }} start-training-localnet-client run_id="test" *args='': AUTHORIZER={{ AUTHORIZER }} RUN_ID={{ run_id }} ./scripts/train-solana-test.sh {{ args }} diff --git a/scripts/setup-and-deploy-solana-test.sh b/scripts/setup-and-deploy-solana-test.sh index 7886b50bf..69a627cbd 100755 --- a/scripts/setup-and-deploy-solana-test.sh +++ b/scripts/setup-and-deploy-solana-test.sh @@ -40,13 +40,19 @@ sleep 3 solana airdrop 10 --url ${RPC} --keypair ${WALLET_FILE} -# Pass treasurer flag to deploy script if set -if [[ "$DEPLOY_TREASURER" == "true" && "$PERMISSIONLESS" == "true" ]]; then + +if [[ "$DEPLOY_TREASURER" == "true" ]]; then WALLET_FILE=${WALLET_FILE} ./scripts/deploy-solana-test.sh --treasurer "${EXTRA_ARGS[@]}" - CONFIG_FILE=${CONFIG_FILE} WALLET_FILE=${WALLET_FILE} ./scripts/create-permissionless-run.sh --treasurer "${EXTRA_ARGS[@]}" -elif [[ "$PERMISSIONLESS" == "true" ]]; then +else WALLET_FILE=${WALLET_FILE} ./scripts/deploy-solana-test.sh "${EXTRA_ARGS[@]}" - CONFIG_FILE=${CONFIG_FILE} 
WALLET_FILE=${WALLET_FILE} ./scripts/create-permissionless-run.sh "${EXTRA_ARGS[@]}" +fi + +if [[ "$PERMISSIONLESS" == "true" ]]; then + if [[ "$DEPLOY_TREASURER" == "true" ]]; then + CONFIG_FILE=${CONFIG_FILE} WALLET_FILE=${WALLET_FILE} ./scripts/create-permissionless-run.sh --treasurer "${EXTRA_ARGS[@]}" + else + CONFIG_FILE=${CONFIG_FILE} WALLET_FILE=${WALLET_FILE} ./scripts/create-permissionless-run.sh "${EXTRA_ARGS[@]}" + fi fi echo -e "\n[+] Testing Solana setup ready, starting Solana logs...\n" From 7e3d8529a7c19aab57e78e06fa0623c768caba71 Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Wed, 28 Jan 2026 16:28:25 -0300 Subject: [PATCH 12/17] run-manager: --authorizer param to filter runs authorized by a certain param --- .../run-manager/src/docker/manager.rs | 86 +++++++++++-------- tools/rust-tools/run-manager/src/main.rs | 21 ++++- 2 files changed, 71 insertions(+), 36 deletions(-) diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs b/tools/rust-tools/run-manager/src/docker/manager.rs index 5d9a16448..40c884f95 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -37,6 +37,7 @@ impl RunManager { coordinator_program_id: String, env_file: PathBuf, local_docker: bool, + authorizer: Option, ) -> Result { // Verify docker is available Command::new("docker") @@ -93,22 +94,16 @@ impl RunManager { bail!("No runs found on coordinator program"); } - // Log all discovered runs - info!("Discovered {} run(s):", runs.len()); - for run in &runs { - info!(" - {} (state: {})", run.run_id, run.run_state); - } - // Parse wallet key to get user's pubkey for authorization checks let user_pubkey = parse_wallet_pubkey(&wallet_key)?; info!("User pubkey: {}", user_pubkey); - let run_id = select_best_run(&runs, &user_pubkey, &coordinator_client)?; - let selected = runs.iter().find(|r| r.run_id == run_id).unwrap(); - info!( - "Selected run: {} (state: {})", - selected.run_id, selected.run_state - 
); + let run_id = select_best_run( + &runs, + &user_pubkey, + &coordinator_client, + authorizer.as_ref(), + )?; Ok(Self { wallet_key, @@ -379,9 +374,10 @@ fn select_best_run( runs: &[RunInfo], user_pubkey: &Pubkey, coordinator_client: &CoordinatorClient, + authorizer: Option<&Pubkey>, ) -> Result { - // Avoid joining runs that are halted - let joinable: Vec<_> = runs + // Filter out unjoinable run states + let mut candidates: Vec<_> = runs .iter() .filter(|run| { !matches!( @@ -391,42 +387,62 @@ fn select_best_run( }) .collect(); - if joinable.is_empty() { + if candidates.is_empty() { bail!( "No joinable runs found. All {} run(s) are in unjoinable states.", runs.len() ); } - // Filter out runs the user is not authorized to join - let mut authorized: Vec<_> = Vec::new(); - for run in joinable { - match coordinator_client.can_user_join_run(run, user_pubkey) { - Ok(true) => authorized.push(run), - Ok(false) => { - info!( - "Skipping run {} - not authorized (user: {})", - run.run_id, user_pubkey - ); - } + // Filter by join_authority if --authorizer was specified + if let Some(auth) = authorizer { + info!("Filtering runs by join_authority: {}", auth); + candidates.retain( + |run| match coordinator_client.fetch_coordinator_data(&run.run_id) { + Ok(data) => data.join_authority == *auth, + Err(e) => { + warn!("Skipping run {} - failed to fetch data: {}", run.run_id, e); + false + } + }, + ); + if candidates.is_empty() { + bail!("No runs found matching authorizer {}", auth); + } + } + + // Filter to runs the user is authorized to join + candidates.retain( + |run| match coordinator_client.can_user_join_run(run, user_pubkey) { + Ok(authorized) => authorized, Err(e) => { warn!( - "Skipping run {} - failed to check authorization: {}", + "Skipping run {} - authorization check failed: {}", run.run_id, e ); + false } - } - } + }, + ); - if authorized.is_empty() { - bail!("No authorized runs found for user {}.", user_pubkey); + if candidates.is_empty() { + bail!("No authorized 
runs found for user {}", user_pubkey); } - // Prioritize joining runs waiting for compute and then just pick the first one - // available from those for now - authorized.sort_by_key(|run| match run.run_state { + // Prioritize runs waiting for members + candidates.sort_by_key(|run| match run.run_state { RunState::WaitingForMembers => 0, _ => 1, }); - Ok(authorized[0].run_id.clone()) + + info!("Found {} available run(s):", candidates.len()); + for run in &candidates { + info!(" - {} (state: {})", run.run_id, run.run_state); + } + + info!( + "Selected run: {} (state: {})", + candidates[0].run_id, candidates[0].run_state + ); + Ok(candidates[0].run_id.clone()) } diff --git a/tools/rust-tools/run-manager/src/main.rs b/tools/rust-tools/run-manager/src/main.rs index 717234d9d..d6036585b 100644 --- a/tools/rust-tools/run-manager/src/main.rs +++ b/tools/rust-tools/run-manager/src/main.rs @@ -59,6 +59,10 @@ struct CliArgs { #[arg(long)] local: bool, + /// Only join runs where this pubkey is the join_authority (Docker mode) + #[arg(long)] + authorizer: Option, + /// Optional entrypoint (Docker mode) #[arg(long)] entrypoint: Option, @@ -287,7 +291,22 @@ async fn async_main() -> Result<()> { None => None, }; - let run_mgr = RunManager::new(args.coordinator_program_id, env_file, args.local)?; + // Parse pubkey into Pubkey type + let authorizer = args + .authorizer + .as_ref() + .map(|s| { + s.parse() + .map_err(|e| anyhow::anyhow!("Failed to parse authorizer pubkey: {}", e)) + }) + .transpose()?; + + let run_mgr = RunManager::new( + args.coordinator_program_id, + env_file, + args.local, + authorizer, + )?; let result = run_mgr.run(entrypoint).await; if let Err(e) = &result { error!("Error: {}", e); From 5850113d59ef32df975ac392b157aed807ef6d99 Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Wed, 28 Jan 2026 17:24:28 -0300 Subject: [PATCH 13/17] docs: update with new run-manager --authorizer flag --- psyche-book/src/enduser/join-run.md | 21 +++++++++++++++------ 1 file 
changed, 15 insertions(+), 6 deletions(-) diff --git a/psyche-book/src/enduser/join-run.md b/psyche-book/src/enduser/join-run.md index 897ac6588..968b09980 100644 --- a/psyche-book/src/enduser/join-run.md +++ b/psyche-book/src/enduser/join-run.md @@ -86,11 +86,10 @@ This makes it easier to join training without needing to know the specific run I ``` INFO RUN_ID not set, discovering available runs... -INFO Discovered 3 run(s): -INFO - run_abc123 (state: Training) -INFO - run_def456 (state: Warmup) -INFO - run_xyz789 (state: Finished) -INFO Selected run: run_abc123 (state: Training) +INFO Found 2 available run(s): +INFO - run_abc123 (state: Waiting for members) +INFO - run_def456 (state: Training) +INFO Selected run: run_abc123 (state: Waiting for members) ``` After the initial setup, you'll see the Psyche client logs streaming in real-time. These logs show training progress, network status, and other important information. @@ -101,9 +100,19 @@ To stop the client, press `Ctrl+C` in the terminal. We recommend using a dedicated RPC service such as [Helius](https://www.helius.dev/), [QuickNode](https://www.quicknode.com/), [Triton](https://triton.one/), or self-hosting your own Solana RPC node. +## Filtering by Authorizer + +If you want to only join runs authorized by a specific entity, you can use the `--authorizer` flag: + +```bash +./run-manager --env-file /path/to/your/.env --authorizer <AUTHORIZER_PUBKEY> +``` + +This is useful when you want to ensure you only join runs from a trusted coordinator. + ## Additional config variables -In general it's not neccesary to change these variables to join a run since we provide sensible defaults, +In general it's not necessary to change these variables to join a run since we provide sensible defaults, though you might need to. **`NVIDIA_DRIVER_CAPABILITIES`** - An environment variable that the NVIDIA Container Toolkit uses to determine which compute capabilities should be provided to your container. It is recommended to set it to 'all', e.g.
`NVIDIA_DRIVER_CAPABILITIES=all`. From 645d3c7307bb877d11cfcdf5019a7b1b88dbca4d Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Fri, 30 Jan 2026 11:34:32 -0300 Subject: [PATCH 14/17] run-manager: improve base58 wallet key handling --- .../run-manager/src/docker/manager.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs b/tools/rust-tools/run-manager/src/docker/manager.rs index 40c884f95..c3e65dd8d 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -1,3 +1,4 @@ +use anchor_client::solana_sdk::bs58; use anchor_client::solana_sdk::pubkey::Pubkey; use anchor_client::solana_sdk::signature::{EncodableKey, Keypair, Signer}; use anyhow::{Context, Result, anyhow, bail}; @@ -61,6 +62,8 @@ impl RunManager { } .trim() .to_string(); + let user_pubkey = parse_wallet_pubkey(&wallet_key)?; + info!("User pubkey: {}", user_pubkey); let coordinator_program_id = coordinator_program_id .parse::() @@ -94,10 +97,6 @@ impl RunManager { bail!("No runs found on coordinator program"); } - // Parse wallet key to get user's pubkey for authorization checks - let user_pubkey = parse_wallet_pubkey(&wallet_key)?; - info!("User pubkey: {}", user_pubkey); - let run_id = select_best_run( &runs, &user_pubkey, @@ -364,8 +363,14 @@ fn parse_wallet_pubkey(wallet_key: &str) -> Result { Keypair::read(&mut Cursor::new(wallet_key)) .map_err(|e| anyhow!("Failed to parse wallet key: {}", e))? } else { - // Assume base58 encoded private key - Keypair::from_base58_string(wallet_key) + // from_base58_string has an internal unwrap() so we use these functions to handle + // errors more gracefully + let decoded = bs58::decode(wallet_key) + .into_vec() + .map_err(|e| anyhow!("Failed to decode base58 wallet key: {}", e))?; + + Keypair::from_bytes(&decoded) + .map_err(|e| anyhow!("Failed to create keypair from decoded bytes: {}", e))?
}; Ok(keypair.pubkey()) } From 951fdb8a28a69f260372c63a2a8889eccd3fbb59 Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Fri, 30 Jan 2026 11:26:41 -0800 Subject: [PATCH 15/17] run-manager: fix authorization on join --- .../src/docker/coordinator_client.rs | 13 ++-- .../run-manager/src/docker/manager.rs | 68 +++++++++++++++---- 2 files changed, 62 insertions(+), 19 deletions(-) diff --git a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs index 7ae9f5719..c15f6c6c0 100644 --- a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs +++ b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs @@ -174,18 +174,23 @@ impl CoordinatorClient { /// /// This checks both permissionless authorization (grantee = system_program::ID) /// and user-specific authorization (grantee = user_pubkey). - pub fn can_user_join_run(&self, run: &RunInfo, user_pubkey: &Pubkey) -> Result { + /// Returns the matched grantee pubkey if authorized, or None if not. + pub fn can_user_join_run(&self, run_id: &str, user_pubkey: &Pubkey) -> Result> { // Fetch the CoordinatorInstance to get join_authority - let instance = self.fetch_coordinator_data(&run.run_id)?; + let instance = self.fetch_coordinator_data(run_id)?; let join_authority = instance.join_authority; // Try permissionless authorization first (grantee = system_program::ID) if self.check_authorization_for_grantee(&join_authority, &system_program::ID, user_pubkey) { - return Ok(true); + return Ok(Some(system_program::ID)); } // Try user-specific authorization (grantee = user_pubkey) - Ok(self.check_authorization_for_grantee(&join_authority, user_pubkey, user_pubkey)) + if self.check_authorization_for_grantee(&join_authority, user_pubkey, user_pubkey) { + return Ok(Some(*user_pubkey)); + } + + Ok(None) } /// Check if an authorization exists and is valid for a specific grantee. 
diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs b/tools/rust-tools/run-manager/src/docker/manager.rs index c3e65dd8d..81ebdeb61 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -25,6 +25,7 @@ pub struct RunManager { local_docker: bool, coordinator_client: CoordinatorClient, scratch_dir: Option, + client_authorizer: Pubkey, } #[derive(Debug)] @@ -80,6 +81,8 @@ impl RunManager { if let Ok(run_id) = std::env::var("RUN_ID") { if !run_id.is_empty() { info!("Using RUN_ID from environment: {}", run_id); + let client_authorizer = + resolve_client_authorizer(&coordinator_client, &run_id, &user_pubkey)?; return Ok(Self { wallet_key, run_id, @@ -87,6 +90,7 @@ impl RunManager { env_file, local_docker, scratch_dir, + client_authorizer, }); } } @@ -97,7 +101,7 @@ impl RunManager { bail!("No runs found on coordinator program"); } - let run_id = select_best_run( + let (run_id, client_authorizer) = select_best_run( &runs, &user_pubkey, &coordinator_client, @@ -111,6 +115,7 @@ impl RunManager { env_file, local_docker, scratch_dir, + client_authorizer, }) } @@ -194,6 +199,8 @@ impl RunManager { .arg(format!("CLIENT_VERSION={}", client_version)) .arg("--env") .arg(format!("RUN_ID={}", &self.run_id)) + .arg("--env") + .arg(format!("AUTHORIZER={}", &self.client_authorizer)) .arg("--env-file") .arg(&self.env_file); @@ -375,12 +382,36 @@ fn parse_wallet_pubkey(wallet_key: &str) -> Result { Ok(keypair.pubkey()) } +/// Determine the correct AUTHORIZER value for the client container by checking +/// which authorization type (permissionless vs user-specific) is valid for this run. +fn resolve_client_authorizer( + coordinator_client: &CoordinatorClient, + run_id: &str, + user_pubkey: &Pubkey, +) -> Result { + match coordinator_client.can_user_join_run(run_id, user_pubkey)? 
{ + Some(grantee) => { + info!("Resolved AUTHORIZER={} for run {}", grantee, run_id); + Ok(grantee) + } + None => { + bail!( + "User {} is not authorized to join run {}", + user_pubkey, + run_id + ); + } + } +} + +/// Returns (run_id, client_authorizer) where client_authorizer is the grantee +/// to pass to the container as AUTHORIZER. fn select_best_run( runs: &[RunInfo], user_pubkey: &Pubkey, coordinator_client: &CoordinatorClient, authorizer: Option<&Pubkey>, -) -> Result { +) -> Result<(String, Pubkey)> { // Filter out unjoinable run states let mut candidates: Vec<_> = runs .iter() @@ -416,38 +447,45 @@ fn select_best_run( } } - // Filter to runs the user is authorized to join - candidates.retain( - |run| match coordinator_client.can_user_join_run(run, user_pubkey) { - Ok(authorized) => authorized, + // Filter to runs the user is authorized to join, capturing the matched grantee + let mut authorized_candidates: Vec<(&RunInfo, Pubkey)> = Vec::new(); + for run in &candidates { + match coordinator_client.can_user_join_run(&run.run_id, user_pubkey) { + Ok(Some(grantee)) => authorized_candidates.push((run, grantee)), + Ok(None) => {} Err(e) => { warn!( "Skipping run {} - authorization check failed: {}", run.run_id, e ); - false } - }, - ); + } + } - if candidates.is_empty() { + if authorized_candidates.is_empty() { bail!("No authorized runs found for user {}", user_pubkey); } // Prioritize runs waiting for members - candidates.sort_by_key(|run| match run.run_state { + authorized_candidates.sort_by_key(|(run, _)| match run.run_state { RunState::WaitingForMembers => 0, _ => 1, }); - info!("Found {} available run(s):", candidates.len()); - for run in &candidates { + info!("Found {} available run(s):", authorized_candidates.len()); + for (run, _) in &authorized_candidates { info!(" - {} (state: {})", run.run_id, run.run_state); } + let (selected_run, grantee) = authorized_candidates[0]; info!( "Selected run: {} (state: {})", - candidates[0].run_id, 
candidates[0].run_state + selected_run.run_id, selected_run.run_state + ); + info!( + "Resolved AUTHORIZER={} for run {}", + grantee, selected_run.run_id ); - Ok(candidates[0].run_id.clone()) + + Ok((selected_run.run_id.clone(), grantee)) } From 3017048f19b65bf80f76d4ef42d016e4c307b1c5 Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Tue, 3 Feb 2026 18:38:02 -0300 Subject: [PATCH 16/17] run-manager: handle delegate keys in can_user_join_run --- .../src/docker/coordinator_client.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs index c15f6c6c0..237c2b74c 100644 --- a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs +++ b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs @@ -2,7 +2,7 @@ use anchor_client::solana_sdk::{ commitment_config::CommitmentConfig, pubkey::Pubkey, system_program, }; use anchor_lang::AccountDeserialize; -use anyhow::{Context, Result}; +use anyhow::{Context, Result, anyhow}; use psyche_coordinator::RunState; use psyche_solana_authorizer::state::Authorization; use psyche_solana_coordinator::{ @@ -173,14 +173,15 @@ impl CoordinatorClient { /// Check if a user is authorized to join a specific run. /// /// This checks both permissionless authorization (grantee = system_program::ID) - /// and user-specific authorization (grantee = user_pubkey). + /// and user-specific authorization (grantee = user_pubkey), + /// as well as delegate-key authorization.. /// Returns the matched grantee pubkey if authorized, or None if not. 
pub fn can_user_join_run(&self, run_id: &str, user_pubkey: &Pubkey) -> Result> { // Fetch the CoordinatorInstance to get join_authority let instance = self.fetch_coordinator_data(run_id)?; let join_authority = instance.join_authority; - // Try permissionless authorization first (grantee = system_program::ID) + // Try permissionless authorization (grantee = system_program::ID) if self.check_authorization_for_grantee(&join_authority, &system_program::ID, user_pubkey) { return Ok(Some(system_program::ID)); } @@ -190,6 +191,18 @@ impl CoordinatorClient { return Ok(Some(*user_pubkey)); } + // If we reached here attempt to join as a delegate key via AUTHORIZER env var + info!("Attempting authorization via delegate key..."); + let Ok(authorizer_str) = std::env::var("AUTHORIZER") else { + return Err(anyhow!("AUTHORIZER not set")); + }; + let Ok(authorizer) = authorizer_str.parse::() else { + return Err(anyhow!("Failed to parse AUTHORIZER as pubkey")); + }; + if self.check_authorization_for_grantee(&join_authority, &authorizer, user_pubkey) { + return Ok(Some(authorizer)); + } + Ok(None) } From 75f1d650a37b4051cf07d8fc508650c12c458e48 Mon Sep 17 00:00:00 2001 From: Dylan Socolobsky Date: Fri, 6 Feb 2026 11:16:39 -0300 Subject: [PATCH 17/17] run-manager: list-runs command --- .../src/docker/coordinator_client.rs | 147 ++++++++++-------- .../run-manager/src/docker/manager.rs | 138 ++++++++++------ .../rust-tools/run-manager/src/docker/mod.rs | 5 +- tools/rust-tools/run-manager/src/lib.rs | 25 ++- tools/rust-tools/run-manager/src/main.rs | 84 ++++++++-- 5 files changed, 276 insertions(+), 123 deletions(-) diff --git a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs index 237c2b74c..ade58ba8e 100644 --- a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs +++ b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs @@ -2,7 +2,7 @@ use anchor_client::solana_sdk::{ 
commitment_config::CommitmentConfig, pubkey::Pubkey, system_program, }; use anchor_lang::AccountDeserialize; -use anyhow::{Context, Result, anyhow}; +use anyhow::{Context, Result}; use psyche_coordinator::RunState; use psyche_solana_authorizer::state::Authorization; use psyche_solana_coordinator::{ @@ -20,6 +20,43 @@ pub struct RunInfo { pub instance_pubkey: Pubkey, pub coordinator_account: Pubkey, pub run_state: RunState, + pub num_clients: usize, + pub min_clients: u16, +} + +impl RunInfo { + pub fn clients_display(&self) -> String { + if (self.num_clients as u16) < self.min_clients { + format!("{}/{} waiting clients", self.num_clients, self.min_clients) + } else { + format!("{} training clients", self.num_clients) + } + } + + pub fn format_table(runs: &[&RunInfo]) -> Vec { + let rows: Vec<_> = runs + .iter() + .map(|r| { + ( + r.run_id.as_str(), + r.run_state.to_string(), + r.clients_display(), + ) + }) + .collect(); + // This is so we can nicely align the output of the runs list + let run_id_width = rows.iter().map(|(id, _, _)| id.len()).max().unwrap_or(0); + let state_width = rows.iter().map(|(_, st, _)| st.len()).max().unwrap_or(0); + + rows.iter() + .map(|(run_id, state, clients)| { + format!( + " {: Result { - // Fetch the raw Solana account data from the blockchain - let solana_account = self - .rpc_client - .get_account(coordinator_account) - .with_context(|| { - format!( - "Failed to fetch coordinator account {}", - coordinator_account - ) - })?; - - // Deserialize the account data into a CoordinatorAccount struct - let coordinator = - coordinator_account_from_bytes(&solana_account.data).with_context(|| { - format!( - "Failed to deserialize coordinator account {}", - coordinator_account - ) - })?; - - Ok(coordinator.state.coordinator.run_state) - } - pub fn get_docker_tag_for_run(&self, run_id: &str, local_docker: bool) -> Result { info!("Querying coordinator for Run ID: {}", run_id); @@ -142,29 +155,38 @@ impl CoordinatorClient { let mut runs = 
Vec::new(); for (pubkey, account) in accounts { - match CoordinatorInstance::try_deserialize(&mut account.data.as_slice()) { - Ok(instance) => { - if let Ok(run_state) = self.fetch_run_state(&instance.coordinator_account) { - runs.push(RunInfo { - run_id: instance.run_id.clone(), - instance_pubkey: pubkey, - coordinator_account: instance.coordinator_account, - run_state, - }); - } else { - debug!( - "Skipping run {} (instance: {}) - could not fetch coordinator state", - instance.run_id, pubkey - ); - } - } - Err(e) => { - debug!( - "Failed to deserialize CoordinatorInstance at {}: {}", - pubkey, e - ); - } - } + let Ok(instance) = CoordinatorInstance::try_deserialize(&mut account.data.as_slice()) + else { + debug!("Failed to deserialize CoordinatorInstance at {}", pubkey); + continue; + }; + + let Ok(coord_account) = self.rpc_client.get_account(&instance.coordinator_account) + else { + debug!( + "Skipping run {} - could not fetch coordinator account", + instance.run_id + ); + continue; + }; + + let Ok(coordinator) = coordinator_account_from_bytes(&coord_account.data) else { + debug!( + "Skipping run {} - could not deserialize coordinator account", + instance.run_id + ); + continue; + }; + + let state = &coordinator.state.coordinator; + runs.push(RunInfo { + run_id: instance.run_id.clone(), + instance_pubkey: pubkey, + coordinator_account: instance.coordinator_account, + run_state: state.run_state, + num_clients: state.epoch_state.clients.len(), + min_clients: state.config.min_clients, + }); } Ok(runs) @@ -172,11 +194,16 @@ impl CoordinatorClient { /// Check if a user is authorized to join a specific run. /// - /// This checks both permissionless authorization (grantee = system_program::ID) - /// and user-specific authorization (grantee = user_pubkey), - /// as well as delegate-key authorization.. 
+ /// This checks permissionless authorization (grantee = system_program::ID), + /// user-specific authorization (grantee = user_pubkey), + /// and optionally delegate-key authorization. /// Returns the matched grantee pubkey if authorized, or None if not. - pub fn can_user_join_run(&self, run_id: &str, user_pubkey: &Pubkey) -> Result> { + pub fn can_user_join_run( + &self, + run_id: &str, + user_pubkey: &Pubkey, + delegate_authorizer: Option<&Pubkey>, + ) -> Result> { // Fetch the CoordinatorInstance to get join_authority let instance = self.fetch_coordinator_data(run_id)?; let join_authority = instance.join_authority; @@ -191,16 +218,12 @@ impl CoordinatorClient { return Ok(Some(*user_pubkey)); } - // If we reached here attempt to join as a delegate key via AUTHORIZER env var - info!("Attempting authorization via delegate key..."); - let Ok(authorizer_str) = std::env::var("AUTHORIZER") else { - return Err(anyhow!("AUTHORIZER not set")); - }; - let Ok(authorizer) = authorizer_str.parse::() else { - return Err(anyhow!("Failed to parse AUTHORIZER as pubkey")); - }; - if self.check_authorization_for_grantee(&join_authority, &authorizer, user_pubkey) { - return Ok(Some(authorizer)); + // Try delegate-key authorization if provided + if let Some(authorizer) = delegate_authorizer { + debug!("Attempting authorization via delegate key {}", authorizer); + if self.check_authorization_for_grantee(&join_authority, authorizer, user_pubkey) { + return Ok(Some(*authorizer)); + } } Ok(None) diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs b/tools/rust-tools/run-manager/src/docker/manager.rs index 81ebdeb61..7e076e0d8 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -2,17 +2,17 @@ use anchor_client::solana_sdk::bs58; use anchor_client::solana_sdk::pubkey::Pubkey; use anchor_client::solana_sdk::signature::{EncodableKey, Keypair, Signer}; use anyhow::{Context, Result, anyhow, bail}; -use 
std::fs; use std::io::{BufRead, BufReader, Cursor}; use std::path::PathBuf; use std::process::{Command, Stdio}; use tokio::signal; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use crate::docker::RunInfo; use crate::docker::coordinator_client::CoordinatorClient; use crate::get_env_var; use crate::load_and_apply_env_file; +use crate::load_wallet_key; use psyche_coordinator::RunState; const RETRY_DELAY_SECS: u64 = 5; @@ -49,20 +49,7 @@ impl RunManager { load_and_apply_env_file(&env_file)?; - let wallet_key = - if let Ok(raw_wallet_private_key) = std::env::var("RAW_WALLET_PRIVATE_KEY") { - info!("Using RAW_WALLET_PRIVATE_KEY from command line"); - raw_wallet_private_key - } else if let Ok(wallet_path) = std::env::var("WALLET_PRIVATE_KEY_PATH") { - info!("Using WALLET_PRIVATE_KEY_PATH: {wallet_path}"); - fs::read_to_string(wallet_path)? - } else { - bail!( - "No wallet private key! Must set RAW_WALLET_PRIVATE_KEY or WALLET_PRIVATE_KEY_PATH" - ) - } - .trim() - .to_string(); + let wallet_key = load_wallet_key()?; let user_pubkey = parse_wallet_pubkey(&wallet_key)?; info!("User pubkey: {}", user_pubkey); @@ -77,12 +64,19 @@ impl RunManager { let coordinator_client = CoordinatorClient::new(rpc, coordinator_program_id); + // Read delegate key from AUTHORIZER env var (separate from --authorizer flag) + let delegate_authorizer = parse_delegate_authorizer_from_env()?; + // Try to get RUN_ID from env, or discover available runs if let Ok(run_id) = std::env::var("RUN_ID") { if !run_id.is_empty() { info!("Using RUN_ID from environment: {}", run_id); - let client_authorizer = - resolve_client_authorizer(&coordinator_client, &run_id, &user_pubkey)?; + let client_authorizer = resolve_client_authorizer( + &coordinator_client, + &run_id, + &user_pubkey, + delegate_authorizer.as_ref(), + )?; return Ok(Self { wallet_key, run_id, @@ -106,6 +100,7 @@ impl RunManager { &user_pubkey, &coordinator_client, authorizer.as_ref(), + delegate_authorizer.as_ref(), 
)?; Ok(Self { @@ -364,7 +359,7 @@ impl RunManager { } /// Parse wallet key string to extract the user's pubkey. -fn parse_wallet_pubkey(wallet_key: &str) -> Result { +pub fn parse_wallet_pubkey(wallet_key: &str) -> Result { let keypair = if wallet_key.starts_with('[') { // Assume Keypair::read format (JSON array of bytes) Keypair::read(&mut Cursor::new(wallet_key)) @@ -382,14 +377,35 @@ fn parse_wallet_pubkey(wallet_key: &str) -> Result { Ok(keypair.pubkey()) } +/// Read the AUTHORIZER env var as a delegate key pubkey, if set. +pub fn parse_delegate_authorizer_from_env() -> Result> { + match std::env::var("AUTHORIZER") { + Ok(val) if !val.is_empty() => { + let pubkey = val.parse::().with_context(|| { + format!("Failed to parse AUTHORIZER env var as pubkey: {}", val) + })?; + info!( + "Using delegate authorizer from AUTHORIZER env var: {}", + pubkey + ); + Ok(Some(pubkey)) + } + _ => { + info!("AUTHORIZER env var not set, skipping delegate key authorization"); + Ok(None) + } + } +} + /// Determine the correct AUTHORIZER value for the client container by checking -/// which authorization type (permissionless vs user-specific) is valid for this run. +/// which authorization type (permissionless, user-specific, or delegate) is valid for this run. fn resolve_client_authorizer( coordinator_client: &CoordinatorClient, run_id: &str, user_pubkey: &Pubkey, + delegate_authorizer: Option<&Pubkey>, ) -> Result { - match coordinator_client.can_user_join_run(run_id, user_pubkey)? { + match coordinator_client.can_user_join_run(run_id, user_pubkey, delegate_authorizer)? { Some(grantee) => { info!("Resolved AUTHORIZER={} for run {}", grantee, run_id); Ok(grantee) @@ -404,14 +420,18 @@ fn resolve_client_authorizer( } } -/// Returns (run_id, client_authorizer) where client_authorizer is the grantee -/// to pass to the container as AUTHORIZER. -fn select_best_run( +/// Filter runs to only those that are joinable and authorized for the given user. 
+/// Returns (run_info, grantee_pubkey) pairs sorted by priority (WaitingForMembers first). +/// +/// - `join_authority_filter`: if set, only consider runs whose join_authority matches this pubkey +/// - `delegate_authorizer`: if set, also try delegate-key authorization via this pubkey +pub fn find_joinable_runs( runs: &[RunInfo], user_pubkey: &Pubkey, coordinator_client: &CoordinatorClient, - authorizer: Option<&Pubkey>, -) -> Result<(String, Pubkey)> { + join_authority_filter: Option<&Pubkey>, + delegate_authorizer: Option<&Pubkey>, +) -> Result> { // Filter out unjoinable run states let mut candidates: Vec<_> = runs .iter() @@ -421,40 +441,35 @@ fn select_best_run( RunState::Uninitialized | RunState::Finished | RunState::Paused ) }) + .cloned() .collect(); if candidates.is_empty() { - bail!( - "No joinable runs found. All {} run(s) are in unjoinable states.", - runs.len() - ); + return Ok(Vec::new()); } - // Filter by join_authority if --authorizer was specified - if let Some(auth) = authorizer { + // Filter by join_authority if specified + if let Some(auth) = join_authority_filter { info!("Filtering runs by join_authority: {}", auth); candidates.retain( |run| match coordinator_client.fetch_coordinator_data(&run.run_id) { Ok(data) => data.join_authority == *auth, Err(e) => { - warn!("Skipping run {} - failed to fetch data: {}", run.run_id, e); + debug!("Skipping run {} - failed to fetch data: {}", run.run_id, e); false } }, ); - if candidates.is_empty() { - bail!("No runs found matching authorizer {}", auth); - } } // Filter to runs the user is authorized to join, capturing the matched grantee - let mut authorized_candidates: Vec<(&RunInfo, Pubkey)> = Vec::new(); - for run in &candidates { - match coordinator_client.can_user_join_run(&run.run_id, user_pubkey) { + let mut authorized_candidates: Vec<(RunInfo, Pubkey)> = Vec::new(); + for run in candidates { + match coordinator_client.can_user_join_run(&run.run_id, user_pubkey, delegate_authorizer) { 
Ok(Some(grantee)) => authorized_candidates.push((run, grantee)), Ok(None) => {} Err(e) => { - warn!( + debug!( "Skipping run {} - authorization check failed: {}", run.run_id, e ); @@ -462,30 +477,53 @@ fn select_best_run( } } - if authorized_candidates.is_empty() { - bail!("No authorized runs found for user {}", user_pubkey); - } - // Prioritize runs waiting for members authorized_candidates.sort_by_key(|(run, _)| match run.run_state { RunState::WaitingForMembers => 0, _ => 1, }); + Ok(authorized_candidates) +} + +/// Returns (run_id, client_authorizer) where client_authorizer is the grantee +/// to pass to the container as AUTHORIZER. +fn select_best_run( + runs: &[RunInfo], + user_pubkey: &Pubkey, + coordinator_client: &CoordinatorClient, + join_authority_filter: Option<&Pubkey>, + delegate_authorizer: Option<&Pubkey>, +) -> Result<(String, Pubkey)> { + let authorized_candidates = find_joinable_runs( + runs, + user_pubkey, + coordinator_client, + join_authority_filter, + delegate_authorizer, + )?; + + if authorized_candidates.is_empty() { + bail!("No joinable runs found for user {}", user_pubkey); + } + info!("Found {} available run(s):", authorized_candidates.len()); - for (run, _) in &authorized_candidates { - info!(" - {} (state: {})", run.run_id, run.run_state); + let candidate_runs: Vec<_> = authorized_candidates.iter().map(|(r, _)| r).collect(); + for line in RunInfo::format_table(&candidate_runs) { + info!("{}", line); } - let (selected_run, grantee) = authorized_candidates[0]; + let (selected_run, grantee) = &authorized_candidates[0]; info!( - "Selected run: {} (state: {})", - selected_run.run_id, selected_run.run_state + "Selected run: {} ({}, {})", + selected_run.run_id, + selected_run.run_state, + selected_run.clients_display() ); info!( "Resolved AUTHORIZER={} for run {}", grantee, selected_run.run_id ); - Ok((selected_run.run_id.clone(), grantee)) + Ok((selected_run.run_id.clone(), *grantee)) } diff --git 
a/tools/rust-tools/run-manager/src/docker/mod.rs b/tools/rust-tools/run-manager/src/docker/mod.rs index 41f2aae7e..f7f7e2eaa 100644 --- a/tools/rust-tools/run-manager/src/docker/mod.rs +++ b/tools/rust-tools/run-manager/src/docker/mod.rs @@ -3,4 +3,7 @@ pub mod manager; // Re-exports pub use coordinator_client::RunInfo; -pub use manager::{Entrypoint, RunManager}; +pub use manager::{ + Entrypoint, RunManager, find_joinable_runs, parse_delegate_authorizer_from_env, + parse_wallet_pubkey, +}; diff --git a/tools/rust-tools/run-manager/src/lib.rs b/tools/rust-tools/run-manager/src/lib.rs index 8734f4d2c..1564e7a91 100644 --- a/tools/rust-tools/run-manager/src/lib.rs +++ b/tools/rust-tools/run-manager/src/lib.rs @@ -1,5 +1,6 @@ // Library exports for run-manager -use anyhow::{Context, Result}; +use anchor_client::solana_sdk::pubkey::Pubkey; +use anyhow::{Context, Result, bail}; use std::path::PathBuf; pub mod commands; @@ -30,3 +31,25 @@ pub fn load_and_apply_env_file(path: &PathBuf) -> Result<()> { pub fn get_env_var(name: &str) -> Result { std::env::var(name).with_context(|| format!("Missing required environment variable: {}", name)) } + +/// Load the wallet private key from environment variables. +pub fn load_wallet_key() -> Result { + if let Ok(raw) = std::env::var("RAW_WALLET_PRIVATE_KEY") { + Ok(raw) + } else if let Ok(path) = std::env::var("WALLET_PRIVATE_KEY_PATH") { + std::fs::read_to_string(&path) + .with_context(|| format!("Failed to read wallet key file: {}", path)) + } else { + bail!("No wallet private key! Set RAW_WALLET_PRIVATE_KEY or WALLET_PRIVATE_KEY_PATH") + } + .map(|s| s.trim().to_string()) +} + +/// Parse an optional string as a Pubkey. 
+pub fn parse_optional_pubkey(s: Option<&String>, name: &str) -> Result> { + s.map(|s| { + s.parse::() + .with_context(|| format!("Failed to parse {} pubkey: {}", name, s)) + }) + .transpose() +} diff --git a/tools/rust-tools/run-manager/src/main.rs b/tools/rust-tools/run-manager/src/main.rs index d6036585b..25251f54d 100644 --- a/tools/rust-tools/run-manager/src/main.rs +++ b/tools/rust-tools/run-manager/src/main.rs @@ -10,6 +10,7 @@ use clap::{Args, Parser, Subcommand}; use psyche_solana_rpc::SolanaBackend; use run_manager::commands::{self, Command}; use run_manager::docker::manager::{Entrypoint, RunManager}; +use run_manager::parse_optional_pubkey; use std::io::Cursor; use std::path::PathBuf; use std::sync::Arc; @@ -218,6 +219,21 @@ enum Commands { params: CommandCanJoin, }, + /// List joinable runs on the coordinator program + ListRuns { + /// Path to .env file with RPC and wallet configuration + #[arg(long)] + env_file: Option, + #[clap(flatten)] + cluster: ClusterArgs, + /// Coordinator program ID + #[arg(long, default_value = "4SHugWqSXwKE5fqDchkJcPEqnoZE22VYKtSTVm7axbT7")] + coordinator_program_id: String, + /// Only show runs where this pubkey is the join_authority + #[arg(long)] + authorizer: Option, + }, + // Docs generation #[clap(hide = true)] PrintAllHelp { @@ -291,15 +307,7 @@ async fn async_main() -> Result<()> { None => None, }; - // Parse pubkey into Pubkey type - let authorizer = args - .authorizer - .as_ref() - .map(|s| { - s.parse() - .map_err(|e| anyhow::anyhow!("Failed to parse authorizer pubkey: {}", e)) - }) - .transpose()?; + let authorizer = parse_optional_pubkey(args.authorizer.as_ref(), "authorizer")?; let run_mgr = RunManager::new( args.coordinator_program_id, @@ -389,6 +397,12 @@ async fn async_main() -> Result<()> { Commands::CanJoin { cluster, params } => { params.execute(create_backend_readonly(cluster)?).await } + Commands::ListRuns { + env_file, + cluster, + coordinator_program_id, + authorizer, + } => list_runs(env_file, 
cluster, coordinator_program_id, authorizer), Commands::PrintAllHelp { markdown } => { assert!(markdown); clap_markdown::print_help_markdown::(); @@ -397,6 +411,58 @@ async fn async_main() -> Result<()> { } } +fn list_runs( + env_file: Option, + cluster: ClusterArgs, + coordinator_program_id: String, + authorizer: Option, +) -> Result<()> { + use anyhow::Context; + use run_manager::docker::{ + RunInfo, coordinator_client::CoordinatorClient, find_joinable_runs, + parse_delegate_authorizer_from_env, parse_wallet_pubkey, + }; + + if let Some(env_file) = env_file { + run_manager::load_and_apply_env_file(&env_file)?; + } + let program_id = coordinator_program_id + .parse::() + .context("Failed to parse coordinator program ID")?; + let rpc = std::env::var("RPC").unwrap_or_else(|_| cluster.rpc.trim_matches('"').to_string()); + let coordinator_client = CoordinatorClient::new(rpc, program_id); + let runs = coordinator_client.get_all_runs()?; + + if runs.is_empty() { + println!("No runs found on coordinator program {}", program_id); + return Ok(()); + } + + let authorizer = parse_optional_pubkey(authorizer.as_ref(), "authorizer")?; + let delegate_authorizer = parse_delegate_authorizer_from_env()?; + let wallet_key = run_manager::load_wallet_key()?; + let user_pubkey = parse_wallet_pubkey(&wallet_key)?; + let candidates = find_joinable_runs( + &runs, + &user_pubkey, + &coordinator_client, + authorizer.as_ref(), + delegate_authorizer.as_ref(), + )?; + + if candidates.is_empty() { + println!("No available runs to join"); + } else { + println!("Found {} joinable run(s):", candidates.len()); + let refs: Vec<_> = candidates.iter().map(|(r, _)| r).collect(); + for line in RunInfo::format_table(&refs) { + println!("{}", line); + } + } + + Ok(()) +} + fn create_backend(cluster: ClusterArgs, wallet: WalletArgs) -> Result { let wallet_keypair: Keypair = wallet.try_into()?; let cluster: Cluster = cluster.into();