Skip to content

Commit 7c2e58d

Browse files
committed
fix: comprehensive audit fixes - state divergence, amplification, memory leaks, validation
CRITICAL: - validated_agent_logs: deterministic pruning (sorted keys instead of HashMap random iteration order that caused state divergence) - DataResponse/StorageSyncResponse: add target field so non-targeted validators skip processing (O(n²) broadcast amplification fix) HIGH: - CoreStateResponse: verify signature + state_hash before merge_from (prevents malicious state injection) - consensus decisions HashMap: prune to last 1000 (was unbounded) - cleanup_stale_data: prune task_progress, review_assignments by age, cap leaderboard to top 1000 per challenge, prune completed_evaluations to last 10 epochs MEDIUM: - Evaluation messages: reject from non-validators (was accepting with 0 stake) - Submission messages: reject from unregistered miners - committed HashMap: deterministic eviction by timestamp (was random) - Shutdown: 30s timeout to prevent indefinite hang
1 parent 3bb2cfa commit 7c2e58d

File tree

5 files changed

+174
-42
lines changed

5 files changed

+174
-42
lines changed

bins/validator-node/src/main.rs

Lines changed: 97 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1594,6 +1594,8 @@ async fn main() -> Result<()> {
15941594
state_manager.apply(|state| state.cleanup_old_state_mutations(300_000));
15951595
// Clean up old storage proposals (5 min timeout)
15961596
state_manager.apply(|state| state.cleanup_old_proposals(300_000));
1597+
// Prune stale task progress, reviews, leaderboard, completed evaluations (1 hour)
1598+
state_manager.apply(|state| state.cleanup_stale_data(3_600_000));
15971599
}
15981600

