Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion nvme/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ pub fn get_zone_capacity(fd: RawFd, nsid: u32) -> Result<LogicalBlock, NVMeError

/// Returns the zone capacity for the given NVMe device and namespace.
pub fn get_zone_state(fd: RawFd, nsid: u32, zone: Zone) -> Result<ZoneState, NVMeError> {
Ok(report_zones_all(fd, nsid)?.1[zone as usize].zone_state.clone())
Ok(report_zones_all(fd, nsid)?.1[zone as usize]
.zone_state
.clone())
}

/// Returns a report of all zones for the given NVMe device and namespace.
Expand Down
14 changes: 7 additions & 7 deletions oxcache/src/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::cache::bucket::Chunk as CacheKey;
use crate::cache::bucket::{Chunk, ChunkLocation, ChunkState};
use bytes::Bytes;
use ndarray::{Array2, ArrayBase, s};
use nvme::types::{self, Zone};
use std::io::ErrorKind;
use std::iter::zip;
use std::sync::Arc;
use std::{collections::HashMap, io};
use bytes::Bytes;
use tokio::sync::{Notify, RwLock};
use crate::cache::bucket::Chunk as CacheKey;

pub mod bucket;

Expand Down Expand Up @@ -228,7 +227,7 @@ impl Cache {
// Collect items and corresponding notifiers
let (items, notifies) = {
// TODO: Can we deadlock here?
let mut bm = self.bm.write().await;
let bm = self.bm.write().await;
let zone_slice = s![zone as usize, ..];
let mut out = Vec::new();
let mut notifies = Vec::new();
Expand All @@ -247,7 +246,10 @@ impl Cache {
ChunkState::Ready(loc) => Arc::clone(loc),
ChunkState::Waiting(_) => {
// TODO: Shouldnt occur since zone was full
return Err(io::Error::new(ErrorKind::Other, "Encountered invalid waiting state during zone cleaning"))
return Err(io::Error::new(
ErrorKind::Other,
"Encountered invalid waiting state during zone cleaning",
));
}
};

Expand Down Expand Up @@ -321,8 +323,6 @@ impl Cache {
Ok(())
}



pub async fn remove_entry(&self, chunk: &ChunkLocation) -> tokio::io::Result<()> {
// to_relocate is a list of ChunkLocations that the caller wants to update
// We pass in each chunk location and the writer function should return back with the list of updated chunk locations
Expand Down
91 changes: 62 additions & 29 deletions oxcache/src/device.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::{fs, io};
use crate::cache::Cache;
use crate::cache::bucket::Chunk as CacheKey;
use crate::cache::bucket::ChunkLocation;
use crate::eviction::{EvictTarget, EvictorMessage};
use crate::metrics::{METRICS, MetricType};
use crate::server::RUNTIME;
use crate::writerpool::{WriteRequest, WriterPool};
use crate::zone_state::zone_list::{ZoneList, ZoneObtainFailure};
Expand All @@ -14,9 +15,7 @@ use nvme::types::{Byte, Chunk, LogicalBlock, NVMeConfig, PerformOn, ZNSConfig, Z
use std::io::ErrorKind;
use std::os::fd::RawFd;
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use crate::metrics::{MetricType, METRICS};
use crate::zone_state::zone_priority_queue::ZonePriorityQueue;
use crate::cache::bucket::Chunk as CacheKey;
use std::{fs, io};

pub struct Zoned {
nvme_config: NVMeConfig,
Expand Down Expand Up @@ -66,7 +65,12 @@ pub trait Device: Send + Sync {
fn append(&self, data: Bytes) -> std::io::Result<ChunkLocation>;

/// This is expected to remove elements from the cache as well
fn evict(self: Arc<Self>, locations: EvictTarget, cache: Arc<Cache>, writer_pool: Arc<WriterPool>) -> io::Result<()>;
fn evict(
self: Arc<Self>,
locations: EvictTarget,
cache: Arc<Cache>,
writer_pool: Arc<WriterPool>,
) -> io::Result<()>;

fn read(&self, location: ChunkLocation) -> std::io::Result<Bytes>;

Expand Down Expand Up @@ -177,6 +181,7 @@ fn trigger_eviction(eviction_channel: Sender<EvictorMessage>) -> io::Result<()>
}

impl Zoned {
#[allow(dead_code)]
fn compact_zone(
&self,
zone_to_compact: Zone,
Expand Down Expand Up @@ -248,7 +253,6 @@ impl Zoned {

Ok(())
}

}

impl Zoned {
Expand All @@ -274,20 +278,23 @@ impl Zoned {
config.chunk_size_in_lbas = chunk_size_in_logical_blocks;
config.chunk_size_in_bytes = chunk_size;
let num_zones: Zone = config.num_zones;

// Apply max_zones restriction if specified
let restricted_num_zones = if let Some(max_zones) = max_zones {
if max_zones > num_zones {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("max_zones ({}) cannot be larger than the maximum number of zones available on the device ({})", max_zones, num_zones)
format!(
"max_zones ({}) cannot be larger than the maximum number of zones available on the device ({})",
max_zones, num_zones
),
));
}
max_zones
} else {
num_zones
};

let zone_list = ZoneList::new(
restricted_num_zones,
config.chunks_per_zone,
Expand Down Expand Up @@ -453,7 +460,6 @@ impl Zoned {
impl Device for Zoned {
/// Hold internal state to keep track of zone state
fn append(&self, data: Bytes) -> std::io::Result<ChunkLocation> {

let sz = data.len() as u64;

let zone_index: Zone = loop {
Expand All @@ -473,7 +479,11 @@ impl Device for Zoned {

let start = std::time::Instant::now();
let res = self.chunked_append(data, zone_index);
METRICS.update_metric_histogram_latency("disk_write_latency_ms", start.elapsed(), MetricType::MsLatency);
METRICS.update_metric_histogram_latency(
"disk_write_latency_ms",
start.elapsed(),
MetricType::MsLatency,
);
METRICS.update_metric_counter("written_bytes_total", sz);
METRICS.update_metric_counter("bytes_total", sz);
res
Expand All @@ -486,13 +496,22 @@ impl Device for Zoned {

let start = std::time::Instant::now();
self.read_into_buffer(self.max_write_size, slba, &mut data, &self.nvme_config)?;
METRICS.update_metric_histogram_latency("disk_read_latency_ms", start.elapsed(), MetricType::MsLatency);
METRICS.update_metric_histogram_latency(
"disk_read_latency_ms",
start.elapsed(),
MetricType::MsLatency,
);
METRICS.update_metric_counter("read_bytes_total", data.len() as u64);
METRICS.update_metric_counter("bytes_total", data.len() as u64);
Ok(Bytes::from(data))
}

fn evict(self: Arc<Self>, locations: EvictTarget, cache: Arc<Cache>, writer_pool: Arc<WriterPool>) -> io::Result<()> {
fn evict(
self: Arc<Self>,
locations: EvictTarget,
cache: Arc<Cache>,
writer_pool: Arc<WriterPool>,
) -> io::Result<()> {
let usage = self.get_use_percentage();
METRICS.update_metric_gauge("usage_percentage", usage as f64);

Expand All @@ -514,9 +533,8 @@ impl Device for Zoned {
let cache_clone = cache.clone();
let self_clone = self_clone.clone();
let writer_pool = writer_pool.clone();
tokio::spawn(
async move {
cache_clone.clean_zone_and_update_map(
tokio::spawn(async move {
cache_clone.clean_zone_and_update_map(
zone.clone(),
// Reads all valid chunks in zone and returns buffer [(Chunk, Bytes)]
// which is the list of chunks that need to be written back
Expand Down Expand Up @@ -561,7 +579,7 @@ impl Device for Zoned {
let (tx, rx) = flume::bounded(1);
futures.push(async move {

writer_pool.send(WriteRequest{
writer_pool.send_no_update_lru(WriteRequest{
data: data.clone(),
responder: tx,
}).await?;
Expand All @@ -583,11 +601,11 @@ impl Device for Zoned {
}
},
).await
// Interesting, the compiler will throw an error if I don't await here.
// The async move moves the value and declares that this block returns a Future
// If we don't await it, the Future is referencing a local value. So we await it
// to store it in the state of the Future. I think that's what's happening here
});
// Interesting, the compiler will throw an error if I don't await here.
// The async move moves the value and declares that this block returns a Future
// If we don't await it, the Future is referencing a local value. So we await it
// to store it in the state of the Future. I think that's what's happening here
});

tracing::debug!("[evict:Chunk] Cleaned zone {}", zone);
}
Expand Down Expand Up @@ -713,20 +731,23 @@ impl BlockInterface {

// Num_zones
let num_zones = nvme_config.total_size_in_bytes / block_zone_capacity;

// Apply max_zones restriction if specified
let restricted_num_zones = if let Some(max_zones) = max_zones {
if max_zones > num_zones {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("max_zones ({}) cannot be larger than the maximum number of zones available on the device ({})", max_zones, num_zones)
format!(
"max_zones ({}) cannot be larger than the maximum number of zones available on the device ({})",
max_zones, num_zones
),
));
}
max_zones
} else {
num_zones
};

// Chunks per zone
let chunks_per_zone = block_zone_capacity / chunk_size;

Expand Down Expand Up @@ -826,7 +847,11 @@ impl Device for BlockInterface {

let start = std::time::Instant::now();
self.chunked_append(data, write_addr)?;
METRICS.update_metric_histogram_latency("disk_write_latency_ms", start.elapsed(), MetricType::MsLatency);
METRICS.update_metric_histogram_latency(
"disk_write_latency_ms",
start.elapsed(),
MetricType::MsLatency,
);
METRICS.update_metric_counter("written_bytes_total", sz);
METRICS.update_metric_counter("bytes_total", sz);
Ok(chunk_location)
Expand All @@ -844,18 +869,26 @@ impl Device for BlockInterface {
&mut data,
&self.nvme_config,
)?;
METRICS.update_metric_histogram_latency("disk_read_latency_ms", start.elapsed(), MetricType::MsLatency);
METRICS.update_metric_histogram_latency(
"disk_read_latency_ms",
start.elapsed(),
MetricType::MsLatency,
);
METRICS.update_metric_counter("read_bytes_total", data.len() as u64);
METRICS.update_metric_counter("bytes_total", data.len() as u64);
Ok(Bytes::from(data))
}

fn evict(self: Arc<Self>, locations: EvictTarget, cache: Arc<Cache>, _writer_pool: Arc<WriterPool>) -> io::Result<()> {
fn evict(
self: Arc<Self>,
locations: EvictTarget,
cache: Arc<Cache>,
_writer_pool: Arc<WriterPool>,
) -> io::Result<()> {
let usage = self.get_use_percentage();
METRICS.update_metric_gauge("usage_percentage", usage as f64);
match locations {
EvictTarget::Chunk(chunk_locations, _) => {

if chunk_locations.is_empty() {
return Ok(());
}
Expand Down
Loading