Skip to content

Commit b9b2f62

Browse files
committed
perf: optimize storage - batch flush, prune audit data, disable indexing
- Replace per-write flush with batched flush every 5s (reduces sled blob accumulation)
- Disable audit logging and indexing in TrackedStorage (massive I/O reduction)
- Startup migration: prune audit_block/audit_key/idx/counter data from sled
- Export→reimport cycle to reclaim dead blob space (47GB → expected few GB)
- Add flush_if_dirty() for periodic batched flushing
1 parent de112cc commit b9b2f62

File tree

3 files changed

+120
-63
lines changed

3 files changed

+120
-63
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bins/validator-node/src/main.rs

Lines changed: 94 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -367,15 +367,21 @@ async fn main() -> Result<()> {
367367
let local_storage = LocalStorageBuilder::new(&validator_hotkey)
368368
.path(db_path.to_string_lossy().to_string())
369369
.build()?;
370+
let local_storage = Arc::new(local_storage);
370371

371-
// Wrap with TrackedStorage for automatic compression, indexing, and audit
372+
// Wrap with TrackedStorage for compression only (audit + indexing disabled to reduce disk I/O)
372373
let tracked_config = TrackedStorageConfig {
373374
validator_id: validator_hotkey.clone(),
375+
enable_audit: false,
376+
enable_indexing: false,
374377
..Default::default()
375378
};
376-
let tracked_storage = TrackedStorage::new(Arc::new(local_storage), tracked_config);
379+
let tracked_storage = TrackedStorage::new(
380+
Arc::clone(&local_storage) as Arc<dyn platform_distributed_storage::DistributedStore>,
381+
tracked_config,
382+
);
377383
let storage = Arc::new(tracked_storage);
378-
info!("Distributed storage initialized with compression and tracking");
384+
info!("Distributed storage initialized with compression (audit/indexing disabled)");
379385

380386
// Determine listen address - p2p_port overrides listen_addr if specified
381387
let listen_addr = if let Some(port) = args.p2p_port {
@@ -982,6 +988,7 @@ async fn main() -> Result<()> {
982988
let mut last_sync_block: u64 = 0;
983989
let sync_block_interval: u64 = 1; // Call WASM sync every block, WASM decides frequency internally
984990
let mut storage_stats_interval = tokio::time::interval(Duration::from_secs(300));
991+
let mut storage_flush_interval = tokio::time::interval(Duration::from_secs(5));
985992
// Track last synced block per challenge for delta sync
986993
let challenge_last_sync: Arc<
987994
RwLock<std::collections::HashMap<platform_core::ChallengeId, u64>>,
@@ -1410,6 +1417,12 @@ async fn main() -> Result<()> {
14101417
}
14111418

14121419
// Periodic state persistence
1420+
_ = storage_flush_interval.tick() => {
1421+
if let Err(e) = local_storage.flush_if_dirty().await {
1422+
warn!("Periodic storage flush failed: {}", e);
1423+
}
1424+
}
1425+
14131426
_ = state_persist_interval.tick() => {
14141427
if let Err(e) = persist_state_to_storage(&storage, &state_manager).await {
14151428
warn!("Failed to persist P2P state: {}", e);
@@ -4260,13 +4273,15 @@ async fn fetch_remote_weights() -> anyhow::Result<Vec<(u8, Vec<u16>, Vec<u16>)>>
42604273
Ok(result)
42614274
}
42624275

4263-
/// Compact the sled distributed.db if blob files are wasting space.
4276+
/// Compact the sled distributed.db: prune useless data, then reclaim disk via export→reimport.
42644277
///
4265-
/// Sled accumulates dead data in blob files that are never reclaimed.
4266-
/// This does a full export→delete→reimport cycle to reclaim disk space.
4267-
/// A marker file `.compacted_v1` is written after success so it only runs once.
4278+
/// Purges: audit logs (audit_block:*, audit_key:*), old index entries (idx:*, idx_meta:*),
4279+
/// counter entries (counter:*). These are generated by TrackedStorage and no longer needed.
4280+
/// Then does sled export→delete→reimport to reclaim dead blob space.
4281+
///
4282+
/// A marker file `.compacted_v2` prevents re-running on subsequent startups.
42684283
fn compact_storage_if_needed(db_path: &std::path::Path) -> anyhow::Result<()> {
4269-
let marker = db_path.parent().unwrap_or(db_path).join(".compacted_v1");
4284+
let marker = db_path.parent().unwrap_or(db_path).join(".compacted_v2");
42704285
if marker.exists() {
42714286
return Ok(());
42724287
}
@@ -4275,97 +4290,117 @@ fn compact_storage_if_needed(db_path: &std::path::Path) -> anyhow::Result<()> {
42754290
return Ok(());
42764291
}
42774292

4278-
// Check total size of blob files
4293+
// Calculate blob size
42794294
let blobs_dir = db_path.join("blobs");
4280-
let total_blob_size = if blobs_dir.exists() {
4281-
std::fs::read_dir(&blobs_dir)
4282-
.map(|entries| {
4283-
entries
4284-
.filter_map(|e| e.ok())
4285-
.filter_map(|e| e.metadata().ok())
4286-
.map(|m| m.len())
4287-
.sum::<u64>()
4288-
})
4289-
.unwrap_or(0)
4290-
} else {
4291-
0
4292-
};
4293-
4294-
let threshold = 2 * 1024 * 1024 * 1024u64; // 2 GB
4295-
if total_blob_size < threshold {
4296-
info!(
4297-
blob_size_mb = total_blob_size / (1024 * 1024),
4298-
"Storage compaction not needed"
4299-
);
4300-
std::fs::write(&marker, b"skipped").ok();
4301-
return Ok(());
4302-
}
4295+
let total_blob_size = dir_size(&blobs_dir);
43034296

4297+
// Always run if v2 marker doesn't exist (prune audit data even if blobs are small)
43044298
warn!(
43054299
blob_size_gb = total_blob_size / (1024 * 1024 * 1024),
4306-
"Large blob files detected, compacting distributed.db (this may take a few minutes)"
4300+
"Compacting distributed.db: pruning audit/index data and reclaiming disk space"
43074301
);
43084302

4309-
// Phase 1: Open and export all data
4303+
// Phase 1: Open DB and prune useless namespaces from the data tree
43104304
let db = sled::open(db_path)?;
4311-
let export = db.export();
4312-
let db_size_before = total_blob_size;
4305+
let data_tree = db.open_tree("data")?;
4306+
let index_tree = db.open_tree("index")?;
4307+
4308+
let prune_prefixes: &[&[u8]] = &[
4309+
b"audit_block:",
4310+
b"audit_key:",
4311+
b"idx:",
4312+
b"idx_meta:",
4313+
b"counter:",
4314+
];
4315+
4316+
let mut pruned_count = 0u64;
4317+
let mut pruned_bytes = 0u64;
4318+
4319+
for prefix in prune_prefixes {
4320+
for entry in data_tree.scan_prefix(prefix) {
4321+
if let Ok((key, value)) = entry {
4322+
pruned_bytes += key.len() as u64 + value.len() as u64;
4323+
data_tree.remove(&key)?;
4324+
// Also remove from index tree
4325+
if let Some(ns_end) = key.iter().position(|&b| b == b':') {
4326+
let ns = &key[..ns_end];
4327+
let key_part = &key[ns_end + 1..];
4328+
let index_key =
4329+
format!("{}:{}", String::from_utf8_lossy(ns), hex::encode(key_part));
4330+
index_tree.remove(index_key.as_bytes())?;
4331+
}
4332+
pruned_count += 1;
4333+
}
4334+
}
4335+
}
4336+
4337+
info!(
4338+
pruned_entries = pruned_count,
4339+
pruned_mb = pruned_bytes / (1024 * 1024),
4340+
"Pruned audit/index/counter data"
4341+
);
43134342

4343+
db.flush()?;
4344+
drop(data_tree);
4345+
drop(index_tree);
4346+
4347+
// Phase 2: Export → reimport to reclaim sled blob space
4348+
let export = db.export();
43144349
let tree_count = export.len();
43154350
info!(trees = tree_count, "Exported data from distributed.db");
43164351
drop(db);
43174352

4318-
// Phase 2: Move old DB to temp, create fresh one
43194353
let tmp_path = db_path.with_extension("db.old");
43204354
if tmp_path.exists() {
43214355
std::fs::remove_dir_all(&tmp_path)?;
43224356
}
43234357
std::fs::rename(db_path, &tmp_path)?;
43244358

4325-
// Phase 3: Import into fresh DB
43264359
let new_db = sled::open(db_path)?;
43274360
new_db.import(export);
43284361
new_db.flush()?;
43294362
drop(new_db);
43304363

4331-
// Phase 4: Measure new size and cleanup
4332-
let new_blob_size = if db_path.join("blobs").exists() {
4333-
std::fs::read_dir(db_path.join("blobs"))
4334-
.map(|entries| {
4335-
entries
4336-
.filter_map(|e| e.ok())
4337-
.filter_map(|e| e.metadata().ok())
4338-
.map(|m| m.len())
4339-
.sum::<u64>()
4340-
})
4341-
.unwrap_or(0)
4342-
} else {
4343-
0
4344-
};
4364+
// Phase 3: Measure and cleanup
4365+
let new_blob_size = dir_size(&db_path.join("blobs"));
43454366

43464367
info!(
4347-
before_gb = db_size_before / (1024 * 1024 * 1024),
4368+
before_gb = total_blob_size / (1024 * 1024 * 1024),
43484369
after_gb = new_blob_size / (1024 * 1024 * 1024),
4349-
saved_gb = (db_size_before.saturating_sub(new_blob_size)) / (1024 * 1024 * 1024),
4370+
saved_gb = total_blob_size.saturating_sub(new_blob_size) / (1024 * 1024 * 1024),
43504371
"Storage compaction complete"
43514372
);
43524373

4353-
// Remove old DB
43544374
if let Err(e) = std::fs::remove_dir_all(&tmp_path) {
4355-
warn!(error = %e, "Failed to remove old distributed.db.old (can be deleted manually)");
4375+
warn!(error = %e, "Failed to remove old distributed.db.old");
43564376
}
43574377

4358-
// Write marker so we don't compact again
43594378
std::fs::write(
43604379
&marker,
43614380
format!(
4362-
"compacted: before={}GB after={}GB",
4363-
db_size_before / (1024 * 1024 * 1024),
4381+
"compacted_v2: pruned={} entries, before={}GB after={}GB",
4382+
pruned_count,
4383+
total_blob_size / (1024 * 1024 * 1024),
43644384
new_blob_size / (1024 * 1024 * 1024),
43654385
),
43664386
)?;
43674387

43684388
Ok(())
43694389
}
43704390

4391+
fn dir_size(path: &std::path::Path) -> u64 {
4392+
if !path.exists() {
4393+
return 0;
4394+
}
4395+
std::fs::read_dir(path)
4396+
.map(|entries| {
4397+
entries
4398+
.filter_map(|e| e.ok())
4399+
.filter_map(|e| e.metadata().ok())
4400+
.map(|m| m.len())
4401+
.sum()
4402+
})
4403+
.unwrap_or(0)
4404+
}
4405+
43714406
// Build trigger: 1771754356

crates/distributed-storage/src/local.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ pub struct LocalStorage {
9090
node_id: String,
9191
/// Cache for namespace entry counts
9292
namespace_counts: RwLock<HashMap<String, u64>>,
93+
/// Dirty flag for batched flushing
94+
dirty: std::sync::atomic::AtomicBool,
9395
}
9496

9597
impl LocalStorage {
@@ -107,6 +109,7 @@ impl LocalStorage {
107109
block_index_tree,
108110
node_id,
109111
namespace_counts: RwLock::new(HashMap::new()),
112+
dirty: std::sync::atomic::AtomicBool::new(false),
110113
};
111114

112115
// Initialize namespace counts
@@ -129,6 +132,7 @@ impl LocalStorage {
129132
block_index_tree,
130133
node_id,
131134
namespace_counts: RwLock::new(HashMap::new()),
135+
dirty: std::sync::atomic::AtomicBool::new(false),
132136
})
133137
}
134138

@@ -273,7 +277,7 @@ impl LocalStorage {
273277
if let Some(mut entry) = self.get_entry(key)? {
274278
entry.replication.mark_replicated(node_id);
275279
self.put_entry(key, &entry)?;
276-
self.flush_async().await?;
280+
self.mark_dirty();
277281
}
278282
Ok(())
279283
}
@@ -317,22 +321,39 @@ impl LocalStorage {
317321
Ok(self.get_entry(key)?.map(|e| e.replication))
318322
}
319323

324+
/// Mark storage as having unflushed writes
325+
fn mark_dirty(&self) {
326+
self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
327+
}
328+
320329
/// Flush all changes to disk synchronously (blocking)
321330
pub fn flush(&self) -> StorageResult<()> {
331+
self.dirty
332+
.store(false, std::sync::atomic::Ordering::Relaxed);
322333
self.db.flush()?;
323334
Ok(())
324335
}
325336

326337
/// Flush all changes to disk asynchronously using spawn_blocking
327338
pub async fn flush_async(&self) -> StorageResult<()> {
328339
let db = Arc::clone(&self.db);
340+
self.dirty
341+
.store(false, std::sync::atomic::Ordering::Relaxed);
329342
tokio::task::spawn_blocking(move || db.flush())
330343
.await
331344
.map_err(|e| StorageError::Database(format!("flush task panicked: {}", e)))?
332345
.map_err(StorageError::from)?;
333346
Ok(())
334347
}
335348

349+
/// Flush only if there are pending writes
350+
pub async fn flush_if_dirty(&self) -> StorageResult<()> {
351+
if self.dirty.load(std::sync::atomic::Ordering::Relaxed) {
352+
self.flush_async().await?;
353+
}
354+
Ok(())
355+
}
356+
336357
/// Get the underlying database (for advanced operations)
337358
pub fn db(&self) -> &Db {
338359
&self.db
@@ -637,7 +658,7 @@ impl DistributedStore for LocalStorage {
637658
};
638659

639660
self.put_entry(&key, &entry)?;
640-
self.flush_async().await?;
661+
self.mark_dirty();
641662

642663
debug!(
643664
"LocalStorage::put completed key={} version={}",
@@ -650,7 +671,7 @@ impl DistributedStore for LocalStorage {
650671
async fn delete(&self, key: &StorageKey) -> StorageResult<bool> {
651672
trace!("LocalStorage::delete key={}", key);
652673
let deleted = self.delete_entry(key)?;
653-
self.flush_async().await?;
674+
self.mark_dirty();
654675
Ok(deleted)
655676
}
656677

@@ -945,7 +966,7 @@ impl DistributedStore for LocalStorage {
945966
options: PutOptions,
946967
) -> StorageResult<ValueMetadata> {
947968
let metadata = self.put_entry_with_block_internal(&key, value, block_id, &options)?;
948-
self.flush_async().await?;
969+
self.mark_dirty();
949970
Ok(metadata)
950971
}
951972
}

0 commit comments

Comments (0)