Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/sentinel/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

- feat: autojoin validators to approved multicast publisher groups
- feat: billing sentinel for tenant payment status monitoring ([#265](https://github.com/doublezerofoundation/doublezero-offchain/pull/265))
- change default leader schedule lookahead from 2 epochs to 1 ([#259](https://github.com/doublezerofoundation/doublezero-offchain/pull/259))
- fix(sentinel): improve retry handling for transient RPC errors ([#220](https://github.com/doublezerofoundation/doublezero-offchain/pull/220))
Expand Down
342 changes: 339 additions & 3 deletions crates/sentinel/src/client/doublezero_ledger.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::Ipv4Addr, sync::Arc, time::Duration};
use std::{collections::HashMap, net::Ipv4Addr, sync::Arc, time::Duration};

use async_trait::async_trait;
use doublezero_program_tools::instruction::try_build_instruction;
Expand All @@ -7,11 +7,14 @@ use doublezero_serviceability::{
instructions::DoubleZeroInstruction,
pda::{get_accesspass_pda, get_globalstate_pda},
processors::{
accesspass::set::SetAccessPassArgs, tenant::update_payment_status::UpdatePaymentStatusArgs,
accesspass::set::SetAccessPassArgs,
multicastgroup::allowlist::publisher::add::AddMulticastGroupPubAllowlistArgs,
tenant::update_payment_status::UpdatePaymentStatusArgs,
},
state::{
accesspass::AccessPassType,
accesspass::{AccessPass, AccessPassType},
accounttype::AccountType,
multicastgroup::{MulticastGroup, MulticastGroupStatus},
tenant::{Tenant, TenantPaymentStatus},
},
};
Expand Down Expand Up @@ -50,6 +53,19 @@ pub trait DzRpcClientType {
validator_id: &Pubkey,
) -> Result<Signature>;

async fn add_multicast_publisher_allowlist(
&self,
multicast_group_pda: &Pubkey,
service_key: &Pubkey,
client_ip: &Ipv4Addr,
) -> Result<Signature>;

async fn get_access_pass(
&self,
service_key: &Pubkey,
client_ip: &Ipv4Addr,
) -> Result<AccessPass>;

async fn get_tenants_with_token_accounts(&self) -> Result<Vec<TenantBillingInfo>>;

async fn update_tenant_payment_status(
Expand Down Expand Up @@ -87,6 +103,24 @@ impl DzRpcClientType for DzRpcClient {
.await
}

// Trait shim: forwards to the inherent method of the same name on
// `DzRpcClient` so the concrete client satisfies `DzRpcClientType`.
async fn add_multicast_publisher_allowlist(
&self,
multicast_group_pda: &Pubkey,
service_key: &Pubkey,
client_ip: &Ipv4Addr,
) -> Result<Signature> {
self.add_multicast_publisher_allowlist(multicast_group_pda, service_key, client_ip)
.await
}

// Trait shim: forwards to the inherent `DzRpcClient::get_access_pass`.
async fn get_access_pass(
&self,
service_key: &Pubkey,
client_ip: &Ipv4Addr,
) -> Result<AccessPass> {
self.get_access_pass(service_key, client_ip).await
}

// Trait shim: forwards to the inherent `DzRpcClient::get_tenants_with_token_accounts`.
async fn get_tenants_with_token_accounts(&self) -> Result<Vec<TenantBillingInfo>> {
self.get_tenants_with_token_accounts().await
}
Expand Down Expand Up @@ -170,6 +204,66 @@ impl DzRpcClient {
Ok(signature)
}

/// Submits an `AddMulticastGroupPubAllowlist` instruction that adds the
/// access pass identified by (`service_key`, `client_ip`) to the given
/// multicast group's publisher allowlist, waiting for confirmation with a
/// bounded timeout.
///
/// Returns the confirmed transaction signature.
pub async fn add_multicast_publisher_allowlist(
    &self,
    multicast_group_pda: &Pubkey,
    service_key: &Pubkey,
    client_ip: &Ipv4Addr,
) -> Result<Signature> {
    // PDAs derived from the same seeds the on-chain program uses.
    let (globalstate_pk, _) = get_globalstate_pda(&self.serviceability_id);
    let (accesspass_pk, _) =
        get_accesspass_pda(&self.serviceability_id, client_ip, service_key);

    let instruction_data = DoubleZeroInstruction::AddMulticastGroupPubAllowlist(
        AddMulticastGroupPubAllowlistArgs {
            client_ip: *client_ip,
            user_payer: *service_key,
        },
    );

    // Account order must match the on-chain processor's expectations.
    let metas = vec![
        AccountMeta::new(*multicast_group_pda, false),
        AccountMeta::new(accesspass_pk, false),
        AccountMeta::new_readonly(globalstate_pk, false),
        AccountMeta::new(self.payer.pubkey(), true),
        AccountMeta::new_readonly(system_program::id(), false),
    ];

    let ix = try_build_instruction(&self.serviceability_id, metas, &instruction_data)?;
    let blockhash = self.client.get_latest_blockhash().await?;
    let transaction = new_transaction(&[ix], &[&self.payer], blockhash);

    // Bound the confirmation wait; the inner `?` propagates RPC errors.
    // NOTE(review): a timeout surfaces as `Error::Deserialize`, which is a
    // misnomer — consider a dedicated timeout variant if the error enum allows.
    let signature = tokio::time::timeout(
        SEND_AND_CONFIRM_TIMEOUT,
        self.client.send_and_confirm_transaction(&transaction),
    )
    .await
    .map_err(|_| Error::Deserialize("add_multicast_publisher_allowlist timed out".into()))??;

    info!(
        validator = %service_key,
        %multicast_group_pda,
        %client_ip,
        %signature,
        "added to multicast publisher allowlist"
    );

    Ok(signature)
}

pub async fn get_access_pass(
&self,
service_key: &Pubkey,
client_ip: &Ipv4Addr,
) -> Result<AccessPass> {
let (accesspass_pk, _) =
get_accesspass_pda(&self.serviceability_id, client_ip, service_key);
let account_data = self.client.get_account_data(&accesspass_pk).await?;
let access_pass = AccessPass::try_from(&account_data[..])
.map_err(|e| Error::Deserialize(format!("AccessPass deserialization failed: {e}")))?;
Ok(access_pass)
}

pub async fn get_tenants_with_token_accounts(&self) -> Result<Vec<TenantBillingInfo>> {
let config = RpcProgramAccountsConfig {
filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
Expand Down Expand Up @@ -344,4 +438,246 @@ impl DzRpcClient {
let epoch_info = self.client.get_epoch_info().await?;
Ok(epoch_info.epoch.saturating_sub(1))
}

/// Resolves multicast group codes to their on-chain PDAs by fetching all
/// MulticastGroup accounts and filtering by code. Intended to be called
/// once at startup, not in the hot loop.
pub async fn resolve_multicast_group_codes(
&self,
codes: &[String],
) -> Result<Vec<(String, Pubkey)>> {
if codes.is_empty() {
return Ok(vec![]);
}

let config = RpcProgramAccountsConfig {
filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
0,
vec![AccountType::MulticastGroup as u8],
))]),
account_config: RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
..Default::default()
},
..Default::default()
};

let accounts = self
.client
.get_program_accounts_with_config(&self.serviceability_id, config)
.await?;

let decoded: Vec<(Pubkey, MulticastGroup)> = accounts
.iter()
.filter_map(|(pubkey, account)| {
let group = MulticastGroup::try_from(&account.data[..]).ok()?;
Some((*pubkey, group))
})
.collect();

let resolved = resolve_codes_from_accounts(&decoded, codes)?;
for (code, pda) in &resolved {
info!(code, %pda, "resolved multicast group code");
}

Ok(resolved)
}
}

/// Pure resolution logic extracted for testability. Detects duplicate
/// codes across **all** groups (regardless of status), then resolves
/// requested codes against `Activated`-only groups.
/// Pure resolution logic, split out so it can be unit-tested without RPC.
///
/// A code duplicated across **any** two groups (regardless of status) is an
/// error; the requested codes are then resolved against `Activated` groups
/// only, erroring on any code that is missing or not activated.
fn resolve_codes_from_accounts(
    accounts: &[(Pubkey, MulticastGroup)],
    codes: &[String],
) -> Result<Vec<(String, Pubkey)>> {
    // Pass 1: a code appearing on two PDAs is ambiguous — reject outright.
    let mut by_code: HashMap<&str, Pubkey> = HashMap::with_capacity(accounts.len());
    for (pda, group) in accounts {
        if let Some(prior) = by_code.insert(group.code.as_str(), *pda) {
            return Err(Error::Deserialize(format!(
                "duplicate multicast group code '{}': PDAs {} and {}",
                group.code, prior, pda
            )));
        }
    }

    // Pass 2: only Activated groups are eligible resolution targets.
    let mut activated: HashMap<&str, Pubkey> = HashMap::new();
    for (pda, group) in accounts {
        if group.status == MulticastGroupStatus::Activated {
            activated.insert(group.code.as_str(), *pda);
        }
    }

    let mut resolved = Vec::with_capacity(codes.len());
    for code in codes {
        match activated.get(code.as_str()) {
            Some(&pda) => resolved.push((code.clone(), pda)),
            None => {
                return Err(Error::Deserialize(format!(
                    "multicast group code '{}' not found (or not activated) on DZ Ledger",
                    code
                )));
            }
        }
    }

    Ok(resolved)
}

// Unit tests for `resolve_codes_from_accounts`. The assertions pin the exact
// error-message substrings produced by the resolver, so the strings below are
// part of the test oracle and must track the resolver's `format!` templates.
#[cfg(test)]
mod tests {
use std::net::Ipv4Addr;

use doublezero_serviceability::state::{
accounttype::AccountType,
multicastgroup::{MulticastGroup, MulticastGroupStatus},
};
use solana_sdk::pubkey::Pubkey;

use super::resolve_codes_from_accounts;

// Minimal MulticastGroup fixture: only `code` and `status` matter to the
// resolver; everything else is zeroed/defaulted.
fn make_group(code: &str, status: MulticastGroupStatus) -> MulticastGroup {
MulticastGroup {
account_type: AccountType::MulticastGroup,
owner: Pubkey::default(),
index: 0,
bump_seed: 0,
tenant_pk: Pubkey::default(),
multicast_ip: Ipv4Addr::new(239, 0, 0, 1),
max_bandwidth: 0,
status,
code: code.to_string(),
publisher_count: 0,
subscriber_count: 0,
}
}

// No requested codes -> empty result, even when groups exist on-chain.
#[test]
fn empty_codes_returns_empty() {
let accounts = vec![(
Pubkey::new_unique(),
make_group("ALPHA", MulticastGroupStatus::Activated),
)];
let result = resolve_codes_from_accounts(&accounts, &[]).unwrap();
assert!(result.is_empty());
}

// Resolution preserves the order of the requested codes.
#[test]
fn happy_path_multiple_codes() {
let pda_a = Pubkey::new_unique();
let pda_b = Pubkey::new_unique();
let accounts = vec![
(pda_a, make_group("ALPHA", MulticastGroupStatus::Activated)),
(pda_b, make_group("BETA", MulticastGroupStatus::Activated)),
];
let codes = vec!["ALPHA".into(), "BETA".into()];
let result = resolve_codes_from_accounts(&accounts, &codes).unwrap();
assert_eq!(
result,
vec![("ALPHA".into(), pda_a), ("BETA".into(), pda_b)]
);
}

// Duplicate detection spans all statuses: a Pending + Activated pair with
// the same code is still ambiguous and must error.
#[test]
fn same_code_across_statuses_is_duplicate_error() {
let pda_pending = Pubkey::new_unique();
let pda_activated = Pubkey::new_unique();
let accounts = vec![
(
pda_pending,
make_group("ALPHA", MulticastGroupStatus::Pending),
),
(
pda_activated,
make_group("ALPHA", MulticastGroupStatus::Activated),
),
];
let codes = vec!["ALPHA".into()];
let err = resolve_codes_from_accounts(&accounts, &codes).unwrap_err();
assert!(
err.to_string()
.contains("duplicate multicast group code 'ALPHA'"),
"{}",
err
);
}

// Every non-Activated status is invisible to resolution.
#[test]
fn non_activated_groups_not_resolvable() {
let accounts = vec![
(
Pubkey::new_unique(),
make_group("PENDING", MulticastGroupStatus::Pending),
),
(
Pubkey::new_unique(),
make_group("SUSPENDED", MulticastGroupStatus::Suspended),
),
(
Pubkey::new_unique(),
make_group("DELETING", MulticastGroupStatus::Deleting),
),
(
Pubkey::new_unique(),
make_group("REJECTED", MulticastGroupStatus::Rejected),
),
];
let err = resolve_codes_from_accounts(&accounts, &["PENDING".into()]).unwrap_err();
assert!(
err.to_string().contains("not found (or not activated)"),
"{}",
err
);
}

// The duplicate error message names both conflicting PDAs for diagnosis.
#[test]
fn duplicate_activated_codes_returns_error() {
let pda_1 = Pubkey::new_unique();
let pda_2 = Pubkey::new_unique();
let accounts = vec![
(pda_1, make_group("ALPHA", MulticastGroupStatus::Activated)),
(pda_2, make_group("ALPHA", MulticastGroupStatus::Activated)),
];
let codes = vec!["ALPHA".into()];
let err = resolve_codes_from_accounts(&accounts, &codes).unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("duplicate multicast group code 'ALPHA'"),
"{msg}"
);
assert!(msg.contains(&pda_1.to_string()), "{msg}");
assert!(msg.contains(&pda_2.to_string()), "{msg}");
}

// A requested code absent from the ledger errors with the not-found message.
#[test]
fn code_not_found_returns_error() {
let accounts = vec![(
Pubkey::new_unique(),
make_group("BETA", MulticastGroupStatus::Activated),
)];
let codes = vec!["ALPHA".into()];
let err = resolve_codes_from_accounts(&accounts, &codes).unwrap_err();
assert!(
err.to_string()
.contains("multicast group code 'ALPHA' not found"),
"{}",
err
);
}

// A code that exists but is not Activated shares the not-found error path.
#[test]
fn code_exists_but_not_activated_returns_error() {
let accounts = vec![(
Pubkey::new_unique(),
make_group("ALPHA", MulticastGroupStatus::Suspended),
)];
let codes = vec!["ALPHA".into()];
let err = resolve_codes_from_accounts(&accounts, &codes).unwrap_err();
assert!(
err.to_string().contains("not found (or not activated)"),
"{}",
err
);
}
}
Loading