From c58bb9669eaa162eae45dc29b5a3f9de89a16aa1 Mon Sep 17 00:00:00 2001 From: Ben Marx Date: Thu, 19 Feb 2026 12:16:31 -0800 Subject: [PATCH 1/6] feat(sentinel): mcast pub for sentinel --- crates/sentinel/CHANGELOG.md | 2 +- .../sentinel/src/client/doublezero_ledger.rs | 123 +++++++++++++++++- crates/sentinel/src/main.rs | 20 +++ crates/sentinel/src/sentinel/poller.rs | 38 ++++++ crates/sentinel/src/settings.rs | 17 +++ 5 files changed, 198 insertions(+), 2 deletions(-) diff --git a/crates/sentinel/CHANGELOG.md b/crates/sentinel/CHANGELOG.md index c9bb1875..5f9f02b4 100644 --- a/crates/sentinel/CHANGELOG.md +++ b/crates/sentinel/CHANGELOG.md @@ -6,7 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] - +- feat: autojoin validators to approved mcast pub groups - feat: billing sentinel for tenant payment status monitoring ([#265](https://github.com/doublezerofoundation/doublezero-offchain/pull/265)) - change default leader schedule lookahead from 2 epochs to 1 ([#259](https://github.com/doublezerofoundation/doublezero-offchain/pull/259)) - fix(sentinel): improve retry handling for transient RPC errors([#220](https://github.com/doublezerofoundation/doublezero-offchain/pull/220)) diff --git a/crates/sentinel/src/client/doublezero_ledger.rs b/crates/sentinel/src/client/doublezero_ledger.rs index 78f75dd2..42b94097 100644 --- a/crates/sentinel/src/client/doublezero_ledger.rs +++ b/crates/sentinel/src/client/doublezero_ledger.rs @@ -7,11 +7,14 @@ use doublezero_serviceability::{ instructions::DoubleZeroInstruction, pda::{get_accesspass_pda, get_globalstate_pda}, processors::{ - accesspass::set::SetAccessPassArgs, tenant::update_payment_status::UpdatePaymentStatusArgs, + accesspass::set::SetAccessPassArgs, + multicastgroup::allowlist::publisher::add::AddMulticastGroupPubAllowlistArgs, + tenant::update_payment_status::UpdatePaymentStatusArgs, }, state::{ accesspass::AccessPassType, accounttype::AccountType, + multicastgroup::MulticastGroup, tenant::{Tenant, TenantPaymentStatus}, }, }; @@ -50,6 +53,13 @@ pub trait DzRpcClientType { validator_id: &Pubkey, ) -> Result; + async fn add_multicast_publisher_allowlist( + &self, + multicast_group_pda: &Pubkey, + service_key: &Pubkey, + client_ip: &Ipv4Addr, + ) -> Result; + async fn get_tenants_with_token_accounts(&self) -> Result>; async fn update_tenant_payment_status( @@ -87,6 +97,16 @@ impl DzRpcClientType for DzRpcClient { .await } + async fn add_multicast_publisher_allowlist( + &self, + multicast_group_pda: &Pubkey, + service_key: &Pubkey, + client_ip: &Ipv4Addr, + ) -> Result { + self.add_multicast_publisher_allowlist(multicast_group_pda, service_key, client_ip) + .await + } + async fn get_tenants_with_token_accounts(&self) -> Result> { self.get_tenants_with_token_accounts().await } @@ -170,6 +190,51 @@ impl DzRpcClient { Ok(signature) } + pub async fn add_multicast_publisher_allowlist( + &self, + multicast_group_pda: &Pubkey, + service_key: &Pubkey, + client_ip: &Ipv4Addr, + ) -> Result { + let (globalstate_pk, _) = get_globalstate_pda(&self.serviceability_id); + let (accesspass_pk, _) = + get_accesspass_pda(&self.serviceability_id, client_ip, service_key); + + let args = DoubleZeroInstruction::AddMulticastGroupPubAllowlist( + AddMulticastGroupPubAllowlistArgs { + client_ip: *client_ip, + user_payer: *service_key, + }, + ); + + let accounts = vec![ + AccountMeta::new(*multicast_group_pda, false), + AccountMeta::new(accesspass_pk, false), + AccountMeta::new_readonly(globalstate_pk, false), + AccountMeta::new(self.payer.pubkey(), true), + AccountMeta::new_readonly(system_program::id(), false), + ]; + + let ix = try_build_instruction(&self.serviceability_id, accounts, &args)?; + let signer = &self.payer; + let recent_blockhash = self.client.get_latest_blockhash().await?; + let transaction = new_transaction(&[ix], &[signer], recent_blockhash); + + let signature = self + .client + .send_and_confirm_transaction(&transaction) + .await?; + info!( + validator = %service_key, + %multicast_group_pda, + %client_ip, + %signature, + "added to multicast publisher allowlist" + ); + + Ok(signature) + } + pub async fn get_tenants_with_token_accounts(&self) -> Result> { let config = RpcProgramAccountsConfig { filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( @@ -344,4 +409,60 @@ impl DzRpcClient { let epoch_info = self.client.get_epoch_info().await?; Ok(epoch_info.epoch.saturating_sub(1)) } + + /// Resolves multicast group codes to their on-chain PDAs by fetching all + /// MulticastGroup accounts and filtering by code. Intended to be called + /// once at startup, not in the hot loop. + pub async fn resolve_multicast_group_codes( + &self, + codes: &[String], + ) -> Result> { + if codes.is_empty() { + return Ok(vec![]); + } + + let config = RpcProgramAccountsConfig { + filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( + 0, + vec![AccountType::MulticastGroup as u8], + ))]), + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + ..Default::default() + }, + ..Default::default() + }; + + let accounts = self + .client + .get_program_accounts_with_config(&self.serviceability_id, config) + .await?; + + let mut resolved = Vec::new(); + for code in codes { + let found = accounts.iter().find_map(|(pubkey, account)| { + let group = MulticastGroup::try_from(&account.data[..]).ok()?; + if group.code == *code { + Some(*pubkey) + } else { + None + } + }); + + match found { + Some(pda) => { + info!(code, %pda, "resolved multicast group code"); + resolved.push((code.clone(), pda)); + } + None => { + return Err(Error::Deserialize(format!( + "multicast group code '{}' not found on DZ Ledger", + code + ))); + } + } + } + + Ok(resolved) + } } diff --git a/crates/sentinel/src/main.rs b/crates/sentinel/src/main.rs index a486b8f7..c8589bb9 100644 --- a/crates/sentinel/src/main.rs +++ b/crates/sentinel/src/main.rs @@ -44,6 +44,25 @@ async fn main() -> anyhow::Result<()> { "DoubleZero Ledger Sentinel starting" ); + let multicast_group_codes = settings.multicast_group_codes(); + let multicast_group_pdas = if multicast_group_codes.is_empty() { + info!("multicast publisher allowlisting disabled (no codes configured)"); + vec![] + } else { + let dz_client = DzRpcClient::new(dz_rpc_url.clone(), keypair.clone(), serviceability_id); + let resolved = dz_client + .resolve_multicast_group_codes(&multicast_group_codes) + .await?; + let pdas: Vec<_> = resolved.iter().map(|(_, pda)| *pda).collect(); + info!( + count = pdas.len(), + codes = ?multicast_group_codes, + pdas = ?pdas, + "multicast publisher allowlisting enabled" + ); + pdas + }; + let mut polling_sentinel = PollingSentinel::new( dz_rpc_url.clone(), sol_rpc_url.clone(), @@ -51,6 +70,7 @@ async fn main() -> anyhow::Result<()> { serviceability_id, args.poll_interval, ENV_PREVIOUS_LEADER_EPOCHS, + multicast_group_pdas, ) .await?; diff --git a/crates/sentinel/src/sentinel/poller.rs b/crates/sentinel/src/sentinel/poller.rs index 50a82a78..cdae373c 100644 --- a/crates/sentinel/src/sentinel/poller.rs +++ b/crates/sentinel/src/sentinel/poller.rs @@ -30,6 +30,7 @@ pub struct PollingSentinel { processed_cache: Arc>, poll_interval: Duration, previous_leader_epochs: u8, + multicast_group_pdas: Vec, } impl PollingSentinel { @@ -40,6 +41,7 @@ impl PollingSentinel { serviceability_id: Pubkey, poll_interval_secs: u64, previous_leader_epochs: u8, + multicast_group_pdas: Vec, ) -> Result { // Create cache with automatic background cleanup let processed_cache = Arc::new(Cache::new()); @@ -57,6 +59,7 @@ impl PollingSentinel { processed_cache, poll_interval: Duration::from_secs(poll_interval_secs), previous_leader_epochs, + multicast_group_pdas, }) } @@ -152,6 +155,40 @@ impl PollingSentinel { ) .await?; info!(%validator_id, %validator_ip, user = %service_key, "access pass issued"); + + for mgroup_pda in &self.multicast_group_pdas { + match rpc_with_retry( + || async { + self.dz_rpc_client + .add_multicast_publisher_allowlist( + mgroup_pda, + &service_key, + &validator_ip, + ) + .await + }, + "add_multicast_publisher_allowlist", + ) + .await + { + Ok(_) => { + info!( + %validator_id, %validator_ip, %mgroup_pda, + "multicast publisher allowlist added" + ); + metrics::counter!("doublezero_sentinel_multicast_allowlist_success") + .increment(1); + } + Err(err) => { + error!( + ?err, %validator_id, %validator_ip, %mgroup_pda, + "multicast allowlist failed; continuing" + ); + metrics::counter!("doublezero_sentinel_multicast_allowlist_failed") + .increment(1); + } + } + } } let signature = rpc_with_retry( @@ -277,6 +314,7 @@ mod tests { processed_cache: Arc::new(Cache::new()), poll_interval: Duration::from_secs(15), previous_leader_epochs: 0, + multicast_group_pdas: vec![], }; // Invalid signature -> verify_access_request(...) should return Error::SignatureVerify diff --git a/crates/sentinel/src/settings.rs b/crates/sentinel/src/settings.rs index b27473ea..7c4fc4a7 100644 --- a/crates/sentinel/src/settings.rs +++ b/crates/sentinel/src/settings.rs @@ -62,6 +62,11 @@ pub struct Settings { /// metrics listening endpoint #[serde(default = "default_metrics_addr")] metrics_addr: String, + + /// Comma-separated multicast group codes for publisher allowlisting on + /// validator onboarding. When empty (default), allowlisting is disabled. + #[serde(default)] + multicast_group_codes: Option, } impl Settings { @@ -111,6 +116,18 @@ impl Settings { .expect("invalid metrics network address and port") } + pub fn multicast_group_codes(&self) -> Vec { + self.multicast_group_codes + .as_deref() + .map(|s| { + s.split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect() + }) + .unwrap_or_default() + } + pub fn serviceability_program_id( &self, ) -> Result { From 613e3cb2d114490fb887d5cd7937b7b99f8438ee Mon Sep 17 00:00:00 2001 From: Ben Marx Date: Thu, 26 Feb 2026 08:42:15 -0800 Subject: [PATCH 2/6] refactor: use HashMap for multicast group code resolution Co-Authored-By: Claude Opus 4.6 --- .../sentinel/src/client/doublezero_ledger.rs | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/crates/sentinel/src/client/doublezero_ledger.rs b/crates/sentinel/src/client/doublezero_ledger.rs index 42b94097..be1b70cc 100644 --- a/crates/sentinel/src/client/doublezero_ledger.rs +++ b/crates/sentinel/src/client/doublezero_ledger.rs @@ -438,19 +438,18 @@ impl DzRpcClient { .get_program_accounts_with_config(&self.serviceability_id, config) .await?; - let mut resolved = Vec::new(); - for code in codes { - let found = accounts.iter().find_map(|(pubkey, account)| { + let code_to_pda: std::collections::HashMap = accounts + .iter() + .filter_map(|(pubkey, account)| { let group = MulticastGroup::try_from(&account.data[..]).ok()?; - if group.code == *code { - Some(*pubkey) - } else { - None - } - }); + Some((group.code.clone(), *pubkey)) + }) + .collect(); - match found { - Some(pda) => { + let mut resolved = Vec::new(); + for code in codes { + match code_to_pda.get(code.as_str()) { + Some(&pda) => { info!(code, %pda, "resolved multicast group code"); resolved.push((code.clone(), pda)); } From affa31b2d2124bea8efc05d514ac332461ef8984 Mon Sep 17 00:00:00 2001 From: Ben Marx Date: Thu, 26 Feb 2026 08:46:20 -0800 Subject: [PATCH 3/6] feat: add get_access_pass to DzRpcClientType Co-Authored-By: Claude Opus 4.6 --- .../sentinel/src/client/doublezero_ledger.rs | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/crates/sentinel/src/client/doublezero_ledger.rs b/crates/sentinel/src/client/doublezero_ledger.rs index be1b70cc..76606b8a 100644 --- a/crates/sentinel/src/client/doublezero_ledger.rs +++ b/crates/sentinel/src/client/doublezero_ledger.rs @@ -12,7 +12,7 @@ use doublezero_serviceability::{ tenant::update_payment_status::UpdatePaymentStatusArgs, }, state::{ - accesspass::AccessPassType, + accesspass::{AccessPass, AccessPassType}, accounttype::AccountType, multicastgroup::MulticastGroup, tenant::{Tenant, TenantPaymentStatus}, @@ -60,6 +60,12 @@ pub trait DzRpcClientType { client_ip: &Ipv4Addr, ) -> Result; + async fn get_access_pass( + &self, + service_key: &Pubkey, + client_ip: &Ipv4Addr, + ) -> Result; + async fn get_tenants_with_token_accounts(&self) -> Result>; async fn update_tenant_payment_status( @@ -107,6 +113,14 @@ impl DzRpcClientType for DzRpcClient { .await } + async fn get_access_pass( + &self, + service_key: &Pubkey, + client_ip: &Ipv4Addr, + ) -> Result { + self.get_access_pass(service_key, client_ip).await + } + async fn get_tenants_with_token_accounts(&self) -> Result> { self.get_tenants_with_token_accounts().await } @@ -235,6 +249,19 @@ impl DzRpcClient { Ok(signature) } + pub async fn get_access_pass( + &self, + service_key: &Pubkey, + client_ip: &Ipv4Addr, + ) -> Result { + let (accesspass_pk, _) = + get_accesspass_pda(&self.serviceability_id, client_ip, service_key); + let account_data = self.client.get_account_data(&accesspass_pk).await?; + let access_pass = AccessPass::try_from(&account_data[..]) + .map_err(|e| Error::Deserialize(format!("AccessPass deserialization failed: {e}")))?; + Ok(access_pass) + } + pub async fn get_tenants_with_token_accounts(&self) -> Result> { let config = RpcProgramAccountsConfig { filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( From e188cc743e1337da920e8bbdc240c51b0360be64 Mon Sep 17 00:00:00 2001 From: Ben Marx Date: Thu, 26 Feb 2026 08:49:47 -0800 Subject: [PATCH 4/6] feat: pre-check access pass for idempotent multicast allowlisting Co-Authored-By: Claude Opus 4.6 --- crates/sentinel/src/sentinel/poller.rs | 34 +++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/crates/sentinel/src/sentinel/poller.rs b/crates/sentinel/src/sentinel/poller.rs index cdae373c..c607c8c2 100644 --- a/crates/sentinel/src/sentinel/poller.rs +++ b/crates/sentinel/src/sentinel/poller.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashSet, net::Ipv4Addr, sync::Arc, time::{Duration, Instant}, @@ -9,7 +10,7 @@ use retainer::Cache; use solana_sdk::{pubkey::Pubkey, signature::Keypair}; use tokio::time::interval; use tokio_util::sync::CancellationToken; -use tracing::{error, info}; +use tracing::{error, info, warn}; use url::Url; use crate::{ @@ -156,7 +157,38 @@ impl PollingSentinel { .await?; info!(%validator_id, %validator_ip, user = %service_key, "access pass issued"); + let existing_mgroup_pubs: HashSet = match rpc_with_retry( + || async { + self.dz_rpc_client + .get_access_pass(&service_key, &validator_ip) + .await + }, + "get_access_pass", + ) + .await + { + Ok(access_pass) => access_pass.mgroup_pub_allowlist.into_iter().collect(), + Err(err) => { + warn!( + ?err, %validator_id, %validator_ip, + "failed to fetch access pass for idempotency check; \ + will attempt all multicast allowlist additions" + ); + HashSet::new() + } + }; + for mgroup_pda in &self.multicast_group_pdas { + if existing_mgroup_pubs.contains(mgroup_pda) { + info!( + %validator_id, %validator_ip, %mgroup_pda, + "validator already in multicast publisher allowlist; skipping" + ); + metrics::counter!("doublezero_sentinel_multicast_allowlist_skipped") + .increment(1); + continue; + } + match rpc_with_retry( || async { self.dz_rpc_client From 95d6cae6888b2470c510055c927d29738d6c8b41 Mon Sep 17 00:00:00 2001 From: Rahul Garg Date: Thu, 26 Feb 2026 12:17:37 -0700 Subject: [PATCH 5/6] fix(sentinel): multicast group resolution filter by Activated status Furthermore: - Reject duplicate codes - Add timeout - Add tests --- .../sentinel/src/client/doublezero_ledger.rs | 230 ++++++++++++++++-- crates/sentinel/src/settings.rs | 53 ++++ 2 files changed, 261 insertions(+), 22 deletions(-) diff --git a/crates/sentinel/src/client/doublezero_ledger.rs b/crates/sentinel/src/client/doublezero_ledger.rs index 76606b8a..576e7218 100644 --- a/crates/sentinel/src/client/doublezero_ledger.rs +++ b/crates/sentinel/src/client/doublezero_ledger.rs @@ -1,4 +1,4 @@ -use std::{net::Ipv4Addr, sync::Arc, time::Duration}; +use std::{collections::HashMap, net::Ipv4Addr, sync::Arc, time::Duration}; use async_trait::async_trait; use doublezero_program_tools::instruction::try_build_instruction; @@ -14,7 +14,7 @@ use doublezero_serviceability::{ state::{ accesspass::{AccessPass, AccessPassType}, accounttype::AccountType, - multicastgroup::MulticastGroup, + multicastgroup::{MulticastGroup, MulticastGroupStatus}, tenant::{Tenant, TenantPaymentStatus}, }, }; @@ -234,10 +234,12 @@ impl DzRpcClient { let recent_blockhash = self.client.get_latest_blockhash().await?; let transaction = new_transaction(&[ix], &[signer], recent_blockhash); - let signature = self - .client - .send_and_confirm_transaction(&transaction) - .await?; + let signature = tokio::time::timeout( + SEND_AND_CONFIRM_TIMEOUT, + self.client.send_and_confirm_transaction(&transaction), + ) + .await + .map_err(|_| Error::Deserialize("add_multicast_publisher_allowlist timed out".into()))??; info!( validator = %service_key, %multicast_group_pda, @@ -465,30 +467,214 @@ impl DzRpcClient { .get_program_accounts_with_config(&self.serviceability_id, config) .await?; - let code_to_pda: std::collections::HashMap = accounts + let decoded: Vec<(Pubkey, MulticastGroup)> = accounts .iter() .filter_map(|(pubkey, account)| { let group = MulticastGroup::try_from(&account.data[..]).ok()?; - Some((group.code.clone(), *pubkey)) + Some((*pubkey, group)) }) .collect(); - let mut resolved = Vec::new(); - for code in codes { - match code_to_pda.get(code.as_str()) { - Some(&pda) => { - info!(code, %pda, "resolved multicast group code"); - resolved.push((code.clone(), pda)); - } - None => { - return Err(Error::Deserialize(format!( - "multicast group code '{}' not found on DZ Ledger", - code - ))); - } - } + let resolved = resolve_codes_from_accounts(&decoded, codes)?; + for (code, pda) in &resolved { + info!(code, %pda, "resolved multicast group code"); } Ok(resolved) } } + +/// Pure resolution logic extracted for testability. Filters to only +/// `Activated` groups, detects duplicate activated codes, and looks up +/// each requested code. +fn resolve_codes_from_accounts( + accounts: &[(Pubkey, MulticastGroup)], + codes: &[String], +) -> Result> { + let mut code_to_pda: HashMap<&str, Pubkey> = HashMap::new(); + + for (pda, group) in accounts { + if group.status != MulticastGroupStatus::Activated { + continue; + } + if let Some(existing_pda) = code_to_pda.insert(group.code.as_str(), *pda) { + return Err(Error::Deserialize(format!( + "duplicate activated multicast group code '{}': PDAs {} and {}", + group.code, existing_pda, pda + ))); + } + } + + codes + .iter() + .map(|code| { + code_to_pda + .get(code.as_str()) + .map(|&pda| (code.clone(), pda)) + .ok_or_else(|| { + Error::Deserialize(format!( + "multicast group code '{}' not found (or not activated) on DZ Ledger", + code + )) + }) + }) + .collect() +} + +#[cfg(test)] +mod tests { + use std::net::Ipv4Addr; + + use doublezero_serviceability::state::{ + accounttype::AccountType, + multicastgroup::{MulticastGroup, MulticastGroupStatus}, + }; + use solana_sdk::pubkey::Pubkey; + + use super::resolve_codes_from_accounts; + + fn make_group(code: &str, status: MulticastGroupStatus) -> MulticastGroup { + MulticastGroup { + account_type: AccountType::MulticastGroup, + owner: Pubkey::default(), + index: 0, + bump_seed: 0, + tenant_pk: Pubkey::default(), + multicast_ip: Ipv4Addr::new(239, 0, 0, 1), + max_bandwidth: 0, + status, + code: code.to_string(), + publisher_count: 0, + subscriber_count: 0, + } + } + + #[test] + fn empty_codes_returns_empty() { + let accounts = vec![( + Pubkey::new_unique(), + make_group("ALPHA", MulticastGroupStatus::Activated), + )]; + let result = resolve_codes_from_accounts(&accounts, &[]).unwrap(); + assert!(result.is_empty()); + } + + #[test] + fn happy_path_multiple_codes() { + let pda_a = Pubkey::new_unique(); + let pda_b = Pubkey::new_unique(); + let accounts = vec![ + (pda_a, make_group("ALPHA", MulticastGroupStatus::Activated)), + (pda_b, make_group("BETA", MulticastGroupStatus::Activated)), + ]; + let codes = vec!["ALPHA".into(), "BETA".into()]; + let result = resolve_codes_from_accounts(&accounts, &codes).unwrap(); + assert_eq!( + result, + vec![("ALPHA".into(), pda_a), ("BETA".into(), pda_b)] + ); + } + + #[test] + fn filters_non_activated_groups() { + let pda_pending = Pubkey::new_unique(); + let pda_suspended = Pubkey::new_unique(); + let pda_deleting = Pubkey::new_unique(); + let pda_rejected = Pubkey::new_unique(); + let pda_activated = Pubkey::new_unique(); + let accounts = vec![ + ( + pda_pending, + make_group("ALPHA", MulticastGroupStatus::Pending), + ), + ( + pda_suspended, + make_group("ALPHA", MulticastGroupStatus::Suspended), + ), + ( + pda_deleting, + make_group("ALPHA", MulticastGroupStatus::Deleting), + ), + ( + pda_rejected, + make_group("ALPHA", MulticastGroupStatus::Rejected), + ), + ( + pda_activated, + make_group("ALPHA", MulticastGroupStatus::Activated), + ), + ]; + let codes = vec!["ALPHA".into()]; + let result = resolve_codes_from_accounts(&accounts, &codes).unwrap(); + assert_eq!(result, vec![("ALPHA".into(), pda_activated)]); + } + + #[test] + fn duplicate_activated_codes_returns_error() { + let pda_1 = Pubkey::new_unique(); + let pda_2 = Pubkey::new_unique(); + let accounts = vec![ + (pda_1, make_group("ALPHA", MulticastGroupStatus::Activated)), + (pda_2, make_group("ALPHA", MulticastGroupStatus::Activated)), + ]; + let codes = vec!["ALPHA".into()]; + let err = resolve_codes_from_accounts(&accounts, &codes).unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("duplicate activated multicast group code 'ALPHA'"), + "{msg}" + ); + assert!(msg.contains(&pda_1.to_string()), "{msg}"); + assert!(msg.contains(&pda_2.to_string()), "{msg}"); + } + + #[test] + fn pending_and_activated_same_code_no_conflict() { + let pda_pending = Pubkey::new_unique(); + let pda_activated = Pubkey::new_unique(); + let accounts = vec![ + ( + pda_pending, + make_group("ALPHA", MulticastGroupStatus::Pending), + ), + ( + pda_activated, + make_group("ALPHA", MulticastGroupStatus::Activated), + ), + ]; + let codes = vec!["ALPHA".into()]; + let result = resolve_codes_from_accounts(&accounts, &codes).unwrap(); + assert_eq!(result, vec![("ALPHA".into(), pda_activated)]); + } + + #[test] + fn code_not_found_returns_error() { + let accounts = vec![( + Pubkey::new_unique(), + make_group("BETA", MulticastGroupStatus::Activated), + )]; + let codes = vec!["ALPHA".into()]; + let err = resolve_codes_from_accounts(&accounts, &codes).unwrap_err(); + assert!( + err.to_string() + .contains("multicast group code 'ALPHA' not found"), + "{}", + err + ); + } + + #[test] + fn code_exists_but_not_activated_returns_error() { + let accounts = vec![( + Pubkey::new_unique(), + make_group("ALPHA", MulticastGroupStatus::Suspended), + )]; + let codes = vec!["ALPHA".into()]; + let err = resolve_codes_from_accounts(&accounts, &codes).unwrap_err(); + assert!( + err.to_string().contains("not found (or not activated)"), + "{}", + err + ); + } +} diff --git a/crates/sentinel/src/settings.rs b/crates/sentinel/src/settings.rs index 7c4fc4a7..b8856390 100644 --- a/crates/sentinel/src/settings.rs +++ b/crates/sentinel/src/settings.rs @@ -161,6 +161,21 @@ impl Settings { } } +/// Helper to build a `Settings` with only `multicast_group_codes` set. +/// All other fields use placeholder values (not relevant for the test). +#[cfg(test)] +fn settings_with_mcast_codes(codes: Option<&str>) -> Settings { + Settings { + log: default_log(), + env: "devnet".into(), + dz_rpc: "http://localhost:1234".into(), + sol_rpc: "localhost".into(), + keypair: "/dev/null".into(), + metrics_addr: default_metrics_addr(), + multicast_group_codes: codes.map(String::from), + } +} + fn default_log() -> String { "doublezero_ledger_sentinel=info".to_string() } @@ -168,3 +183,41 @@ fn default_log() -> String { fn default_metrics_addr() -> String { "127.0.0.1:2112".to_string() } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn multicast_group_codes_none() { + let settings = settings_with_mcast_codes(None); + assert!(settings.multicast_group_codes().is_empty()); + } + + #[test] + fn multicast_group_codes_empty_string() { + let settings = settings_with_mcast_codes(Some("")); + assert!(settings.multicast_group_codes().is_empty()); + } + + #[test] + fn multicast_group_codes_single() { + let settings = settings_with_mcast_codes(Some("ALPHA")); + assert_eq!(settings.multicast_group_codes(), vec!["ALPHA"]); + } + + #[test] + fn multicast_group_codes_multiple_with_whitespace() { + let settings = settings_with_mcast_codes(Some(" ALPHA , BETA , GAMMA ")); + assert_eq!( + settings.multicast_group_codes(), + vec!["ALPHA", "BETA", "GAMMA"] + ); + } + + #[test] + fn multicast_group_codes_trailing_comma() { + let settings = settings_with_mcast_codes(Some("ALPHA,BETA,")); + assert_eq!(settings.multicast_group_codes(), vec!["ALPHA", "BETA"]); + } +} From f92421bcfff0ff7a2e9882529df362c1f9820e4c Mon Sep 17 00:00:00 2001 From: Rahul Garg Date: Thu, 26 Feb 2026 12:31:55 -0700 Subject: [PATCH 6/6] fix(sentinel): filter pre insert --- .../sentinel/src/client/doublezero_ledger.rs | 95 ++++++++++--------- 1 file changed, 49 insertions(+), 46 deletions(-) diff --git a/crates/sentinel/src/client/doublezero_ledger.rs b/crates/sentinel/src/client/doublezero_ledger.rs index 576e7218..07624f9b 100644 --- a/crates/sentinel/src/client/doublezero_ledger.rs +++ b/crates/sentinel/src/client/doublezero_ledger.rs @@ -484,31 +484,35 @@ impl DzRpcClient { } } -/// Pure resolution logic extracted for testability. Filters to only -/// `Activated` groups, detects duplicate activated codes, and looks up -/// each requested code. +/// Pure resolution logic extracted for testability. Detects duplicate +/// codes across **all** groups (regardless of status), then resolves +/// requested codes against `Activated`-only groups. fn resolve_codes_from_accounts( accounts: &[(Pubkey, MulticastGroup)], codes: &[String], ) -> Result> { - let mut code_to_pda: HashMap<&str, Pubkey> = HashMap::new(); - + // First pass: detect duplicate codes across all statuses. + let mut seen: HashMap<&str, Pubkey> = HashMap::new(); for (pda, group) in accounts { - if group.status != MulticastGroupStatus::Activated { - continue; - } - if let Some(existing_pda) = code_to_pda.insert(group.code.as_str(), *pda) { + if let Some(existing_pda) = seen.insert(group.code.as_str(), *pda) { return Err(Error::Deserialize(format!( - "duplicate activated multicast group code '{}': PDAs {} and {}", + "duplicate multicast group code '{}': PDAs {} and {}", group.code, existing_pda, pda ))); } } + // Second pass: build lookup from Activated groups only. + let activated: HashMap<&str, Pubkey> = accounts + .iter() + .filter(|(_, group)| group.status == MulticastGroupStatus::Activated) + .map(|(pda, group)| (group.code.as_str(), *pda)) + .collect(); + codes .iter() .map(|code| { - code_to_pda + activated .get(code.as_str()) .map(|&pda| (code.clone(), pda)) .ok_or_else(|| { @@ -576,11 +580,8 @@ mod tests { } #[test] - fn filters_non_activated_groups() { + fn same_code_across_statuses_is_duplicate_error() { let pda_pending = Pubkey::new_unique(); - let pda_suspended = Pubkey::new_unique(); - let pda_deleting = Pubkey::new_unique(); - let pda_rejected = Pubkey::new_unique(); let pda_activated = Pubkey::new_unique(); let accounts = vec![ ( @@ -588,25 +589,46 @@ mod tests { make_group("ALPHA", MulticastGroupStatus::Pending), ), ( - pda_suspended, - make_group("ALPHA", MulticastGroupStatus::Suspended), + pda_activated, + make_group("ALPHA", MulticastGroupStatus::Activated), ), + ]; + let codes = vec!["ALPHA".into()]; + let err = resolve_codes_from_accounts(&accounts, &codes).unwrap_err(); + assert!( + err.to_string() + .contains("duplicate multicast group code 'ALPHA'"), + "{}", + err + ); + } + + #[test] + fn non_activated_groups_not_resolvable() { + let accounts = vec![ ( - pda_deleting, - make_group("ALPHA", MulticastGroupStatus::Deleting), + Pubkey::new_unique(), + make_group("PENDING", MulticastGroupStatus::Pending), ), ( - pda_rejected, - make_group("ALPHA", MulticastGroupStatus::Rejected), + Pubkey::new_unique(), + make_group("SUSPENDED", MulticastGroupStatus::Suspended), ), ( - pda_activated, - make_group("ALPHA", MulticastGroupStatus::Activated), + Pubkey::new_unique(), + make_group("DELETING", MulticastGroupStatus::Deleting), + ), + ( + Pubkey::new_unique(), + make_group("REJECTED", MulticastGroupStatus::Rejected), ), ]; - let codes = vec!["ALPHA".into()]; - let result = resolve_codes_from_accounts(&accounts, &codes).unwrap(); - assert_eq!(result, vec![("ALPHA".into(), pda_activated)]); + let err = resolve_codes_from_accounts(&accounts, &["PENDING".into()]).unwrap_err(); + assert!( + err.to_string().contains("not found (or not activated)"), + "{}", + err + ); } #[test] @@ -621,32 +643,13 @@ mod tests { let err = resolve_codes_from_accounts(&accounts, &codes).unwrap_err(); let msg = err.to_string(); assert!( - msg.contains("duplicate activated multicast group code 'ALPHA'"), + msg.contains("duplicate multicast group code 'ALPHA'"), "{msg}" ); assert!(msg.contains(&pda_1.to_string()), "{msg}"); assert!(msg.contains(&pda_2.to_string()), "{msg}"); } - #[test] - fn pending_and_activated_same_code_no_conflict() { - let pda_pending = Pubkey::new_unique(); - let pda_activated = Pubkey::new_unique(); - let accounts = vec![ - ( - pda_pending, - make_group("ALPHA", MulticastGroupStatus::Pending), - ), - ( - pda_activated, - make_group("ALPHA", MulticastGroupStatus::Activated), - ), - ]; - let codes = vec!["ALPHA".into()]; - let result = resolve_codes_from_accounts(&accounts, &codes).unwrap(); - assert_eq!(result, vec![("ALPHA".into(), pda_activated)]); - } - #[test] fn code_not_found_returns_error() { let accounts = vec![(