Skip to content

Commit 760aaff

Browse files
committed
feat: complete tracked storage with compression, indexing, and audit
- Add TrackedStorage wrapper with automatic LZ4 compression
- Automatic block tracking on every write (uses current block as fallback)
- Automatic audit logging on every write/delete
- Automatic index updates on every write
- Replace LocalStorage with TrackedStorage in validator
- Wire block_id to all storage.put() calls via put_options_with_block()
- Update ChallengeStorageBackend to use dyn DistributedStore trait
- Add periodic storage stats logging (every 5 min)
- Set block on every new Bittensor block via storage.set_block()
- Pass block_height and epoch to WASM instances
- Clean up unused imports
1 parent ae6d0af commit 760aaff

File tree

10 files changed

+573
-36
lines changed

10 files changed

+573
-36
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bins/validator-node/src/challenge_storage.rs

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
use platform_core::{ChallengeId, Keypair};
22
use platform_distributed_storage::{
3-
DistributedStore, GetOptions as DGetOptions, LocalStorage, PutOptions as DPutOptions,
3+
DistributedStore, GetOptions as DGetOptions, PutOptions as DPutOptions,
44
StorageKey as DStorageKey,
55
};
66
use platform_p2p_consensus::{P2PCommand, P2PMessage, StorageProposal, StorageProposalMessage};
@@ -13,14 +13,15 @@ use wasm_runtime_interface::storage::{StorageBackend, StorageHostError};
1313
pub type LocalProposalSender = mpsc::Sender<StorageProposal>;
1414

1515
pub struct ChallengeStorageBackend {
16-
storage: Arc<LocalStorage>,
16+
storage: Arc<dyn DistributedStore>,
1717
p2p_tx: Option<mpsc::Sender<P2PCommand>>,
1818
local_proposal_tx: Option<LocalProposalSender>,
1919
keypair: Option<Keypair>,
2020
}
2121

