From 20553593175e44cda9949a527496f3a46a241887 Mon Sep 17 00:00:00 2001 From: slc Date: Wed, 20 Aug 2025 05:17:27 -0700 Subject: [PATCH 1/3] formatting --- nvme/src/info.rs | 4 +- oxcache/src/cache/mod.rs | 11 +- oxcache/src/device.rs | 89 ++++++---- oxcache/src/eviction.rs | 45 +++--- oxcache/src/lib.rs | 2 +- oxcache/src/main.rs | 61 +++---- oxcache/src/metrics.rs | 64 ++++---- oxcache/src/readerpool.rs | 8 +- oxcache/src/server.rs | 12 +- oxcache/src/writerpool.rs | 10 +- oxcache/src/zone_state/zone_list.rs | 153 ++++++++++++------ oxcache/src/zone_state/zone_priority_queue.rs | 24 ++- 12 files changed, 303 insertions(+), 180 deletions(-) diff --git a/nvme/src/info.rs b/nvme/src/info.rs index bd04a14..08f95f8 100644 --- a/nvme/src/info.rs +++ b/nvme/src/info.rs @@ -231,7 +231,9 @@ pub fn get_zone_capacity(fd: RawFd, nsid: u32) -> Result Result { - Ok(report_zones_all(fd, nsid)?.1[zone as usize].zone_state.clone()) + Ok(report_zones_all(fd, nsid)?.1[zone as usize] + .zone_state + .clone()) } /// Returns a report of all zones for the given NVMe device and namespace. diff --git a/oxcache/src/cache/mod.rs b/oxcache/src/cache/mod.rs index 86db817..1d850a9 100644 --- a/oxcache/src/cache/mod.rs +++ b/oxcache/src/cache/mod.rs @@ -1,13 +1,13 @@ +use crate::cache::bucket::Chunk as CacheKey; use crate::cache::bucket::{Chunk, ChunkLocation, ChunkState}; +use bytes::Bytes; use ndarray::{Array2, ArrayBase, s}; use nvme::types::{self, Zone}; use std::io::ErrorKind; use std::iter::zip; use std::sync::Arc; use std::{collections::HashMap, io}; -use bytes::Bytes; use tokio::sync::{Notify, RwLock}; -use crate::cache::bucket::Chunk as CacheKey; pub mod bucket; @@ -247,7 +247,10 @@ impl Cache { ChunkState::Ready(loc) => Arc::clone(loc), ChunkState::Waiting(_) => { // TODO: Shouldnt occur since zone was full - return Err(io::Error::new(ErrorKind::Other, "Encountered invalid waiting state during zone cleaning")) + return Err(io::Error::new( + ErrorKind::Other, + "Encountered invalid waiting state during zone cleaning", + )); } }; @@ -321,8 +324,6 @@ impl Cache { Ok(()) } - - pub async fn remove_entry(&self, chunk: &ChunkLocation) -> tokio::io::Result<()> { // to_relocate is a list of ChunkLocations that the caller wants to update // We pass in each chunk location and the writer function should return back with the list of updated chunk locations diff --git a/oxcache/src/device.rs b/oxcache/src/device.rs index c07d3d9..e53dfc5 100644 --- a/oxcache/src/device.rs +++ b/oxcache/src/device.rs @@ -1,10 +1,12 @@ -use std::{fs, io}; use crate::cache::Cache; +use crate::cache::bucket::Chunk as CacheKey; use crate::cache::bucket::ChunkLocation; use crate::eviction::{EvictTarget, EvictorMessage}; +use crate::metrics::{METRICS, MetricType}; use crate::server::RUNTIME; use crate::writerpool::{WriteRequest, WriterPool}; use crate::zone_state::zone_list::{ZoneList, ZoneObtainFailure}; +use crate::zone_state::zone_priority_queue::ZonePriorityQueue; use bytes::Bytes; use flume::Sender; use futures::future::join_all; @@ -14,9 +16,7 @@ use nvme::types::{Byte, Chunk, LogicalBlock, NVMeConfig, PerformOn, ZNSConfig, Z use std::io::ErrorKind; use std::os::fd::RawFd; use std::sync::{Arc, Condvar, Mutex, MutexGuard}; -use crate::metrics::{MetricType, METRICS}; -use crate::zone_state::zone_priority_queue::ZonePriorityQueue; -use crate::cache::bucket::Chunk as CacheKey; +use std::{fs, io}; pub struct Zoned { nvme_config: NVMeConfig, @@ -66,7 +66,12 @@ pub trait Device: Send + Sync { fn append(&self, data: Bytes) -> std::io::Result; /// This is expected to remove elements from the cache as well - fn evict(self: Arc, locations: EvictTarget, cache: Arc, writer_pool: Arc) -> io::Result<()>; + fn evict( + self: Arc, + locations: EvictTarget, + cache: Arc, + writer_pool: Arc, + ) -> io::Result<()>; fn read(&self, location: ChunkLocation) -> std::io::Result; @@ -248,7 +253,6 @@ impl Zoned { Ok(()) } - } impl Zoned { @@ -274,20 +278,23 @@ impl Zoned { config.chunk_size_in_lbas = chunk_size_in_logical_blocks; config.chunk_size_in_bytes = chunk_size; let num_zones: Zone = config.num_zones; - + // Apply max_zones restriction if specified let restricted_num_zones = if let Some(max_zones) = max_zones { if max_zones > num_zones { return Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, - format!("max_zones ({}) cannot be larger than the maximum number of zones available on the device ({})", max_zones, num_zones) + format!( + "max_zones ({}) cannot be larger than the maximum number of zones available on the device ({})", + max_zones, num_zones + ), )); } max_zones } else { num_zones }; - + let zone_list = ZoneList::new( restricted_num_zones, config.chunks_per_zone, @@ -453,7 +460,6 @@ impl Zoned { impl Device for Zoned { /// Hold internal state to keep track of zone state fn append(&self, data: Bytes) -> std::io::Result { - let sz = data.len() as u64; let zone_index: Zone = loop { @@ -473,7 +479,11 @@ impl Device for Zoned { let start = std::time::Instant::now(); let res = self.chunked_append(data, zone_index); - METRICS.update_metric_histogram_latency("disk_write_latency_ms", start.elapsed(), MetricType::MsLatency); + METRICS.update_metric_histogram_latency( + "disk_write_latency_ms", + start.elapsed(), + MetricType::MsLatency, + ); METRICS.update_metric_counter("written_bytes_total", sz); METRICS.update_metric_counter("bytes_total", sz); res @@ -486,13 +496,22 @@ impl Device for Zoned { let start = std::time::Instant::now(); self.read_into_buffer(self.max_write_size, slba, &mut data, &self.nvme_config)?; - METRICS.update_metric_histogram_latency("disk_read_latency_ms", start.elapsed(), MetricType::MsLatency); + METRICS.update_metric_histogram_latency( + "disk_read_latency_ms", + start.elapsed(), + MetricType::MsLatency, + ); METRICS.update_metric_counter("read_bytes_total", data.len() as u64); METRICS.update_metric_counter("bytes_total", data.len() as u64); Ok(Bytes::from(data)) } - fn evict(self: Arc, locations: EvictTarget, cache: Arc, writer_pool: Arc) -> io::Result<()> { + fn evict( + self: Arc, + locations: EvictTarget, + cache: Arc, + writer_pool: Arc, + ) -> io::Result<()> { let usage = self.get_use_percentage(); METRICS.update_metric_gauge("usage_percentage", usage as f64); @@ -514,9 +533,8 @@ impl Device for Zoned { let cache_clone = cache.clone(); let self_clone = self_clone.clone(); let writer_pool = writer_pool.clone(); - tokio::spawn( - async move { - cache_clone.clean_zone_and_update_map( + tokio::spawn(async move { + cache_clone.clean_zone_and_update_map( zone.clone(), // Reads all valid chunks in zone and returns buffer [(Chunk, Bytes)] // which is the list of chunks that need to be written back @@ -583,11 +601,11 @@ impl Device for Zoned { } }, ).await - // Interesting, the compiler will throw an error if I don't await here. - // The async move moves the value and declares that this block returns a Future - // If we don't await it, the Future is referencing a local value. So we await it - // to store it in the state of the Future. I think that's what's happening here - }); + // Interesting, the compiler will throw an error if I don't await here. + // The async move moves the value and declares that this block returns a Future + // If we don't await it, the Future is referencing a local value. So we await it + // to store it in the state of the Future. I think that's what's happening here + }); tracing::debug!("[evict:Chunk] Cleaned zone {}", zone); } @@ -713,20 +731,23 @@ impl BlockInterface { // Num_zones let num_zones = nvme_config.total_size_in_bytes / block_zone_capacity; - + // Apply max_zones restriction if specified let restricted_num_zones = if let Some(max_zones) = max_zones { if max_zones > num_zones { return Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, - format!("max_zones ({}) cannot be larger than the maximum number of zones available on the device ({})", max_zones, num_zones) + format!( + "max_zones ({}) cannot be larger than the maximum number of zones available on the device ({})", + max_zones, num_zones + ), )); } max_zones } else { num_zones }; - + // Chunks per zone let chunks_per_zone = block_zone_capacity / chunk_size; @@ -826,7 +847,11 @@ impl Device for BlockInterface { let start = std::time::Instant::now(); self.chunked_append(data, write_addr)?; - METRICS.update_metric_histogram_latency("disk_write_latency_ms", start.elapsed(), MetricType::MsLatency); + METRICS.update_metric_histogram_latency( + "disk_write_latency_ms", + start.elapsed(), + MetricType::MsLatency, + ); METRICS.update_metric_counter("written_bytes_total", sz); METRICS.update_metric_counter("bytes_total", sz); Ok(chunk_location) @@ -844,18 +869,26 @@ impl Device for BlockInterface { &mut data, &self.nvme_config, )?; - METRICS.update_metric_histogram_latency("disk_read_latency_ms", start.elapsed(), MetricType::MsLatency); + METRICS.update_metric_histogram_latency( + "disk_read_latency_ms", + start.elapsed(), + MetricType::MsLatency, + ); METRICS.update_metric_counter("read_bytes_total", data.len() as u64); METRICS.update_metric_counter("bytes_total", data.len() as u64); Ok(Bytes::from(data)) } - fn evict(self: Arc, locations: EvictTarget, cache: Arc, _writer_pool: Arc) -> io::Result<()> { + fn evict( + self: Arc, + locations: EvictTarget, + cache: Arc, + _writer_pool: Arc, + ) -> io::Result<()> { let usage = self.get_use_percentage(); METRICS.update_metric_gauge("usage_percentage", usage as f64); match locations { EvictTarget::Chunk(chunk_locations, _) => { - if chunk_locations.is_empty() { return Ok(()); } diff --git a/oxcache/src/eviction.rs b/oxcache/src/eviction.rs index aae18ca..cdeeeef 100644 --- a/oxcache/src/eviction.rs +++ b/oxcache/src/eviction.rs @@ -1,18 +1,18 @@ -use std::io::ErrorKind; use crate::cache::{Cache, bucket::ChunkLocation}; use crate::device::Device; use crate::writerpool::WriterPool; +use crate::zone_state::zone_priority_queue; use crate::zone_state::zone_priority_queue::{ZoneIndex, ZonePriorityQueue}; use flume::{Receiver, Sender}; use lru::LruCache; use nvme::types::{Chunk, Zone}; +use std::io::ErrorKind; use std::sync::{ Arc, Mutex, atomic::{AtomicBool, Ordering}, }; use std::thread::{self, JoinHandle}; use std::time::Duration; -use crate::zone_state::zone_priority_queue; pub enum EvictionPolicyWrapper { Dummy(DummyEvictionPolicy), @@ -40,7 +40,10 @@ impl EvictionPolicyWrapper { "dummy" => Ok(EvictionPolicyWrapper::Dummy(DummyEvictionPolicy::new())), "chunk" => { if clean_high_water.is_none() || clean_low_water.is_none() { - return Err(std::io::Error::new(ErrorKind::InvalidInput, "Chunk eviction must have clean_high_water and clean_low_water")); + return Err(std::io::Error::new( + ErrorKind::InvalidInput, + "Chunk eviction must have clean_high_water and clean_low_water", + )); } Ok(EvictionPolicyWrapper::Chunk(ChunkEvictionPolicy::new( high_water, @@ -50,7 +53,7 @@ impl EvictionPolicyWrapper { nr_zones, nr_chunks_per_zone, ))) - }, + } "promotional" => Ok(EvictionPolicyWrapper::Promotional( PromotionalEvictionPolicy::new(high_water, low_water, nr_zones, nr_chunks_per_zone), )), @@ -86,7 +89,7 @@ impl EvictionPolicyWrapper { let et = c.get_evict_targets(); let ct = c.get_clean_targets(); EvictTarget::Chunk(et, ct) - }, + } } } } @@ -121,7 +124,9 @@ impl EvictionPolicy for DummyEvictionPolicy { vec![] } - fn get_clean_targets(&mut self) -> Self::CleanTarget { () } + fn get_clean_targets(&mut self) -> Self::CleanTarget { + () + } } pub struct PromotionalEvictionPolicy { @@ -193,7 +198,9 @@ impl EvictionPolicy for PromotionalEvictionPolicy { targets } - fn get_clean_targets(&mut self) -> Self::CleanTarget { () } + fn get_clean_targets(&mut self) -> Self::CleanTarget { + () + } } pub struct ChunkEvictionPolicy { @@ -202,7 +209,7 @@ pub struct ChunkEvictionPolicy { nr_zones: Zone, nr_chunks_per_zone: Chunk, lru: LruCache, - pq: ZonePriorityQueue + pq: ZonePriorityQueue, } impl ChunkEvictionPolicy { @@ -220,7 +227,7 @@ impl ChunkEvictionPolicy { nr_zones, nr_chunks_per_zone, lru: LruCache::unbounded(), - pq: ZonePriorityQueue::new(nr_zones, clean_high_water, clean_low_water) + pq: ZonePriorityQueue::new(nr_zones, clean_high_water, clean_low_water), } } } @@ -284,7 +291,7 @@ impl Evictor { cache: Arc, evict_interval: Duration, evict_rx: Receiver, - writer_pool: Arc + writer_pool: Arc, ) -> std::io::Result { let shutdown = Arc::new(AtomicBool::new(false)); @@ -317,13 +324,14 @@ impl Evictor { drop(policy); let device_clone = device_clone.clone(); - let result = match device_clone.evict(targets, cache_clone.clone(), writer_pool.clone()) { - Err(e) => { - tracing::error!("Error evicting: {}", e); - Err(e.to_string()) - } - Ok(_) => Ok(()), - }; + let result = + match device_clone.evict(targets, cache_clone.clone(), writer_pool.clone()) { + Err(e) => { + tracing::error!("Error evicting: {}", e); + Err(e.to_string()) + } + Ok(_) => Ok(()), + }; if let Some(sender) = sender { tracing::debug!("Sending eviction response to sender: {:?}", result); @@ -521,8 +529,7 @@ mod tests { #[test] fn check_chunk_priority_queue() { // 8 zones, 1 chunks per zone. Should evict at 3 inserted - let mut policy = ChunkEvictionPolicy::new( - 2, 6, 1, 4, 4, 2); + let mut policy = ChunkEvictionPolicy::new(2, 6, 1, 4, 4, 2); for z in 0..3 { for i in 0..2 { diff --git a/oxcache/src/lib.rs b/oxcache/src/lib.rs index 27548a4..c4ad762 100644 --- a/oxcache/src/lib.rs +++ b/oxcache/src/lib.rs @@ -1,6 +1,7 @@ pub mod cache; pub mod device; pub mod eviction; +mod metrics; pub mod readerpool; pub mod remote; pub mod request; @@ -8,4 +9,3 @@ pub mod server; pub mod util; pub mod writerpool; pub mod zone_state; -mod metrics; diff --git a/oxcache/src/main.rs b/oxcache/src/main.rs index 19be738..8550c8f 100644 --- a/oxcache/src/main.rs +++ b/oxcache/src/main.rs @@ -2,15 +2,17 @@ use clap::Parser; use nvme::types::Byte; use oxcache; use oxcache::remote; -use oxcache::server::{RUNTIME, Server, ServerConfig, ServerEvictionConfig, ServerRemoteConfig, ServerMetricsConfig}; +use oxcache::server::{ + RUNTIME, Server, ServerConfig, ServerEvictionConfig, ServerMetricsConfig, ServerRemoteConfig, +}; use serde::Deserialize; use std::fs; use std::net::{IpAddr, SocketAddr}; use std::process::exit; use std::sync::OnceLock; -use tracing::{event, Level}; -use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; +use tracing::{Level, event}; use tracing_appender::{non_blocking, rolling}; +use tracing_subscriber::{EnvFilter, Layer, fmt, layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Parser, Debug)] #[command(author, version, about)] @@ -66,10 +68,10 @@ pub struct CliArgs { #[arg(long)] pub remote_artificial_delay_microsec: Option, - + #[arg(long)] pub metrics_ip_addr: Option, - + #[arg(long)] pub metrics_port: Option, @@ -244,11 +246,15 @@ fn load_config(cli: &CliArgs) -> Result if eviction_policy.to_lowercase() == "chunk" { let low_water_clean = if low_water_clean.is_none() { return Err("low_water_clean must be set".into()); - } else { low_water_clean.unwrap() }; + } else { + low_water_clean.unwrap() + }; let high_water_clean = if high_water_clean.is_none() { return Err("high_water_clean must be set".into()); - } else { high_water_clean.unwrap() }; + } else { + high_water_clean.unwrap() + }; if low_water_clean > high_water_clean { return Err("low_water_clean must be less than high_water_clean".into()); @@ -273,10 +279,8 @@ fn load_config(cli: &CliArgs) -> Result .or_else(|| config.as_ref()?.server.block_zone_capacity); let block_zone_capacity = block_zone_capacity.ok_or("Missing block_zone_capacity")?; - let max_zones = cli - .max_zones - .or_else(|| config.as_ref()?.server.max_zones); - + let max_zones = cli.max_zones.or_else(|| config.as_ref()?.server.max_zones); + // Metrics let metrics_port = cli @@ -288,16 +292,18 @@ fn load_config(cli: &CliArgs) -> Result .clone() .or_else(|| config.as_ref()?.metrics.ip_addr.clone()); - if metrics_port.is_none() && metrics_ip.is_some() || metrics_port.is_some() && metrics_ip.is_none() { + if metrics_port.is_none() && metrics_ip.is_some() + || metrics_port.is_some() && metrics_ip.is_none() + { return Err("Missing metrics ip or port, both must be set or neither".into()); } - + let metrics_exporter_addr = if metrics_port.is_some() { let ip = match metrics_ip.unwrap().parse::() { Ok(ip) => ip, Err(e) => { return Err(format!("Invalid metrics ip address {:?}", e).into()); - } + } }; Some(SocketAddr::new(ip, metrics_port.unwrap())) } else { @@ -329,8 +335,8 @@ fn load_config(cli: &CliArgs) -> Result max_write_size, max_zones, metrics: ServerMetricsConfig { - metrics_exporter_addr - } + metrics_exporter_addr, + }, }) } @@ -351,12 +357,13 @@ async fn async_main() -> Result<(), Box> { .clone() .or_else(|| app_config.as_ref().and_then(|c| c.log_level.clone())) .unwrap_or_else(|| "info".to_string()); - - let file_metrics_directory = cli - .file_metrics_directory - .clone() - .or_else(|| app_config.as_ref().and_then(|c| c.metrics.file_metrics_directory.clone())); - + + let file_metrics_directory = cli.file_metrics_directory.clone().or_else(|| { + app_config + .as_ref() + .and_then(|c| c.metrics.file_metrics_directory.clone()) + }); + init_logging(&log_level, file_metrics_directory.as_deref()); let config = load_config(&cli)?; @@ -399,11 +406,11 @@ static METRICS_GUARD: OnceLock = OnceLock::new(); pub fn init_logging(level: &str, metrics_directory: Option<&str>) { let directive = match level.to_lowercase().as_str() { "error" => "error", - "warn" => "warn", - "info" => "info", + "warn" => "warn", + "info" => "info", "debug" => "debug", "trace" => "trace", - _ => "info", + _ => "info", }; if let Some(metrics_dir) = metrics_directory { @@ -454,9 +461,7 @@ pub fn init_logging(level: &str, metrics_directory: Option<&str>) { .compact() .with_filter(EnvFilter::new(format!("{},metrics=off", directive))); - let _ = tracing_subscriber::registry() - .with(stdout_layer) - .try_init(); + let _ = tracing_subscriber::registry().with(stdout_layer).try_init(); } } diff --git a/oxcache/src/metrics.rs b/oxcache/src/metrics.rs index 1eaaba4..dfa596f 100644 --- a/oxcache/src/metrics.rs +++ b/oxcache/src/metrics.rs @@ -1,20 +1,18 @@ -use axum::{routing::get, Router}; +use axum::{Router, routing::get}; +use metrics::{counter, gauge, histogram}; use metrics_exporter_prometheus::PrometheusBuilder; +use once_cell::sync::Lazy; use std::net::{IpAddr, SocketAddr}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use tracing::{event, info, Level}; -use metrics::{counter, gauge, histogram}; -use once_cell::sync::Lazy; +use tracing::{Level, event, info}; -use tracing_appender::rolling; -use tracing_appender::non_blocking; -use tracing_subscriber::fmt; use std::collections::HashMap; use std::sync::{Arc, Mutex}; +use tracing_appender::non_blocking; +use tracing_appender::rolling; +use tracing_subscriber::fmt; -pub static METRICS: Lazy = Lazy::new(|| { - MetricsRecorder::new() -}); +pub static METRICS: Lazy = Lazy::new(|| MetricsRecorder::new()); pub struct HitRatio { hits: u64, @@ -22,24 +20,19 @@ pub struct HitRatio { } impl HitRatio { pub fn new() -> Self { - Self { - hits: 0, misses: 0 - } + Self { hits: 0, misses: 0 } } pub fn update_hitratio(&mut self, hit_type: HitType) { match hit_type { - HitType::Hit => { - self.hits+=1 - }, - HitType::Miss => { - self.misses+=1 - } + HitType::Hit => self.hits += 1, + HitType::Miss => self.misses += 1, } } } pub enum HitType { - Hit, Miss + Hit, + Miss, } pub struct MetricState { @@ -49,20 +42,23 @@ pub struct MetricState { impl MetricState { pub fn new() -> Self { Self { - hit_ratio: Arc::new(Mutex::new(HitRatio::new())) + hit_ratio: Arc::new(Mutex::new(HitRatio::new())), } } } pub fn init_metrics_exporter(addr: SocketAddr) { let builder = PrometheusBuilder::new(); - let recorder_handle = builder.install_recorder().expect("failed to install recorder"); + let recorder_handle = builder + .install_recorder() + .expect("failed to install recorder"); // Spawn HTTP server in a tokio task (green thread) tokio::spawn(async move { - let app = Router::new().route("/metrics", get(move || async move { - recorder_handle.render() - })); + let app = Router::new().route( + "/metrics", + get(move || async move { recorder_handle.render() }), + ); tracing::info!("Serving metrics at http://{}/metrics", addr); @@ -81,7 +77,7 @@ pub struct MetricsRecorder { } pub enum MetricType { - MsLatency + MsLatency, } const MILLISECONDS: f64 = 1_000.0; @@ -98,17 +94,20 @@ impl MetricsRecorder { counters: Arc::new(Mutex::new(HashMap::new())), } } - pub fn update_metric_histogram_latency(&self, name: &str, value: Duration, metric_type: MetricType) { + pub fn update_metric_histogram_latency( + &self, + name: &str, + value: Duration, + metric_type: MetricType, + ) { let id = self.run_id.clone(); let h = histogram!(format!("{}_{}", self.prefix, name.to_string()), "run_id" => id.clone()); let v = match metric_type { - MetricType::MsLatency => { - value.as_secs_f64() * MILLISECONDS - }, + MetricType::MsLatency => value.as_secs_f64() * MILLISECONDS, _ => { tracing::warn!("Unknown metric type"); return; - }, + } }; h.record(v); event!(target: "metrics", Level::INFO, name = name, value=v); @@ -118,7 +117,7 @@ impl MetricsRecorder { let id = self.run_id.clone(); let h = counter!(format!("{}_{}", self.prefix, name.to_string()), "run_id" => id.clone()); h.increment(value); - + // Update thread-safe counter map { let mut counters = self.counters.lock().unwrap(); @@ -149,4 +148,3 @@ impl MetricsRecorder { self.update_metric_gauge("hitratio", ratio) } } - diff --git a/oxcache/src/readerpool.rs b/oxcache/src/readerpool.rs index 797f29a..f3f1716 100644 --- a/oxcache/src/readerpool.rs +++ b/oxcache/src/readerpool.rs @@ -1,10 +1,10 @@ use crate::eviction::EvictionPolicyWrapper; +use crate::metrics::{METRICS, MetricType}; use crate::{cache, device}; use bytes::Bytes; use flume::{Receiver, Sender, unbounded}; use std::sync::{Arc, Mutex}; use std::thread::{self, JoinHandle}; -use crate::metrics::{MetricType, METRICS}; #[derive(Debug)] pub struct ReadResponse { @@ -46,7 +46,11 @@ impl Reader { // println!("Reader {} processing: {:?}", self.id, msg); let start = std::time::Instant::now(); let result = self.device.read(msg.location.clone()); - METRICS.update_metric_histogram_latency("device_read_latency_ms", start.elapsed(), MetricType::MsLatency); + METRICS.update_metric_histogram_latency( + "device_read_latency_ms", + start.elapsed(), + MetricType::MsLatency, + ); if result.is_ok() { let mtx = Arc::clone(&self.eviction_policy); let mut policy = mtx.lock().unwrap(); diff --git a/oxcache/src/server.rs b/oxcache/src/server.rs index 15c5479..9ccb6a5 100644 --- a/oxcache/src/server.rs +++ b/oxcache/src/server.rs @@ -3,15 +3,16 @@ use crate::device::Device; use crate::eviction::{EvictionPolicyWrapper, Evictor, EvictorMessage}; use crate::readerpool::{ReadRequest, ReaderPool}; use crate::writerpool::{WriteRequest, WriterPool}; -use tracing::debug; use nvme::types::Byte; use std::error::Error; use std::hash::{DefaultHasher, Hash, Hasher}; use std::net::SocketAddr; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{Mutex, Notify}; +use tracing::debug; use crate::cache::bucket::Chunk; +use crate::metrics::{HitType, METRICS, MetricType, init_metrics_exporter}; use crate::remote::RemoteBackend; use crate::{device, request}; use bincode; @@ -25,7 +26,6 @@ use std::sync::Arc; use std::time::Duration; use tokio::runtime::{Builder, Runtime}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; -use crate::metrics::{init_metrics_exporter, HitType, MetricType, METRICS}; // Global tokio runtime // pub static RUNTIME: Lazy = // Lazy::new(|| Runtime::new().expect("Failed to create Tokio runtime")); @@ -171,7 +171,7 @@ impl Server { Arc::clone(&self.cache), Duration::from_secs(self.config.eviction.eviction_interval), self.evict_rx.clone(), - writerpool.clone() + writerpool.clone(), )?; // Shutdown signal @@ -426,7 +426,11 @@ async fn handle_connection( break; } } - METRICS.update_metric_histogram_latency("get_total_latency_ms", start.elapsed(), MetricType::MsLatency); + METRICS.update_metric_histogram_latency( + "get_total_latency_ms", + start.elapsed(), + MetricType::MsLatency, + ); } Ok(()) } diff --git a/oxcache/src/writerpool.rs b/oxcache/src/writerpool.rs index 6efe145..3ddc98b 100644 --- a/oxcache/src/writerpool.rs +++ b/oxcache/src/writerpool.rs @@ -1,3 +1,4 @@ +use crate::metrics::{METRICS, MetricType}; use crate::{ cache::{self}, device, @@ -7,7 +8,6 @@ use bytes::Bytes; use flume::{Receiver, Sender, unbounded}; use std::sync::{Arc, Mutex}; use std::thread::{self, JoinHandle}; -use crate::metrics::{MetricType, METRICS}; #[derive(Debug)] pub struct WriteResponse { @@ -55,8 +55,12 @@ impl Writer { let mut policy = mtx.lock().unwrap(); policy.write_update(loc.clone()); }); - METRICS.update_metric_histogram_latency("device_write_latency_ms", start.elapsed(), MetricType::MsLatency); - + METRICS.update_metric_histogram_latency( + "device_write_latency_ms", + start.elapsed(), + MetricType::MsLatency, + ); + let resp = WriteResponse { location: result }; let snd = msg.responder.send(resp); if snd.is_err() { diff --git a/oxcache/src/zone_state/zone_list.rs b/oxcache/src/zone_state/zone_list.rs index d2d8303..bcb34c0 100644 --- a/oxcache/src/zone_state/zone_list.rs +++ b/oxcache/src/zone_state/zone_list.rs @@ -30,7 +30,7 @@ pub enum ZoneStateDbg { ExplicitOpen, ImplicitOpen, Closed, - Full + Full, } #[derive(Clone, Debug)] @@ -48,7 +48,7 @@ impl ZoneDbg { Self { state: ZoneStateDbg::Empty, chunk_ptr: 0, - num_writers: 0 + num_writers: 0, } } } @@ -58,7 +58,7 @@ pub struct ZoneStateTracker { pub state: Vec, pub max_chunks: usize, pub mar: usize, - pub mor: usize + pub mor: usize, } impl ZoneStateTracker { @@ -66,21 +66,30 @@ impl ZoneStateTracker { Self { state: vec![ZoneDbg::new(); num_zones], max_chunks, - mar, mor + mar, + mor, } } pub fn open_zone(&mut self, zone_index: ZoneIndex) { - self.assert_zone_state(zone_index, - &[ZoneStateDbg::Empty, ZoneStateDbg::Closed, ZoneStateDbg::ImplicitOpen]); + self.assert_zone_state( + zone_index, + &[ + ZoneStateDbg::Empty, + ZoneStateDbg::Closed, + ZoneStateDbg::ImplicitOpen, + ], + ); self.check_zone_count(); self.state[zone_index as usize].state = ZoneStateDbg::ExplicitOpen; self.check_zone_count(); } pub fn close_zone(&mut self, zone_index: ZoneIndex) { - self.assert_zone_state(zone_index, - &[ZoneStateDbg::ImplicitOpen, ZoneStateDbg::ExplicitOpen]); + self.assert_zone_state( + zone_index, + &[ZoneStateDbg::ImplicitOpen, ZoneStateDbg::ExplicitOpen], + ); self.check_zone_count(); self.state[zone_index as usize].state = ZoneStateDbg::Closed; self.check_zone_count(); @@ -93,22 +102,31 @@ impl ZoneStateTracker { } pub fn finish_zone(&mut self, zone_index: ZoneIndex) { - self.assert_zone_state(zone_index, - &[ZoneStateDbg::ImplicitOpen, ZoneStateDbg::ExplicitOpen, ZoneStateDbg::Closed]); + self.assert_zone_state( + zone_index, + &[ + ZoneStateDbg::ImplicitOpen, + ZoneStateDbg::ExplicitOpen, + ZoneStateDbg::Closed, + ], + ); self.check_zone_count(); self.state[zone_index as usize].state = ZoneStateDbg::Empty; self.check_zone_count(); } pub fn write_zone(&mut self, zone_index: ZoneIndex) { - self.assert_zone_state(zone_index, - &[ZoneStateDbg::ImplicitOpen, + self.assert_zone_state( + zone_index, + &[ + ZoneStateDbg::ImplicitOpen, ZoneStateDbg::ExplicitOpen, ZoneStateDbg::Closed, - ZoneStateDbg::Empty]); + ZoneStateDbg::Empty, + ], + ); self.check_zone_count(); - self.state[zone_index as usize].chunk_ptr += 1; self.state[zone_index as usize].state = match self.state[zone_index as usize].state { ZoneStateDbg::Empty => ZoneStateDbg::ImplicitOpen, @@ -125,11 +143,15 @@ impl ZoneStateTracker { } pub fn finish_write_zone(&mut self, zone_index: ZoneIndex) { - self.assert_zone_state(zone_index, - &[ZoneStateDbg::ImplicitOpen, + self.assert_zone_state( + zone_index, + &[ + ZoneStateDbg::ImplicitOpen, ZoneStateDbg::ExplicitOpen, ZoneStateDbg::Closed, - ZoneStateDbg::Empty]); + ZoneStateDbg::Empty, + ], + ); self.check_zone_count(); if self.state[zone_index as usize].num_writers == 0 { @@ -143,9 +165,11 @@ impl ZoneStateTracker { } fn assert_zone_state(&self, zone_index: ZoneIndex, states: &[ZoneStateDbg]) { - assert!(states.iter().any( - |state| self.state[zone_index as usize].state == *state - )) + assert!( + states + .iter() + .any(|state| self.state[zone_index as usize].state == *state) + ) } fn check_zone_count(&self) { @@ -154,20 +178,21 @@ impl ZoneStateTracker { } fn open_zone_count(&self) -> usize { - self.state.iter() + self.state + .iter() .filter(|zone| { - zone.state == ZoneStateDbg::ImplicitOpen || - zone.state == ZoneStateDbg::ExplicitOpen + zone.state == ZoneStateDbg::ImplicitOpen || zone.state == ZoneStateDbg::ExplicitOpen }) .count() } fn active_zone_count(&self) -> usize { - self.state.iter() + self.state + .iter() .filter(|zone| { - zone.state == ZoneStateDbg::ImplicitOpen || - zone.state == ZoneStateDbg::ExplicitOpen || - zone.state == ZoneStateDbg::Closed + zone.state == ZoneStateDbg::ImplicitOpen + || zone.state == ZoneStateDbg::ExplicitOpen + || zone.state == ZoneStateDbg::Closed }) .count() } @@ -196,10 +221,15 @@ impl ZoneList { // List of all zones, initially all are "free" let avail_zones = (0..num_zones).collect(); let zones = (0..num_zones) - .map(|item| (item, Zone { - index: item, - chunks_available: (0..chunks_per_zone).rev().collect() - })) // (key, value) + .map(|item| { + ( + item, + Zone { + index: item, + chunks_available: (0..chunks_per_zone).rev().collect(), + }, + ) + }) // (key, value) .collect(); ZoneList { @@ -207,13 +237,14 @@ impl ZoneList { open_zones: VecDeque::with_capacity(max_active_resources), writing_zones: HashMap::with_capacity(max_active_resources), chunks_per_zone, - max_active_resources: max_active_resources-1, // Keep one reserved for eviction + max_active_resources: max_active_resources - 1, // Keep one reserved for eviction zones, state_tracker: ZoneStateTracker::new( num_zones as usize, chunks_per_zone as usize, max_active_resources, - max_active_resources), + max_active_resources, + ), } } @@ -413,15 +444,24 @@ impl ZoneList { // self.state_tracker.finish_write_zone(chunk.zone); let zone = self.zones.get_mut(&chunk.zone).unwrap(); - tracing::trace!("[ZoneList]: Returning chunk {:?} to {:?}", chunk, zone.chunks_available); + tracing::trace!( + "[ZoneList]: Returning chunk {:?} to {:?}", + chunk, + zone.chunks_available + ); assert!( !zone.chunks_available.contains(&chunk.index), "Zone {} should not contain chunk {} we are trying to return", - chunk.zone, chunk.index + chunk.zone, + chunk.index ); - tracing::trace!("[ZoneList]: Returning chunk {:?} to {:?}", chunk, zone.chunks_available); + tracing::trace!( + "[ZoneList]: Returning chunk {:?} to {:?}", + chunk, + zone.chunks_available + ); // Return it zone.chunks_available.push(chunk.index); @@ -431,7 +471,11 @@ impl ZoneList { if zone.chunks_available.len() == 1 { self.free_zones.push_back(chunk.zone); } - tracing::trace!("[ZoneList]: Returned chunk {:?} to {:?}", chunk, zone.chunks_available); + tracing::trace!( + "[ZoneList]: Returned chunk {:?} to {:?}", + chunk, + zone.chunks_available + ); #[cfg(debug_assertions)] self.check_invariants(); @@ -445,10 +489,7 @@ impl ZoneList { // Gets the number of open zones by counting the unique // zones listed in open_zones and writing_zones pub fn get_open_zones(&self) -> usize { - let open_zone_list = self - .open_zones - .iter() - .collect::>(); + let open_zone_list = self.open_zones.iter().collect::>(); self.writing_zones .keys() @@ -510,8 +551,7 @@ impl ZoneList { .sum(); // Total available chunks in completely free zones - let free_zone_chunks: Chunk = - (self.free_zones.len() as Chunk) * self.chunks_per_zone; + let free_zone_chunks: Chunk = (self.free_zones.len() as Chunk) * self.chunks_per_zone; // Grand total open_zone_chunks + free_zone_chunks @@ -519,10 +559,8 @@ impl ZoneList { /// Makes sure that the zone list is consistent with itself. fn check_invariants(&self) { - // free_zones & open are unique and dont share elems { - let set_free: HashSet<_> = self.free_zones.iter().collect(); let set_open: HashSet<_> = self.open_zones.iter().collect(); @@ -536,10 +574,18 @@ impl ZoneList { println!("free_zones: {:?}", self.free_zones); println!("zones: {:?}", self.zones); // no dupes - assert_eq!(set_free.len(), self.free_zones.len(), "Free list has duplicate elements"); + assert_eq!( + set_free.len(), + self.free_zones.len(), + "Free list has duplicate elements" + ); } - assert_eq!(set_open.len(), self.open_zones.len(), "Open list has duplicate elements"); + assert_eq!( + set_open.len(), + self.open_zones.len(), + "Open list has duplicate elements" + ); // for zone in &self.open_zones { // assert!( @@ -588,7 +634,11 @@ mod zone_list_tests { use std::sync::Arc; use crate::{ - cache::{bucket::ChunkLocation, Cache}, device::Device, eviction::EvictTarget, writerpool::WriterPool, zone_state::zone_list::ZoneObtainFailure::{EvictNow, Wait} + cache::{Cache, bucket::ChunkLocation}, + device::Device, + eviction::EvictTarget, + writerpool::WriterPool, + zone_state::zone_list::ZoneObtainFailure::{EvictNow, Wait}, }; use bytes::Bytes; use nvme::types::{Byte, LogicalBlock, NVMeConfig, Zone}; @@ -613,7 +663,12 @@ mod zone_list_tests { } /// This is expected to remove elements from the cache as well - fn evict(self: Arc, _locations: EvictTarget, _cache: Arc, _writer_pool: Arc) -> std::io::Result<()> { + fn evict( + self: Arc, + _locations: EvictTarget, + _cache: Arc, + _writer_pool: Arc, + ) -> std::io::Result<()> { Ok(()) } diff --git a/oxcache/src/zone_state/zone_priority_queue.rs b/oxcache/src/zone_state/zone_priority_queue.rs index 043c023..f529909 100644 --- a/oxcache/src/zone_state/zone_priority_queue.rs +++ b/oxcache/src/zone_state/zone_priority_queue.rs @@ -7,8 +7,8 @@ type ZonePriority = Chunk; pub struct ZonePriorityQueue { invalid_count: ZonePriority, invalid_queue: PriorityQueue, // max-heap by priority - high_water_thresh: Chunk, // trigger cleaning when >= this - low_water_thresh: Chunk, // clean down to < this + high_water_thresh: Chunk, // trigger cleaning when >= this + low_water_thresh: Chunk, // clean down to < this } impl ZonePriorityQueue { @@ -41,7 +41,10 @@ impl ZonePriorityQueue { // If above high watermark, clean until strictly below low watermark pub fn remove_if_thresh_met(&mut self) -> Vec { let mut zones = Vec::new(); - tracing::trace!("[evict:Chunk] Cleaning zones, invalid={}", self.invalid_count); + tracing::trace!( + "[evict:Chunk] Cleaning zones, invalid={}", + self.invalid_count + ); if self.invalid_count < self.high_water_thresh { return zones; } @@ -53,7 +56,10 @@ impl ZonePriorityQueue { pub fn modify_priority(&mut self, ind: ZoneIndex, priority_increase: ZonePriority) { // Only account for it if the entry exists - if self.invalid_queue.change_priority_by(&ind, |p| *p += priority_increase) { + if self + .invalid_queue + .change_priority_by(&ind, |p| *p += priority_increase) + { self.invalid_count = self.invalid_count.saturating_add(priority_increase); } #[cfg(debug_assertions)] @@ -63,16 +69,20 @@ impl ZonePriorityQueue { #[cfg(debug_assertions)] fn assert_consistent(&self) { let sum: ZonePriority = self.invalid_queue.iter().map(|(_, p)| *p).sum(); - assert_eq!(sum, self.invalid_count, "invalid_count out of sync with heap contents"); + assert_eq!( + sum, self.invalid_count, + "invalid_count out of sync with heap contents" + ); } // Helpers for tests/metrics #[allow(dead_code)] - pub fn invalid_total(&self) -> ZonePriority { self.invalid_count } + pub fn invalid_total(&self) -> ZonePriority { + self.invalid_count + } } #[cfg(test)] mod tests { use super::*; - } From 5a4e9766f7d58716129a07ba8fb003c742dbd8c8 Mon Sep 17 00:00:00 2001 From: slc Date: Wed, 20 Aug 2025 05:20:13 -0700 Subject: [PATCH 2/3] compiler warnings --- oxcache/src/cache/mod.rs | 3 +-- oxcache/src/device.rs | 2 +- oxcache/src/eviction.rs | 1 - oxcache/src/main.rs | 1 - oxcache/src/metrics.rs | 8 +++----- oxcache/src/server.rs | 1 - oxcache/src/zone_state/zone_list.rs | 5 ++--- oxcache/src/zone_state/zone_priority_queue.rs | 2 +- 8 files changed, 8 insertions(+), 15 deletions(-) diff --git a/oxcache/src/cache/mod.rs b/oxcache/src/cache/mod.rs index 1d850a9..0cf4e53 100644 --- a/oxcache/src/cache/mod.rs +++ b/oxcache/src/cache/mod.rs @@ -4,7 +4,6 @@ use bytes::Bytes; use ndarray::{Array2, ArrayBase, s}; use nvme::types::{self, Zone}; use std::io::ErrorKind; -use std::iter::zip; use std::sync::Arc; use std::{collections::HashMap, io}; use tokio::sync::{Notify, RwLock}; @@ -228,7 +227,7 @@ impl Cache { // Collect items and corresponding notifiers let (items, notifies) = { // TODO: Can we deadlock here? - let mut bm = self.bm.write().await; + let bm = self.bm.write().await; let zone_slice = s![zone as usize, ..]; let mut out = Vec::new(); let mut notifies = Vec::new(); diff --git a/oxcache/src/device.rs b/oxcache/src/device.rs index e53dfc5..53ba7aa 100644 --- a/oxcache/src/device.rs +++ b/oxcache/src/device.rs @@ -6,7 +6,6 @@ use crate::metrics::{METRICS, MetricType}; use crate::server::RUNTIME; use crate::writerpool::{WriteRequest, WriterPool}; use crate::zone_state::zone_list::{ZoneList, ZoneObtainFailure}; -use crate::zone_state::zone_priority_queue::ZonePriorityQueue; use bytes::Bytes; use flume::Sender; use futures::future::join_all; @@ -182,6 +181,7 @@ fn trigger_eviction(eviction_channel: Sender) -> io::Result<()> } impl Zoned { + #[allow(dead_code)] fn compact_zone( &self, zone_to_compact: Zone, diff --git a/oxcache/src/eviction.rs b/oxcache/src/eviction.rs index cdeeeef..7984e29 100644 --- a/oxcache/src/eviction.rs +++ b/oxcache/src/eviction.rs @@ -1,7 +1,6 @@ use crate::cache::{Cache, bucket::ChunkLocation}; use crate::device::Device; use crate::writerpool::WriterPool; -use crate::zone_state::zone_priority_queue; use crate::zone_state::zone_priority_queue::{ZoneIndex, ZonePriorityQueue}; use flume::{Receiver, Sender}; use lru::LruCache; diff --git a/oxcache/src/main.rs b/oxcache/src/main.rs index 8550c8f..6291314 100644 --- a/oxcache/src/main.rs +++ b/oxcache/src/main.rs @@ -10,7 +10,6 @@ use std::fs; use std::net::{IpAddr, SocketAddr}; use std::process::exit; use std::sync::OnceLock; -use tracing::{Level, event}; use tracing_appender::{non_blocking, rolling}; use tracing_subscriber::{EnvFilter, Layer, fmt, layer::SubscriberExt, util::SubscriberInitExt}; diff --git a/oxcache/src/metrics.rs b/oxcache/src/metrics.rs index dfa596f..6215167 100644 --- a/oxcache/src/metrics.rs +++ b/oxcache/src/metrics.rs @@ -2,15 +2,12 @@ use axum::{Router, routing::get}; use metrics::{counter, gauge, histogram}; use metrics_exporter_prometheus::PrometheusBuilder; use once_cell::sync::Lazy; -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use tracing::{Level, event, info}; +use tracing::{Level, event}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; -use tracing_appender::non_blocking; -use tracing_appender::rolling; -use tracing_subscriber::fmt; pub static METRICS: Lazy = Lazy::new(|| MetricsRecorder::new()); @@ -104,6 +101,7 @@ impl MetricsRecorder { let h = histogram!(format!("{}_{}", self.prefix, name.to_string()), "run_id" => id.clone()); let v = match metric_type { MetricType::MsLatency => value.as_secs_f64() * MILLISECONDS, + #[allow(unreachable_patterns)] _ => { tracing::warn!("Unknown metric type"); return; diff --git a/oxcache/src/server.rs b/oxcache/src/server.rs index 9ccb6a5..8315af3 100644 --- a/oxcache/src/server.rs +++ b/oxcache/src/server.rs @@ -9,7 +9,6 @@ use std::hash::{DefaultHasher, Hash, Hasher}; use std::net::SocketAddr; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{Mutex, Notify}; -use tracing::debug; use crate::cache::bucket::Chunk; use crate::metrics::{HitType, METRICS, MetricType, init_metrics_exporter}; diff --git a/oxcache/src/zone_state/zone_list.rs b/oxcache/src/zone_state/zone_list.rs index bcb34c0..247933a 100644 --- a/oxcache/src/zone_state/zone_list.rs +++ b/oxcache/src/zone_state/zone_list.rs @@ -8,7 +8,6 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::io::{self}; type ZoneIndex = nvme::types::Zone; -type ZonePriority = usize; #[derive(Debug, Clone)] pub struct Zone { @@ -300,7 +299,7 @@ impl ZoneList { res }; - let mut zone = zone.unwrap(); + let zone = zone.unwrap(); let zone_index = zone.index; if zone.chunks_available.len() <= 0 { @@ -416,7 +415,7 @@ impl ZoneList { z }; let zone_index = zone.unwrap(); - let mut zone = self.zones.get_mut(&zone_index).unwrap(); + let zone = self.zones.get_mut(&zone_index).unwrap(); let chunk = zone.chunks_available.pop().unwrap(); if zone.chunks_available.len() >= 1 { diff --git a/oxcache/src/zone_state/zone_priority_queue.rs b/oxcache/src/zone_state/zone_priority_queue.rs index f529909..8adb7d0 100644 --- a/oxcache/src/zone_state/zone_priority_queue.rs +++ b/oxcache/src/zone_state/zone_priority_queue.rs @@ -1,7 +1,7 @@ use priority_queue::PriorityQueue; pub(crate) type ZoneIndex = nvme::types::Zone; -use nvme::types::{Chunk, Zone}; +use nvme::types::Chunk; type ZonePriority = Chunk; pub struct ZonePriorityQueue { From 625be591684482a71d7f1f4b936de2012c9813a4 Mon Sep 17 00:00:00 2001 From: slc Date: Wed, 20 Aug 2025 17:55:28 -0700 Subject: [PATCH 3/3] make eviction not update the lru --- oxcache/src/device.rs | 2 +- oxcache/src/writerpool.rs | 41 ++++++++++++++++++++++++++++++++------- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/oxcache/src/device.rs b/oxcache/src/device.rs index 53ba7aa..daba35c 100644 --- a/oxcache/src/device.rs +++ b/oxcache/src/device.rs @@ -579,7 +579,7 @@ impl Device for Zoned { let (tx, rx) = flume::bounded(1); futures.push(async move { - writer_pool.send(WriteRequest{ + writer_pool.send_no_update_lru(WriteRequest{ data: data.clone(), responder: tx, }).await?; diff --git a/oxcache/src/writerpool.rs b/oxcache/src/writerpool.rs index 3ddc98b..ddc7123 100644 --- a/oxcache/src/writerpool.rs +++ b/oxcache/src/writerpool.rs @@ -20,18 +20,33 @@ pub struct WriteRequest { pub responder: Sender, } +#[derive(Debug)] +pub struct WriteRequestInternal { + pub data: Bytes, + pub responder: Sender, + pub update_lru: bool, +} + +fn request_update_lru(req: WriteRequest) -> WriteRequestInternal { + WriteRequestInternal { data: req.data, responder: req.responder, update_lru: true } +} + +fn request_no_update_lru(req: WriteRequest) -> WriteRequestInternal { + WriteRequestInternal { data: req.data, responder: req.responder, update_lru: false } +} + /// Represents an individual writer thread struct Writer { device: Arc, id: usize, - receiver: Receiver, + receiver: Receiver, eviction: Arc>, } impl Writer { fn new( id: usize, - receiver: Receiver, + receiver: Receiver, device: Arc, eviction: &Arc>, ) -> Self { @@ -52,8 +67,11 @@ impl Writer { let result = self.device.append(msg.data).inspect(|loc| { let mtx = Arc::clone(&self.eviction); - let mut policy = mtx.lock().unwrap(); - policy.write_update(loc.clone()); + + if msg.update_lru { + let mut policy = mtx.lock().unwrap(); + policy.write_update(loc.clone()); + } }); METRICS.update_metric_histogram_latency( "device_write_latency_ms", @@ -77,7 +95,7 @@ impl Writer { /// Pool of writer threads sharing a single receiver #[derive(Debug)] pub struct WriterPool { - sender: Sender, + sender: Sender, handles: Vec>, } @@ -88,7 +106,7 @@ impl WriterPool { device: Arc, eviction_policy: &Arc>, ) -> Self { - let (sender, receiver): (Sender, Receiver) = unbounded(); + let (sender, receiver): (Sender, Receiver) = unbounded(); let mut handles = Vec::with_capacity(num_writers); for id in 0..num_writers { @@ -103,7 +121,16 @@ impl WriterPool { /// Send a message to the writer pool pub async fn send(&self, message: WriteRequest) -> std::io::Result<()> { - self.sender.send_async(message).await.map_err(|e| { + self.sender.send_async(request_update_lru(message)).await.map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("WriterPool::send_async failed: {}", e), + ) + }) + } + + pub async fn send_no_update_lru(&self, message: WriteRequest) -> std::io::Result<()> { + self.sender.send_async(request_no_update_lru(message)).await.map_err(|e| { std::io::Error::new( std::io::ErrorKind::Other, format!("WriterPool::send_async failed: {}", e),