Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 82 additions & 196 deletions oxcache/src/eviction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::writerpool::WriterPool;
use crate::zone_state::zone_priority_queue::{ZoneIndex, ZonePriorityQueue};
use flume::{Receiver, Sender};
use lru_mem::{LruCache};
use ndarray::Array2;
use nvme::types::{Chunk, Zone};
use std::io::ErrorKind;
use std::sync::{
Expand Down Expand Up @@ -231,6 +232,7 @@ pub struct ChunkEvictionPolicy {
nr_chunks_per_zone: Chunk,
lru: LruCache<ChunkLocation, ()>,
pq: ZonePriorityQueue,
validity: Array2<bool>,
#[cfg(feature = "eviction-metrics")]
pub metrics: Option<Arc<crate::eviction_metrics::EvictionMetrics>>,
}
Expand All @@ -249,17 +251,46 @@ impl ChunkEvictionPolicy {
high_water,
nr_chunks_per_zone
);

// Initialize all chunks as invalid (false) - become valid on first write
let validity = Array2::from_shape_fn(
(nr_zones as usize, nr_chunks_per_zone as usize),
|_| false
);

Self {
high_water,
low_water,
nr_zones,
nr_chunks_per_zone,
lru: LruCache::new(usize::MAX), // Effectively unbounded
pq: ZonePriorityQueue::new(nr_zones, clean_low_water),
validity,
#[cfg(feature = "eviction-metrics")]
metrics: None,
}
}

/// Check if chunk is currently valid (tracked by eviction policy)
fn is_valid(&self, loc: &ChunkLocation) -> bool {
// Bounds check - zones/chunks outside our range
assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone));
self.validity[[loc.zone as usize, loc.index as usize]]
}

/// Mark chunk as valid (actively tracked)
fn mark_valid(&mut self, loc: &ChunkLocation) {
// Bounds check - zones/chunks outside our range
assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone));
self.validity[[loc.zone as usize, loc.index as usize]] = true;
}

/// Mark chunk as invalid (evicted but not yet cleaned)
fn mark_invalid(&mut self, loc: &ChunkLocation) {
// Bounds check - ignore out of range chunks
assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone));
self.validity[[loc.zone as usize, loc.index as usize]] = false
}
}

impl EvictionPolicy for ChunkEvictionPolicy {
Expand All @@ -271,6 +302,7 @@ impl EvictionPolicy for ChunkEvictionPolicy {
metrics.record_write(&chunk);
}

self.mark_valid(&chunk);
self.lru.insert(chunk, ()).ok();
}

Expand All @@ -280,9 +312,28 @@ impl EvictionPolicy for ChunkEvictionPolicy {
metrics.record_read(&chunk);
}

if self.lru.contains(&chunk) {
// Check validity first - chunk can be in LRU but marked invalid
// (after get_clean_targets marks all chunks in cleaned zones as invalid)
if !self.is_valid(&chunk) {
// Chunk was marked for eviction/cleaning but is still being accessed
// Re-validate it: add/promote in LRU and decrement PQ count
let zone = chunk.zone;
self.mark_valid(&chunk);
self.lru.insert(chunk, ()).ok();

// Decrement priority queue since this chunk is no longer invalid
self.pq.modify_priority(zone, -1);

#[cfg(feature = "eviction-metrics")]
if let Some(ref metrics) = self.metrics {
// Optional: Track re-validation events
// metrics.record_chunk_revalidation(&chunk);
}
} else if self.lru.contains(&chunk) {
// Already tracked and valid - just promote it
self.lru.insert(chunk, ()).ok();
}
// If chunk is not in LRU and is valid, it was already cleaned - do nothing
}