2222
impl ChallengeStorageBackend {
23-
pub fn new(storage: Arc<LocalStorage>) -> Self {
23+
#[allow(dead_code)]
24+
pub fn new(storage: Arc<dyn DistributedStore>) -> Self {
2425
Self {
2526
storage,
2627
p2p_tx: None,
@@ -30,7 +31,7 @@ impl ChallengeStorageBackend {
3031
}
3132

3233
pub fn with_p2p(
33-
storage: Arc<LocalStorage>,
34+
storage: Arc<dyn DistributedStore>,
3435
p2p_tx: mpsc::Sender<P2PCommand>,
3536
local_proposal_tx: LocalProposalSender,
3637
keypair: Keypair,

bins/validator-node/src/main.rs

Lines changed: 61 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,11 @@
33
//! Fully decentralized P2P validator for the Platform network.
44
//! Uses libp2p for gossipsub consensus and Kademlia DHT for storage.
55
//! Submits weights to Bittensor at epoch boundaries.
6+
#![allow(
7+
clippy::too_many_arguments,
8+
clippy::await_holding_lock,
9+
clippy::needless_borrow
10+
)]
611

712
mod challenge_storage;
813
mod wasm_executor;
@@ -23,8 +28,8 @@ use platform_core::{
2328
ChallengeId, Hotkey, Keypair, SUDO_KEY_SS58,
2429
};
2530
use platform_distributed_storage::{
26-
DistributedStore, DistributedStoreExt, LocalStorage, LocalStorageBuilder, PutOptions,
27-
StorageKey,
31+
DistributedStore, DistributedStoreExt, LocalStorageBuilder, PutOptions, StorageKey,
32+
TrackedStorage, TrackedStorageConfig,
2833
};
2934
use platform_epoch::{EpochConfig, WeightAggregator};
3035
use platform_p2p_consensus::{
@@ -69,7 +74,7 @@ fn sanitize_for_log(s: &str) -> String {
6974
/// Helper to mutate ChainState and automatically persist changes.
7075
/// This ensures we never forget to persist after mutations.
7176
async fn mutate_and_persist<F, R>(
72-
storage: Arc<LocalStorage>,
77+
storage: Arc<TrackedStorage>,
7378
chain_state: Arc<RwLock<platform_core::ChainState>>,
7479
operation: &str,
7580
f: F,
@@ -349,17 +354,24 @@ async fn main() -> Result<()> {
349354
std::fs::create_dir_all(&args.data_dir)?;
350355
let data_dir = std::fs::canonicalize(&args.data_dir)?;
351356

352-
// Initialize distributed storage
353-
let storage = LocalStorageBuilder::new(&validator_hotkey)
357+
// Initialize distributed storage with compression and tracking
358+
let local_storage = LocalStorageBuilder::new(&validator_hotkey)
354359
.path(
355360
data_dir
356361
.join("distributed.db")
357362
.to_string_lossy()
358363
.to_string(),
359364
)
360365
.build()?;
361-
let storage = Arc::new(storage);
362-
info!("Distributed storage initialized");
366+
367+
// Wrap with TrackedStorage for automatic compression, indexing, and audit
368+
let tracked_config = TrackedStorageConfig {
369+
validator_id: validator_hotkey.clone(),
370+
..Default::default()
371+
};
372+
let tracked_storage = TrackedStorage::new(Arc::new(local_storage), tracked_config);
373+
let storage = Arc::new(tracked_storage);
374+
info!("Distributed storage initialized with compression and tracking");
363375

364376
// Determine listen address - p2p_port overrides listen_addr if specified
365377
let listen_addr = if let Some(port) = args.p2p_port {
@@ -641,20 +653,22 @@ async fn main() -> Result<()> {
641653
let (local_proposal_tx, mut local_proposal_rx) =
642654
tokio::sync::mpsc::channel::<StorageProposal>(256);
643655

656+
// Cast storage to trait object for WASM executor
657+
let storage_dyn: Arc<dyn DistributedStore> = Arc::clone(&storage) as Arc<dyn DistributedStore>;
644658
let wasm_executor = match WasmChallengeExecutor::new(WasmExecutorConfig {
645659
module_dir: wasm_module_dir.clone(),
646660
max_memory_bytes: args.wasm_max_memory,
647661
enable_fuel: args.wasm_enable_fuel,
648662
fuel_limit: args.wasm_fuel_limit,
649663
storage_host_config: wasm_runtime_interface::StorageHostConfig::default(),
650664
storage_backend: std::sync::Arc::new(challenge_storage::ChallengeStorageBackend::with_p2p(
651-
Arc::clone(&storage),
665+
storage_dyn.clone(),
652666
p2p_cmd_tx.clone(),
653667
local_proposal_tx,
654668
keypair.clone(),
655669
)),
656670
chutes_api_key: None,
657-
distributed_storage: Some(Arc::clone(&storage)),
671+
distributed_storage: Some(storage_dyn),
658672
}) {
659673
Ok(executor) => {
660674
info!(
@@ -921,6 +935,7 @@ async fn main() -> Result<()> {
921935
let mut challenge_sync_interval = tokio::time::interval(Duration::from_secs(60)); // Check every minute
922936
let mut last_sync_block: u64 = 0; // Last block where sync was triggered
923937
let sync_block_interval: u64 = 60; // Sync every 60 blocks
938+
let mut storage_stats_interval = tokio::time::interval(Duration::from_secs(300)); // Log stats every 5 min
924939

925940
// Clone p2p_cmd_tx for use in the loop
926941
let p2p_broadcast_tx = p2p_cmd_tx.clone();
@@ -960,6 +975,7 @@ async fn main() -> Result<()> {
960975
&wasm_executor,
961976
&keypair,
962977
&chain_state,
978+
&storage,
963979
).await;
964980
}
965981

@@ -1107,7 +1123,7 @@ async fn main() -> Result<()> {
11071123
let challenge_id_str = challenge_id.to_string();
11081124
let wasm_key = StorageKey::new("wasm", &challenge_id_str);
11091125
let wasm_data = data.clone(); // Clone for route loading later
1110-
match storage.put(wasm_key, data, PutOptions::default()).await {
1126+
match storage.put(wasm_key, data, put_options_with_block(&state_manager)).await {
11111127
Ok(metadata) => {
11121128
info!(
11131129
challenge_id = %challenge_id,
@@ -1365,7 +1381,7 @@ async fn main() -> Result<()> {
13651381
let result = if p.value.is_empty() {
13661382
storage.delete(&storage_key).await
13671383
} else {
1368-
storage.put(storage_key.clone(), p.value.clone(), PutOptions::default()).await.map(|_| true)
1384+
storage.put(storage_key.clone(), p.value.clone(), put_options_with_block(&state_manager)).await.map(|_| true)
13691385
};
13701386
match result {
13711387
Ok(_) => {
@@ -1579,6 +1595,28 @@ async fn main() -> Result<()> {
15791595
}
15801596
}
15811597

1598+
// Periodic storage stats logging
1599+
_ = storage_stats_interval.tick() => {
1600+
let stats = storage.tracked_stats().await;
1601+
if stats.total_writes > 0 {
1602+
let savings = if stats.bytes_written_uncompressed > 0 {
1603+
100.0 * (1.0 - stats.compression_ratio)
1604+
} else {
1605+
0.0
1606+
};
1607+
info!(
1608+
total_writes = stats.total_writes,
1609+
total_reads = stats.total_reads,
1610+
uncompressed_bytes = stats.bytes_written_uncompressed,
1611+
compressed_bytes = stats.bytes_written_compressed,
1612+
compression_ratio = format!("{:.2}", stats.compression_ratio),
1613+
savings_pct = format!("{:.1}%", savings),
1614+
current_block = storage.current_block(),
1615+
"Storage statistics"
1616+
);
1617+
}
1618+
}
1619+
15821620
// Ctrl+C
15831621
_ = tokio::signal::ctrl_c() => {
15841622
info!("Received shutdown signal, persisting state...");
@@ -1649,7 +1687,10 @@ fn load_keypair(args: &Args) -> Result<Keypair> {
16491687
}
16501688

16511689
/// Load persisted state from distributed storage
1652-
async fn load_state_from_storage(storage: &Arc<LocalStorage>, netuid: u16) -> Option<StateManager> {
1690+
async fn load_state_from_storage(
1691+
storage: &Arc<TrackedStorage>,
1692+
netuid: u16,
1693+
) -> Option<StateManager> {
16531694
let key = StorageKey::new("state", STATE_STORAGE_KEY);
16541695
match storage.get_json::<ChainState>(&key).await {
16551696
Ok(Some(state)) => {
@@ -1682,7 +1723,7 @@ async fn load_state_from_storage(storage: &Arc<LocalStorage>, netuid: u16) -> Op
16821723

16831724
/// Persist current state to distributed storage
16841725
async fn persist_state_to_storage(
1685-
storage: &Arc<LocalStorage>,
1726+
storage: &Arc<TrackedStorage>,
16861727
state_manager: &Arc<StateManager>,
16871728
) -> Result<()> {
16881729
let state = state_manager.snapshot();
@@ -1693,7 +1734,7 @@ async fn persist_state_to_storage(
16931734

16941735
/// Load persisted core ChainState (wasm_challenge_configs, routes, etc.)
16951736
async fn load_core_state_from_storage(
1696-
storage: &Arc<LocalStorage>,
1737+
storage: &Arc<TrackedStorage>,
16971738
) -> Option<platform_core::ChainState> {
16981739
let key = StorageKey::new("state", CORE_STATE_STORAGE_KEY);
16991740
match storage.get_json::<platform_core::ChainState>(&key).await {
@@ -1719,7 +1760,7 @@ async fn load_core_state_from_storage(
17191760

17201761
/// Persist core ChainState to distributed storage
17211762
async fn persist_core_state_to_storage(
1722-
storage: &Arc<LocalStorage>,
1763+
storage: &Arc<TrackedStorage>,
17231764
chain_state: &Arc<RwLock<platform_core::ChainState>>,
17241765
) -> Result<()> {
17251766
let state = chain_state.read().clone();
@@ -1791,7 +1832,7 @@ async fn handle_network_event(
17911832
validator_set: &Arc<ValidatorSet>,
17921833
state_manager: &Arc<StateManager>,
17931834
wasm_executor_ref: &Option<Arc<WasmChallengeExecutor>>,
1794-
storage: &Arc<LocalStorage>,
1835+
storage: &Arc<TrackedStorage>,
17951836
keypair: &Keypair,
17961837
p2p_cmd_tx: &tokio::sync::mpsc::Sender<platform_p2p_consensus::P2PCommand>,
17971838
chain_state: &Arc<RwLock<platform_core::ChainState>>,
@@ -2902,7 +2943,7 @@ async fn handle_network_event(
29022943
platform_p2p_consensus::StateMutationType::WasmUpload { challenge_id } => {
29032944
let challenge_id_str = challenge_id.to_string();
29042945
let wasm_key = StorageKey::new("wasm", &challenge_id_str);
2905-
match storage.put(wasm_key, entry.data.clone(), PutOptions::default()).await {
2946+
match storage.put(wasm_key, entry.data.clone(), put_options_with_block(&state_manager)).await {
29062947
Ok(metadata) => {
29072948
let cid = *challenge_id;
29082949
let owner = entry.proposer.clone();
@@ -3134,10 +3175,13 @@ async fn handle_block_event(
31343175
wasm_executor: &Option<Arc<WasmChallengeExecutor>>,
31353176
keypair: &Keypair,
31363177
chain_state: &Arc<RwLock<platform_core::ChainState>>,
3178+
storage: &Arc<TrackedStorage>,
31373179
) {
31383180
match event {
31393181
BlockSyncEvent::NewBlock { block_number, .. } => {
31403182
debug!("Block {}", block_number);
3183+
// Update storage block tracking
3184+
storage.set_block(block_number);
31413185
// Link state to Bittensor block (block hash not available in event, use zeros)
31423186
state_manager.apply(|state| {
31433187
state.link_to_bittensor_block(block_number, [0u8; 32]);

bins/validator-node/src/wasm_executor.rs

Lines changed: 8 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,22 +1,22 @@
11
use anyhow::{Context, Result};
22
use parking_lot::RwLock;
33
use platform_challenge_sdk_wasm::{EvaluationInput, EvaluationOutput, WeightEntry};
4-
use platform_distributed_storage::DistributedStore;
54
use std::collections::HashMap;
65
use std::path::PathBuf;
76
use std::sync::Arc;
87
use std::time::Instant;
98
use tracing::{debug, info};
109
use wasm_runtime_interface::{
11-
ConsensusPolicy, ExecPolicy, InMemoryStorageBackend, InstanceConfig, LlmPolicy,
12-
NetworkHostFunctions, NetworkPolicy, RuntimeConfig, SandboxPolicy, StorageBackend,
13-
StorageHostConfig, TerminalPolicy, TimePolicy, WasmModule, WasmRuntime, WasmRuntimeError,
10+
ConsensusPolicy, ExecPolicy, InMemoryStorageBackend, InstanceConfig, LlmPolicy, NetworkPolicy,
11+
RuntimeConfig, SandboxPolicy, StorageBackend, StorageHostConfig, TerminalPolicy, TimePolicy,
12+
WasmModule, WasmRuntime, WasmRuntimeError,
1413
};
1514

1615
const MAX_EVALUATION_OUTPUT_SIZE: usize = 64 * 1024 * 1024;
1716
const MAX_ROUTE_OUTPUT_SIZE: u64 = 16 * 1024 * 1024;
1817
const MAX_TASK_OUTPUT_SIZE: u64 = 16 * 1024 * 1024;
1918

19+
#[allow(dead_code)]
2020
pub struct WasmExecutorConfig {
2121
pub module_dir: PathBuf,
2222
pub max_memory_bytes: u64,
@@ -26,7 +26,7 @@ pub struct WasmExecutorConfig {
2626
pub storage_backend: Arc<dyn StorageBackend>,
2727
pub chutes_api_key: Option<String>,
2828
/// Optional distributed storage for loading WASM modules
29-
pub distributed_storage: Option<Arc<platform_distributed_storage::LocalStorage>>,
29+
pub distributed_storage: Option<Arc<dyn platform_distributed_storage::DistributedStore>>,
3030
}
3131

3232
impl std::fmt::Debug for WasmExecutorConfig {
@@ -912,6 +912,7 @@ impl WasmChallengeExecutor {
912912
/// Execute get_weights on a WASM challenge module.
913913
/// Returns Vec<WeightAssignment> with hotkey (SS58/hex) + f64 weight.
914914
/// The caller is responsible for converting hotkeys to UIDs via metagraph.
915+
#[allow(dead_code)]
915916
pub fn execute_get_weights(
916917
&self,
917918
module_path: &str,
@@ -999,6 +1000,7 @@ impl WasmChallengeExecutor {
9991000

10001001
/// Execute sync on a WASM challenge module.
10011002
/// Returns WasmSyncResult with leaderboard hash and stats for consensus.
1003+
#[allow(dead_code)]
10021004
pub fn execute_sync(
10031005
&self,
10041006
module_path: &str,
@@ -1205,6 +1207,7 @@ impl WasmChallengeExecutor {
12051207
}
12061208

12071209
/// Async version that also checks distributed storage
1210+
#[allow(dead_code)]
12081211
pub async fn module_exists_async(&self, module_path: &str) -> bool {
12091212
// Check module cache first
12101213
if self.module_cache.read().contains_key(module_path) {

crates/distributed-storage/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,9 @@ parking_lot = { workspace = true }
3333
uuid = { workspace = true }
3434
base64 = "0.22"
3535

36+
# Compression
37+
lz4_flex = "0.11"
38+
3639
[dev-dependencies]
3740
tempfile = { workspace = true }
3841
tokio-test = { workspace = true }

crates/distributed-storage/src/index.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,6 @@
22
//!
33
//! Provides automatic indexing of stored values for efficient lookups.
44
5-
use async_trait::async_trait;
65
use serde::{Deserialize, Serialize};
76
use std::collections::HashMap;
87
use std::sync::Arc;

crates/distributed-storage/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -95,6 +95,7 @@ pub mod replication;
9595
pub mod state_consensus;
9696
pub mod store;
9797
pub mod submission;
98+
pub mod tracked;
9899
pub mod validated_storage;
99100
pub mod weights;
100101

@@ -125,6 +126,9 @@ pub use weights::{StoredWeights, ValidatorWeightVote, WeightAggregator, WeightHi
125126
pub use audit::{AuditEntry, AuditLog, AuditOperation};
126127
pub use index::{AtomicCounter, IndexDefinition, IndexEntry, IndexManager, IndexPage};
127128

129+
// Tracked storage with compression
130+
pub use tracked::{CompressionMode, TrackedStorage, TrackedStorageConfig, TrackedStorageStats};
131+
128132
// Challenge-specific storage
129133
pub use challenge_store::{
130134
ChallengeStorage, ChallengeStore, ChallengeStoreRegistry, MerkleNode, MerkleProof,

0 commit comments

Comments
 (0)