Skip to content

Commit 08df038

Browse files
committed
Merge branch 'main' of github.com:johnramsden/OxCache
2 parents 7b6f014 + 02b1247 commit 08df038

12 files changed

Lines changed: 161 additions & 151 deletions

File tree

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ nvme-loop.json
66
/eval/data/
77
/eval/plots/
88
user-data
9-
/eval/tables/
9+
/eval/tables/
10+
/logs-compressed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cortes.server.block.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ remote_artificial_delay_microsec = 40632
1919
eviction_policy = "promotional"
2020
high_water_evict = 1 # Number remaining from end, evicts if reaches here
2121
low_water_evict = 3 # Evict until below mark
22-
eviction_interval_ms = 1000 # Evict every 1s
22+
eviction_interval_ms = 100 # Evict every 0.1s
2323

2424
[metrics]
2525
ip_addr = "127.0.0.1"

cortes.server.zns.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ remote_artificial_delay_microsec = 40632
1919
eviction_policy = "promotional"
2020
high_water_evict = 1 # Number remaining from end, evicts if reaches here
2121
low_water_evict = 3 # Evict until below mark
22-
eviction_interval_ms = 1000 # Evict every 1s
22+
eviction_interval_ms = 100 # Evict every 0.1s
2323

2424
[metrics]
2525
ip_addr = "127.0.0.1"

oxcache/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ tracing-appender = "0.2.3"
4242
tracing = "0.1.41"
4343
chrono = "0.4.41"
4444
libc = "0.2.175"
45+
flate2 = "1.0"
4546

4647
[[test]]
4748
name = "integration_tests"

oxcache/src/cache/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub struct Cache {
3737
num_shards: usize,
3838
// Shared reverse mapping (zone, chunk_index) -> key
3939
// This needs to be shared because we look up by location, not by key
40-
zone_to_entry: RwLock<Array2<Option<Chunk>>>,
40+
pub zone_to_entry: RwLock<Array2<Option<Chunk>>>,
4141
}
4242

