From da55bc1680d04496575c593dcf2dd1e3603820f5 Mon Sep 17 00:00:00 2001 From: Phuong Date: Tue, 25 Nov 2025 17:50:41 -0800 Subject: [PATCH 01/15] Add sign task fix and depletion test --- .../node/src/protocol/signature.rs | 67 ++++++++++++---- .../node/src/storage/protocol_storage.rs | 76 ++++++++++++++++++- integration-tests/tests/cases/solana.rs | 11 +++ 3 files changed, 136 insertions(+), 18 deletions(-) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 6a707e84..d7b0a5a1 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -3,14 +3,16 @@ use crate::backlog::Backlog; use crate::config::Config; use crate::kdf::derive_delta; use crate::mesh::MeshState; +use crate::protocol::contract::primitives::intersect_vec; use crate::protocol::message::{ MessageChannel, PositMessage, PositProtocolId, SignatureMessage, Subscriber, }; use crate::protocol::posit::{PositAction, SinglePositCounter}; -use crate::protocol::presignature::PresignatureId; +use crate::protocol::presignature::{Presignature, PresignatureId}; use crate::protocol::Chain; use crate::rpc::{ContractStateWatcher, RpcChannel}; use crate::storage::presignature_storage::{PresignatureTaken, PresignatureTakenDropper}; +use crate::storage::protocol_storage::ArtifactTaken; use crate::storage::PresignatureStorage; use crate::types::SignatureProtocol; use crate::util::{AffinePointExt, JoinMap}; @@ -222,18 +224,33 @@ impl SignOrganizer { let is_proposer = proposer == ctx.me; let (presignature_id, presignature) = if is_proposer { tracing::info!(?sign_id, round = ?state.round, "proposer waiting for presignature"); + let stable = stable.iter().copied().collect::>(); + let mut recycle = Vec::new(); let fetch = tokio::time::timeout(Duration::from_secs(30), async { loop { if let Some(taken) = ctx.presignatures.take_mine(ctx.me).await { - break taken; + let participants = 
intersect_vec(&[&taken.artifact.participants, &stable]); + if participants.len() < ctx.threshold { + recycle.push(taken); + continue; + } + + break (taken, participants); } tokio::time::sleep(Duration::from_millis(500)).await; } }) .await; - let taken = match fetch { - Ok(taken) => taken, + let presignatures = ctx.presignatures.clone(); + tokio::spawn(async move { + for taken in recycle { + presignatures.recycle_mine(me, taken).await; + } + }); + + let (taken, participants) = match fetch { + Ok(value) => value, Err(_) => { tracing::warn!( ?sign_id, @@ -246,10 +263,11 @@ impl SignOrganizer { }; let presignature_id = taken.artifact.id; + tracing::info!(?sign_id, presignature_id, "proposer got presignature"); - // broadcast to stable and let them reject if they don't have the presignature. - for &p in &stable { + // broadcast to participants and let them reject if they don't have the presignature. + for p in participants { if p == ctx.me { continue; } @@ -455,6 +473,17 @@ impl SignPositor { tracing::warn!(?sign_id, ?from, ?proposer, "received Start from non-proposer, ignoring"); continue; } + + if participants.len() < ctx.threshold { + tracing::warn!( + ?sign_id, + ?round, + "not enough start participants" + ); + state.bump_round(); + return SignPhase::Organizing(SignOrganizer); + } + tracing::info!(?sign_id, participant = ?ctx.me, ?participants, "deliberator received Start"); break participants; } @@ -465,6 +494,10 @@ impl SignPositor { if counter.enough_rejects(ctx.threshold) { tracing::warn!(?sign_id, ?from, "received enough REJECTs, reorganizing"); + if let Some(taken) = presignature { + tracing::warn!(?sign_id, "recycling presignature due to REJECTs"); + ctx.presignatures.recycle_mine(ctx.me, taken).await; + } state.bump_round(); return SignPhase::Organizing(SignOrganizer); } @@ -485,6 +518,10 @@ impl SignPositor { threshold = ctx.threshold, "not enough presignature participants accepted, reorganizing" ); + if let Some(taken) = presignature { + 
tracing::warn!(?sign_id, "recycling presignature due to insufficient participants"); + ctx.presignatures.recycle_mine(ctx.me, taken).await; + } state.bump_round(); return SignPhase::Organizing(SignOrganizer); } @@ -528,6 +565,10 @@ impl SignPositor { threshold = ctx.threshold, "posit timeout: not enough presignature participants accepted, reorganizing" ); + if let Some(taken) = presignature { + tracing::warn!(?sign_id, "recycling presignature due to posit timeout"); + ctx.presignatures.recycle_mine(ctx.me, taken).await; + } state.bump_round(); return SignPhase::Organizing(SignOrganizer); } @@ -552,6 +593,10 @@ impl SignPositor { break participants; } else { tracing::warn!(?sign_id, "posit timeout without enough accepts, reorganizing"); + if let Some(taken) = presignature { + tracing::warn!(?sign_id, "recycling presignature due to posit timeout (no accepts)"); + ctx.presignatures.recycle_mine(ctx.me, taken).await; + } state.bump_round(); return SignPhase::Organizing(SignOrganizer); } @@ -564,16 +609,6 @@ impl SignPositor { } }; - if accepted_participants.is_empty() { - tracing::warn!( - ?sign_id, - ?round, - "no accepted participants after posit, reorganizing" - ); - state.bump_round(); - return SignPhase::Organizing(SignOrganizer); - } - SignPhase::Generating(SignGenerating { proposer, presignature_id, diff --git a/chain-signatures/node/src/storage/protocol_storage.rs b/chain-signatures/node/src/storage/protocol_storage.rs index 3eec011a..be6afb07 100644 --- a/chain-signatures/node/src/storage/protocol_storage.rs +++ b/chain-signatures/node/src/storage/protocol_storage.rs @@ -72,7 +72,7 @@ pub struct ArtifactTaken { pub struct ArtifactTakenDropper { pub id: A::Id, - dropper: Option>, + pub(crate) dropper: Option>, } impl Drop for ArtifactTakenDropper { @@ -87,7 +87,7 @@ impl Drop for ArtifactTakenDropper { } impl ArtifactTaken { - fn new(artifact: A, storage: ProtocolStorage) -> Self { + pub(crate) fn new(artifact: A, storage: ProtocolStorage) -> Self { Self { 
storage: ArtifactTakenDropper { id: artifact.id(), @@ -679,6 +679,78 @@ impl ProtocolStorage { } } + /// Return a taken artifact back to the available pool. + pub async fn recycle_mine(&self, me: Participant, taken: ArtifactTaken) -> bool { + const SCRIPT: &str = r#" + local artifact_key = KEYS[1] + local used_key = KEYS[2] + local mine_key = KEYS[3] + local reserved_key = KEYS[4] + local artifact_id = ARGV[1] + local artifact = ARGV[2] + + -- Remove from used set + redis.call("HDEL", used_key, artifact_id) + + -- Add back to artifact hash map + redis.call("HSET", artifact_key, artifact_id, artifact) + + -- Add back to mine set + redis.call("SADD", mine_key, artifact_id) + + -- Ensure it is still reserved + redis.call("SADD", reserved_key, artifact_id) + + return 1 + "#; + + let start = Instant::now(); + let (artifact, mut dropper) = taken.take(); + // We manually handle the return, so we don't want the dropper to unreserve it. + dropper.dropper.take(); + + let id = artifact.id(); + let Some(mut conn) = self.connect().await else { + tracing::warn!(id, "failed to return artifact: connection failed"); + return false; + }; + + let result: Result = redis::Script::new(SCRIPT) + .key(&self.artifact_key) + .key(&self.used_key) + .key(owner_key(&self.owner_keys, me)) + .key(&self.reserved_key) + .arg(id) + .arg(artifact) + .invoke_async(&mut conn) + .await; + + let elapsed = start.elapsed(); + crate::metrics::REDIS_LATENCY + .with_label_values(&[A::METRIC_LABEL, "return_mine", self.account_id.as_str()]) + .observe(elapsed.as_millis() as f64); + + match result { + Ok(_) => { + tracing::info!( + id, + elapsed_ms = elapsed.as_millis(), + "returned mine artifact" + ); + true + } + Err(err) => { + tracing::warn!( + id, + ?err, + elapsed_ms = elapsed.as_millis(), + "failed to return mine artifact" + ); + false + } + } + } + /// Check if an artifact is reserved. 
pub async fn contains_reserved(&self, id: A::Id) -> bool { let Some(mut conn) = self.connect().await else { diff --git a/integration-tests/tests/cases/solana.rs b/integration-tests/tests/cases/solana.rs index b1b0f6c6..07db4a1a 100644 --- a/integration-tests/tests/cases/solana.rs +++ b/integration-tests/tests/cases/solana.rs @@ -40,3 +40,14 @@ async fn test_solana_signature_basic() -> anyhow::Result<()> { anyhow::bail!("signature verification failed"); } } + +#[test(tokio::test)] +async fn test_solana_stockpile_depletion() -> anyhow::Result<()> { + let cluster = cluster::spawn().solana().await?; + + for i in 0..20 { + tracing::info!("producing signature {i}"); + cluster.sign().solana().await.unwrap(); + } + Ok(()) +} From 80c4de0c2d8639a30ed03752a50cbf5a8163d512 Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Wed, 26 Nov 2025 07:21:48 +0000 Subject: [PATCH 02/15] Add concurrent sign request --- integration-tests/tests/cases/solana.rs | 43 ++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/integration-tests/tests/cases/solana.rs b/integration-tests/tests/cases/solana.rs index 07db4a1a..95e74f7b 100644 --- a/integration-tests/tests/cases/solana.rs +++ b/integration-tests/tests/cases/solana.rs @@ -41,13 +41,46 @@ async fn test_solana_signature_basic() -> anyhow::Result<()> { } } +// Concurrent variant: spawn many sign requests at once against a very small +// presignature stockpile and assert we make forward progress (no livelock). 
#[test(tokio::test)] -async fn test_solana_stockpile_depletion() -> anyhow::Result<()> { - let cluster = cluster::spawn().solana().await?; +async fn test_solana_stockpile_depletion_concurrent() -> anyhow::Result<()> { + let cluster = cluster::spawn() + .solana() + .with_config(|conf| { + // tiny presignature stock so contention is probable + conf.protocol.presignature.min_presignatures = 2; + conf.protocol.presignature.max_presignatures = 4; + }) + .await?; - for i in 0..20 { - tracing::info!("producing signature {i}"); - cluster.sign().solana().await.unwrap(); + // spawn many concurrent requests and assert each completes within a + // reasonable timeout. This catches stuck/livelock cases deterministically + // and fast because the concurrent pressure increases contention. + let concurrent = 30; + let mut futs = Vec::with_capacity(concurrent); + for _ in 0..concurrent { + // don't spawn new tasks at thread boundary; keep futures local so borrows are simple + futs.push(tokio::time::timeout( + std::time::Duration::from_secs(10), + async { + cluster + .sign() + .solana() + .await + .map_err(|e| anyhow::anyhow!(e)) + }, + )); } + + // wait for all futures to complete and ensure none timed out or errored + let all = futures::future::join_all(futs).await; + for r in all { + match r { + Ok(inner) => inner.map(|_| ())?, + Err(_) => anyhow::bail!("timed out waiting for sign request"), + } + } + Ok(()) } From a0e6d9eccb058a7500d06ca41e57b6e8b49733f2 Mon Sep 17 00:00:00 2001 From: "Phuong N." 
Date: Wed, 26 Nov 2025 07:30:07 +0000 Subject: [PATCH 03/15] Set round --- chain-signatures/node/src/protocol/signature.rs | 4 ++-- integration-tests/tests/cases/solana.rs | 12 +++++------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index d7b0a5a1..8372a1a0 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -8,11 +8,10 @@ use crate::protocol::message::{ MessageChannel, PositMessage, PositProtocolId, SignatureMessage, Subscriber, }; use crate::protocol::posit::{PositAction, SinglePositCounter}; -use crate::protocol::presignature::{Presignature, PresignatureId}; +use crate::protocol::presignature::PresignatureId; use crate::protocol::Chain; use crate::rpc::{ContractStateWatcher, RpcChannel}; use crate::storage::presignature_storage::{PresignatureTaken, PresignatureTakenDropper}; -use crate::storage::protocol_storage::ArtifactTaken; use crate::storage::PresignatureStorage; use crate::types::SignatureProtocol; use crate::util::{AffinePointExt, JoinMap}; @@ -201,6 +200,7 @@ impl SignOrganizer { }); let is_mine = proposer == me; + state.round = selected_round; tracing::info!( ?sign_id, diff --git a/integration-tests/tests/cases/solana.rs b/integration-tests/tests/cases/solana.rs index 95e74f7b..6c85d91f 100644 --- a/integration-tests/tests/cases/solana.rs +++ b/integration-tests/tests/cases/solana.rs @@ -44,7 +44,7 @@ async fn test_solana_signature_basic() -> anyhow::Result<()> { // Concurrent variant: spawn many sign requests at once against a very small // presignature stockpile and assert we make forward progress (no livelock). 
#[test(tokio::test)] -async fn test_solana_stockpile_depletion_concurrent() -> anyhow::Result<()> { +async fn test_solana_stockpile_depletion() -> anyhow::Result<()> { let cluster = cluster::spawn() .solana() .with_config(|conf| { @@ -62,7 +62,7 @@ async fn test_solana_stockpile_depletion_concurrent() -> anyhow::Result<()> { for _ in 0..concurrent { // don't spawn new tasks at thread boundary; keep futures local so borrows are simple futs.push(tokio::time::timeout( - std::time::Duration::from_secs(10), + std::time::Duration::from_secs(15), async { cluster .sign() @@ -74,11 +74,9 @@ async fn test_solana_stockpile_depletion_concurrent() -> anyhow::Result<()> { } // wait for all futures to complete and ensure none timed out or errored - let all = futures::future::join_all(futs).await; - for r in all { - match r { - Ok(inner) => inner.map(|_| ())?, - Err(_) => anyhow::bail!("timed out waiting for sign request"), + for outcome in futures::future::join_all(futs).await { + if let Err(err) = outcome { + anyhow::bail!("timeout waiting for sign request to complete: {err:?}"); } } From b342aa7ca9e8905c4d05fe3c13d899e1b0724f50 Mon Sep 17 00:00:00 2001 From: "Phuong N." 
Date: Wed, 26 Nov 2025 23:33:07 +0000 Subject: [PATCH 04/15] Remove intersect_vec for now --- .../node/src/protocol/signature.rs | 22 +++---------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 8372a1a0..8db6e68d 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -3,7 +3,6 @@ use crate::backlog::Backlog; use crate::config::Config; use crate::kdf::derive_delta; use crate::mesh::MeshState; -use crate::protocol::contract::primitives::intersect_vec; use crate::protocol::message::{ MessageChannel, PositMessage, PositProtocolId, SignatureMessage, Subscriber, }; @@ -224,32 +223,17 @@ impl SignOrganizer { let is_proposer = proposer == ctx.me; let (presignature_id, presignature) = if is_proposer { tracing::info!(?sign_id, round = ?state.round, "proposer waiting for presignature"); - let stable = stable.iter().copied().collect::>(); - let mut recycle = Vec::new(); let fetch = tokio::time::timeout(Duration::from_secs(30), async { loop { if let Some(taken) = ctx.presignatures.take_mine(ctx.me).await { - let participants = intersect_vec(&[&taken.artifact.participants, &stable]); - if participants.len() < ctx.threshold { - recycle.push(taken); - continue; - } - - break (taken, participants); + break taken; } tokio::time::sleep(Duration::from_millis(500)).await; } }) .await; - let presignatures = ctx.presignatures.clone(); - tokio::spawn(async move { - for taken in recycle { - presignatures.recycle_mine(me, taken).await; - } - }); - - let (taken, participants) = match fetch { + let taken = match fetch { Ok(value) => value, Err(_) => { tracing::warn!( @@ -267,7 +251,7 @@ impl SignOrganizer { tracing::info!(?sign_id, presignature_id, "proposer got presignature"); // broadcast to participants and let them reject if they don't have the presignature. 
- for p in participants { + for &p in &stable { if p == ctx.me { continue; } From 0bbfe73f345c70c51c31b363e3aa6599aa1db72e Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Thu, 27 Nov 2025 00:37:58 +0000 Subject: [PATCH 05/15] Fix intersect vec --- .../node/src/protocol/signature.rs | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 8db6e68d..46021a54 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -3,6 +3,7 @@ use crate::backlog::Backlog; use crate::config::Config; use crate::kdf::derive_delta; use crate::mesh::MeshState; +use crate::protocol::contract::primitives::intersect_vec; use crate::protocol::message::{ MessageChannel, PositMessage, PositProtocolId, SignatureMessage, Subscriber, }; @@ -221,19 +222,34 @@ impl SignOrganizer { }; let is_proposer = proposer == ctx.me; - let (presignature_id, presignature) = if is_proposer { + let (presignature_id, presignature, stable) = if is_proposer { tracing::info!(?sign_id, round = ?state.round, "proposer waiting for presignature"); + let stable = stable.iter().copied().collect::>(); + let mut recycle = Vec::new(); let fetch = tokio::time::timeout(Duration::from_secs(30), async { loop { if let Some(taken) = ctx.presignatures.take_mine(ctx.me).await { - break taken; + let participants = intersect_vec(&[&taken.artifact.participants, &stable]); + if participants.len() < ctx.threshold { + recycle.push(taken); + continue; + } + + break (taken, participants); } tokio::time::sleep(Duration::from_millis(500)).await; } }) .await; - let taken = match fetch { + let presignatures = ctx.presignatures.clone(); + tokio::spawn(async move { + for taken in recycle { + presignatures.recycle_mine(me, taken).await; + } + }); + + let (taken, participants) = match fetch { Ok(value) => value, Err(_) => { tracing::warn!( @@ -251,7 +267,7 @@ 
impl SignOrganizer { tracing::info!(?sign_id, presignature_id, "proposer got presignature"); // broadcast to participants and let them reject if they don't have the presignature. - for &p in &stable { + for &p in &participants { if p == ctx.me { continue; } @@ -268,9 +284,11 @@ impl SignOrganizer { .await; } - (presignature_id, Some(taken)) + // Update stable to only include participants that are in both the presignature and stable set + let stable = participants.into_iter().collect::>(); + (presignature_id, Some(taken), stable) } else { - (PresignatureId::default(), None) + (PresignatureId::default(), None, stable) }; SignPhase::Posit(SignPositor { From 226abc6c5b8f4a1d2b18b6a5fa5daded04395cfa Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Thu, 27 Nov 2025 02:20:31 +0000 Subject: [PATCH 06/15] Make SignatureSpawner global --- chain-signatures/node/src/cli.rs | 6 +- .../node/src/protocol/consensus.rs | 43 ++++-- chain-signatures/node/src/protocol/mod.rs | 8 +- .../node/src/protocol/signature.rs | 98 +++++++++--- chain-signatures/node/src/protocol/state.rs | 2 - .../node/src/protocol/test_setup.rs | 9 +- integration-tests/src/mpc_fixture/builder.rs | 4 +- .../src/mpc_fixture/fixture_interface.rs | 69 ++++++++- integration-tests/tests/cases/mpc.rs | 141 ++++++++++++++++++ 9 files changed, 323 insertions(+), 57 deletions(-) diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 7e5f4884..1ea43224 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -20,8 +20,7 @@ use local_ip_address::local_ip; use near_account_id::AccountId; use near_crypto::{InMemorySigner, PublicKey, SecretKey}; use sha3::Digest; -use std::sync::Arc; -use tokio::sync::{mpsc, watch, RwLock}; +use tokio::sync::{mpsc, watch}; use url::Url; use mpc_keys::hpke; @@ -311,7 +310,8 @@ pub async fn run(cmd: Cli) -> anyhow::Result<()> { generating: msg_channel.subscribe_generation().await, resharing: msg_channel.subscribe_resharing().await, 
ready: msg_channel.subscribe_ready().await, - sign_rx: Arc::new(RwLock::new(sign_rx)), + sign_rx: Some(sign_rx), + sign_task: None, secret_storage: key_storage, triple_storage: triple_storage.clone(), presignature_storage: presignature_storage.clone(), diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index bce96aa8..43249b6a 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -107,13 +107,19 @@ impl ConsensusProtocol for StartedState { &public_key, ); - let sign_task = SignatureSpawnerTask::run( - me, - contract_state.threshold, - epoch, - ctx, - public_key, - ); + // Start the signature task if not already running + if ctx.sign_task.is_none() { + if let Some(sign_rx) = ctx.sign_rx.take() { + ctx.sign_task = Some(SignatureSpawnerTask::run( + me, + contract_state.threshold, + epoch, + ctx, + public_key, + sign_rx, + )); + } + } NodeState::Running(RunningState { epoch, @@ -124,7 +130,6 @@ impl ConsensusProtocol for StartedState { public_key, triple_task, presign_task, - sign_task, }) } } @@ -408,13 +413,20 @@ impl ConsensusProtocol for WaitingForConsensusState { &self.private_share, &self.public_key, ); - let sign_task = SignatureSpawnerTask::run( - me, - self.threshold, - self.epoch, - ctx, - self.public_key, - ); + + // Start the signature task if not already running + if ctx.sign_task.is_none() { + if let Some(sign_rx) = ctx.sign_rx.take() { + ctx.sign_task = Some(SignatureSpawnerTask::run( + me, + self.threshold, + self.epoch, + ctx, + self.public_key, + sign_rx, + )); + } + } NodeState::Running(RunningState { epoch: self.epoch, @@ -425,7 +437,6 @@ impl ConsensusProtocol for WaitingForConsensusState { public_key: self.public_key, triple_task, presign_task, - sign_task, }) } }, diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index 6c0a73db..a480013e 100644 --- 
a/chain-signatures/node/src/protocol/mod.rs +++ b/chain-signatures/node/src/protocol/mod.rs @@ -28,6 +28,7 @@ use crate::mesh::MeshState; use crate::protocol::consensus::ConsensusProtocol; use crate::protocol::cryptography::CryptographicProtocol; use crate::protocol::message::{GeneratingMessage, ReadyMessage, ResharingMessage}; +use crate::protocol::signature::SignatureSpawnerTask; use crate::respond_bidirectional::RespondBidirectionalTx; use crate::rpc::{ContractStateWatcher, RpcChannel}; use crate::storage::presignature_storage::PresignatureStorage; @@ -37,10 +38,8 @@ use crate::storage::triple_storage::TripleStorage; use near_account_id::AccountId; use semver::Version; use std::path::Path; -use std::sync::Arc; use std::time::{Duration, Instant}; use sysinfo::{CpuRefreshKind, Disks, RefreshKind, System}; -use tokio::sync::RwLock; use tokio::sync::{mpsc, watch}; pub struct MpcSignProtocol { @@ -48,7 +47,10 @@ pub struct MpcSignProtocol { pub(crate) secret_storage: SecretNodeStorageBox, pub(crate) triple_storage: TripleStorage, pub(crate) presignature_storage: PresignatureStorage, - pub(crate) sign_rx: Arc>>, + pub(crate) sign_rx: Option>, + /// The signature spawner task handle - persists across resharing. + /// This is `None` until the node first enters the Running state. 
+ pub(crate) sign_task: Option, pub(crate) generating: mpsc::Receiver, pub(crate) resharing: mpsc::Receiver, pub(crate) ready: mpsc::Receiver, diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 46021a54..9453f81f 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -28,9 +28,8 @@ use rand::rngs::StdRng; use rand::seq::IteratorRandom; use rand::SeedableRng; use std::collections::{BTreeSet, HashMap}; -use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::{mpsc, watch, RwLock}; +use tokio::sync::{mpsc, watch}; use tokio::task::JoinHandle; use near_account_id::AccountId; @@ -969,6 +968,50 @@ struct SignTask { } impl SignTask { + /// Waits until the contract is running with the same epoch as this task. + /// Returns the new running state, or None if the epoch has changed (meaning we should abort). + async fn wait_for_running(&self) -> Option<()> { + let sign_id = self.sign_id; + let task_epoch = self.epoch; + let mut contract = self.contract.clone(); + + loop { + let state = contract.state(); + match state { + Some(crate::protocol::ProtocolState::Running(running)) => { + if running.epoch != task_epoch { + tracing::info!( + ?sign_id, + old_epoch = task_epoch, + new_epoch = running.epoch, + "signature task aborting: epoch changed during resharing" + ); + return None; + } + // Same epoch, we're good to continue + return Some(()); + } + Some(crate::protocol::ProtocolState::Resharing(_)) => { + tracing::debug!( + ?sign_id, + epoch = task_epoch, + "signature task waiting: contract is resharing" + ); + // Wait for the contract state to change + let _ = contract.next_state().await; + } + Some(crate::protocol::ProtocolState::Initializing(_)) | None => { + tracing::debug!( + ?sign_id, + epoch = task_epoch, + "signature task waiting: contract is initializing or unavailable" + ); + let _ = contract.next_state().await; + } + } + } + } 
+ async fn run( self, indexed: IndexedSignRequest, @@ -988,16 +1031,32 @@ impl SignTask { let mut phase = SignPhase::Organizing(SignOrganizer); loop { - // Check if we should abort due to resharing or epoch change + // Check if we should wait due to resharing or abort due to epoch change if let Some(contract_state) = self.contract.state() { match contract_state { crate::protocol::ProtocolState::Resharing(_) => { tracing::info!( ?sign_id, epoch = task_epoch, - "signature task interrupted: contract is resharing" + "signature task pausing: contract is resharing" ); - return Err(SignError::Aborted); + + // Wait for resharing to complete + if self.wait_for_running().await.is_none() { + // Epoch changed during resharing, abort + return Err(SignError::Aborted); + } + + tracing::info!( + ?sign_id, + epoch = task_epoch, + "signature task resuming: resharing complete, restarting from organizing" + ); + + // Reset to organizing phase after resharing completes + state.round = 0; + phase = SignPhase::Organizing(SignOrganizer); + continue; } crate::protocol::ProtocolState::Running(running) if running.epoch != task_epoch => @@ -1158,7 +1217,7 @@ impl SignatureSpawner { async fn run( mut self, - sign_rx: Arc>>, + mut sign_rx: mpsc::Receiver, mut contract: ContractStateWatcher, mut cfg: watch::Receiver, ) { @@ -1168,10 +1227,6 @@ impl SignatureSpawner { let all_participants = running.participants.keys().copied().collect(); let mut protocol = cfg.borrow().protocol.clone(); - // we acquire the lock but since this is a tokio lock, aborting the task while holding - // the lock is safe and will not deadlock other tasks trying to acquire the lock - let mut sign_rx = sign_rx.write().await; - loop { tokio::select! 
{ sign = sign_rx.recv() => { @@ -1230,6 +1285,7 @@ impl SignatureSpawnerTask { epoch: u64, ctx: &MpcSignProtocol, public_key: PublicKey, + sign_rx: mpsc::Receiver, ) -> Self { let spawner = SignatureSpawner { me, @@ -1247,28 +1303,22 @@ impl SignatureSpawnerTask { }; Self { - handle: tokio::spawn(spawner.run( - ctx.sign_rx.clone(), - ctx.contract.clone(), - ctx.config.clone(), - )), + handle: tokio::spawn(spawner.run(sign_rx, ctx.contract.clone(), ctx.config.clone())), } } pub fn abort(&self) { - // NOTE: since dropping the handle here, PresignatureSpawner will drop their JoinSet/JoinMap - // which will also abort all ongoing presignature generation tasks. This is important to note - // since we do not want to leak any presignature generation tasks when we are resharing, and - // potentially wasting compute. + // NOTE: This aborts the spawner task and all ongoing signature generation tasks. + // This is typically only called during shutdown or catastrophic error recovery. + // During normal operation (including resharing), the spawner should NOT be aborted + // since sign tasks need to persist across resharing. self.handle.abort(); } } -impl Drop for SignatureSpawnerTask { - fn drop(&mut self) { - self.abort(); - } -} +// NOTE: We intentionally do NOT implement Drop for SignatureSpawnerTask. +// The spawner should persist across resharing, so dropping it should not abort the task. +// The spawner will naturally terminate when all channels close (e.g., during node shutdown). 
enum PendingPresignature { Available(PresignatureTaken), diff --git a/chain-signatures/node/src/protocol/state.rs b/chain-signatures/node/src/protocol/state.rs index c3f1fe0b..d3d93e7f 100644 --- a/chain-signatures/node/src/protocol/state.rs +++ b/chain-signatures/node/src/protocol/state.rs @@ -1,7 +1,6 @@ use super::contract::{primitives::Participants, ResharingContractState}; use super::triple::TripleSpawnerTask; use crate::protocol::presignature::PresignatureSpawnerTask; -use crate::protocol::signature::SignatureSpawnerTask; use crate::types::{KeygenProtocol, ReshareProtocol, SecretKeyShare}; use cait_sith::protocol::Participant; @@ -77,7 +76,6 @@ pub struct RunningState { pub public_key: PublicKey, pub triple_task: TripleSpawnerTask, pub presign_task: PresignatureSpawnerTask, - pub sign_task: SignatureSpawnerTask, } pub struct ResharingState { diff --git a/chain-signatures/node/src/protocol/test_setup.rs b/chain-signatures/node/src/protocol/test_setup.rs index 1847e9c7..3f89174f 100644 --- a/chain-signatures/node/src/protocol/test_setup.rs +++ b/chain-signatures/node/src/protocol/test_setup.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use crate::backlog::Backlog; use crate::config::Config; use crate::mesh::MeshState; @@ -8,7 +6,7 @@ use crate::rpc::{ContractStateWatcher, RpcChannel}; use crate::storage::secret_storage::SecretNodeStorageBox; use crate::storage::{PresignatureStorage, TripleStorage}; use near_sdk::AccountId; -use tokio::sync::{mpsc, watch, RwLock}; +use tokio::sync::{mpsc, watch}; pub struct TestProtocolStorage { pub secret_storage: SecretNodeStorageBox, @@ -17,7 +15,7 @@ pub struct TestProtocolStorage { } pub struct TestProtocolChannels { - pub sign_rx: Arc>>, + pub sign_rx: mpsc::Receiver, pub msg_channel: MessageChannel, pub rpc_channel: RpcChannel, pub config: watch::Receiver, @@ -39,7 +37,8 @@ impl MpcSignProtocol { secret_storage: storage.secret_storage, triple_storage: storage.triple_storage, presignature_storage: storage.presignature_storage, 
- sign_rx: channels.sign_rx, + sign_rx: Some(channels.sign_rx), + sign_task: None, msg_channel: channels.msg_channel, generating, resharing, diff --git a/integration-tests/src/mpc_fixture/builder.rs b/integration-tests/src/mpc_fixture/builder.rs index 46eaf08c..3e07ac49 100644 --- a/integration-tests/src/mpc_fixture/builder.rs +++ b/integration-tests/src/mpc_fixture/builder.rs @@ -27,10 +27,8 @@ use mpc_node::rpc::RpcChannel; use mpc_node::storage::{secret_storage, triple_storage::TriplePair, Options}; use near_sdk::AccountId; use std::collections::HashMap; -use std::sync::Arc; use tokio::sync::mpsc::{self, Sender}; use tokio::sync::watch; -use tokio::sync::RwLock; pub struct MpcFixtureBuilder { prepared_nodes: Vec, @@ -423,7 +421,7 @@ impl MpcFixtureNodeBuilder { let (config_tx, config_rx) = watch::channel(self.config); let channels = protocol::test_setup::TestProtocolChannels { - sign_rx: Arc::new(RwLock::new(sign_rx)), + sign_rx, msg_channel: self.messaging.channel.clone(), rpc_channel, config: config_rx.clone(), diff --git a/integration-tests/src/mpc_fixture/fixture_interface.rs b/integration-tests/src/mpc_fixture/fixture_interface.rs index b8bec1da..5a5d0483 100644 --- a/integration-tests/src/mpc_fixture/fixture_interface.rs +++ b/integration-tests/src/mpc_fixture/fixture_interface.rs @@ -6,7 +6,8 @@ use cait_sith::protocol::Participant; use mpc_node::backlog::Backlog; use mpc_node::config::Config; use mpc_node::mesh::MeshState; -use mpc_node::protocol::state::NodeStateWatcher; +use mpc_node::protocol::contract::ResharingContractState; +use mpc_node::protocol::state::{NodeStateWatcher, NodeStatus}; use mpc_node::protocol::sync::SyncChannel; use mpc_node::protocol::{MessageChannel, ProtocolState, Sign}; use mpc_node::storage::{PresignatureStorage, TripleStorage}; @@ -74,9 +75,75 @@ impl MpcFixture { tokio::time::sleep(interval).await; } } + + /// Wait for all nodes to reach the Running state. 
+ pub async fn wait_for_running(&self) { + for node in &self.nodes { + node.wait_for_running().await; + } + } + + /// Trigger resharing by transitioning the contract state to Resharing. + /// The participants remain the same (no node joins or leaves). + pub fn trigger_resharing(&self) { + let current_state = self.shared_contract_state.borrow().clone(); + if let Some(ProtocolState::Running(running)) = current_state { + let resharing_state = ResharingContractState { + old_epoch: running.epoch, + old_participants: running.participants.clone(), + new_participants: running.participants.clone(), + threshold: running.threshold, + public_key: running.public_key, + finished_votes: HashSet::new(), + cancel_votes: HashSet::new(), + }; + let _ = self + .shared_contract_state + .send(Some(ProtocolState::Resharing(resharing_state))); + tracing::info!("triggered resharing"); + } else { + tracing::warn!("cannot trigger resharing: contract not in Running state"); + } + } + + /// Complete resharing by transitioning the contract state back to Running. + /// Note: For testing purposes, we keep the same epoch since we're not actually + /// running the resharing protocol. This simulates a resharing that was cancelled + /// or where the participants didn't change. 
+ pub fn complete_resharing(&self) { + let current_state = self.shared_contract_state.borrow().clone(); + if let Some(ProtocolState::Resharing(resharing)) = current_state { + let running_state = mpc_node::protocol::contract::RunningContractState { + // Keep the same epoch since we're not actually resharing + epoch: resharing.old_epoch, + participants: resharing.new_participants.clone(), + threshold: resharing.threshold, + public_key: resharing.public_key, + candidates: Default::default(), + join_votes: Default::default(), + leave_votes: Default::default(), + }; + let _ = self + .shared_contract_state + .send(Some(ProtocolState::Running(running_state))); + tracing::info!(epoch = resharing.old_epoch, "completed resharing (same epoch)"); + } else { + tracing::warn!("cannot complete resharing: contract not in Resharing state"); + } + } } impl MpcFixtureNode { + /// Wait for this node to reach the Running state. + pub async fn wait_for_running(&self) { + loop { + if matches!(self.state.status(), NodeStatus::Running { .. }) { + return; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + } + pub async fn wait_for_triples(&self, threshold_per_node: usize) { loop { let count = self.triple_storage.len_by_owner(self.me).await; diff --git a/integration-tests/tests/cases/mpc.rs b/integration-tests/tests/cases/mpc.rs index 2fbce107..182a71c8 100644 --- a/integration-tests/tests/cases/mpc.rs +++ b/integration-tests/tests/cases/mpc.rs @@ -198,6 +198,147 @@ async fn test_basic_sign() { ); } +/// Test that sign tasks survive resharing and complete after resharing finishes. +/// This test: +/// 1. Waits for all nodes to be running +/// 2. Starts a signing request +/// 3. Triggers resharing mid-signature +/// 4. Completes resharing +/// 5. 
Verifies the signature still completes +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_sign_task_survives_resharing() { + let network = MpcFixtureBuilder::default() + .only_generate_signatures() + .build() + .await; + + // Wait for all nodes to reach Running state + tokio::time::timeout(Duration::from_secs(5), network.wait_for_running()) + .await + .expect("nodes should reach Running state"); + + tokio::time::timeout( + Duration::from_millis(300), + network.wait_for_presignatures(2), + ) + .await + .expect("should start with enough presignatures"); + + tracing::info!("sending sign request"); + let request = sign_request(0); + network[0] + .sign_tx + .send(Sign::Request(request.clone())) + .await + .unwrap(); + network[1] + .sign_tx + .send(Sign::Request(request.clone())) + .await + .unwrap(); + network[2] + .sign_tx + .send(Sign::Request(request.clone())) + .await + .unwrap(); + + // Give some time for the sign task to start organizing + tokio::time::sleep(Duration::from_millis(50)).await; + + // Trigger resharing while sign task is in progress + tracing::info!("triggering resharing"); + network.trigger_resharing(); + + // Give some time for nodes to notice resharing and pause + tokio::time::sleep(Duration::from_millis(200)).await; + + // Complete resharing + tracing::info!("completing resharing"); + network.complete_resharing(); + + // The signature should still complete after resharing + let timeout = Duration::from_secs(15); + let actions = tokio::time::timeout(timeout, network.wait_for_actions(1)) + .await + .expect("signature should complete after resharing"); + + assert_eq!(actions.len(), 1); + let action_str = actions.iter().next().unwrap(); + assert!( + action_str.contains("RpcAction::Publish"), + "unexpected rpc action {action_str}" + ); + tracing::info!("signature completed successfully after resharing"); +} + +/// Test that new sign requests during resharing are queued and complete after resharing. 
+#[test(tokio::test(flavor = "multi_thread"))] +async fn test_sign_request_during_resharing() { + let network = MpcFixtureBuilder::default() + .only_generate_signatures() + .build() + .await; + + // Wait for all nodes to reach Running state + tokio::time::timeout(Duration::from_secs(5), network.wait_for_running()) + .await + .expect("nodes should reach Running state"); + + tokio::time::timeout( + Duration::from_millis(300), + network.wait_for_presignatures(2), + ) + .await + .expect("should start with enough presignatures"); + + // Trigger resharing first + tracing::info!("triggering resharing before sign request"); + network.trigger_resharing(); + + // Give some time for nodes to notice resharing + tokio::time::sleep(Duration::from_millis(100)).await; + + // Send a sign request while in resharing + tracing::info!("sending sign request during resharing"); + let request = sign_request(1); + network[0] + .sign_tx + .send(Sign::Request(request.clone())) + .await + .unwrap(); + network[1] + .sign_tx + .send(Sign::Request(request.clone())) + .await + .unwrap(); + network[2] + .sign_tx + .send(Sign::Request(request.clone())) + .await + .unwrap(); + + // Give some time for the request to be queued + tokio::time::sleep(Duration::from_millis(100)).await; + + // Complete resharing + tracing::info!("completing resharing"); + network.complete_resharing(); + + // The signature should complete after resharing + let timeout = Duration::from_secs(15); + let actions = tokio::time::timeout(timeout, network.wait_for_actions(1)) + .await + .expect("signature should complete after resharing"); + + assert_eq!(actions.len(), 1); + let action_str = actions.iter().next().unwrap(); + assert!( + action_str.contains("RpcAction::Publish"), + "unexpected rpc action {action_str}" + ); + tracing::info!("signature request during resharing completed successfully"); +} + fn sign_request(seed: u8) -> IndexedSignRequest { IndexedSignRequest { id: SignId::new([seed; 32]), From 
7c181c32732a87a4836d43a33c759e0743ffdab4 Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Thu, 27 Nov 2025 03:12:46 +0000 Subject: [PATCH 07/15] Make signature spawner use contract for info --- chain-signatures/node/src/cli.rs | 20 ++- .../node/src/protocol/consensus.rs | 29 ---- chain-signatures/node/src/protocol/mod.rs | 8 +- .../node/src/protocol/signature.rs | 153 ++++++++++++++---- .../node/src/protocol/test_setup.rs | 23 ++- chain-signatures/node/src/rpc.rs | 2 +- integration-tests/src/mpc_fixture/builder.rs | 2 +- 7 files changed, 162 insertions(+), 75 deletions(-) diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 1ea43224..549928fd 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -5,6 +5,7 @@ use crate::mesh::Mesh; use crate::node_client::{self, NodeClient}; use crate::protocol::message::MessageChannel; use crate::protocol::presignature::Presignature; +use crate::protocol::signature::SignatureSpawnerTask; use crate::protocol::state::Node; use crate::protocol::sync::SyncTask; use crate::protocol::{spawn_system_metrics, MpcSignProtocol}; @@ -303,6 +304,22 @@ pub async fn run(cmd: Cli) -> anyhow::Result<()> { contract_watcher.clone(), ) .await; + + // Start the signature spawner task immediately. + // It will wait for the contract to reach Running state before processing requests, + // and dynamically obtains governance info from the contract state. 
+ let sign_task = SignatureSpawnerTask::run( + account_id.clone(), + sign_rx, + contract_watcher.clone(), + config_rx.clone(), + presignature_storage.clone(), + mesh_state.clone(), + msg_channel.clone(), + rpc_channel.clone(), + backlog.clone(), + ); + let protocol = MpcSignProtocol { my_account_id: account_id.clone(), rpc_channel, @@ -310,8 +327,7 @@ pub async fn run(cmd: Cli) -> anyhow::Result<()> { generating: msg_channel.subscribe_generation().await, resharing: msg_channel.subscribe_resharing().await, ready: msg_channel.subscribe_ready().await, - sign_rx: Some(sign_rx), - sign_task: None, + sign_task, secret_storage: key_storage, triple_storage: triple_storage.clone(), presignature_storage: presignature_storage.clone(), diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index 43249b6a..5733fc05 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -6,7 +6,6 @@ use super::state::{ use super::MpcSignProtocol; use crate::protocol::contract::primitives::Participants; use crate::protocol::presignature::PresignatureSpawnerTask; -use crate::protocol::signature::SignatureSpawnerTask; use crate::protocol::state::GeneratingState; use crate::protocol::triple::TripleSpawnerTask; use crate::protocol::Governance; @@ -107,20 +106,6 @@ impl ConsensusProtocol for StartedState { &public_key, ); - // Start the signature task if not already running - if ctx.sign_task.is_none() { - if let Some(sign_rx) = ctx.sign_rx.take() { - ctx.sign_task = Some(SignatureSpawnerTask::run( - me, - contract_state.threshold, - epoch, - ctx, - public_key, - sign_rx, - )); - } - } - NodeState::Running(RunningState { epoch, me, @@ -414,20 +399,6 @@ impl ConsensusProtocol for WaitingForConsensusState { &self.public_key, ); - // Start the signature task if not already running - if ctx.sign_task.is_none() { - if let Some(sign_rx) = ctx.sign_rx.take() { - ctx.sign_task = 
Some(SignatureSpawnerTask::run( - me, - self.threshold, - self.epoch, - ctx, - self.public_key, - sign_rx, - )); - } - } - NodeState::Running(RunningState { epoch: self.epoch, me, diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index a480013e..d5665aa5 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ b/chain-signatures/node/src/protocol/mod.rs @@ -47,10 +47,10 @@ pub struct MpcSignProtocol { pub(crate) secret_storage: SecretNodeStorageBox, pub(crate) triple_storage: TripleStorage, pub(crate) presignature_storage: PresignatureStorage, - pub(crate) sign_rx: Option>, - /// The signature spawner task handle - persists across resharing. - /// This is `None` until the node first enters the Running state. - pub(crate) sign_task: Option, + /// The signature spawner task handle - started immediately and persists across resharing. + /// The spawner waits for the contract to reach Running state before processing requests, + /// and dynamically updates governance info when the contract state changes. + pub(crate) sign_task: SignatureSpawnerTask, pub(crate) generating: mpsc::Receiver, pub(crate) resharing: mpsc::Receiver, pub(crate) ready: mpsc::Receiver, diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 9453f81f..bd84f13c 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -1,4 +1,3 @@ -use super::MpcSignProtocol; use crate::backlog::Backlog; use crate::config::Config; use crate::kdf::derive_delta; @@ -1090,6 +1089,17 @@ enum SignTaskMessage { }, } +/// Governance information obtained from the contract state. +/// Updated whenever the contract transitions to a new Running state. 
+#[derive(Clone)] +struct GovernanceInfo { + me: Participant, + threshold: usize, + epoch: u64, + public_key: PublicKey, + participants: BTreeSet, +} + pub struct SignatureSpawner { /// Presignature storage that maintains all presignatures. presignatures: PresignatureStorage, @@ -1099,11 +1109,7 @@ pub struct SignatureSpawner { inboxes: HashMap>, mesh_state: watch::Receiver, - me: Participant, my_account_id: AccountId, - threshold: usize, - public_key: PublicKey, - epoch: u64, msg: MessageChannel, rpc: RpcChannel, backlog: Backlog, @@ -1115,7 +1121,7 @@ impl SignatureSpawner { fn spawn_task( &mut self, indexed: IndexedSignRequest, - participants: BTreeSet, + gov: &GovernanceInfo, contract: ContractStateWatcher, cfg: ProtocolConfig, ) { @@ -1125,12 +1131,12 @@ impl SignatureSpawner { // Subscribe to (or create) the posit inbox for this sign request let rx = self.inboxes.entry(sign_id).or_default().subscribe(); let task = SignTask { - me: self.me, - participants, + me: gov.me, + participants: gov.participants.clone(), sign_id, - threshold: self.threshold, - public_key: self.public_key, - epoch: self.epoch, + threshold: gov.threshold, + public_key: gov.public_key, + epoch: gov.epoch, my_account_id: self.my_account_id.clone(), presignatures: self.presignatures.clone(), msg: self.msg.clone(), @@ -1152,9 +1158,10 @@ impl SignatureSpawner { presignature_id: PresignatureId, from: Participant, action: PositAction, + gov: &GovernanceInfo, ) { // Ignore messages from ourselves - if from == self.me { + if from == gov.me { return; } let _ = self @@ -1182,7 +1189,7 @@ impl SignatureSpawner { &mut self, sign: Sign, cfg: &ProtocolConfig, - participants: &BTreeSet, + gov: &GovernanceInfo, contract: &ContractStateWatcher, ) { match sign { @@ -1205,7 +1212,7 @@ impl SignatureSpawner { .with_label_values(&[indexed.chain.as_str(), self.my_account_id.as_str()]) .inc(); - self.spawn_task(indexed, participants.clone(), contract.clone(), cfg.clone()); + self.spawn_task(indexed, gov, 
contract.clone(), cfg.clone()); } } @@ -1215,6 +1222,29 @@ impl SignatureSpawner { .set(self.tasks.len() as i64); } + /// Waits for the contract to be in Running state and extracts governance info. + /// If we are not a participant, waits for the next state change and retries until we are. + async fn wait_governance(contract: &mut ContractStateWatcher) -> GovernanceInfo { + loop { + let running = contract.wait_running().await; + if let Some(me) = running + .participants + .find_participant(contract.my_account_id()) + { + return GovernanceInfo { + me: *me, + threshold: running.threshold, + epoch: running.epoch, + public_key: PublicKey::from(running.public_key), + participants: running.participants.keys().copied().collect(), + }; + } + // We're not a participant, wait for next state change + tracing::warn!("signature spawner: we are not a participant, waiting for state change"); + let _ = contract.next_state().await; + } + } + async fn run( mut self, mut sign_rx: mpsc::Receiver, @@ -1223,8 +1253,15 @@ impl SignatureSpawner { ) { let mut posits = self.msg.subscribe_signature_posit().await; - let running = contract.wait_running().await; - let all_participants = running.participants.keys().copied().collect(); + // Wait for initial governance info + let mut gov = Self::wait_governance(&mut contract).await; + tracing::info!( + me = ?gov.me, + epoch = gov.epoch, + threshold = gov.threshold, + "signature spawner initialized with governance info" + ); + let mut protocol = cfg.borrow().protocol.clone(); loop { @@ -1234,10 +1271,10 @@ impl SignatureSpawner { tracing::warn!("signature spawner sign_rx closed, terminating"); break; }; - self.handle_request(sign, &protocol, &all_participants, &contract); + self.handle_request(sign, &protocol, &gov, &contract); } Some((sign_id, presignature_id, from, action)) = posits.recv() => { - self.handle_posit(sign_id, presignature_id, from, action).await; + self.handle_posit(sign_id, presignature_id, from, action, &gov).await; } Some(result) = 
self.tasks.join_next(), if !self.tasks.is_empty() => { let (sign_id, result) = match result { @@ -1262,6 +1299,50 @@ impl SignatureSpawner { Ok(()) = cfg.changed() => { protocol = cfg.borrow().protocol.clone(); } + _ = contract.next_state() => { + // Contract state changed - check if we need to update governance info + if let Some(state) = contract.state() { + match state { + crate::protocol::ProtocolState::Running(running) => { + if running.epoch != gov.epoch { + // Epoch changed, update governance info + if let Some(me) = running.participants.find_participant(contract.my_account_id()) { + tracing::info!( + old_epoch = gov.epoch, + new_epoch = running.epoch, + new_me = ?me, + "signature spawner: epoch changed, updating governance info" + ); + gov = GovernanceInfo { + me: *me, + threshold: running.threshold, + epoch: running.epoch, + public_key: running.public_key, + participants: running.participants.keys().copied().collect(), + }; + } else { + tracing::warn!( + epoch = running.epoch, + "signature spawner: we are no longer a participant after epoch change" + ); + // Wait for next state change where we might be a participant again + } + } + } + crate::protocol::ProtocolState::Resharing(_) => { + tracing::info!( + epoch = gov.epoch, + "signature spawner: contract is resharing, existing tasks will pause" + ); + // Individual SignTasks handle pausing during resharing + // We continue to accept new requests but they'll queue up + } + crate::protocol::ProtocolState::Initializing(_) => { + tracing::debug!("signature spawner: contract is initializing"); + } + } + } + } } } } @@ -1279,31 +1360,35 @@ pub struct SignatureSpawnerTask { } impl SignatureSpawnerTask { + /// Start the signature spawner task. + /// + /// The spawner will wait for the contract to reach Running state before processing requests. 
+ /// It dynamically obtains governance info (me, threshold, epoch, public_key) from the contract + /// state and updates when the contract transitions to a new epoch after resharing. pub fn run( - me: Participant, - threshold: usize, - epoch: u64, - ctx: &MpcSignProtocol, - public_key: PublicKey, + my_account_id: AccountId, sign_rx: mpsc::Receiver, + contract: ContractStateWatcher, + config: watch::Receiver, + presignature_storage: PresignatureStorage, + mesh_state: watch::Receiver, + msg_channel: MessageChannel, + rpc_channel: RpcChannel, + backlog: Backlog, ) -> Self { let spawner = SignatureSpawner { - me, tasks: JoinMap::new(), inboxes: HashMap::new(), - my_account_id: ctx.my_account_id.clone(), - threshold, - public_key, - epoch, - presignatures: ctx.presignature_storage.clone(), - mesh_state: ctx.mesh_state.clone(), - msg: ctx.msg_channel.clone(), - rpc: ctx.rpc_channel.clone(), - backlog: ctx.backlog.clone(), + my_account_id, + presignatures: presignature_storage, + mesh_state, + msg: msg_channel, + rpc: rpc_channel, + backlog, }; Self { - handle: tokio::spawn(spawner.run(sign_rx, ctx.contract.clone(), ctx.config.clone())), + handle: tokio::spawn(spawner.run(sign_rx, contract, config)), } } diff --git a/chain-signatures/node/src/protocol/test_setup.rs b/chain-signatures/node/src/protocol/test_setup.rs index 3f89174f..cf82a65c 100644 --- a/chain-signatures/node/src/protocol/test_setup.rs +++ b/chain-signatures/node/src/protocol/test_setup.rs @@ -1,6 +1,7 @@ use crate::backlog::Backlog; use crate::config::Config; use crate::mesh::MeshState; +use crate::protocol::signature::SignatureSpawnerTask; use crate::protocol::{MessageChannel, MpcSignProtocol, Sign}; use crate::rpc::{ContractStateWatcher, RpcChannel}; use crate::storage::secret_storage::SecretNodeStorageBox; @@ -15,7 +16,6 @@ pub struct TestProtocolStorage { } pub struct TestProtocolChannels { - pub sign_rx: mpsc::Receiver, pub msg_channel: MessageChannel, pub rpc_channel: RpcChannel, pub config: 
watch::Receiver, @@ -28,17 +28,32 @@ impl MpcSignProtocol { storage: TestProtocolStorage, channels: TestProtocolChannels, contract: ContractStateWatcher, + sign_rx: mpsc::Receiver, ) -> Self { let generating = channels.msg_channel.subscribe_generation().await; let resharing = channels.msg_channel.subscribe_resharing().await; let ready = channels.msg_channel.subscribe_ready().await; + let backlog = Backlog::new(); + + // Start the signature spawner task immediately + let sign_task = SignatureSpawnerTask::run( + my_account_id.clone(), + sign_rx, + contract.clone(), + channels.config.clone(), + storage.presignature_storage.clone(), + channels.mesh_state.clone(), + channels.msg_channel.clone(), + channels.rpc_channel.clone(), + backlog.clone(), + ); + Self { my_account_id, secret_storage: storage.secret_storage, triple_storage: storage.triple_storage, presignature_storage: storage.presignature_storage, - sign_rx: Some(channels.sign_rx), - sign_task: None, + sign_task, msg_channel: channels.msg_channel, generating, resharing, @@ -47,7 +62,7 @@ impl MpcSignProtocol { contract, config: channels.config, mesh_state: channels.mesh_state, - backlog: Backlog::new(), + backlog, } } } diff --git a/chain-signatures/node/src/rpc.rs b/chain-signatures/node/src/rpc.rs index 9b9fdbfe..199c0ac8 100644 --- a/chain-signatures/node/src/rpc.rs +++ b/chain-signatures/node/src/rpc.rs @@ -177,7 +177,7 @@ impl ContractStateWatcher { ) } - pub fn account_id(&self) -> &AccountId { + pub fn my_account_id(&self) -> &AccountId { &self.account_id } diff --git a/integration-tests/src/mpc_fixture/builder.rs b/integration-tests/src/mpc_fixture/builder.rs index 3e07ac49..57420599 100644 --- a/integration-tests/src/mpc_fixture/builder.rs +++ b/integration-tests/src/mpc_fixture/builder.rs @@ -421,7 +421,6 @@ impl MpcFixtureNodeBuilder { let (config_tx, config_rx) = watch::channel(self.config); let channels = protocol::test_setup::TestProtocolChannels { - sign_rx, msg_channel: 
self.messaging.channel.clone(), rpc_channel, config: config_rx.clone(), @@ -442,6 +441,7 @@ impl MpcFixtureNodeBuilder { storage, channels, context.contract_state.clone(), + sign_rx, ) .await; From 3b2b7ed5d073eb1b553fdfc05ff93ba84a130745 Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Thu, 27 Nov 2025 04:26:12 +0000 Subject: [PATCH 08/15] Make SignatureSpawner send pause|abort|resume --- chain-signatures/node/src/protocol/posit.rs | 2 +- .../node/src/protocol/signature.rs | 280 +++++++++--------- chain-signatures/node/src/rpc.rs | 52 ++++ 3 files changed, 193 insertions(+), 141 deletions(-) diff --git a/chain-signatures/node/src/protocol/posit.rs b/chain-signatures/node/src/protocol/posit.rs index 98ff50ee..a60ba9e8 100644 --- a/chain-signatures/node/src/protocol/posit.rs +++ b/chain-signatures/node/src/protocol/posit.rs @@ -29,7 +29,7 @@ impl Positor { } /// All actions that can be taken when a new posit is introduced for a protocol. -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum PositAction { Propose, Start(Vec), diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index bd84f13c..144dc8ce 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -9,7 +9,7 @@ use crate::protocol::message::{ use crate::protocol::posit::{PositAction, SinglePositCounter}; use crate::protocol::presignature::PresignatureId; use crate::protocol::Chain; -use crate::rpc::{ContractStateWatcher, RpcChannel}; +use crate::rpc::{ContractStateWatcher, GovernanceInfo, RpcChannel}; use crate::storage::presignature_storage::{PresignatureTaken, PresignatureTakenDropper}; use crate::storage::PresignatureStorage; use crate::types::SignatureProtocol; @@ -21,7 +21,7 @@ use cait_sith::PresignOutput; use chrono::Utc; use k256::Secp256k1; use mpc_contract::config::ProtocolConfig; -use mpc_crypto::{derive_key, 
PublicKey}; +use mpc_crypto::derive_key; use mpc_primitives::{SignArgs, SignId}; use rand::rngs::StdRng; use rand::seq::IteratorRandom; @@ -317,13 +317,16 @@ impl SignPositor { presignature_id, from, action, - } = &task_msg; + } = task_msg else { + // Pause/Resume/Abort are handled by the outer run loop, not here + continue; + }; if !matches!(action, PositAction::Propose) { continue; } - if from == &proposer { + if from == proposer { tracing::info!( ?sign_id, presignature_id, @@ -332,7 +335,7 @@ impl SignPositor { ); // Check if we have access to this presignature (in storage or generating) - if !ctx.presignatures.contains(*presignature_id).await { + if !ctx.presignatures.contains(presignature_id).await { tracing::warn!( ?sign_id, presignature_id, @@ -343,7 +346,7 @@ impl SignPositor { ctx.me, proposer, PositMessage { - id: PositProtocolId::Signature(sign_id, *presignature_id), + id: PositProtocolId::Signature(sign_id, presignature_id), from: ctx.me, action: PositAction::Reject, }, @@ -352,7 +355,7 @@ impl SignPositor { continue; } - break *presignature_id; + break presignature_id; } else { tracing::warn!( ?sign_id, @@ -364,9 +367,9 @@ impl SignPositor { ctx.msg .send( ctx.me, - *from, + from, PositMessage { - id: PositProtocolId::Signature(sign_id, *presignature_id), + id: PositProtocolId::Signature(sign_id, presignature_id), from: ctx.me, action: PositAction::Reject, }, @@ -466,7 +469,10 @@ impl SignPositor { let accepted_participants = loop { tokio::select! 
{ Some(task_msg) = task_rx.recv() => { - let SignTaskMessage::PositMessage { presignature_id: _, from, action } = task_msg; + let SignTaskMessage::PositMessage { presignature_id: _, from, action } = task_msg else { + // Pause/Resume/Abort are handled by the outer run loop, not here + continue; + }; if is_deliberator { if let PositAction::Start(participants) = action { if from != proposer { @@ -947,11 +953,11 @@ impl Drop for SignGenerator { struct SignTask { me: Participant, - participants: BTreeSet, - sign_id: SignId, threshold: usize, - public_key: PublicKey, epoch: u64, + public_key: mpc_crypto::PublicKey, + participants: BTreeSet, + sign_id: SignId, my_account_id: AccountId, presignatures: PresignatureStorage, msg: MessageChannel, @@ -963,54 +969,9 @@ struct SignTask { backlog: Backlog, cfg: ProtocolConfig, - contract: ContractStateWatcher, } impl SignTask { - /// Waits until the contract is running with the same epoch as this task. - /// Returns the new running state, or None if the epoch has changed (meaning we should abort). 
- async fn wait_for_running(&self) -> Option<()> { - let sign_id = self.sign_id; - let task_epoch = self.epoch; - let mut contract = self.contract.clone(); - - loop { - let state = contract.state(); - match state { - Some(crate::protocol::ProtocolState::Running(running)) => { - if running.epoch != task_epoch { - tracing::info!( - ?sign_id, - old_epoch = task_epoch, - new_epoch = running.epoch, - "signature task aborting: epoch changed during resharing" - ); - return None; - } - // Same epoch, we're good to continue - return Some(()); - } - Some(crate::protocol::ProtocolState::Resharing(_)) => { - tracing::debug!( - ?sign_id, - epoch = task_epoch, - "signature task waiting: contract is resharing" - ); - // Wait for the contract state to change - let _ = contract.next_state().await; - } - Some(crate::protocol::ProtocolState::Initializing(_)) | None => { - tracing::debug!( - ?sign_id, - epoch = task_epoch, - "signature task waiting: contract is initializing or unavailable" - ); - let _ = contract.next_state().await; - } - } - } - } - async fn run( self, indexed: IndexedSignRequest, @@ -1030,45 +991,87 @@ impl SignTask { let mut phase = SignPhase::Organizing(SignOrganizer); loop { - // Check if we should wait due to resharing or abort due to epoch change - if let Some(contract_state) = self.contract.state() { - match contract_state { - crate::protocol::ProtocolState::Resharing(_) => { - tracing::info!( - ?sign_id, - epoch = task_epoch, - "signature task pausing: contract is resharing" - ); + // Check for control messages from the spawner (resharing, epoch change, abort) + // We do a non-blocking check before advancing the phase + match task_rx.try_recv() { + Ok(SignTaskMessage::Abort) => { + tracing::info!(?sign_id, epoch = task_epoch, "signature task received abort"); + return Err(SignError::Aborted); + } + Ok(SignTaskMessage::Pause) => { + tracing::info!( + ?sign_id, + epoch = task_epoch, + "signature task pausing: contract is resharing" + ); - // Wait for resharing 
to complete - if self.wait_for_running().await.is_none() { - // Epoch changed during resharing, abort - return Err(SignError::Aborted); + // Wait for Resume or Abort + loop { + match task_rx.recv().await { + Some(SignTaskMessage::Resume(new_gov)) => { + tracing::info!( + ?sign_id, + old_epoch = task_epoch, + new_epoch = new_gov.epoch, + "signature task resuming with new governance info" + ); + // Note: We don't update self.gov here since we're checking + // if the epoch changed. If it did, we should abort. + if new_gov.epoch != task_epoch { + tracing::info!( + ?sign_id, + old_epoch = task_epoch, + new_epoch = new_gov.epoch, + "signature task aborting: epoch changed during resharing" + ); + return Err(SignError::Aborted); + } + // Same epoch, reset to organizing and continue + state.round = 0; + phase = SignPhase::Organizing(SignOrganizer); + break; + } + Some(SignTaskMessage::Abort) => { + tracing::info!( + ?sign_id, + epoch = task_epoch, + "signature task aborted during pause" + ); + return Err(SignError::Aborted); + } + Some(SignTaskMessage::Pause) => { + // Already paused, ignore + continue; + } + Some(SignTaskMessage::PositMessage { .. 
}) => { + // Ignore posit messages while paused + continue; + } + None => { + tracing::warn!( + ?sign_id, + "signature task channel closed during pause" + ); + return Err(SignError::Aborted); + } } - - tracing::info!( - ?sign_id, - epoch = task_epoch, - "signature task resuming: resharing complete, restarting from organizing" - ); - - // Reset to organizing phase after resharing completes - state.round = 0; - phase = SignPhase::Organizing(SignOrganizer); - continue; } - crate::protocol::ProtocolState::Running(running) - if running.epoch != task_epoch => - { - tracing::info!( - ?sign_id, - old_epoch = task_epoch, - new_epoch = running.epoch, - "signature task interrupted: epoch changed" - ); - return Err(SignError::Aborted); - } - _ => {} + continue; + } + Ok(SignTaskMessage::Resume(_)) => { + // Resume without being paused, ignore + } + Ok(SignTaskMessage::PositMessage { .. }) => { + // Posit messages are handled during phase advancement, put it back + // Actually, we can't put it back easily. The phase.advance will recv from task_rx. + // This is a design issue - we'll handle it differently. + } + Err(mpsc::error::TryRecvError::Empty) => { + // No control message, continue with phase + } + Err(mpsc::error::TryRecvError::Disconnected) => { + tracing::warn!(?sign_id, "signature task channel disconnected"); + return Err(SignError::Aborted); } } @@ -1081,23 +1084,20 @@ impl SignTask { } /// Message types that can be sent to a running signature task +#[derive(Clone)] enum SignTaskMessage { + /// Posit message from another node PositMessage { presignature_id: PresignatureId, from: Participant, action: PositAction, }, -} - -/// Governance information obtained from the contract state. -/// Updated whenever the contract transitions to a new Running state. 
-#[derive(Clone)] -struct GovernanceInfo { - me: Participant, - threshold: usize, - epoch: u64, - public_key: PublicKey, - participants: BTreeSet, + /// Pause the task (resharing started) + Pause, + /// Resume the task with new governance info (resharing complete) + Resume(GovernanceInfo), + /// Abort the task (epoch changed or we're no longer a participant) + Abort, } pub struct SignatureSpawner { @@ -1122,7 +1122,6 @@ impl SignatureSpawner { &mut self, indexed: IndexedSignRequest, gov: &GovernanceInfo, - contract: ContractStateWatcher, cfg: ProtocolConfig, ) { let sign_id = indexed.id; @@ -1132,18 +1131,17 @@ impl SignatureSpawner { let rx = self.inboxes.entry(sign_id).or_default().subscribe(); let task = SignTask { me: gov.me, - participants: gov.participants.clone(), - sign_id, threshold: gov.threshold, - public_key: gov.public_key, epoch: gov.epoch, + public_key: gov.public_key, + participants: gov.participants.clone(), + sign_id, my_account_id: self.my_account_id.clone(), presignatures: self.presignatures.clone(), msg: self.msg.clone(), rpc: self.rpc.clone(), backlog: self.backlog.clone(), cfg, - contract, }; // Spawn the async task with organizing loop @@ -1185,12 +1183,32 @@ impl SignatureSpawner { } } + /// Send a message to all running tasks + async fn broadcast_to_tasks(&mut self, msg: SignTaskMessage) { + for inbox in self.inboxes.values_mut() { + let _ = inbox.send(msg.clone()).await; + } + } + + /// Abort all running tasks + fn abort_all_tasks(&mut self) { + tracing::info!( + task_count = self.tasks.len(), + "aborting all signature tasks" + ); + // Collect sign_ids from inboxes (which tracks all active sign requests) + let sign_ids: Vec = self.inboxes.keys().copied().collect(); + for sign_id in sign_ids { + self.tasks.abort(sign_id); + self.inboxes.remove(&sign_id); + } + } + fn handle_request( &mut self, sign: Sign, cfg: &ProtocolConfig, gov: &GovernanceInfo, - contract: &ContractStateWatcher, ) { match sign { Sign::Completion(sign_id) => { @@ 
-1212,7 +1230,7 @@ impl SignatureSpawner { .with_label_values(&[indexed.chain.as_str(), self.my_account_id.as_str()]) .inc(); - self.spawn_task(indexed, gov, contract.clone(), cfg.clone()); + self.spawn_task(indexed, gov, cfg.clone()); } } @@ -1222,29 +1240,6 @@ impl SignatureSpawner { .set(self.tasks.len() as i64); } - /// Waits for the contract to be in Running state and extracts governance info. - /// Returns None if we are not a participant in the current running state. - async fn wait_governance(contract: &mut ContractStateWatcher) -> GovernanceInfo { - loop { - let running = contract.wait_running().await; - if let Some(me) = running - .participants - .find_participant(contract.my_account_id()) - { - return GovernanceInfo { - me: *me, - threshold: running.threshold, - epoch: running.epoch, - public_key: PublicKey::from(running.public_key), - participants: running.participants.keys().copied().collect(), - }; - } - // We're not a participant, wait for next state change - tracing::warn!("signature spawner: we are not a participant, waiting for state change"); - let _ = contract.next_state().await; - } - } - async fn run( mut self, mut sign_rx: mpsc::Receiver, @@ -1254,7 +1249,7 @@ impl SignatureSpawner { let mut posits = self.msg.subscribe_signature_posit().await; // Wait for initial governance info - let mut gov = Self::wait_governance(&mut contract).await; + let mut gov = contract.wait_governance().await; tracing::info!( me = ?gov.me, epoch = gov.epoch, @@ -1271,7 +1266,7 @@ impl SignatureSpawner { tracing::warn!("signature spawner sign_rx closed, terminating"); break; }; - self.handle_request(sign, &protocol, &gov, &contract); + self.handle_request(sign, &protocol, &gov); } Some((sign_id, presignature_id, from, action)) = posits.recv() => { self.handle_posit(sign_id, presignature_id, from, action, &gov).await; @@ -1320,11 +1315,16 @@ impl SignatureSpawner { public_key: running.public_key, participants: running.participants.keys().copied().collect(), }; + // 
Resume all tasks with the new governance info + self.broadcast_to_tasks(SignTaskMessage::Resume(gov.clone())).await; } else { tracing::warn!( epoch = running.epoch, "signature spawner: we are no longer a participant after epoch change" ); + // Abort all tasks since we're no longer a participant + self.broadcast_to_tasks(SignTaskMessage::Abort).await; + self.abort_all_tasks(); // Wait for next state change where we might be a participant again } } @@ -1332,10 +1332,10 @@ impl SignatureSpawner { crate::protocol::ProtocolState::Resharing(_) => { tracing::info!( epoch = gov.epoch, - "signature spawner: contract is resharing, existing tasks will pause" + "signature spawner: contract is resharing, pausing all tasks" ); - // Individual SignTasks handle pausing during resharing - // We continue to accept new requests but they'll queue up + // Pause all running tasks + self.broadcast_to_tasks(SignTaskMessage::Pause).await; } crate::protocol::ProtocolState::Initializing(_) => { tracing::debug!("signature spawner: contract is initializing"); diff --git a/chain-signatures/node/src/rpc.rs b/chain-signatures/node/src/rpc.rs index 199c0ac8..0836c4b1 100644 --- a/chain-signatures/node/src/rpc.rs +++ b/chain-signatures/node/src/rpc.rs @@ -7,6 +7,9 @@ use crate::protocol::contract::RunningContractState; use crate::protocol::{Chain, Governance, IndexedSignRequest, ProtocolState, SignRequestType}; use crate::util::AffinePointExt as _; +use mpc_crypto::PublicKey; +use std::collections::BTreeSet; + use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::pubkey::Pubkey; use solana_sdk::signer::keypair::Keypair; @@ -123,6 +126,17 @@ impl RpcChannel { } } +/// Governance information obtained from the contract state. +/// Updated whenever the contract transitions to a new Running state. 
+#[derive(Debug, Clone)] +pub struct GovernanceInfo { + pub me: Participant, + pub threshold: usize, + pub epoch: u64, + pub public_key: PublicKey, + pub participants: BTreeSet, +} + #[derive(Clone)] pub struct ContractStateWatcher { account_id: AccountId, @@ -297,6 +311,44 @@ impl ContractStateWatcher { } } + /// Waits for the contract to be in Running state and extracts governance info. + /// Keeps waiting if we are not a participant in the current running state. + pub async fn wait_governance(&mut self) -> GovernanceInfo { + loop { + let running = self.wait_running().await; + if let Some(me) = running.participants.find_participant(&self.account_id) { + return GovernanceInfo { + me: *me, + threshold: running.threshold, + epoch: running.epoch, + public_key: PublicKey::from(running.public_key), + participants: running.participants.keys().copied().collect(), + }; + } + // We're not a participant, wait for next state change + tracing::warn!("wait_governance: we are not a participant, waiting for state change"); + let _ = self.contract_state.changed().await; + } + } + + /// Try to get governance info from current state without waiting. + /// Returns None if not in Running state or if we're not a participant. + pub fn governance(&self) -> Option { + match self.borrow_state().as_ref()? { + ProtocolState::Running(running) => { + let me = running.participants.find_participant(&self.account_id)?; + Some(GovernanceInfo { + me: *me, + threshold: running.threshold, + epoch: running.epoch, + public_key: PublicKey::from(running.public_key), + participants: running.participants.keys().copied().collect(), + }) + } + _ => None, + } + } + /// Create a list of contract states that share a single channel but use different account ids. #[cfg(feature = "test-feature")] pub fn test_batch( From 2e2409fbab69f9eadaf9c3722f7223f808cbf9c6 Mon Sep 17 00:00:00 2001 From: "Phuong N." 
Date: Thu, 27 Nov 2025 04:51:05 +0000 Subject: [PATCH 09/15] Move to SignTask tracking contract state --- .../node/src/protocol/signature.rs | 192 ++++++------------ 1 file changed, 58 insertions(+), 134 deletions(-) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 144dc8ce..0a6ca2e9 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -317,10 +317,7 @@ impl SignPositor { presignature_id, from, action, - } = task_msg else { - // Pause/Resume/Abort are handled by the outer run loop, not here - continue; - }; + } = task_msg; if !matches!(action, PositAction::Propose) { continue; @@ -469,10 +466,7 @@ impl SignPositor { let accepted_participants = loop { tokio::select! { Some(task_msg) = task_rx.recv() => { - let SignTaskMessage::PositMessage { presignature_id: _, from, action } = task_msg else { - // Pause/Resume/Abort are handled by the outer run loop, not here - continue; - }; + let SignTaskMessage::PositMessage { presignature_id: _, from, action } = task_msg; if is_deliberator { if let PositAction::Start(participants) = action { if from != proposer { @@ -962,6 +956,7 @@ struct SignTask { presignatures: PresignatureStorage, msg: MessageChannel, rpc: RpcChannel, + contract: ContractStateWatcher, // TODO: will be used in the future when we move requests channels // into the backlog. @@ -972,18 +967,35 @@ struct SignTask { } impl SignTask { + /// Check if the contract is in a state where we should pause. + /// Returns true if we should pause (initializing or resharing). + fn should_pause(&self) -> bool { + match self.contract.state() { + Some(crate::protocol::ProtocolState::Running(_)) => false, + Some(crate::protocol::ProtocolState::Initializing(_)) + | Some(crate::protocol::ProtocolState::Resharing(_)) + | None => true, + } + } + + /// Wait for the contract to return to Running state after being paused. 
+ /// Updates governance info from the new Running state. + /// Returns the new governance info. + async fn wait_for_running(&mut self) -> GovernanceInfo { + self.contract.wait_governance().await + } + async fn run( - self, + mut self, indexed: IndexedSignRequest, mesh_state: watch::Receiver, mut task_rx: mpsc::Receiver, ) -> Result<(), SignError> { let sign_id = self.sign_id; - let task_epoch = self.epoch; tracing::info!( ?sign_id, me = ?self.me, - epoch = task_epoch, + epoch = self.epoch, "signature task starting with organizing loop" ); @@ -991,88 +1003,34 @@ impl SignTask { let mut phase = SignPhase::Organizing(SignOrganizer); loop { - // Check for control messages from the spawner (resharing, epoch change, abort) - // We do a non-blocking check before advancing the phase - match task_rx.try_recv() { - Ok(SignTaskMessage::Abort) => { - tracing::info!(?sign_id, epoch = task_epoch, "signature task received abort"); - return Err(SignError::Aborted); - } - Ok(SignTaskMessage::Pause) => { - tracing::info!( - ?sign_id, - epoch = task_epoch, - "signature task pausing: contract is resharing" - ); + // Check if we should pause due to contract state change (resharing/initializing) + if self.should_pause() { + tracing::info!( + ?sign_id, + epoch = self.epoch, + "signature task pausing: contract not in running state" + ); - // Wait for Resume or Abort - loop { - match task_rx.recv().await { - Some(SignTaskMessage::Resume(new_gov)) => { - tracing::info!( - ?sign_id, - old_epoch = task_epoch, - new_epoch = new_gov.epoch, - "signature task resuming with new governance info" - ); - // Note: We don't update self.gov here since we're checking - // if the epoch changed. If it did, we should abort. 
- if new_gov.epoch != task_epoch { - tracing::info!( - ?sign_id, - old_epoch = task_epoch, - new_epoch = new_gov.epoch, - "signature task aborting: epoch changed during resharing" - ); - return Err(SignError::Aborted); - } - // Same epoch, reset to organizing and continue - state.round = 0; - phase = SignPhase::Organizing(SignOrganizer); - break; - } - Some(SignTaskMessage::Abort) => { - tracing::info!( - ?sign_id, - epoch = task_epoch, - "signature task aborted during pause" - ); - return Err(SignError::Aborted); - } - Some(SignTaskMessage::Pause) => { - // Already paused, ignore - continue; - } - Some(SignTaskMessage::PositMessage { .. }) => { - // Ignore posit messages while paused - continue; - } - None => { - tracing::warn!( - ?sign_id, - "signature task channel closed during pause" - ); - return Err(SignError::Aborted); - } - } - } - continue; - } - Ok(SignTaskMessage::Resume(_)) => { - // Resume without being paused, ignore - } - Ok(SignTaskMessage::PositMessage { .. }) => { - // Posit messages are handled during phase advancement, put it back - // Actually, we can't put it back easily. The phase.advance will recv from task_rx. - // This is a design issue - we'll handle it differently. 
- } - Err(mpsc::error::TryRecvError::Empty) => { - // No control message, continue with phase - } - Err(mpsc::error::TryRecvError::Disconnected) => { - tracing::warn!(?sign_id, "signature task channel disconnected"); - return Err(SignError::Aborted); - } + // Wait for contract to return to Running state + let new_gov = self.wait_for_running().await; + tracing::info!( + ?sign_id, + old_epoch = self.epoch, + new_epoch = new_gov.epoch, + "signature task resuming with governance info" + ); + + // Update our governance info + self.me = new_gov.me; + self.threshold = new_gov.threshold; + self.epoch = new_gov.epoch; + self.public_key = new_gov.public_key; + self.participants = new_gov.participants; + + // Reset to organizing phase and continue + state.round = 0; + phase = SignPhase::Organizing(SignOrganizer); + continue; } phase = match phase.advance(&self, &mut state, &mut task_rx).await { @@ -1092,12 +1050,6 @@ enum SignTaskMessage { from: Participant, action: PositAction, }, - /// Pause the task (resharing started) - Pause, - /// Resume the task with new governance info (resharing complete) - Resume(GovernanceInfo), - /// Abort the task (epoch changed or we're no longer a participant) - Abort, } pub struct SignatureSpawner { @@ -1108,6 +1060,7 @@ pub struct SignatureSpawner { /// Buffered inboxes for posit messages, allowing us to queue before tasks spawn inboxes: HashMap>, mesh_state: watch::Receiver, + contract: ContractStateWatcher, my_account_id: AccountId, msg: MessageChannel, @@ -1140,6 +1093,7 @@ impl SignatureSpawner { presignatures: self.presignatures.clone(), msg: self.msg.clone(), rpc: self.rpc.clone(), + contract: self.contract.clone(), backlog: self.backlog.clone(), cfg, }; @@ -1183,33 +1137,7 @@ impl SignatureSpawner { } } - /// Send a message to all running tasks - async fn broadcast_to_tasks(&mut self, msg: SignTaskMessage) { - for inbox in self.inboxes.values_mut() { - let _ = inbox.send(msg.clone()).await; - } - } - - /// Abort all running tasks - fn 
abort_all_tasks(&mut self) { - tracing::info!( - task_count = self.tasks.len(), - "aborting all signature tasks" - ); - // Collect sign_ids from inboxes (which tracks all active sign requests) - let sign_ids: Vec = self.inboxes.keys().copied().collect(); - for sign_id in sign_ids { - self.tasks.abort(sign_id); - self.inboxes.remove(&sign_id); - } - } - - fn handle_request( - &mut self, - sign: Sign, - cfg: &ProtocolConfig, - gov: &GovernanceInfo, - ) { + fn handle_request(&mut self, sign: Sign, cfg: &ProtocolConfig, gov: &GovernanceInfo) { match sign { Sign::Completion(sign_id) => { self.handle_completion(sign_id); @@ -1315,27 +1243,22 @@ impl SignatureSpawner { public_key: running.public_key, participants: running.participants.keys().copied().collect(), }; - // Resume all tasks with the new governance info - self.broadcast_to_tasks(SignTaskMessage::Resume(gov.clone())).await; + // Tasks will notice the state change and update themselves } else { tracing::warn!( epoch = running.epoch, "signature spawner: we are no longer a participant after epoch change" ); - // Abort all tasks since we're no longer a participant - self.broadcast_to_tasks(SignTaskMessage::Abort).await; - self.abort_all_tasks(); - // Wait for next state change where we might be a participant again + // Tasks will notice and wait_governance will block until we're a participant again } } } crate::protocol::ProtocolState::Resharing(_) => { tracing::info!( epoch = gov.epoch, - "signature spawner: contract is resharing, pausing all tasks" + "signature spawner: contract is resharing, tasks will pause themselves" ); - // Pause all running tasks - self.broadcast_to_tasks(SignTaskMessage::Pause).await; + // Tasks will notice the state change and pause themselves } crate::protocol::ProtocolState::Initializing(_) => { tracing::debug!("signature spawner: contract is initializing"); @@ -1382,6 +1305,7 @@ impl SignatureSpawnerTask { my_account_id, presignatures: presignature_storage, mesh_state, + contract: 
contract.clone(), msg: msg_channel, rpc: rpc_channel, backlog, From e9bb12b24dd343e402237ac8897cbab92676497b Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Thu, 27 Nov 2025 05:22:40 +0000 Subject: [PATCH 10/15] Some working tokio::select --- .../node/src/protocol/signature.rs | 80 +++++++++++-------- 1 file changed, 47 insertions(+), 33 deletions(-) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 0a6ca2e9..093800c4 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -1001,41 +1001,55 @@ impl SignTask { let mut state = SignState::new(indexed, mesh_state); let mut phase = SignPhase::Organizing(SignOrganizer); + let mut paused = false; loop { - // Check if we should pause due to contract state change (resharing/initializing) - if self.should_pause() { - tracing::info!( - ?sign_id, - epoch = self.epoch, - "signature task pausing: contract not in running state" - ); - - // Wait for contract to return to Running state - let new_gov = self.wait_for_running().await; - tracing::info!( - ?sign_id, - old_epoch = self.epoch, - new_epoch = new_gov.epoch, - "signature task resuming with governance info" - ); - - // Update our governance info - self.me = new_gov.me; - self.threshold = new_gov.threshold; - self.epoch = new_gov.epoch; - self.public_key = new_gov.public_key; - self.participants = new_gov.participants; - - // Reset to organizing phase and continue - state.round = 0; - phase = SignPhase::Organizing(SignOrganizer); - continue; - } - - phase = match phase.advance(&self, &mut state, &mut task_rx).await { - SignPhase::Complete(result) => return result, - other => other, + tokio::select! 
{ + new_phase = phase.advance(&self, &mut state, &mut task_rx), if !paused => { + match new_phase { + SignPhase::Complete(result) => return result, + other => phase = other, + } + } + Some(state) = self.contract.next_state() => { + match state { + crate::protocol::ProtocolState::Running(running) => { + paused = false; + if running.epoch != self.epoch { + // Epoch changed, update governance info + if let Some(&me) = running.participants.find_participant(&self.my_account_id) { + tracing::info!( + old_epoch = self.epoch, + new_epoch = running.epoch, + ?me, + "signature spawner: epoch changed, updating governance info" + ); + + // Update our governance info + self.me = me; + self.threshold = running.threshold; + self.epoch = running.epoch; + self.public_key = running.public_key; + self.participants = running.participants.keys().copied().collect(); + } else { + tracing::warn!( + epoch = running.epoch, + "signature spawner: we are no longer a participant after epoch change" + ); + return Err(SignError::Aborted); + } + } + } + crate::protocol::ProtocolState::Initializing(_) | + crate::protocol::ProtocolState::Resharing(_) => { + tracing::info!( + ?sign_id, + "contract entered non-running state" + ); + paused = true; + } + } + } } } } From 23846bcdb53751b880726685136764f6a6bd8d54 Mon Sep 17 00:00:00 2001 From: "Phuong N." 
Date: Thu, 27 Nov 2025 05:22:51 +0000 Subject: [PATCH 11/15] More working tokio::select --- .../node/src/protocol/signature.rs | 261 ++++++++++-------- 1 file changed, 153 insertions(+), 108 deletions(-) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 093800c4..c19629f2 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -105,19 +105,56 @@ enum SignPhase { Complete(Result<(), SignError>), } +/// Context passed to phase advancement - contains immutable references to task resources +struct SignContext<'a> { + gov: &'a GovernanceInfo, + sign_id: SignId, + my_account_id: &'a AccountId, + presignatures: &'a PresignatureStorage, + msg: &'a MessageChannel, + rpc: &'a RpcChannel, + cfg: &'a ProtocolConfig, +} + +impl<'a> SignContext<'a> { + fn me(&self) -> Participant { + self.gov.me + } + + fn threshold(&self) -> usize { + self.gov.threshold + } + + fn epoch(&self) -> u64 { + self.gov.epoch + } + + fn public_key(&self) -> mpc_crypto::PublicKey { + self.gov.public_key + } + + fn participants(&self) -> &BTreeSet { + &self.gov.participants + } +} + impl SignPhase { async fn advance( - self, - ctx: &SignTask, + &mut self, + ctx: &SignContext<'_>, state: &mut SignState, task_rx: &mut mpsc::Receiver, - ) -> SignPhase { - match self { + ) { + // Use take pattern to extract the current phase and replace with a temporary Complete(Ok(())) + // This allows us to call advance methods that consume self while still allowing &mut self here + let current = std::mem::replace(self, SignPhase::Complete(Ok(()))); + let next = match current { SignPhase::Organizing(phase) => phase.advance(ctx, state).await, SignPhase::Posit(phase) => phase.advance(ctx, state, task_rx).await, SignPhase::Generating(phase) => phase.advance(ctx, state).await, SignPhase::Complete(result) => SignPhase::Complete(result), - } + }; + *self = next; } } @@ -136,7 +173,7 @@ impl 
SignOrganizer { /// Waits for threshold stable participants to be present. async fn wait_stable( &self, - ctx: &SignTask, + ctx: &SignContext<'_>, state: &mut SignState, threshold: usize, ) -> Option> { @@ -168,12 +205,12 @@ impl SignOrganizer { } } - async fn advance(self, ctx: &SignTask, state: &mut SignState) -> SignPhase { + async fn advance(self, ctx: &SignContext<'_>, state: &mut SignState) -> SignPhase { let sign_id = ctx.sign_id; - let threshold = ctx.threshold; - let me = ctx.me; + let threshold = ctx.threshold(); + let me = ctx.me(); let entropy = state.indexed.args.entropy; - let participants = ctx.participants.iter().copied().collect::>(); + let participants = ctx.participants().iter().copied().collect::>(); tracing::info!(?sign_id, round = ?state.round, "entering organizing phase"); let (stable, proposer) = { @@ -219,16 +256,16 @@ impl SignOrganizer { (stable, proposer) }; - let is_proposer = proposer == ctx.me; + let is_proposer = proposer == ctx.me(); let (presignature_id, presignature, stable) = if is_proposer { tracing::info!(?sign_id, round = ?state.round, "proposer waiting for presignature"); let stable = stable.iter().copied().collect::>(); let mut recycle = Vec::new(); let fetch = tokio::time::timeout(Duration::from_secs(30), async { loop { - if let Some(taken) = ctx.presignatures.take_mine(ctx.me).await { + if let Some(taken) = ctx.presignatures.take_mine(ctx.me()).await { let participants = intersect_vec(&[&taken.artifact.participants, &stable]); - if participants.len() < ctx.threshold { + if participants.len() < ctx.threshold() { recycle.push(taken); continue; } @@ -266,16 +303,16 @@ impl SignOrganizer { // broadcast to participants and let them reject if they don't have the presignature. 
for &p in &participants { - if p == ctx.me { + if p == ctx.me() { continue; } ctx.msg .send( - ctx.me, + ctx.me(), p, PositMessage { id: PositProtocolId::Signature(sign_id, presignature_id), - from: ctx.me, + from: ctx.me(), action: PositAction::Propose, }, ) @@ -301,7 +338,7 @@ impl SignOrganizer { impl SignPositor { /// Deliberator waits for the proposer to send a Propose message with a presignature_id. async fn wait_propose( - ctx: &SignTask, + ctx: &SignContext<'_>, state: &mut SignState, task_rx: &mut mpsc::Receiver, proposer: Participant, @@ -340,11 +377,11 @@ impl SignPositor { ); ctx.msg .send( - ctx.me, + ctx.me(), proposer, PositMessage { id: PositProtocolId::Signature(sign_id, presignature_id), - from: ctx.me, + from: ctx.me(), action: PositAction::Reject, }, ) @@ -363,11 +400,11 @@ impl SignPositor { ctx.msg .send( - ctx.me, + ctx.me(), from, PositMessage { id: PositProtocolId::Signature(sign_id, presignature_id), - from: ctx.me, + from: ctx.me(), action: PositAction::Reject, }, ) @@ -394,11 +431,11 @@ impl SignPositor { // received propose, send Accept ctx.msg .send( - ctx.me, + ctx.me(), proposer, PositMessage { id: PositProtocolId::Signature(sign_id, presignature_id), - from: ctx.me, + from: ctx.me(), action: PositAction::Accept, }, ) @@ -409,7 +446,7 @@ impl SignPositor { async fn advance( self, - ctx: &SignTask, + ctx: &SignContext<'_>, state: &mut SignState, task_rx: &mut mpsc::Receiver, ) -> SignPhase { @@ -422,7 +459,7 @@ impl SignPositor { let sign_id = ctx.sign_id; let round = state.round; - let is_proposer = proposer == ctx.me; + let is_proposer = proposer == ctx.me(); let is_deliberator = !is_proposer; // Get the presignature participants - only these nodes participated in generating it @@ -457,7 +494,7 @@ impl SignPositor { // GUARANTEE: at least threshold participants from organizing phase. 
let posit_participants = stable.iter().copied().collect::>(); - let mut counter = SinglePositCounter::new(ctx.me, &posit_participants); + let mut counter = SinglePositCounter::new(ctx.me(), &posit_participants); let posit_timeout = Duration::from_secs(60); let posit_deadline = tokio::time::sleep(posit_timeout); @@ -474,7 +511,7 @@ impl SignPositor { continue; } - if participants.len() < ctx.threshold { + if participants.len() < ctx.threshold() { tracing::warn!( ?sign_id, ?round, @@ -484,7 +521,7 @@ impl SignPositor { return SignPhase::Organizing(SignOrganizer); } - tracing::info!(?sign_id, participant = ?ctx.me, ?participants, "deliberator received Start"); + tracing::info!(?sign_id, participant = ?ctx.me(), ?participants, "deliberator received Start"); break participants; } } else { @@ -492,11 +529,11 @@ impl SignPositor { continue; } - if counter.enough_rejects(ctx.threshold) { + if counter.enough_rejects(ctx.threshold()) { tracing::warn!(?sign_id, ?from, "received enough REJECTs, reorganizing"); if let Some(taken) = presignature { tracing::warn!(?sign_id, "recycling presignature due to REJECTs"); - ctx.presignatures.recycle_mine(ctx.me, taken).await; + ctx.presignatures.recycle_mine(ctx.me(), taken).await; } state.bump_round(); return SignPhase::Organizing(SignOrganizer); @@ -509,35 +546,35 @@ impl SignPositor { participants.retain(|p| presignature_participants.contains(p)); } - if participants.len() < ctx.threshold { + if participants.len() < ctx.threshold() { tracing::warn!( ?sign_id, presig_participants = ?presignature_participants, accepts = ?counter.accepts, filtered_participants = ?participants, - threshold = ctx.threshold, + threshold = ctx.threshold(), "not enough presignature participants accepted, reorganizing" ); if let Some(taken) = presignature { tracing::warn!(?sign_id, "recycling presignature due to insufficient participants"); - ctx.presignatures.recycle_mine(ctx.me, taken).await; + ctx.presignatures.recycle_mine(ctx.me(), taken).await; } 
state.bump_round(); return SignPhase::Organizing(SignOrganizer); } - tracing::info!(?sign_id, me = ?ctx.me, ?participants, "proposer broadcasting Start"); + tracing::info!(?sign_id, me = ?ctx.me(), ?participants, "proposer broadcasting Start"); for &p in &participants { - if p == ctx.me { + if p == ctx.me() { continue; } ctx.msg .send( - ctx.me, + ctx.me(), p, PositMessage { id: PositProtocolId::Signature(sign_id, presignature_id), - from: ctx.me, + from: ctx.me(), action: PositAction::Start(participants.clone()), }, ) @@ -549,25 +586,25 @@ impl SignPositor { } _ = &mut posit_deadline => { if is_proposer { - if counter.enough_accepts(ctx.threshold) { + if counter.enough_accepts(ctx.threshold()) { // Only include participants who both accepted AND were part of the presignature generation let mut participants = counter.accepts.iter().copied().collect::>(); if !presignature_participants.is_empty() { participants.retain(|p| presignature_participants.contains(p)); } - if participants.len() < ctx.threshold { + if participants.len() < ctx.threshold() { tracing::warn!( ?sign_id, presig_participants = ?presignature_participants, accepts = ?counter.accepts, filtered_participants = ?participants, - threshold = ctx.threshold, + threshold = ctx.threshold(), "posit timeout: not enough presignature participants accepted, reorganizing" ); if let Some(taken) = presignature { tracing::warn!(?sign_id, "recycling presignature due to posit timeout"); - ctx.presignatures.recycle_mine(ctx.me, taken).await; + ctx.presignatures.recycle_mine(ctx.me(), taken).await; } state.bump_round(); return SignPhase::Organizing(SignOrganizer); @@ -575,16 +612,16 @@ impl SignPositor { tracing::info!(?sign_id, "posit timeout with enough accepts, broadcasting Start"); for &p in &participants { - if p == ctx.me { + if p == ctx.me() { continue; } ctx.msg .send( - ctx.me, + ctx.me(), p, PositMessage { id: PositProtocolId::Signature(sign_id, presignature_id), - from: ctx.me, + from: ctx.me(), action: 
PositAction::Start(participants.clone()), }, ) @@ -595,7 +632,7 @@ impl SignPositor { tracing::warn!(?sign_id, "posit timeout without enough accepts, reorganizing"); if let Some(taken) = presignature { tracing::warn!(?sign_id, "recycling presignature due to posit timeout (no accepts)"); - ctx.presignatures.recycle_mine(ctx.me, taken).await; + ctx.presignatures.recycle_mine(ctx.me(), taken).await; } state.bump_round(); return SignPhase::Organizing(SignOrganizer); @@ -619,7 +656,7 @@ impl SignPositor { } impl SignGenerating { - async fn advance(mut self, ctx: &SignTask, state: &mut SignState) -> SignPhase { + async fn advance(mut self, ctx: &SignContext<'_>, state: &mut SignState) -> SignPhase { let sign_id = ctx.sign_id; let round = state.round; @@ -701,7 +738,7 @@ struct SignGenerator { impl SignGenerator { async fn new( - ctx: &SignTask, + ctx: &SignContext<'_>, proposer: Participant, indexed: IndexedSignRequest, presignature: PendingPresignature, @@ -719,7 +756,7 @@ impl SignGenerator { let sign_id = indexed.id; tracing::info!( - me = ?ctx.me, + me = ?ctx.me(), ?sign_id, presignature_id, "starting protocol to generate a new signature", @@ -736,8 +773,8 @@ impl SignGenerator { }; let protocol = Box::new(cait_sith::sign( &participants, - ctx.me, - derive_key(ctx.public_key, indexed.args.epsilon), + ctx.me(), + derive_key(ctx.public_key(), indexed.args.epsilon), output, indexed.args.payload, )?); @@ -782,10 +819,10 @@ impl SignGenerator { } } - async fn run(mut self, ctx: &SignTask) -> Result<(), SignError> { + async fn run(mut self, ctx: &SignContext<'_>) -> Result<(), SignError> { let my_account_id = &ctx.my_account_id; - let me = ctx.me; - let epoch = ctx.epoch; + let me = ctx.me(); + let epoch = ctx.epoch(); let accrued_wait_delay = crate::metrics::SIGNATURE_ACCRUED_WAIT_DELAY .with_label_values(&[my_account_id.as_str()]); @@ -897,7 +934,7 @@ impl SignGenerator { if self.proposer == me { ctx.rpc.publish( - ctx.public_key, + ctx.public_key(), 
self.indexed.clone(), output, self.participants.clone(), @@ -946,11 +983,7 @@ impl Drop for SignGenerator { } struct SignTask { - me: Participant, - threshold: usize, - epoch: u64, - public_key: mpc_crypto::PublicKey, - participants: BTreeSet, + gov: GovernanceInfo, sign_id: SignId, my_account_id: AccountId, presignatures: PresignatureStorage, @@ -967,24 +1000,6 @@ struct SignTask { } impl SignTask { - /// Check if the contract is in a state where we should pause. - /// Returns true if we should pause (initializing or resharing). - fn should_pause(&self) -> bool { - match self.contract.state() { - Some(crate::protocol::ProtocolState::Running(_)) => false, - Some(crate::protocol::ProtocolState::Initializing(_)) - | Some(crate::protocol::ProtocolState::Resharing(_)) - | None => true, - } - } - - /// Wait for the contract to return to Running state after being paused. - /// Updates governance info from the new Running state. - /// Returns the new governance info. - async fn wait_for_running(&mut self) -> GovernanceInfo { - self.contract.wait_governance().await - } - async fn run( mut self, indexed: IndexedSignRequest, @@ -994,62 +1009,96 @@ impl SignTask { let sign_id = self.sign_id; tracing::info!( ?sign_id, - me = ?self.me, - epoch = self.epoch, + me = ?self.gov.me, + epoch = self.gov.epoch, "signature task starting with organizing loop" ); let mut state = SignState::new(indexed, mesh_state); let mut phase = SignPhase::Organizing(SignOrganizer); - let mut paused = false; loop { + // Create context before select to avoid borrow issues + let ctx = SignContext { + gov: &self.gov, + sign_id: self.sign_id, + my_account_id: &self.my_account_id, + presignatures: &self.presignatures, + msg: &self.msg, + rpc: &self.rpc, + cfg: &self.cfg, + }; + tokio::select! 
{ - new_phase = phase.advance(&self, &mut state, &mut task_rx), if !paused => { - match new_phase { - SignPhase::Complete(result) => return result, - other => phase = other, - } - } - Some(state) = self.contract.next_state() => { - match state { + biased; + + // Contract state changes take priority - cancel ongoing work + result = self.contract.next_state() => { + let Some(contract_state) = result else { + tracing::warn!(?sign_id, "contract state channel closed"); + return Err(SignError::Aborted); + }; + + match contract_state { crate::protocol::ProtocolState::Running(running) => { - paused = false; - if running.epoch != self.epoch { - // Epoch changed, update governance info + // Update governance info if epoch changed + if running.epoch != self.gov.epoch { if let Some(&me) = running.participants.find_participant(&self.my_account_id) { tracing::info!( - old_epoch = self.epoch, + ?sign_id, + old_epoch = self.gov.epoch, new_epoch = running.epoch, ?me, - "signature spawner: epoch changed, updating governance info" + "signature task: epoch changed, updating governance info" ); - - // Update our governance info - self.me = me; - self.threshold = running.threshold; - self.epoch = running.epoch; - self.public_key = running.public_key; - self.participants = running.participants.keys().copied().collect(); + self.gov = GovernanceInfo { + me, + threshold: running.threshold, + epoch: running.epoch, + public_key: running.public_key, + participants: running.participants.keys().copied().collect(), + }; + // Reset to organizing phase with new governance + state.round = 0; + phase = SignPhase::Organizing(SignOrganizer); } else { tracing::warn!( + ?sign_id, epoch = running.epoch, - "signature spawner: we are no longer a participant after epoch change" + "signature task: we are no longer a participant after epoch change" ); return Err(SignError::Aborted); } } + // If same epoch and running, just continue } crate::protocol::ProtocolState::Initializing(_) | 
crate::protocol::ProtocolState::Resharing(_) => { tracing::info!( ?sign_id, - "contract entered non-running state" + epoch = self.gov.epoch, + "signature task: contract entered non-running state, waiting for running" + ); + // Wait for contract to return to Running state + self.gov = self.contract.wait_governance().await; + tracing::info!( + ?sign_id, + new_epoch = self.gov.epoch, + "signature task: contract returned to running, resuming" ); - paused = true; + // Reset to organizing phase + state.round = 0; + phase = SignPhase::Organizing(SignOrganizer); } } } + + // Phase advancement + _ = phase.advance(&ctx, &mut state, &mut task_rx) => { + if let SignPhase::Complete(result) = phase { + return result; + } + } } } } @@ -1097,11 +1146,7 @@ impl SignatureSpawner { // Subscribe to (or create) the posit inbox for this sign request let rx = self.inboxes.entry(sign_id).or_default().subscribe(); let task = SignTask { - me: gov.me, - threshold: gov.threshold, - epoch: gov.epoch, - public_key: gov.public_key, - participants: gov.participants.clone(), + gov: gov.clone(), sign_id, my_account_id: self.my_account_id.clone(), presignatures: self.presignatures.clone(), From da01bde447e300a03a4d6b8923503d8d7054ca78 Mon Sep 17 00:00:00 2001 From: "Phuong N." 
Date: Thu, 27 Nov 2025 05:38:49 +0000 Subject: [PATCH 12/15] Working internal sign task transition --- .../node/src/protocol/signature.rs | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index c19629f2..ad83c333 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -9,13 +9,13 @@ use crate::protocol::message::{ use crate::protocol::posit::{PositAction, SinglePositCounter}; use crate::protocol::presignature::PresignatureId; use crate::protocol::Chain; +use crate::protocol::{ProtocolState, SignRequestType}; use crate::rpc::{ContractStateWatcher, GovernanceInfo, RpcChannel}; use crate::storage::presignature_storage::{PresignatureTaken, PresignatureTakenDropper}; use crate::storage::PresignatureStorage; use crate::types::SignatureProtocol; use crate::util::{AffinePointExt, JoinMap}; -use crate::protocol::SignRequestType; use cait_sith::protocol::{Action, InitializationError, Participant}; use cait_sith::PresignOutput; use chrono::Utc; @@ -148,6 +148,7 @@ impl SignPhase { // Use take pattern to extract the current phase and replace with a temporary Complete(Ok(())) // This allows us to call advance methods that consume self while still allowing &mut self here let current = std::mem::replace(self, SignPhase::Complete(Ok(()))); + let next = match current { SignPhase::Organizing(phase) => phase.advance(ctx, state).await, SignPhase::Posit(phase) => phase.advance(ctx, state, task_rx).await, @@ -1030,17 +1031,16 @@ impl SignTask { }; tokio::select! 
{ - biased; - // Contract state changes take priority - cancel ongoing work - result = self.contract.next_state() => { - let Some(contract_state) = result else { + biased; + contract = self.contract.next_state() => { + let Some(contract) = contract else { tracing::warn!(?sign_id, "contract state channel closed"); return Err(SignError::Aborted); }; - match contract_state { - crate::protocol::ProtocolState::Running(running) => { + match contract { + ProtocolState::Running(running) => { // Update governance info if epoch changed if running.epoch != self.gov.epoch { if let Some(&me) = running.participants.find_participant(&self.my_account_id) { @@ -1049,7 +1049,7 @@ impl SignTask { old_epoch = self.gov.epoch, new_epoch = running.epoch, ?me, - "signature task: epoch changed, updating governance info" + "sign task: epoch changed, updating governance info" ); self.gov = GovernanceInfo { me, @@ -1065,26 +1065,25 @@ impl SignTask { tracing::warn!( ?sign_id, epoch = running.epoch, - "signature task: we are no longer a participant after epoch change" + "sign task: we are no longer a participant after epoch change" ); return Err(SignError::Aborted); } } // If same epoch and running, just continue } - crate::protocol::ProtocolState::Initializing(_) | - crate::protocol::ProtocolState::Resharing(_) => { + ProtocolState::Initializing(_) | ProtocolState::Resharing(_) => { tracing::info!( ?sign_id, epoch = self.gov.epoch, - "signature task: contract entered non-running state, waiting for running" + "sign task: contract entered non-running state, waiting for running" ); // Wait for contract to return to Running state self.gov = self.contract.wait_governance().await; tracing::info!( ?sign_id, new_epoch = self.gov.epoch, - "signature task: contract returned to running, resuming" + "sign task: contract returned to running, resuming" ); // Reset to organizing phase state.round = 0; @@ -1093,7 +1092,7 @@ impl SignTask { } } - // Phase advancement + // This will be cancelled when the 
contract state changes. _ = phase.advance(&ctx, &mut state, &mut task_rx) => { if let SignPhase::Complete(result) = phase { return result; @@ -1285,7 +1284,7 @@ impl SignatureSpawner { // Contract state changed - check if we need to update governance info if let Some(state) = contract.state() { match state { - crate::protocol::ProtocolState::Running(running) => { + ProtocolState::Running(running) => { if running.epoch != gov.epoch { // Epoch changed, update governance info if let Some(me) = running.participants.find_participant(contract.my_account_id()) { @@ -1312,14 +1311,14 @@ impl SignatureSpawner { } } } - crate::protocol::ProtocolState::Resharing(_) => { + ProtocolState::Resharing(_) => { tracing::info!( epoch = gov.epoch, "signature spawner: contract is resharing, tasks will pause themselves" ); // Tasks will notice the state change and pause themselves } - crate::protocol::ProtocolState::Initializing(_) => { + ProtocolState::Initializing(_) => { tracing::debug!("signature spawner: contract is initializing"); } } From ca03cf2ccb3c6c5edb050a97d7612358b4d06c77 Mon Sep 17 00:00:00 2001 From: "Phuong N." 
Date: Thu, 27 Nov 2025 05:50:38 +0000 Subject: [PATCH 13/15] Clippy --- chain-signatures/node/src/cli.rs | 3 --- chain-signatures/node/src/protocol/mod.rs | 15 +++++++-------- chain-signatures/node/src/protocol/signature.rs | 1 + chain-signatures/node/src/protocol/test_setup.rs | 3 --- .../src/mpc_fixture/fixture_interface.rs | 5 ++++- 5 files changed, 12 insertions(+), 15 deletions(-) diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 549928fd..d8a866a1 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -322,7 +322,6 @@ pub async fn run(cmd: Cli) -> anyhow::Result<()> { let protocol = MpcSignProtocol { my_account_id: account_id.clone(), - rpc_channel, msg_channel: msg_channel.clone(), generating: msg_channel.subscribe_generation().await, resharing: msg_channel.subscribe_resharing().await, @@ -331,10 +330,8 @@ pub async fn run(cmd: Cli) -> anyhow::Result<()> { secret_storage: key_storage, triple_storage: triple_storage.clone(), presignature_storage: presignature_storage.clone(), - contract: contract_watcher.clone(), config: config_rx, mesh_state: mesh_state.clone(), - backlog: backlog.clone(), }; tracing::info!("protocol initialized"); diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index d5665aa5..6fd878c1 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ b/chain-signatures/node/src/protocol/mod.rs @@ -22,7 +22,6 @@ pub use signature::{IndexedSignRequest, Sign}; use signet_program::SignBidirectionalEvent; pub use state::{Node, NodeState}; -use crate::backlog::Backlog; use crate::config::Config; use crate::mesh::MeshState; use crate::protocol::consensus::ConsensusProtocol; @@ -30,7 +29,7 @@ use crate::protocol::cryptography::CryptographicProtocol; use crate::protocol::message::{GeneratingMessage, ReadyMessage, ResharingMessage}; use crate::protocol::signature::SignatureSpawnerTask; use 
crate::respond_bidirectional::RespondBidirectionalTx; -use crate::rpc::{ContractStateWatcher, RpcChannel}; +use crate::rpc::ContractStateWatcher; use crate::storage::presignature_storage::PresignatureStorage; use crate::storage::secret_storage::SecretNodeStorageBox; use crate::storage::triple_storage::TripleStorage; @@ -47,19 +46,19 @@ pub struct MpcSignProtocol { pub(crate) secret_storage: SecretNodeStorageBox, pub(crate) triple_storage: TripleStorage, pub(crate) presignature_storage: PresignatureStorage, - /// The signature spawner task handle - started immediately and persists across resharing. - /// The spawner waits for the contract to reach Running state before processing requests, - /// and dynamically updates governance info when the contract state changes. pub(crate) sign_task: SignatureSpawnerTask, pub(crate) generating: mpsc::Receiver, pub(crate) resharing: mpsc::Receiver, pub(crate) ready: mpsc::Receiver, pub(crate) msg_channel: MessageChannel, - pub(crate) rpc_channel: RpcChannel, - pub(crate) contract: ContractStateWatcher, pub(crate) config: watch::Receiver, pub(crate) mesh_state: watch::Receiver, - pub(crate) backlog: Backlog, +} + +impl Drop for MpcSignProtocol { + fn drop(&mut self) { + self.sign_task.abort(); + } } /// Interface required by the [`MpcSignProtocol`] to participate in the diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index ad83c333..adddcdac 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -1346,6 +1346,7 @@ impl SignatureSpawnerTask { /// The spawner will wait for the contract to reach Running state before processing requests. /// It dynamically obtains governance info (me, threshold, epoch, public_key) from the contract /// state and updates when the contract transitions to a new epoch after resharing. 
+ #[allow(clippy::too_many_arguments)] pub fn run( my_account_id: AccountId, sign_rx: mpsc::Receiver, diff --git a/chain-signatures/node/src/protocol/test_setup.rs b/chain-signatures/node/src/protocol/test_setup.rs index cf82a65c..794f2dd4 100644 --- a/chain-signatures/node/src/protocol/test_setup.rs +++ b/chain-signatures/node/src/protocol/test_setup.rs @@ -58,11 +58,8 @@ impl MpcSignProtocol { generating, resharing, ready, - rpc_channel: channels.rpc_channel, - contract, config: channels.config, mesh_state: channels.mesh_state, - backlog, } } } diff --git a/integration-tests/src/mpc_fixture/fixture_interface.rs b/integration-tests/src/mpc_fixture/fixture_interface.rs index 5a5d0483..75458b31 100644 --- a/integration-tests/src/mpc_fixture/fixture_interface.rs +++ b/integration-tests/src/mpc_fixture/fixture_interface.rs @@ -126,7 +126,10 @@ impl MpcFixture { let _ = self .shared_contract_state .send(Some(ProtocolState::Running(running_state))); - tracing::info!(epoch = resharing.old_epoch, "completed resharing (same epoch)"); + tracing::info!( + epoch = resharing.old_epoch, + "completed resharing (same epoch)" + ); } else { tracing::warn!("cannot complete resharing: contract not in Resharing state"); } From df69855ae31a56ac7532b01a86019df79b6c7c7e Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Thu, 27 Nov 2025 06:53:38 +0000 Subject: [PATCH 14/15] Remove clone impl --- chain-signatures/node/src/protocol/posit.rs | 2 +- chain-signatures/node/src/protocol/signature.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/chain-signatures/node/src/protocol/posit.rs b/chain-signatures/node/src/protocol/posit.rs index a60ba9e8..98ff50ee 100644 --- a/chain-signatures/node/src/protocol/posit.rs +++ b/chain-signatures/node/src/protocol/posit.rs @@ -29,7 +29,7 @@ impl Positor { } /// All actions that can be taken when a new posit is introduced for a protocol. 
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] pub enum PositAction { Propose, Start(Vec), diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index adddcdac..9b32c1fb 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -1104,7 +1104,6 @@ impl SignTask { } /// Message types that can be sent to a running signature task -#[derive(Clone)] enum SignTaskMessage { /// Posit message from another node PositMessage { From 47c6f36909acab8028f75d545e99ff562eb2527e Mon Sep 17 00:00:00 2001 From: Phuong N Date: Tue, 13 Jan 2026 21:57:36 +0000 Subject: [PATCH 15/15] Fix rogue respond issue --- integration-tests/src/actions/sign.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/integration-tests/src/actions/sign.rs b/integration-tests/src/actions/sign.rs index d667e50f..820f0360 100644 --- a/integration-tests/src/actions/sign.rs +++ b/integration-tests/src/actions/sign.rs @@ -852,7 +852,13 @@ impl SignAction<'_> { .await?; let err = wait_for::rogue_message_responded(rogue_status).await?; - assert!(err.contains(&errors::RespondError::InvalidSignature.to_string())); + // The rogue respond can race with the honest one; if the honest response lands first + // the contract returns RequestNotFound instead of InvalidSignature. + assert!( + err.contains(&errors::RespondError::InvalidSignature.to_string()) + || err.contains(&errors::InvalidParameters::RequestNotFound.to_string()), + "unexpected rogue respond error: {err}" + ); Some(rogue) } else { None