Skip to content

Commit 98d2ea0

Browse files
committed
fix: per-challenge storage sync and consistency
Fixes 3 critical storage synchronization issues:
1. Key format mismatch: Standardize storage keys to use namespace=challenge_id, key=hex::encode(key_bytes). ChallengeStorageBackend builds its keys via build_challenge_storage_key(), and the consensus write handler (StorageVote in main.rs) now constructs keys in the identical format.
2. Pre-consensus write removed: propose_write() no longer writes locally before P2P consensus. Data is ONLY written after 2f+1 validators approve via StorageVote. Single-node mode (no P2P) still writes locally for testing.
3. Storage sync mechanism: Implemented DataRequest/DataResponse handlers for the 'challenge_storage' data type. When StorageRootSync detects divergence (or the local node has no root for a challenge), the node requests all data for that challenge via DataRequest. The peer responds with the challenge's key-value pairs obtained via list_prefix() (up to 10000 entries per response). The receiving node merges entries it doesn't have (import-only, no overwrite).
1 parent 2f824a2 commit 98d2ea0

File tree

2 files changed

+195
-28
lines changed

2 files changed

+195
-28
lines changed

bins/validator-node/src/challenge_storage.rs

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,16 @@ impl ChallengeStorageBackend {
3737
}
3838
}
3939

40+
/// Build a standardized storage key for challenge data.
41+
/// Format: namespace = challenge_id, key = hex-encoded key bytes.
42+
/// This MUST match the format used in consensus writes (main.rs StorageVote handler).
///
/// The `StorageBackend` methods of `ChallengeStorageBackend` in this file build
/// their keys through this helper, so a change to the format here changes all of
/// them at once.
///
/// NOTE(review): the consensus write path uses `proposal.challenge_id.to_string()`
/// as the namespace, while this helper uses the `challenge_id` string as given.
/// The proposal path derives a UUID when `challenge_id` does not parse as one, so
/// the two namespaces presumably agree only when `challenge_id` is already the
/// canonical UUID string - confirm against callers.
43+
fn build_challenge_storage_key(challenge_id: &str, key: &[u8]) -> DStorageKey {
44+
DStorageKey::new(challenge_id, hex::encode(key))
45+
}
46+
4047
impl StorageBackend for ChallengeStorageBackend {
4148
fn get(&self, challenge_id: &str, key: &[u8]) -> Result<Option<Vec<u8>>, StorageHostError> {
42-
let storage_key = DStorageKey::new(challenge_id, hex::encode(key));
49+
let storage_key = build_challenge_storage_key(challenge_id, key);
4350
let result = tokio::task::block_in_place(|| {
4451
tokio::runtime::Handle::current()
4552
.block_on(self.storage.get(&storage_key, DGetOptions::default()))
@@ -54,23 +61,18 @@ impl StorageBackend for ChallengeStorageBackend {
5461
key: &[u8],
5562
value: &[u8],
5663
) -> Result<[u8; 32], StorageHostError> {
57-
let storage_key = DStorageKey::new(challenge_id, hex::encode(key));
58-
tokio::task::block_in_place(|| {
59-
tokio::runtime::Handle::current().block_on(self.storage.put(
60-
storage_key,
61-
value.to_vec(),
62-
DPutOptions::default(),
63-
))
64-
})
65-
.map_err(|e| StorageHostError::StorageError(e.to_string()))?;
66-
64+
// Compute proposal ID from content
6765
let mut hasher = Sha256::new();
6866
hasher.update(challenge_id.as_bytes());
6967
hasher.update(key);
7068
hasher.update(value);
7169
let proposal_id: [u8; 32] = hasher.finalize().into();
7270

73-
// Broadcast via P2P if configured
71+
// DO NOT write locally here - data is only written after P2P consensus is reached.
72+
// All validators (including the proposer) write in the StorageVote handler when
73+
// 2f+1 votes approve. This ensures consistency across all nodes.
74+
75+
// Broadcast via P2P for consensus
7476
if let (Some(tx), Some(kp)) = (&self.p2p_tx, &self.keypair) {
7577
let challenge_uuid = uuid::Uuid::parse_str(challenge_id).unwrap_or_else(|_| {
7678
// Derive a deterministic UUID from the challenge_id string
@@ -97,13 +99,24 @@ impl StorageBackend for ChallengeStorageBackend {
9799

98100
// Fire and forget - don't block WASM execution on P2P
99101
let _ = tx.try_send(P2PCommand::Broadcast(msg));
102+
} else {
103+
// No P2P configured - write locally for single-node/test mode
104+
let storage_key = build_challenge_storage_key(challenge_id, key);
105+
tokio::task::block_in_place(|| {
106+
tokio::runtime::Handle::current().block_on(self.storage.put(
107+
storage_key,
108+
value.to_vec(),
109+
DPutOptions::default(),
110+
))
111+
})
112+
.map_err(|e| StorageHostError::StorageError(e.to_string()))?;
100113
}
101114

102115
Ok(proposal_id)
103116
}
104117

105118
fn delete(&self, challenge_id: &str, key: &[u8]) -> Result<bool, StorageHostError> {
106-
let storage_key = DStorageKey::new(challenge_id, hex::encode(key));
119+
let storage_key = build_challenge_storage_key(challenge_id, key);
107120
tokio::task::block_in_place(|| {
108121
tokio::runtime::Handle::current().block_on(self.storage.delete(&storage_key))
109122
})
@@ -115,7 +128,7 @@ impl StorageBackend for ChallengeStorageBackend {
115128
challenge_id: &str,
116129
key: &[u8],
117130
) -> Result<Option<Vec<u8>>, StorageHostError> {
118-
let storage_key = DStorageKey::new(challenge_id, hex::encode(key));
131+
let storage_key = build_challenge_storage_key(challenge_id, key);
119132
let result = tokio::task::block_in_place(|| {
120133
tokio::runtime::Handle::current()
121134
.block_on(self.storage.get(&storage_key, DGetOptions::default()))

bins/validator-node/src/main.rs

Lines changed: 168 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1766,6 +1766,72 @@ async fn handle_network_event(
17661766
data_type = %req.data_type,
17671767
"Received data request"
17681768
);
1769+
1770+
if !validator_set.is_validator(&req.requester) {
1771+
warn!(requester = %req.requester.to_hex(), "Data request from unknown validator");
1772+
} else if req.data_type == "challenge_storage" {
1773+
// Respond with storage data for the requested challenge
1774+
let challenge_id_str = req.challenge_id.to_string();
1775+
1776+
// List all keys in this challenge's namespace using list_prefix
1777+
let list_result = storage
1778+
.list_prefix(&challenge_id_str, None, 10000, None)
1779+
.await;
1780+
1781+
match list_result {
1782+
Ok(list_result) => {
1783+
// Collect all key-value pairs for this challenge
1784+
let mut storage_entries: Vec<(String, Vec<u8>)> = Vec::new();
1785+
for (key, stored) in list_result.items {
1786+
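// Send the key as a string: decode the stored key bytes as UTF-8 (keys written in
// our format are presumably the UTF-8 bytes of a hex string), and fall back to
// hex-encoding the raw bytes only when they are not valid UTF-8.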
1787+
let key_str = String::from_utf8(key.key.clone())
1788+
.unwrap_or_else(|_| hex::encode(&key.key));
1789+
storage_entries.push((key_str, stored.data));
1790+
}
1791+
1792+
let data = bincode::serialize(&storage_entries).unwrap_or_default();
1793+
let timestamp = chrono::Utc::now().timestamp_millis();
1794+
let request_id = req.request_id.clone();
1795+
let challenge_id = req.challenge_id;
1796+
let sign_data = bincode::serialize(&(&request_id, &data, timestamp))
1797+
.unwrap_or_default();
1798+
let signature = keypair.sign_bytes(&sign_data).unwrap_or_default();
1799+
1800+
let response = P2PMessage::DataResponse(
1801+
platform_p2p_consensus::DataResponseMessage {
1802+
request_id: request_id.clone(),
1803+
responder: keypair.hotkey(),
1804+
challenge_id,
1805+
data_type: "challenge_storage".to_string(),
1806+
data,
1807+
timestamp,
1808+
signature,
1809+
},
1810+
);
1811+
1812+
if let Err(e) = p2p_cmd_tx
1813+
.send(platform_p2p_consensus::P2PCommand::Broadcast(response))
1814+
.await
1815+
{
1816+
warn!(error = %e, "Failed to send storage data response");
1817+
} else {
1818+
info!(
1819+
request_id = %request_id,
1820+
challenge_id = %challenge_id,
1821+
entries = storage_entries.len(),
1822+
"Sent challenge storage data response"
1823+
);
1824+
}
1825+
}
1826+
Err(e) => {
1827+
warn!(
1828+
challenge_id = %req.challenge_id,
1829+
error = %e,
1830+
"Failed to list storage entries for data request"
1831+
);
1832+
}
1833+
}
1834+
}
17691835
}
17701836
P2PMessage::DataResponse(resp) => {
17711837
debug!(
@@ -1775,6 +1841,61 @@ async fn handle_network_event(
17751841
data_bytes = resp.data.len(),
17761842
"Received data response"
17771843
);
1844+
1845+
if !validator_set.is_validator(&resp.responder) {
1846+
warn!(responder = %resp.responder.to_hex(), "Data response from unknown validator");
1847+
} else if resp.data_type == "challenge_storage" {
1848+
// Merge storage data from peer
1849+
match bincode::deserialize::<Vec<(String, Vec<u8>)>>(&resp.data) {
1850+
Ok(entries) => {
1851+
let challenge_id_str = resp.challenge_id.to_string();
1852+
let mut imported = 0;
1853+
1854+
for (key, value) in entries {
1855+
let storage_key = StorageKey::new(&challenge_id_str, key);
1856+
// Only import if we don't have this key
1857+
match storage
1858+
.get(
1859+
&storage_key,
1860+
platform_distributed_storage::GetOptions::default(),
1861+
)
1862+
.await
1863+
{
1864+
Ok(None) => {
1865+
if let Err(e) = storage
1866+
.put(storage_key, value, PutOptions::default())
1867+
.await
1868+
{
1869+
warn!(error = %e, "Failed to import storage entry");
1870+
} else {
1871+
imported += 1;
1872+
}
1873+
}
1874+
Ok(Some(_)) => {
1875+
// Already have this key, skip
1876+
}
1877+
Err(e) => {
1878+
warn!(error = %e, "Failed to check existing storage entry");
1879+
}
1880+
}
1881+
}
1882+
1883+
if imported > 0 {
1884+
info!(
1885+
challenge_id = %resp.challenge_id,
1886+
imported = imported,
1887+
"Imported storage entries from peer"
1888+
);
1889+
}
1890+
}
1891+
Err(e) => {
1892+
warn!(
1893+
error = %e,
1894+
"Failed to deserialize challenge storage data response"
1895+
);
1896+
}
1897+
}
1898+
}
17781899
}
17791900
P2PMessage::TaskProgress(progress) => {
17801901
debug!(
@@ -1926,7 +2047,7 @@ async fn handle_network_event(
19262047

19272048
// Persist core state immediately so challenge survives restart
19282049
if let Err(e) =
1929-
persist_core_state_to_storage(storage, &chain_state)
2050+
persist_core_state_to_storage(storage, chain_state)
19302051
.await
19312052
{
19322053
warn!("Failed to persist core state after P2P challenge registration: {}", e);
@@ -2169,9 +2290,11 @@ async fn handle_network_event(
21692290
.apply(|state| state.remove_storage_proposal(&vote.proposal_id));
21702291

21712292
if let Some(proposal) = proposal_opt {
2293+
// Use same key format as ChallengeStorageBackend:
2294+
// namespace = challenge_id.to_string(), key = hex::encode(key_bytes)
21722295
let storage_key = StorageKey::new(
2173-
&format!("challenge:{}", proposal.challenge_id),
2174-
&proposal.key,
2296+
&proposal.challenge_id.to_string(),
2297+
hex::encode(&proposal.key),
21752298
);
21762299

21772300
match storage
@@ -2267,15 +2390,46 @@ async fn handle_network_event(
22672390
let local_map: std::collections::HashMap<_, _> =
22682391
local_roots.into_iter().collect();
22692392
for (cid, remote_root) in &msg.roots {
2270-
if let Some(local_root) = local_map.get(cid) {
2271-
if local_root != remote_root {
2272-
warn!(
2273-
challenge_id = %cid,
2274-
local = %hex::encode(&local_root[..8]),
2275-
remote = %hex::encode(&remote_root[..8]),
2276-
remote_validator = %msg.validator.to_hex(),
2277-
"Storage root divergence detected"
2278-
);
2393+
let needs_sync = match local_map.get(cid) {
2394+
Some(local_root) => local_root != remote_root,
2395+
None => true, // We don't have this challenge's storage at all
2396+
};
2397+
2398+
if needs_sync {
2399+
info!(
2400+
challenge_id = %cid,
2401+
remote_validator = %msg.validator.to_hex(),
2402+
"Storage divergence detected, requesting challenge data"
2403+
);
2404+
2405+
// Request storage data for this challenge
2406+
let request_id = format!(
2407+
"storage_sync_{}_{}",
2408+
cid,
2409+
chrono::Utc::now().timestamp_millis()
2410+
);
2411+
let timestamp = chrono::Utc::now().timestamp_millis();
2412+
let sign_data =
2413+
bincode::serialize(&(&request_id, timestamp)).unwrap_or_default();
2414+
let signature = keypair.sign_bytes(&sign_data).unwrap_or_default();
2415+
2416+
let req = P2PMessage::DataRequest(
2417+
platform_p2p_consensus::DataRequestMessage {
2418+
request_id,
2419+
requester: keypair.hotkey(),
2420+
challenge_id: *cid,
2421+
data_type: "challenge_storage".to_string(),
2422+
data_key: "all".to_string(),
2423+
timestamp,
2424+
signature,
2425+
},
2426+
);
2427+
2428+
if let Err(e) = p2p_cmd_tx
2429+
.send(platform_p2p_consensus::P2PCommand::Broadcast(req))
2430+
.await
2431+
{
2432+
warn!(error = %e, "Failed to send storage sync request");
22792433
}
22802434
}
22812435
}
@@ -2452,7 +2606,7 @@ async fn handle_network_event(
24522606

24532607
// Persist immediately after consensus-approved mutation
24542608
if let Err(e) =
2455-
persist_core_state_to_storage(storage, &chain_state).await
2609+
persist_core_state_to_storage(storage, chain_state).await
24562610
{
24572611
warn!(
24582612
"Failed to persist core state after consensus mutation: {}",
@@ -2533,7 +2687,7 @@ async fn handle_network_event(
25332687

25342688
// Persist merged state
25352689
if let Err(e) =
2536-
persist_core_state_to_storage(storage, &chain_state).await
2690+
persist_core_state_to_storage(storage, chain_state).await
25372691
{
25382692
warn!("Failed to persist merged core state: {}", e);
25392693
}

0 commit comments

Comments
 (0)