Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/catnip/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ impl PhysicalLayer for SharedDPDKRuntime {
}

impl DemiMemoryAllocator for SharedDPDKRuntime {
fn get_max_buffer_size_bytes(&self) -> usize {
fn max_buffer_size_bytes(&self) -> usize {
self.max_body_size
}

Expand Down
95 changes: 45 additions & 50 deletions src/catpowder/linux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ pub struct LinuxRuntime {

impl LinuxRuntime {
pub fn new(config: &Config) -> Result<Self, Fail> {
let mac_addr: [u8; 6] = [0; 6];
let ifindex: i32 = match Self::get_ifindex(&config.local_interface_name()?) {
Ok(ifindex) => ifindex,
Err(_) => return Err(Fail::new(libc::EINVAL, "could not parse ifindex")),
};
let socket: RawSocket = RawSocket::new()?;
let sockaddr: RawSocketAddr = RawSocketAddr::new(ifindex, &mac_addr);
socket.bind(&sockaddr)?;
let mac_addr = [0; 6];

let ifindex = Self::ifindex_for(&config.local_interface_name()?)
.map_err(|_| Fail::new(libc::EINVAL, "could not parse ifindex"))?;

let socket = RawSocket::new()?;
let bind_addr = RawSocketAddr::new(ifindex, &mac_addr);
socket.bind(&bind_addr)?;

let max_body_size = config.mtu()? as usize - MAX_HEADER_SIZE;

Expand All @@ -62,8 +62,8 @@ impl LinuxRuntime {
})
}

fn get_ifindex(ifname: &str) -> Result<i32, ParseIntError> {
let path: String = format!("/sys/class/net/{}/ifindex", ifname);
fn ifindex_for(ifname: &str) -> Result<i32, ParseIntError> {
let path = format!("/sys/class/net/{}/ifindex", ifname);
expect_ok!(fs::read_to_string(path), "could not read ifname")
.trim()
.parse()
Expand All @@ -75,7 +75,7 @@ impl LinuxRuntime {
//======================================================================================================================

impl DemiMemoryAllocator for LinuxRuntime {
fn get_max_buffer_size_bytes(&self) -> usize {
fn max_buffer_size_bytes(&self) -> usize {
self.max_body_size
}

Expand All @@ -87,55 +87,50 @@ impl DemiMemoryAllocator for LinuxRuntime {
impl Runtime for LinuxRuntime {}

impl PhysicalLayer for LinuxRuntime {
fn transmit(&mut self, pkts: ArrayVec<DemiBuffer, MAX_BATCH_SIZE_NUM_PACKETS>) -> Result<(), Fail> {
for pkt in pkts {
// We clone the packet so as to not remove the ethernet header from the outgoing message.
let header = Ethernet2Header::parse_and_strip(&mut pkt.clone()).unwrap();
let dest_addr_arr: [u8; 6] = header.dst_addr().to_array();
let dest_sockaddr: RawSocketAddr = RawSocketAddr::new(self.ifindex, &dest_addr_arr);

match self.socket.sendto(&pkt, &dest_sockaddr) {
Ok(size) if size == pkt.len() => (),
Ok(size) => {
let cause = format!(
"Incorrect number of bytes sent: packet_size={:?} sent={:?}",
pkt.len(),
size
);
fn transmit(&mut self, packets: ArrayVec<DemiBuffer, MAX_BATCH_SIZE_NUM_PACKETS>) -> Result<(), Fail> {
for packet in packets {
// Parse header but keep original packet untouched for sending by cloning it.
let header = Ethernet2Header::parse_and_strip(&mut packet.clone()).unwrap();
let dst_mac = header.dst_addr().to_array();
let addr = RawSocketAddr::new(self.ifindex, &dst_mac);

match self.socket.sendto(&packet, &addr) {
Ok(n) if n == packet.len() => (),
Ok(n) => {
let cause = format!("transmit: partial send: packet_size={:?} sent={:?}", packet.len(), n);
warn!("{}", cause);
return Err(Fail::new(libc::EAGAIN, &cause));
},
Err(e) => {
let cause = "send failed";
warn!("transmit(): {} {:?}", cause, e);
return Err(Fail::new(libc::EIO, &cause));
warn!("transmit(): send failed: {:?}", e);
return Err(Fail::new(libc::EIO, "send failed"));
},
}
}
Ok(())
}

// TODO: This routine currently only tries to receive a single packet buffer, not a batch of them.
// Only receives one packet for now.
// TODO: Support receiving multiple packets in a single call.
// TODO: Remove extra copy of the packet.
// TODO: Change to use `DemiBuffer` directly instead of `MaybeUninit<u8>`.
fn receive(&mut self) -> Result<ArrayVec<DemiBuffer, MAX_BATCH_SIZE_NUM_PACKETS>, Fail> {
// TODO: This routine contains an extra copy of the entire incoming packet that could potentially be removed.

// TODO: change this function to operate directly on DemiBuffer rather than on MaybeUninit<u8>.

// This use-case is an example for MaybeUninit in the docs.
let mut out: [MaybeUninit<u8>; limits::RECVBUF_SIZE_MAX] =
[unsafe { MaybeUninit::uninit().assume_init() }; limits::RECVBUF_SIZE_MAX];
if let Ok((nbytes, _origin_addr)) = self.socket.recvfrom(&mut out[..]) {
let mut ret: ArrayVec<DemiBuffer, MAX_BATCH_SIZE_NUM_PACKETS> = ArrayVec::new();
unsafe {
let bytes: [u8; limits::RECVBUF_SIZE_MAX] =
mem::transmute::<[MaybeUninit<u8>; limits::RECVBUF_SIZE_MAX], [u8; limits::RECVBUF_SIZE_MAX]>(out);
let mut dbuf: DemiBuffer = DemiBuffer::from_slice(&bytes)?;
dbuf.trim(limits::RECVBUF_SIZE_MAX - nbytes)?;
ret.push(dbuf);
}
Ok(ret)
} else {
Ok(ArrayVec::new())
}
let mut recv_buffer = [unsafe { MaybeUninit::uninit().assume_init() }; limits::RECVBUF_SIZE_MAX];

let (nbytes, _src) = match self.socket.recvfrom(&mut recv_buffer) {
Ok(res) => res,
Err(_) => return Ok(ArrayVec::new()),
};

let bytes = unsafe {
mem::transmute::<[MaybeUninit<u8>; limits::RECVBUF_SIZE_MAX], [u8; limits::RECVBUF_SIZE_MAX]>(recv_buffer)
};

let mut packet = DemiBuffer::from_slice(&bytes)?;
packet.trim(limits::RECVBUF_SIZE_MAX - nbytes)?;

let mut packets = ArrayVec::new();
packets.push(packet);
Ok(packets)
}
}
35 changes: 15 additions & 20 deletions src/catpowder/linux/rawsocket/rawsockaddr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use ::std::mem;
use libc::sockaddr;

//======================================================================================================================
// Constants & Structures
// Structures
//======================================================================================================================

#[derive(Clone, Copy)]
Expand All @@ -21,15 +21,15 @@ pub struct RawSocketAddr(libc::sockaddr_ll);
//======================================================================================================================

impl RawSocketAddr {
pub fn new(ifindex: i32, mac_addr: &[u8; 6]) -> Self {
// Pad MAC address.
let mut addr: [u8; 8] = [0_u8; 8];
addr[..6].copy_from_slice(mac_addr);
pub fn new(ifidx: i32, mac: &[u8; 6]) -> Self {
// Pad MAC address to 8 bytes
let mut addr = [0u8; 8];
addr[..6].copy_from_slice(mac);

RawSocketAddr(libc::sockaddr_ll {
sll_family: libc::AF_PACKET.try_into().unwrap(),
sll_family: libc::AF_PACKET as u16,
sll_protocol: (libc::ETH_P_ALL as u16).to_be(),
sll_ifindex: ifindex,
sll_ifindex: ifidx,
sll_hatype: 0,
sll_pkttype: 0,
sll_halen: libc::ETH_ALEN as u8,
Expand All @@ -38,19 +38,15 @@ impl RawSocketAddr {
}

pub fn as_sockaddr_ptr(&self) -> (*const sockaddr, Socklen) {
let sockaddr_ptr: *const sockaddr =
unsafe { mem::transmute::<*const libc::sockaddr_ll, *const sockaddr>(&self.0) };
let sockaddr_len: Socklen = mem::size_of::<libc::sockaddr_ll>() as u32;

(sockaddr_ptr, sockaddr_len)
let ptr = unsafe { mem::transmute::<*const libc::sockaddr_ll, *const sockaddr>(&self.0) };
let len = mem::size_of::<libc::sockaddr_ll>() as u32;
(ptr, len)
}

pub fn as_sockaddr_mut_ptr(&mut self) -> (*mut sockaddr, Socklen) {
let sockaddr_ptr: *mut sockaddr =
unsafe { mem::transmute::<*mut libc::sockaddr_ll, *mut sockaddr>(&mut self.0) };
let sockaddr_len: Socklen = mem::size_of::<libc::sockaddr_ll>() as u32;

(sockaddr_ptr, sockaddr_len)
pub fn as_sockaddr_ptr_mut(&mut self) -> (*mut sockaddr, Socklen) {
let ptr = unsafe { mem::transmute::<*mut libc::sockaddr_ll, *mut sockaddr>(&mut self.0) };
let len = mem::size_of::<libc::sockaddr_ll>() as u32;
(ptr, len)
}
}

Expand All @@ -60,7 +56,6 @@ impl RawSocketAddr {

impl Default for RawSocketAddr {
fn default() -> Self {
let addr: libc::sockaddr_ll = unsafe { mem::zeroed() };
Self(addr)
Self(unsafe { mem::zeroed() })
}
}
88 changes: 43 additions & 45 deletions src/catpowder/linux/rawsocket/rawsocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ use crate::{
runtime::fail::Fail,
};
use ::std::{mem, mem::MaybeUninit};
use libc::sockaddr;

//======================================================================================================================
// Constants & Structures
// Structures
//======================================================================================================================

pub struct RawSocket(libc::c_int);
Expand All @@ -25,27 +24,24 @@ pub struct RawSocket(libc::c_int);

impl RawSocket {
pub fn new() -> Result<Self, Fail> {
let domain: i32 = libc::AF_PACKET; // Do not parse any headers.
let ty: i32 = libc::SOCK_RAW | libc::SOCK_NONBLOCK; // Non-blocking, raw socket.
let protocol: i32 = libc::ETH_P_ALL; // Accept packet from all protocols.
let sockfd: i32 = unsafe { libc::socket(domain, ty, protocol) };
let domain = libc::AF_PACKET; // raw packet socket, no header parsing
let ty = libc::SOCK_RAW | libc::SOCK_NONBLOCK; // raw, non-blocking socket
let protocol = libc::ETH_P_ALL; // all protocols

// Check if we failed to create the underlying raw socket.
if sockfd == -1 {
let fd = unsafe { libc::socket(domain, ty, protocol) };
if fd == -1 {
return Err(Fail::new(libc::EAGAIN, "failed to create raw socket"));
}
trace!("Creating raw socket with fd={:?}", sockfd);
Ok(RawSocket(sockfd))

trace!("created raw socket with fd={:?}", fd);
Ok(RawSocket(fd))
}

// Binds a socket to a raw address.
// Binds the socket to a raw address.
pub fn bind(&self, addr: &RawSocketAddr) -> Result<(), Fail> {
let ret: i32 = unsafe {
let (sockaddr_ptr, address_len): (*const sockaddr, Socklen) = addr.as_sockaddr_ptr();
libc::bind(self.0, sockaddr_ptr, address_len)
};
let (ptr, len) = addr.as_sockaddr_ptr();

// Check if we failed to bind the underlying raw socket.
let ret = unsafe { libc::bind(self.0, ptr, len) };
if ret == -1 {
return Err(Fail::new(libc::EAGAIN, "failed to bind raw socket"));
}
Expand All @@ -54,63 +50,65 @@ impl RawSocket {
}

/// Sends data through a raw socket.
pub fn sendto(&self, buf: &[u8], rawaddr: &RawSocketAddr) -> Result<usize, Fail> {
let buf_len: usize = buf.len();
let buf_ptr: *const libc::c_void = buf.as_ptr() as *const libc::c_void;
let (addr_ptr, addrlen): (*const sockaddr, Socklen) = rawaddr.as_sockaddr_ptr();

let nbytes: i32 =
unsafe { libc::sendto(self.0, buf_ptr, buf_len, libc::MSG_DONTWAIT, addr_ptr, addrlen) as i32 };
pub fn sendto(&self, data: &[u8], rawaddr: &RawSocketAddr) -> Result<usize, Fail> {
let (addr_ptr, addr_len) = rawaddr.as_sockaddr_ptr();
let ret = unsafe {
libc::sendto(
self.0,
data.as_ptr() as *const libc::c_void,
data.len(),
libc::MSG_DONTWAIT,
addr_ptr,
addr_len,
)
};

// Check if we failed to send data through raw socket.
if nbytes == -1 {
if ret == -1 {
return Err(Fail::new(libc::EAGAIN, "failed to send data through raw socket"));
}

Ok(nbytes as usize)
Ok(ret as usize)
}

/// Receives data from a raw socket.
pub fn recvfrom(&self, buf: &[MaybeUninit<u8>]) -> Result<(usize, RawSocketAddr), Fail> {
let buf_ptr: *mut libc::c_void = buf.as_ptr() as *mut libc::c_void;
let buf_len: usize = buf.len();
let mut addrlen: Socklen = mem::size_of::<SockAddrIn>() as u32;
let mut rawaddr: RawSocketAddr = RawSocketAddr::default();
let addrlen_ptr: *mut Socklen = &mut addrlen as *mut Socklen;
let (addr_ptr, _): (*mut sockaddr, Socklen) = rawaddr.as_sockaddr_mut_ptr();
/// Receive data from a raw socket.
pub fn recvfrom(&self, recv_buffer: &[MaybeUninit<u8>]) -> Result<(usize, RawSocketAddr), Fail> {
let ptr = recv_buffer.as_ptr() as *mut libc::c_void;
let mut addrlen = mem::size_of::<SockAddrIn>() as u32;
let mut rawaddr = RawSocketAddr::default();
let (addr_ptr, _) = rawaddr.as_sockaddr_ptr_mut();
let addrlen_ptr = &mut addrlen as *mut Socklen;

let nbytes: i32 = unsafe {
let ret = unsafe {
libc::recvfrom(
self.0,
buf_ptr,
buf_len,
ptr,
recv_buffer.len(),
libc::MSG_DONTWAIT,
addr_ptr,
addrlen_ptr as *mut u32,
) as i32
};

// Check if we failed to receive data from raw socket.
if nbytes == -1 {
if ret == -1 {
return Err(Fail::new(libc::EAGAIN, "failed to receive data from raw socket"));
}

Ok((nbytes as usize, rawaddr))
Ok((ret as usize, rawaddr))
}
}

//======================================================================================================================
// Trait Implementations
//======================================================================================================================

/// Closes the raw socket.
impl Drop for RawSocket {
fn drop(&mut self) {
if unsafe { libc::close(self.0) } < 0 {
let errno: libc::c_int = unsafe { *libc::__errno_location() };
warn!("could not close raw socket (fd={:?}): {:?}", self.0, errno);
let ret = unsafe { libc::close(self.0) };
if ret < 0 {
let errno = unsafe { *libc::__errno_location() };
warn!("failed to close raw socket (fd={}): {}", self.0, errno);
} else {
trace!("Closing raw socket fd={:?}", self.0)
trace!("closed raw socket fd={}", self.0)
}
}
}
2 changes: 1 addition & 1 deletion src/catpowder/win/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl PhysicalLayer for SharedCatpowderRuntime {

/// Memory runtime trait implementation for XDP Runtime.
impl DemiMemoryAllocator for SharedCatpowderRuntime {
fn get_max_buffer_size_bytes(&self) -> usize {
fn max_buffer_size_bytes(&self) -> usize {
self.0.max_body_size
}

Expand Down
6 changes: 3 additions & 3 deletions src/runtime/memory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub use self::{buffer_pool::*, demibuffer::*};
//======================================================================================================================

pub trait DemiMemoryAllocator {
fn get_max_buffer_size_bytes(&self) -> usize {
fn max_buffer_size_bytes(&self) -> usize {
u16::MAX as usize
}

Expand Down Expand Up @@ -76,11 +76,11 @@ pub fn sgaalloc<M: DemiMemoryAllocator>(size: usize, mem_alloc: &M) -> Result<de
}

// First allocate the underlying DemiBuffer.
if size > mem_alloc.get_max_buffer_size_bytes() * DEMI_SGARRAY_MAXLEN {
if size > mem_alloc.max_buffer_size_bytes() * DEMI_SGARRAY_MAXLEN {
return Err(Fail::new(libc::EINVAL, "size too large for a single demi_sgaseg_t"));
}
// Calculate the number of DemiBuffers to allocate.
let max_buffer_size_bytes: usize = mem_alloc.get_max_buffer_size_bytes();
let max_buffer_size_bytes: usize = mem_alloc.max_buffer_size_bytes();
let remainder: usize = size % max_buffer_size_bytes;
let len: usize = (size - remainder) / max_buffer_size_bytes;
let mut bufs: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN> = ArrayVec::new();
Expand Down
Loading