Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ flume = { version = "0.12.0", default-features = false }
futures-channel = "0.3.29"
futures-rustls = { version = "0.26.0", default-features = false }
futures-util = "0.3.29"
libc = "0.2.164"
libc = "0.2.175"
native-tls = "0.2.13"
nix = "0.30.1"
once_cell = "1.18.0"
Expand Down
1 change: 1 addition & 0 deletions compio-quic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ native-certs = ["dep:rustls-native-certs"]
webpki-roots = ["dep:webpki-roots"]
h3 = ["dep:h3", "dep:h3-datagram"]
ring = ["quinn-proto/rustls-ring"]
windows-gro = []

[[example]]
name = "http3-client"
Expand Down
2 changes: 1 addition & 1 deletion compio-quic/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ fn main() {
linux_all: { any(target_os = "linux", target_os = "android") },
freebsd: { target_os = "freebsd" },
netbsd: { target_os = "netbsd" },
non_freebsd: { any(target_os = "openbsd", target_os = "netbsd") },
non_freebsd: { any(target_os = "openbsd", target_os = "netbsd", target_os = "dragonfly") },
bsd: { any(freebsd, non_freebsd) },
solarish: { any(target_os = "illumos", target_os = "solaris") },
apple: { target_vendor = "apple" },
Expand Down
12 changes: 5 additions & 7 deletions compio-quic/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::{
time::{Duration, Instant},
};

use compio_buf::{BufResult, bytes::Bytes};
use compio_log::{Instrument, error};
use compio_buf::bytes::Bytes;
use compio_log::Instrument;
use compio_runtime::JoinHandle;
use flume::{Receiver, Sender};
use futures_util::{
Expand Down Expand Up @@ -212,11 +212,9 @@ impl ConnectionInner {
}
state
},
BufResult::<(), Vec<u8>>(res, mut buf) = transmit_fut => {
#[allow(unused)]
if let Err(e) = res {
error!("I/O error: {}", e);
}
buf = transmit_fut => {
// The following line is required to avoid "type annotations needed" error
let mut buf: Vec<_> = buf;
buf.clear();
send_buf = Some(buf);
self.state()
Expand Down
2 changes: 1 addition & 1 deletion compio-quic/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl EndpointInner {
fn respond(&self, buf: Vec<u8>, transmit: Transmit) {
let socket = self.socket.clone();
compio_runtime::spawn(async move {
let _ = socket.send(buf, &transmit).await;
socket.send(buf, &transmit).await;
})
.detach();
}
Expand Down
118 changes: 68 additions & 50 deletions compio-quic/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl<const N: usize> DerefMut for Ancillary<N> {
}
}

#[cfg(linux)]
#[cfg(linux_all)]
#[inline]
fn max_gso_segments(socket: &UdpSocket) -> io::Result<usize> {
unsafe {
Expand All @@ -123,37 +123,46 @@ fn max_gso_segments(socket: &UdpSocket) -> io::Result<usize> {
}
Ok(512)
}
#[cfg(not(any(linux, windows)))]
#[cfg(not(any(linux_all, windows)))]
#[inline]
fn max_gso_segments(_socket: &UdpSocket) -> io::Result<usize> {
Err(io::Error::from(io::ErrorKind::Unsupported))
}

#[inline]
fn error_is_unsupported(e: &io::Error) -> bool {
if matches!(
e.kind(),
io::ErrorKind::Unsupported | io::ErrorKind::InvalidInput
) {
return true;
}
let Some(raw) = e.raw_os_error() else {
return false;
};
#[cfg(unix)]
{
raw == libc::ENOPROTOOPT
}
#[cfg(windows)]
{
raw == WinSock::WSAENOPROTOOPT
}
}

macro_rules! set_socket_option {
($socket:expr, $level:expr, $name:expr, $value:expr $(,)?) => {
match unsafe { $socket.set_socket_option($level, $name, $value) } {
Ok(()) => true,
Err(e) if error_is_unsupported(&e) => false,
Err(e) => {
compio_log::warn!(
level = stringify!($level),
name = stringify!($name),
"failed to set socket option: {}",
e
);
if e.kind() == io::ErrorKind::InvalidInput {
true
} else if e.raw_os_error()
== Some(
#[cfg(unix)]
libc::ENOPROTOOPT,
#[cfg(windows)]
WinSock::WSAENOPROTOOPT,
)
{
false
} else {
return Err(e);
}
return Err(e);
}
}
};
Expand Down Expand Up @@ -191,13 +200,13 @@ impl Socket {
#[cfg(all(unix, not(any(non_freebsd, solarish))))]
set_socket_option!(socket, libc::IPPROTO_IP, libc::IP_RECVTOS, &1);
#[cfg(windows)]
set_socket_option!(socket, WinSock::IPPROTO_IP, WinSock::IP_ECN, &1);
set_socket_option!(socket, WinSock::IPPROTO_IP, WinSock::IP_RECVECN, &1);
}
if is_ipv6 {
#[cfg(unix)]
set_socket_option!(socket, libc::IPPROTO_IPV6, libc::IPV6_RECVTCLASS, &1);
#[cfg(windows)]
set_socket_option!(socket, WinSock::IPPROTO_IPV6, WinSock::IPV6_ECN, &1);
set_socket_option!(socket, WinSock::IPPROTO_IPV6, WinSock::IPV6_RECVECN, &1);
}

// pktinfo / destination address
Expand All @@ -217,6 +226,7 @@ impl Socket {
}

// disable fragmentation
#[allow(unused_mut)]
let mut may_fragment = false;
if is_ipv4 {
#[cfg(linux_all)]
Expand Down Expand Up @@ -248,16 +258,11 @@ impl Socket {
&libc::IPV6_PMTUDISC_PROBE,
);
}
#[cfg(all(unix, not(non_freebsd)))]
#[cfg(unix)]
{
may_fragment |=
set_socket_option!(socket, libc::IPPROTO_IPV6, libc::IPV6_DONTFRAG, &1);
}
#[cfg(non_freebsd)]
{
// FIXME: workaround until https://github.com/rust-lang/libc/pull/3716 is released (at least in 0.2.155)
may_fragment |= set_socket_option!(socket, libc::IPPROTO_IPV6, 62, &1);
}
#[cfg(windows)]
{
may_fragment |=
Expand All @@ -266,13 +271,13 @@ impl Socket {
}

// GRO
#[allow(unused_mut)] // only mutable on Linux and Windows
#[allow(unused_mut)]
let mut max_gro_segments = 1;
#[cfg(linux)]
#[cfg(linux_all)]
if set_socket_option!(socket, libc::SOL_UDP, libc::UDP_GRO, &1) {
max_gro_segments = 64;
}
#[cfg(windows)]
#[cfg(all(windows, feature = "windows-gro"))]
if set_socket_option!(
socket,
WinSock::IPPROTO_UDP,
Expand Down Expand Up @@ -332,7 +337,7 @@ impl Socket {

let mut ecn_bits = 0u8;
let mut local_ip = None;
#[allow(unused_mut)] // only mutable on Linux
#[allow(unused_mut)]
let mut stride = len;

// SAFETY: `control` contains valid data
Expand Down Expand Up @@ -386,7 +391,7 @@ impl Socket {
}

// GRO
#[cfg(linux)]
#[cfg(linux_all)]
(libc::SOL_UDP, libc::UDP_GRO) => stride = *cmsg.data::<libc::c_int>() as usize,
#[cfg(windows)]
(WinSock::IPPROTO_UDP, UDP_COALESCED_INFO) => {
Expand All @@ -408,7 +413,7 @@ impl Socket {
BufResult(Ok(meta), buffer)
}

pub async fn send<T: IoBuf>(&self, buffer: T, transmit: &Transmit) -> BufResult<(), T> {
pub async fn send<T: IoBuf>(&self, buffer: T, transmit: &Transmit) -> T {
let is_ipv4 = transmit.destination.ip().to_canonical().is_ipv4();
let ecn = transmit.ecn.map_or(0, |x| x as u8);

Expand Down Expand Up @@ -492,40 +497,53 @@ impl Socket {
}

// GSO
if let Some(segment_size) = transmit.segment_size {
#[cfg(linux)]
if let Some(segment_size) = transmit.segment_size
&& segment_size < transmit.size
{
#[cfg(linux_all)]
builder.try_push(libc::SOL_UDP, libc::UDP_SEGMENT, segment_size as u16);
#[cfg(windows)]
builder.try_push(
WinSock::IPPROTO_UDP,
WinSock::UDP_SEND_MSG_SIZE,
segment_size as u32,
);
#[cfg(not(any(linux, windows)))]
#[cfg(not(any(linux_all, windows)))]
let _ = segment_size;
}

let len = builder.finish();
control.len = len;

let buffer = buffer.slice(0..transmit.size);
let BufResult(res, (buffer, _)) = self
.inner
.send_msg(buffer, control, transmit.destination)
.await;
let buffer = buffer.into_inner();
match res {
Ok(_) => BufResult(Ok(()), buffer),
Err(e) => {
#[cfg(linux)]
if let Some(libc::EIO) | Some(libc::EINVAL) = e.raw_os_error()
&& self.max_gso_segments() > 1
{
self.has_gso_error.store(true, Ordering::Relaxed);
let mut buffer = buffer.slice(0..transmit.size);

loop {
let res;
BufResult(res, (buffer, control)) = self
.inner
.send_msg(buffer, control, transmit.destination)
.await;

match res {
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Ok(_) => break,
Err(e) => {
#[cfg(linux_all)]
if matches!(e.raw_os_error(), Some(libc::EIO) | Some(libc::EINVAL))
&& self.max_gso_segments() > 1
{
self.has_gso_error.store(true, Ordering::Relaxed);
}
#[cfg(unix)]
if matches!(e.raw_os_error(), Some(libc::EMSGSIZE)) {
break;
}
compio_log::info!("failed to send UDP datagram: {e:?}, {transmit:?}");
}
BufResult(Err(e), buffer)
}
}

buffer.into_inner()
}

pub fn close(self) -> impl Future<Output = io::Result<()>> {
Expand All @@ -542,7 +560,7 @@ impl Clone for Socket {
max_gso_segments: self.max_gso_segments,
has_gso_error: AtomicBool::new(self.has_gso_error.load(Ordering::Relaxed)),
#[cfg(freebsd)]
encode_src_ip_v4: self.encode_src_ip_v4.clone(),
encode_src_ip_v4: self.encode_src_ip_v4,
}
}
}
Expand All @@ -565,7 +583,7 @@ mod tests {
let passive_addr = passive.local_addr().unwrap();
let active_addr = active.local_addr().unwrap();

let (_, content) = active.send(content, &transmit).await.unwrap();
let content = active.send(content, &transmit).await;

let segment_size = transmit.segment_size.unwrap_or(transmit.size);
let expected_datagrams = transmit.size / segment_size;
Expand Down
Loading