Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 143 additions & 8 deletions validator/src/admin_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use {
collections::{HashMap, HashSet},
env, error,
fmt::{self, Display},
net::{IpAddr, SocketAddr},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -39,7 +39,8 @@ use {
tokio::runtime::Runtime,
};

pub(crate) static FIREDANCER_ADMIN_RPC: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
pub(crate) static FIREDANCER_ADMIN_RPC: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);

#[derive(Clone)]
pub struct AdminRpcRequestMetadata {
Expand Down Expand Up @@ -286,6 +287,136 @@ pub extern "C" fn fd_ext_admin_rpc_set_identity(identity_keypair: *const u8, req
}
}

// Returns 0 when the TPU address is applied; -1 when failed.
// All args must be in little endian
#[no_mangle]
pub extern "C" fn fd_ext_admin_rpc_set_public_tpu(
tpu_addr: u32, /* big endian ip, eg. 0x010000000 is 16.0.0.0 */
tpu_port: u16, /* host order */
tpu_fwd_addr: u32,
tpu_fwd_port: u16,
) -> i32 {
loop {
if FIREDANCER_ADMIN_RPC.load(std::sync::atomic::Ordering::Relaxed) != 0 {
break;
}
std::hint::spin_loop();
}

let metadata: &AdminRpcRequestMetadata = unsafe {
(FIREDANCER_ADMIN_RPC.load(std::sync::atomic::Ordering::Acquire)
as *const AdminRpcRequestMetadata)
.as_ref()
.unwrap()
};
// Chain both admin RPC calls together, else we'll get error due to the requests being made in the same millisecond.
// the second contact-info refresh would have the same wallclock/hash and lose the CRDS tie-breaker,
// causing refresh_my_gossip_contact_info to log InsertFailed without updating the entry.
match AdminRpcImpl
.set_public_tpu_address(metadata.clone(), SocketAddr::new(IpAddr::V4(Ipv4Addr::from(tpu_addr.to_be())), tpu_port))
.and(AdminRpcImpl.set_public_tpu_forwards_address(
metadata.clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::from(tpu_fwd_addr.to_be())), tpu_fwd_port),
)) {
Ok(()) => 0,
Err(err) => {
error!("Failed to set public TPU to {:?}:{tpu_port}, TPU forward to {:?}:{tpu_fwd_port}. Error: {err}",
IpAddr::V4(Ipv4Addr::from(tpu_addr)),
IpAddr::V4(Ipv4Addr::from(tpu_fwd_addr)),
);
-1
}
}
}

// Returns 0 when the TPU address is applied; -1 when failed.
// Args expect little endian
#[no_mangle]
pub extern "C" fn fd_ext_admin_rpc_get_public_tpu(
out_ip4_addr_host: *mut u32, /* big endian output, eg. 0x010000000 is 16.0.0.0 */
out_port_host: *mut u16 /* big endian */,
) -> i32 {
loop {
if FIREDANCER_ADMIN_RPC.load(std::sync::atomic::Ordering::Relaxed) != 0 {
break;
}
std::hint::spin_loop();
}

let metadata: &AdminRpcRequestMetadata = unsafe {
(FIREDANCER_ADMIN_RPC.load(std::sync::atomic::Ordering::Acquire)
as *const AdminRpcRequestMetadata)
.as_ref()
.unwrap()
};
let result = metadata.with_post_init(|post_init| {
let addr = post_init
.cluster_info
.my_contact_info()
.tpu(Protocol::UDP)
.ok_or_else(|| jsonrpc_core::error::Error::internal_error())?;
Ok(addr)
});
match result {
Ok(addr) => unsafe {
*out_ip4_addr_host = match addr.ip() {
IpAddr::V4(v4) => u32::from_le_bytes(v4.octets()),
_ => return -1,
};
*out_port_host = addr.port().to_be();
0
},
Err(err) => {
error!("Failed to get public TPU address: {err}");
-1
}
}
}

// Returns 0 when the TPU address is applied; -1 when failed.
#[no_mangle]
pub extern "C" fn fd_ext_admin_rpc_get_public_tpu_forwards(
out_ip4_addr_host: *mut u32, /* big endian output, eg. 0x010000000 is 16.0.0.0 */
out_port_host: *mut u16 /* big endian */,
) -> i32 {
loop {
if FIREDANCER_ADMIN_RPC.load(std::sync::atomic::Ordering::Relaxed) != 0 {
break;
}
std::hint::spin_loop();
}

let metadata: &AdminRpcRequestMetadata = unsafe {
(FIREDANCER_ADMIN_RPC.load(std::sync::atomic::Ordering::Acquire)
as *const AdminRpcRequestMetadata)
.as_ref()
.unwrap()
};
let result = metadata.with_post_init(|post_init| {
let addr = post_init
.cluster_info
.my_contact_info()
.tpu_forwards(Protocol::UDP)
.ok_or_else(|| jsonrpc_core::error::Error::internal_error())?;
Ok(addr)
});

match result {
Ok(addr) => unsafe {
*out_ip4_addr_host = match addr.ip() {
IpAddr::V4(v4) => u32::from_le_bytes(v4.octets()),
_ => return -1,
};
*out_port_host = addr.port().to_be();
0
},
Err(err) => {
error!("Failed to get public TPU forwards address: {err}");
-1
}
}
}

pub struct AdminRpcImpl;
impl AdminRpc for AdminRpcImpl {
type Metadata = AdminRpcRequestMetadata;
Expand Down Expand Up @@ -523,8 +654,10 @@ impl AdminRpc for AdminRpcImpl {
_require_tower: bool,
) -> Result<()> {
// FIREDANCER: Operator must use Firedancer set-identity command.
error!("`agave-validator set-identity` is not supported with Frankendancer. \
Please use `fdctl set-identity` to change the identity keypair");
error!(
"`agave-validator set-identity` is not supported with Frankendancer. \
Please use `fdctl set-identity` to change the identity keypair"
);
Err(jsonrpc_core::error::Error::invalid_request())

// debug!("set_identity request received");
Expand All @@ -545,8 +678,10 @@ impl AdminRpc for AdminRpcImpl {
_require_tower: bool,
) -> Result<()> {
// FIREDANCER: Operator must use Firedancer set-identity command.
error!("`agave-validator set-identity` is not supported with Frankendancer. \
Please use `fdctl set-identity` to change the identity keypair");
error!(
"`agave-validator set-identity` is not supported with Frankendancer. \
Please use `fdctl set-identity` to change the identity keypair"
);
Err(jsonrpc_core::error::Error::invalid_request())

// debug!("set_identity_from_bytes request received");
Expand Down Expand Up @@ -706,7 +841,7 @@ impl AdminRpc for AdminRpcImpl {
meta: Self::Metadata,
public_tpu_addr: SocketAddr,
) -> Result<()> {
debug!("set_public_tpu_address rpc request received: {public_tpu_addr}");
info!("set_public_tpu_address rpc request received: {public_tpu_addr}");

meta.with_post_init(|post_init| {
post_init
Expand Down Expand Up @@ -743,7 +878,7 @@ impl AdminRpc for AdminRpcImpl {
meta: Self::Metadata,
public_tpu_forwards_addr: SocketAddr,
) -> Result<()> {
debug!("set_public_tpu_forwards_address rpc request received: {public_tpu_forwards_addr}");
info!("set_public_tpu_forwards_address rpc request received: {public_tpu_forwards_addr}");

meta.with_post_init(|post_init| {
post_init
Expand Down