diff --git a/crates/sentinel/CHANGELOG.md b/crates/sentinel/CHANGELOG.md index c9bb1875..5f9f02b4 100644 --- a/crates/sentinel/CHANGELOG.md +++ b/crates/sentinel/CHANGELOG.md @@ -6,7 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] - +- feat: auto-join validators to approved multicast publisher groups - feat: billing sentinel for tenant payment status monitoring ([#265](https://github.com/doublezerofoundation/doublezero-offchain/pull/265)) - change default leader schedule lookahead from 2 epochs to 1 ([#259](https://github.com/doublezerofoundation/doublezero-offchain/pull/259)) - fix(sentinel): improve retry handling for transient RPC errors([#220](https://github.com/doublezerofoundation/doublezero-offchain/pull/220)) diff --git a/crates/sentinel/src/client/doublezero_ledger.rs b/crates/sentinel/src/client/doublezero_ledger.rs index 78f75dd2..07624f9b 100644 --- a/crates/sentinel/src/client/doublezero_ledger.rs +++ b/crates/sentinel/src/client/doublezero_ledger.rs @@ -1,4 +1,4 @@ -use std::{net::Ipv4Addr, sync::Arc, time::Duration}; +use std::{collections::HashMap, net::Ipv4Addr, sync::Arc, time::Duration}; use async_trait::async_trait; use doublezero_program_tools::instruction::try_build_instruction; @@ -7,11 +7,14 @@ use doublezero_serviceability::{ instructions::DoubleZeroInstruction, pda::{get_accesspass_pda, get_globalstate_pda}, processors::{ - accesspass::set::SetAccessPassArgs, tenant::update_payment_status::UpdatePaymentStatusArgs, + accesspass::set::SetAccessPassArgs, + multicastgroup::allowlist::publisher::add::AddMulticastGroupPubAllowlistArgs, + tenant::update_payment_status::UpdatePaymentStatusArgs, }, state::{ - accesspass::AccessPassType, + accesspass::{AccessPass, AccessPassType}, accounttype::AccountType, + multicastgroup::{MulticastGroup, MulticastGroupStatus}, tenant::{Tenant, TenantPaymentStatus}, }, }; @@ -50,6 
+53,19 @@ pub trait DzRpcClientType { validator_id: &Pubkey, ) -> Result; + async fn add_multicast_publisher_allowlist( + &self, + multicast_group_pda: &Pubkey, + service_key: &Pubkey, + client_ip: &Ipv4Addr, + ) -> Result; + + async fn get_access_pass( + &self, + service_key: &Pubkey, + client_ip: &Ipv4Addr, + ) -> Result; + async fn get_tenants_with_token_accounts(&self) -> Result>; async fn update_tenant_payment_status( @@ -87,6 +103,24 @@ impl DzRpcClientType for DzRpcClient { .await } + async fn add_multicast_publisher_allowlist( + &self, + multicast_group_pda: &Pubkey, + service_key: &Pubkey, + client_ip: &Ipv4Addr, + ) -> Result { + self.add_multicast_publisher_allowlist(multicast_group_pda, service_key, client_ip) + .await + } + + async fn get_access_pass( + &self, + service_key: &Pubkey, + client_ip: &Ipv4Addr, + ) -> Result { + self.get_access_pass(service_key, client_ip).await + } + async fn get_tenants_with_token_accounts(&self) -> Result> { self.get_tenants_with_token_accounts().await } @@ -170,6 +204,66 @@ impl DzRpcClient { Ok(signature) } + pub async fn add_multicast_publisher_allowlist( + &self, + multicast_group_pda: &Pubkey, + service_key: &Pubkey, + client_ip: &Ipv4Addr, + ) -> Result { + let (globalstate_pk, _) = get_globalstate_pda(&self.serviceability_id); + let (accesspass_pk, _) = + get_accesspass_pda(&self.serviceability_id, client_ip, service_key); + + let args = DoubleZeroInstruction::AddMulticastGroupPubAllowlist( + AddMulticastGroupPubAllowlistArgs { + client_ip: *client_ip, + user_payer: *service_key, + }, + ); + + let accounts = vec![ + AccountMeta::new(*multicast_group_pda, false), + AccountMeta::new(accesspass_pk, false), + AccountMeta::new_readonly(globalstate_pk, false), + AccountMeta::new(self.payer.pubkey(), true), + AccountMeta::new_readonly(system_program::id(), false), + ]; + + let ix = try_build_instruction(&self.serviceability_id, accounts, &args)?; + let signer = &self.payer; + let recent_blockhash = 
self.client.get_latest_blockhash().await?; + let transaction = new_transaction(&[ix], &[signer], recent_blockhash); + + let signature = tokio::time::timeout( + SEND_AND_CONFIRM_TIMEOUT, + self.client.send_and_confirm_transaction(&transaction), + ) + .await + .map_err(|_| Error::Deserialize("add_multicast_publisher_allowlist timed out".into()))??; + info!( + validator = %service_key, + %multicast_group_pda, + %client_ip, + %signature, + "added to multicast publisher allowlist" + ); + + Ok(signature) + } + + pub async fn get_access_pass( + &self, + service_key: &Pubkey, + client_ip: &Ipv4Addr, + ) -> Result { + let (accesspass_pk, _) = + get_accesspass_pda(&self.serviceability_id, client_ip, service_key); + let account_data = self.client.get_account_data(&accesspass_pk).await?; + let access_pass = AccessPass::try_from(&account_data[..]) + .map_err(|e| Error::Deserialize(format!("AccessPass deserialization failed: {e}")))?; + Ok(access_pass) + } + pub async fn get_tenants_with_token_accounts(&self) -> Result> { let config = RpcProgramAccountsConfig { filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( @@ -344,4 +438,246 @@ impl DzRpcClient { let epoch_info = self.client.get_epoch_info().await?; Ok(epoch_info.epoch.saturating_sub(1)) } + + /// Resolves multicast group codes to their on-chain PDAs by fetching all + /// MulticastGroup accounts and filtering by code. Intended to be called + /// once at startup, not in the hot loop. 
+ pub async fn resolve_multicast_group_codes( + &self, + codes: &[String], + ) -> Result> { + if codes.is_empty() { + return Ok(vec![]); + } + + let config = RpcProgramAccountsConfig { + filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( + 0, + vec![AccountType::MulticastGroup as u8], + ))]), + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + ..Default::default() + }, + ..Default::default() + }; + + let accounts = self + .client + .get_program_accounts_with_config(&self.serviceability_id, config) + .await?; + + let decoded: Vec<(Pubkey, MulticastGroup)> = accounts + .iter() + .filter_map(|(pubkey, account)| { + let group = MulticastGroup::try_from(&account.data[..]).ok()?; + Some((*pubkey, group)) + }) + .collect(); + + let resolved = resolve_codes_from_accounts(&decoded, codes)?; + for (code, pda) in &resolved { + info!(code, %pda, "resolved multicast group code"); + } + + Ok(resolved) + } +} + +/// Pure resolution logic extracted for testability. Detects duplicate +/// codes across **all** groups (regardless of status), then resolves +/// requested codes against `Activated`-only groups. +fn resolve_codes_from_accounts( + accounts: &[(Pubkey, MulticastGroup)], + codes: &[String], +) -> Result> { + // First pass: detect duplicate codes across all statuses. + let mut seen: HashMap<&str, Pubkey> = HashMap::new(); + for (pda, group) in accounts { + if let Some(existing_pda) = seen.insert(group.code.as_str(), *pda) { + return Err(Error::Deserialize(format!( + "duplicate multicast group code '{}': PDAs {} and {}", + group.code, existing_pda, pda + ))); + } + } + + // Second pass: build lookup from Activated groups only. 
+ let activated: HashMap<&str, Pubkey> = accounts + .iter() + .filter(|(_, group)| group.status == MulticastGroupStatus::Activated) + .map(|(pda, group)| (group.code.as_str(), *pda)) + .collect(); + + codes + .iter() + .map(|code| { + activated + .get(code.as_str()) + .map(|&pda| (code.clone(), pda)) + .ok_or_else(|| { + Error::Deserialize(format!( + "multicast group code '{}' not found (or not activated) on DZ Ledger", + code + )) + }) + }) + .collect() +} + +#[cfg(test)] +mod tests { + use std::net::Ipv4Addr; + + use doublezero_serviceability::state::{ + accounttype::AccountType, + multicastgroup::{MulticastGroup, MulticastGroupStatus}, + }; + use solana_sdk::pubkey::Pubkey; + + use super::resolve_codes_from_accounts; + + fn make_group(code: &str, status: MulticastGroupStatus) -> MulticastGroup { + MulticastGroup { + account_type: AccountType::MulticastGroup, + owner: Pubkey::default(), + index: 0, + bump_seed: 0, + tenant_pk: Pubkey::default(), + multicast_ip: Ipv4Addr::new(239, 0, 0, 1), + max_bandwidth: 0, + status, + code: code.to_string(), + publisher_count: 0, + subscriber_count: 0, + } + } + + #[test] + fn empty_codes_returns_empty() { + let accounts = vec![( + Pubkey::new_unique(), + make_group("ALPHA", MulticastGroupStatus::Activated), + )]; + let result = resolve_codes_from_accounts(&accounts, &[]).unwrap(); + assert!(result.is_empty()); + } + + #[test] + fn happy_path_multiple_codes() { + let pda_a = Pubkey::new_unique(); + let pda_b = Pubkey::new_unique(); + let accounts = vec![ + (pda_a, make_group("ALPHA", MulticastGroupStatus::Activated)), + (pda_b, make_group("BETA", MulticastGroupStatus::Activated)), + ]; + let codes = vec!["ALPHA".into(), "BETA".into()]; + let result = resolve_codes_from_accounts(&accounts, &codes).unwrap(); + assert_eq!( + result, + vec![("ALPHA".into(), pda_a), ("BETA".into(), pda_b)] + ); + } + + #[test] + fn same_code_across_statuses_is_duplicate_error() { + let pda_pending = Pubkey::new_unique(); + let pda_activated = 
Pubkey::new_unique(); + let accounts = vec![ + ( + pda_pending, + make_group("ALPHA", MulticastGroupStatus::Pending), + ), + ( + pda_activated, + make_group("ALPHA", MulticastGroupStatus::Activated), + ), + ]; + let codes = vec!["ALPHA".into()]; + let err = resolve_codes_from_accounts(&accounts, &codes).unwrap_err(); + assert!( + err.to_string() + .contains("duplicate multicast group code 'ALPHA'"), + "{}", + err + ); + } + + #[test] + fn non_activated_groups_not_resolvable() { + let accounts = vec![ + ( + Pubkey::new_unique(), + make_group("PENDING", MulticastGroupStatus::Pending), + ), + ( + Pubkey::new_unique(), + make_group("SUSPENDED", MulticastGroupStatus::Suspended), + ), + ( + Pubkey::new_unique(), + make_group("DELETING", MulticastGroupStatus::Deleting), + ), + ( + Pubkey::new_unique(), + make_group("REJECTED", MulticastGroupStatus::Rejected), + ), + ]; + let err = resolve_codes_from_accounts(&accounts, &["PENDING".into()]).unwrap_err(); + assert!( + err.to_string().contains("not found (or not activated)"), + "{}", + err + ); + } + + #[test] + fn duplicate_activated_codes_returns_error() { + let pda_1 = Pubkey::new_unique(); + let pda_2 = Pubkey::new_unique(); + let accounts = vec![ + (pda_1, make_group("ALPHA", MulticastGroupStatus::Activated)), + (pda_2, make_group("ALPHA", MulticastGroupStatus::Activated)), + ]; + let codes = vec!["ALPHA".into()]; + let err = resolve_codes_from_accounts(&accounts, &codes).unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("duplicate multicast group code 'ALPHA'"), + "{msg}" + ); + assert!(msg.contains(&pda_1.to_string()), "{msg}"); + assert!(msg.contains(&pda_2.to_string()), "{msg}"); + } + + #[test] + fn code_not_found_returns_error() { + let accounts = vec![( + Pubkey::new_unique(), + make_group("BETA", MulticastGroupStatus::Activated), + )]; + let codes = vec!["ALPHA".into()]; + let err = resolve_codes_from_accounts(&accounts, &codes).unwrap_err(); + assert!( + err.to_string() + .contains("multicast 
group code 'ALPHA' not found"), + "{}", + err + ); + } + + #[test] + fn code_exists_but_not_activated_returns_error() { + let accounts = vec![( + Pubkey::new_unique(), + make_group("ALPHA", MulticastGroupStatus::Suspended), + )]; + let codes = vec!["ALPHA".into()]; + let err = resolve_codes_from_accounts(&accounts, &codes).unwrap_err(); + assert!( + err.to_string().contains("not found (or not activated)"), + "{}", + err + ); + } } diff --git a/crates/sentinel/src/main.rs b/crates/sentinel/src/main.rs index a486b8f7..c8589bb9 100644 --- a/crates/sentinel/src/main.rs +++ b/crates/sentinel/src/main.rs @@ -44,6 +44,25 @@ async fn main() -> anyhow::Result<()> { "DoubleZero Ledger Sentinel starting" ); + let multicast_group_codes = settings.multicast_group_codes(); + let multicast_group_pdas = if multicast_group_codes.is_empty() { + info!("multicast publisher allowlisting disabled (no codes configured)"); + vec![] + } else { + let dz_client = DzRpcClient::new(dz_rpc_url.clone(), keypair.clone(), serviceability_id); + let resolved = dz_client + .resolve_multicast_group_codes(&multicast_group_codes) + .await?; + let pdas: Vec<_> = resolved.iter().map(|(_, pda)| *pda).collect(); + info!( + count = pdas.len(), + codes = ?multicast_group_codes, + pdas = ?pdas, + "multicast publisher allowlisting enabled" + ); + pdas + }; + let mut polling_sentinel = PollingSentinel::new( dz_rpc_url.clone(), sol_rpc_url.clone(), @@ -51,6 +70,7 @@ async fn main() -> anyhow::Result<()> { serviceability_id, args.poll_interval, ENV_PREVIOUS_LEADER_EPOCHS, + multicast_group_pdas, ) .await?; diff --git a/crates/sentinel/src/sentinel/poller.rs b/crates/sentinel/src/sentinel/poller.rs index 50a82a78..c607c8c2 100644 --- a/crates/sentinel/src/sentinel/poller.rs +++ b/crates/sentinel/src/sentinel/poller.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashSet, net::Ipv4Addr, sync::Arc, time::{Duration, Instant}, @@ -9,7 +10,7 @@ use retainer::Cache; use solana_sdk::{pubkey::Pubkey, signature::Keypair}; use 
tokio::time::interval; use tokio_util::sync::CancellationToken; -use tracing::{error, info}; +use tracing::{error, info, warn}; use url::Url; use crate::{ @@ -30,6 +31,7 @@ pub struct PollingSentinel { processed_cache: Arc>, poll_interval: Duration, previous_leader_epochs: u8, + multicast_group_pdas: Vec, } impl PollingSentinel { @@ -40,6 +42,7 @@ impl PollingSentinel { serviceability_id: Pubkey, poll_interval_secs: u64, previous_leader_epochs: u8, + multicast_group_pdas: Vec, ) -> Result { // Create cache with automatic background cleanup let processed_cache = Arc::new(Cache::new()); @@ -57,6 +60,7 @@ impl PollingSentinel { processed_cache, poll_interval: Duration::from_secs(poll_interval_secs), previous_leader_epochs, + multicast_group_pdas, }) } @@ -152,6 +156,71 @@ impl PollingSentinel { ) .await?; info!(%validator_id, %validator_ip, user = %service_key, "access pass issued"); + + let existing_mgroup_pubs: HashSet = match rpc_with_retry( + || async { + self.dz_rpc_client + .get_access_pass(&service_key, &validator_ip) + .await + }, + "get_access_pass", + ) + .await + { + Ok(access_pass) => access_pass.mgroup_pub_allowlist.into_iter().collect(), + Err(err) => { + warn!( + ?err, %validator_id, %validator_ip, + "failed to fetch access pass for idempotency check; \ + will attempt all multicast allowlist additions" + ); + HashSet::new() + } + }; + + for mgroup_pda in &self.multicast_group_pdas { + if existing_mgroup_pubs.contains(mgroup_pda) { + info!( + %validator_id, %validator_ip, %mgroup_pda, + "validator already in multicast publisher allowlist; skipping" + ); + metrics::counter!("doublezero_sentinel_multicast_allowlist_skipped") + .increment(1); + continue; + } + + match rpc_with_retry( + || async { + self.dz_rpc_client + .add_multicast_publisher_allowlist( + mgroup_pda, + &service_key, + &validator_ip, + ) + .await + }, + "add_multicast_publisher_allowlist", + ) + .await + { + Ok(_) => { + info!( + %validator_id, %validator_ip, %mgroup_pda, + "multicast 
publisher allowlist added" + ); + metrics::counter!("doublezero_sentinel_multicast_allowlist_success") + .increment(1); + } + Err(err) => { + error!( + ?err, %validator_id, %validator_ip, %mgroup_pda, + "multicast allowlist failed; continuing" + ); + metrics::counter!("doublezero_sentinel_multicast_allowlist_failed") + .increment(1); + } + } + } } let signature = rpc_with_retry( @@ -277,6 +346,7 @@ mod tests { processed_cache: Arc::new(Cache::new()), poll_interval: Duration::from_secs(15), previous_leader_epochs: 0, + multicast_group_pdas: vec![], }; // Invalid signature -> verify_access_request(...) should return Error::SignatureVerify diff --git a/crates/sentinel/src/settings.rs b/crates/sentinel/src/settings.rs index b27473ea..b8856390 100644 --- a/crates/sentinel/src/settings.rs +++ b/crates/sentinel/src/settings.rs @@ -62,6 +62,11 @@ pub struct Settings { /// metrics listening endpoint #[serde(default = "default_metrics_addr")] metrics_addr: String, + + /// Comma-separated multicast group codes for publisher allowlisting on + /// validator onboarding. When empty (default), allowlisting is disabled. + #[serde(default)] + multicast_group_codes: Option, } impl Settings { @@ -111,6 +116,18 @@ impl Settings { .expect("invalid metrics network address and port") } + pub fn multicast_group_codes(&self) -> Vec { + self.multicast_group_codes + .as_deref() + .map(|s| { + s.split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect() + }) + .unwrap_or_default() + } + pub fn serviceability_program_id( &self, ) -> Result { @@ -144,6 +161,21 @@ impl Settings { } } +/// Helper to build a `Settings` with only `multicast_group_codes` set. +/// All other fields use placeholder values (not relevant for the test). 
+#[cfg(test)] +fn settings_with_mcast_codes(codes: Option<&str>) -> Settings { + Settings { + log: default_log(), + env: "devnet".into(), + dz_rpc: "http://localhost:1234".into(), + sol_rpc: "localhost".into(), + keypair: "/dev/null".into(), + metrics_addr: default_metrics_addr(), + multicast_group_codes: codes.map(String::from), + } +} + fn default_log() -> String { "doublezero_ledger_sentinel=info".to_string() } @@ -151,3 +183,41 @@ fn default_log() -> String { fn default_metrics_addr() -> String { "127.0.0.1:2112".to_string() } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn multicast_group_codes_none() { + let settings = settings_with_mcast_codes(None); + assert!(settings.multicast_group_codes().is_empty()); + } + + #[test] + fn multicast_group_codes_empty_string() { + let settings = settings_with_mcast_codes(Some("")); + assert!(settings.multicast_group_codes().is_empty()); + } + + #[test] + fn multicast_group_codes_single() { + let settings = settings_with_mcast_codes(Some("ALPHA")); + assert_eq!(settings.multicast_group_codes(), vec!["ALPHA"]); + } + + #[test] + fn multicast_group_codes_multiple_with_whitespace() { + let settings = settings_with_mcast_codes(Some(" ALPHA , BETA , GAMMA ")); + assert_eq!( + settings.multicast_group_codes(), + vec!["ALPHA", "BETA", "GAMMA"] + ); + } + + #[test] + fn multicast_group_codes_trailing_comma() { + let settings = settings_with_mcast_codes(Some("ALPHA,BETA,")); + assert_eq!(settings.multicast_group_codes(), vec!["ALPHA", "BETA"]); + } +}