diff --git a/oxcache/src/eviction.rs b/oxcache/src/eviction.rs
index 772aed2..bb098eb 100644
--- a/oxcache/src/eviction.rs
+++ b/oxcache/src/eviction.rs
@@ -4,6 +4,7 @@ use crate::writerpool::WriterPool;
 use crate::zone_state::zone_priority_queue::{ZoneIndex, ZonePriorityQueue};
 use flume::{Receiver, Sender};
 use lru_mem::{LruCache};
+use ndarray::Array2;
 use nvme::types::{Chunk, Zone};
 use std::io::ErrorKind;
 use std::sync::{
@@ -231,6 +232,7 @@ pub struct ChunkEvictionPolicy {
     nr_chunks_per_zone: Chunk,
     lru: LruCache<ChunkLocation, ()>,
     pq: ZonePriorityQueue,
+    validity: Array2<bool>,
     #[cfg(feature = "eviction-metrics")]
     pub metrics: Option<EvictionMetrics>,
 }
@@ -249,6 +251,13 @@ impl ChunkEvictionPolicy {
             high_water,
             nr_chunks_per_zone
         );
+
+        // Initialize all chunks as invalid (false) - they become valid on first write
+        let validity = Array2::from_shape_fn(
+            (nr_zones as usize, nr_chunks_per_zone as usize),
+            |_| false
+        );
+
         Self {
             high_water,
             low_water,
@@ -256,10 +265,32 @@ impl ChunkEvictionPolicy {
             nr_chunks_per_zone,
             lru: LruCache::new(usize::MAX), // Effectively unbounded
             pq: ZonePriorityQueue::new(nr_zones, clean_low_water),
+            validity,
             #[cfg(feature = "eviction-metrics")]
             metrics: None,
         }
     }
+
+    /// Check whether a chunk is currently valid (tracked by the eviction policy)
+    fn is_valid(&self, loc: &ChunkLocation) -> bool {
+        // Bounds check: zones/chunks outside our range are a logic error
+        assert!(loc.zone < self.nr_zones && loc.index < self.nr_chunks_per_zone);
+        self.validity[[loc.zone as usize, loc.index as usize]]
+    }
+
+    /// Mark a chunk as valid (actively tracked)
+    fn mark_valid(&mut self, loc: &ChunkLocation) {
+        // Bounds check: zones/chunks outside our range are a logic error
+        assert!(loc.zone < self.nr_zones && loc.index < self.nr_chunks_per_zone);
+        self.validity[[loc.zone as usize, loc.index as usize]] = true;
+    }
+
+    /// Mark a chunk as invalid (evicted but not yet cleaned)
+    fn mark_invalid(&mut self, loc: &ChunkLocation) {
+        // Bounds check: zones/chunks outside our range are a logic error
+        assert!(loc.zone < self.nr_zones && loc.index < self.nr_chunks_per_zone);
+        self.validity[[loc.zone as usize, loc.index as usize]] = false;
+    }
 }
 
 impl EvictionPolicy for ChunkEvictionPolicy {
@@ -271,6 +302,7 @@ impl EvictionPolicy for ChunkEvictionPolicy {
             metrics.record_write(&chunk);
         }
 
+        self.mark_valid(&chunk);
         self.lru.insert(chunk, ()).ok();
     }
 
@@ -280,9 +312,28 @@ impl EvictionPolicy for ChunkEvictionPolicy {
             metrics.record_read(&chunk);
         }
 
-        if self.lru.contains(&chunk) {
+        // Check validity first - a chunk can be in the LRU but marked invalid
+        // (after get_clean_targets marks all chunks in cleaned zones as invalid)
+        if !self.is_valid(&chunk) {
+            // The chunk was marked for eviction/cleaning but is still being accessed.
+            // Re-validate it: add/promote it in the LRU and decrement the PQ count.
+            let zone = chunk.zone;
+            self.mark_valid(&chunk);
+            self.lru.insert(chunk, ()).ok();
+
+            // Decrement the priority queue since this chunk is no longer invalid
+            self.pq.modify_priority(zone, -1);
+
+            #[cfg(feature = "eviction-metrics")]
+            if let Some(ref metrics) = self.metrics {
+                // Optional: track re-validation events
+                // metrics.record_chunk_revalidation(&chunk);
+            }
+        } else if self.lru.contains(&chunk) {
+            // Already tracked and valid - just promote it
             self.lru.insert(chunk, ()).ok();
         }
+        // If the chunk is not in the LRU and is valid, it was already cleaned - do nothing
    }
 
     fn get_evict_targets(&mut self, always_evict: bool) -> Self::Target {
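The three helpers above are the heart of the patch: a dense bool matrix gives O(1) validity flips without walking the LRU list. A standalone sketch of how the bitmap maps a (zone, chunk) pair to a flag, assuming only the `ndarray` crate (the indices mirror `ChunkLocation`'s `zone`/`index` fields):

    use ndarray::Array2;

    fn main() {
        let (nr_zones, nr_chunks_per_zone) = (4usize, 8usize);
        // All slots start invalid, exactly as in ChunkEvictionPolicy::new
        let mut validity = Array2::from_shape_fn((nr_zones, nr_chunks_per_zone), |_| false);

        validity[[2, 5]] = true; // write_update for (zone 2, chunk 5)
        assert!(validity[[2, 5]]);

        validity[[2, 5]] = false; // evicted, or its zone was picked for cleaning
        assert!(!validity[[2, 5]]);
    }

`Array2::from_elem((nr_zones, nr_chunks_per_zone), false)` would be an equivalent, slightly more direct initializer.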
@@ -307,19 +358,36 @@ impl EvictionPolicy for ChunkEvictionPolicy {
         let mut zone_counts = std::collections::HashMap::new();
 
         // Collect evicted items and count by zone (batch the counting)
-        for _ in 0..cap {
+        // Continue until we have collected 'cap' VALID chunks
+        let mut collected = 0;
+        while collected < cap {
             if let Some((targ, _)) = self.lru.remove_lru() {
+                // Check if already invalid (lazily skip stale entries)
+                if !self.is_valid(&targ) {
+                    // Already marked invalid in a previous eviction round but not
+                    // yet cleaned - skip it and keep removing entries until we
+                    // reach the target count
+                    continue;
+                }
+
                 let target_zone = targ.zone;
-                targets.push(targ);
+                targets.push(targ.clone());
+
+                // Mark the chunk as invalid
+                self.mark_invalid(&targ);
 
                 // Batch count instead of individual priority queue updates
                 *zone_counts.entry(target_zone).or_insert(0) += 1;
+
+                collected += 1;
+            } else {
+                // The LRU is empty, can't collect more
+                break;
             }
         }
 
         // Batch update priority queue (far fewer operations)
         for (zone, count) in zone_counts {
-            self.pq.modify_priority(zone, count);
+            self.pq.modify_priority(zone, count as i64);
         }
 
         #[cfg(feature = "eviction-metrics")]
@@ -338,13 +406,16 @@ impl EvictionPolicy for ChunkEvictionPolicy {
 
         clean_targets.sort_unstable();
 
-        let zones_to_clean: std::collections::HashSet<ZoneIndex> =
-            clean_targets.iter().copied().collect();
-
-        // Efficient selective removal - O(k) where k = items removed
-        // instead of O(n) where n = total LRU size
-        self.lru
-            .retain(|chunk_loc, _| !zones_to_clean.contains(&chunk_loc.zone));
+        // Mark ALL chunks in the cleaned zones as invalid. This prevents chunks
+        // from being re-added to the LRU while a zone is being cleaned, and it
+        // keeps the policy consistent while a zone is relocated, because the
+        // actual chunk locations can change.
+        for zone in &clean_targets {
+            for chunk_idx in 0..self.nr_chunks_per_zone {
+                let loc = ChunkLocation::new(*zone, chunk_idx);
+                self.mark_invalid(&loc);
+            }
+        }
 
         clean_targets
     }
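Two patterns above carry the performance win: stale LRU entries are discarded lazily inside the eviction loop, and priority-queue updates are batched so that k evictions cost one `modify_priority` call per distinct zone rather than one per chunk. A self-contained model of the counting step (plain `HashMap`, made-up zone ids):

    use std::collections::HashMap;

    fn main() {
        // Zones of five hypothetical evicted chunks
        let evicted_zones: [u64; 5] = [3, 3, 7, 3, 7];

        let mut zone_counts: HashMap<u64, i64> = HashMap::new();
        for zone in evicted_zones {
            *zone_counts.entry(zone).or_insert(0) += 1;
        }

        // One signed update per zone instead of one per chunk,
        // e.g. pq.modify_priority(3, 3) and pq.modify_priority(7, 2)
        assert_eq!(zone_counts[&3], 3);
        assert_eq!(zone_counts[&7], 2);
    }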
@@ -662,189 +733,4 @@ mod tests {
         let got = policy.get_clean_targets().len();
         assert_eq!(2, got, "Expected 2, but got {}", got);
     }
-
-    #[test]
-    fn performance_test_large_lru_get_clean_targets() {
-        // Performance test with ~15.6M chunks in LRU
-        // 904 zones, 17232 chunks per zone = 15,581,728 total chunks
-        let nr_zones = 904;
-        let nr_chunks_per_zone = 17232;
-        let total_chunks = nr_zones * nr_chunks_per_zone;
-
-        // Set clean_low_water to trigger cleaning when zones have 1+ evicted chunks
-        let clean_low_water = 1;
-
-        // High/low water marks - trigger eviction when LRU approaches capacity
-        // Use more reasonable eviction ratios to avoid massive bulk evictions
-        let high_water = total_chunks - (total_chunks / 100); // 99% capacity
-        let low_water = total_chunks - (total_chunks / 50); // 98% capacity
-
-        let mut policy = ChunkEvictionPolicy::new(
-            high_water,
-            low_water,
-            clean_low_water,
-            nr_zones,
-            nr_chunks_per_zone,
-        );
-
-        println!(
-            "Filling LRU with {} chunks across {} zones...",
-            total_chunks, nr_zones
-        );
-        let start_fill = std::time::Instant::now();
-
-        // Fill the LRU with chunks from all zones
-        for zone in 0..nr_zones {
-            for chunk_idx in 0..nr_chunks_per_zone {
-                policy.write_update(ChunkLocation::new(zone, chunk_idx));
-            }
-        }
-
-        let fill_duration = start_fill.elapsed();
-        println!("LRU fill took: {:?}", fill_duration);
-        println!("LRU size: {}", policy.lru.len());
-
-        // Trigger some evictions to populate the priority queue
-        // This will evict 500 chunks and mark zones for potential cleaning
-        println!("Triggering evictions to populate priority queue...");
-        let evict_start = std::time::Instant::now();
-        let evicted = policy.get_evict_targets(false);
-        let evict_duration = evict_start.elapsed();
-        println!("Evicted {} chunks in {:?}", evicted.len(), evict_duration);
-
-        // Now test get_clean_targets performance with different scenarios
-
-        // Scenario 1: Small cleanup (few zones)
-        println!("\n=== Scenario 1: Small cleanup ===");
-        let start_small = std::time::Instant::now();
-        let clean_targets_small = policy.get_clean_targets();
-        let small_duration = start_small.elapsed();
-        println!(
-            "Small cleanup: {} zones cleaned in {:?}",
-            clean_targets_small.len(),
-            small_duration
-        );
-        println!("LRU size after small cleanup: {}", policy.lru.len());
-
-        // Refill LRU with new chunks to simulate continued cache activity
-        println!("Refilling LRU with new chunks for scenario 2...");
-        let refill_start = std::time::Instant::now();
-        let chunks_to_add = (total_chunks as usize) / 3; // Add 33% more chunks
-        for i in 0..chunks_to_add {
-            let zone = i % (nr_zones as usize);
-            let chunk_idx = (i / (nr_zones as usize)) % (nr_chunks_per_zone as usize);
-            // Use high zone/chunk indices to avoid conflicts with existing chunks
-            policy.write_update(ChunkLocation::new(
-                (zone + nr_zones as usize) as u64,
-                chunk_idx as u64,
-            ));
-        }
-        let refill_duration = refill_start.elapsed();
-        println!(
-            "Refilled LRU with {} chunks in {:?}",
-            chunks_to_add, refill_duration
-        );
-        println!("LRU size after refill: {}", policy.lru.len());
-
-        // Trigger evictions to populate priority queue for scenario 2
-        let evict2_start = std::time::Instant::now();
-        let evicted2 = policy.get_evict_targets(false);
-        let evict2_duration = evict2_start.elapsed();
-        println!(
-            "Second eviction: {} chunks in {:?}",
-            evicted2.len(),
-            evict2_duration
-        );
-
-        // Scenario 2: Medium cleanup
-        println!("\n=== Scenario 2: Medium cleanup ===");
-        let start_medium = std::time::Instant::now();
-        let clean_targets_medium = policy.get_clean_targets();
-        let medium_duration = start_medium.elapsed();
-        println!(
-            "Medium cleanup: {} zones cleaned in {:?}",
-            clean_targets_medium.len(),
-            medium_duration
-        );
-        println!("LRU size after medium cleanup: {}", policy.lru.len());
-
-        // Refill LRU again for scenario 3
-        println!("Refilling LRU with new chunks for scenario 3...");
-        let refill2_start = std::time::Instant::now();
-        let chunks_to_add2 = (total_chunks as usize) / 2; // Add 50% more chunks
-        for i in 0..chunks_to_add2 {
-            let zone = i % (nr_zones as usize);
-            let chunk_idx = (i / (nr_zones as usize)) % (nr_chunks_per_zone as usize);
-            // Use even higher indices to avoid conflicts
-            policy.write_update(ChunkLocation::new(
-                (zone + 2 * nr_zones as usize) as u64,
-                chunk_idx as u64,
-            ));
-        }
-        let refill2_duration = refill2_start.elapsed();
-        println!(
-            "Refilled LRU with {} chunks in {:?}",
-            chunks_to_add2, refill2_duration
-        );
-        println!("LRU size after second refill: {}", policy.lru.len());
-
-        // Trigger evictions for scenario 3
-        let evict3_start = std::time::Instant::now();
-        let evicted3 = policy.get_evict_targets(false);
-        let evict3_duration = evict3_start.elapsed();
-        println!(
-            "Third eviction: {} chunks in {:?}",
-            evicted3.len(),
-            evict3_duration
-        );
-
-        // Scenario 3: Large cleanup
-        println!("\n=== Scenario 3: Large cleanup ===");
-        let start_large = std::time::Instant::now();
-        let clean_targets_large = policy.get_clean_targets();
-        let large_duration = start_large.elapsed();
-        println!(
-            "Large cleanup: {} zones cleaned in {:?}",
-            clean_targets_large.len(),
-            large_duration
-        );
-        println!("LRU size after large cleanup: {}", policy.lru.len());
-
-        // Performance analysis
-        println!("\n=== Performance Analysis ===");
-        println!("Initial LRU size: {}", total_chunks);
-        println!(
-            "Small cleanup time: {:?} ({} zones)",
-            small_duration,
-            clean_targets_small.len()
-        );
-        println!(
-            "Medium cleanup time: {:?} ({} zones)",
-            medium_duration,
-            clean_targets_medium.len()
-        );
-        println!(
-            "Large cleanup time: {:?} ({} zones)",
-            large_duration,
-            clean_targets_large.len()
-        );
-
-        // Calculate time per LRU item processed
-        if policy.lru.len() > 0 {
-            let time_per_item_ns = large_duration.as_nanos() as f64 / total_chunks as f64;
-            println!(
-                "Approximate time per LRU item processed: {:.2} ns",
-                time_per_item_ns
-            );
-        }
-
-        // Verify correctness - LRU should still function properly
-        assert!(policy.lru.len() <= total_chunks as usize);
-
-        // Test that we can still perform normal operations
-        policy.write_update(ChunkLocation::new(999, 14));
-        policy.read_update(ChunkLocation::new(0, 0));
-
-        println!("Test completed successfully!");
-    }
 }
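The deleted performance test is not replaced here, but the new re-validation path is the behavior most worth pinning down. A sketch of such a unit test, placed in the existing tests module (hypothetical: the constructor argument order follows the deleted test, and the water marks are chosen only in the hope of forcing the single entry out; the exact eviction count depends on `cap` logic not shown in this patch):

    #[test]
    fn read_update_revalidates_invalidated_chunk() {
        // (high_water, low_water, clean_low_water, nr_zones, nr_chunks_per_zone)
        let mut policy = ChunkEvictionPolicy::new(1, 0, 1, 2, 4);
        let loc = ChunkLocation::new(0, 0);

        policy.write_update(loc.clone());
        let evicted = policy.get_evict_targets(true); // should evict and invalidate (0, 0)
        assert!(evicted.contains(&loc));
        assert!(!policy.is_valid(&loc));

        policy.read_update(loc.clone()); // access before cleaning re-validates it
        assert!(policy.is_valid(&loc));
        assert!(policy.lru.contains(&loc));
    }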
println!("\n=== Performance Analysis ==="); - println!("Initial LRU size: {}", total_chunks); - println!( - "Small cleanup time: {:?} ({} zones)", - small_duration, - clean_targets_small.len() - ); - println!( - "Medium cleanup time: {:?} ({} zones)", - medium_duration, - clean_targets_medium.len() - ); - println!( - "Large cleanup time: {:?} ({} zones)", - large_duration, - clean_targets_large.len() - ); - - // Calculate time per LRU item processed - if policy.lru.len() > 0 { - let time_per_item_ns = large_duration.as_nanos() as f64 / total_chunks as f64; - println!( - "Approximate time per LRU item processed: {:.2} ns", - time_per_item_ns - ); - } - - // Verify correctness - LRU should still function properly - assert!(policy.lru.len() <= total_chunks as usize); - - // Test that we can still perform normal operations - policy.write_update(ChunkLocation::new(999, 14)); - policy.read_update(ChunkLocation::new(0, 0)); - - println!("Test completed successfully!"); - } } diff --git a/oxcache/src/zone_state/zone_priority_queue.rs b/oxcache/src/zone_state/zone_priority_queue.rs index a739a71..d2841b4 100644 --- a/oxcache/src/zone_state/zone_priority_queue.rs +++ b/oxcache/src/zone_state/zone_priority_queue.rs @@ -48,14 +48,27 @@ impl ZonePriorityQueue { zones } - pub fn modify_priority(&mut self, ind: ZoneIndex, priority_increase: ZonePriority) { - // Only account for it if the entry exists - if self - .invalid_queue - .change_priority_by(&ind, |p| *p += priority_increase) - { - self.invalid_count = self.invalid_count.saturating_add(priority_increase); + pub fn modify_priority(&mut self, ind: ZoneIndex, priority_change: i64) { + // Handle both increments and decrements + if priority_change > 0 { + let increase = priority_change as ZonePriority; + if self + .invalid_queue + .change_priority_by(&ind, |p| *p += increase) + { + self.invalid_count = self.invalid_count.saturating_add(increase); + } + } else if priority_change < 0 { + let decrease = (-priority_change) as ZonePriority; + if self + .invalid_queue + .change_priority_by(&ind, |p| *p = p.saturating_sub(decrease)) + { + self.invalid_count = self.invalid_count.saturating_sub(decrease); + } } + // priority_change == 0: no-op + #[cfg(debug_assertions)] self.assert_consistent(); }