From 92aaf0010d247859e1571c002d280dfd88a1cc8a Mon Sep 17 00:00:00 2001 From: John Ramsden Date: Fri, 2 Jan 2026 02:39:43 +0000 Subject: [PATCH 1/2] Fix desync between clean and evict There's a problem right now for chunk eviction. In get_evict_targets we remove from the lru and modify the priority queue accordingly. We then take a subset of what has been modified and clean it after retrieving the relevant zones in get_clean_targets. The problem is the priority queue can now be potentially adjusted by reads that occur, meaning the priorities can be in an inconsistent state since we do not evict them immediately. We need to track whether each chunk is valid, maintaining an array of lists of chunks representing all zones. Chunks should initially be marked invalid. When the first write occurs, a chunk should be set to valid. When it's removed from the lru it should be set to invalid and we update the priority queue accordingly. If we ever read an invalid chunk, that means we need to update the priority queue. This also means we do not need to remove from the lru in get_clean_targets, saving us a lot of time. We simply remove the retain, and then in get_evict_targets, when we are removing from the lru we check if it is an invalid entry and if so we remove it and skip it. So basically our lru may end up holding invalid items, but that's fine. 
--- oxcache/src/eviction.rs | 281 ++++++------------ oxcache/src/zone_state/zone_priority_queue.rs | 27 +- 2 files changed, 105 insertions(+), 203 deletions(-) diff --git a/oxcache/src/eviction.rs b/oxcache/src/eviction.rs index 772aed2..f991846 100644 --- a/oxcache/src/eviction.rs +++ b/oxcache/src/eviction.rs @@ -4,6 +4,7 @@ use crate::writerpool::WriterPool; use crate::zone_state::zone_priority_queue::{ZoneIndex, ZonePriorityQueue}; use flume::{Receiver, Sender}; use lru_mem::{LruCache}; +use ndarray::Array2; use nvme::types::{Chunk, Zone}; use std::io::ErrorKind; use std::sync::{ @@ -231,6 +232,7 @@ pub struct ChunkEvictionPolicy { nr_chunks_per_zone: Chunk, lru: LruCache, pq: ZonePriorityQueue, + validity: Array2, #[cfg(feature = "eviction-metrics")] pub metrics: Option>, } @@ -249,6 +251,13 @@ impl ChunkEvictionPolicy { high_water, nr_chunks_per_zone ); + + // Initialize all chunks as invalid (false) - become valid on first write + let validity = Array2::from_shape_fn( + (nr_zones as usize, nr_chunks_per_zone as usize), + |_| AtomicBool::new(false) + ); + Self { high_water, low_water, @@ -256,10 +265,35 @@ impl ChunkEvictionPolicy { nr_chunks_per_zone, lru: LruCache::new(usize::MAX), // Effectively unbounded pq: ZonePriorityQueue::new(nr_zones, clean_low_water), + validity, #[cfg(feature = "eviction-metrics")] metrics: None, } } + + /// Check if chunk is currently valid (tracked by eviction policy) + fn is_valid(&self, loc: &ChunkLocation) -> bool { + // Bounds check - zones/chunks outside our range + assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone)); + self.validity[[loc.zone as usize, loc.index as usize]] + .load(Ordering::Relaxed) + } + + /// Mark chunk as valid (actively tracked) + fn mark_valid(&self, loc: &ChunkLocation) { + // Bounds check - zones/chunks outside our range + assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone)); + self.validity[[loc.zone as usize, loc.index as usize]] + .store(true, 
Ordering::Relaxed); + } + + /// Mark chunk as invalid (evicted but not yet cleaned) + fn mark_invalid(&self, loc: &ChunkLocation) { + // Bounds check - ignore out of range chunks + assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone)); + self.validity[[loc.zone as usize, loc.index as usize]] + .store(false, Ordering::Relaxed); + } } impl EvictionPolicy for ChunkEvictionPolicy { @@ -271,6 +305,7 @@ impl EvictionPolicy for ChunkEvictionPolicy { metrics.record_write(&chunk); } + self.mark_valid(&chunk); self.lru.insert(chunk, ()).ok(); } @@ -280,9 +315,28 @@ impl EvictionPolicy for ChunkEvictionPolicy { metrics.record_read(&chunk); } - if self.lru.contains(&chunk) { + // Check validity first - chunk can be in LRU but marked invalid + // (after get_clean_targets marks all chunks in cleaned zones as invalid) + if !self.is_valid(&chunk) { + // Chunk was marked for eviction/cleaning but is still being accessed + // Re-validate it: add/promote in LRU and decrement PQ count + let zone = chunk.zone; + self.mark_valid(&chunk); + self.lru.insert(chunk, ()).ok(); + + // Decrement priority queue since this chunk is no longer invalid + self.pq.modify_priority(zone, -1); + + #[cfg(feature = "eviction-metrics")] + if let Some(ref metrics) = self.metrics { + // Optional: Track re-validation events + // metrics.record_chunk_revalidation(&chunk); + } + } else if self.lru.contains(&chunk) { + // Already tracked and valid - just promote it self.lru.insert(chunk, ()).ok(); } + // If chunk is not in LRU and is valid, it was already cleaned - do nothing } fn get_evict_targets(&mut self, always_evict: bool) -> Self::Target { @@ -307,19 +361,36 @@ impl EvictionPolicy for ChunkEvictionPolicy { let mut zone_counts = std::collections::HashMap::new(); // Collect evicted items and count by zone (batch the counting) - for _ in 0..cap { + // Continue until we have 'cap' VALID chunks + let mut collected = 0; + while collected < cap { if let Some((targ, _)) = 
self.lru.remove_lru() { + // Check if already invalid (lazily skip stale entries) + if !self.is_valid(&targ) { + // Already marked invalid in previous eviction round but not yet cleaned + // Skip it and continue removing more to reach our target count + continue; + } + let target_zone = targ.zone; - targets.push(targ); + targets.push(targ.clone()); + + // Mark chunk as invalid + self.mark_invalid(&targ); // Batch count instead of individual priority queue updates *zone_counts.entry(target_zone).or_insert(0) += 1; + + collected += 1; + } else { + // LRU is empty, can't collect more + break; } } // Batch update priority queue (far fewer operations) for (zone, count) in zone_counts { - self.pq.modify_priority(zone, count); + self.pq.modify_priority(zone, count as i64); } #[cfg(feature = "eviction-metrics")] @@ -338,13 +409,16 @@ impl EvictionPolicy for ChunkEvictionPolicy { clean_targets.sort_unstable(); - let zones_to_clean: std::collections::HashSet = - clean_targets.iter().copied().collect(); - - // Efficient selective removal - O(k) where k = items removed - // instead of O(n) where n = total LRU size - self.lru - .retain(|chunk_loc, _| !zones_to_clean.contains(&chunk_loc.zone)); + // Mark ALL chunks in cleaned zones as invalid + // This prevents chunks from being re-added to LRU during zone cleaning + // and ensures consistency when zone is being relocated + // This must be done because the actual locations can change + for zone in &clean_targets { + for chunk_idx in 0..self.nr_chunks_per_zone { + let loc = ChunkLocation::new(*zone, chunk_idx); + self.mark_invalid(&loc); + } + } clean_targets } @@ -662,189 +736,4 @@ mod tests { let got = policy.get_clean_targets().len(); assert_eq!(2, got, "Expected 2, but got {}", got); } - - #[test] - fn performance_test_large_lru_get_clean_targets() { - // Performance test with ~15.6M chunks in LRU - // 904 zones, 17232 chunks per zone = 15,581,728 total chunks - let nr_zones = 904; - let nr_chunks_per_zone = 17232; - let 
total_chunks = nr_zones * nr_chunks_per_zone; - - // Set clean_low_water to trigger cleaning when zones have 1+ evicted chunks - let clean_low_water = 1; - - // High/low water marks - trigger eviction when LRU approaches capacity - // Use more reasonable eviction ratios to avoid massive bulk evictions - let high_water = total_chunks - (total_chunks / 100); // 99% capacity - let low_water = total_chunks - (total_chunks / 50); // 98% capacity - - let mut policy = ChunkEvictionPolicy::new( - high_water, - low_water, - clean_low_water, - nr_zones, - nr_chunks_per_zone, - ); - - println!( - "Filling LRU with {} chunks across {} zones...", - total_chunks, nr_zones - ); - let start_fill = std::time::Instant::now(); - - // Fill the LRU with chunks from all zones - for zone in 0..nr_zones { - for chunk_idx in 0..nr_chunks_per_zone { - policy.write_update(ChunkLocation::new(zone, chunk_idx)); - } - } - - let fill_duration = start_fill.elapsed(); - println!("LRU fill took: {:?}", fill_duration); - println!("LRU size: {}", policy.lru.len()); - - // Trigger some evictions to populate the priority queue - // This will evict 500 chunks and mark zones for potential cleaning - println!("Triggering evictions to populate priority queue..."); - let evict_start = std::time::Instant::now(); - let evicted = policy.get_evict_targets(false); - let evict_duration = evict_start.elapsed(); - println!("Evicted {} chunks in {:?}", evicted.len(), evict_duration); - - // Now test get_clean_targets performance with different scenarios - - // Scenario 1: Small cleanup (few zones) - println!("\n=== Scenario 1: Small cleanup ==="); - let start_small = std::time::Instant::now(); - let clean_targets_small = policy.get_clean_targets(); - let small_duration = start_small.elapsed(); - println!( - "Small cleanup: {} zones cleaned in {:?}", - clean_targets_small.len(), - small_duration - ); - println!("LRU size after small cleanup: {}", policy.lru.len()); - - // Refill LRU with new chunks to simulate 
continued cache activity - println!("Refilling LRU with new chunks for scenario 2..."); - let refill_start = std::time::Instant::now(); - let chunks_to_add = (total_chunks as usize) / 3; // Add 33% more chunks - for i in 0..chunks_to_add { - let zone = i % (nr_zones as usize); - let chunk_idx = (i / (nr_zones as usize)) % (nr_chunks_per_zone as usize); - // Use high zone/chunk indices to avoid conflicts with existing chunks - policy.write_update(ChunkLocation::new( - (zone + nr_zones as usize) as u64, - chunk_idx as u64, - )); - } - let refill_duration = refill_start.elapsed(); - println!( - "Refilled LRU with {} chunks in {:?}", - chunks_to_add, refill_duration - ); - println!("LRU size after refill: {}", policy.lru.len()); - - // Trigger evictions to populate priority queue for scenario 2 - let evict2_start = std::time::Instant::now(); - let evicted2 = policy.get_evict_targets(false); - let evict2_duration = evict2_start.elapsed(); - println!( - "Second eviction: {} chunks in {:?}", - evicted2.len(), - evict2_duration - ); - - // Scenario 2: Medium cleanup - println!("\n=== Scenario 2: Medium cleanup ==="); - let start_medium = std::time::Instant::now(); - let clean_targets_medium = policy.get_clean_targets(); - let medium_duration = start_medium.elapsed(); - println!( - "Medium cleanup: {} zones cleaned in {:?}", - clean_targets_medium.len(), - medium_duration - ); - println!("LRU size after medium cleanup: {}", policy.lru.len()); - - // Refill LRU again for scenario 3 - println!("Refilling LRU with new chunks for scenario 3..."); - let refill2_start = std::time::Instant::now(); - let chunks_to_add2 = (total_chunks as usize) / 2; // Add 50% more chunks - for i in 0..chunks_to_add2 { - let zone = i % (nr_zones as usize); - let chunk_idx = (i / (nr_zones as usize)) % (nr_chunks_per_zone as usize); - // Use even higher indices to avoid conflicts - policy.write_update(ChunkLocation::new( - (zone + 2 * nr_zones as usize) as u64, - chunk_idx as u64, - )); - } - let 
refill2_duration = refill2_start.elapsed(); - println!( - "Refilled LRU with {} chunks in {:?}", - chunks_to_add2, refill2_duration - ); - println!("LRU size after second refill: {}", policy.lru.len()); - - // Trigger evictions for scenario 3 - let evict3_start = std::time::Instant::now(); - let evicted3 = policy.get_evict_targets(false); - let evict3_duration = evict3_start.elapsed(); - println!( - "Third eviction: {} chunks in {:?}", - evicted3.len(), - evict3_duration - ); - - // Scenario 3: Large cleanup - println!("\n=== Scenario 3: Large cleanup ==="); - let start_large = std::time::Instant::now(); - let clean_targets_large = policy.get_clean_targets(); - let large_duration = start_large.elapsed(); - println!( - "Large cleanup: {} zones cleaned in {:?}", - clean_targets_large.len(), - large_duration - ); - println!("LRU size after large cleanup: {}", policy.lru.len()); - - // Performance analysis - println!("\n=== Performance Analysis ==="); - println!("Initial LRU size: {}", total_chunks); - println!( - "Small cleanup time: {:?} ({} zones)", - small_duration, - clean_targets_small.len() - ); - println!( - "Medium cleanup time: {:?} ({} zones)", - medium_duration, - clean_targets_medium.len() - ); - println!( - "Large cleanup time: {:?} ({} zones)", - large_duration, - clean_targets_large.len() - ); - - // Calculate time per LRU item processed - if policy.lru.len() > 0 { - let time_per_item_ns = large_duration.as_nanos() as f64 / total_chunks as f64; - println!( - "Approximate time per LRU item processed: {:.2} ns", - time_per_item_ns - ); - } - - // Verify correctness - LRU should still function properly - assert!(policy.lru.len() <= total_chunks as usize); - - // Test that we can still perform normal operations - policy.write_update(ChunkLocation::new(999, 14)); - policy.read_update(ChunkLocation::new(0, 0)); - - println!("Test completed successfully!"); - } } diff --git a/oxcache/src/zone_state/zone_priority_queue.rs 
b/oxcache/src/zone_state/zone_priority_queue.rs index a739a71..d2841b4 100644 --- a/oxcache/src/zone_state/zone_priority_queue.rs +++ b/oxcache/src/zone_state/zone_priority_queue.rs @@ -48,14 +48,27 @@ impl ZonePriorityQueue { zones } - pub fn modify_priority(&mut self, ind: ZoneIndex, priority_increase: ZonePriority) { - // Only account for it if the entry exists - if self - .invalid_queue - .change_priority_by(&ind, |p| *p += priority_increase) - { - self.invalid_count = self.invalid_count.saturating_add(priority_increase); + pub fn modify_priority(&mut self, ind: ZoneIndex, priority_change: i64) { + // Handle both increments and decrements + if priority_change > 0 { + let increase = priority_change as ZonePriority; + if self + .invalid_queue + .change_priority_by(&ind, |p| *p += increase) + { + self.invalid_count = self.invalid_count.saturating_add(increase); + } + } else if priority_change < 0 { + let decrease = (-priority_change) as ZonePriority; + if self + .invalid_queue + .change_priority_by(&ind, |p| *p = p.saturating_sub(decrease)) + { + self.invalid_count = self.invalid_count.saturating_sub(decrease); + } } + // priority_change == 0: no-op + #[cfg(debug_assertions)] self.assert_consistent(); } From f685fd6251870cb16c764d44ed4da9254ee72f53 Mon Sep 17 00:00:00 2001 From: John Ramsden Date: Thu, 1 Jan 2026 21:20:10 -0800 Subject: [PATCH 2/2] Swap to bools, atomic not needed --- oxcache/src/eviction.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/oxcache/src/eviction.rs b/oxcache/src/eviction.rs index f991846..bb098eb 100644 --- a/oxcache/src/eviction.rs +++ b/oxcache/src/eviction.rs @@ -232,7 +232,7 @@ pub struct ChunkEvictionPolicy { nr_chunks_per_zone: Chunk, lru: LruCache, pq: ZonePriorityQueue, - validity: Array2, + validity: Array2, #[cfg(feature = "eviction-metrics")] pub metrics: Option>, } @@ -255,7 +255,7 @@ impl ChunkEvictionPolicy { // Initialize all chunks as invalid (false) - become valid on first write let 
validity = Array2::from_shape_fn( (nr_zones as usize, nr_chunks_per_zone as usize), - |_| AtomicBool::new(false) + |_| false ); Self { @@ -276,23 +276,20 @@ impl ChunkEvictionPolicy { // Bounds check - zones/chunks outside our range assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone)); self.validity[[loc.zone as usize, loc.index as usize]] - .load(Ordering::Relaxed) } /// Mark chunk as valid (actively tracked) - fn mark_valid(&self, loc: &ChunkLocation) { + fn mark_valid(&mut self, loc: &ChunkLocation) { // Bounds check - zones/chunks outside our range assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone)); - self.validity[[loc.zone as usize, loc.index as usize]] - .store(true, Ordering::Relaxed); + self.validity[[loc.zone as usize, loc.index as usize]] = true; } /// Mark chunk as invalid (evicted but not yet cleaned) - fn mark_invalid(&self, loc: &ChunkLocation) { + fn mark_invalid(&mut self, loc: &ChunkLocation) { // Bounds check - ignore out of range chunks assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone)); - self.validity[[loc.zone as usize, loc.index as usize]] - .store(false, Ordering::Relaxed); + self.validity[[loc.zone as usize, loc.index as usize]] = false } }