@@ -359,14 +359,13 @@ async fn main() -> Result<()> {
359359 std:: fs:: create_dir_all ( & args. data_dir ) ?;
360360 let data_dir = std:: fs:: canonicalize ( & args. data_dir ) ?;
361361
362+ // Compact distributed storage on startup if needed (one-time migration)
363+ let db_path = data_dir. join ( "distributed.db" ) ;
364+ compact_storage_if_needed ( & db_path) ?;
365+
362366 // Initialize distributed storage with compression and tracking
363367 let local_storage = LocalStorageBuilder :: new ( & validator_hotkey)
364- . path (
365- data_dir
366- . join ( "distributed.db" )
367- . to_string_lossy ( )
368- . to_string ( ) ,
369- )
368+ . path ( db_path. to_string_lossy ( ) . to_string ( ) )
370369 . build ( ) ?;
371370
372371 // Wrap with TrackedStorage for automatic compression, indexing, and audit
@@ -4261,4 +4260,112 @@ async fn fetch_remote_weights() -> anyhow::Result<Vec<(u8, Vec<u16>, Vec<u16>)>>
42614260 Ok ( result)
42624261}
42634262
4263+ /// Compact the sled distributed.db if blob files are wasting space.
4264+ ///
4265+ /// Sled accumulates dead data in blob files that are never reclaimed.
4266+ /// This does a full export→delete→reimport cycle to reclaim disk space.
4267+ /// A marker file `.compacted_v1` is written after success so it only runs once.
4268+ fn compact_storage_if_needed ( db_path : & std:: path:: Path ) -> anyhow:: Result < ( ) > {
4269+ let marker = db_path. parent ( ) . unwrap_or ( db_path) . join ( ".compacted_v1" ) ;
4270+ if marker. exists ( ) {
4271+ return Ok ( ( ) ) ;
4272+ }
4273+
4274+ if !db_path. exists ( ) {
4275+ return Ok ( ( ) ) ;
4276+ }
4277+
4278+ // Check total size of blob files
4279+ let blobs_dir = db_path. join ( "blobs" ) ;
4280+ let total_blob_size = if blobs_dir. exists ( ) {
4281+ std:: fs:: read_dir ( & blobs_dir)
4282+ . map ( |entries| {
4283+ entries
4284+ . filter_map ( |e| e. ok ( ) )
4285+ . filter_map ( |e| e. metadata ( ) . ok ( ) )
4286+ . map ( |m| m. len ( ) )
4287+ . sum :: < u64 > ( )
4288+ } )
4289+ . unwrap_or ( 0 )
4290+ } else {
4291+ 0
4292+ } ;
4293+
4294+ let threshold = 2 * 1024 * 1024 * 1024u64 ; // 2 GB
4295+ if total_blob_size < threshold {
4296+ info ! (
4297+ blob_size_mb = total_blob_size / ( 1024 * 1024 ) ,
4298+ "Storage compaction not needed"
4299+ ) ;
4300+ std:: fs:: write ( & marker, b"skipped" ) . ok ( ) ;
4301+ return Ok ( ( ) ) ;
4302+ }
4303+
4304+ warn ! (
4305+ blob_size_gb = total_blob_size / ( 1024 * 1024 * 1024 ) ,
4306+ "Large blob files detected, compacting distributed.db (this may take a few minutes)"
4307+ ) ;
4308+
4309+ // Phase 1: Open and export all data
4310+ let db = sled:: open ( db_path) ?;
4311+ let export = db. export ( ) ;
4312+ let db_size_before = total_blob_size;
4313+
4314+ let tree_count = export. len ( ) ;
4315+ info ! ( trees = tree_count, "Exported data from distributed.db" ) ;
4316+ drop ( db) ;
4317+
4318+ // Phase 2: Move old DB to temp, create fresh one
4319+ let tmp_path = db_path. with_extension ( "db.old" ) ;
4320+ if tmp_path. exists ( ) {
4321+ std:: fs:: remove_dir_all ( & tmp_path) ?;
4322+ }
4323+ std:: fs:: rename ( db_path, & tmp_path) ?;
4324+
4325+ // Phase 3: Import into fresh DB
4326+ let new_db = sled:: open ( db_path) ?;
4327+ new_db. import ( export) ;
4328+ new_db. flush ( ) ?;
4329+ drop ( new_db) ;
4330+
4331+ // Phase 4: Measure new size and cleanup
4332+ let new_blob_size = if db_path. join ( "blobs" ) . exists ( ) {
4333+ std:: fs:: read_dir ( db_path. join ( "blobs" ) )
4334+ . map ( |entries| {
4335+ entries
4336+ . filter_map ( |e| e. ok ( ) )
4337+ . filter_map ( |e| e. metadata ( ) . ok ( ) )
4338+ . map ( |m| m. len ( ) )
4339+ . sum :: < u64 > ( )
4340+ } )
4341+ . unwrap_or ( 0 )
4342+ } else {
4343+ 0
4344+ } ;
4345+
4346+ info ! (
4347+ before_gb = db_size_before / ( 1024 * 1024 * 1024 ) ,
4348+ after_gb = new_blob_size / ( 1024 * 1024 * 1024 ) ,
4349+ saved_gb = ( db_size_before. saturating_sub( new_blob_size) ) / ( 1024 * 1024 * 1024 ) ,
4350+ "Storage compaction complete"
4351+ ) ;
4352+
4353+ // Remove old DB
4354+ if let Err ( e) = std:: fs:: remove_dir_all ( & tmp_path) {
4355+ warn ! ( error = %e, "Failed to remove old distributed.db.old (can be deleted manually)" ) ;
4356+ }
4357+
4358+ // Write marker so we don't compact again
4359+ std:: fs:: write (
4360+ & marker,
4361+ format ! (
4362+ "compacted: before={}GB after={}GB" ,
4363+ db_size_before / ( 1024 * 1024 * 1024 ) ,
4364+ new_blob_size / ( 1024 * 1024 * 1024 ) ,
4365+ ) ,
4366+ ) ?;
4367+
4368+ Ok ( ( ) )
4369+ }
4370+
42644371// Build trigger: 1771754356
0 commit comments