fn get_evict_targets(&mut self, always_evict: bool) -> Self::Target {
Expand All @@ -307,19 +358,36 @@ impl EvictionPolicy for ChunkEvictionPolicy {
let mut zone_counts = std::collections::HashMap::new();

// Collect evicted items and count by zone (batch the counting)
for _ in 0..cap {
// Continue until we have 'cap' VALID chunks
let mut collected = 0;
while collected < cap {
if let Some((targ, _)) = self.lru.remove_lru() {
// Check if already invalid (lazily skip stale entries)
if !self.is_valid(&targ) {
// Already marked invalid in previous eviction round but not yet cleaned
// Skip it and continue removing more to reach our target count
continue;
}

let target_zone = targ.zone;
targets.push(targ);
targets.push(targ.clone());

// Mark chunk as invalid
self.mark_invalid(&targ);

// Batch count instead of individual priority queue updates
*zone_counts.entry(target_zone).or_insert(0) += 1;

collected += 1;
} else {
// LRU is empty, can't collect more
break;
}
}

// Batch update priority queue (far fewer operations)
for (zone, count) in zone_counts {
self.pq.modify_priority(zone, count);
self.pq.modify_priority(zone, count as i64);
}

#[cfg(feature = "eviction-metrics")]
Expand All @@ -338,13 +406,16 @@ impl EvictionPolicy for ChunkEvictionPolicy {

clean_targets.sort_unstable();

let zones_to_clean: std::collections::HashSet<nvme::types::Zone> =
clean_targets.iter().copied().collect();

// Efficient selective removal - O(k) where k = items removed
// instead of O(n) where n = total LRU size
self.lru
.retain(|chunk_loc, _| !zones_to_clean.contains(&chunk_loc.zone));
// Mark ALL chunks in cleaned zones as invalid
// This prevents chunks from being re-added to LRU during zone cleaning
// and ensures consistency when zone is being relocated
// This must be done because the actual locations can change
for zone in &clean_targets {
for chunk_idx in 0..self.nr_chunks_per_zone {
let loc = ChunkLocation::new(*zone, chunk_idx);
self.mark_invalid(&loc);
}
}

clean_targets
}
Expand Down Expand Up @@ -662,189 +733,4 @@ mod tests {
let got = policy.get_clean_targets().len();
assert_eq!(2, got, "Expected 2, but got {}", got);
}

#[test]
fn performance_test_large_lru_get_clean_targets() {
// Performance test with ~15.6M chunks in LRU
// 904 zones, 17232 chunks per zone = 15,581,728 total chunks
let nr_zones = 904;
let nr_chunks_per_zone = 17232;
let total_chunks = nr_zones * nr_chunks_per_zone;

// Set clean_low_water to trigger cleaning when zones have 1+ evicted chunks
let clean_low_water = 1;

// High/low water marks - trigger eviction when LRU approaches capacity
// Use more reasonable eviction ratios to avoid massive bulk evictions
let high_water = total_chunks - (total_chunks / 100); // 99% capacity
let low_water = total_chunks - (total_chunks / 50); // 98% capacity

let mut policy = ChunkEvictionPolicy::new(
high_water,
low_water,
clean_low_water,
nr_zones,
nr_chunks_per_zone,
);

println!(
"Filling LRU with {} chunks across {} zones...",
total_chunks, nr_zones
);
let start_fill = std::time::Instant::now();

// Fill the LRU with chunks from all zones
for zone in 0..nr_zones {
for chunk_idx in 0..nr_chunks_per_zone {
policy.write_update(ChunkLocation::new(zone, chunk_idx));
}
}

let fill_duration = start_fill.elapsed();
println!("LRU fill took: {:?}", fill_duration);
println!("LRU size: {}", policy.lru.len());

// Trigger some evictions to populate the priority queue
// This will evict 500 chunks and mark zones for potential cleaning
println!("Triggering evictions to populate priority queue...");
let evict_start = std::time::Instant::now();
let evicted = policy.get_evict_targets(false);
let evict_duration = evict_start.elapsed();
println!("Evicted {} chunks in {:?}", evicted.len(), evict_duration);

// Now test get_clean_targets performance with different scenarios

// Scenario 1: Small cleanup (few zones)
println!("\n=== Scenario 1: Small cleanup ===");
let start_small = std::time::Instant::now();
let clean_targets_small = policy.get_clean_targets();
let small_duration = start_small.elapsed();
println!(
"Small cleanup: {} zones cleaned in {:?}",
clean_targets_small.len(),
small_duration
);
println!("LRU size after small cleanup: {}", policy.lru.len());

// Refill LRU with new chunks to simulate continued cache activity
println!("Refilling LRU with new chunks for scenario 2...");
let refill_start = std::time::Instant::now();
let chunks_to_add = (total_chunks as usize) / 3; // Add 33% more chunks
for i in 0..chunks_to_add {
let zone = i % (nr_zones as usize);
let chunk_idx = (i / (nr_zones as usize)) % (nr_chunks_per_zone as usize);
// Use high zone/chunk indices to avoid conflicts with existing chunks
policy.write_update(ChunkLocation::new(
(zone + nr_zones as usize) as u64,
chunk_idx as u64,
));
}
let refill_duration = refill_start.elapsed();
println!(
"Refilled LRU with {} chunks in {:?}",
chunks_to_add, refill_duration
);
println!("LRU size after refill: {}", policy.lru.len());

// Trigger evictions to populate priority queue for scenario 2
let evict2_start = std::time::Instant::now();
let evicted2 = policy.get_evict_targets(false);
let evict2_duration = evict2_start.elapsed();
println!(
"Second eviction: {} chunks in {:?}",
evicted2.len(),
evict2_duration
);

// Scenario 2: Medium cleanup
println!("\n=== Scenario 2: Medium cleanup ===");
let start_medium = std::time::Instant::now();
let clean_targets_medium = policy.get_clean_targets();
let medium_duration = start_medium.elapsed();
println!(
"Medium cleanup: {} zones cleaned in {:?}",
clean_targets_medium.len(),
medium_duration
);
println!("LRU size after medium cleanup: {}", policy.lru.len());

// Refill LRU again for scenario 3
println!("Refilling LRU with new chunks for scenario 3...");
let refill2_start = std::time::Instant::now();
let chunks_to_add2 = (total_chunks as usize) / 2; // Add 50% more chunks
for i in 0..chunks_to_add2 {
let zone = i % (nr_zones as usize);
let chunk_idx = (i / (nr_zones as usize)) % (nr_chunks_per_zone as usize);
// Use even higher indices to avoid conflicts
policy.write_update(ChunkLocation::new(
(zone + 2 * nr_zones as usize) as u64,
chunk_idx as u64,
));
}
let refill2_duration = refill2_start.elapsed();
println!(
"Refilled LRU with {} chunks in {:?}",
chunks_to_add2, refill2_duration
);
println!("LRU size after second refill: {}", policy.lru.len());

// Trigger evictions for scenario 3
let evict3_start = std::time::Instant::now();
let evicted3 = policy.get_evict_targets(false);
let evict3_duration = evict3_start.elapsed();
println!(
"Third eviction: {} chunks in {:?}",
evicted3.len(),
evict3_duration
);

// Scenario 3: Large cleanup
println!("\n=== Scenario 3: Large cleanup ===");
let start_large = std::time::Instant::now();
let clean_targets_large = policy.get_clean_targets();
let large_duration = start_large.elapsed();
println!(
"Large cleanup: {} zones cleaned in {:?}",
clean_targets_large.len(),
large_duration
);
println!("LRU size after large cleanup: {}", policy.lru.len());

// Performance analysis
println!("\n=== Performance Analysis ===");
println!("Initial LRU size: {}", total_chunks);
println!(
"Small cleanup time: {:?} ({} zones)",
small_duration,
clean_targets_small.len()
);
println!(
"Medium cleanup time: {:?} ({} zones)",
medium_duration,
clean_targets_medium.len()
);
println!(
"Large cleanup time: {:?} ({} zones)",
large_duration,
clean_targets_large.len()
);

// Calculate time per LRU item processed
if policy.lru.len() > 0 {
let time_per_item_ns = large_duration.as_nanos() as f64 / total_chunks as f64;
println!(
"Approximate time per LRU item processed: {:.2} ns",
time_per_item_ns
);
}

// Verify correctness - LRU should still function properly
assert!(policy.lru.len() <= total_chunks as usize);

// Test that we can still perform normal operations
policy.write_update(ChunkLocation::new(999, 14));
policy.read_update(ChunkLocation::new(0, 0));

println!("Test completed successfully!");
}
}
27 changes: 20 additions & 7 deletions oxcache/src/zone_state/zone_priority_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,27 @@ impl ZonePriorityQueue {
zones
}

pub fn modify_priority(&mut self, ind: ZoneIndex, priority_increase: ZonePriority) {
// Only account for it if the entry exists
if self
.invalid_queue
.change_priority_by(&ind, |p| *p += priority_increase)
{
self.invalid_count = self.invalid_count.saturating_add(priority_increase);
pub fn modify_priority(&mut self, ind: ZoneIndex, priority_change: i64) {
// Handle both increments and decrements
if priority_change > 0 {
let increase = priority_change as ZonePriority;
if self
.invalid_queue
.change_priority_by(&ind, |p| *p += increase)
{
self.invalid_count = self.invalid_count.saturating_add(increase);
}
} else if priority_change < 0 {
let decrease = (-priority_change) as ZonePriority;
if self
.invalid_queue
.change_priority_by(&ind, |p| *p = p.saturating_sub(decrease))
{
self.invalid_count = self.invalid_count.saturating_sub(decrease);
}
}
// priority_change == 0: no-op

#[cfg(debug_assertions)]
self.assert_consistent();
}
Expand Down
Loading