diff --git a/Cargo.toml b/Cargo.toml index e69e1838..a7a131c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ flume = { version = "0.12.0", default-features = false } futures-channel = "0.3.29" futures-rustls = { version = "0.26.0", default-features = false } futures-util = "0.3.29" -libc = "0.2.164" +libc = "0.2.175" native-tls = "0.2.13" nix = "0.30.1" once_cell = "1.18.0" diff --git a/compio-quic/Cargo.toml b/compio-quic/Cargo.toml index f5291c9d..b7ae84e6 100644 --- a/compio-quic/Cargo.toml +++ b/compio-quic/Cargo.toml @@ -70,6 +70,7 @@ native-certs = ["dep:rustls-native-certs"] webpki-roots = ["dep:webpki-roots"] h3 = ["dep:h3", "dep:h3-datagram"] ring = ["quinn-proto/rustls-ring"] +windows-gro = [] [[example]] name = "http3-client" diff --git a/compio-quic/build.rs b/compio-quic/build.rs index 4b852427..1a607163 100644 --- a/compio-quic/build.rs +++ b/compio-quic/build.rs @@ -7,7 +7,7 @@ fn main() { linux_all: { any(target_os = "linux", target_os = "android") }, freebsd: { target_os = "freebsd" }, netbsd: { target_os = "netbsd" }, - non_freebsd: { any(target_os = "openbsd", target_os = "netbsd") }, + non_freebsd: { any(target_os = "openbsd", target_os = "netbsd", target_os = "dragonfly") }, bsd: { any(freebsd, non_freebsd) }, solarish: { any(target_os = "illumos", target_os = "solaris") }, apple: { target_vendor = "apple" }, diff --git a/compio-quic/src/connection.rs b/compio-quic/src/connection.rs index 7acb299f..79fd4cb4 100644 --- a/compio-quic/src/connection.rs +++ b/compio-quic/src/connection.rs @@ -7,8 +7,8 @@ use std::{ time::{Duration, Instant}, }; -use compio_buf::{BufResult, bytes::Bytes}; -use compio_log::{Instrument, error}; +use compio_buf::bytes::Bytes; +use compio_log::Instrument; use compio_runtime::JoinHandle; use flume::{Receiver, Sender}; use futures_util::{ @@ -212,11 +212,9 @@ impl ConnectionInner { } state }, - BufResult::<(), Vec>(res, mut buf) = transmit_fut => { - #[allow(unused)] - if let Err(e) = res { - error!("I/O error: {}", e); - } + buf = transmit_fut => { + // The following line is required to avoid "type annotations needed" error + let mut buf: Vec<_> = buf; buf.clear(); send_buf = Some(buf); self.state() diff --git a/compio-quic/src/endpoint.rs b/compio-quic/src/endpoint.rs index 478d00a2..df2379f3 100644 --- a/compio-quic/src/endpoint.rs +++ b/compio-quic/src/endpoint.rs @@ -204,7 +204,7 @@ impl EndpointInner { fn respond(&self, buf: Vec, transmit: Transmit) { let socket = self.socket.clone(); compio_runtime::spawn(async move { - let _ = socket.send(buf, &transmit).await; + socket.send(buf, &transmit).await; }) .detach(); } diff --git a/compio-quic/src/socket.rs b/compio-quic/src/socket.rs index b1622c63..67cdafb4 100644 --- a/compio-quic/src/socket.rs +++ b/compio-quic/src/socket.rs @@ -107,7 +107,7 @@ impl DerefMut for Ancillary { } } -#[cfg(linux)] +#[cfg(linux_all)] #[inline] fn max_gso_segments(socket: &UdpSocket) -> io::Result { unsafe { @@ -123,16 +123,38 @@ fn max_gso_segments(socket: &UdpSocket) -> io::Result { } Ok(512) } -#[cfg(not(any(linux, windows)))] +#[cfg(not(any(linux_all, windows)))] #[inline] fn max_gso_segments(_socket: &UdpSocket) -> io::Result { Err(io::Error::from(io::ErrorKind::Unsupported)) } +#[inline] +fn error_is_unsupported(e: &io::Error) -> bool { + if matches!( + e.kind(), + io::ErrorKind::Unsupported | io::ErrorKind::InvalidInput + ) { + return true; + } + let Some(raw) = e.raw_os_error() else { + return false; + }; + #[cfg(unix)] + { + raw == libc::ENOPROTOOPT + } + #[cfg(windows)] + { + raw == WinSock::WSAENOPROTOOPT + } +} + macro_rules! set_socket_option { ($socket:expr, $level:expr, $name:expr, $value:expr $(,)?) => { match unsafe { $socket.set_socket_option($level, $name, $value) } { Ok(()) => true, + Err(e) if error_is_unsupported(&e) => false, Err(e) => { compio_log::warn!( level = stringify!($level), @@ -140,20 +162,7 @@ macro_rules! set_socket_option { "failed to set socket option: {}", e ); - if e.kind() == io::ErrorKind::InvalidInput { - true - } else if e.raw_os_error() - == Some( - #[cfg(unix)] - libc::ENOPROTOOPT, - #[cfg(windows)] - WinSock::WSAENOPROTOOPT, - ) - { - false - } else { - return Err(e); - } + return Err(e); } } }; @@ -191,13 +200,13 @@ impl Socket { #[cfg(all(unix, not(any(non_freebsd, solarish))))] set_socket_option!(socket, libc::IPPROTO_IP, libc::IP_RECVTOS, &1); #[cfg(windows)] - set_socket_option!(socket, WinSock::IPPROTO_IP, WinSock::IP_ECN, &1); + set_socket_option!(socket, WinSock::IPPROTO_IP, WinSock::IP_RECVECN, &1); } if is_ipv6 { #[cfg(unix)] set_socket_option!(socket, libc::IPPROTO_IPV6, libc::IPV6_RECVTCLASS, &1); #[cfg(windows)] - set_socket_option!(socket, WinSock::IPPROTO_IPV6, WinSock::IPV6_ECN, &1); + set_socket_option!(socket, WinSock::IPPROTO_IPV6, WinSock::IPV6_RECVECN, &1); } // pktinfo / destination address @@ -217,6 +226,7 @@ impl Socket { } // disable fragmentation + #[allow(unused_mut)] let mut may_fragment = false; if is_ipv4 { #[cfg(linux_all)] @@ -248,16 +258,11 @@ impl Socket { &libc::IPV6_PMTUDISC_PROBE, ); } - #[cfg(all(unix, not(non_freebsd)))] + #[cfg(unix)] { may_fragment |= set_socket_option!(socket, libc::IPPROTO_IPV6, libc::IPV6_DONTFRAG, &1); } - #[cfg(non_freebsd)] - { - // FIXME: workaround until https://github.com/rust-lang/libc/pull/3716 is released (at least in 0.2.155) - may_fragment |= set_socket_option!(socket, libc::IPPROTO_IPV6, 62, &1); - } #[cfg(windows)] { may_fragment |= @@ -266,13 +271,13 @@ impl Socket { } // GRO - #[allow(unused_mut)] // only mutable on Linux and Windows + #[allow(unused_mut)] let mut max_gro_segments = 1; - #[cfg(linux)] + #[cfg(linux_all)] if set_socket_option!(socket, libc::SOL_UDP, libc::UDP_GRO, &1) { max_gro_segments = 64; } - #[cfg(windows)] + #[cfg(all(windows, feature = "windows-gro"))] if set_socket_option!( socket, WinSock::IPPROTO_UDP, @@ -332,7 +337,7 @@ impl Socket { let mut ecn_bits = 0u8; let mut local_ip = None; - #[allow(unused_mut)] // only mutable on Linux + #[allow(unused_mut)] let mut stride = len; // SAFETY: `control` contains valid data @@ -386,7 +391,7 @@ impl Socket { } // GRO - #[cfg(linux)] + #[cfg(linux_all)] (libc::SOL_UDP, libc::UDP_GRO) => stride = *cmsg.data::() as usize, #[cfg(windows)] (WinSock::IPPROTO_UDP, UDP_COALESCED_INFO) => { @@ -408,7 +413,7 @@ impl Socket { BufResult(Ok(meta), buffer) } - pub async fn send(&self, buffer: T, transmit: &Transmit) -> BufResult<(), T> { + pub async fn send(&self, buffer: T, transmit: &Transmit) -> T { let is_ipv4 = transmit.destination.ip().to_canonical().is_ipv4(); let ecn = transmit.ecn.map_or(0, |x| x as u8); @@ -492,8 +497,10 @@ impl Socket { } // GSO - if let Some(segment_size) = transmit.segment_size { - #[cfg(linux)] + if let Some(segment_size) = transmit.segment_size + && segment_size < transmit.size + { + #[cfg(linux_all)] builder.try_push(libc::SOL_UDP, libc::UDP_SEGMENT, segment_size as u16); #[cfg(windows)] builder.try_push( @@ -501,31 +508,42 @@ impl Socket { WinSock::UDP_SEND_MSG_SIZE, segment_size as u32, ); - #[cfg(not(any(linux, windows)))] + #[cfg(not(any(linux_all, windows)))] let _ = segment_size; } let len = builder.finish(); control.len = len; - let buffer = buffer.slice(0..transmit.size); - let BufResult(res, (buffer, _)) = self - .inner - .send_msg(buffer, control, transmit.destination) - .await; - let buffer = buffer.into_inner(); - match res { - Ok(_) => BufResult(Ok(()), buffer), - Err(e) => { - #[cfg(linux)] - if let Some(libc::EIO) | Some(libc::EINVAL) = e.raw_os_error() - && self.max_gso_segments() > 1 - { - self.has_gso_error.store(true, Ordering::Relaxed); + let mut buffer = buffer.slice(0..transmit.size); + + loop { + let res; + BufResult(res, (buffer, control)) = self + .inner + .send_msg(buffer, control, transmit.destination) + .await; + + match res { + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Ok(_) => break, + Err(e) => { + #[cfg(linux_all)] + if matches!(e.raw_os_error(), Some(libc::EIO) | Some(libc::EINVAL)) + && self.max_gso_segments() > 1 + { + self.has_gso_error.store(true, Ordering::Relaxed); + } + #[cfg(unix)] + if matches!(e.raw_os_error(), Some(libc::EMSGSIZE)) { + break; + } + compio_log::info!("failed to send UDP datagram: {e:?}, {transmit:?}"); } - BufResult(Err(e), buffer) } } + + buffer.into_inner() } pub fn close(self) -> impl Future> { @@ -542,7 +560,7 @@ impl Clone for Socket { max_gso_segments: self.max_gso_segments, has_gso_error: AtomicBool::new(self.has_gso_error.load(Ordering::Relaxed)), #[cfg(freebsd)] - encode_src_ip_v4: self.encode_src_ip_v4.clone(), + encode_src_ip_v4: self.encode_src_ip_v4, } } } @@ -565,7 +583,7 @@ mod tests { let passive_addr = passive.local_addr().unwrap(); let active_addr = active.local_addr().unwrap(); - let (_, content) = active.send(content, &transmit).await.unwrap(); + let content = active.send(content, &transmit).await; let segment_size = transmit.segment_size.unwrap_or(transmit.size); let expected_datagrams = transmit.size / segment_size;