From 19b3c342d53207bf55ff59b5b30ea91327a7f5d5 Mon Sep 17 00:00:00 2001 From: Rahul Garg Date: Wed, 18 Feb 2026 10:45:05 -0700 Subject: [PATCH 1/3] feat(contributor-rewards): implement on-chain reward distribution Adds the distribute-rewards step to the contributor rewards pipeline: - New `calculator::distribute` module with `try_distribute_epoch_rewards`, idempotent ATA creation, and Merkle proof generation per contributor - CLI `distribute-rewards` subcommand (dry-run by default, --execute to send) - Scheduler worker calls distribution after each write cycle (skipped in dry-run) - Removes the TODO from validator-debt initialize_distribution --- .gitignore | 2 +- Cargo.lock | 3 + crates/contributor-rewards/CHANGELOG.md | 1 + crates/contributor-rewards/Cargo.toml | 3 + .../src/calculator/distribute.rs | 430 ++++++++++++++++++ .../contributor-rewards/src/calculator/mod.rs | 1 + crates/contributor-rewards/src/cli/rewards.rs | 107 +++++ .../src/scheduler/worker.rs | 73 ++- crates/validator-debt/CHANGELOG.md | 1 + .../src/worker/initialize_distribution.rs | 2 - 10 files changed, 617 insertions(+), 6 deletions(-) create mode 100644 crates/contributor-rewards/src/calculator/distribute.rs diff --git a/.gitignore b/.gitignore index abe9538a..47186298 100644 --- a/.gitignore +++ b/.gitignore @@ -52,4 +52,4 @@ test-ledger/ *.gz .DS_Store *.state -.tool-versions \ No newline at end of file +.tool-versions diff --git a/Cargo.lock b/Cargo.lock index bd4c31f7..56639d91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2514,6 +2514,7 @@ dependencies = [ "doublezero-revenue-distribution", "doublezero-serviceability", "doublezero-solana-client-tools", + "doublezero-solana-sdk", "doublezero-telemetry", "doublezero_sdk", "governor", @@ -2532,6 +2533,8 @@ dependencies = [ "solana-client", "solana-sdk", "solana-system-interface", + "spl-associated-token-account-interface", + "spl-token-interface", "svm-hash", "tabled", "tempfile", diff --git a/crates/contributor-rewards/CHANGELOG.md b/crates/contributor-rewards/CHANGELOG.md index eb8e9abf..de0c45f2 100644 --- a/crates/contributor-rewards/CHANGELOG.md +++ b/crates/contributor-rewards/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +- feat(contributor-rewards): add on-chain reward distribution ([#269](https://github.com/doublezerofoundation/doublezero-offchain/pull/269)) - feat: billing sentinel for tenant payment status monitoring ([#265](https://github.com/doublezerofoundation/doublezero-offchain/pull/265)) - feat(contributor-rewards): add export shapley command ([#234](https://github.com/doublezerofoundation/doublezero-offchain/pull/234)) - feat(contributor-rewards): add read-rewards command ([#212](https://github.com/doublezerofoundation/doublezero-offchain/pull/212)) diff --git a/crates/contributor-rewards/Cargo.toml b/crates/contributor-rewards/Cargo.toml index 9987c6eb..9f14f258 100644 --- a/crates/contributor-rewards/Cargo.toml +++ b/crates/contributor-rewards/Cargo.toml @@ -40,6 +40,7 @@ doublezero-revenue-distribution.workspace = true doublezero_sdk.workspace = true doublezero-serviceability.workspace = true doublezero-solana-client-tools.workspace = true +doublezero-solana-sdk.workspace = true doublezero-telemetry.workspace = true governor.workspace = true indexmap.workspace = true @@ -56,6 +57,8 @@ solana-account-decoder.workspace = true solana-client.workspace = true solana-sdk.workspace = true solana-system-interface.workspace = true +spl-associated-token-account-interface.workspace = true +spl-token-interface.workspace = true svm-hash.workspace = true tabled.workspace = true tempfile.workspace = true diff --git a/crates/contributor-rewards/src/calculator/distribute.rs b/crates/contributor-rewards/src/calculator/distribute.rs new file mode 100644 index 00000000..e325318e --- /dev/null +++ b/crates/contributor-rewards/src/calculator/distribute.rs @@ -0,0 +1,430 @@ +use anyhow::{Result, ensure}; +use doublezero_solana_client_tools::{ + account::zero_copy::ZeroCopyAccountOwnedData, + payer::{TransactionOutcome, Wallet}, + rpc::DoubleZeroLedgerConnection, +}; +use doublezero_solana_sdk::{ + build_memo_instruction, environment_2z_token_mint_key, + revenue_distribution::{ + ID, + fetch::try_fetch_distribution, + instruction::{RevenueDistributionInstructionData, account::DistributeRewardsAccounts}, + state::{ContributorRewards, Distribution}, + try_is_processed_leaf, + types::RewardShare, + }, + try_build_instruction, +}; +use solana_sdk::{compute_budget::ComputeBudgetInstruction, pubkey::Pubkey}; +use spl_associated_token_account_interface::{ + address::get_associated_token_address_and_bump_seed, + instruction::create_associated_token_account_idempotent, +}; +use tracing::{debug, info, warn}; + +use crate::calculator::{ledger_operations::try_fetch_shapley_output, proof::ShapleyOutputStorage}; + +const RELAY_MEMO_CU: u32 = 5_000; + +/// Attempt to distribute rewards for the current eligible epoch. +/// +/// Returns `Ok(true)` if any distributions were made, `Ok(false)` if nothing +/// to distribute (not ready or all done), or an error on failure. +pub async fn try_distribute_epoch_rewards( + wallet: &Wallet, + dz_connection: &DoubleZeroLedgerConnection, + rewards_accountant_key: &Pubkey, + dz_epoch_value: u64, + shapley_prefix: &[u8], +) -> Result { + // Fetch the distribution for this epoch. + let (_, distribution) = try_fetch_distribution(&wallet.connection, dz_epoch_value).await?; + + // Check readiness: rewards must be finalized and tokens swept. + if !distribution.is_rewards_calculation_finalized() { + debug!( + "Distribution for epoch {} is not finalized yet, skipping", + dz_epoch_value + ); + return Ok(false); + } + + if !distribution.has_swept_2z_tokens() { + debug!( + "Distribution for epoch {} has not swept 2Z tokens yet, skipping", + dz_epoch_value + ); + return Ok(false); + } + + // Check if all contributors are already distributed. + if distribution.distributed_rewards_count == distribution.total_contributors { + debug!( + "All {} contributors already distributed for epoch {}, skipping", + distribution.total_contributors, dz_epoch_value + ); + return Ok(false); + } + + info!( + "Distributing rewards for epoch {}: {}/{} contributors processed", + dz_epoch_value, distribution.distributed_rewards_count, distribution.total_contributors + ); + + let network_env = wallet.connection.try_network_environment().await?; + let dz_mint_key = environment_2z_token_mint_key(network_env); + + // Fetch shapley output from DoubleZero Ledger. + let shapley_output = try_fetch_shapley_output( + dz_connection, + shapley_prefix, + rewards_accountant_key, + dz_epoch_value, + ) + .await?; + + let mut distributed_any = false; + + for (leaf_index, reward_share, is_processed) in + try_distribution_rewards_iter(&distribution, &shapley_output)? + { + if is_processed { + continue; + } + + info!( + "Distributing epoch {} leaf {}, contributor: {}", + dz_epoch_value, leaf_index, reward_share.contributor_key + ); + + try_distribute_contributor_rewards( + wallet, + &dz_mint_key, + &distribution, + &shapley_output, + leaf_index, + reward_share, + ) + .await?; + + distributed_any = true; + } + + Ok(distributed_any) +} + +/// Iterate over distribution rewards, yielding (leaf_index, reward_share, is_processed) +/// for each contributor in the shapley output. +pub fn try_distribution_rewards_iter<'a>( + distribution: &ZeroCopyAccountOwnedData, + shapley_output: &'a ShapleyOutputStorage, +) -> Result> { + let start_index = distribution.processed_rewards_start_index as usize; + let end_index = distribution.processed_rewards_end_index as usize; + let processed_leaf_data = &distribution.remaining_data[start_index..end_index]; + + let num_rewards = shapley_output.rewards.len(); + let max_supported_rewards = processed_leaf_data.len() * 8; + + ensure!( + max_supported_rewards >= num_rewards, + "Insufficient processed leaf data for epoch {}: can support {max_supported_rewards} rewards, but got {num_rewards}", + distribution.dz_epoch + ); + + Ok(shapley_output + .rewards + .iter() + .enumerate() + .map(|(index, reward_share)| { + let is_processed = try_is_processed_leaf(processed_leaf_data, index).unwrap(); + (index, reward_share, is_processed) + })) +} + +async fn try_distribute_contributor_rewards( + wallet: &Wallet, + dz_mint_key: &Pubkey, + distribution: &Distribution, + shapley_output: &ShapleyOutputStorage, + leaf_index: usize, + reward_share: &RewardShare, +) -> Result<()> { + const DISTRIBUTE_REWARDS_CU_BASE: u32 = 30_000; + const CREATE_ATA_CU_BASE: u32 = 25_000; + const PER_RECIPIENT_CU: u32 = 12_500; + + let wallet_key = wallet.pubkey(); + + let (contributor_rewards_key, _) = + ContributorRewards::find_address(&reward_share.contributor_key); + + // Fetch contributor reward recipients. + let recipient_shares = match wallet + .connection + .try_fetch_zero_copy_data::(&contributor_rewards_key) + .await + { + Ok(contributor_rewards) => { + let recipient_shares = contributor_rewards + .recipient_shares + .active_iter() + .copied() + .collect::>(); + + if recipient_shares.is_empty() { + warn!( + "No recipients in {contributor_rewards_key} for contributor {}", + reward_share.contributor_key + ); + + return Ok(()); + } + + recipient_shares + } + _ => { + warn!( + "Contributor rewards {contributor_rewards_key} not found for contributor {}", + reward_share.contributor_key + ); + + return Ok(()); + } + }; + + let recipient_keys = recipient_shares + .iter() + .map(|share| &share.recipient_key) + .collect::>(); + + let distribute_rewards_ix = try_build_instruction( + &ID, + DistributeRewardsAccounts::new( + distribution.dz_epoch, + &reward_share.contributor_key, + dz_mint_key, + &wallet_key, + &recipient_keys, + ), + &RevenueDistributionInstructionData::DistributeRewards { + unit_share: reward_share.unit_share, + economic_burn_rate: reward_share.economic_burn_rate(), + proof: shapley_output.generate_merkle_proof(leaf_index)?, + }, + )?; + + // Derive ATA keys and bumps. We will need these bumps to set the CU + // precisely. + let (ata_keys, ata_bumps) = recipient_keys + .iter() + .map(|recipient_key| { + get_associated_token_address_and_bump_seed( + recipient_key, + dz_mint_key, + &spl_associated_token_account_interface::program::ID, + &spl_token_interface::ID, + ) + }) + .unzip::<_, _, Vec<_>, Vec<_>>(); + + // Build instructions to create missing ATAs. We are using idempotent just + // in case there is a race when creating the ATAs. + let (mut instructions, create_ata_compute_units) = wallet + .connection + .get_multiple_accounts(&ata_keys) + .await? + .into_iter() + .zip(recipient_keys.iter()) + .zip(ata_bumps) + .filter_map(|((account_info, recipient_key), bump)| match account_info { + Some(account_info) if account_info.owner == Pubkey::default() => { + Some((recipient_key, bump)) + } + None => Some((recipient_key, bump)), + _ => None, + }) + .map(|(recipient_key, bump)| { + let ix = create_associated_token_account_idempotent( + &wallet_key, + recipient_key, + dz_mint_key, + &spl_token_interface::ID, + ); + + let compute_unit_limit = CREATE_ATA_CU_BASE + Wallet::compute_units_for_bump_seed(bump); + + (ix, compute_unit_limit) + }) + .unzip::<_, _, Vec<_>, Vec<_>>(); + + if !instructions.is_empty() { + warn!("Creating {} ATAs", instructions.len()); + } + + instructions.push(distribute_rewards_ix); + + // Add simple memo to indicate that distributing rewards was relayed. + instructions.push(build_memo_instruction(b"Relay")); + + let compute_unit_limit = DISTRIBUTE_REWARDS_CU_BASE + + recipient_keys.len() as u32 * PER_RECIPIENT_CU + + create_ata_compute_units.iter().sum::() + + RELAY_MEMO_CU; + + instructions.push(ComputeBudgetInstruction::set_compute_unit_limit( + compute_unit_limit, + )); + + if let Some(ref compute_unit_price_ix) = wallet.compute_unit_price_ix { + instructions.push(compute_unit_price_ix.clone()); + } + + let transaction = wallet.new_transaction(&instructions).await?; + let tx_outcome = wallet.send_or_simulate_transaction(&transaction).await?; + + if let TransactionOutcome::Executed(tx_sig) = tx_outcome { + info!( + "Distribute rewards for epoch {}: {tx_sig}", + distribution.dz_epoch + ); + + wallet.print_verbose_output(&[tx_sig]).await?; + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use doublezero_revenue_distribution::{state::Distribution, types::DoubleZeroEpoch}; + use doublezero_solana_client_tools::account::zero_copy::ZeroCopyAccountOwnedData; + use network_shapley::shapley::{ShapleyOutput, ShapleyValue}; + use solana_sdk::pubkey::Pubkey; + + use super::try_distribution_rewards_iter; + use crate::calculator::proof::ShapleyOutputStorage; + + fn create_test_shapley_output(num_contributors: usize) -> ShapleyOutput { + let mut output = ShapleyOutput::new(); + for i in 0..num_contributors { + let mut bytes = [0u8; 32]; + bytes[0] = (i + 1) as u8; + bytes[1] = ((i + 1) >> 8) as u8; + let pubkey = Pubkey::new_from_array(bytes); + output.insert( + pubkey.to_string(), + ShapleyValue { + value: 100.0, + proportion: 1.0 / num_contributors as f64, + }, + ); + } + output + } + + fn make_distribution( + epoch: u64, + bitmap: Vec, + total_contributors: u32, + ) -> ZeroCopyAccountOwnedData { + let mut dist = Distribution::default(); + dist.dz_epoch = DoubleZeroEpoch::new(epoch); + dist.processed_rewards_start_index = 0; + dist.processed_rewards_end_index = bitmap.len() as u32; + dist.total_contributors = total_contributors; + + ZeroCopyAccountOwnedData { + mucked_data: Box::new(dist), + remaining_data: bitmap, + } + } + + #[test] + fn test_distribution_rewards_iter_all_unprocessed() { + let shapley = create_test_shapley_output(3); + let shapley_storage = ShapleyOutputStorage::new(42, &shapley).unwrap(); + + // 1 byte = 8 bits, all zeros = all unprocessed + let distribution = make_distribution(42, vec![0x00], 3); + + let results: Vec<_> = try_distribution_rewards_iter(&distribution, &shapley_storage) + .unwrap() + .collect(); + + assert_eq!(results.len(), 3); + for (index, _reward, is_processed) in &results { + assert!(!is_processed, "leaf {} should be unprocessed", index); + } + } + + #[test] + fn test_distribution_rewards_iter_all_processed() { + let shapley = create_test_shapley_output(3); + let shapley_storage = ShapleyOutputStorage::new(42, &shapley).unwrap(); + + // 0xFF = all bits set = all processed + let distribution = make_distribution(42, vec![0xFF], 3); + + let results: Vec<_> = try_distribution_rewards_iter(&distribution, &shapley_storage) + .unwrap() + .collect(); + + assert_eq!(results.len(), 3); + for (index, _reward, is_processed) in &results { + assert!(is_processed, "leaf {} should be processed", index); + } + } + + #[test] + fn test_distribution_rewards_iter_partial() { + let shapley = create_test_shapley_output(3); + let shapley_storage = ShapleyOutputStorage::new(42, &shapley).unwrap(); + + // 0b00000101 = bits 0 and 2 set (leaves 0 and 2 processed, leaf 1 unprocessed) + let distribution = make_distribution(42, vec![0b0000_0101], 3); + + let results: Vec<_> = try_distribution_rewards_iter(&distribution, &shapley_storage) + .unwrap() + .collect(); + + assert_eq!(results.len(), 3); + assert!(results[0].2, "leaf 0 should be processed"); + assert!(!results[1].2, "leaf 1 should be unprocessed"); + assert!(results[2].2, "leaf 2 should be processed"); + } + + #[test] + fn test_distribution_rewards_iter_insufficient_bitmap() { + let shapley = create_test_shapley_output(10); + let shapley_storage = ShapleyOutputStorage::new(42, &shapley).unwrap(); + + // Only 1 byte = 8 bits, but 10 contributors need at least 2 bytes + let distribution = make_distribution(42, vec![0x00], 10); + + let result = try_distribution_rewards_iter(&distribution, &shapley_storage); + let err = result.err().expect("should be an error"); + assert!( + err.to_string().contains("Insufficient"), + "error should mention insufficient bitmap" + ); + } + + #[test] + fn test_distribution_rewards_iter_empty_rewards() { + let shapley_storage = ShapleyOutputStorage { + epoch: 42, + rewards: vec![], + total_unit_shares: 0, + }; + + let distribution = make_distribution(42, vec![], 0); + + let results: Vec<_> = try_distribution_rewards_iter(&distribution, &shapley_storage) + .unwrap() + .collect(); + + assert!(results.is_empty()); + } +} diff --git a/crates/contributor-rewards/src/calculator/mod.rs b/crates/contributor-rewards/src/calculator/mod.rs index 799df104..9039ce46 100644 --- a/crates/contributor-rewards/src/calculator/mod.rs +++ b/crates/contributor-rewards/src/calculator/mod.rs @@ -1,5 +1,6 @@ pub mod constants; pub mod data_prep; +pub mod distribute; pub mod input; pub mod keypair_loader; pub mod ledger_operations; diff --git a/crates/contributor-rewards/src/cli/rewards.rs b/crates/contributor-rewards/src/cli/rewards.rs index 28985f4c..55c22e87 100644 --- a/crates/contributor-rewards/src/cli/rewards.rs +++ b/crates/contributor-rewards/src/cli/rewards.rs @@ -283,6 +283,40 @@ pub enum RewardsCommands { )] keypair: Option, }, + #[command( + about = "Distribute rewards to contributors for a finalized epoch", + long_about = "Runs in dry-run (simulate) mode by default. Pass --execute to send real transactions.", + after_help = r#"Examples: + # Check distribution readiness for the current eligible epoch (dry-run, no keypair needed) + distribute-rewards + + # Check a specific epoch + distribute-rewards -e 27 + + # Actually execute distribution + distribute-rewards --execute -k keypair.json + + # Execute for a specific epoch + distribute-rewards --execute -e 27 -k keypair.json"# + )] + DistributeRewards { + /// DZ epoch to distribute rewards for (defaults to current eligible epoch) + #[arg(short = 'e', long, value_name = "EPOCH")] + dz_epoch: Option, + + /// Actually send transactions (default is dry-run/simulate) + #[arg(long)] + execute: bool, + + /// Path to keypair file for signing transactions + #[arg( + short = 'k', + long, + value_name = "FILE", + required_if_eq("execute", "true") + )] + keypair: Option, + }, } /// Handle rewards commands @@ -456,5 +490,78 @@ pub async fn handle(orchestrator: &Orchestrator, cmd: RewardsCommands) -> Result .write_telemetry_aggregates(epoch, keypair, dry_run, r#type) .await } + RewardsCommands::DistributeRewards { + dz_epoch, + execute, + keypair, + } => { + use anyhow::Context; + use doublezero_solana_client_tools::{ + payer::Wallet, + rpc::{DoubleZeroLedgerConnection, SolanaConnection}, + }; + use doublezero_solana_sdk::revenue_distribution::fetch::try_fetch_config; + use solana_sdk::signature::Keypair; + + use crate::calculator::{distribute, keypair_loader::load_keypair}; + + let dry_run = !execute; + + let signer = if let Some(ref path) = keypair { + load_keypair(&Some(path.clone()))? + } else { + Keypair::new() + }; + + let connection = + SolanaConnection::new(orchestrator.settings.rpc.solana_write_url.clone()); + let dz_connection = + DoubleZeroLedgerConnection::new(orchestrator.settings.rpc.dz_url.clone()); + + let wallet = Wallet { + connection, + signer, + compute_unit_price_ix: None, + verbose: false, + fee_payer: None, + dry_run, + }; + + let (_, config) = try_fetch_config(&wallet.connection).await?; + + let dz_epoch_value = match dz_epoch { + Some(epoch) => epoch, + None => { + let deferral_period = config + .checked_minimum_epoch_duration_to_finalize_rewards() + .context("Minimum epoch duration to finalize rewards not set")?; + config + .next_completed_dz_epoch + .value() + .saturating_sub(deferral_period.into()) + } + }; + + info!("Distributing rewards for epoch {dz_epoch_value}"); + + let shapley_prefix = orchestrator.settings.get_contributor_rewards_prefix(); + + let distributed = distribute::try_distribute_epoch_rewards( + &wallet, + &dz_connection, + &config.rewards_accountant_key, + dz_epoch_value, + &shapley_prefix, + ) + .await?; + + if distributed { + info!("Rewards distributed for epoch {dz_epoch_value}"); + } else { + info!("No rewards to distribute for epoch {dz_epoch_value}"); + } + + Ok(()) + } } } diff --git a/crates/contributor-rewards/src/scheduler/worker.rs b/crates/contributor-rewards/src/scheduler/worker.rs index f857687f..249b8f2d 100644 --- a/crates/contributor-rewards/src/scheduler/worker.rs +++ b/crates/contributor-rewards/src/scheduler/worker.rs @@ -8,7 +8,7 @@ use std::{ time::{Duration, Instant}, }; -use anyhow::{Result, anyhow, bail}; +use anyhow::{Context, Result, anyhow, bail}; use backon::{ExponentialBuilder, Retryable}; use chrono::Utc; use doublezero_program_tools::zero_copy; @@ -17,7 +17,11 @@ use doublezero_revenue_distribution::{ types::DoubleZeroEpoch, }; use doublezero_sdk::record::pubkey::create_record_key; -use doublezero_solana_client_tools::rpc::try_fetch_zero_copy_data_with_commitment; +use doublezero_solana_client_tools::{ + payer::Wallet, + rpc::{DoubleZeroLedgerConnection, SolanaConnection, try_fetch_zero_copy_data_with_commitment}, +}; +use doublezero_solana_sdk::revenue_distribution::fetch::try_fetch_config; use slack_notifier::contributor_rewards::{WriteResultInfo, post_detailed_completion}; use solana_client::client_error::ClientError as SolanaClientError; use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; @@ -30,7 +34,10 @@ use tokio::{ use tracing::{debug, error, info, warn}; use crate::{ - calculator::{WriteConfig, ledger_operations::WriteResult, orchestrator::Orchestrator}, + calculator::{ + WriteConfig, distribute, keypair_loader::load_keypair, ledger_operations::WriteResult, + orchestrator::Orchestrator, + }, cli::snapshot::{CompleteSnapshot, SnapshotMetadata}, ingestor::{epoch::EpochFinder, fetcher::Fetcher}, scheduler::state::SchedulerState, @@ -153,11 +160,71 @@ impl ScheduleWorker { } } } + + // Try to distribute rewards for the eligible epoch. + match self.try_distribute_rewards().await { + Ok(true) => { + info!("Distributed rewards successfully"); + metrics::counter!("doublezero_contributor_rewards_distribution_success") + .increment(1); + } + Ok(false) => debug!("No rewards to distribute"), + Err(e) => { + warn!("Failed to distribute rewards: {e}"); + metrics::counter!("doublezero_contributor_rewards_distribution_failure") + .increment(1); + } + } } Ok(()) } + /// Attempt to distribute rewards for the eligible epoch. + async fn try_distribute_rewards(&self) -> Result { + if self.dry_run { + debug!("Dry run mode, skipping reward distribution"); + return Ok(false); + } + + let signer = load_keypair(&self.keypair_path)?; + let connection = + SolanaConnection::new(self.orchestrator.settings.rpc.solana_write_url.clone()); + let dz_connection = + DoubleZeroLedgerConnection::new(self.orchestrator.settings.rpc.dz_url.clone()); + + let wallet = Wallet { + connection, + signer, + compute_unit_price_ix: None, + verbose: false, + fee_payer: None, + dry_run: false, + }; + + let (_, config) = try_fetch_config(&wallet.connection).await?; + + let deferral_period = config + .checked_minimum_epoch_duration_to_finalize_rewards() + .context("Minimum epoch duration to finalize rewards not set")?; + + let dz_epoch_value = config + .next_completed_dz_epoch + .value() + .saturating_sub(deferral_period.into()); + + let shapley_prefix = self.orchestrator.settings.get_contributor_rewards_prefix(); + + distribute::try_distribute_epoch_rewards( + &wallet, + &dz_connection, + &config.rewards_accountant_key, + dz_epoch_value, + &shapley_prefix, + ) + .await + } + /// Process rewards for the current epoch if needed async fn process_rewards(&self, state: &mut SchedulerState) -> Result { // Get current epoch diff --git a/crates/validator-debt/CHANGELOG.md b/crates/validator-debt/CHANGELOG.md index 93994e4a..2ea971c6 100644 --- a/crates/validator-debt/CHANGELOG.md +++ b/crates/validator-debt/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +- feat(contributor-rewards): add on-chain reward distribution ([#269](https://github.com/doublezerofoundation/doublezero-offchain/pull/269)) - ensure debt is finalized before collection ([#268](https://github.com/doublezerofoundation/doublezero-offchain/pull/268)) - use inclusive range for completed DZ epochs ([#2815](https://github.com/malbeclabs/doublezero/issues/2815)) - remove dz_ledger as argument ([#255](https://github.com/doublezerofoundation/doublezero-offchain/pull/255)) diff --git a/crates/validator-debt/src/worker/initialize_distribution.rs b/crates/validator-debt/src/worker/initialize_distribution.rs index 131e8e5a..a76455f3 100644 --- a/crates/validator-debt/src/worker/initialize_distribution.rs +++ b/crates/validator-debt/src/worker/initialize_distribution.rs @@ -214,8 +214,6 @@ pub async fn try_initialize_distribution( wallet.print_verbose_output(&[tx_sig]).await?; } - // TODO: Add the distribute-rewards calls here. - Ok(()) } From 8690b2fb5b26931a0d333209849bc69354bd2e74 Mon Sep 17 00:00:00 2001 From: Rahul Garg Date: Wed, 18 Feb 2026 13:00:12 -0700 Subject: [PATCH 2/3] feat(contributor-rewards): distribution idempotency and epoch separation - Add DistributionOutcome enum (NotReady/AlreadyComplete/Distributed(n)) replacing the opaque Result return from try_distribute_epoch_rewards - Track last_distributed_epoch and consecutive_distribution_failures in SchedulerState (serde(default) for backward compat with existing state files) - Add should_distribute_epoch idempotency guard to skip RPC work for epochs already fully distributed - Extract try_get_distribution_epoch helper to decouple config fetch from distribution; log calculation/distribution epoch pairing each tick - Align distribution failure handling with calculation failures: error! log, state persistence, 10x escalation counter - Update CLI DistributeRewards handler for new return type --- .../src/calculator/distribute.rs | 42 +++++-- .../contributor-rewards/src/calculator/mod.rs | 3 +- crates/contributor-rewards/src/cli/rewards.rs | 16 ++- .../src/scheduler/state.rs | 74 ++++++++++++ .../src/scheduler/worker.rs | 108 +++++++++++++----- .../tests/test_scheduler.rs | 7 +- 6 files changed, 201 insertions(+), 49 deletions(-) diff --git a/crates/contributor-rewards/src/calculator/distribute.rs b/crates/contributor-rewards/src/calculator/distribute.rs index e325318e..028b0ed9 100644 --- a/crates/contributor-rewards/src/calculator/distribute.rs +++ b/crates/contributor-rewards/src/calculator/distribute.rs @@ -27,17 +27,25 @@ use crate::calculator::{ledger_operations::try_fetch_shapley_output, proof::Shap const RELAY_MEMO_CU: u32 = 5_000; +/// Outcome of a distribution attempt. +#[derive(Debug)] +pub enum DistributionOutcome { + /// Finalization or sweep guards not met. + NotReady, + /// All contributors have already been distributed. + AlreadyComplete, + /// N contributors were distributed this run (may be a partial batch). + Distributed(usize), +} + /// Attempt to distribute rewards for the current eligible epoch. -/// -/// Returns `Ok(true)` if any distributions were made, `Ok(false)` if nothing -/// to distribute (not ready or all done), or an error on failure. pub async fn try_distribute_epoch_rewards( wallet: &Wallet, dz_connection: &DoubleZeroLedgerConnection, rewards_accountant_key: &Pubkey, dz_epoch_value: u64, shapley_prefix: &[u8], -) -> Result { +) -> Result { // Fetch the distribution for this epoch. let (_, distribution) = try_fetch_distribution(&wallet.connection, dz_epoch_value).await?; @@ -47,7 +55,7 @@ pub async fn try_distribute_epoch_rewards( "Distribution for epoch {} is not finalized yet, skipping", dz_epoch_value ); - return Ok(false); + return Ok(DistributionOutcome::NotReady); } if !distribution.has_swept_2z_tokens() { @@ -55,7 +63,7 @@ pub async fn try_distribute_epoch_rewards( "Distribution for epoch {} has not swept 2Z tokens yet, skipping", dz_epoch_value ); - return Ok(false); + return Ok(DistributionOutcome::NotReady); } // Check if all contributors are already distributed. @@ -64,7 +72,7 @@ pub async fn try_distribute_epoch_rewards( "All {} contributors already distributed for epoch {}, skipping", distribution.total_contributors, dz_epoch_value ); - return Ok(false); + return Ok(DistributionOutcome::AlreadyComplete); } info!( @@ -84,7 +92,7 @@ pub async fn try_distribute_epoch_rewards( ) .await?; - let mut distributed_any = false; + let mut distributed_count = 0usize; for (leaf_index, reward_share, is_processed) in try_distribution_rewards_iter(&distribution, &shapley_output)? @@ -108,10 +116,10 @@ pub async fn try_distribute_epoch_rewards( ) .await?; - distributed_any = true; + distributed_count += 1; } - Ok(distributed_any) + Ok(DistributionOutcome::Distributed(distributed_count)) } /// Iterate over distribution rewards, yielding (leaf_index, reward_share, is_processed) @@ -411,6 +419,20 @@ mod tests { ); } + #[test] + fn test_outcome_all_processed_maps_to_already_complete() { + let shapley = create_test_shapley_output(3); + let storage = ShapleyOutputStorage::new(42, &shapley).unwrap(); + let distribution = make_distribution(42, vec![0xFF], 3); + + let unprocessed_count = try_distribution_rewards_iter(&distribution, &storage) + .unwrap() + .filter(|(_, _, is_processed)| !is_processed) + .count(); + + assert_eq!(unprocessed_count, 0); + } + #[test] fn test_distribution_rewards_iter_empty_rewards() { let shapley_storage = ShapleyOutputStorage { diff --git a/crates/contributor-rewards/src/calculator/mod.rs b/crates/contributor-rewards/src/calculator/mod.rs index 9039ce46..0826a77f 100644 --- a/crates/contributor-rewards/src/calculator/mod.rs +++ b/crates/contributor-rewards/src/calculator/mod.rs @@ -12,5 +12,6 @@ pub mod shapley; pub mod util; pub mod write_config; -// Re-export WriteConfig for access +// Re-export for access +pub use distribute::DistributionOutcome; pub use write_config::WriteConfig; diff --git a/crates/contributor-rewards/src/cli/rewards.rs b/crates/contributor-rewards/src/cli/rewards.rs index 55c22e87..d31fb976 100644 --- a/crates/contributor-rewards/src/cli/rewards.rs +++ b/crates/contributor-rewards/src/cli/rewards.rs @@ -546,7 +546,7 @@ pub async fn handle(orchestrator: &Orchestrator, cmd: RewardsCommands) -> Result let shapley_prefix = orchestrator.settings.get_contributor_rewards_prefix(); - let distributed = distribute::try_distribute_epoch_rewards( + let outcome = distribute::try_distribute_epoch_rewards( &wallet, &dz_connection, &config.rewards_accountant_key, @@ -555,10 +555,16 @@ pub async fn handle(orchestrator: &Orchestrator, cmd: RewardsCommands) -> Result ) .await?; - if distributed { - info!("Rewards distributed for epoch {dz_epoch_value}"); - } else { - info!("No rewards to distribute for epoch {dz_epoch_value}"); + match outcome { + distribute::DistributionOutcome::AlreadyComplete => { + info!("All contributors already distributed for epoch {dz_epoch_value}"); + } + distribute::DistributionOutcome::Distributed(n) => { + info!("Distributed {n} contributors for epoch {dz_epoch_value}"); + } + distribute::DistributionOutcome::NotReady => { + info!("Distribution not ready for epoch {dz_epoch_value}"); + } } Ok(()) diff --git a/crates/contributor-rewards/src/scheduler/state.rs b/crates/contributor-rewards/src/scheduler/state.rs index 2c1b266f..9a3b2d9e 100644 --- a/crates/contributor-rewards/src/scheduler/state.rs +++ b/crates/contributor-rewards/src/scheduler/state.rs @@ -19,6 +19,12 @@ pub struct SchedulerState { pub last_check_time: DateTime, /// Last time rewards were successfully calculated pub last_success_time: Option>, + /// Last epoch for which distribution completed fully + #[serde(default)] + pub last_distributed_epoch: Option, + /// Number of consecutive distribution failures + #[serde(default)] + pub consecutive_distribution_failures: u32, } impl Default for SchedulerState { @@ -29,6 +35,8 @@ impl Default for SchedulerState { consecutive_failures: 0, last_check_time: Utc::now(), last_success_time: None, + last_distributed_epoch: None, + consecutive_distribution_failures: 0, } } } @@ -157,4 +165,70 @@ impl SchedulerState { pub fn is_in_failure_state(&self, max_failures: u32) -> bool { self.consecutive_failures >= max_failures } + + /// Returns true if this epoch hasn't been fully distributed yet. + pub fn should_distribute_epoch(&self, epoch: u64) -> bool { + self.last_distributed_epoch.is_none_or(|last| epoch > last) + } + + /// Record a completed distribution epoch and reset the failure counter. + pub fn mark_distribution_success(&mut self, epoch: u64) { + self.last_distributed_epoch = Some(epoch); + self.consecutive_distribution_failures = 0; + info!("Distribution complete for epoch {epoch}"); + } + + /// Increment the consecutive distribution failure counter. + pub fn mark_distribution_failure(&mut self) { + self.consecutive_distribution_failures += 1; + error!( + "Distribution failure #{}", + self.consecutive_distribution_failures + ); + } +} + +#[cfg(test)] +mod tests { + use super::SchedulerState; + + #[test] + fn test_should_distribute_epoch_when_never_distributed() { + let state = SchedulerState::default(); + assert!(state.should_distribute_epoch(0)); + assert!(state.should_distribute_epoch(42)); + } + + #[test] + fn test_should_distribute_epoch_skips_already_distributed() { + let mut state = SchedulerState::default(); + state.mark_distribution_success(10); + assert!(!state.should_distribute_epoch(10)); + assert!(!state.should_distribute_epoch(9)); + assert!(state.should_distribute_epoch(11)); + } + + #[test] + fn test_mark_distribution_success_resets_failures() { + let mut state = SchedulerState::default(); + state.mark_distribution_failure(); + state.mark_distribution_failure(); + assert_eq!(state.consecutive_distribution_failures, 2); + state.mark_distribution_success(5); + assert_eq!(state.consecutive_distribution_failures, 0); + assert_eq!(state.last_distributed_epoch, Some(5)); + } + + #[test] + fn test_backward_compat_missing_distribution_fields() { + let old_json = r#"{ + "last_processed_epoch": 42, + "consecutive_failures": 0, + "last_check_time": "2026-01-01T00:00:00Z" + }"#; + let state: SchedulerState = serde_json::from_str(old_json).unwrap(); + assert_eq!(state.last_distributed_epoch, None); + assert_eq!(state.consecutive_distribution_failures, 0); + assert_eq!(state.last_processed_epoch, Some(42)); + } } diff --git a/crates/contributor-rewards/src/scheduler/worker.rs b/crates/contributor-rewards/src/scheduler/worker.rs index 249b8f2d..e7a1ff06 100644 --- a/crates/contributor-rewards/src/scheduler/worker.rs +++ b/crates/contributor-rewards/src/scheduler/worker.rs @@ -35,8 +35,8 @@ use tracing::{debug, error, info, warn}; use crate::{ calculator::{ - WriteConfig, distribute, keypair_loader::load_keypair, ledger_operations::WriteResult, - orchestrator::Orchestrator, + DistributionOutcome, WriteConfig, distribute, keypair_loader::load_keypair, + ledger_operations::WriteResult, orchestrator::Orchestrator, }, cli::snapshot::{CompleteSnapshot, SnapshotMetadata}, ingestor::{epoch::EpochFinder, fetcher::Fetcher}, @@ -162,17 +162,57 @@ impl ScheduleWorker { } // Try to distribute rewards for the eligible epoch. - match self.try_distribute_rewards().await { - Ok(true) => { - info!("Distributed rewards successfully"); - metrics::counter!("doublezero_contributor_rewards_distribution_success") - .increment(1); - } - Ok(false) => debug!("No rewards to distribute"), - Err(e) => { - warn!("Failed to distribute rewards: {e}"); - metrics::counter!("doublezero_contributor_rewards_distribution_failure") - .increment(1); + if !self.dry_run { + match self.try_get_distribution_epoch().await { + Ok((dz_epoch, rewards_accountant_key)) => { + let calc_epoch = state.last_processed_epoch; + info!( + "Calculation epoch: {:?} | Distribution epoch: {dz_epoch}", + calc_epoch + ); + + if !state.should_distribute_epoch(dz_epoch) { + debug!("Epoch {dz_epoch} already fully distributed, skipping"); + } else { + match self + .try_distribute_rewards(dz_epoch, &rewards_accountant_key) + .await + { + Ok(DistributionOutcome::AlreadyComplete) => { + state.mark_distribution_success(dz_epoch); + state.save(&self.state_file)?; + } + Ok(DistributionOutcome::Distributed(n)) => { + info!("Distributed {n} contributors for epoch {dz_epoch}"); + metrics::counter!( + "doublezero_contributor_rewards_distribution_success" + ) + .increment(1); + } + Ok(DistributionOutcome::NotReady) => { + debug!("Distribution not ready for epoch {dz_epoch}"); + } + Err(e) => { + error!( + "Failed to distribute rewards for epoch {dz_epoch}: {e}" + ); + state.mark_distribution_failure(); + state.save(&self.state_file)?; + metrics::counter!( + "doublezero_contributor_rewards_distribution_failure" + ) + .increment(1); + if state.consecutive_distribution_failures % 10 == 0 { + error!( + "Distribution has failed {} consecutive times", + state.consecutive_distribution_failures + ); + } + } + } + } + } + Err(e) => warn!("Failed to fetch distribution epoch: {e}"), } } } @@ -180,13 +220,30 @@ impl ScheduleWorker { Ok(()) } - /// Attempt to distribute rewards for the eligible epoch. - async fn try_distribute_rewards(&self) -> Result { - if self.dry_run { - debug!("Dry run mode, skipping reward distribution"); - return Ok(false); - } + /// Fetch the config and compute the distribution-eligible epoch. + async fn try_get_distribution_epoch(&self) -> Result<(u64, Pubkey)> { + let connection = + SolanaConnection::new(self.orchestrator.settings.rpc.solana_write_url.clone()); + let (_, config) = try_fetch_config(&connection).await?; + + let deferral_period = config + .checked_minimum_epoch_duration_to_finalize_rewards() + .context("Minimum epoch duration to finalize rewards not set")?; + + let dz_epoch_value = config + .next_completed_dz_epoch + .value() + .saturating_sub(deferral_period.into()); + Ok((dz_epoch_value, config.rewards_accountant_key)) + } + + /// Attempt to distribute rewards for the given epoch. + async fn try_distribute_rewards( + &self, + dz_epoch_value: u64, + rewards_accountant_key: &Pubkey, + ) -> Result { let signer = load_keypair(&self.keypair_path)?; let connection = SolanaConnection::new(self.orchestrator.settings.rpc.solana_write_url.clone()); @@ -202,23 +259,12 @@ impl ScheduleWorker { dry_run: false, }; - let (_, config) = try_fetch_config(&wallet.connection).await?; - - let deferral_period = config - .checked_minimum_epoch_duration_to_finalize_rewards() - .context("Minimum epoch duration to finalize rewards not set")?; - - let dz_epoch_value = config - .next_completed_dz_epoch - .value() - .saturating_sub(deferral_period.into()); - let shapley_prefix = self.orchestrator.settings.get_contributor_rewards_prefix(); distribute::try_distribute_epoch_rewards( &wallet, &dz_connection, - &config.rewards_accountant_key, + rewards_accountant_key, dz_epoch_value, &shapley_prefix, ) diff --git a/crates/contributor-rewards/tests/test_scheduler.rs b/crates/contributor-rewards/tests/test_scheduler.rs index 8cee52a4..61042cc5 100644 --- a/crates/contributor-rewards/tests/test_scheduler.rs +++ b/crates/contributor-rewards/tests/test_scheduler.rs @@ -197,13 +197,16 @@ mod tests { let obj = &parsed.as_object(); - let state_keys: Vec<&String> = obj.unwrap().keys().collect(); - // make sure the keys don't change + let mut state_keys: Vec<&String> = obj.unwrap().keys().collect(); + state_keys.sort(); + // make sure the keys don't change unexpectedly assert_eq!( state_keys, [ + "consecutive_distribution_failures", "consecutive_failures", "last_check_time", + "last_distributed_epoch", "last_processed_epoch", "last_snapshot_location", "last_success_time" From dcf0ff5793df5bffab22f62431d796dfa1e06e78 Mon Sep 17 00:00:00 2001 From: Rahul Garg Date: Wed, 18 Feb 2026 13:07:20 -0700 Subject: [PATCH 3/3] fix(contributor-rewards): add metric counter on distribution epoch fetch failure --- crates/contributor-rewards/src/scheduler/worker.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/contributor-rewards/src/scheduler/worker.rs b/crates/contributor-rewards/src/scheduler/worker.rs index e7a1ff06..abbf224b 100644 --- a/crates/contributor-rewards/src/scheduler/worker.rs +++ b/crates/contributor-rewards/src/scheduler/worker.rs @@ -212,7 +212,13 @@ impl ScheduleWorker { } } } - Err(e) => warn!("Failed to fetch distribution epoch: {e}"), + Err(e) => { + warn!("Failed to fetch distribution epoch: {e}"); + metrics::counter!( + "doublezero_contributor_rewards_distribution_fetch_failure" + ) + .increment(1); + } } } }