Skip to content

Commit caf596a

Browse files
committed
fix: wire P2P backpressure to resolve 'no available capacity' errors
- Add process_retries() periodic task (500ms interval)
- Wire ChannelMetrics with RealP2PSender::with_metrics()
- Replace all 15 try_send() calls with send_timeout() by priority:
  - HIGH (200ms): StateMutationProposal/Vote (2 calls)
  - MEDIUM (100ms): Heartbeat (1 call)
  - LOW (10ms): Storage/Challenge broadcasts (12 calls)
- Fix bug: increment dropped_count when max retries exceeded
- Add retry queue integration tests

Fixes P2P broadcast channel overflow that was dropping messages without backpressure, causing 'no available capacity' errors.
1 parent 317365a commit caf596a

File tree

4 files changed

+278
-17
lines changed

4 files changed

+278
-17
lines changed

bins/validator-node/src/main.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use platform_distributed_storage::{
3434

3535
use platform_p2p_consensus::{
3636
ChainState, ConsensusEngine, EvaluationMessage, EvaluationMetrics, EvaluationRecord,
37-
HeartbeatMessage, JobRecord, JobStatus, NetworkEvent, P2PConfig, P2PMessage, P2PNetwork,
37+
HeartbeatMessage, JobRecord, JobStatus, MessagePriority, NetworkEvent, P2PConfig, P2PMessage, P2PNetwork,
3838
StateManager, StorageProposal, StorageVoteMessage, TaskProgressRecord, ValidatorRecord,
3939
ValidatorSet,
4040
};
@@ -1512,7 +1512,7 @@ async fn main() -> Result<()> {
15121512
};
15131513

15141514
let msg = P2PMessage::ChallengeUpdate(update_msg);
1515-
if let Err(e) = p2p_broadcast_tx.try_send(platform_p2p_consensus::P2PCommand::Broadcast(msg)) {
1515+
if let Err(e) = p2p_broadcast_tx.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(msg), MessagePriority::Low.timeout()).await {
15161516
error!("Failed to broadcast sudo action: {}", e);
15171517
} else {
15181518
info!("Sudo action broadcast to P2P network");
@@ -1543,7 +1543,7 @@ async fn main() -> Result<()> {
15431543
};
15441544

15451545
let msg = P2PMessage::ChallengeUpdate(update_msg);
1546-
if let Err(e) = p2p_broadcast_tx.try_send(platform_p2p_consensus::P2PCommand::Broadcast(msg)) {
1546+
if let Err(e) = p2p_broadcast_tx.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(msg), MessagePriority::Low.timeout()).await {
15471547
error!("Failed to broadcast ChallengeUpdate: {}", e);
15481548
} else {
15491549
info!(
@@ -1582,7 +1582,7 @@ async fn main() -> Result<()> {
15821582
);
15831583

15841584
if let Err(e) = p2p_broadcast_tx
1585-
.try_send(platform_p2p_consensus::P2PCommand::Broadcast(mutation_msg))
1585+
.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(mutation_msg), MessagePriority::High.timeout()).await
15861586
{
15871587
warn!(error = %e, "Failed to broadcast StateMutationProposal");
15881588
}
@@ -1836,7 +1836,7 @@ async fn main() -> Result<()> {
18361836
},
18371837
});
18381838

1839-
if let Err(e) = p2p_broadcast_tx.try_send(platform_p2p_consensus::P2PCommand::Broadcast(heartbeat)) {
1839+
if let Err(e) = p2p_broadcast_tx.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(heartbeat), MessagePriority::Medium.timeout()).await {
18401840
warn!("Failed to broadcast heartbeat: {}", e);
18411841
}
18421842

@@ -2171,7 +2171,7 @@ async fn main() -> Result<()> {
21712171
}
21722172
);
21732173

2174-
if let Err(e) = p2p_broadcast_tx.try_send(platform_p2p_consensus::P2PCommand::Broadcast(proposal_msg)) {
2174+
if let Err(e) = p2p_broadcast_tx.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(proposal_msg), MessagePriority::Low.timeout()).await {
21752175
warn!(error = %e, "Failed to broadcast sync proposal");
21762176
}
21772177
}
@@ -2676,7 +2676,7 @@ async fn handle_network_event(
26762676
);
26772677

26782678
if let Err(e) =
2679-
p2p_sender.try_send(platform_p2p_consensus::P2PCommand::Broadcast(req))
2679+
p2p_sender.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(req), MessagePriority::Low.timeout()).await
26802680
{
26812681
warn!(error = %e, "Failed to send core state request");
26822682
}
@@ -2910,7 +2910,7 @@ async fn handle_network_event(
29102910
);
29112911

