From c8c9e94ab2c4b0893bf0f21b6ea3019647cc82ba Mon Sep 17 00:00:00 2001 From: John Ramsden Date: Wed, 7 Jan 2026 14:45:54 -0800 Subject: [PATCH 1/4] Redesign chunk LRU with a simpler design Rather than trying to keep so much state in sync, simplify the methodology to: * always invalidate based on the lru, completely including the map, now state is always consistent without keeping track of many data structures to see if lru entries are invalid and such * cleaning simply looks at priorities and migrates data accordingly This significantly simplifies the design. To keep the lru consistent we remove the lru entries within the eviction phase for the respective Zone We insert them back and we migrate data Additionally, to exclude corruption from currently writing zones, we remove candidate zones from the open zones list if they are resident --- .gitignore | 3 +- cortes.server.block.toml | 2 +- cortes.server.zns.toml | 2 +- oxcache/src/cache/mod.rs | 2 +- oxcache/src/device.rs | 160 +++++++++++++--- oxcache/src/eviction.rs | 175 ++++++++---------- oxcache/src/writerpool.rs | 14 +- oxcache/src/zone_state/zone_list.rs | 2 +- oxcache/src/zone_state/zone_priority_queue.rs | 55 +++++- 9 files changed, 274 insertions(+), 141 deletions(-) diff --git a/.gitignore b/.gitignore index 353e4aa..7f1f057 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ nvme-loop.json /eval/data/ /eval/plots/ user-data -/eval/tables/ \ No newline at end of file +/eval/tables/ +/logs-compressed diff --git a/cortes.server.block.toml b/cortes.server.block.toml index e1956ed..ac30c72 100644 --- a/cortes.server.block.toml +++ b/cortes.server.block.toml @@ -19,7 +19,7 @@ remote_artificial_delay_microsec = 40632 eviction_policy = "promotional" high_water_evict = 1 # Number remaining from end, evicts if reaches here low_water_evict = 3 # Evict until below mark -eviction_interval_ms = 1000 # Evict every 1s +eviction_interval_ms = 100 # Evict every 0.1s [metrics] ip_addr = "127.0.0.1" diff --git a/cortes.server.zns.toml b/cortes.server.zns.toml index 4391de0..32e1ec9 100644 --- a/cortes.server.zns.toml +++ b/cortes.server.zns.toml @@ -19,7 +19,7 @@ remote_artificial_delay_microsec = 40632 eviction_policy = "promotional" high_water_evict = 1 # Number remaining from end, evicts if reaches here low_water_evict = 3 # Evict until below mark -eviction_interval_ms = 1000 # Evict every 1s +eviction_interval_ms = 100 # Evict every 0.1s [metrics] ip_addr = "127.0.0.1" diff --git a/oxcache/src/cache/mod.rs b/oxcache/src/cache/mod.rs index df43cb5..6c30f95 100644 --- a/oxcache/src/cache/mod.rs +++ b/oxcache/src/cache/mod.rs @@ -37,7 +37,7 @@ pub struct Cache { num_shards: usize, // Shared reverse mapping (zone, chunk_index) -> key // This needs to be shared because we look up by location, not by key - zone_to_entry: RwLock>>, + pub zone_to_entry: RwLock>>, } impl Cache { diff --git a/oxcache/src/device.rs b/oxcache/src/device.rs index 88ae199..82d49b1 100644 --- a/oxcache/src/device.rs +++ b/oxcache/src/device.rs @@ -8,6 +8,7 @@ use crate::writerpool::{BatchWriteRequest, WriterPool}; use crate::zone_state::zone_list::{ZoneList, ZoneObtainFailure}; use aligned_vec::{AVec, RuntimeAlign}; use bytes::Bytes; +use ndarray::s; use flume::Sender; use nvme::info::{get_active_zones, get_lba_at, is_zoned_device, nvme_get_info, report_zones_all}; use nvme::ops::{close_zone, finish_zone, reset_zone, zns_append}; @@ -215,8 +216,8 @@ fn check_first_evict_bench() { } fn trigger_eviction(eviction_channel: Sender) -> io::Result<()> { - tracing::debug!( - "DEVICE: [Thread {:?}] Sending eviction trigger", + tracing::info!( + "[trigger_eviction] Thread {:?} triggering eviction and BLOCKING for response", std::thread::current().id() ); let (resp_tx, resp_rx) = flume::bounded(1); @@ -228,17 +229,21 @@ fn trigger_eviction(eviction_channel: Sender) -> io::Result<()> )); }; + tracing::info!( + "[trigger_eviction] Thread {:?} WAITING for eviction response...", + std::thread::current().id() + ); match resp_rx.recv() { Ok(result) => match result { Ok(_) => { - tracing::debug!( - "DEVICE: [Thread {:?}] Eviction completed successfully", + tracing::info!( + "[trigger_eviction] Thread {:?} eviction completed successfully, UNBLOCKING", std::thread::current().id() ); } Err(e) => { tracing::error!( - "DEVICE: [Thread {:?}] Eviction failed: {}", + "[trigger_eviction] Thread {:?} eviction FAILED: {}", std::thread::current().id(), e ); @@ -250,7 +255,7 @@ fn trigger_eviction(eviction_channel: Sender) -> io::Result<()> }, Err(e) => { tracing::error!( - "DEVICE: [Thread {:?}] Failed to receive eviction response: {}", + "[trigger_eviction] Thread {:?} failed to receive eviction response: {}", std::thread::current().id(), e ); @@ -275,29 +280,55 @@ impl Zoned { <= self.config.max_active_resources as usize ); + tracing::trace!("[get_free_zone] Thread {:?} requesting zone (is_eviction={}, free={}, open={}, writing={})", + std::thread::current().id(), is_eviction, zone_list.free_zones.len(), + zone_list.open_zones.len(), zone_list.writing_zones.len()); + match zone_list.remove_with_eviction_bypass(is_eviction) { - Ok(zone_idx) => Ok(zone_idx), + Ok(zone_idx) => { + tracing::trace!("[get_free_zone] Thread {:?} got zone {} (is_eviction={})", + std::thread::current().id(), zone_idx, is_eviction); + Ok(zone_idx) + }, Err(error) => match error { ZoneObtainFailure::EvictNow => { + tracing::warn!("[get_free_zone] Thread {:?} got EvictNow (is_eviction={})", + std::thread::current().id(), is_eviction); Err(io::Error::new(ErrorKind::StorageFull, "Cache is full")) } - ZoneObtainFailure::Wait => loop { - zone_list = wait_notify.wait(zone_list).unwrap(); - match zone_list.remove_with_eviction_bypass(is_eviction) { - Ok(idx) => return Ok(idx), - Err(err) => match err { - ZoneObtainFailure::EvictNow => { - return Err(io::Error::new(ErrorKind::Other, "Cache is full")); - } - ZoneObtainFailure::Wait => continue, - }, + ZoneObtainFailure::Wait => { + tracing::info!("[get_free_zone] Thread {:?} entering condvar wait (is_eviction={}, free={}, open={})", + std::thread::current().id(), is_eviction, zone_list.free_zones.len(), zone_list.open_zones.len()); + loop { + zone_list = wait_notify.wait(zone_list).unwrap(); + tracing::info!("[get_free_zone] Thread {:?} woke up from condvar (is_eviction={}, free={}, open={})", + std::thread::current().id(), is_eviction, zone_list.free_zones.len(), zone_list.open_zones.len()); + match zone_list.remove_with_eviction_bypass(is_eviction) { + Ok(idx) => { + tracing::info!("[get_free_zone] Thread {:?} got zone {} after wait", + std::thread::current().id(), idx); + return Ok(idx); + }, + Err(err) => match err { + ZoneObtainFailure::EvictNow => { + tracing::warn!("[get_free_zone] Thread {:?} got EvictNow after wait", + std::thread::current().id()); + return Err(io::Error::new(ErrorKind::Other, "Cache is full")); + } + ZoneObtainFailure::Wait => { + tracing::info!("[get_free_zone] Thread {:?} needs to wait again", + std::thread::current().id()); + continue; + }, + }, + } } }, }, } } - fn complete_write(&self, zone_idx: Zone, finish_zone: bool) -> io::Result<()> { + fn complete_write(&self, zone_idx: Zone, finish_zone: bool, suppress_notify: bool) -> io::Result<()> { let (mtx, notify) = &*self.zones; let mut zone_list = mtx.lock().unwrap(); // assert!(zone_list.get_open_zones() == active_zones, "{} vs {}", zone_list.get_open_zones(), active_zones); @@ -309,7 +340,15 @@ impl Zoned { zone_list.write_finish(zone_idx, self, finish_zone)?; // Tell other threads that we finished writing, so they can // come and try to open a new zone if needed. - notify.notify_all(); + // Skip notification during eviction batch writes to avoid race condition + if !suppress_notify { + tracing::info!("[complete_write] Zone {} finished, notifying ALL waiters (free={}, open={})", + zone_idx, zone_list.free_zones.len(), zone_list.open_zones.len()); + notify.notify_all(); + } else { + tracing::info!("[complete_write] Zone {} finished during eviction, SUPPRESSING notify (free={}, open={})", + zone_idx, zone_list.free_zones.len(), zone_list.open_zones.len()); + } Ok(()) } @@ -409,14 +448,15 @@ impl Zoned { chunk_ind } - fn chunked_append(&self, data: Bytes, zone_index: Zone) -> io::Result { - tracing::debug!("Chunk appending to zone {}", zone_index); + fn chunked_append(&self, data: Bytes, zone_index: Zone, suppress_notify: bool) -> io::Result { + tracing::trace!("[chunked_append] Thread {:?} appending to zone {} (suppress_notify={})", + std::thread::current().id(), zone_index, suppress_notify); let total_sz = data.len() as Byte; let write_sz = total_sz.min(self.max_write_size); - tracing::debug!( - "[Device]: Total size = {}, Write size = {}, max_write_size = {}", + tracing::trace!( + "[chunked_append]: Total size = {}, Write size = {}, max_write_size = {}", total_sz, write_sz, self.max_write_size @@ -424,6 +464,8 @@ impl Zoned { // Only locks if needed // this is AWFUL + tracing::trace!("[chunked_append] Thread {:?} acquiring zone_append_lock for zone {}", + std::thread::current().id(), zone_index); let _maybe_guard = if total_sz > self.max_write_size { ( None, @@ -435,6 +477,8 @@ impl Zoned { None, ) }; + tracing::trace!("[chunked_append] Thread {:?} acquired zone_append_lock for zone {}", + std::thread::current().id(), zone_index); // Sequentially write looped @@ -507,7 +551,7 @@ impl Zoned { // println!("Finished writing to zone {} - {:?} - finish_zone={:?}", zone_index, cl, finish_zone); - self.complete_write(zone_index, finish_zone)?; + self.complete_write(zone_index, finish_zone, suppress_notify)?; Ok(cl) } } @@ -552,7 +596,7 @@ impl Device for Zoned { ); let start = std::time::Instant::now(); - let res = self.chunked_append(data, zone_index); + let res = self.chunked_append(data, zone_index, is_eviction); METRICS.update_metric_histogram_latency( "disk_write_latency_ms", start.elapsed(), @@ -683,12 +727,37 @@ impl Device for Zoned { // Remove from map (invalidation) RUNTIME.block_on(cache.remove_entries(&chunk_locations))?; + // Lock zones to prevent new writes during cleaning + { + let (zone_mtx, _) = &*self.zones; + let mut zones = zone_mtx.lock().unwrap(); + for zone in &clean_locations { + // Remove from open_zones so no new chunks can be allocated from this zone + zones.open_zones.retain(|z| z != zone); + } + tracing::info!("[Evict:Chunk] Locked {} zones for cleaning", clean_locations.len()); + } + // Cleaning let self_clone = self.clone(); for zone in clean_locations.iter() { + // Collect old chunk locations in this zone BEFORE cleaning starts + let old_chunks = RUNTIME.block_on(async { + let zone_mapping = cache.zone_to_entry.read().await; + let zone_slice = s![*zone as usize, ..]; + zone_mapping.slice(zone_slice) + .iter() + .enumerate() + .filter_map(|(chunk_idx, opt_key)| { + opt_key.as_ref().map(|_| ChunkLocation::new(*zone, chunk_idx as Chunk)) + }) + .collect::>() + }); + let cache_clone = cache.clone(); let self_clone = self_clone.clone(); let writer_pool = writer_pool.clone(); + let eviction_policy_clone = eviction_policy.clone(); RUNTIME.block_on(cache_clone.clean_zone_and_update_map( zone.clone(), @@ -716,20 +785,41 @@ impl Device for Zoned { { let writer_pool = writer_pool.clone(); let self_clone = self_clone; + let eviction_policy = eviction_policy_clone; + let old_locations = old_chunks; // Capture old locations // Writer callback |payloads: Vec<(CacheKey, bytes::Bytes)>| { async move { + tracing::info!( + "[evict:Chunk] Starting zone {} cleaning, {} valid chunks to relocate", + zone, + payloads.len() + ); + + // Remove old LRU entries FIRST, before reset_zone makes the zone available again + // This prevents relocated chunks from being accidentaly added then removed { + let mut policy = eviction_policy.lock().unwrap(); + if let EvictionPolicyWrapper::Chunk(c) = &mut *policy { + c.chunks_relocated(&old_locations); + } + } + + let cv = { // Return zones back to the zone list and reset the zone let _guard = self_clone.zone_append_lock[*zone as usize] .write() .unwrap(); let (zone_mtx, cv) = &*self_clone.zones; let mut zones = zone_mtx.lock().unwrap(); + tracing::info!("[evict:Chunk] Resetting zone {} (free={}, open={} before reset)", + zone, zones.free_zones.len(), zones.open_zones.len()); zones.reset_zone(*zone, &*self_clone)?; - cv.notify_all(); - } // Drop the mutex, so we don't have to put it in an await + tracing::info!("[evict:Chunk] Zone {} reset complete (free={}, open={} after reset)", + zone, zones.free_zones.len(), zones.open_zones.len()); + cv + }; // Drop the mutex, so we don't have to put it in an await // Use prioritized batch write for eviction let keys: Vec<_> = @@ -747,6 +837,7 @@ impl Device for Zoned { responder: batch_tx, }; + tracing::info!("[evict:Chunk] Sending batch write for zone {} with {} chunks", zone, batch_request.data.len()); writer_pool.send_priority_batch(batch_request).await?; let batch_response = @@ -775,6 +866,15 @@ impl Device for Zoned { let write_results = write_results?; + tracing::info!( + "[evict:Chunk] Batch write completed for zone {}, {} chunks written, notifying ALL waiters", + zone, + write_results.len() + ); + + // Notify waiting writers AFTER batch write completes successfully + cv.notify_all(); + Ok(write_results) // Vec<(Chunk, ChunkLocation)> } } @@ -792,9 +892,13 @@ impl Device for Zoned { RUNTIME.block_on(cache.remove_zones(&zones_to_evict))?; - let (zone_mtx, _) = &*self.zones; + let (zone_mtx, cv) = &*self.zones; let mut zones = zone_mtx.lock().unwrap(); zones.reset_zones(&zones_to_evict, &*self)?; + drop(zones); + + // Notify waiting writers after zones are reset + cv.notify_all(); // Reset atomic counters for evicted zones let policy = eviction_policy.lock().unwrap(); diff --git a/oxcache/src/eviction.rs b/oxcache/src/eviction.rs index bb098eb..df0764a 100644 --- a/oxcache/src/eviction.rs +++ b/oxcache/src/eviction.rs @@ -4,7 +4,6 @@ use crate::writerpool::WriterPool; use crate::zone_state::zone_priority_queue::{ZoneIndex, ZonePriorityQueue}; use flume::{Receiver, Sender}; use lru_mem::{LruCache}; -use ndarray::Array2; use nvme::types::{Chunk, Zone}; use std::io::ErrorKind; use std::sync::{ @@ -86,7 +85,7 @@ impl EvictionPolicyWrapper { EvictionPolicyWrapper::Chunk(c) => { let et = c.get_evict_targets(always_evict); let ct = if get_clean_targets { - Some(c.get_clean_targets()) + Some(c.get_clean_targets(always_evict)) } else { None }; @@ -120,7 +119,7 @@ pub trait EvictionPolicy: Send + Sync { fn read_update(&mut self, chunk: ChunkLocation); fn get_evict_targets(&mut self, always_evict: bool) -> Self::Target; - fn get_clean_targets(&mut self) -> Self::CleanTarget; + fn get_clean_targets(&mut self, force_clean: bool) -> Self::CleanTarget; } #[derive(Debug)] @@ -219,7 +218,7 @@ impl EvictionPolicy for PromotionalEvictionPolicy { targets } - fn get_clean_targets(&mut self) -> Self::CleanTarget { + fn get_clean_targets(&mut self, _force_clean: bool) -> Self::CleanTarget { () } } @@ -232,7 +231,6 @@ pub struct ChunkEvictionPolicy { nr_chunks_per_zone: Chunk, lru: LruCache, pq: ZonePriorityQueue, - validity: Array2, #[cfg(feature = "eviction-metrics")] pub metrics: Option>, } @@ -252,12 +250,6 @@ impl ChunkEvictionPolicy { nr_chunks_per_zone ); - // Initialize all chunks as invalid (false) - become valid on first write - let validity = Array2::from_shape_fn( - (nr_zones as usize, nr_chunks_per_zone as usize), - |_| false - ); - Self { high_water, low_water, @@ -265,32 +257,11 @@ impl ChunkEvictionPolicy { nr_chunks_per_zone, lru: LruCache::new(usize::MAX), // Effectively unbounded pq: ZonePriorityQueue::new(nr_zones, clean_low_water), - validity, #[cfg(feature = "eviction-metrics")] metrics: None, } } - /// Check if chunk is currently valid (tracked by eviction policy) - fn is_valid(&self, loc: &ChunkLocation) -> bool { - // Bounds check - zones/chunks outside our range - assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone)); - self.validity[[loc.zone as usize, loc.index as usize]] - } - - /// Mark chunk as valid (actively tracked) - fn mark_valid(&mut self, loc: &ChunkLocation) { - // Bounds check - zones/chunks outside our range - assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone)); - self.validity[[loc.zone as usize, loc.index as usize]] = true; - } - - /// Mark chunk as invalid (evicted but not yet cleaned) - fn mark_invalid(&mut self, loc: &ChunkLocation) { - // Bounds check - ignore out of range chunks - assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone)); - self.validity[[loc.zone as usize, loc.index as usize]] = false - } } impl EvictionPolicy for ChunkEvictionPolicy { @@ -302,7 +273,6 @@ impl EvictionPolicy for ChunkEvictionPolicy { metrics.record_write(&chunk); } - self.mark_valid(&chunk); self.lru.insert(chunk, ()).ok(); } @@ -312,28 +282,9 @@ impl EvictionPolicy for ChunkEvictionPolicy { metrics.record_read(&chunk); } - // Check validity first - chunk can be in LRU but marked invalid - // (after get_clean_targets marks all chunks in cleaned zones as invalid) - if !self.is_valid(&chunk) { - // Chunk was marked for eviction/cleaning but is still being accessed - // Re-validate it: add/promote in LRU and decrement PQ count - let zone = chunk.zone; - self.mark_valid(&chunk); - self.lru.insert(chunk, ()).ok(); - - // Decrement priority queue since this chunk is no longer invalid - self.pq.modify_priority(zone, -1); - - #[cfg(feature = "eviction-metrics")] - if let Some(ref metrics) = self.metrics { - // Optional: Track re-validation events - // metrics.record_chunk_revalidation(&chunk); - } - } else if self.lru.contains(&chunk) { - // Already tracked and valid - just promote it + if self.lru.contains(&chunk) { self.lru.insert(chunk, ()).ok(); } - // If chunk is not in LRU and is valid, it was already cleaned - do nothing } fn get_evict_targets(&mut self, always_evict: bool) -> Self::Target { @@ -341,55 +292,51 @@ impl EvictionPolicy for ChunkEvictionPolicy { let nr_chunks = self.nr_zones * self.nr_chunks_per_zone; let high_water_mark = nr_chunks - self.high_water; + tracing::info!( + "[ChunkPolicy] get_evict_targets: lru_len={}, high_water_mark={}, always_evict={}", + lru_len, high_water_mark, always_evict + ); + if !always_evict && lru_len < high_water_mark { + tracing::info!("[ChunkPolicy] Below high water mark, no eviction needed"); return vec![]; } let low_water_mark = nr_chunks - self.low_water; - // Prevent underflow when lru_len < low_water_mark if lru_len < low_water_mark { + tracing::warn!("[ChunkPolicy] LRU too small ({} < {}), cannot evict!", lru_len, low_water_mark); return vec![]; } - let cap = lru_len - low_water_mark; + let count_to_evict = lru_len - low_water_mark; + tracing::info!("[ChunkPolicy] Will evict {} chunks", count_to_evict); - let mut targets = Vec::with_capacity(cap as usize); + let mut targets = Vec::with_capacity(count_to_evict as usize); let mut zone_counts = std::collections::HashMap::new(); // Collect evicted items and count by zone (batch the counting) - // Continue until we have 'cap' VALID chunks - let mut collected = 0; - while collected < cap { + for _ in 0..count_to_evict { if let Some((targ, _)) = self.lru.remove_lru() { - // Check if already invalid (lazily skip stale entries) - if !self.is_valid(&targ) { - // Already marked invalid in previous eviction round but not yet cleaned - // Skip it and continue removing more to reach our target count - continue; - } - let target_zone = targ.zone; - targets.push(targ.clone()); - - // Mark chunk as invalid - self.mark_invalid(&targ); + targets.push(targ); // Batch count instead of individual priority queue updates *zone_counts.entry(target_zone).or_insert(0) += 1; - - collected += 1; - } else { - // LRU is empty, can't collect more - break; } } // Batch update priority queue (far fewer operations) for (zone, count) in zone_counts { - self.pq.modify_priority(zone, count as i64); + self.pq.modify_priority(zone, count); } + tracing::info!( + "[ChunkPolicy] Evicted {} chunks, LRU now has {} entries", + targets.len(), + self.lru.len() + ); + #[cfg(feature = "eviction-metrics")] if let Some(ref metrics) = self.metrics { metrics.record_chunk_evictions(&targets); @@ -398,29 +345,41 @@ impl EvictionPolicy for ChunkEvictionPolicy { targets } - fn get_clean_targets(&mut self) -> Self::CleanTarget { - let mut clean_targets = self.pq.remove_if_thresh_met(); + fn get_clean_targets(&mut self, _force_clean: bool) -> Self::CleanTarget { + // Always use threshold-based cleaning (simpler, no force mode needed) + let clean_targets = self.pq.remove_if_thresh_met(false); + if clean_targets.is_empty() { + tracing::info!("[ChunkPolicy] No zones meet cleaning threshold"); return clean_targets; } - clean_targets.sort_unstable(); - - // Mark ALL chunks in cleaned zones as invalid - // This prevents chunks from being re-added to LRU during zone cleaning - // and ensures consistency when zone is being relocated - // This must be done because the actual locations can change - for zone in &clean_targets { - for chunk_idx in 0..self.nr_chunks_per_zone { - let loc = ChunkLocation::new(*zone, chunk_idx); - self.mark_invalid(&loc); - } - } + tracing::info!( + "[ChunkPolicy] Will clean {} zones based on priority queue", + clean_targets.len() + ); clean_targets } } +impl ChunkEvictionPolicy { + /// Remove old LRU entries after chunks are relocated during zone cleaning. + /// Note: New entries are automatically added by writer pool calling write_update() + pub fn chunks_relocated(&mut self, old_locations: &[ChunkLocation]) { + for old_loc in old_locations { + // Remove stale entry from LRU + // The relocated chunk's new location is already in LRU via write_update() + self.lru.remove(old_loc); + } + + tracing::debug!( + "[ChunkPolicy] Removed {} old LRU entries after relocation", + old_locations.len() + ); + } +} + pub struct Evictor { shutdown: Arc, handle: Option>, @@ -455,19 +414,20 @@ impl Evictor { while !shutdown_clone.load(Ordering::Relaxed) { let (sender, always_evict) = match evict_rx.recv_timeout(evict_interval) { Ok(s) => { - tracing::debug!("Received immediate eviction request"); + tracing::info!("[Evictor] Received immediate eviction request"); (Some(s.sender), true) } Err(flume::RecvTimeoutError::Timeout) => { - tracing::debug!("Timer eviction"); + tracing::debug!("[Evictor] Timer eviction (periodic check)"); (None, false) } Err(flume::RecvTimeoutError::Disconnected) => { - tracing::debug!("Disconnected"); + tracing::info!("[Evictor] Channel disconnected, shutting down"); break; } }; + tracing::info!("[Evictor] Starting eviction (always_evict={})", always_evict); let device_clone = device_clone.clone(); let eviction_start = std::time::Instant::now(); let result = match device_clone.evict( @@ -476,11 +436,17 @@ impl Evictor { eviction_policy_clone.clone(), always_evict, ) { - Err(e) => Err(e.to_string()), - Ok(_) => Ok(()), + Err(e) => { + tracing::error!("[Evictor] Eviction FAILED: {}", e); + Err(e.to_string()) + }, + Ok(_) => { + tracing::info!("[Evictor] Eviction completed successfully"); + Ok(()) + }, }; let eviction_duration = eviction_start.elapsed(); - tracing::debug!("[Eviction] Total eviction took {:?}", eviction_duration); + tracing::info!("[Evictor] Total eviction took {:?}", eviction_duration); if let Some(sender) = sender { tracing::debug!("Sending eviction response to sender: {:?}", result); @@ -718,19 +684,26 @@ mod tests { #[test] fn check_chunk_priority_queue() { - // 4 zones, 2 chunks per zone. Should evict at 3 inserted + // 4 zones, 2 chunks per zone = 8 total chunks + // high_water=3, low_water=6, clean_low_water=1 let mut policy = ChunkEvictionPolicy::new(3, 6, 1, 4, 2); + // Fill 3 zones (6 chunks) for z in 0..3 { for i in 0..2 { policy.write_update(ChunkLocation::new(z, i)); } } - let got = policy.get_evict_targets(false).len(); - assert_eq!(4, got, "Expected 4, but got {}", got); + // Should evict: lru_len=6, need to reach low_water_mark=8-6=2 + // So evict 6-2=4 chunks + let evicted = policy.get_evict_targets(false); + assert_eq!(4, evicted.len(), "Should evict 4 chunks"); - let got = policy.get_clean_targets().len(); - assert_eq!(2, got, "Expected 2, but got {}", got); + // After eviction, 4 chunks invalidated across zones + // Zones should have 2, 1, 1 invalid chunks (or similar distribution) + // clean_low_water=1, so zones with >=1 invalid should be cleaned + let to_clean = policy.get_clean_targets(false); + assert!(to_clean.len() >= 2, "Should clean at least 2 zones with invalids"); } } diff --git a/oxcache/src/writerpool.rs b/oxcache/src/writerpool.rs index 9b6f678..10c5b15 100644 --- a/oxcache/src/writerpool.rs +++ b/oxcache/src/writerpool.rs @@ -116,6 +116,7 @@ impl Writer { } fn receive_priority(&self) { while let Ok(batch_msg) = self.priority_receiver.recv() { + tracing::info!("[WriterPool] Priority-only writer {} received batch request", self.id); self.process_batch_request(batch_msg); } } @@ -183,12 +184,20 @@ impl Writer { fn process_batch_request(&self, batch_req: BatchWriteRequest) { let data_len = batch_req.data.len(); // Store length before moving + tracing::info!("[WriterPool] Priority writer {} starting batch of {} chunks", self.id, data_len); let mut locations = Vec::with_capacity(data_len); - for data in batch_req.data.into_iter() { + for (i, data) in batch_req.data.into_iter().enumerate() { + tracing::info!("[WriterPool] Priority writer {} processing chunk {}/{}", self.id, i+1, data_len); // Use eviction bypass for priority batch requests (eviction writes) let result = self.device.append_with_eviction_bypass(data, true); + if let Err(ref e) = result { + tracing::error!("[WriterPool] Priority writer {} chunk {}/{} FAILED: {}", self.id, i+1, data_len, e); + } else { + tracing::info!("[WriterPool] Priority writer {} chunk {}/{} succeeded", self.id, i+1, data_len); + } + if let Ok(ref loc) = result { let mtx = Arc::clone(&self.eviction); let policy = mtx.lock().unwrap(); @@ -226,9 +235,12 @@ impl Writer { } let resp = BatchWriteResponse { locations }; + tracing::info!("[WriterPool] Priority writer {} completed batch, sending response", self.id); let snd = batch_req.responder.send(resp); if snd.is_err() { tracing::error!("Failed to send batch response from writer"); + } else { + tracing::info!("[WriterPool] Priority writer {} batch response sent successfully", self.id); } } } diff --git a/oxcache/src/zone_state/zone_list.rs b/oxcache/src/zone_state/zone_list.rs index 8620804..956e32e 100644 --- a/oxcache/src/zone_state/zone_list.rs +++ b/oxcache/src/zone_state/zone_list.rs @@ -279,7 +279,7 @@ impl ZoneList { if is_eviction && self.is_full() { // Even eviction can't proceed if completely full - tracing::debug!("Completely full, even eviction cannot proceed"); + tracing::warn!("Completely full, even eviction cannot proceed"); return Err(EvictNow); } diff --git a/oxcache/src/zone_state/zone_priority_queue.rs b/oxcache/src/zone_state/zone_priority_queue.rs index d2841b4..3e8fbce 100644 --- a/oxcache/src/zone_state/zone_priority_queue.rs +++ b/oxcache/src/zone_state/zone_priority_queue.rs @@ -36,15 +36,58 @@ impl ZonePriorityQueue { } // If above high watermark, clean until strictly below low watermark - pub fn remove_if_thresh_met(&mut self) -> Vec { + // If force_all=true, clean ALL zones with invalid chunks (desperate/always_evict mode) + pub fn remove_if_thresh_met(&mut self, force_all: bool) -> Vec { let mut zones = Vec::new(); - tracing::trace!( - "[evict:Chunk] Cleaning zones, invalid={}", - self.invalid_count + + // Count how many zones have invalid chunks + let zones_with_invalids = self.invalid_queue.iter().filter(|(_, p)| **p > 0).count(); + + tracing::info!( + "[PriorityQueue] remove_if_thresh_met called: invalid_count={}, low_water_thresh={}, force_all={}, zones_with_invalids={}, will_clean={}", + self.invalid_count, + self.low_water_thresh, + force_all, + zones_with_invalids, + force_all || self.invalid_count >= self.low_water_thresh ); - while self.invalid_count >= self.low_water_thresh { - zones.push(self.pop_reset()); + + if force_all && zones_with_invalids > 0 { + // Desperate mode: clean ALL zones with invalid chunks + tracing::warn!( + "[PriorityQueue] ⚠️ FORCE CLEANING all {} zones with invalid chunks (always_evict=true)!", + zones_with_invalids + ); + while let Some((zone, prio)) = self.invalid_queue.peek() { + if *prio == 0 { + break; // No more zones with invalid chunks + } + tracing::info!( + "[PriorityQueue] Force cleaning zone {} with {} invalid chunks", + zone, prio + ); + zones.push(self.pop_reset()); + } + } else { + // Normal mode: clean until below threshold + while self.invalid_count >= self.low_water_thresh { + let (zone, prio) = self.invalid_queue.peek().unwrap(); + tracing::info!( + "[PriorityQueue] Cleaning zone {} with {} invalid chunks (invalid_count={} >= thresh={})", + zone, prio, self.invalid_count, self.low_water_thresh + ); + zones.push(self.pop_reset()); + } } + + let zones_with_invalids_after = self.invalid_queue.iter().filter(|(_, p)| **p > 0).count(); + tracing::info!( + "[PriorityQueue] After cleaning: returning {} zones, invalid_count={}, {} zones still have invalid chunks", + zones.len(), + self.invalid_count, + zones_with_invalids_after + ); + zones } From ee1d43bc0fb8b83102db2248025b023d2dd2c7b4 Mon Sep 17 00:00:00 2001 From: John Ramsden Date: Wed, 7 Jan 2026 21:41:56 -0800 Subject: [PATCH 2/4] Remove excess logging --- oxcache/src/device.rs | 51 +------------------ oxcache/src/eviction.rs | 27 +--------- oxcache/src/writerpool.rs | 12 ----- oxcache/src/zone_state/zone_priority_queue.rs | 51 ++----------------- 4 files changed, 6 insertions(+), 135 deletions(-) diff --git a/oxcache/src/device.rs b/oxcache/src/device.rs index 82d49b1..f53f821 100644 --- a/oxcache/src/device.rs +++ b/oxcache/src/device.rs @@ -216,10 +216,6 @@ fn check_first_evict_bench() { } fn trigger_eviction(eviction_channel: Sender) -> io::Result<()> { - tracing::info!( - "[trigger_eviction] Thread {:?} triggering eviction and BLOCKING for response", - std::thread::current().id() - ); let (resp_tx, resp_rx) = flume::bounded(1); if let Err(e) = eviction_channel.send(EvictorMessage { sender: resp_tx }) { tracing::error!("[append] Failed to send eviction message: {}", e); @@ -229,18 +225,9 @@ fn trigger_eviction(eviction_channel: Sender) -> io::Result<()> )); }; - tracing::info!( - "[trigger_eviction] Thread {:?} WAITING for eviction response...", - std::thread::current().id() - ); match resp_rx.recv() { Ok(result) => match result { - Ok(_) => { - tracing::info!( - "[trigger_eviction] Thread {:?} eviction completed successfully, UNBLOCKING", - std::thread::current().id() - ); - } + Ok(_) => { } Err(e) => { tracing::error!( "[trigger_eviction] Thread {:?} eviction FAILED: {}", @@ -292,21 +279,13 @@ impl Zoned { }, Err(error) => match error { ZoneObtainFailure::EvictNow => { - tracing::warn!("[get_free_zone] Thread {:?} got EvictNow (is_eviction={})", - std::thread::current().id(), is_eviction); Err(io::Error::new(ErrorKind::StorageFull, "Cache is full")) } ZoneObtainFailure::Wait => { - tracing::info!("[get_free_zone] Thread {:?} entering condvar wait (is_eviction={}, free={}, open={})", - std::thread::current().id(), is_eviction, zone_list.free_zones.len(), zone_list.open_zones.len()); loop { zone_list = wait_notify.wait(zone_list).unwrap(); - tracing::info!("[get_free_zone] Thread {:?} woke up from condvar (is_eviction={}, free={}, open={})", - std::thread::current().id(), is_eviction, zone_list.free_zones.len(), zone_list.open_zones.len()); match zone_list.remove_with_eviction_bypass(is_eviction) { Ok(idx) => { - tracing::info!("[get_free_zone] Thread {:?} got zone {} after wait", - std::thread::current().id(), idx); return Ok(idx); }, Err(err) => match err { @@ -316,8 +295,6 @@ impl Zoned { return Err(io::Error::new(ErrorKind::Other, "Cache is full")); } ZoneObtainFailure::Wait => { - tracing::info!("[get_free_zone] Thread {:?} needs to wait again", - std::thread::current().id()); continue; }, }, @@ -342,12 +319,7 @@ impl Zoned { // come and try to open a new zone if needed. // Skip notification during eviction batch writes to avoid race condition if !suppress_notify { - tracing::info!("[complete_write] Zone {} finished, notifying ALL waiters (free={}, open={})", - zone_idx, zone_list.free_zones.len(), zone_list.open_zones.len()); notify.notify_all(); - } else { - tracing::info!("[complete_write] Zone {} finished during eviction, SUPPRESSING notify (free={}, open={})", - zone_idx, zone_list.free_zones.len(), zone_list.open_zones.len()); } Ok(()) @@ -735,7 +707,6 @@ impl Device for Zoned { // Remove from open_zones so no new chunks can be allocated from this zone zones.open_zones.retain(|z| z != zone); } - tracing::info!("[Evict:Chunk] Locked {} zones for cleaning", clean_locations.len()); } // Cleaning @@ -791,12 +762,6 @@ impl Device for Zoned { // Writer callback |payloads: Vec<(CacheKey, bytes::Bytes)>| { async move { - tracing::info!( - "[evict:Chunk] Starting zone {} cleaning, {} valid chunks to relocate", - zone, - payloads.len() - ); - // Remove old LRU entries FIRST, before reset_zone makes the zone available again // This prevents relocated chunks from being accidentaly added then removed { @@ -813,11 +778,7 @@ impl Device for Zoned { .unwrap(); let (zone_mtx, cv) = &*self_clone.zones; let mut zones = zone_mtx.lock().unwrap(); - tracing::info!("[evict:Chunk] Resetting zone {} (free={}, open={} before reset)", - zone, zones.free_zones.len(), zones.open_zones.len()); zones.reset_zone(*zone, &*self_clone)?; - tracing::info!("[evict:Chunk] Zone {} reset complete (free={}, open={} after reset)", - zone, zones.free_zones.len(), zones.open_zones.len()); cv }; // Drop the mutex, so we don't have to put it in an await @@ -827,9 +788,6 @@ impl Device for Zoned { let data_vec: Vec<_> = payloads.iter().map(|(_, data)| data.clone()).collect(); - // Used to verify no RACE, TODO: Remove! - // tokio::time::sleep(Duration::from_secs(5)).await; - let (batch_tx, batch_rx) = flume::bounded(1); let batch_request = BatchWriteRequest { @@ -837,7 +795,6 @@ impl Device for Zoned { responder: batch_tx, }; - tracing::info!("[evict:Chunk] Sending batch write for zone {} with {} chunks", zone, batch_request.data.len()); writer_pool.send_priority_batch(batch_request).await?; let batch_response = @@ -866,12 +823,6 @@ impl Device for Zoned { let write_results = write_results?; - tracing::info!( - "[evict:Chunk] Batch write completed for zone {}, {} chunks written, notifying ALL waiters", - zone, - write_results.len() - ); - // Notify waiting writers AFTER batch write completes successfully cv.notify_all(); diff --git a/oxcache/src/eviction.rs b/oxcache/src/eviction.rs index df0764a..8486818 100644 --- a/oxcache/src/eviction.rs +++ b/oxcache/src/eviction.rs @@ -298,7 +298,6 @@ impl EvictionPolicy for ChunkEvictionPolicy { ); if !always_evict && lru_len < high_water_mark { - tracing::info!("[ChunkPolicy] Below high water mark, no eviction needed"); return vec![]; } @@ -310,7 +309,6 @@ impl EvictionPolicy for ChunkEvictionPolicy { } let count_to_evict = lru_len - low_water_mark; - tracing::info!("[ChunkPolicy] Will evict {} chunks", count_to_evict); let mut targets = Vec::with_capacity(count_to_evict as usize); let mut zone_counts = std::collections::HashMap::new(); @@ -331,12 +329,6 @@ impl EvictionPolicy for ChunkEvictionPolicy { self.pq.modify_priority(zone, count); } - tracing::info!( - "[ChunkPolicy] Evicted {} chunks, LRU now has {} entries", - targets.len(), - self.lru.len() - ); - #[cfg(feature = "eviction-metrics")] if let Some(ref metrics) = self.metrics { metrics.record_chunk_evictions(&targets); @@ -346,37 +338,24 @@ impl EvictionPolicy for ChunkEvictionPolicy { } fn get_clean_targets(&mut self, _force_clean: bool) -> Self::CleanTarget { - // Always use threshold-based cleaning (simpler, no force mode needed) - let clean_targets = self.pq.remove_if_thresh_met(false); + let clean_targets = self.pq.remove_if_thresh_met(); if clean_targets.is_empty() { - tracing::info!("[ChunkPolicy] No zones meet cleaning threshold"); return clean_targets; } - tracing::info!( - "[ChunkPolicy] Will clean {} zones based on priority queue", - clean_targets.len() - ); - clean_targets } } impl ChunkEvictionPolicy { /// Remove old LRU entries after chunks are relocated during zone cleaning. - /// Note: New entries are automatically added by writer pool calling write_update() pub fn chunks_relocated(&mut self, old_locations: &[ChunkLocation]) { for old_loc in old_locations { // Remove stale entry from LRU // The relocated chunk's new location is already in LRU via write_update() self.lru.remove(old_loc); } - - tracing::debug!( - "[ChunkPolicy] Removed {} old LRU entries after relocation", - old_locations.len() - ); } } @@ -427,9 +406,7 @@ impl Evictor { } }; - tracing::info!("[Evictor] Starting eviction (always_evict={})", always_evict); let device_clone = device_clone.clone(); - let eviction_start = std::time::Instant::now(); let result = match device_clone.evict( cache_clone.clone(), writer_pool.clone(), @@ -445,8 +422,6 @@ impl Evictor { Ok(()) }, }; - let eviction_duration = eviction_start.elapsed(); - tracing::info!("[Evictor] Total eviction took {:?}", eviction_duration); if let Some(sender) = sender { tracing::debug!("Sending eviction response to sender: {:?}", result); diff --git a/oxcache/src/writerpool.rs b/oxcache/src/writerpool.rs index 10c5b15..07ed686 100644 --- a/oxcache/src/writerpool.rs +++ b/oxcache/src/writerpool.rs @@ -116,7 +116,6 @@ impl Writer { } fn receive_priority(&self) { while let Ok(batch_msg) = self.priority_receiver.recv() { - tracing::info!("[WriterPool] Priority-only writer {} received batch request", self.id); self.process_batch_request(batch_msg); } } @@ -184,20 +183,12 @@ impl Writer { fn process_batch_request(&self, batch_req: BatchWriteRequest) { let data_len = batch_req.data.len(); // Store length before moving - tracing::info!("[WriterPool] Priority writer {} starting batch of {} chunks", self.id, data_len); let mut locations = Vec::with_capacity(data_len); for (i, data) in batch_req.data.into_iter().enumerate() { - tracing::info!("[WriterPool] Priority writer {} processing chunk {}/{}", self.id, i+1, data_len); // Use eviction bypass for priority batch requests (eviction writes) let result = self.device.append_with_eviction_bypass(data, true); - if let Err(ref e) = result { - tracing::error!("[WriterPool] Priority writer {} chunk {}/{} FAILED: {}", self.id, i+1, data_len, e); - } else { - tracing::info!("[WriterPool] Priority writer {} chunk {}/{} succeeded", self.id, i+1, data_len); - } - if let Ok(ref loc) = result { let mtx = Arc::clone(&self.eviction); let policy = mtx.lock().unwrap(); @@ -235,12 +226,9 @@ impl Writer { } let resp = BatchWriteResponse { locations }; - tracing::info!("[WriterPool] Priority writer {} completed batch, sending response", self.id); let snd = batch_req.responder.send(resp); if snd.is_err() { tracing::error!("Failed to send batch response from writer"); - } else { - tracing::info!("[WriterPool] Priority writer {} batch response sent successfully", self.id); } } } diff --git a/oxcache/src/zone_state/zone_priority_queue.rs b/oxcache/src/zone_state/zone_priority_queue.rs index 3e8fbce..2237533 100644 --- a/oxcache/src/zone_state/zone_priority_queue.rs +++ b/oxcache/src/zone_state/zone_priority_queue.rs @@ -37,57 +37,14 @@ impl ZonePriorityQueue { // If above high watermark, clean until strictly below low watermark // If force_all=true, clean ALL zones with invalid chunks (desperate/always_evict mode) - pub fn remove_if_thresh_met(&mut self, force_all: bool) -> Vec { + pub fn remove_if_thresh_met(&mut self) -> Vec { let mut zones = Vec::new(); - // Count how many zones have invalid chunks - let zones_with_invalids = self.invalid_queue.iter().filter(|(_, p)| **p > 0).count(); - - tracing::info!( - "[PriorityQueue] remove_if_thresh_met called: invalid_count={}, low_water_thresh={}, force_all={}, zones_with_invalids={}, will_clean={}", - self.invalid_count, - self.low_water_thresh, - force_all, - zones_with_invalids, - force_all || self.invalid_count >= self.low_water_thresh - ); - - if force_all && zones_with_invalids > 0 { - // Desperate mode: clean ALL zones with invalid chunks - tracing::warn!( - "[PriorityQueue] ⚠️ FORCE CLEANING all {} zones with invalid chunks (always_evict=true)!", - zones_with_invalids - ); - while let Some((zone, prio)) = self.invalid_queue.peek() { - if *prio == 0 { - break; // No more zones with invalid chunks - } - tracing::info!( - "[PriorityQueue] Force cleaning zone {} with {} invalid chunks", - zone, prio - ); - zones.push(self.pop_reset()); - } - } else { - // Normal mode: clean until below threshold - while self.invalid_count >= self.low_water_thresh { - let (zone, prio) = self.invalid_queue.peek().unwrap(); - tracing::info!( - "[PriorityQueue] Cleaning zone {} with {} invalid chunks (invalid_count={} >= thresh={})", - zone, prio, self.invalid_count, self.low_water_thresh - ); - zones.push(self.pop_reset()); - } + // Clean until below threshold + while self.invalid_count >= self.low_water_thresh { + zones.push(self.pop_reset()); } - let zones_with_invalids_after = self.invalid_queue.iter().filter(|(_, p)| **p > 0).count(); - tracing::info!( - "[PriorityQueue] After cleaning: returning {} zones, invalid_count={}, {} zones still have invalid chunks", - zones.len(), - self.invalid_count, - zones_with_invalids_after - ); - zones } From fbeac998cf961b3e8f2e646cd88bbdcca8e3ea2a Mon Sep 17 00:00:00 2001 From: John Ramsden Date: Wed, 7 Jan 2026 21:43:56 -0800 Subject: [PATCH 3/4] Fix warning --- oxcache/src/writerpool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oxcache/src/writerpool.rs b/oxcache/src/writerpool.rs index 07ed686..9d81b2e 100644 --- a/oxcache/src/writerpool.rs +++ b/oxcache/src/writerpool.rs @@ -185,7 +185,7 @@ impl Writer { let data_len = batch_req.data.len(); // Store length before moving let mut locations = Vec::with_capacity(data_len); - for (i, data) in batch_req.data.into_iter().enumerate() { + for (_i, data) in batch_req.data.into_iter().enumerate() { // Use eviction bypass for priority batch requests (eviction writes) let result = self.device.append_with_eviction_bypass(data, true); From ebfe8fa6e070b1b5f534e553b80bf3a95fecf837 Mon Sep 17 00:00:00 2001 From: John Ramsden Date: Wed, 7 Jan 2026 21:49:32 -0800 Subject: [PATCH 4/4] Remove unused force clean --- oxcache/src/eviction.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/oxcache/src/eviction.rs b/oxcache/src/eviction.rs index 8486818..c642a4f 100644 --- a/oxcache/src/eviction.rs +++ b/oxcache/src/eviction.rs @@ -85,7 +85,7 @@ impl EvictionPolicyWrapper { EvictionPolicyWrapper::Chunk(c) => { let et = c.get_evict_targets(always_evict); let ct = if get_clean_targets { - Some(c.get_clean_targets(always_evict)) + Some(c.get_clean_targets()) } else { None }; @@ -119,7 +119,7 @@ pub trait EvictionPolicy: Send + Sync { fn read_update(&mut self, chunk: ChunkLocation); fn get_evict_targets(&mut self, always_evict: bool) -> Self::Target; - fn get_clean_targets(&mut self, force_clean: bool) -> Self::CleanTarget; + fn get_clean_targets(&mut self) -> Self::CleanTarget; } #[derive(Debug)] @@ -218,7 +218,7 @@ impl EvictionPolicy for PromotionalEvictionPolicy { targets } - fn get_clean_targets(&mut self, _force_clean: bool) -> Self::CleanTarget { + fn get_clean_targets(&mut self) -> Self::CleanTarget { () } } @@ -337,7 +337,7 @@ impl EvictionPolicy for ChunkEvictionPolicy { targets } - fn get_clean_targets(&mut self, _force_clean: bool) -> Self::CleanTarget { + fn get_clean_targets(&mut self) -> Self::CleanTarget { let clean_targets = self.pq.remove_if_thresh_met(); if clean_targets.is_empty() { @@ -678,7 +678,7 @@ mod tests { // After eviction, 4 chunks invalidated across zones // Zones should have 2, 1, 1 invalid chunks (or similar distribution) // clean_low_water=1, so zones with >=1 invalid should be cleaned - let to_clean = policy.get_clean_targets(false); + let to_clean = policy.get_clean_targets(); assert!(to_clean.len() >= 2, "Should clean at least 2 zones with invalids"); } }