3 changes: 2 additions & 1 deletion .gitignore
@@ -6,4 +6,5 @@ nvme-loop.json
/eval/data/
/eval/plots/
user-data
/eval/tables/
/eval/tables/
/logs-compressed
2 changes: 1 addition & 1 deletion cortes.server.block.toml
@@ -19,7 +19,7 @@ remote_artificial_delay_microsec = 40632
eviction_policy = "promotional"
high_water_evict = 1 # Number remaining from end, evicts if reaches here
low_water_evict = 3 # Evict until below mark
eviction_interval_ms = 1000 # Evict every 1s
eviction_interval_ms = 100 # Evict every 0.1s

[metrics]
ip_addr = "127.0.0.1"
2 changes: 1 addition & 1 deletion cortes.server.zns.toml
@@ -19,7 +19,7 @@ remote_artificial_delay_microsec = 40632
eviction_policy = "promotional"
high_water_evict = 1 # Number remaining from end, evicts if reaches here
low_water_evict = 3 # Evict until below mark
eviction_interval_ms = 1000 # Evict every 1s
eviction_interval_ms = 100 # Evict every 0.1s

[metrics]
ip_addr = "127.0.0.1"
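Both server configs drop the eviction tick from 1 s to 100 ms, so the evictor polls its watermarks ten times as often. As a rough, self-contained sketch of how an `eviction_interval_ms` knob like this typically drives a background evictor (the names `spawn_evictor`, `free_zones`, `HIGH_WATER`, and `LOW_WATER` are illustrative, not the crate's actual API):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical sketch: an interval-driven evictor that wakes every
// `interval_ms`, checks a high-water mark, and reclaims zones until a
// low-water target is reached. Only the timing/watermark shape is meant
// to mirror the config above.
fn spawn_evictor(free_zones: Arc<AtomicUsize>, interval_ms: u64) -> thread::JoinHandle<()> {
    const HIGH_WATER: usize = 1; // trigger when this few free zones remain
    const LOW_WATER: usize = 3; // reclaim until at least this many are free
    thread::spawn(move || loop {
        thread::sleep(Duration::from_millis(interval_ms));
        if free_zones.load(Ordering::Relaxed) <= HIGH_WATER {
            while free_zones.load(Ordering::Relaxed) < LOW_WATER {
                // A real evictor would pick a victim zone, relocate or drop
                // its chunks, and reset it; the sketch just reclaims one.
                free_zones.fetch_add(1, Ordering::Relaxed);
            }
        }
    })
}

fn main() {
    let free = Arc::new(AtomicUsize::new(1));
    // 100 ms matches the new eviction_interval_ms value in this PR.
    let _handle = spawn_evictor(free.clone(), 100);
    thread::sleep(Duration::from_millis(350));
    println!("free zones now: {}", free.load(Ordering::Relaxed));
}
```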
2 changes: 1 addition & 1 deletion oxcache/src/cache/mod.rs
@@ -37,7 +37,7 @@ pub struct Cache {
num_shards: usize,
// Shared reverse mapping (zone, chunk_index) -> key
// This needs to be shared because we look up by location, not by key
zone_to_entry: RwLock<Array2<Option<Chunk>>>,
pub zone_to_entry: RwLock<Array2<Option<Chunk>>>,
}

impl Cache {
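`zone_to_entry` becomes `pub` so code outside `cache/mod.rs` (the eviction path in `device.rs` below) can read the reverse map directly. A minimal stand-alone sketch of that access pattern, using `tokio::sync::RwLock` and `ndarray` with placeholder types, since the real `Cache` and `Chunk` definitions are not shown in this diff:

```rust
use ndarray::{s, Array2};
use tokio::sync::RwLock;

type Chunk = u64;

// Placeholder reverse map: (zone, chunk_index) -> Some(entry) when occupied.
struct Cache {
    pub zone_to_entry: RwLock<Array2<Option<Chunk>>>,
}

impl Cache {
    // Collect the chunk indices that are currently live in `zone`.
    async fn live_chunks_in_zone(&self, zone: usize) -> Vec<usize> {
        let map = self.zone_to_entry.read().await;
        map.slice(s![zone, ..])
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| slot.as_ref().map(|_| idx))
            .collect()
    }
}

#[tokio::main]
async fn main() {
    let mut grid: Array2<Option<Chunk>> = Array2::from_elem((2, 4), None);
    grid[[0, 1]] = Some(7);
    grid[[0, 3]] = Some(9);
    let cache = Cache { zone_to_entry: RwLock::new(grid) };
    assert_eq!(cache.live_chunks_in_zone(0).await, vec![1, 3]);
}
```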
129 changes: 92 additions & 37 deletions oxcache/src/device.rs
@@ -8,6 +8,7 @@ use crate::writerpool::{BatchWriteRequest, WriterPool};
use crate::zone_state::zone_list::{ZoneList, ZoneObtainFailure};
use aligned_vec::{AVec, RuntimeAlign};
use bytes::Bytes;
use ndarray::s;
use flume::Sender;
use nvme::info::{get_active_zones, get_lba_at, is_zoned_device, nvme_get_info, report_zones_all};
use nvme::ops::{close_zone, finish_zone, reset_zone, zns_append};
@@ -215,10 +216,6 @@ fn check_first_evict_bench() {
}

fn trigger_eviction(eviction_channel: Sender<EvictorMessage>) -> io::Result<()> {
tracing::debug!(
"DEVICE: [Thread {:?}] Sending eviction trigger",
std::thread::current().id()
);
let (resp_tx, resp_rx) = flume::bounded(1);
if let Err(e) = eviction_channel.send(EvictorMessage { sender: resp_tx }) {
tracing::error!("[append] Failed to send eviction message: {}", e);
@@ -230,15 +227,10 @@ fn trigger_eviction(eviction_channel: Sender<EvictorMessage>) -> io::Result<()>

match resp_rx.recv() {
Ok(result) => match result {
Ok(_) => {
tracing::debug!(
"DEVICE: [Thread {:?}] Eviction completed successfully",
std::thread::current().id()
);
}
Ok(_) => { }
Err(e) => {
tracing::error!(
"DEVICE: [Thread {:?}] Eviction failed: {}",
"[trigger_eviction] Thread {:?} eviction FAILED: {}",
std::thread::current().id(),
e
);
@@ -250,7 +242,7 @@ },
},
Err(e) => {
tracing::error!(
"DEVICE: [Thread {:?}] Failed to receive eviction response: {}",
"[trigger_eviction] Thread {:?} failed to receive eviction response: {}",
std::thread::current().id(),
e
);
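The logging in `trigger_eviction` is trimmed to errors only, but the control flow is unchanged: send an `EvictorMessage` carrying a one-shot reply channel, then block until the evictor answers. A self-contained sketch of that request/acknowledge pattern over `flume` (the `EvictorMessage` here is a stand-in for the crate's type):

```rust
use flume::Sender;
use std::io;
use std::thread;

// Stand-in for the crate's EvictorMessage: a reply channel the evictor answers on.
struct EvictorMessage {
    sender: Sender<io::Result<()>>,
}

fn trigger(evictor: &Sender<EvictorMessage>) -> io::Result<()> {
    let (resp_tx, resp_rx) = flume::bounded(1);
    evictor
        .send(EvictorMessage { sender: resp_tx })
        .map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()))?;
    // Block until the evictor reports success or failure.
    resp_rx
        .recv()
        .map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()))?
}

fn main() -> io::Result<()> {
    let (tx, rx) = flume::unbounded::<EvictorMessage>();
    // Toy evictor: immediately acknowledge each request.
    thread::spawn(move || {
        for msg in rx.iter() {
            let _ = msg.sender.send(Ok(()));
        }
    });
    trigger(&tx)
}
```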
@@ -275,29 +267,45 @@ impl Zoned {
<= self.config.max_active_resources as usize
);

tracing::trace!("[get_free_zone] Thread {:?} requesting zone (is_eviction={}, free={}, open={}, writing={})",
std::thread::current().id(), is_eviction, zone_list.free_zones.len(),
zone_list.open_zones.len(), zone_list.writing_zones.len());

match zone_list.remove_with_eviction_bypass(is_eviction) {
Ok(zone_idx) => Ok(zone_idx),
Ok(zone_idx) => {
tracing::trace!("[get_free_zone] Thread {:?} got zone {} (is_eviction={})",
std::thread::current().id(), zone_idx, is_eviction);
Ok(zone_idx)
},
Err(error) => match error {
ZoneObtainFailure::EvictNow => {
Err(io::Error::new(ErrorKind::StorageFull, "Cache is full"))
}
ZoneObtainFailure::Wait => loop {
zone_list = wait_notify.wait(zone_list).unwrap();
match zone_list.remove_with_eviction_bypass(is_eviction) {
Ok(idx) => return Ok(idx),
Err(err) => match err {
ZoneObtainFailure::EvictNow => {
return Err(io::Error::new(ErrorKind::Other, "Cache is full"));
}
ZoneObtainFailure::Wait => continue,
},
ZoneObtainFailure::Wait => {
loop {
zone_list = wait_notify.wait(zone_list).unwrap();
match zone_list.remove_with_eviction_bypass(is_eviction) {
Ok(idx) => {
return Ok(idx);
},
Err(err) => match err {
ZoneObtainFailure::EvictNow => {
tracing::warn!("[get_free_zone] Thread {:?} got EvictNow after wait",
std::thread::current().id());
return Err(io::Error::new(ErrorKind::Other, "Cache is full"));
}
ZoneObtainFailure::Wait => {
continue;
},
},
}
}
},
},
}
}

fn complete_write(&self, zone_idx: Zone, finish_zone: bool) -> io::Result<()> {
fn complete_write(&self, zone_idx: Zone, finish_zone: bool, suppress_notify: bool) -> io::Result<()> {
let (mtx, notify) = &*self.zones;
let mut zone_list = mtx.lock().unwrap();
// assert!(zone_list.get_open_zones() == active_zones, "{} vs {}", zone_list.get_open_zones(), active_zones);
@@ -309,7 +317,10 @@ impl Zoned {
zone_list.write_finish(zone_idx, self, finish_zone)?;
// Tell other threads that we finished writing, so they can
// come and try to open a new zone if needed.
notify.notify_all();
// Skip notification during eviction batch writes to avoid race condition
if !suppress_notify {
notify.notify_all();
}

Ok(())
}
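The rewritten `Wait` arm in `get_free_zone` and the `notify_all` in `complete_write` (now optionally suppressed during eviction batches) are the two halves of a standard Mutex-plus-Condvar handshake: waiters re-check the `ZoneList` on every wake-up, because a notification is only a hint and another thread may win the race. A minimal sketch of that handshake over a plain counter instead of the real `ZoneList`:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Wait until at least one "free zone" is available, then take it.
fn take_free_zone(zones: &(Mutex<usize>, Condvar)) -> usize {
    let (mtx, cv) = zones;
    let mut free = mtx.lock().unwrap();
    // Re-check after every wake-up; spurious wake-ups and lost races are normal.
    while *free == 0 {
        free = cv.wait(free).unwrap();
    }
    *free -= 1;
    *free
}

fn main() {
    let zones = Arc::new((Mutex::new(0usize), Condvar::new()));
    let producer = Arc::clone(&zones);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        let (mtx, cv) = &*producer;
        *mtx.lock().unwrap() = 3;
        // Mirrors notify.notify_all() in complete_write: wake every waiter.
        cv.notify_all();
    });
    let remaining = take_free_zone(&zones);
    println!("took a zone, {remaining} remaining");
}
```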
@@ -409,21 +420,24 @@ impl Zoned {
chunk_ind
}

fn chunked_append(&self, data: Bytes, zone_index: Zone) -> io::Result<ChunkLocation> {
tracing::debug!("Chunk appending to zone {}", zone_index);
fn chunked_append(&self, data: Bytes, zone_index: Zone, suppress_notify: bool) -> io::Result<ChunkLocation> {
tracing::trace!("[chunked_append] Thread {:?} appending to zone {} (suppress_notify={})",
std::thread::current().id(), zone_index, suppress_notify);

let total_sz = data.len() as Byte;
let write_sz = total_sz.min(self.max_write_size);

tracing::debug!(
"[Device]: Total size = {}, Write size = {}, max_write_size = {}",
tracing::trace!(
"[chunked_append]: Total size = {}, Write size = {}, max_write_size = {}",
total_sz,
write_sz,
self.max_write_size
);

// Only locks if needed
// this is AWFUL
tracing::trace!("[chunked_append] Thread {:?} acquiring zone_append_lock for zone {}",
std::thread::current().id(), zone_index);
let _maybe_guard = if total_sz > self.max_write_size {
(
None,
Expand All @@ -435,6 +449,8 @@ impl Zoned {
None,
)
};
tracing::trace!("[chunked_append] Thread {:?} acquired zone_append_lock for zone {}",
std::thread::current().id(), zone_index);

// Sequentially write looped

@@ -507,7 +523,7 @@ impl Zoned {

// println!("Finished writing to zone {} - {:?} - finish_zone={:?}", zone_index, cl, finish_zone);

self.complete_write(zone_index, finish_zone)?;
self.complete_write(zone_index, finish_zone, suppress_notify)?;
Ok(cl)
}
}
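`chunked_append` keeps the "only locks if needed" trick noted in the comment above: it binds either a read or a write guard on the zone's `zone_append_lock` into one tuple of `Option`s, so a single binding holds whichever guard was taken for the whole append. A sketch of that idiom with a plain `std::sync::RwLock` (names and sizes are illustrative):

```rust
use std::sync::RwLock;

// Take a write guard only for "large" operations, otherwise a read guard.
// Holding the guards in Options keeps one variable alive for the whole scope.
fn append(lock: &RwLock<()>, len: usize, max_write_size: usize) {
    let _maybe_guard = if len > max_write_size {
        // Exclusive: a multi-write append must not interleave with others.
        (None, Some(lock.write().unwrap()))
    } else {
        // Shared: single-shot appends may proceed concurrently.
        (Some(lock.read().unwrap()), None)
    };

    // ... perform the append while the chosen guard is held ...
} // both Options (and whichever guard is inside) drop here

fn main() {
    let zone_append_lock = RwLock::new(());
    append(&zone_append_lock, 8 << 20, 1 << 20); // large: takes the write guard
    append(&zone_append_lock, 4 << 10, 1 << 20); // small: takes the read guard
}
```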
@@ -552,7 +568,7 @@ impl Device for Zoned {
);

let start = std::time::Instant::now();
let res = self.chunked_append(data, zone_index);
let res = self.chunked_append(data, zone_index, is_eviction);
METRICS.update_metric_histogram_latency(
"disk_write_latency_ms",
start.elapsed(),
@@ -683,12 +699,36 @@ impl Device for Zoned {
// Remove from map (invalidation)
RUNTIME.block_on(cache.remove_entries(&chunk_locations))?;

// Lock zones to prevent new writes during cleaning
{
let (zone_mtx, _) = &*self.zones;
let mut zones = zone_mtx.lock().unwrap();
for zone in &clean_locations {
// Remove from open_zones so no new chunks can be allocated from this zone
zones.open_zones.retain(|z| z != zone);
}
}

// Cleaning
let self_clone = self.clone();
for zone in clean_locations.iter() {
// Collect old chunk locations in this zone BEFORE cleaning starts
let old_chunks = RUNTIME.block_on(async {
let zone_mapping = cache.zone_to_entry.read().await;
let zone_slice = s![*zone as usize, ..];
zone_mapping.slice(zone_slice)
.iter()
.enumerate()
.filter_map(|(chunk_idx, opt_key)| {
opt_key.as_ref().map(|_| ChunkLocation::new(*zone, chunk_idx as Chunk))
})
.collect::<Vec<_>>()
});

let cache_clone = cache.clone();
let self_clone = self_clone.clone();
let writer_pool = writer_pool.clone();
let eviction_policy_clone = eviction_policy.clone();

RUNTIME.block_on(cache_clone.clean_zone_and_update_map(
zone.clone(),
@@ -716,30 +756,38 @@ impl Device for Zoned {
{
let writer_pool = writer_pool.clone();
let self_clone = self_clone;
let eviction_policy = eviction_policy_clone;
let old_locations = old_chunks; // Capture old locations

// Writer callback
|payloads: Vec<(CacheKey, bytes::Bytes)>| {
async move {
// Remove old LRU entries FIRST, before reset_zone makes the zone available again
// This prevents relocated chunks from being accidentally added then removed
{
let mut policy = eviction_policy.lock().unwrap();
if let EvictionPolicyWrapper::Chunk(c) = &mut *policy {
c.chunks_relocated(&old_locations);
}
}

let cv = {
// Return zones back to the zone list and reset the zone
let _guard = self_clone.zone_append_lock[*zone as usize]
.write()
.unwrap();
let (zone_mtx, cv) = &*self_clone.zones;
let mut zones = zone_mtx.lock().unwrap();
zones.reset_zone(*zone, &*self_clone)?;
cv.notify_all();
} // Drop the mutex, so we don't have to put it in an await
cv
}; // Drop the mutex, so we don't have to put it in an await

// Use prioritized batch write for eviction
let keys: Vec<_> =
payloads.iter().map(|(key, _)| key.clone()).collect();
let data_vec: Vec<_> =
payloads.iter().map(|(_, data)| data.clone()).collect();

// Used to verify no RACE, TODO: Remove!
// tokio::time::sleep(Duration::from_secs(5)).await;

let (batch_tx, batch_rx) = flume::bounded(1);

let batch_request = BatchWriteRequest {
@@ -775,6 +823,9 @@ impl Device for Zoned {

let write_results = write_results?;

// Notify waiting writers AFTER batch write completes successfully
cv.notify_all();

Ok(write_results) // Vec<(Chunk, ChunkLocation)>
}
}
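The key change in the eviction writer callback: the zone is reset while the `ZoneList` mutex is held, but the condvar handle is carried out of that block and waiters are only woken after the relocated chunks have been rewritten, closing the window in which a writer could grab the freshly reset zone and race the relocation. A condensed sketch of that "reset now, notify later" ordering (the `Vec<u32>` stands in for the real zone list):

```rust
use std::sync::{Arc, Condvar, Mutex};

// Mutate the zone list under the lock, keep a handle to the condvar, and
// only wake waiters once the follow-up (slow) work has completed.
fn relocate_and_notify(zones: &Arc<(Mutex<Vec<u32>>, Condvar)>, zone: u32) {
    let cv = {
        let (mtx, cv) = &**zones;
        let mut free = mtx.lock().unwrap();
        free.push(zone); // stand-in for zones.reset_zone(zone, ...)
        cv
    }; // the Mutex guard drops here, before any slow work

    // ... rewrite the evicted chunks (a prioritized batch write in the real code) ...

    // Waiters are woken only after the rewrite has landed, so they cannot
    // race with the relocated chunks.
    cv.notify_all();
}

fn main() {
    let zones = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
    relocate_and_notify(&zones, 3);
    assert_eq!(zones.0.lock().unwrap().as_slice(), &[3]);
}
```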
@@ -792,9 +843,13 @@ impl Device for Zoned {

RUNTIME.block_on(cache.remove_zones(&zones_to_evict))?;

let (zone_mtx, _) = &*self.zones;
let (zone_mtx, cv) = &*self.zones;
let mut zones = zone_mtx.lock().unwrap();
zones.reset_zones(&zones_to_evict, &*self)?;
drop(zones);

// Notify waiting writers after zones are reset
cv.notify_all();

// Reset atomic counters for evicted zones
let policy = eviction_policy.lock().unwrap();