4343
impl Cache {

oxcache/src/device.rs

Lines changed: 92 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::writerpool::{BatchWriteRequest, WriterPool};
88
use crate::zone_state::zone_list::{ZoneList, ZoneObtainFailure};
99
use aligned_vec::{AVec, RuntimeAlign};
1010
use bytes::Bytes;
11+
use ndarray::s;
1112
use flume::Sender;
1213
use nvme::info::{get_active_zones, get_lba_at, is_zoned_device, nvme_get_info, report_zones_all};
1314
use nvme::ops::{close_zone, finish_zone, reset_zone, zns_append};
@@ -215,10 +216,6 @@ fn check_first_evict_bench() {
215216
}
216217

217218
fn trigger_eviction(eviction_channel: Sender<EvictorMessage>) -> io::Result<()> {
218-
tracing::debug!(
219-
"DEVICE: [Thread {:?}] Sending eviction trigger",
220-
std::thread::current().id()
221-
);
222219
let (resp_tx, resp_rx) = flume::bounded(1);
223220
if let Err(e) = eviction_channel.send(EvictorMessage { sender: resp_tx }) {
224221
tracing::error!("[append] Failed to send eviction message: {}", e);
@@ -230,15 +227,10 @@ fn trigger_eviction(eviction_channel: Sender<EvictorMessage>) -> io::Result<()>
230227

231228
match resp_rx.recv() {
232229
Ok(result) => match result {
233-
Ok(_) => {
234-
tracing::debug!(
235-
"DEVICE: [Thread {:?}] Eviction completed successfully",
236-
std::thread::current().id()
237-
);
238-
}
230+
Ok(_) => { }
239231
Err(e) => {
240232
tracing::error!(
241-
"DEVICE: [Thread {:?}] Eviction failed: {}",
233+
"[trigger_eviction] Thread {:?} eviction FAILED: {}",
242234
std::thread::current().id(),
243235
e
244236
);
@@ -250,7 +242,7 @@ fn trigger_eviction(eviction_channel: Sender<EvictorMessage>) -> io::Result<()>
250242
},
251243
Err(e) => {
252244
tracing::error!(
253-
"DEVICE: [Thread {:?}] Failed to receive eviction response: {}",
245+
"[trigger_eviction] Thread {:?} failed to receive eviction response: {}",
254246
std::thread::current().id(),
255247
e
256248
);
@@ -275,29 +267,45 @@ impl Zoned {
275267
<= self.config.max_active_resources as usize
276268
);
277269

270+
tracing::trace!("[get_free_zone] Thread {:?} requesting zone (is_eviction={}, free={}, open={}, writing={})",
271+
std::thread::current().id(), is_eviction, zone_list.free_zones.len(),
272+
zone_list.open_zones.len(), zone_list.writing_zones.len());
273+
278274
match zone_list.remove_with_eviction_bypass(is_eviction) {
279-
Ok(zone_idx) => Ok(zone_idx),
275+
Ok(zone_idx) => {
276+
tracing::trace!("[get_free_zone] Thread {:?} got zone {} (is_eviction={})",
277+
std::thread::current().id(), zone_idx, is_eviction);
278+
Ok(zone_idx)
279+
},
280280
Err(error) => match error {
281281
ZoneObtainFailure::EvictNow => {
282282
Err(io::Error::new(ErrorKind::StorageFull, "Cache is full"))
283283
}
284-
ZoneObtainFailure::Wait => loop {
285-
zone_list = wait_notify.wait(zone_list).unwrap();
286-
match zone_list.remove_with_eviction_bypass(is_eviction) {
287-
Ok(idx) => return Ok(idx),
288-
Err(err) => match err {
289-
ZoneObtainFailure::EvictNow => {
290-
return Err(io::Error::new(ErrorKind::Other, "Cache is full"));
291-
}
292-
ZoneObtainFailure::Wait => continue,
293-
},
284+
ZoneObtainFailure::Wait => {
285+
loop {
286+
zone_list = wait_notify.wait(zone_list).unwrap();
287+
match zone_list.remove_with_eviction_bypass(is_eviction) {
288+
Ok(idx) => {
289+
return Ok(idx);
290+
},
291+
Err(err) => match err {
292+
ZoneObtainFailure::EvictNow => {
293+
tracing::warn!("[get_free_zone] Thread {:?} got EvictNow after wait",
294+
std::thread::current().id());
295+
return Err(io::Error::new(ErrorKind::Other, "Cache is full"));
296+
}
297+
ZoneObtainFailure::Wait => {
298+
continue;
299+
},
300+
},
301+
}
294302
}
295303
},
296304
},
297305
}
298306
}
299307

300-
fn complete_write(&self, zone_idx: Zone, finish_zone: bool) -> io::Result<()> {
308+
fn complete_write(&self, zone_idx: Zone, finish_zone: bool, suppress_notify: bool) -> io::Result<()> {
301309
let (mtx, notify) = &*self.zones;
302310
let mut zone_list = mtx.lock().unwrap();
303311
// assert!(zone_list.get_open_zones() == active_zones, "{} vs {}", zone_list.get_open_zones(), active_zones);
@@ -309,7 +317,10 @@ impl Zoned {
309317
zone_list.write_finish(zone_idx, self, finish_zone)?;
310318
// Tell other threads that we finished writing, so they can
311319
// come and try to open a new zone if needed.
312-
notify.notify_all();
320+
// Skip notification during eviction batch writes to avoid race condition
321+
if !suppress_notify {
322+
notify.notify_all();
323+
}
313324

314325
Ok(())
315326
}
@@ -409,21 +420,24 @@ impl Zoned {
409420
chunk_ind
410421
}
411422

412-
fn chunked_append(&self, data: Bytes, zone_index: Zone) -> io::Result<ChunkLocation> {
413-
tracing::debug!("Chunk appending to zone {}", zone_index);
423+
fn chunked_append(&self, data: Bytes, zone_index: Zone, suppress_notify: bool) -> io::Result<ChunkLocation> {
424+
tracing::trace!("[chunked_append] Thread {:?} appending to zone {} (suppress_notify={})",
425+
std::thread::current().id(), zone_index, suppress_notify);
414426

415427
let total_sz = data.len() as Byte;
416428
let write_sz = total_sz.min(self.max_write_size);
417429

418-
tracing::debug!(
419-
"[Device]: Total size = {}, Write size = {}, max_write_size = {}",
430+
tracing::trace!(
431+
"[chunked_append]: Total size = {}, Write size = {}, max_write_size = {}",
420432
total_sz,
421433
write_sz,
422434
self.max_write_size
423435
);
424436

425437
// Only locks if needed
426438
// this is AWFUL
439+
tracing::trace!("[chunked_append] Thread {:?} acquiring zone_append_lock for zone {}",
440+
std::thread::current().id(), zone_index);
427441
let _maybe_guard = if total_sz > self.max_write_size {
428442
(
429443
None,
@@ -435,6 +449,8 @@ impl Zoned {
435449
None,
436450
)
437451
};
452+
tracing::trace!("[chunked_append] Thread {:?} acquired zone_append_lock for zone {}",
453+
std::thread::current().id(), zone_index);
438454

439455
// Write sequentially in a loop
440456

@@ -507,7 +523,7 @@ impl Zoned {
507523

508524
// println!("Finished writing to zone {} - {:?} - finish_zone={:?}", zone_index, cl, finish_zone);
509525

510-
self.complete_write(zone_index, finish_zone)?;
526+
self.complete_write(zone_index, finish_zone, suppress_notify)?;
511527
Ok(cl)
512528
}
513529
}
@@ -552,7 +568,7 @@ impl Device for Zoned {
552568
);
553569

554570
let start = std::time::Instant::now();
555-
let res = self.chunked_append(data, zone_index);
571+
let res = self.chunked_append(data, zone_index, is_eviction);
556572
METRICS.update_metric_histogram_latency(
557573
"disk_write_latency_ms",
558574
start.elapsed(),
@@ -683,12 +699,36 @@ impl Device for Zoned {
683699
// Remove from map (invalidation)
684700
RUNTIME.block_on(cache.remove_entries(&chunk_locations))?;
685701

702+
// Lock zones to prevent new writes during cleaning
703+
{
704+
let (zone_mtx, _) = &*self.zones;
705+
let mut zones = zone_mtx.lock().unwrap();
706+
for zone in &clean_locations {
707+
// Remove from open_zones so no new chunks can be allocated from this zone
708+
zones.open_zones.retain(|z| z != zone);
709+
}
710+
}
711+
686712
// Cleaning
687713
let self_clone = self.clone();
688714
for zone in clean_locations.iter() {
715+
// Collect old chunk locations in this zone BEFORE cleaning starts
716+
let old_chunks = RUNTIME.block_on(async {
717+
let zone_mapping = cache.zone_to_entry.read().await;
718+
let zone_slice = s![*zone as usize, ..];
719+
zone_mapping.slice(zone_slice)
720+
.iter()
721+
.enumerate()
722+
.filter_map(|(chunk_idx, opt_key)| {
723+
opt_key.as_ref().map(|_| ChunkLocation::new(*zone, chunk_idx as Chunk))
724+
})
725+
.collect::<Vec<_>>()
726+
});
727+
689728
let cache_clone = cache.clone();
690729
let self_clone = self_clone.clone();
691730
let writer_pool = writer_pool.clone();
731+
let eviction_policy_clone = eviction_policy.clone();
692732

693733
RUNTIME.block_on(cache_clone.clean_zone_and_update_map(
694734
zone.clone(),
@@ -716,30 +756,38 @@ impl Device for Zoned {
716756
{
717757
let writer_pool = writer_pool.clone();
718758
let self_clone = self_clone;
759+
let eviction_policy = eviction_policy_clone;
760+
let old_locations = old_chunks; // Capture old locations
719761

720762
// Writer callback
721763
|payloads: Vec<(CacheKey, bytes::Bytes)>| {
722764
async move {
765+
// Remove old LRU entries FIRST, before reset_zone makes the zone available again
766+
// This prevents relocated chunks from being accidentaly added then removed
723767
{
768+
let mut policy = eviction_policy.lock().unwrap();
769+
if let EvictionPolicyWrapper::Chunk(c) = &mut *policy {
770+
c.chunks_relocated(&old_locations);
771+
}
772+
}
773+
774+
let cv = {
724775
// Return zones back to the zone list and reset the zone
725776
let _guard = self_clone.zone_append_lock[*zone as usize]
726777
.write()
727778
.unwrap();
728779
let (zone_mtx, cv) = &*self_clone.zones;
729780
let mut zones = zone_mtx.lock().unwrap();
730781
zones.reset_zone(*zone, &*self_clone)?;
731-
cv.notify_all();
732-
} // Drop the mutex, so we don't have to put it in an await
782+
cv
783+
}; // Drop the mutex, so we don't have to put it in an await
733784

734785
// Use prioritized batch write for eviction
735786
let keys: Vec<_> =
736787
payloads.iter().map(|(key, _)| key.clone()).collect();
737788
let data_vec: Vec<_> =
738789
payloads.iter().map(|(_, data)| data.clone()).collect();
739790

740-
// Used to verify no RACE, TODO: Remove!
741-
// tokio::time::sleep(Duration::from_secs(5)).await;
742-
743791
let (batch_tx, batch_rx) = flume::bounded(1);
744792

745793
let batch_request = BatchWriteRequest {
@@ -775,6 +823,9 @@ impl Device for Zoned {
775823

776824
let write_results = write_results?;
777825

826+
// Notify waiting writers AFTER batch write completes successfully
827+
cv.notify_all();
828+
778829
Ok(write_results) // Vec<(Chunk, ChunkLocation)>
779830
}
780831
}
@@ -792,9 +843,13 @@ impl Device for Zoned {
792843

793844
RUNTIME.block_on(cache.remove_zones(&zones_to_evict))?;
794845

795-
let (zone_mtx, _) = &*self.zones;
846+
let (zone_mtx, cv) = &*self.zones;
796847
let mut zones = zone_mtx.lock().unwrap();
797848
zones.reset_zones(&zones_to_evict, &*self)?;
849+
drop(zones);
850+
851+
// Notify waiting writers after zones are reset
852+
cv.notify_all();
798853

799854
// Reset atomic counters for evicted zones
800855
let policy = eviction_policy.lock().unwrap();

0 commit comments

Comments
 (0)