29122912
if let Err(e) = p2p_sender
2913-
.try_send(platform_p2p_consensus::P2PCommand::Broadcast(response))
2913+
.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(response), MessagePriority::Low.timeout()).await
29142914
{
29152915
warn!(error = %e, "Failed to send storage data response");
29162916
} else {
@@ -3072,7 +3072,7 @@ async fn handle_network_event(
30723072
},
30733073
);
30743074
if let Err(e) =
3075-
p2p_sender.try_send(platform_p2p_consensus::P2PCommand::Broadcast(resp))
3075+
p2p_sender.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(resp), MessagePriority::Low.timeout()).await
30763076
{
30773077
warn!(error = %e, "Failed to send leaderboard response");
30783078
}
@@ -3574,7 +3574,7 @@ async fn handle_network_event(
35743574
}
35753575

35763576
if let Err(e) =
3577-
p2p_sender.try_send(platform_p2p_consensus::P2PCommand::Broadcast(vote_msg))
3577+
p2p_sender.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(vote_msg), MessagePriority::Low.timeout()).await
35783578
{
35793579
warn!(error = %e, "Failed to broadcast storage vote");
35803580
}
@@ -3776,7 +3776,7 @@ async fn handle_network_event(
37763776
);
37773777

37783778
if let Err(e) = p2p_sender
3779-
.try_send(platform_p2p_consensus::P2PCommand::Broadcast(req))
3779+
.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(req), MessagePriority::Low.timeout()).await
37803780
{
37813781
warn!(error = %e, "Failed to send storage sync request");
37823782
}
@@ -3847,7 +3847,7 @@ async fn handle_network_event(
38473847
);
38483848

38493849
if let Err(e) =
3850-
p2p_sender.try_send(platform_p2p_consensus::P2PCommand::Broadcast(vote_msg))
3850+
p2p_sender.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(vote_msg), MessagePriority::High.timeout()).await
38513851
{
38523852
warn!(error = %e, "Failed to broadcast state mutation vote");
38533853
}
@@ -4032,7 +4032,7 @@ async fn handle_network_event(
40324032
);
40334033

40344034
if let Err(e) =
4035-
p2p_sender.try_send(platform_p2p_consensus::P2PCommand::Broadcast(response))
4035+
p2p_sender.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(response), MessagePriority::Low.timeout()).await
40364036
{
40374037
warn!(error = %e, "Failed to send core state response");
40384038
}
@@ -4171,8 +4171,8 @@ async fn handle_network_event(
41714171
);
41724172

41734173
if let Err(e) =
4174-
p2p_sender.try_send(platform_p2p_consensus::P2PCommand::Broadcast(req))
4175-
{
4174+
p2p_sender.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(req), MessagePriority::Low.timeout()).await
4175+
{
41764176
warn!(error = %e, "Failed to send storage sync request");
41774177
}
41784178
}
@@ -4283,7 +4283,7 @@ async fn handle_network_event(
42834283
);
42844284

42854285
if let Err(e) = p2p_sender
4286-
.try_send(platform_p2p_consensus::P2PCommand::Broadcast(resp))
4286+
.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(resp), MessagePriority::Low.timeout()).await
42874287
{
42884288
warn!(error = %e, "Failed to send storage sync response");
42894289
} else {
@@ -5812,7 +5812,7 @@ async fn process_wasm_evaluations(
58125812
signature,
58135813
timestamp,
58145814
});
5815-
if let Err(e) = p2p_sender.try_send(platform_p2p_consensus::P2PCommand::Broadcast(eval_msg))
5815+
if let Err(e) = p2p_sender.send_timeout(platform_p2p_consensus::P2PCommand::Broadcast(eval_msg), MessagePriority::Low.timeout()).await
58165816
{
58175817
warn!(
58185818
submission_id = %submission_id,

crates/p2p-consensus/src/network.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,9 @@ impl RealP2PSender {
465465
Err(e) => {
466466
let next_attempt = attempt + 1;
467467
if !self.retry_queue.enqueue(cmd, next_attempt) {
468+
if let Some(ref metrics) = self.metrics {
469+
metrics.increment_dropped();
470+
}
468471
error!(
469472
attempt = next_attempt,
470473
error = %e,

tests/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ required-features = ["wasm-tests"]
5353
name = "channel_overflow_test"
5454
path = "channel_overflow_test.rs"
5555

56+
[[test]]
57+
name = "retry_queue_integration_test"
58+
path = "retry_queue_integration_test.rs"
59+
5660
[dependencies]
5761
platform-core = { path = "../crates/core" }
5862
platform-storage = { path = "../crates/storage" }

0 commit comments

Comments
 (0)