diff --git a/Cargo.lock b/Cargo.lock index 85f3b06da..3393fa186 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8113,6 +8113,7 @@ dependencies = [ "serde", "serde_json", "serial_test", + "solana-account-decoder-client-types", "solana-client", "time", "tokio", diff --git a/architectures/decentralized/justfile b/architectures/decentralized/justfile index 77ba7d4ed..e1eddf893 100644 --- a/architectures/decentralized/justfile +++ b/architectures/decentralized/justfile @@ -25,16 +25,16 @@ setup-solana-localnet-light-test-run-treasurer run_id="test" *args='': RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml PERMISSIONLESS=true ./scripts/setup-and-deploy-solana-test.sh --treasurer {{ args }} setup-solana-localnet-permissioned-test-run run_id="test" *args='': - RUN_ID={{ run_id }} ./scripts/deploy-solana-test.sh {{ args }} + RUN_ID={{ run_id }} ./scripts/setup-and-deploy-solana-test.sh {{ args }} setup-solana-localnet-permissioned-light-test-run run_id="test" *args='': - RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml ./scripts/deploy-solana-test.sh {{ args }} + RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml ./scripts/setup-and-deploy-solana-test.sh {{ args }} setup-solana-localnet-permissioned-test-run-treasurer run_id="test" *args='': - RUN_ID={{ run_id }} ./scripts/deploy-solana-test.sh --treasurer {{ args }} + RUN_ID={{ run_id }} ./scripts/setup-and-deploy-solana-test.sh --treasurer {{ args }} setup-solana-localnet-permissioned-light-test-run-treasurer run_id="test" *args='': - RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml ./scripts/deploy-solana-test.sh --treasurer {{ args }} + RUN_ID={{ run_id }} CONFIG_FILE=./config/solana-test/light-config.toml ./scripts/setup-and-deploy-solana-test.sh --treasurer {{ args }} start-training-localnet-client run_id="test" *args='': AUTHORIZER={{ AUTHORIZER }} RUN_ID={{ run_id }} ./scripts/train-solana-test.sh {{ args }} diff --git 
a/architectures/decentralized/solana-authorizer/Cargo.lock b/architectures/decentralized/solana-authorizer/Cargo.lock index c1a36b8a6..7eb386dad 100644 --- a/architectures/decentralized/solana-authorizer/Cargo.lock +++ b/architectures/decentralized/solana-authorizer/Cargo.lock @@ -1389,7 +1389,7 @@ dependencies = [ [[package]] name = "psyche-solana-authorizer" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", diff --git a/architectures/decentralized/solana-coordinator/Cargo.lock b/architectures/decentralized/solana-coordinator/Cargo.lock index 22d64fadc..72a38df53 100644 --- a/architectures/decentralized/solana-coordinator/Cargo.lock +++ b/architectures/decentralized/solana-coordinator/Cargo.lock @@ -1602,7 +1602,7 @@ dependencies = [ [[package]] name = "psyche-coordinator" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "async-trait", @@ -1616,7 +1616,7 @@ dependencies = [ [[package]] name = "psyche-core" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-lang-idl", @@ -1635,7 +1635,7 @@ dependencies = [ [[package]] name = "psyche-solana-authorizer" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", @@ -1643,7 +1643,7 @@ dependencies = [ [[package]] name = "psyche-solana-coordinator" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "bytemuck", diff --git a/architectures/decentralized/solana-mining-pool/Cargo.lock b/architectures/decentralized/solana-mining-pool/Cargo.lock index 06fb31df5..225d03bf9 100644 --- a/architectures/decentralized/solana-mining-pool/Cargo.lock +++ b/architectures/decentralized/solana-mining-pool/Cargo.lock @@ -1389,7 +1389,7 @@ dependencies = [ [[package]] name = "psyche-solana-mining-pool" -version = "0.1.1" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", diff --git a/architectures/decentralized/solana-treasurer/Cargo.lock b/architectures/decentralized/solana-treasurer/Cargo.lock index 
5d56eb74d..9be05b852 100644 --- a/architectures/decentralized/solana-treasurer/Cargo.lock +++ b/architectures/decentralized/solana-treasurer/Cargo.lock @@ -1602,7 +1602,7 @@ dependencies = [ [[package]] name = "psyche-coordinator" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "async-trait", @@ -1616,7 +1616,7 @@ dependencies = [ [[package]] name = "psyche-core" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-lang-idl", @@ -1635,7 +1635,7 @@ dependencies = [ [[package]] name = "psyche-solana-authorizer" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", @@ -1643,7 +1643,7 @@ dependencies = [ [[package]] name = "psyche-solana-coordinator" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "bytemuck", @@ -1656,7 +1656,7 @@ dependencies = [ [[package]] name = "psyche-solana-treasurer" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anchor-lang", "anchor-spl", diff --git a/psyche-book/src/enduser/join-run.md b/psyche-book/src/enduser/join-run.md index b79678bcd..968b09980 100644 --- a/psyche-book/src/enduser/join-run.md +++ b/psyche-book/src/enduser/join-run.md @@ -64,7 +64,8 @@ WALLET_PATH=/path/to/your/keypair.json RPC=https://your-primary-rpc-provider.com WS_RPC=wss://your-primary-rpc-provider.com -# Required: Which run id to join +# Optional: Which run id to join +# If not set, the client will automatically discover and join an available run RUN_ID=your_run_id_here # Recommended: Fallback RPC Endpoints (for reliability) @@ -78,6 +79,19 @@ Then, you can start training through the run manager running: ./run-manager --env-file /path/to/your/.env ``` +### Automatic Run Selection + +If you don't specify a `RUN_ID` in your `.env` file, the run-manager will automatically query the Solana coordinator to find a suitable run to join. +This makes it easier to join training without needing to know the specific run ID in advance. 
The run-manager logs the runs it found and the one it selected, for example: + +``` +INFO RUN_ID not set, discovering available runs... +INFO Found 2 available run(s): +INFO - run_abc123 (state: Waiting for members) +INFO - run_def456 (state: Training) +INFO Selected run: run_abc123 (state: Waiting for members) +``` + After the initial setup, you'll see the Psyche client logs streaming in real-time. These logs show training progress, network status, and other important information. To stop the client, press `Ctrl+C` in the terminal. @@ -86,9 +100,19 @@ To stop the client, press `Ctrl+C` in the terminal. We recommend using a dedicated RPC service such as [Helius](https://www.helius.dev/), [QuickNode](https://www.quicknode.com/), [Triton](https://triton.one/), or self-hosting your own Solana RPC node. +## Filtering by Authorizer + +If you want to only join runs whose join authority is a specific pubkey, pass that pubkey with the `--authorizer` flag: + +```bash +./run-manager --env-file /path/to/your/.env --authorizer your_join_authority_pubkey_here +``` + +This is useful when you want to ensure you only join runs from a trusted join authority. The filter is applied during automatic run selection, i.e. when `RUN_ID` is not set. + ## Additional config variables -In general it's not neccesary to change these variables to join a run since we provide sensible defaults, +In general it's not necessary to change these variables to join a run since we provide sensible defaults, though you might need to. **`NVIDIA_DRIVER_CAPABILITIES`** - An environment variable that the NVIDIA Container Toolkit uses to determine which compute capabilities should be provided to your container. It is recommended to set it to 'all', e.g. `NVIDIA_DRIVER_CAPABILITIES=all`. 
diff --git a/scripts/setup-and-deploy-solana-test.sh b/scripts/setup-and-deploy-solana-test.sh index 7886b50bf..69a627cbd 100755 --- a/scripts/setup-and-deploy-solana-test.sh +++ b/scripts/setup-and-deploy-solana-test.sh @@ -40,13 +40,19 @@ sleep 3 solana airdrop 10 --url ${RPC} --keypair ${WALLET_FILE} -# Pass treasurer flag to deploy script if set -if [[ "$DEPLOY_TREASURER" == "true" && "$PERMISSIONLESS" == "true" ]]; then + +if [[ "$DEPLOY_TREASURER" == "true" ]]; then WALLET_FILE=${WALLET_FILE} ./scripts/deploy-solana-test.sh --treasurer "${EXTRA_ARGS[@]}" - CONFIG_FILE=${CONFIG_FILE} WALLET_FILE=${WALLET_FILE} ./scripts/create-permissionless-run.sh --treasurer "${EXTRA_ARGS[@]}" -elif [[ "$PERMISSIONLESS" == "true" ]]; then +else WALLET_FILE=${WALLET_FILE} ./scripts/deploy-solana-test.sh "${EXTRA_ARGS[@]}" - CONFIG_FILE=${CONFIG_FILE} WALLET_FILE=${WALLET_FILE} ./scripts/create-permissionless-run.sh "${EXTRA_ARGS[@]}" +fi + +if [[ "$PERMISSIONLESS" == "true" ]]; then + if [[ "$DEPLOY_TREASURER" == "true" ]]; then + CONFIG_FILE=${CONFIG_FILE} WALLET_FILE=${WALLET_FILE} ./scripts/create-permissionless-run.sh --treasurer "${EXTRA_ARGS[@]}" + else + CONFIG_FILE=${CONFIG_FILE} WALLET_FILE=${WALLET_FILE} ./scripts/create-permissionless-run.sh "${EXTRA_ARGS[@]}" + fi fi echo -e "\n[+] Testing Solana setup ready, starting Solana logs...\n" diff --git a/tools/rust-tools/run-manager/Cargo.toml b/tools/rust-tools/run-manager/Cargo.toml index e47a1931f..d6b133134 100644 --- a/tools/rust-tools/run-manager/Cargo.toml +++ b/tools/rust-tools/run-manager/Cargo.toml @@ -35,6 +35,7 @@ rand.workspace = true rand_chacha.workspace = true time.workspace = true solana-client = "=2.1.4" +solana-account-decoder-client-types = "=2.1.4" [dev-dependencies] serial_test = "3.0" diff --git a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs index 729a38786..ade58ba8e 100644 --- 
a/tools/rust-tools/run-manager/src/docker/coordinator_client.rs +++ b/tools/rust-tools/run-manager/src/docker/coordinator_client.rs @@ -1,16 +1,67 @@ -use anchor_client::solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; +use anchor_client::solana_sdk::{ + commitment_config::CommitmentConfig, pubkey::Pubkey, system_program, +}; use anchor_lang::AccountDeserialize; use anyhow::{Context, Result}; +use psyche_coordinator::RunState; +use psyche_solana_authorizer::state::Authorization; use psyche_solana_coordinator::{ CoordinatorInstance, coordinator_account_from_bytes, find_coordinator_instance, + logic::JOIN_RUN_AUTHORIZATION_SCOPE, }; +use solana_account_decoder_client_types::UiAccountEncoding; use solana_client::rpc_client::RpcClient; -use tracing::info; +use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; +use tracing::{debug, info, warn}; + +#[derive(Debug, Clone)] +pub struct RunInfo { + pub run_id: String, + pub instance_pubkey: Pubkey, + pub coordinator_account: Pubkey, + pub run_state: RunState, + pub num_clients: usize, + pub min_clients: u16, +} + +impl RunInfo { + pub fn clients_display(&self) -> String { + if (self.num_clients as u16) < self.min_clients { + format!("{}/{} waiting clients", self.num_clients, self.min_clients) + } else { + format!("{} training clients", self.num_clients) + } + } + + pub fn format_table(runs: &[&RunInfo]) -> Vec { + let rows: Vec<_> = runs + .iter() + .map(|r| { + ( + r.run_id.as_str(), + r.run_state.to_string(), + r.clients_display(), + ) + }) + .collect(); + // This is so we can nicely align the output of the runs list + let run_id_width = rows.iter().map(|(id, _, _)| id.len()).max().unwrap_or(0); + let state_width = rows.iter().map(|(_, st, _)| st.len()).max().unwrap_or(0); + + rows.iter() + .map(|(run_id, state, clients)| { + format!( + " {: Result> { + // Fetch all CoordinatorInstance accounts that are owned by the program + let accounts = self + .rpc_client + 
.get_program_accounts_with_config( + &self.program_id, + RpcProgramAccountsConfig { + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + commitment: Some(CommitmentConfig::confirmed()), + ..Default::default() + }, + ..Default::default() + }, + ) + .map_err(|e| { + anyhow::anyhow!( + "Failed to fetch program accounts from coordinator program {}: {}", + self.program_id, + e + ) + })?; + + let mut runs = Vec::new(); + for (pubkey, account) in accounts { + let Ok(instance) = CoordinatorInstance::try_deserialize(&mut account.data.as_slice()) + else { + debug!("Failed to deserialize CoordinatorInstance at {}", pubkey); + continue; + }; + + let Ok(coord_account) = self.rpc_client.get_account(&instance.coordinator_account) + else { + debug!( + "Skipping run {} - could not fetch coordinator account", + instance.run_id + ); + continue; + }; + + let Ok(coordinator) = coordinator_account_from_bytes(&coord_account.data) else { + debug!( + "Skipping run {} - could not deserialize coordinator account", + instance.run_id + ); + continue; + }; + + let state = &coordinator.state.coordinator; + runs.push(RunInfo { + run_id: instance.run_id.clone(), + instance_pubkey: pubkey, + coordinator_account: instance.coordinator_account, + run_state: state.run_state, + num_clients: state.epoch_state.clients.len(), + min_clients: state.config.min_clients, + }); + } + + Ok(runs) + } + + /// Check if a user is authorized to join a specific run. + /// + /// This checks permissionless authorization (grantee = system_program::ID), + /// user-specific authorization (grantee = user_pubkey), + /// and optionally delegate-key authorization. + /// Returns the matched grantee pubkey if authorized, or None if not. 
+ pub fn can_user_join_run( + &self, + run_id: &str, + user_pubkey: &Pubkey, + delegate_authorizer: Option<&Pubkey>, + ) -> Result> { + // Fetch the CoordinatorInstance to get join_authority + let instance = self.fetch_coordinator_data(run_id)?; + let join_authority = instance.join_authority; + + // Try permissionless authorization (grantee = system_program::ID) + if self.check_authorization_for_grantee(&join_authority, &system_program::ID, user_pubkey) { + return Ok(Some(system_program::ID)); + } + + // Try user-specific authorization (grantee = user_pubkey) + if self.check_authorization_for_grantee(&join_authority, user_pubkey, user_pubkey) { + return Ok(Some(*user_pubkey)); + } + + // Try delegate-key authorization if provided + if let Some(authorizer) = delegate_authorizer { + debug!("Attempting authorization via delegate key {}", authorizer); + if self.check_authorization_for_grantee(&join_authority, authorizer, user_pubkey) { + return Ok(Some(*authorizer)); + } + } + + Ok(None) + } + + /// Check if an authorization exists and is valid for a specific grantee. 
+ fn check_authorization_for_grantee( + &self, + join_authority: &Pubkey, + grantee: &Pubkey, + user_pubkey: &Pubkey, + ) -> bool { + let auth_pda = psyche_solana_authorizer::find_authorization( + join_authority, + grantee, + JOIN_RUN_AUTHORIZATION_SCOPE, + ); + + let Ok(account) = self.rpc_client.get_account(&auth_pda) else { + return false; + }; + + let Ok(authorization) = Authorization::try_deserialize(&mut account.data.as_slice()) else { + warn!( + "Failed to deserialize authorization at {}: invalid data", + auth_pda + ); + return false; + }; + + authorization.is_valid_for(join_authority, user_pubkey, JOIN_RUN_AUTHORIZATION_SCOPE) + } } diff --git a/tools/rust-tools/run-manager/src/docker/manager.rs b/tools/rust-tools/run-manager/src/docker/manager.rs index cf20f480a..7e076e0d8 100644 --- a/tools/rust-tools/run-manager/src/docker/manager.rs +++ b/tools/rust-tools/run-manager/src/docker/manager.rs @@ -1,15 +1,19 @@ +use anchor_client::solana_sdk::bs58; use anchor_client::solana_sdk::pubkey::Pubkey; +use anchor_client::solana_sdk::signature::{EncodableKey, Keypair, Signer}; use anyhow::{Context, Result, anyhow, bail}; -use std::fs; -use std::io::{BufRead, BufReader}; +use std::io::{BufRead, BufReader, Cursor}; use std::path::PathBuf; use std::process::{Command, Stdio}; use tokio::signal; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; +use crate::docker::RunInfo; use crate::docker::coordinator_client::CoordinatorClient; use crate::get_env_var; use crate::load_and_apply_env_file; +use crate::load_wallet_key; +use psyche_coordinator::RunState; const RETRY_DELAY_SECS: u64 = 5; const VERSION_MISMATCH_EXIT_CODE: i32 = 10; @@ -21,6 +25,7 @@ pub struct RunManager { local_docker: bool, coordinator_client: CoordinatorClient, scratch_dir: Option, + client_authorizer: Pubkey, } #[derive(Debug)] @@ -34,6 +39,7 @@ impl RunManager { coordinator_program_id: String, env_file: PathBuf, local_docker: bool, + authorizer: Option, ) -> Result { // Verify 
docker is available Command::new("docker") @@ -43,20 +49,9 @@ impl RunManager { load_and_apply_env_file(&env_file)?; - let wallet_key = - if let Ok(raw_wallet_private_key) = std::env::var("RAW_WALLET_PRIVATE_KEY") { - info!("Using RAW_WALLET_PRIVATE_KEY from command line"); - raw_wallet_private_key - } else if let Ok(wallet_path) = std::env::var("WALLET_PRIVATE_KEY_PATH") { - info!("Using WALLET_PRIVATE_KEY_PATH: {wallet_path}"); - fs::read_to_string(wallet_path)? - } else { - bail!( - "No wallet private key! Must set RAW_WALLET_PRIVATE_KEY or WALLET_PRIVATE_KEY_PATH" - ) - } - .trim() - .to_string(); + let wallet_key = load_wallet_key()?; + let user_pubkey = parse_wallet_pubkey(&wallet_key)?; + info!("User pubkey: {}", user_pubkey); let coordinator_program_id = coordinator_program_id .parse::() @@ -64,13 +59,50 @@ impl RunManager { info!("Using coordinator program ID: {}", coordinator_program_id); - let run_id = get_env_var("RUN_ID")?; let rpc = get_env_var("RPC")?; - let scratch_dir = std::env::var("SCRATCH_DIR").ok(); let coordinator_client = CoordinatorClient::new(rpc, coordinator_program_id); + // Read delegate key from AUTHORIZER env var (separate from --authorizer flag) + let delegate_authorizer = parse_delegate_authorizer_from_env()?; + + // Try to get RUN_ID from env, or discover available runs + if let Ok(run_id) = std::env::var("RUN_ID") { + if !run_id.is_empty() { + info!("Using RUN_ID from environment: {}", run_id); + let client_authorizer = resolve_client_authorizer( + &coordinator_client, + &run_id, + &user_pubkey, + delegate_authorizer.as_ref(), + )?; + return Ok(Self { + wallet_key, + run_id, + coordinator_client, + env_file, + local_docker, + scratch_dir, + client_authorizer, + }); + } + } + + info!("RUN_ID not set, discovering available runs..."); + let runs = coordinator_client.get_all_runs()?; + if runs.is_empty() { + bail!("No runs found on coordinator program"); + } + + let (run_id, client_authorizer) = select_best_run( + &runs, + 
&user_pubkey, + &coordinator_client, + authorizer.as_ref(), + delegate_authorizer.as_ref(), + )?; + Ok(Self { wallet_key, run_id, @@ -78,6 +110,7 @@ impl RunManager { env_file, local_docker, scratch_dir, + client_authorizer, }) } @@ -159,6 +192,10 @@ impl RunManager { .arg(format!("RAW_WALLET_PRIVATE_KEY={}", &self.wallet_key)) .arg("--env") .arg(format!("CLIENT_VERSION={}", client_version)) + .arg("--env") + .arg(format!("RUN_ID={}", &self.run_id)) + .arg("--env") + .arg(format!("AUTHORIZER={}", &self.client_authorizer)) .arg("--env-file") .arg(&self.env_file); @@ -320,3 +357,173 @@ impl RunManager { } } } + +/// Parse wallet key string to extract the user's pubkey. +pub fn parse_wallet_pubkey(wallet_key: &str) -> Result { + let keypair = if wallet_key.starts_with('[') { + // Assume Keypair::read format (JSON array of bytes) + Keypair::read(&mut Cursor::new(wallet_key)) + .map_err(|e| anyhow!("Failed to parse wallet key: {}", e))? + } else { + // from_base58_string has an internal unwrap() so we use these functions to handle + // errors more gracefully + let decoded = bs58::decode(wallet_key) + .into_vec() + .map_err(|e| anyhow!("Failed to decode base58 wallet key: {}", e))?; + + Keypair::from_bytes(&decoded) + .map_err(|e| anyhow!("Failed to create keypair from decoded bytes: {}", e))? + }; + Ok(keypair.pubkey()) +} + +/// Read the AUTHORIZER env var as a delegate key pubkey, if set. 
+pub fn parse_delegate_authorizer_from_env() -> Result> { + match std::env::var("AUTHORIZER") { + Ok(val) if !val.is_empty() => { + let pubkey = val.parse::().with_context(|| { + format!("Failed to parse AUTHORIZER env var as pubkey: {}", val) + })?; + info!( + "Using delegate authorizer from AUTHORIZER env var: {}", + pubkey + ); + Ok(Some(pubkey)) + } + _ => { + info!("AUTHORIZER env var not set, skipping delegate key authorization"); + Ok(None) + } + } +} + +/// Determine the correct AUTHORIZER value for the client container by checking +/// which authorization type (permissionless, user-specific, or delegate) is valid for this run. +fn resolve_client_authorizer( + coordinator_client: &CoordinatorClient, + run_id: &str, + user_pubkey: &Pubkey, + delegate_authorizer: Option<&Pubkey>, +) -> Result { + match coordinator_client.can_user_join_run(run_id, user_pubkey, delegate_authorizer)? { + Some(grantee) => { + info!("Resolved AUTHORIZER={} for run {}", grantee, run_id); + Ok(grantee) + } + None => { + bail!( + "User {} is not authorized to join run {}", + user_pubkey, + run_id + ); + } + } +} + +/// Filter runs to only those that are joinable and authorized for the given user. +/// Returns (run_info, grantee_pubkey) pairs sorted by priority (WaitingForMembers first). 
+/// +/// - `join_authority_filter`: if set, only consider runs whose join_authority matches this pubkey +/// - `delegate_authorizer`: if set, also try delegate-key authorization via this pubkey +pub fn find_joinable_runs( + runs: &[RunInfo], + user_pubkey: &Pubkey, + coordinator_client: &CoordinatorClient, + join_authority_filter: Option<&Pubkey>, + delegate_authorizer: Option<&Pubkey>, +) -> Result> { + // Filter out unjoinable run states + let mut candidates: Vec<_> = runs + .iter() + .filter(|run| { + !matches!( + run.run_state, + RunState::Uninitialized | RunState::Finished | RunState::Paused + ) + }) + .cloned() + .collect(); + + if candidates.is_empty() { + return Ok(Vec::new()); + } + + // Filter by join_authority if specified + if let Some(auth) = join_authority_filter { + info!("Filtering runs by join_authority: {}", auth); + candidates.retain( + |run| match coordinator_client.fetch_coordinator_data(&run.run_id) { + Ok(data) => data.join_authority == *auth, + Err(e) => { + debug!("Skipping run {} - failed to fetch data: {}", run.run_id, e); + false + } + }, + ); + } + + // Filter to runs the user is authorized to join, capturing the matched grantee + let mut authorized_candidates: Vec<(RunInfo, Pubkey)> = Vec::new(); + for run in candidates { + match coordinator_client.can_user_join_run(&run.run_id, user_pubkey, delegate_authorizer) { + Ok(Some(grantee)) => authorized_candidates.push((run, grantee)), + Ok(None) => {} + Err(e) => { + debug!( + "Skipping run {} - authorization check failed: {}", + run.run_id, e + ); + } + } + } + + // Prioritize runs waiting for members + authorized_candidates.sort_by_key(|(run, _)| match run.run_state { + RunState::WaitingForMembers => 0, + _ => 1, + }); + + Ok(authorized_candidates) +} + +/// Returns (run_id, client_authorizer) where client_authorizer is the grantee +/// to pass to the container as AUTHORIZER. 
+fn select_best_run( + runs: &[RunInfo], + user_pubkey: &Pubkey, + coordinator_client: &CoordinatorClient, + join_authority_filter: Option<&Pubkey>, + delegate_authorizer: Option<&Pubkey>, +) -> Result<(String, Pubkey)> { + let authorized_candidates = find_joinable_runs( + runs, + user_pubkey, + coordinator_client, + join_authority_filter, + delegate_authorizer, + )?; + + if authorized_candidates.is_empty() { + bail!("No joinable runs found for user {}", user_pubkey); + } + + info!("Found {} available run(s):", authorized_candidates.len()); + let candidate_runs: Vec<_> = authorized_candidates.iter().map(|(r, _)| r).collect(); + for line in RunInfo::format_table(&candidate_runs) { + info!("{}", line); + } + + let (selected_run, grantee) = &authorized_candidates[0]; + info!( + "Selected run: {} ({}, {})", + selected_run.run_id, + selected_run.run_state, + selected_run.clients_display() + ); + info!( + "Resolved AUTHORIZER={} for run {}", + grantee, selected_run.run_id + ); + + Ok((selected_run.run_id.clone(), *grantee)) +} diff --git a/tools/rust-tools/run-manager/src/docker/mod.rs b/tools/rust-tools/run-manager/src/docker/mod.rs index b3221d7eb..f7f7e2eaa 100644 --- a/tools/rust-tools/run-manager/src/docker/mod.rs +++ b/tools/rust-tools/run-manager/src/docker/mod.rs @@ -2,4 +2,8 @@ pub mod coordinator_client; pub mod manager; // Re-exports -pub use manager::{Entrypoint, RunManager}; +pub use coordinator_client::RunInfo; +pub use manager::{ + Entrypoint, RunManager, find_joinable_runs, parse_delegate_authorizer_from_env, + parse_wallet_pubkey, +}; diff --git a/tools/rust-tools/run-manager/src/lib.rs b/tools/rust-tools/run-manager/src/lib.rs index 8734f4d2c..1564e7a91 100644 --- a/tools/rust-tools/run-manager/src/lib.rs +++ b/tools/rust-tools/run-manager/src/lib.rs @@ -1,5 +1,6 @@ // Library exports for run-manager -use anyhow::{Context, Result}; +use anchor_client::solana_sdk::pubkey::Pubkey; +use anyhow::{Context, Result, bail}; use std::path::PathBuf; pub mod 
commands; @@ -30,3 +31,25 @@ pub fn load_and_apply_env_file(path: &PathBuf) -> Result<()> { pub fn get_env_var(name: &str) -> Result { std::env::var(name).with_context(|| format!("Missing required environment variable: {}", name)) } + +/// Load the wallet private key from environment variables. +pub fn load_wallet_key() -> Result { + if let Ok(raw) = std::env::var("RAW_WALLET_PRIVATE_KEY") { + Ok(raw) + } else if let Ok(path) = std::env::var("WALLET_PRIVATE_KEY_PATH") { + std::fs::read_to_string(&path) + .with_context(|| format!("Failed to read wallet key file: {}", path)) + } else { + bail!("No wallet private key! Set RAW_WALLET_PRIVATE_KEY or WALLET_PRIVATE_KEY_PATH") + } + .map(|s| s.trim().to_string()) +} + +/// Parse an optional string as a Pubkey. +pub fn parse_optional_pubkey(s: Option<&String>, name: &str) -> Result> { + s.map(|s| { + s.parse::() + .with_context(|| format!("Failed to parse {} pubkey: {}", name, s)) + }) + .transpose() +} diff --git a/tools/rust-tools/run-manager/src/main.rs b/tools/rust-tools/run-manager/src/main.rs index 717234d9d..25251f54d 100644 --- a/tools/rust-tools/run-manager/src/main.rs +++ b/tools/rust-tools/run-manager/src/main.rs @@ -10,6 +10,7 @@ use clap::{Args, Parser, Subcommand}; use psyche_solana_rpc::SolanaBackend; use run_manager::commands::{self, Command}; use run_manager::docker::manager::{Entrypoint, RunManager}; +use run_manager::parse_optional_pubkey; use std::io::Cursor; use std::path::PathBuf; use std::sync::Arc; @@ -59,6 +60,10 @@ struct CliArgs { #[arg(long)] local: bool, + /// Only join runs where this pubkey is the join_authority (Docker mode) + #[arg(long)] + authorizer: Option, + /// Optional entrypoint (Docker mode) #[arg(long)] entrypoint: Option, @@ -214,6 +219,21 @@ enum Commands { params: CommandCanJoin, }, + /// List joinable runs on the coordinator program + ListRuns { + /// Path to .env file with RPC and wallet configuration + #[arg(long)] + env_file: Option, + #[clap(flatten)] + cluster: 
ClusterArgs, + /// Coordinator program ID + #[arg(long, default_value = "4SHugWqSXwKE5fqDchkJcPEqnoZE22VYKtSTVm7axbT7")] + coordinator_program_id: String, + /// Only show runs where this pubkey is the join_authority + #[arg(long)] + authorizer: Option, + }, + // Docs generation #[clap(hide = true)] PrintAllHelp { @@ -287,7 +307,14 @@ async fn async_main() -> Result<()> { None => None, }; - let run_mgr = RunManager::new(args.coordinator_program_id, env_file, args.local)?; + let authorizer = parse_optional_pubkey(args.authorizer.as_ref(), "authorizer")?; + + let run_mgr = RunManager::new( + args.coordinator_program_id, + env_file, + args.local, + authorizer, + )?; let result = run_mgr.run(entrypoint).await; if let Err(e) = &result { error!("Error: {}", e); @@ -370,6 +397,12 @@ async fn async_main() -> Result<()> { Commands::CanJoin { cluster, params } => { params.execute(create_backend_readonly(cluster)?).await } + Commands::ListRuns { + env_file, + cluster, + coordinator_program_id, + authorizer, + } => list_runs(env_file, cluster, coordinator_program_id, authorizer), Commands::PrintAllHelp { markdown } => { assert!(markdown); clap_markdown::print_help_markdown::(); @@ -378,6 +411,58 @@ async fn async_main() -> Result<()> { } } +fn list_runs( + env_file: Option, + cluster: ClusterArgs, + coordinator_program_id: String, + authorizer: Option, +) -> Result<()> { + use anyhow::Context; + use run_manager::docker::{ + RunInfo, coordinator_client::CoordinatorClient, find_joinable_runs, + parse_delegate_authorizer_from_env, parse_wallet_pubkey, + }; + + if let Some(env_file) = env_file { + run_manager::load_and_apply_env_file(&env_file)?; + } + let program_id = coordinator_program_id + .parse::() + .context("Failed to parse coordinator program ID")?; + let rpc = std::env::var("RPC").unwrap_or_else(|_| cluster.rpc.trim_matches('"').to_string()); + let coordinator_client = CoordinatorClient::new(rpc, program_id); + let runs = coordinator_client.get_all_runs()?; + + if 
runs.is_empty() { + println!("No runs found on coordinator program {}", program_id); + return Ok(()); + } + + let authorizer = parse_optional_pubkey(authorizer.as_ref(), "authorizer")?; + let delegate_authorizer = parse_delegate_authorizer_from_env()?; + let wallet_key = run_manager::load_wallet_key()?; + let user_pubkey = parse_wallet_pubkey(&wallet_key)?; + let candidates = find_joinable_runs( + &runs, + &user_pubkey, + &coordinator_client, + authorizer.as_ref(), + delegate_authorizer.as_ref(), + )?; + + if candidates.is_empty() { + println!("No available runs to join"); + } else { + println!("Found {} joinable run(s):", candidates.len()); + let refs: Vec<_> = candidates.iter().map(|(r, _)| r).collect(); + for line in RunInfo::format_table(&refs) { + println!("{}", line); + } + } + + Ok(()) +} + fn create_backend(cluster: ClusterArgs, wallet: WalletArgs) -> Result { let wallet_keypair: Keypair = wallet.try_into()?; let cluster: Cluster = cluster.into();