diff --git a/src/catnip/runtime/mempool.rs b/src/catnip/runtime/mempool.rs index 0d65b39a7..567c34649 100644 --- a/src/catnip/runtime/mempool.rs +++ b/src/catnip/runtime/mempool.rs @@ -28,11 +28,9 @@ pub struct MemoryPool { // Associate Functions //====================================================================================================================== -/// Associated functions for memory pool. impl MemoryPool { - /// Creates a new memory pool. pub fn new(name: CString, data_room_size: usize, pool_size: usize, cache_size: usize) -> Result { - let pool: *mut rte_mempool = unsafe { + let pool = unsafe { rte_pktmbuf_pool_create( name.as_ptr(), pool_size as u32, @@ -45,8 +43,8 @@ impl MemoryPool { // Failed to create memory pool. if pool.is_null() { - let rte_errno: libc::c_int = unsafe { rte_errno() }; - let cause: String = format!("failed to create memory pool: {:?}", rte_errno); + let rte_errno = unsafe { rte_errno() }; + let cause = format!("failed to create memory pool: {:?}", rte_errno); error!("new(): {}", cause); return Err(Fail::new(libc::EAGAIN, &cause)); } @@ -59,24 +57,22 @@ impl MemoryPool { self.pool } - /// Allocates a mbuf in the target memory pool. pub fn alloc_mbuf(&self, size: Option) -> Result<*mut rte_mbuf, Fail> { // TODO: Drop the following warning once DPDK memory management is more stable. warn!("allocating mbuf from DPDK pool"); - // Allocate mbuf. - let mbuf_ptr: *mut rte_mbuf = unsafe { rte_pktmbuf_alloc(self.pool) }; + let mbuf_ptr = unsafe { rte_pktmbuf_alloc(self.pool) }; + if mbuf_ptr.is_null() { - let rte_errno: libc::c_int = unsafe { rte_errno() }; - let cause: String = format!("cannot allocate an mbuf at this time: {:?}", rte_errno); + let rte_errno = unsafe { rte_errno() }; + let cause = format!("cannot allocate an mbuf at this time: {:?}", rte_errno); warn!("alloc_mbuf(): {}", cause); - return Err(Fail::new(libc::ENOMEM, &cause)); } // Fill out some fields of the underlying mbuf. 
unsafe { - let mut num_bytes: u16 = (*mbuf_ptr).buf_len - (*mbuf_ptr).data_off; + let mut num_bytes = (*mbuf_ptr).buf_len - (*mbuf_ptr).data_off; if let Some(size) = size { // Check if allocated buffer is big enough. diff --git a/src/catnip/runtime/mod.rs b/src/catnip/runtime/mod.rs index dc563de69..04bea1e92 100644 --- a/src/catnip/runtime/mod.rs +++ b/src/catnip/runtime/mod.rs @@ -23,15 +23,14 @@ use crate::{ fail::Fail, libdpdk::{ rte_delay_us_block, rte_eal_init, rte_errno, rte_eth_conf, rte_eth_dev_configure, rte_eth_dev_count_avail, - rte_eth_dev_get_mtu, rte_eth_dev_info, rte_eth_dev_info_get, rte_eth_dev_is_valid_port, - rte_eth_dev_set_mtu, rte_eth_dev_start, rte_eth_find_next_owned_by, rte_eth_link, rte_eth_link_get_nowait, - rte_eth_promiscuous_enable, rte_eth_rss_ip, rte_eth_rx_burst, - rte_eth_rx_mq_mode_RTE_ETH_MQ_RX_RSS as RTE_ETH_MQ_RX_RSS, rte_eth_rx_offload_tcp_cksum, - rte_eth_rx_offload_udp_cksum, rte_eth_rx_queue_setup, rte_eth_rxconf, rte_eth_tx_burst, - rte_eth_tx_mq_mode_RTE_ETH_MQ_TX_NONE as RTE_ETH_MQ_TX_NONE, rte_eth_tx_offload_multi_segs, - rte_eth_tx_offload_tcp_cksum, rte_eth_tx_offload_udp_cksum, rte_eth_tx_queue_setup, rte_eth_txconf, - rte_mbuf, RTE_ETHER_MAX_JUMBO_FRAME_LEN, RTE_ETHER_MAX_LEN, RTE_ETH_DEV_NO_OWNER, RTE_ETH_LINK_FULL_DUPLEX, - RTE_ETH_LINK_UP, RTE_PKTMBUF_HEADROOM, + rte_eth_dev_get_mtu, rte_eth_dev_info_get, rte_eth_dev_is_valid_port, rte_eth_dev_set_mtu, + rte_eth_dev_start, rte_eth_find_next_owned_by, rte_eth_link_get_nowait, rte_eth_promiscuous_enable, + rte_eth_rss_ip, rte_eth_rx_burst, rte_eth_rx_mq_mode_RTE_ETH_MQ_RX_RSS as RTE_ETH_MQ_RX_RSS, + rte_eth_rx_offload_tcp_cksum, rte_eth_rx_offload_udp_cksum, rte_eth_rx_queue_setup, rte_eth_rxconf, + rte_eth_tx_burst, rte_eth_tx_mq_mode_RTE_ETH_MQ_TX_NONE as RTE_ETH_MQ_TX_NONE, + rte_eth_tx_offload_multi_segs, rte_eth_tx_offload_tcp_cksum, rte_eth_tx_offload_udp_cksum, + rte_eth_tx_queue_setup, rte_eth_txconf, rte_mbuf, RTE_ETHER_MAX_JUMBO_FRAME_LEN, 
RTE_ETHER_MAX_LEN, + RTE_ETH_DEV_NO_OWNER, RTE_ETH_LINK_FULL_DUPLEX, RTE_ETH_LINK_UP, RTE_PKTMBUF_HEADROOM, }, memory::{DemiBuffer, DemiMemoryAllocator}, SharedObject, @@ -68,28 +67,22 @@ impl SharedDPDKRuntime { pub fn new(config: &Config) -> Result { Self::set_environment_variables(); Self::dpdk_eal_init(&config.eal_init_args()?)?; - let port_id: u16 = Self::dpdk_eal_find_port()?; - let use_jumbo_frames: bool = config.enable_jumbo_frames()?; - let max_body_size: usize = if use_jumbo_frames { + let port_id = Self::dpdk_eal_find_port()?; + + let jumbo = config.enable_jumbo_frames()?; + let max_body_size = if jumbo { (RTE_ETHER_MAX_JUMBO_FRAME_LEN + RTE_PKTMBUF_HEADROOM) as usize } else { DEFAULT_MAX_BODY_SIZE }; - let mem_pool: MemoryPool = Self::initialize_mempool(max_body_size)?; + let mem_pool = Self::initialize_mempool(max_body_size)?; - let tcp_offload: bool = config.tcp_checksum_offload().is_ok_and(|offload| offload); - let udp_offload: bool = config.udp_checksum_offload().is_ok_and(|offload| offload); + let tcp_offload = config.tcp_checksum_offload().is_ok_and(|x| x); + let udp_offload = config.udp_checksum_offload().is_ok_and(|x| x); - Self::dpdk_initialize_port( - &mem_pool, - port_id, - use_jumbo_frames, - config.mtu()?, - tcp_offload, - udp_offload, - )?; + Self::dpdk_initialize_port(&mem_pool, port_id, jumbo, config.mtu()?, tcp_offload, udp_offload)?; Ok(Self(SharedObject::::new(DPDKRuntime { max_body_size, @@ -105,27 +98,29 @@ impl SharedDPDKRuntime { } fn dpdk_eal_init(eal_init_args: &[CString]) -> Result<(), Fail> { - let eal_init_refs = eal_init_args.iter().map(|s| s.as_ptr() as *mut u8).collect::>(); - let ret: libc::c_int = unsafe { rte_eal_init(eal_init_refs.len() as i32, eal_init_refs.as_ptr() as *mut _) }; + let argv = eal_init_args.iter().map(|s| s.as_ptr() as *mut u8).collect::>(); + let ret = unsafe { rte_eal_init(argv.len() as i32, argv.as_ptr() as *mut _) }; + if ret < 0 { - let rte_errno: libc::c_int = unsafe { rte_errno() }; - let cause: 
String = format!("EAL initialization failed (rte_errno={:?})", rte_errno); - error!("initialize_dpdk(): {}", cause); - return Err(Fail::new(libc::EIO, &cause)); + let err = unsafe { rte_errno() }; + let msg = format!("EAL init failed (rte_errno={:?})", err); + error!("{}", msg); + return Err(Fail::new(libc::EIO, &msg)); } + Ok(()) } fn dpdk_eal_find_port() -> Result { - let nb_ports: u16 = unsafe { rte_eth_dev_count_avail() }; - if nb_ports == 0 { - return Err(Fail::new(libc::EIO, "No ethernet ports available")); + let n = unsafe { rte_eth_dev_count_avail() }; + if n == 0 { + return Err(Fail::new(libc::EIO, "no ethernet ports available")); } - trace!("DPDK reports that {} ports (interfaces) are available.", nb_ports); - let owner: u64 = RTE_ETH_DEV_NO_OWNER as u64; - let port_id: u16 = unsafe { rte_eth_find_next_owned_by(0, owner) as u16 }; - Ok(port_id) + trace!("{} DPDK ports are available.", n); + + let port = unsafe { rte_eth_find_next_owned_by(0, RTE_ETH_DEV_NO_OWNER as u64) }; + Ok(port as u16) } fn initialize_mempool(max_body_size: usize) -> Result { @@ -139,40 +134,39 @@ impl SharedDPDKRuntime { fn dpdk_initialize_port( mem_pool: &MemoryPool, - port_id: u16, - use_jumbo_frames: bool, + port: u16, + jumbo: bool, mtu: u16, tcp_checksum_offload: bool, udp_checksum_offload: bool, ) -> Result<(), Fail> { - let rx_rings: u16 = 1; - let tx_rings: u16 = 1; - let rx_ring_size: u16 = 2048; - let tx_ring_size: u16 = 2048; - let nb_rxd: u16 = rx_ring_size; - let nb_txd: u16 = tx_ring_size; - - let rx_pthresh: u8 = 8; - let rx_hthresh: u8 = 8; - let rx_wthresh: u8 = 0; - - let tx_pthresh: u8 = 0; - let tx_hthresh: u8 = 0; - let tx_wthresh: u8 = 0; - - let dev_info: rte_eth_dev_info = unsafe { - let mut dev_info: MaybeUninit = MaybeUninit::zeroed(); - rte_eth_dev_info_get(port_id, dev_info.as_mut_ptr()); - dev_info.assume_init() + let (rx_rings, tx_rings) = (1, 1); + let (rx_ring_size, tx_ring_size) = (2048, 2048); + let (nb_rxd, nb_txd) = (rx_ring_size, tx_ring_size); + 
+ // RX thresholds + let (rx_pthresh, rx_hthresh, rx_wthresh) = (8, 8, 0); + + // TX thresholds + let (tx_pthresh, tx_hthresh, tx_wthresh) = (0, 0, 0); + + // Get device info + let dev_info = unsafe { + let mut info = MaybeUninit::zeroed(); + rte_eth_dev_info_get(port, info.as_mut_ptr()); + info.assume_init() }; println!("dev_info: {:?}", dev_info); + + // Port config let mut port_conf: rte_eth_conf = unsafe { MaybeUninit::zeroed().assume_init() }; - port_conf.rxmode.max_lro_pkt_size = if use_jumbo_frames { + port_conf.rxmode.max_lro_pkt_size = if jumbo { RTE_ETHER_MAX_JUMBO_FRAME_LEN } else { RTE_ETHER_MAX_LEN }; + if tcp_checksum_offload { port_conf.rxmode.offloads |= unsafe { rte_eth_rx_offload_tcp_cksum() as u64 }; } @@ -191,113 +185,117 @@ impl SharedDPDKRuntime { } port_conf.txmode.offloads |= unsafe { rte_eth_tx_offload_multi_segs() as u64 }; + // RX config let mut rx_conf: rte_eth_rxconf = unsafe { MaybeUninit::zeroed().assume_init() }; rx_conf.rx_thresh.pthresh = rx_pthresh; rx_conf.rx_thresh.hthresh = rx_hthresh; rx_conf.rx_thresh.wthresh = rx_wthresh; rx_conf.rx_free_thresh = 32; + // TX config let mut tx_conf: rte_eth_txconf = unsafe { MaybeUninit::zeroed().assume_init() }; tx_conf.tx_thresh.pthresh = tx_pthresh; tx_conf.tx_thresh.hthresh = tx_hthresh; tx_conf.tx_thresh.wthresh = tx_wthresh; tx_conf.tx_free_thresh = 32; - if unsafe { rte_eth_dev_configure(port_id, rx_rings, tx_rings, &port_conf as *const _) } != 0 { - let cause: String = format!("Failed to configure ethernet device"); - error!("initialize_dpdk_port(): {}", cause); - return Err(Fail::new(libc::EIO, &cause)); + if unsafe { rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf as *const _) } != 0 { + let msg = format!("failed to configure ethernet device"); + error!("initialize_dpdk_port(): {}", msg); + return Err(Fail::new(libc::EIO, &msg)); } unsafe { - if rte_eth_dev_set_mtu(port_id, mtu) != 0 { - let cause: String = format!("Failed to set mtu {:?}", mtu); - 
error!("initialize_dpdk_port(): {}", cause); - return Err(Fail::new(libc::EIO, &cause)); + if rte_eth_dev_set_mtu(port, mtu) != 0 { + let msg = format!("failed to set mtu {:?}", mtu); + error!("initialize_dpdk_port(): {}", msg); + return Err(Fail::new(libc::EIO, &msg)); } - let mut dpdk_mtu: u16 = 0u16; - if (rte_eth_dev_get_mtu(port_id, &mut dpdk_mtu as *mut _)) != 0 { - let cause: String = format!("Failed to get mtu"); - error!("initialize_dpdk_port(): {}", cause); - return Err(Fail::new(libc::EIO, &cause)); + + let mut dpdk_mtu = 0u16; + if (rte_eth_dev_get_mtu(port, &mut dpdk_mtu)) != 0 { + let msg = format!("failed to get mtu"); + error!("initialize_dpdk_port(): {}", msg); + return Err(Fail::new(libc::EIO, &msg)); } + if dpdk_mtu != mtu { - let cause: String = format!("Failed to set MTU to {}, got back {}", mtu, dpdk_mtu); - error!("initialize_dpdk_port(): {}", cause); - return Err(Fail::new(libc::EIO, &cause)); + let msg = format!("failed to set MTU to {}, got back {}", mtu, dpdk_mtu); + error!("initialize_dpdk_port(): {}", msg); + return Err(Fail::new(libc::EIO, &msg)); } } - let socket_id: u32 = 0; + let socket_id = 0; unsafe { for i in 0..rx_rings { - if rte_eth_rx_queue_setup(port_id, i, nb_rxd, socket_id, &rx_conf as *const _, mem_pool.into_raw()) != 0 - { - let cause: String = format!("Failed to set up rx queue"); - error!("initialize_dpdk_port(): {}", cause); - return Err(Fail::new(libc::EIO, &cause)); + if rte_eth_rx_queue_setup(port, i, nb_rxd, socket_id, &rx_conf as *const _, mem_pool.into_raw()) != 0 { + let msg = format!("failed to set up rx queue"); + error!("initialize_dpdk_port(): {}", msg); + return Err(Fail::new(libc::EIO, &msg)); } } for i in 0..tx_rings { - if rte_eth_tx_queue_setup(port_id, i, nb_txd, socket_id, &tx_conf as *const _) != 0 { - let cause: String = format!("Failed to set up tx ring {:?}", i); - error!("initialize_dpdk_port(): {}", cause); - return Err(Fail::new(libc::EIO, &cause)); + if rte_eth_tx_queue_setup(port, i, nb_txd, 
socket_id, &tx_conf as *const _) != 0 { + let msg = format!("failed to set up tx ring {:?}", i); + error!("initialize_dpdk_port(): {}", msg); + return Err(Fail::new(libc::EIO, &msg)); } } - if rte_eth_dev_start(port_id) != 0 { - let cause: &'static str = "Failed to set up ethernet device"; - error!("initialize_dpdk_port(): {}", cause); - return Err(Fail::new(libc::EIO, cause)); + if rte_eth_dev_start(port) != 0 { + let msg = "failed to set up ethernet device"; + error!("initialize_dpdk_port(): {}", msg); + return Err(Fail::new(libc::EIO, msg)); } - rte_eth_promiscuous_enable(port_id); + rte_eth_promiscuous_enable(port); } - if unsafe { rte_eth_dev_is_valid_port(port_id) } == 0 { - let cause: &'static str = "Invalid port id"; - error!("initialize_dpdk_port(): {}", cause); - return Err(Fail::new(libc::EIO, cause)); + if unsafe { rte_eth_dev_is_valid_port(port) } == 0 { + let msg = "invalid port id"; + error!("initialize_dpdk_port(): {}", msg); + return Err(Fail::new(libc::EIO, msg)); } - let sleep_duration: Duration = Duration::from_millis(100); - let mut retry_count: i32 = 90; + let delay = Duration::from_millis(100); + let mut retries = 90; loop { unsafe { - let mut link: MaybeUninit = MaybeUninit::zeroed(); - rte_eth_link_get_nowait(port_id, link.as_mut_ptr()); - let link: rte_eth_link = link.assume_init(); + let mut link = MaybeUninit::zeroed(); + rte_eth_link_get_nowait(port, link.as_mut_ptr()); + let link = link.assume_init(); + if link.link_status() as u32 == RTE_ETH_LINK_UP { - let duplex: &str = if link.link_duplex() as u32 == RTE_ETH_LINK_FULL_DUPLEX { + let duplex = if link.link_duplex() as u32 == RTE_ETH_LINK_FULL_DUPLEX { "full" } else { "half" }; eprintln!( "Port {} Link Up - speed {} Mbps - {} duplex", - port_id, link.link_speed, duplex + port, link.link_speed, duplex ); break; } - rte_delay_us_block(sleep_duration.as_micros() as u32); + rte_delay_us_block(delay.as_micros() as u32); } - if retry_count == 0 { - let cause: &'static str = "Link never came 
up"; - error!("initialize_dpdk_port(): {}", cause); - return Err(Fail::new(libc::EIO, cause)); + + if retries == 0 { + let msg = "link never came up"; + error!("initialize_dpdk_port(): {}", msg); + return Err(Fail::new(libc::EIO, msg)); } - retry_count -= 1; + retries -= 1; } Ok(()) } fn dpdk_allocate_mbuf(&self, size: usize) -> Result { - // Allocate a DPDK-managed buffer. - let mbuf_ptr: *mut rte_mbuf = self.mem_pool.alloc_mbuf(Some(size))?; - // Safety: `mbuf_ptr` is a valid pointer to a properly initialized `rte_mbuf` struct. - Ok(unsafe { DemiBuffer::from_mbuf(mbuf_ptr) }) + let ptr = self.mem_pool.alloc_mbuf(Some(size))?; + // Safety: ptr is a valid rte_mbuf from mem_pool + Ok(unsafe { DemiBuffer::from_mbuf(ptr) }) } } @@ -316,58 +314,59 @@ impl DerefMut for SharedDPDKRuntime { } impl PhysicalLayer for SharedDPDKRuntime { - fn transmit(&mut self, pkts: ArrayVec) -> Result<(), Fail> { + fn transmit(&mut self, packets: ArrayVec) -> Result<(), Fail> { timer!("catnip::runtime::transmit"); - // Grab the packet and copy it if necessary. In general, this copy will happen for small packets without - // payloads because we allocate actual data-carrying application buffers from the DPDK pool. - let len: usize = pkts.len(); + // Non-DPDK packets are copied into a freshly allocated mbuf. In general, this copy will happen for small + // packets without payloads because we allocate actual data-carrying application buffers from the DPDK pool.
+ let count = packets.len(); let mut mbufs: [*mut rte_mbuf; MAX_BATCH_SIZE_NUM_PACKETS] = unsafe { mem::zeroed() }; - for (i, pkt) in pkts.into_iter().enumerate() { - mbufs[i] = match pkt { - buf if buf.is_dpdk_allocated() => buf + + for (i, packet) in packets.into_iter().enumerate() { + mbufs[i] = if packet.is_dpdk_allocated() { + packet .into_mbuf() - .ok_or(Fail::new(libc::EINVAL, "should be able to convert into mbuf"))?, - buf if buf.len() <= self.max_body_size => { - let mut mbuf: DemiBuffer = self.dpdk_allocate_mbuf(buf.len())?; - debug_assert_eq!(buf.len(), mbuf.len()); - mbuf.copy_from_slice(&buf); - - mbuf.into_mbuf() - .ok_or(Fail::new(libc::EINVAL, "should be able to convert into mbuf"))? - }, - _ => { - return Err(Fail::new( - libc::EINVAL, - "cannot allocate DPDK buffer that is big enough", - )) - }, - }; + .ok_or(Fail::new(libc::EINVAL, "failed to extract DPDK mbuf"))? + } else if packet.len() <= self.max_body_size { + let mut mbuf = self.dpdk_allocate_mbuf(packet.len())?; + debug_assert_eq!(packet.len(), mbuf.len()); + mbuf.copy_from_slice(&packet); + mbuf.into_mbuf() + .ok_or(Fail::new(libc::EINVAL, "failed to convert copied buffer to mbuf"))? 
+ } else { + return Err(Fail::new(libc::EINVAL, "packet too large for DPDK buffer")); + } } - let num_sent: u16 = unsafe { rte_eth_tx_burst(self.port_id, 0, mbufs.as_mut_ptr(), len as u16) }; - debug_assert_eq!(num_sent, 1); + let sent = unsafe { rte_eth_tx_burst(self.port_id, 0, mbufs.as_mut_ptr(), count as u16) }; + debug_assert_eq!(sent, 1); Ok(()) } fn receive(&mut self) -> Result, Fail> { timer!("catnip::runtime::receive"); - let mut out = ArrayVec::new(); - let mut packets: [*mut rte_mbuf; MAX_BATCH_SIZE_NUM_PACKETS] = unsafe { mem::zeroed() }; - let nb_rx = - unsafe { rte_eth_rx_burst(self.port_id, 0, packets.as_mut_ptr(), MAX_BATCH_SIZE_NUM_PACKETS as u16) }; - assert!(nb_rx as usize <= MAX_BATCH_SIZE_NUM_PACKETS); - - { - for &packet in &packets[..nb_rx as usize] { - // Safety: `packet` is a valid pointer to a properly initialized `rte_mbuf` struct. - let buf: DemiBuffer = unsafe { DemiBuffer::from_mbuf(packet) }; - out.push(buf); - } + let mut buffers = ArrayVec::new(); + let mut raw_mbufs: [*mut rte_mbuf; MAX_BATCH_SIZE_NUM_PACKETS] = unsafe { mem::zeroed() }; + + let count = unsafe { + rte_eth_rx_burst( + self.port_id, + 0, + raw_mbufs.as_mut_ptr(), + MAX_BATCH_SIZE_NUM_PACKETS as u16, + ) + }; + + assert!(count as usize <= MAX_BATCH_SIZE_NUM_PACKETS); + + for &mbuf in &raw_mbufs[..count as usize] { + // Safety: `mbuf` is a valid pointer to a properly initialized `rte_mbuf` struct. + let buffer = unsafe { DemiBuffer::from_mbuf(mbuf) }; + buffers.push(buffer); } - Ok(out) + Ok(buffers) } }