diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index cf89c47caf5236..7371747e219f18 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -27,7 +27,7 @@ use { collections::{HashMap, HashSet}, env, error, fmt::{self, Display}, - net::{IpAddr, SocketAddr}, + net::{IpAddr, Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, Ordering}, @@ -39,7 +39,8 @@ use { tokio::runtime::Runtime, }; -pub(crate) static FIREDANCER_ADMIN_RPC: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); +pub(crate) static FIREDANCER_ADMIN_RPC: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(0); #[derive(Clone)] pub struct AdminRpcRequestMetadata { @@ -286,6 +287,136 @@ pub extern "C" fn fd_ext_admin_rpc_set_identity(identity_keypair: *const u8, req } } +// Returns 0 when the TPU address is applied; -1 when failed. +// All args must be in little endian +#[no_mangle] +pub extern "C" fn fd_ext_admin_rpc_set_public_tpu( + tpu_addr: u32, /* big endian ip, eg. 0x010000000 is 16.0.0.0 */ + tpu_port: u16, /* host order */ + tpu_fwd_addr: u32, + tpu_fwd_port: u16, +) -> i32 { + loop { + if FIREDANCER_ADMIN_RPC.load(std::sync::atomic::Ordering::Relaxed) != 0 { + break; + } + std::hint::spin_loop(); + } + + let metadata: &AdminRpcRequestMetadata = unsafe { + (FIREDANCER_ADMIN_RPC.load(std::sync::atomic::Ordering::Acquire) + as *const AdminRpcRequestMetadata) + .as_ref() + .unwrap() + }; + // Chain both admin RPC calls together, else we'll get error due to the requests being made in the same millisecond. + // the second contact-info refresh would have the same wallclock/hash and lose the CRDS tie-breaker, + // causing refresh_my_gossip_contact_info to log InsertFailed without updating the entry. + match AdminRpcImpl + .set_public_tpu_address(metadata.clone(), SocketAddr::new(IpAddr::V4(Ipv4Addr::from(tpu_addr.to_be())), tpu_port)) + .and(AdminRpcImpl.set_public_tpu_forwards_address( + metadata.clone(), + SocketAddr::new(IpAddr::V4(Ipv4Addr::from(tpu_fwd_addr.to_be())), tpu_fwd_port), + )) { + Ok(()) => 0, + Err(err) => { + error!("Failed to set public TPU to {:?}:{tpu_port}, TPU forward to {:?}:{tpu_fwd_port}. Error: {err}", + IpAddr::V4(Ipv4Addr::from(tpu_addr)), + IpAddr::V4(Ipv4Addr::from(tpu_fwd_addr)), + ); + -1 + } + } +} + +// Returns 0 when the TPU address is applied; -1 when failed. +// Args expect little endian +#[no_mangle] +pub extern "C" fn fd_ext_admin_rpc_get_public_tpu( + out_ip4_addr_host: *mut u32, /* big endian output, eg. 0x010000000 is 16.0.0.0 */ + out_port_host: *mut u16 /* big endian */, +) -> i32 { + loop { + if FIREDANCER_ADMIN_RPC.load(std::sync::atomic::Ordering::Relaxed) != 0 { + break; + } + std::hint::spin_loop(); + } + + let metadata: &AdminRpcRequestMetadata = unsafe { + (FIREDANCER_ADMIN_RPC.load(std::sync::atomic::Ordering::Acquire) + as *const AdminRpcRequestMetadata) + .as_ref() + .unwrap() + }; + let result = metadata.with_post_init(|post_init| { + let addr = post_init + .cluster_info + .my_contact_info() + .tpu(Protocol::UDP) + .ok_or_else(|| jsonrpc_core::error::Error::internal_error())?; + Ok(addr) + }); + match result { + Ok(addr) => unsafe { + *out_ip4_addr_host = match addr.ip() { + IpAddr::V4(v4) => u32::from_le_bytes(v4.octets()), + _ => return -1, + }; + *out_port_host = addr.port().to_be(); + 0 + }, + Err(err) => { + error!("Failed to get public TPU address: {err}"); + -1 + } + } +} + +// Returns 0 when the TPU address is applied; -1 when failed. +#[no_mangle] +pub extern "C" fn fd_ext_admin_rpc_get_public_tpu_forwards( + out_ip4_addr_host: *mut u32, /* big endian output, eg. 0x010000000 is 16.0.0.0 */ + out_port_host: *mut u16 /* big endian */, +) -> i32 { + loop { + if FIREDANCER_ADMIN_RPC.load(std::sync::atomic::Ordering::Relaxed) != 0 { + break; + } + std::hint::spin_loop(); + } + + let metadata: &AdminRpcRequestMetadata = unsafe { + (FIREDANCER_ADMIN_RPC.load(std::sync::atomic::Ordering::Acquire) + as *const AdminRpcRequestMetadata) + .as_ref() + .unwrap() + }; + let result = metadata.with_post_init(|post_init| { + let addr = post_init + .cluster_info + .my_contact_info() + .tpu_forwards(Protocol::UDP) + .ok_or_else(|| jsonrpc_core::error::Error::internal_error())?; + Ok(addr) + }); + + match result { + Ok(addr) => unsafe { + *out_ip4_addr_host = match addr.ip() { + IpAddr::V4(v4) => u32::from_le_bytes(v4.octets()), + _ => return -1, + }; + *out_port_host = addr.port().to_be(); + 0 + }, + Err(err) => { + error!("Failed to get public TPU forwards address: {err}"); + -1 + } + } +} + pub struct AdminRpcImpl; impl AdminRpc for AdminRpcImpl { type Metadata = AdminRpcRequestMetadata; @@ -523,8 +654,10 @@ impl AdminRpc for AdminRpcImpl { _require_tower: bool, ) -> Result<()> { // FIREDANCER: Operator must use Firedancer set-identity command. - error!("`agave-validator set-identity` is not supported with Frankendancer. \ - Please use `fdctl set-identity` to change the identity keypair"); + error!( + "`agave-validator set-identity` is not supported with Frankendancer. \ + Please use `fdctl set-identity` to change the identity keypair" + ); Err(jsonrpc_core::error::Error::invalid_request()) // debug!("set_identity request received"); @@ -545,8 +678,10 @@ impl AdminRpc for AdminRpcImpl { _require_tower: bool, ) -> Result<()> { // FIREDANCER: Operator must use Firedancer set-identity command. - error!("`agave-validator set-identity` is not supported with Frankendancer. \ - Please use `fdctl set-identity` to change the identity keypair"); + error!( + "`agave-validator set-identity` is not supported with Frankendancer. \ + Please use `fdctl set-identity` to change the identity keypair" + ); Err(jsonrpc_core::error::Error::invalid_request()) // debug!("set_identity_from_bytes request received"); @@ -706,7 +841,7 @@ impl AdminRpc for AdminRpcImpl { meta: Self::Metadata, public_tpu_addr: SocketAddr, ) -> Result<()> { - debug!("set_public_tpu_address rpc request received: {public_tpu_addr}"); + info!("set_public_tpu_address rpc request received: {public_tpu_addr}"); meta.with_post_init(|post_init| { post_init @@ -743,7 +878,7 @@ impl AdminRpc for AdminRpcImpl { meta: Self::Metadata, public_tpu_forwards_addr: SocketAddr, ) -> Result<()> { - debug!("set_public_tpu_forwards_address rpc request received: {public_tpu_forwards_addr}"); + info!("set_public_tpu_forwards_address rpc request received: {public_tpu_forwards_addr}"); meta.with_post_init(|post_init| { post_init