Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 30 additions & 34 deletions examples/rust/tcp-dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@
//======================================================================================================================

use ::anyhow::Result;
use ::clap::{Arg, ArgMatches, Command};
use ::demikernel::{
demi_sgarray_t,
runtime::types::{demi_opcode_t, demi_qresult_t},
LibOS, LibOSName, QDesc, QToken,
};
use ::clap::{Arg, Command};
use ::demikernel::{runtime::types::demi_opcode_t, LibOS, LibOSName, QDesc};
use ::std::{
net::SocketAddr,
str::FromStr,
Expand All @@ -34,14 +30,14 @@ pub const SOCK_STREAM: i32 = libc::SOCK_STREAM;

#[derive(Debug)]
pub struct ProgramArguments {
local_socket_addr: SocketAddr,
local_addr: SocketAddr,
}

impl ProgramArguments {
const DEFAULT_LOCAL_IPV4_ADDR: &'static str = "127.0.0.1:12345";

pub fn new() -> Result<Self> {
let matches: ArgMatches = Command::new("tcp-dump")
let matches = Command::new("tcp-dump")
.arg(
Arg::new("local")
.long("local")
Expand All @@ -52,23 +48,23 @@ impl ProgramArguments {
)
.get_matches();

let mut args: ProgramArguments = ProgramArguments {
local_socket_addr: SocketAddr::from_str(Self::DEFAULT_LOCAL_IPV4_ADDR)?,
let mut args = ProgramArguments {
local_addr: SocketAddr::from_str(Self::DEFAULT_LOCAL_IPV4_ADDR)?,
};

if let Some(addr) = matches.get_one::<String>("local") {
args.set_local_socket_addr(addr)?;
args.set_local_addr(addr)?;
}

Ok(args)
}

pub fn get_local_socket_addr(&self) -> SocketAddr {
self.local_socket_addr
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}

fn set_local_socket_addr(&mut self, addr: &str) -> Result<()> {
self.local_socket_addr = SocketAddr::from_str(addr)?;
fn set_local_addr(&mut self, addr: &str) -> Result<()> {
self.local_addr = SocketAddr::from_str(addr)?;
Ok(())
}
}
Expand All @@ -82,13 +78,13 @@ impl Application {
const LOG_INTERVAL_SECONDS: u64 = 5;

pub fn new(mut libos: LibOS, args: &ProgramArguments) -> Result<Self> {
let local_socket_addr: SocketAddr = args.get_local_socket_addr();
let sockqd: QDesc = match libos.socket(AF_INET, SOCK_STREAM, 0) {
let addr = args.local_addr();
let sockqd = match libos.socket(AF_INET, SOCK_STREAM, 0) {
Ok(sockqd) => sockqd,
Err(e) => anyhow::bail!("failed to create socket: {:?}", e),
};

match libos.bind(sockqd, local_socket_addr) {
match libos.bind(sockqd, addr) {
Ok(()) => (),
Err(e) => {
// If error, close socket.
Expand All @@ -112,18 +108,18 @@ impl Application {
},
}

println!("Local Address: {:?}", local_socket_addr);
println!("Local Address: {:?}", addr);

Ok(Self { libos, sockqd })
}

pub fn run(&mut self) -> Result<()> {
let start_time: Instant = Instant::now();
let mut num_clients: usize = 0;
let mut num_bytes: usize = 0;
let mut qtokens: Vec<QToken> = Vec::new();
let mut last_log_time: Instant = Instant::now();
let mut client_qds: Vec<QDesc> = Vec::default();
let start_time = Instant::now();
let mut num_clients = 0;
let mut num_bytes = 0;
let mut qtokens = Vec::new();
let mut last_log_time = Instant::now();
let mut client_qds = Vec::default();

// Accept first connection.
match self.libos.accept(self.sockqd) {
Expand All @@ -134,7 +130,7 @@ impl Application {
loop {
// Dump statistics.
if last_log_time.elapsed() > Duration::from_secs(Self::LOG_INTERVAL_SECONDS) {
let elapsed_time: Duration = Instant::now() - start_time;
let elapsed_time = Instant::now() - start_time;
println!(
"nclients={:?} / {:?} B / {:?} us",
num_clients,
Expand All @@ -144,7 +140,7 @@ impl Application {
last_log_time = Instant::now();
}

let qr: demi_qresult_t = match self.libos.wait_any(&qtokens, None) {
let qr = match self.libos.wait_any(&qtokens, None) {
Ok((i, qr)) => {
qtokens.swap_remove(i);
qr
Expand All @@ -158,7 +154,7 @@ impl Application {
num_clients += 1;

// Pop first packet from this connection.
let sockqd: QDesc = unsafe { qr.qr_value.ares.qd.into() };
let sockqd = unsafe { qr.qr_value.ares.qd.into() };
client_qds.push(sockqd);
match self.libos.pop(sockqd, None) {
Ok(qt) => qtokens.push(qt),
Expand All @@ -173,8 +169,8 @@ impl Application {
},
// Pop completed.
demi_opcode_t::DEMI_OPC_POP => {
let sockqd: QDesc = qr.qr_qd.into();
let sga: demi_sgarray_t = unsafe { qr.qr_value.sga };
let sockqd = qr.qr_qd.into();
let sga = unsafe { qr.qr_value.sga };

num_bytes += sga.segments[0].data_len_bytes as usize;

Expand All @@ -184,7 +180,7 @@ impl Application {
}

// Pop another packet.
let qt: QToken = match self.libos.pop(sockqd, None) {
let qt = match self.libos.pop(sockqd, None) {
Ok(qt) => qt,
Err(e) => anyhow::bail!("failed to pop data from socket: {:?}", e),
};
Expand Down Expand Up @@ -212,12 +208,12 @@ impl Drop for Application {
}

fn main() -> Result<()> {
let args: ProgramArguments = ProgramArguments::new()?;
let libos_name: LibOSName = match LibOSName::from_env() {
let args = ProgramArguments::new()?;
let libos_name = match LibOSName::from_env() {
Ok(libos_name) => libos_name.into(),
Err(e) => anyhow::bail!("{:?}", e),
};
let libos: LibOS = match LibOS::new(libos_name, None) {
let libos = match LibOS::new(libos_name, None) {
Ok(libos) => libos,
Err(e) => anyhow::bail!("failed to initialize libos: {:?}", e.cause),
};
Expand Down
67 changes: 33 additions & 34 deletions examples/rust/tcp-ping-pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// Imports
//======================================================================================================================
use ::anyhow::Result;
use ::demikernel::{demi_sgarray_t, runtime::types::demi_opcode_t, LibOS, LibOSName, QDesc, QToken};
use ::demikernel::{demi_sgarray_t, runtime::types::demi_opcode_t, LibOS, LibOSName, QDesc};
use ::std::{env, net::SocketAddr, slice, str::FromStr, time::Duration, u8};
use log::{error, warn};

Expand All @@ -33,15 +33,15 @@ const NUM_PING_PONG_ROUNDS: usize = 1024;
const TIMEOUT_SECONDS: Duration = Duration::from_secs(256);

fn mksga(libos: &mut LibOS, size: usize, value: u8) -> Result<demi_sgarray_t> {
let sga: demi_sgarray_t = match libos.sgaalloc(size) {
let sga = match libos.sgaalloc(size) {
Ok(sga) => sga,
Err(e) => anyhow::bail!("failed to allocate scatter-gather array: {:?}", e),
};

// Ensure that allocated the array has the requested size.
if sga.segments[0].data_len_bytes as usize != size {
freesga(libos, sga);
let seglen: usize = sga.segments[0].data_len_bytes as usize;
let seglen = sga.segments[0].data_len_bytes as usize;
anyhow::bail!(
"failed to allocate scatter-gather array: expected size={:?} allocated size={:?}",
size,
Expand All @@ -50,10 +50,10 @@ fn mksga(libos: &mut LibOS, size: usize, value: u8) -> Result<demi_sgarray_t> {
}

// Fill in the array.
let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8;
let len: usize = sga.segments[0].data_len_bytes as usize;
let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) };
let mut fill: u8 = value;
let ptr = sga.segments[0].data_buf_ptr as *mut u8;
let len = sga.segments[0].data_len_bytes as usize;
let slice = unsafe { slice::from_raw_parts_mut(ptr, len) };
let mut fill = value;
for x in slice {
*x = fill;
fill = (fill % (u8::MAX - 1) + 1) as u8;
Expand All @@ -70,7 +70,7 @@ fn freesga(libos: &mut LibOS, sga: demi_sgarray_t) {
}

fn accept_and_wait(libos: &mut LibOS, sockqd: QDesc) -> Result<QDesc> {
let qt: QToken = match libos.accept(sockqd) {
let qt = match libos.accept(sockqd) {
Ok(qt) => qt,
Err(e) => anyhow::bail!("accept failed: {:?}", e),
};
Expand All @@ -81,8 +81,8 @@ fn accept_and_wait(libos: &mut LibOS, sockqd: QDesc) -> Result<QDesc> {
}
}

fn connect_and_wait(libos: &mut LibOS, sockqd: QDesc, remote_socket_addr: SocketAddr) -> Result<()> {
let qt: QToken = match libos.connect(sockqd, remote_socket_addr) {
fn connect_and_wait(libos: &mut LibOS, sockqd: QDesc, remote_addr: SocketAddr) -> Result<()> {
let qt = match libos.connect(sockqd, remote_addr) {
Ok(qt) => qt,
Err(e) => anyhow::bail!("connect failed: {:?}", e),
};
Expand All @@ -96,7 +96,7 @@ fn connect_and_wait(libos: &mut LibOS, sockqd: QDesc, remote_socket_addr: Socket
}

fn push_and_wait(libos: &mut LibOS, sockqd: QDesc, sga: &demi_sgarray_t) -> Result<()> {
let qt: QToken = match libos.push(sockqd, sga) {
let qt = match libos.push(sockqd, sga) {
Ok(qt) => qt,
Err(e) => anyhow::bail!("push failed: {:?}", e),
};
Expand All @@ -110,23 +110,23 @@ fn push_and_wait(libos: &mut LibOS, sockqd: QDesc, sga: &demi_sgarray_t) -> Resu
}

fn pop_and_wait(libos: &mut LibOS, sockqd: QDesc, recvbuf: &mut [u8]) -> Result<()> {
let mut index: usize = 0;
let mut index = 0;

while index < recvbuf.len() {
let qt: QToken = match libos.pop(sockqd, None) {
let qt = match libos.pop(sockqd, None) {
Ok(qt) => qt,
Err(e) => anyhow::bail!("pop failed: {:?}", e),
};
let sga: demi_sgarray_t = match libos.wait(qt, Some(TIMEOUT_SECONDS)) {
let sga = match libos.wait(qt, Some(TIMEOUT_SECONDS)) {
Ok(qr) if qr.qr_opcode == demi_opcode_t::DEMI_OPC_POP => unsafe { qr.qr_value.sga },
Ok(_) => anyhow::bail!("unexpected result"),
Err(e) => anyhow::bail!("operation failed: {:?}", e),
};

// Copy data.
let ptr: *mut u8 = sga.segments[0].data_buf_ptr as *mut u8;
let len: usize = sga.segments[0].data_len_bytes as usize;
let slice: &mut [u8] = unsafe { slice::from_raw_parts_mut(ptr, len) };
let ptr = sga.segments[0].data_buf_ptr as *mut u8;
let len = sga.segments[0].data_len_bytes as usize;
let slice = unsafe { slice::from_raw_parts_mut(ptr, len) };
for x in slice {
recvbuf[index] = *x;
index += 1;
Expand Down Expand Up @@ -156,7 +156,7 @@ pub struct TcpServer {

impl TcpServer {
pub fn new(mut libos: LibOS) -> Result<Self> {
let sockqd: QDesc = match libos.socket(AF_INET, SOCK_STREAM, 0) {
let sockqd = match libos.socket(AF_INET, SOCK_STREAM, 0) {
Ok(sockqd) => sockqd,
Err(e) => anyhow::bail!("failed to create socket: {:?}", e),
};
Expand All @@ -169,8 +169,8 @@ impl TcpServer {
});
}

pub fn run(&mut self, local_socket_addr: SocketAddr, num_rounds: usize) -> Result<()> {
if let Err(e) = self.libos.bind(self.listening_sockqd, local_socket_addr) {
pub fn run(&mut self, local_addr: SocketAddr, num_rounds: usize) -> Result<()> {
if let Err(e) = self.libos.bind(self.listening_sockqd, local_addr) {
anyhow::bail!("bind failed: {:?}", e)
};

Expand All @@ -185,11 +185,11 @@ impl TcpServer {

// Perform multiple ping-pong rounds.
for i in 0..num_rounds {
let mut fill_char: u8 = (i % (u8::MAX as usize - 1) + 1) as u8;
let mut fill_char = (i % (u8::MAX as usize - 1) + 1) as u8;

// Pop data, and sanity check it.
{
let mut recvbuf: [u8; BUFSIZE_BYTES] = [0; BUFSIZE_BYTES];
let mut recvbuf = [0; BUFSIZE_BYTES];
if let Err(e) = pop_and_wait(
&mut self.libos,
self.accepted_sockqd.expect("should be a valid queue descriptor"),
Expand Down Expand Up @@ -257,7 +257,7 @@ pub struct TcpClient {

impl TcpClient {
pub fn new(mut libos: LibOS) -> Result<Self> {
let sockqd: QDesc = match libos.socket(AF_INET, SOCK_STREAM, 0) {
let sockqd = match libos.socket(AF_INET, SOCK_STREAM, 0) {
Ok(sockqd) => sockqd,
Err(e) => anyhow::bail!("failed to create socket: {:?}", e),
};
Expand All @@ -269,14 +269,14 @@ impl TcpClient {
});
}

fn run(&mut self, remote_socket_addr: SocketAddr, num_rounds: usize) -> Result<()> {
if let Err(e) = connect_and_wait(&mut self.libos, self.sockqd, remote_socket_addr) {
fn run(&mut self, remote_addr: SocketAddr, num_rounds: usize) -> Result<()> {
if let Err(e) = connect_and_wait(&mut self.libos, self.sockqd, remote_addr) {
anyhow::bail!("connect and wait failed: {:?}", e);
}

// Perform multiple ping-pong rounds.
for i in 0..num_rounds {
let fill_char: u8 = (i % (u8::MAX as usize - 1) + 1) as u8;
let fill_char = (i % (u8::MAX as usize - 1) + 1) as u8;

// Push data.
{
Expand All @@ -293,11 +293,11 @@ impl TcpClient {
}
}

let mut fill_check: u8 = (i % (u8::MAX as usize - 1) + 1) as u8;
let mut fill_check = (i % (u8::MAX as usize - 1) + 1) as u8;

// Pop data, and sanity check it.
{
let mut recvbuf: [u8; BUFSIZE_BYTES] = [0; BUFSIZE_BYTES];
let mut recvbuf = [0; BUFSIZE_BYTES];
if let Err(e) = pop_and_wait(&mut self.libos, self.sockqd, &mut recvbuf) {
anyhow::bail!("pop and wait failed: {:?}", e);
}
Expand Down Expand Up @@ -339,26 +339,25 @@ pub fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();

if args.len() >= 3 {
let libos_name: LibOSName = match LibOSName::from_env() {
let libos_name = match LibOSName::from_env() {
Ok(libos_name) => libos_name.into(),
Err(e) => anyhow::bail!("{:?}", e),
};
let libos: LibOS = match LibOS::new(libos_name, None) {
let libos = match LibOS::new(libos_name, None) {
Ok(libos) => libos,
Err(e) => anyhow::bail!("failed to initialize libos: {:?}", e),
};
let sockaddr: SocketAddr = SocketAddr::from_str(&args[2])?;
let sockaddr = SocketAddr::from_str(&args[2])?;

if args[1] == "--server" {
let mut server: TcpServer = TcpServer::new(libos)?;
let mut server = TcpServer::new(libos)?;
return server.run(sockaddr, NUM_PING_PONG_ROUNDS);
} else if args[1] == "--client" {
let mut client: TcpClient = TcpClient::new(libos)?;
let mut client = TcpClient::new(libos)?;
return client.run(sockaddr, NUM_PING_PONG_ROUNDS);
}
}

usage(&args[0]);

Ok(())
}
Loading
Loading