From a79a9b495996b393d1b434af14c43a2e223cd2d9 Mon Sep 17 00:00:00 2001 From: Joshua Job Date: Sun, 21 Feb 2021 14:29:37 -0800 Subject: [PATCH 1/2] post_meetup: Add ability to specify file descriptor for service This aids in servers either operating without disruption to new connections or in situations where a tcp port needs to be preserved. A number of checks are performed to ensure that the passed file descriptor is infact an AF_INET or AF_INET6 family and type SOCK_STREAM. --- Cargo.toml | 1 + src/bin/post_meetup.rs | 121 +++++++++++++++++++++++++++-- tests/common/find_service_setup.rs | 40 ++++++++-- 3 files changed, 151 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b034e36..a6acf4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = '2018' license = "MIT" [dependencies] +libc = "0.2" log = "0.4" env_logger = "0.7" futures = { version = "0.3"} diff --git a/src/bin/post_meetup.rs b/src/bin/post_meetup.rs index 01541cf..f0c9bcd 100644 --- a/src/bin/post_meetup.rs +++ b/src/bin/post_meetup.rs @@ -1,5 +1,5 @@ extern crate tokio; -use clap::{crate_authors, App as ClApp, Arg}; +use clap::{crate_authors, App as ClApp, Arg, ArgGroup}; use futures::future; use futures::StreamExt; use log::*; @@ -15,6 +15,65 @@ use post::find_service::{ }; use tonic::transport::Server; +use std::os::raw::{c_int, c_void}; +use std::os::unix::io as unix_io; +use std::os::unix::io::FromRawFd; + +#[derive(Debug)] +pub struct InvalidSocketDescriptor { + fd: unix_io::RawFd, +} + +impl InvalidSocketDescriptor { + pub fn new(fd: unix_io::RawFd) -> InvalidSocketDescriptor { + InvalidSocketDescriptor { fd } + } +} + +impl std::fmt::Display for InvalidSocketDescriptor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Invalid Socket File Descriptor: {}", self.fd) + } +} + +impl std::error::Error for InvalidSocketDescriptor {} + +unsafe fn getsocketopt(fd: unix_io::RawFd, level: c_int, opt: c_int) -> std::io::Result { + let mut val: T = std::mem::zeroed(); + let mut val_size: libc::socklen_t = std::mem::size_of::() as u32; + let ret = libc::getsockopt( + fd, + level, + opt, + &mut val as *mut T as *mut c_void, + &mut val_size as *mut libc::socklen_t, + ); + if ret < 0 { + Err(std::io::Error::last_os_error()) + } else { + Ok(val) + } +} + +fn listener_from_raw( + fd: unix_io::RawFd, +) -> Result> { + let family: c_int = unsafe { getsocketopt(fd, libc::SOL_SOCKET, libc::SO_DOMAIN) }?; + + if family != libc::AF_INET && family != libc::AF_INET6 { + eprint!("Unable to use socket of family: {}", family); + return Err(Box::new(InvalidSocketDescriptor::new(fd))); + } + + let sock_type: c_int = unsafe { getsocketopt(fd, libc::SOL_SOCKET, libc::SO_TYPE) }?; + + if sock_type != libc::SOCK_STREAM { + return Err(Box::new(InvalidSocketDescriptor::new(fd))); + } + + Ok(unsafe { std::net::TcpListener::from_raw_fd(fd) }) +} + #[tokio::main] async fn main() -> Result<(), Box> { env_logger::init(); @@ -27,10 +86,20 @@ async fn main() -> Result<(), Box> { .short("b") .long("bind") .takes_value(true) - .required(true) .validator(socket_validator) .help("IP and port to bind to in the form of :. Use 0.0.0.0 for any IP on the system."), ) + .arg( + Arg::with_name("fd") + .long("fd") + .takes_value(true) + .help("File descriptor of bound TCP socket ready for `listen` to be called on it") + ) + .group( + ArgGroup::with_name("Socket description") + .args(&["bind","fd"]) + .required(true) + ) .arg( Arg::with_name("publisher-timeout") .short("t") @@ -62,7 +131,20 @@ async fn main() -> Result<(), Box> { .parse() .unwrap(), ); - let bind_info = matches.value_of("bind").unwrap().parse().unwrap(); + let listener = match matches.value_of("bind") { + Some(addr) => { + tokio::net::TcpListener::bind( + std::net::ToSocketAddrs::to_socket_addrs(addr)? + .next() + .expect("No Address found"), + ) + .await? + } + None => tokio::net::TcpListener::from_std(listener_from_raw( + matches.value_of("fd").unwrap().parse::().unwrap(), + )?)?, + }; + let local_address = listener.local_addr()?; let publisher_store = HashMapPublisherStore::new(RwLock::new(HashMap::new())); let meetup_server_options = MeetupServerOptions { @@ -74,9 +156,10 @@ async fn main() -> Result<(), Box> { let server = Server::builder() .add_service(FindMeServer::new(meetup_server)) - .serve(bind_info); + .serve_with_incoming(listener); + + info!("server listening at: {}", local_address); - info!("Started server {}", bind_info); remove_expired_publishers(publisher_store, scan_interval); server.await?; @@ -128,3 +211,31 @@ pub fn socket_validator(v: String) -> Result<(), String> { )), } } + +#[cfg(test)] +mod tests { + use std::os::unix::io::AsRawFd; + + fn init() { + let _ = env_logger::builder().is_test(true).try_init(); + } + + #[test] + fn test_bad_fd() { + let orig_listener = std::net::UdpSocket::bind(("127.0.0.1", 0)) + .expect("Unable to make a udp socket bound to localhost"); + let fd = orig_listener.as_raw_fd(); + + let _listener = + super::listener_from_raw(fd).expect_err("Sent invalid socket, recieved success"); + } + + #[test] + fn test_good_fd() { + let orig_listener = std::net::TcpListener::bind(("127.0.0.1", 0)) + .expect("Unable to make a tcp socket bound to localhost"); + let fd = orig_listener.as_raw_fd(); + + let _listener = super::listener_from_raw(fd).expect("Sent valid socket, recieved error"); + } +} diff --git a/tests/common/find_service_setup.rs b/tests/common/find_service_setup.rs index c84763e..7ff93bd 100644 --- a/tests/common/find_service_setup.rs +++ b/tests/common/find_service_setup.rs @@ -1,4 +1,6 @@ use post::find_service::Client; +use std::os::unix::io::AsRawFd; +use std::os::unix::io::RawFd; ///Wraps an external find service process and provides easy access to its functions pub struct FindService { @@ -6,7 +8,23 @@ pub struct FindService { client: post::find_service::Client, } -pub async fn retry_client(url: &'static str) -> post::find_service::Client { +fn unset_close_on_exec(fd: RawFd) -> std::io::Result { + use std::io; + let flags = unsafe { libc::fcntl(fd, libc::F_GETFD) }; + if flags == -1 { + return Err(io::Error::last_os_error()); + } + let was_set = flags & libc::FD_CLOEXEC != 0; + log::info!("State of Close on exec: {}", was_set); + let result = unsafe { libc::fcntl(fd, libc::F_SETFD, flags & !libc::FD_CLOEXEC) }; + if result == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(fd) + } +} + +pub async fn retry_client(url: String) -> post::find_service::Client { let retries: i32 = 10; let mut retry: i32 = 0; @@ -14,7 +32,7 @@ pub async fn retry_client(url: &'static str) -> post::find_service::Client { if retry >= retries { panic!("Retries exceeded"); } - if let Ok(mut client) = post::find_service::Client::from_url(url) + if let Ok(mut client) = post::find_service::Client::from_url(url.clone()) .unwrap() .set_connect_timeout(std::time::Duration::from_secs(60)) .connect() @@ -35,16 +53,26 @@ impl FindService { pub async fn new() -> FindService { log::info!("Starting new meetup service"); let path = "target/debug/post-meetup"; - let url = "http://127.0.0.1:8080/"; - let bind = "127.0.0.1:8080"; + + let listener = std::net::TcpListener::bind(("127.0.0.1", 0)) + .expect("could not reserve address for find service"); + let port = listener + .local_addr() + .expect("could not retrieve port from OS for find service") + .port(); + let listener_fd = + unset_close_on_exec(listener.as_raw_fd()).expect("could not disable close on exec"); + + log::info!("meetup server starting on port {}", port); + let url = format!("http://127.0.0.1:{}/", port); let _proc = tokio::process::Command::new(path) .arg("-s") .arg("5") .arg("-t") .arg("5") - .arg("--bind") - .arg(bind) + .arg("--fd") + .arg(format!("{}", listener_fd)) .kill_on_drop(true) .spawn() .expect("Failed to start meetup"); From 7d4fad279b7ca2695200bb850c07ddd5d58e7714 Mon Sep 17 00:00:00 2001 From: Joshua Job Date: Sun, 21 Feb 2021 14:34:40 -0800 Subject: [PATCH 2/2] tests: Enable parallel testing Since meetup servers now can use socket file descriptors directly and not a fixed port, These tests can be run in parallel. The meetup servers will not collide. --- .github/workflows/rust.yml | 2 +- tests/publisher_subscriber.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 007e4da..46cc381 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -27,7 +27,7 @@ jobs: - name: Run Clippy against main lib run: cargo clippy --workspace --exclude tests - name: Run tests - run: cargo test --verbose -- --test-threads=1 + run: cargo test --verbose -- env: RUST_LOG: info - name: Rust Format diff --git a/tests/publisher_subscriber.rs b/tests/publisher_subscriber.rs index 4325317..5441fc5 100644 --- a/tests/publisher_subscriber.rs +++ b/tests/publisher_subscriber.rs @@ -149,7 +149,7 @@ async fn publisher_cleanup() { let desc = post::PublisherDesc { name: publisher_name.clone(), host_name: "127.0.0.1".to_string(), - port: 5000, + port: 5001, subscriber_expiration_interval: std::time::Duration::from_secs(2), };