Skip to content

Commit cb3435a

Browse files
committed
Increase length delimited codec to maximum of 2GiB
Otherwise the server just stalls indefinitely
1 parent 9065594 commit cb3435a

3 files changed

Lines changed: 28 additions & 14 deletions

File tree

example.server.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
[server]
22
socket = "/tmp/oxcache.sock"
33
disk = "/dev/nvme3n1"
4-
writer_threads = 14
5-
reader_threads = 14
6-
chunk_size = 8192
4+
writer_threads = 1
5+
reader_threads = 1
6+
chunk_size = 268435456
77

88
[remote]
99
remote_type = "emulated" # emulated | S3

oxcache/src/bin/simpleevaluationclient.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@ struct Cli {
3030
num_clients: usize,
3131
}
3232

33+
const MAX_FRAME_LENGTH: usize = 2 * 1024 * 1024 * 1024; // 2 GB
34+
3335
#[tokio::main]
3436
async fn main() -> Result<(), Box<dyn std::error::Error>> {
3537
let nr_queries = 10000;
36-
let nr_uuids = 1000;
38+
let nr_uuids = 100;
3739
let mut queries: Arc<Mutex<Vec<GetRequest>>> = Arc::new(Mutex::new(Vec::new()));
3840
let args = Cli::parse();
3941
let counter = Arc::new(AtomicUsize::new(0));
@@ -55,7 +57,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5557

5658
queries.push(GetRequest {
5759
key: uuid.to_string(),
58-
size: 8192,
60+
size: 268435456,
5961
offset: 0,
6062
});
6163
}
@@ -77,9 +79,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7779
let stream = UnixStream::connect(&sock).await?;
7880
println!("[t.{}] Client connected to {}", c, sock);
7981

80-
let (read_half, write_half) = split(stream);
81-
let mut reader = FramedRead::new(read_half, LengthDelimitedCodec::new());
82-
let mut writer = FramedWrite::new(write_half, LengthDelimitedCodec::new());
82+
let (read_half, write_half) = tokio::io::split(stream);
83+
84+
let codec = LengthDelimitedCodec::builder()
85+
.max_frame_length(MAX_FRAME_LENGTH)
86+
.new_codec();
87+
88+
let mut reader = FramedRead::new(read_half, codec.clone());
89+
let mut writer = FramedWrite::new(write_half, codec);
8390

8491
loop {
8592
let query_num = counter.fetch_add(1, Ordering::Relaxed) + 1;

oxcache/src/server.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ impl<T: RemoteBackend + Send + Sync + 'static> Server<T> {
148148
}
149149
}
150150

151+
const MAX_FRAME_LENGTH: usize = 2 * 1024 * 1024 * 1024; // 2 GB
152+
151153
async fn handle_connection<T: RemoteBackend + Send + Sync + 'static>(
152154
stream: UnixStream,
153155
writer_pool: Arc<WriterPool>,
@@ -156,9 +158,14 @@ async fn handle_connection<T: RemoteBackend + Send + Sync + 'static>(
156158
cache: Arc<Cache>,
157159
chunk_size: usize,
158160
) -> tokio::io::Result<()> {
159-
let (read_half, write_half) = split(stream);
160-
let mut reader = FramedRead::new(read_half, LengthDelimitedCodec::new());
161-
let writer = Arc::new(Mutex::new(FramedWrite::new(write_half, LengthDelimitedCodec::new())));
161+
let (read_half, write_half) = tokio::io::split(stream);
162+
163+
let codec = LengthDelimitedCodec::builder()
164+
.max_frame_length(MAX_FRAME_LENGTH)
165+
.new_codec();
166+
167+
let mut reader = FramedRead::new(read_half, codec.clone());
168+
let writer = Arc::new(Mutex::new(FramedWrite::new(write_half, codec)));
162169

163170
while let Some(frame) = reader.next().await {
164171
let f = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("frame read failed: {}", e)))?;
@@ -169,7 +176,7 @@ async fn handle_connection<T: RemoteBackend + Send + Sync + 'static>(
169176

170177
match msg {
171178
Ok((request, _)) => {
172-
// println!("Received: {:?}", request);
179+
println!("Received req");
173180
match request {
174181
request::Request::Get(req) => {
175182
if let Err(e) = req.validate(chunk_size) {
@@ -259,11 +266,11 @@ async fn handle_connection<T: RemoteBackend + Send + Sync + 'static>(
259266
return Err(std::io::Error::new(std::io::ErrorKind::Other, format!("remote.get failed: {}", e)));
260267
}
261268
};
262-
269+
263270
let encoded = bincode::serde::encode_to_vec(
264271
request::GetResponse::Response(resp.clone()),
265272
bincode::config::standard()
266-
).unwrap();
273+
).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("serialization failed: {}", e)))?;
267274
{
268275
let mut w = writer.lock().await;
269276
w.send(Bytes::from(encoded)).await.map_err(|e| {

0 commit comments

Comments
 (0)