Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions io/zenoh-links/zenoh-link-quic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rustls = { workspace = true }
rustls-pemfile = { workspace = true }
rustls-webpki = { workspace = true }
secrecy = { workspace = true }
socket2 = { workspace = true }
time = { workspace = true }
tokio = { workspace = true, features = [
"fs",
Expand Down
4 changes: 4 additions & 0 deletions io/zenoh-links/zenoh-link-quic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use zenoh_protocol::{
use zenoh_result::ZResult;

mod unicast;
mod utils;
pub use unicast::*;
pub use zenoh_link_commons::quic::TlsConfigurator as QuicConfigurator;

Expand Down Expand Up @@ -82,3 +83,6 @@ zconfigurable! {
// Default set to 100 ms.
static ref QUIC_ACCEPT_THROTTLE_TIME: u64 = 100_000;
}



109 changes: 82 additions & 27 deletions io/zenoh-links/zenoh-link-quic/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ use zenoh_protocol::{
use zenoh_result::{bail, zerror, ZResult};

use super::{QUIC_ACCEPT_THROTTLE_TIME, QUIC_DEFAULT_MTU, QUIC_LOCATOR_PREFIX};
use crate::utils::QuicLinkConfig;



pub struct LinkUnicastQuic {
connection: quinn::Connection,
Expand Down Expand Up @@ -285,6 +288,26 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic {
set_dscp(&socket, src_addr, dscp)?;
}

// Apply Buffer Tuning
let quic_link_config = QuicLinkConfig::new(&epconf).await?;

if let Some(size) = quic_link_config.rx_buffer_size {
let socket_ref = socket2::SockRef::from(&socket);
if let Err(e) = socket_ref.set_recv_buffer_size(size as usize) {
tracing::warn!("Failed to set QUIC (UDP) SO_RCVBUF to {}: {}", size, e);
} else {
tracing::debug!("Set QUIC (UDP) SO_RCVBUF to {}", size);
}
}
if let Some(size) = quic_link_config.tx_buffer_size {
let socket_ref = socket2::SockRef::from(&socket);
if let Err(e) = socket_ref.set_send_buffer_size(size as usize) {
tracing::warn!("Failed to set QUIC (UDP) SO_SNDBUF to {}: {}", size, e);
} else {
tracing::debug!("Set QUIC (UDP) SO_SNDBUF to {}", size);
}
}

// Initialize the Endpoint
if let Some(iface) = client_crypto.bind_iface {
zenoh_util::net::set_bind_to_device_udp_socket(&socket, iface)?;
Expand All @@ -307,7 +330,17 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic {
.client_config
.try_into()
.map_err(|e| zerror!("Can not get QUIC config {host}: {e}"))?;
quic_endpoint.set_default_client_config(quinn::ClientConfig::new(Arc::new(quic_config)));
let mut client_config = quinn::ClientConfig::new(Arc::new(quic_config));

let mut transport = quinn::TransportConfig::default();
let window_size = 67108864u32; // 64 MB
transport.receive_window(quinn::VarInt::from_u32(window_size));
transport.stream_receive_window(quinn::VarInt::from_u32(window_size));
transport.datagram_receive_buffer_size(Some(window_size as usize));

client_config.transport_config(Arc::new(transport));

quic_endpoint.set_default_client_config(client_config);

let src_addr = quic_endpoint
.local_addr()
Expand Down Expand Up @@ -395,37 +428,59 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic {
.map_err(|e| zerror!("Can not create a new QUIC listener on {addr}: {e}"))?;
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_config));

// We do not accept unidireactional streams.
Arc::get_mut(&mut server_config.transport)
.unwrap()
.max_concurrent_uni_streams(0_u8.into());
// For the time being we only allow one bidirectional stream
Arc::get_mut(&mut server_config.transport)
.unwrap()
.max_concurrent_bidi_streams(1_u8.into());
// Tune Quinn Flow Control for High Latency
let mut transport = quinn::TransportConfig::default();
transport.max_concurrent_uni_streams(0_u8.into());
transport.max_concurrent_bidi_streams(1_u8.into());

let window_size = 67108864u32; // 64 MB

transport.receive_window(quinn::VarInt::from_u32(window_size));
transport.stream_receive_window(quinn::VarInt::from_u32(window_size));
transport.datagram_receive_buffer_size(Some(window_size as usize));

server_config.transport = Arc::new(transport);

// Initialize the Endpoint
let quic_endpoint = if let Some(iface) = server_crypto.bind_iface {
async {
// Bind the UDP socket
let socket = tokio::net::UdpSocket::bind(addr).await?;
let quic_endpoint = {
// Always bind manually to support socket configuration
let socket = tokio::net::UdpSocket::bind(addr).await?;

if let Some(iface) = server_crypto.bind_iface {
zenoh_util::net::set_bind_to_device_udp_socket(&socket, iface)?;
}

// Apply Buffer Tuning
let quic_link_config = QuicLinkConfig::new(&epconf).await?;

// create the Endpoint with this socket
let runtime = quinn::default_runtime()
.ok_or_else(|| std::io::Error::other("no async runtime found"))?;
ZResult::Ok(quinn::Endpoint::new_with_abstract_socket(
EndpointConfig::default(),
Some(server_config),
runtime.wrap_udp_socket(socket.into_std()?)?,
runtime,
)?)
if let Some(size) = quic_link_config.rx_buffer_size {
let socket_ref = socket2::SockRef::from(&socket);
if let Err(e) = socket_ref.set_recv_buffer_size(size as usize) {
tracing::warn!("Failed to set QUIC (UDP) SO_RCVBUF to {}: {}", size, e);
} else {
tracing::debug!("Set QUIC (UDP) SO_RCVBUF to {}", size);
}
}
.await
} else {
quinn::Endpoint::server(server_config, addr).map_err(Into::into)
}
.map_err(|e| zerror!("Can not create a new QUIC listener on {}: {}", addr, e))?;
if let Some(size) = quic_link_config.tx_buffer_size {
let socket_ref = socket2::SockRef::from(&socket);
if let Err(e) = socket_ref.set_send_buffer_size(size as usize) {
tracing::warn!("Failed to set QUIC (UDP) SO_SNDBUF to {}: {}", size, e);
} else {
tracing::debug!("Set QUIC (UDP) SO_SNDBUF to {}", size);
}
}

// create the Endpoint with this socket
let runtime = quinn::default_runtime()
.ok_or_else(|| std::io::Error::other("no async runtime found"))?;

quinn::Endpoint::new_with_abstract_socket(
EndpointConfig::default(),
Some(server_config),
runtime.wrap_udp_socket(socket.into_std()?)?,
runtime,
).map_err(|e| zerror!("Can not create a new QUIC listener on {}: {}", addr, e))?
};

let local_addr = quic_endpoint
.local_addr()
Expand Down
Loading