Skip to content

Commit 3bb2cfa

Browse files
committed
fix: storage bootstrap sync + memory optimization
- Add pending_writes cache for read-your-own-writes during WASM sync cycles (propose_write populates cache, get() checks cache first, cleared after sync)
- Clear pending writes before get_weights (reads consensus-confirmed data only)
- validated_storage: remove proposal from HashMap after commit (was never cleaned)
- validated_storage: bound committed HashMap to last 1000 entries
- validated_storage: cleanup_expired removes all expired proposals unconditionally
- state_consensus: prune completed results to last 100 blocks
- Sled cache limited to 256MB (was 1GB default)
- Explicit sled flush on shutdown after state persistence
- Add clear_pending_writes() to StorageBackend trait (default no-op)
1 parent ca24f70 commit 3bb2cfa

File tree

7 files changed

+81
-11
lines changed

7 files changed

+81
-11
lines changed

bins/validator-node/src/challenge_storage.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,26 @@ use platform_distributed_storage::{
55
};
66
use platform_p2p_consensus::{P2PCommand, P2PMessage, StorageProposal, StorageProposalMessage};
77
use sha2::{Digest, Sha256};
8+
use std::collections::HashMap;
89
use std::sync::Arc;
910
use tokio::sync::mpsc;
1011
use wasm_runtime_interface::storage::{StorageBackend, StorageHostError};
1112

1213
/// Channel for local storage proposals (proposer adds to own state)
1314
pub type LocalProposalSender = mpsc::Sender<StorageProposal>;
1415

16+
/// Pending write value: Some(data) = write, None = delete
17+
type PendingValue = Option<Vec<u8>>;
18+
1519
pub struct ChallengeStorageBackend {
1620
storage: Arc<dyn DistributedStore>,
1721
p2p_tx: Option<mpsc::Sender<P2PCommand>>,
1822
local_proposal_tx: Option<LocalProposalSender>,
1923
keypair: Option<Keypair>,
24+
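/// Overlay of not-yet-committed writes, giving read-your-own-writes during a sync cycle.
/// Not write-through: entries are held in memory only and are never persisted from here.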
25+
/// Key: (challenge_id, hex-encoded key). Value: pending data (None = delete).
26+
/// Cleared via `clear_pending_writes()` after each sync cycle completes.
27+
pending_writes: parking_lot::RwLock<HashMap<(String, String), PendingValue>>,
2028
}
2129

2230
impl ChallengeStorageBackend {
@@ -27,6 +35,7 @@ impl ChallengeStorageBackend {
2735
p2p_tx: None,
2836
local_proposal_tx: None,
2937
keypair: None,
38+
pending_writes: parking_lot::RwLock::new(HashMap::new()),
3039
}
3140
}
3241

@@ -41,8 +50,15 @@ impl ChallengeStorageBackend {
4150
p2p_tx: Some(p2p_tx),
4251
local_proposal_tx: Some(local_proposal_tx),
4352
keypair: Some(keypair),
53+
pending_writes: parking_lot::RwLock::new(HashMap::new()),
4454
}
4555
}
56+
57+
/// Clear the pending writes cache. Call after each sync cycle completes
58+
/// so that subsequent reads go through consensus-confirmed storage.
59+
pub fn clear_pending_writes(&self) {
60+
self.pending_writes.write().clear();
61+
}
4662
}
4763

4864
/// Build a standardized storage key for challenge data.
@@ -54,6 +70,12 @@ fn build_challenge_storage_key(challenge_id: &str, key: &[u8]) -> DStorageKey {
5470

5571
impl StorageBackend for ChallengeStorageBackend {
5672
fn get(&self, challenge_id: &str, key: &[u8]) -> Result<Option<Vec<u8>>, StorageHostError> {
73+
// Check pending writes cache first (read-your-own-writes during sync)
74+
let cache_key = (challenge_id.to_string(), hex::encode(key));
75+
if let Some(pending) = self.pending_writes.read().get(&cache_key) {
76+
return Ok(pending.clone());
77+
}
78+
5779
let storage_key = build_challenge_storage_key(challenge_id, key);
5880
let result = tokio::task::block_in_place(|| {
5981
tokio::runtime::Handle::current()
@@ -76,9 +98,19 @@ impl StorageBackend for ChallengeStorageBackend {
7698
hasher.update(value);
7799
let proposal_id: [u8; 32] = hasher.finalize().into();
78100

79-
// DO NOT write locally here - data is only written after P2P consensus is reached.
80-
// All validators (including the proposer) write in the StorageVote handler when
81-
// 2f+1 votes approve. This ensures consistency across all nodes.
101+
// Cache the write locally so WASM can read-its-own-writes during the
102+
// current sync cycle. This cache is NOT persisted to storage and does NOT
103+
// affect other validators. Actual storage write happens after P2P consensus.
104+
// The cache is cleared after each sync cycle via clear_pending_writes().
105+
{
106+
let cache_key = (challenge_id.to_string(), hex::encode(key));
107+
let cache_value = if value.is_empty() {
108+
None // delete
109+
} else {
110+
Some(value.to_vec())
111+
};
112+
self.pending_writes.write().insert(cache_key, cache_value);
113+
}
82114

83115
// Broadcast via P2P for consensus
84116
if let (Some(tx), Some(kp)) = (&self.p2p_tx, &self.keypair) {

bins/validator-node/src/main.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1850,6 +1850,13 @@ async fn main() -> Result<()> {
18501850
info!("P2P state persisted on shutdown");
18511851
}
18521852

1853+
// Flush storage to disk to ensure all writes are durable
1854+
if let Err(e) = local_storage.flush_if_dirty().await {
1855+
error!("Failed to flush storage on shutdown: {}", e);
1856+
} else {
1857+
info!("Storage flushed to disk on shutdown");
1858+
}
1859+
18531860
// Create checkpoint
18541861
if let Some(handler) = shutdown_handler.as_mut() {
18551862
if let Err(e) = handler.create_checkpoint() {

bins/validator-node/src/wasm_executor.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,6 +1047,8 @@ impl WasmChallengeExecutor {
10471047
block_height: u64,
10481048
epoch: u64,
10491049
) -> Result<Vec<platform_challenge_sdk::WeightAssignment>> {
1050+
// Ensure get_weights reads only consensus-confirmed data, not pending sync writes
1051+
self.config.storage_backend.clear_pending_writes();
10501052
let start = Instant::now();
10511053

10521054
let module = self
@@ -1221,6 +1223,10 @@ impl WasmChallengeExecutor {
12211223
"WASM sync completed"
12221224
);
12231225

1226+
// Clear the pending writes cache so subsequent reads (e.g., get_weights)
1227+
// only see consensus-confirmed data, not transient sync writes.
1228+
self.config.storage_backend.clear_pending_writes();
1229+
12241230
Ok(sync_result)
12251231
}
12261232

crates/distributed-storage/src/local.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,13 @@ pub struct LocalStorage {
9595
}
9696

9797
impl LocalStorage {
98-
/// Open or create a local storage at the given path
98+
/// Open or create a local storage at the given path.
99+
/// Sled cache is limited to 256MB to bound memory usage.
99100
pub fn open<P: AsRef<Path>>(path: P, node_id: String) -> StorageResult<Self> {
100-
let db = sled::open(path)?;
101+
let db = sled::Config::new()
102+
.path(path)
103+
.cache_capacity(256 * 1024 * 1024) // 256MB cache limit
104+
.open()?;
101105
let data_tree = db.open_tree("data")?;
102106
let index_tree = db.open_tree("index")?;
103107
let block_index_tree = db.open_tree("block_index")?;

crates/distributed-storage/src/state_consensus.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -834,9 +834,16 @@ impl StateRootConsensus {
834834
}
835835

836836
/// Store a completed consensus result.
837+
/// Prunes old results to bound memory (keeps last 100 blocks).
837838
pub fn store_completed(&mut self, result: ConsensusResult) {
838839
let block = result.block_number;
839840
self.completed.insert(block, result);
841+
842+
// Prune old completed results to prevent unbounded growth
843+
if self.completed.len() > 100 {
844+
let cutoff = block.saturating_sub(100);
845+
self.completed.retain(|b, _| *b > cutoff);
846+
}
840847
}
841848
}
842849

crates/distributed-storage/src/validated_storage.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -684,13 +684,23 @@ impl<S: DistributedStore + 'static> ValidatedStorage<S> {
684684
{
685685
let mut committed = self.committed.write().await;
686686
committed.insert(*proposal_id, result.clone());
687+
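// Evict entries so that at most 1000 committed results are retained.
// NOTE(review): `committed` is a HashMap, whose iteration order is arbitrary, so the
// `.iter().take(..)` below does not select the oldest entries and may even evict the
// result just inserted. Keeping the *last* 1000 needs insertion order to be tracked.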
688+
if committed.len() > 1000 {
689+
let oldest: Vec<[u8; 32]> = committed
690+
.iter()
691+
.take(committed.len() - 1000)
692+
.map(|(id, _)| *id)
693+
.collect();
694+
for id in oldest {
695+
committed.remove(&id);
696+
}
697+
}
687698
}
688699

700+
// Remove committed proposal from pending to free memory
689701
{
690702
let mut proposals = self.proposals.write().await;
691-
if let Some(state) = proposals.get_mut(proposal_id) {
692-
state.consensus_result = Some(result.clone());
693-
}
703+
proposals.remove(proposal_id);
694704
}
695705

696706
Ok(result)
@@ -746,9 +756,9 @@ impl<S: DistributedStore + 'static> ValidatedStorage<S> {
746756
let timeout = self.config.proposal_timeout_ms;
747757
let before = proposals.len();
748758

749-
proposals.retain(|_, state| {
750-
!state.proposal.is_expired(timeout) || state.consensus_result.is_some()
751-
});
759+
// Remove expired proposals regardless of consensus state.
760+
// Committed proposals are already removed in commit_write().
761+
proposals.retain(|_, state| !state.proposal.is_expired(timeout));
752762

753763
let removed = before - proposals.len();
754764
if removed > 0 {

crates/wasm-runtime-interface/src/storage.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,10 @@ pub trait StorageBackend: Send + Sync {
315315
let _ = (challenge_id, prefix);
316316
Ok(0)
317317
}
318+
319+
/// Clear any pending write cache. Called after sync cycles complete
320+
/// so subsequent reads reflect only consensus-confirmed data.
321+
fn clear_pending_writes(&self) {}
318322
}
319323

320324
pub struct NoopStorageBackend;

0 commit comments

Comments
 (0)