15991601
// Weight submission is handled entirely by CommitWindowOpen in handle_block_event.
@@ -1836,35 +1838,45 @@ async fn main() -> Result<()> {
18361838
_ = &mut shutdown_signal => {
18371839
info!("Received shutdown signal, persisting state...");
18381840

1839-
// Persist core state (wasm_challenge_configs, routes, validators)
1840-
if let Err(e) = persist_core_state_to_storage(&storage, &chain_state).await {
1841-
error!("Failed to persist core state on shutdown: {}", e);
1842-
} else {
1843-
info!("Core state persisted on shutdown");
1844-
}
1841+
let shutdown_result = tokio::time::timeout(
1842+
std::time::Duration::from_secs(30),
1843+
async {
1844+
// Persist core state (wasm_challenge_configs, routes, validators)
1845+
if let Err(e) = persist_core_state_to_storage(&storage, &chain_state).await {
1846+
error!("Failed to persist core state on shutdown: {}", e);
1847+
} else {
1848+
info!("Core state persisted on shutdown");
1849+
}
18451850

1846-
// Persist P2P consensus state
1847-
if let Err(e) = persist_state_to_storage(&storage, &state_manager).await {
1848-
error!("Failed to persist P2P state on shutdown: {}", e);
1849-
} else {
1850-
info!("P2P state persisted on shutdown");
1851-
}
1851+
// Persist P2P consensus state
1852+
if let Err(e) = persist_state_to_storage(&storage, &state_manager).await {
1853+
error!("Failed to persist P2P state on shutdown: {}", e);
1854+
} else {
1855+
info!("P2P state persisted on shutdown");
1856+
}
18521857

1853-
// Flush storage to disk to ensure all writes are durable
1854-
if let Err(e) = local_storage.flush_if_dirty().await {
1855-
error!("Failed to flush storage on shutdown: {}", e);
1856-
} else {
1857-
info!("Storage flushed to disk on shutdown");
1858-
}
1858+
// Flush storage to disk to ensure all writes are durable
1859+
if let Err(e) = local_storage.flush_if_dirty().await {
1860+
error!("Failed to flush storage on shutdown: {}", e);
1861+
} else {
1862+
info!("Storage flushed to disk on shutdown");
1863+
}
18591864

1860-
// Create checkpoint
1861-
if let Some(handler) = shutdown_handler.as_mut() {
1862-
if let Err(e) = handler.create_checkpoint() {
1863-
error!("Failed to create shutdown checkpoint: {}", e);
1864-
} else {
1865-
info!("Shutdown checkpoint saved successfully");
1865+
// Create checkpoint
1866+
if let Some(handler) = shutdown_handler.as_mut() {
1867+
if let Err(e) = handler.create_checkpoint() {
1868+
error!("Failed to create shutdown checkpoint: {}", e);
1869+
} else {
1870+
info!("Shutdown checkpoint saved successfully");
1871+
}
1872+
}
18661873
}
1874+
).await;
1875+
1876+
if shutdown_result.is_err() {
1877+
error!("Shutdown timed out after 30s, forcing exit");
18671878
}
1879+
18681880
info!("Shutting down...");
18691881
break;
18701882
}
@@ -2216,6 +2228,14 @@ async fn handle_network_event(
22162228
}
22172229
}
22182230
P2PMessage::Submission(sub) => {
2231+
// Verify miner is a registered hotkey
2232+
if !chain_state.read().registered_hotkeys.contains(&sub.miner) {
2233+
warn!(
2234+
miner = %sub.miner.to_ss58(),
2235+
"Submission from unregistered miner, ignoring"
2236+
);
2237+
return;
2238+
}
22192239
info!(
22202240
submission_id = %sub.submission_id,
22212241
challenge_id = %sub.challenge_id,
@@ -2251,17 +2271,22 @@ async fn handle_network_event(
22512271
}
22522272
}
22532273
P2PMessage::Evaluation(eval) => {
2274+
let validator_hotkey = eval.validator.clone();
2275+
let vinfo = validator_set.get_validator(&validator_hotkey);
2276+
if vinfo.is_none() {
2277+
warn!(
2278+
validator = %validator_hotkey.to_ss58(),
2279+
"Evaluation from non-validator, ignoring"
2280+
);
2281+
return;
2282+
}
2283+
let stake = vinfo.map(|v| v.stake).unwrap_or(0);
22542284
info!(
22552285
submission_id = %eval.submission_id,
22562286
validator = %eval.validator.to_ss58(),
22572287
score = eval.score,
22582288
"Received evaluation from peer validator"
22592289
);
2260-
let validator_hotkey = eval.validator.clone();
2261-
let stake = validator_set
2262-
.get_validator(&validator_hotkey)
2263-
.map(|v| v.stake)
2264-
.unwrap_or(0);
22652290
let validator_eval = platform_p2p_consensus::ValidatorEvaluation {
22662291
score: eval.score,
22672292
stake,
@@ -2391,6 +2416,7 @@ async fn handle_network_event(
23912416
platform_p2p_consensus::DataResponseMessage {
23922417
request_id: request_id.clone(),
23932418
responder: keypair.hotkey(),
2419+
target: Some(req.requester.clone()),
23942420
challenge_id,
23952421
data_type: "challenge_storage".to_string(),
23962422
data,
@@ -2424,6 +2450,12 @@ async fn handle_network_event(
24242450
}
24252451
}
24262452
P2PMessage::DataResponse(resp) => {
2453+
if let Some(ref target) = resp.target {
2454+
if *target != keypair.hotkey() {
2455+
return;
2456+
}
2457+
}
2458+
24272459
debug!(
24282460
request_id = %resp.request_id,
24292461
responder = %resp.responder.to_ss58(),
@@ -3434,9 +3466,38 @@ async fn handle_network_event(
34343466
if !validator_set.is_validator(&resp.responder) {
34353467
warn!(responder = %resp.responder.to_ss58(), "Core state response from unknown validator");
34363468
} else {
3469+
// Verify application-level signature before trusting the data
3470+
let sign_data = bincode::serialize(&(
3471+
&resp.state_hash,
3472+
resp.mutation_sequence,
3473+
resp.timestamp,
3474+
))
3475+
.unwrap_or_default();
3476+
let signed_msg = platform_core::SignedMessage {
3477+
message: sign_data,
3478+
signature: resp.signature.clone(),
3479+
signer: resp.responder.clone(),
3480+
};
3481+
if !signed_msg.verify().unwrap_or(false) {
3482+
warn!(
3483+
responder = %resp.responder.to_ss58(),
3484+
"Invalid signature on CoreStateResponse, ignoring"
3485+
);
3486+
return;
3487+
}
3488+
34373489
// Deserialize and merge
34383490
match serde_json::from_slice::<platform_core::ChainState>(&resp.state_data) {
34393491
Ok(peer_state) => {
3492+
// Verify state_hash matches the actual content
3493+
if peer_state.state_hash != resp.state_hash {
3494+
warn!(
3495+
responder = %resp.responder.to_ss58(),
3496+
"CoreStateResponse state_hash mismatch, ignoring"
3497+
);
3498+
return;
3499+
}
3500+
34403501
let mut cs = chain_state.write();
34413502
if cs.merge_from(&peer_state) {
34423503
info!(
@@ -3603,6 +3664,7 @@ async fn handle_network_event(
36033664
platform_p2p_consensus::StorageSyncResponseMessage {
36043665
challenge_id: req.challenge_id,
36053666
responder: keypair.hotkey(),
3667+
target: Some(req.requester.clone()),
36063668
data_hash,
36073669
entries,
36083670
total_entries: total,
@@ -3636,6 +3698,12 @@ async fn handle_network_event(
36363698
}
36373699
}
36383700
P2PMessage::StorageSyncResponse(resp) => {
3701+
if let Some(ref target) = resp.target {
3702+
if *target != keypair.hotkey() {
3703+
return;
3704+
}
3705+
}
3706+
36393707
if !validator_set.is_validator(&resp.responder) {
36403708
warn!(responder = %resp.responder.to_ss58(), "Storage sync response from non-validator");
36413709
} else if resp.entries.is_empty() {

crates/distributed-storage/src/validated_storage.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -684,15 +684,13 @@ impl<S: DistributedStore + 'static> ValidatedStorage<S> {
684684
{
685685
let mut committed = self.committed.write().await;
686686
committed.insert(*proposal_id, result.clone());
687-
// Evict old committed results to bound memory (keep last 1000)
687+
// Evict oldest committed results to bound memory (keep last 1000)
688688
if committed.len() > 1000 {
689-
let oldest: Vec<[u8; 32]> = committed
690-
.iter()
691-
.take(committed.len() - 1000)
692-
.map(|(id, _)| *id)
693-
.collect();
694-
for id in oldest {
695-
committed.remove(&id);
689+
let mut by_time: Vec<([u8; 32], i64)> =
690+
committed.iter().map(|(id, r)| (*id, r.timestamp)).collect();
691+
by_time.sort_by_key(|(_, ts)| *ts);
692+
for (id, _) in by_time.iter().take(committed.len() - 1000) {
693+
committed.remove(id);
696694
}
697695
}
698696
}

crates/p2p-consensus/src/consensus.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,15 @@ impl ConsensusEngine {
718718
// Store decision
719719
let seq = round.sequence;
720720
drop(round_guard);
721-
self.decisions.write().insert(seq, decision.clone());
721+
{
722+
let mut decisions = self.decisions.write();
723+
decisions.insert(seq, decision.clone());
724+
// Prune old decisions to bound memory (keep last 1000)
725+
if decisions.len() > 1000 {
726+
let cutoff = seq.saturating_sub(1000);
727+
decisions.retain(|s, _| *s > cutoff);
728+
}
729+
}
722730

723731
// Increment sequence
724732
*self.next_sequence.write() += 1;

crates/p2p-consensus/src/messages.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,9 @@ pub struct DataResponseMessage {
526526
pub request_id: String,
527527
/// Validator providing the data
528528
pub responder: Hotkey,
529+
/// Target requester (only this validator should process the response)
530+
#[serde(default)]
531+
pub target: Option<Hotkey>,
529532
/// Challenge the data belongs to
530533
pub challenge_id: ChallengeId,
531534
/// Type of data being returned
@@ -1090,6 +1093,9 @@ pub struct StorageSyncResponseMessage {
10901093
pub challenge_id: ChallengeId,
10911094
/// Responding validator
10921095
pub responder: Hotkey,
1096+
/// Target requester (only this validator should process the response)
1097+
#[serde(default)]
1098+
pub target: Option<Hotkey>,
10931099
/// Hash of all data in this response
10941100
pub data_hash: [u8; 32],
10951101
/// Key-value entries

crates/p2p-consensus/src/state.rs

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1317,6 +1317,57 @@ impl ChainState {
13171317
removed
13181318
}
13191319

1320+
/// Prune stale data from unbounded collections.
1321+
/// Call periodically (e.g., alongside cleanup_stale_jobs).
1322+
pub fn cleanup_stale_data(&mut self, max_age_ms: i64) {
1323+
let now = chrono::Utc::now().timestamp_millis();
1324+
let mut changed = false;
1325+
1326+
// Prune old task progress entries
1327+
let before = self.task_progress.len();
1328+
self.task_progress
1329+
.retain(|_, r| now - r.updated_at < max_age_ms);
1330+
if self.task_progress.len() != before {
1331+
changed = true;
1332+
}
1333+
1334+
// Prune old review assignments (keep only recent)
1335+
let before = self.review_assignments.len();
1336+
self.review_assignments
1337+
.retain(|_, reviews| reviews.iter().any(|r| now - r.created_at < max_age_ms));
1338+
if self.review_assignments.len() != before {
1339+
changed = true;
1340+
}
1341+
1342+
// Cap leaderboard entries per challenge to top 1000
1343+
for entries in self.leaderboard.values_mut() {
1344+
if entries.len() > 1000 {
1345+
entries.sort_by(|a, b| {
1346+
b.score
1347+
.partial_cmp(&a.score)
1348+
.unwrap_or(std::cmp::Ordering::Equal)
1349+
});
1350+
entries.truncate(1000);
1351+
changed = true;
1352+
}
1353+
}
1354+
1355+
// Prune completed_evaluations to last 10 epochs
1356+
if self.completed_evaluations.len() > 10 {
1357+
let cutoff = self.epoch.saturating_sub(10);
1358+
let before = self.completed_evaluations.len();
1359+
self.completed_evaluations
1360+
.retain(|epoch, _| *epoch > cutoff);
1361+
if self.completed_evaluations.len() != before {
1362+
changed = true;
1363+
}
1364+
}
1365+
1366+
if changed {
1367+
self.increment_sequence();
1368+
}
1369+
}
1370+
13201371
pub fn assign_review(&mut self, record: ReviewRecord) {
13211372
self.review_assignments
13221373
.entry(record.submission_id.clone())
@@ -1439,11 +1490,12 @@ impl ChainState {
14391490
max = MAX_VALIDATED_AGENT_LOGS,
14401491
"Validated agent logs at capacity; pruning oldest entries"
14411492
);
1442-
let keys_to_remove: Vec<String> = self
1443-
.validated_agent_logs
1444-
.keys()
1493+
let mut all_keys: Vec<String> =
1494+
self.validated_agent_logs.keys().cloned().collect();
1495+
all_keys.sort();
1496+
let keys_to_remove: Vec<String> = all_keys
1497+
.into_iter()
14451498
.take(self.validated_agent_logs.len() / 10)
1446-
.cloned()
14471499
.collect();
14481500
for key in keys_to_remove {
14491501
self.validated_agent_logs.remove(&key);

0 commit comments

Comments
 (0)