Skip to content

Commit d45f3d0

Browse files
committed
Fix warnings + fmt
1 parent 2dfa799 commit d45f3d0

18 files changed

Lines changed: 1088 additions & 544 deletions

oxcache/src/bin/spclient.rs

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ use oxcache::request;
66
use oxcache::request::{GetRequest, Request};
77
use std::fs::File;
88
use std::io::{BufRead, BufReader, ErrorKind};
9-
use std::sync::atomic::{AtomicUsize, Ordering};
109
use std::sync::Arc;
10+
use std::sync::atomic::{AtomicUsize, Ordering};
1111
use std::time::Instant;
1212
use tokio::net::UnixStream;
1313
use tokio::sync::Mutex;
1414
use tokio::task::JoinHandle;
15-
use tokio::time::{timeout, Duration};
15+
use tokio::time::{Duration, timeout};
1616
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
1717

1818
const MAX_FRAME_LENGTH: usize = 2 * 1024 * 1024 * 1024; // 2 GB
@@ -303,11 +303,9 @@ async fn run_real_mode(
303303
for qreq in quantized {
304304
// Validate request size
305305
if qreq.size > MAX_FRAME_LENGTH as u64 {
306-
return Err(format!(
307-
"Request size {} exceeds MAX_FRAME_LENGTH (2GB)",
308-
qreq.size
309-
)
310-
.into());
306+
return Err(
307+
format!("Request size {} exceeds MAX_FRAME_LENGTH (2GB)", qreq.size).into(),
308+
);
311309
}
312310

313311
get_requests.push(GetRequest {
@@ -379,11 +377,18 @@ async fn run_real_mode(
379377
}
380378

381379
// Send request with timeout
382-
let encoded =
383-
bincode::serde::encode_to_vec(Request::Get(request), bincode::config::standard())
384-
.unwrap();
380+
let encoded = bincode::serde::encode_to_vec(
381+
Request::Get(request),
382+
bincode::config::standard(),
383+
)
384+
.unwrap();
385385

386-
match timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS), writer.send(Bytes::from(encoded))).await {
386+
match timeout(
387+
Duration::from_secs(REQUEST_TIMEOUT_SECS),
388+
writer.send(Bytes::from(encoded)),
389+
)
390+
.await
391+
{
387392
Ok(Ok(_)) => {
388393
// Send successful
389394
}
@@ -396,7 +401,10 @@ async fn run_real_mode(
396401
Err(_) => {
397402
return Err(std::io::Error::new(
398403
ErrorKind::TimedOut,
399-
format!("[Client {}] Send timeout after {} seconds", client_id, REQUEST_TIMEOUT_SECS),
404+
format!(
405+
"[Client {}] Send timeout after {} seconds",
406+
client_id, REQUEST_TIMEOUT_SECS
407+
),
400408
));
401409
}
402410
}
@@ -430,16 +438,23 @@ async fn run_real_mode(
430438
// Server closed connection gracefully - this is expected behavior
431439
// when server reaches benchmark target and shuts down
432440
let completed_reqs = counter.load(Ordering::Relaxed);
433-
println!("[Client {}] Server closed connection gracefully after {} requests",
434-
client_id, completed_reqs);
435-
println!("[Client {}] This is expected when server reaches benchmark target",
436-
client_id);
441+
println!(
442+
"[Client {}] Server closed connection gracefully after {} requests",
443+
client_id, completed_reqs
444+
);
445+
println!(
446+
"[Client {}] This is expected when server reaches benchmark target",
447+
client_id
448+
);
437449
return Ok(());
438450
}
439451
Err(_) => {
440452
return Err(std::io::Error::new(
441453
ErrorKind::TimedOut,
442-
format!("[Client {}] Receive timeout after {} seconds", client_id, REQUEST_TIMEOUT_SECS),
454+
format!(
455+
"[Client {}] Receive timeout after {} seconds",
456+
client_id, REQUEST_TIMEOUT_SECS
457+
),
443458
));
444459
}
445460
}
@@ -470,10 +485,15 @@ async fn run_real_mode(
470485

471486
println!("\n=== Execution Complete ===");
472487
if completed_requests < nr_requests {
473-
println!("NOTE: Server shutdown before all requests completed (expected for byte-target benchmarks)");
474-
println!("Completed {} of {} requests ({:.1}%)",
475-
completed_requests, nr_requests,
476-
(completed_requests as f64 / nr_requests as f64) * 100.0);
488+
println!(
489+
"NOTE: Server shutdown before all requests completed (expected for byte-target benchmarks)"
490+
);
491+
println!(
492+
"Completed {} of {} requests ({:.1}%)",
493+
completed_requests,
494+
nr_requests,
495+
(completed_requests as f64 / nr_requests as f64) * 100.0
496+
);
477497
} else {
478498
println!("Completed all {} requests", nr_requests);
479499
}

oxcache/src/bin/test_subset_reads.rs

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use bytes::Bytes;
21
use bincode::error::DecodeError;
2+
use bytes::Bytes;
33
use futures::{SinkExt, StreamExt};
44
use oxcache::request::{GetRequest, GetResponse, Request};
55
use std::time::Duration;
@@ -29,7 +29,12 @@ impl TestClient {
2929
Ok(Self { reader, writer })
3030
}
3131

32-
async fn get(&mut self, key: String, offset: u64, size: u64) -> Result<Bytes, Box<dyn std::error::Error>> {
32+
async fn get(
33+
&mut self,
34+
key: String,
35+
offset: u64,
36+
size: u64,
37+
) -> Result<Bytes, Box<dyn std::error::Error>> {
3338
let request = Request::Get(GetRequest { key, offset, size });
3439

3540
let encoded = bincode::serde::encode_to_vec(request, bincode::config::standard())?;
@@ -70,11 +75,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7075
let chunk_size = 65536; // 64KB chunk size
7176
let lba_size = 4096; // 4KB logical block size
7277
let test_cases = vec![
73-
(0, lba_size), // Read first 4KB (includes header)
74-
(lba_size, lba_size), // Read second 4KB block
75-
(lba_size * 2, lba_size), // Read third 4KB block
78+
(0, lba_size), // Read first 4KB (includes header)
79+
(lba_size, lba_size), // Read second 4KB block
80+
(lba_size * 2, lba_size), // Read third 4KB block
7681
(lba_size * 4, lba_size * 2), // Read 8KB starting at 16KB offset
77-
(0, chunk_size), // Read entire chunk
82+
(0, chunk_size), // Read entire chunk
7883
];
7984

8085
for (offset, size) in test_cases {
@@ -108,7 +113,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
108113

109114
// Test LBA-aligned edge cases
110115
let edge_cases = vec![
111-
(0, lba_size), // Single LBA at start
116+
(0, lba_size), // Single LBA at start
112117
(chunk_size - lba_size, lba_size), // Single LBA at end
113118
];
114119

@@ -144,7 +149,11 @@ fn validate_subset_data(data: &Bytes, key: &str, offset: u64, size: u64) -> bool
144149
println!("Expected data length: {}", size);
145150

146151
if data.len() != size as usize {
147-
println!("ERROR: Data length mismatch: expected {}, got {}", size, data.len());
152+
println!(
153+
"ERROR: Data length mismatch: expected {}, got {}",
154+
size,
155+
data.len()
156+
);
148157
return false;
149158
}
150159

@@ -154,19 +163,34 @@ fn validate_subset_data(data: &Bytes, key: &str, offset: u64, size: u64) -> bool
154163

155164
// Show more bytes for comparison
156165
let show_bytes = std::cmp::min(32, std::cmp::min(data.len(), expected_data.len()));
157-
println!("Expected first {} bytes: {:02x?}", show_bytes, &expected_data[0..show_bytes]);
158-
println!("Actual first {} bytes: {:02x?}", show_bytes, &data[0..show_bytes]);
166+
println!(
167+
"Expected first {} bytes: {:02x?}",
168+
show_bytes,
169+
&expected_data[0..show_bytes]
170+
);
171+
println!(
172+
"Actual first {} bytes: {:02x?}",
173+
show_bytes,
174+
&data[0..show_bytes]
175+
);
159176

160177
// Find first mismatch
161178
for (i, (actual, expected)) in data.iter().zip(expected_data.iter()).enumerate() {
162179
if actual != expected {
163-
println!("ERROR: First mismatch at byte {}: expected 0x{:02x}, got 0x{:02x}", i, expected, actual);
180+
println!(
181+
"ERROR: First mismatch at byte {}: expected 0x{:02x}, got 0x{:02x}",
182+
i, expected, actual
183+
);
164184
return false;
165185
}
166186
}
167187

168188
if data.len() != expected_data.len() {
169-
println!("ERROR: Length mismatch after byte comparison: actual {}, expected {}", data.len(), expected_data.len());
189+
println!(
190+
"ERROR: Length mismatch after byte comparison: actual {}, expected {}",
191+
data.len(),
192+
expected_data.len()
193+
);
170194
return false;
171195
}
172196

@@ -179,7 +203,10 @@ fn generate_expected_subset_data(key: &str, offset: u64, size: u64) -> Vec<u8> {
179203
use std::collections::hash_map::DefaultHasher;
180204
use std::hash::{Hash, Hasher};
181205

182-
println!("=== DEBUG: Generating expected data for offset={}, size={} ===", offset, size);
206+
println!(
207+
"=== DEBUG: Generating expected data for offset={}, size={} ===",
208+
offset, size
209+
);
183210

184211
const EMULATED_BUFFER_SEED: u64 = 1;
185212
const CHUNK_SIZE: u64 = 65536; // Match our test chunk size
@@ -194,8 +221,8 @@ fn generate_expected_subset_data(key: &str, offset: u64, size: u64) -> Vec<u8> {
194221
println!("Key hash: 0x{:016x}", key_hash);
195222

196223
let mut full_chunk = Vec::new();
197-
full_chunk.extend_from_slice(&key_hash.to_be_bytes()); // 8 bytes
198-
full_chunk.extend_from_slice(&0u64.to_be_bytes()); // 8 bytes - offset=0
224+
full_chunk.extend_from_slice(&key_hash.to_be_bytes()); // 8 bytes
225+
full_chunk.extend_from_slice(&0u64.to_be_bytes()); // 8 bytes - offset=0
199226
full_chunk.extend_from_slice(&CHUNK_SIZE.to_be_bytes()); // 8 bytes - size=chunk_size
200227

201228
println!("Header (24 bytes): {:02x?}", &full_chunk[0..24]);
@@ -217,7 +244,10 @@ fn generate_expected_subset_data(key: &str, offset: u64, size: u64) -> Vec<u8> {
217244

218245
let result = full_chunk[start..end].to_vec();
219246
println!("Extracted subset length: {}", result.len());
220-
println!("Extracted first 32 bytes: {:02x?}", &result[0..std::cmp::min(32, result.len())]);
247+
println!(
248+
"Extracted first 32 bytes: {:02x?}",
249+
&result[0..std::cmp::min(32, result.len())]
250+
);
221251

222252
result
223-
}
253+
}

oxcache/src/cache/bucket.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
use crate::request::GetRequest;
2+
use bytes::Bytes;
3+
use lru_mem::HeapSize;
24
use nvme::types::{Byte, Zone};
3-
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
5+
use std::sync::{
6+
Arc,
7+
atomic::{AtomicUsize, Ordering},
8+
};
49
use tokio::sync::{Notify, RwLock};
5-
use lru_mem::HeapSize;
6-
use bytes::Bytes;
710

811
#[derive(Debug, Clone)]
912
pub struct Chunk {
@@ -72,7 +75,11 @@ impl PinnedChunkLocation {
7275
/// Pin this location (increment ref count). Returns a guard that will unpin on drop.
7376
pub fn pin(self: &Arc<Self>) -> PinGuard {
7477
let new_count = self.pin_count.fetch_add(1, Ordering::SeqCst) + 1;
75-
tracing::debug!("Pinning location {:?}, pin_count now: {}", self.location, new_count);
78+
tracing::debug!(
79+
"Pinning location {:?}, pin_count now: {}",
80+
self.location,
81+
new_count
82+
);
7683
PinGuard::new(self)
7784
}
7885

@@ -99,8 +106,12 @@ impl PinnedChunkLocation {
99106
fn unpin(&self) {
100107
let old_count = self.pin_count.fetch_sub(1, Ordering::SeqCst);
101108
let new_count = old_count - 1;
102-
tracing::debug!("Unpinning location {:?}, pin_count now: {}", self.location, new_count);
103-
109+
tracing::debug!(
110+
"Unpinning location {:?}, pin_count now: {}",
111+
self.location,
112+
new_count
113+
);
114+
104115
// If pin count reached 0, notify any waiting eviction threads
105116
if new_count == 0 {
106117
self.unpin_notify.notify_waiters();
@@ -117,7 +128,7 @@ pub struct PinGuard {
117128

118129
impl PinGuard {
119130
fn new(pinned_location: &Arc<PinnedChunkLocation>) -> Self {
120-
Self {
131+
Self {
121132
location: pinned_location.location.clone(),
122133
pinned_location: Arc::clone(pinned_location),
123134
}

0 commit comments

Comments
 (0)