From e124e5646791c83d2de0a440374c8d66515ca07e Mon Sep 17 00:00:00 2001 From: nihalpasham Date: Sun, 27 Oct 2024 16:21:53 +0530 Subject: [PATCH 1/2] simple chat test --- .github/workflows/chat-server.yml | 41 ++++++ Cargo.toml | 33 +++++ async-chat-client/Cargo.toml | 19 +++ async-chat-client/notes.md | 24 ++++ async-chat-client/src/main.rs | 206 ++++++++++++++++++++++++++++++ chat-server/Cargo.toml | 10 ++ chat-server/notes.md | 7 + chat-server/src/main.rs | 105 +++++++++++++++ src/mod.rs | 1 + 9 files changed, 446 insertions(+) create mode 100644 .github/workflows/chat-server.yml create mode 100644 Cargo.toml create mode 100644 async-chat-client/Cargo.toml create mode 100644 async-chat-client/notes.md create mode 100644 async-chat-client/src/main.rs create mode 100644 chat-server/Cargo.toml create mode 100644 chat-server/notes.md create mode 100644 chat-server/src/main.rs create mode 100644 src/mod.rs diff --git a/.github/workflows/chat-server.yml b/.github/workflows/chat-server.yml new file mode 100644 index 0000000..d1c89a5 --- /dev/null +++ b/.github/workflows/chat-server.yml @@ -0,0 +1,41 @@ +name: Build and Test Rust Workspace + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Set up Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + override: true + components: rustfmt + + - name: Build chat-server and async-chat-client + run: | + cd chat-server + cargo build --release + cd ../async-chat-client + cargo build --release + + - name: Run chat-server + run: | + nohup ./target/release/chat-server & + sleep 5 # Wait for the server to start + + - name: Test async-chat-client connection to server + run: | + cd async-chat-client + echo -e "send hello from testuser\nleave" | cargo run -- --host 127.0.0.1 --port 12345 --username "testuser" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..068107f --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,33 @@ +[workspace] +resolver = '2' +members = ["chat-server", "async-chat-client"] + +[workspace.package] +version = "0.1.0" +authors = ["Nihal Pasham"] +edition = "2021" +description = "A simple chat app" +license = "MIT" +readme = "README.md" +keywords = ["async", "chat"] +documentation = "https://github.com/nihalpasham/simple-chat" +repository = "https://github.com/nihalpasham/simple-chat" + +[workspace.lints.rust] +# Turn on some lints which are otherwise allow-by-default in rustc. +unstable_features = 'warn' +unused_import_braces = 'warn' + +[workspace.lints.clippy] +# The default set of lints in Clippy is viewed as "too noisy" right now so +# they're all turned off by default. Selective lints are then enabled below as +# necessary. +all = { level = 'allow', priority = -1 } +clone_on_copy = 'warn' +map_clone = 'warn' +uninlined_format_args = 'warn' +unnecessary_to_owned = 'warn' +manual_strip = 'warn' +unnecessary_mut_passed = 'warn' +unnecessary_fallible_conversions = 'warn' +unnecessary_cast = 'warn' diff --git a/async-chat-client/Cargo.toml b/async-chat-client/Cargo.toml new file mode 100644 index 0000000..685b454 --- /dev/null +++ b/async-chat-client/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "async-chat-client" +version.workspace = true +authors.workspace = true +edition.workspace = true +description.workspace = true +license.workspace = true +readme.workspace = true +keywords.workspace = true +documentation.workspace = true +repository.workspace = true + +[dependencies] +mio = { version = "1.0.2", features = ["net", "os-ext", "os-poll"] } +clap = { version = "4.0", features = ["derive"] } + + +[lints] +workspace = true diff --git a/async-chat-client/notes.md b/async-chat-client/notes.md new file mode 100644 index 0000000..56aa476 --- /dev/null +++ b/async-chat-client/notes.md @@ -0,0 +1,24 @@ +### Requirements: + +- **Command-line Arguments & Environment Variables:** + - Uses the clap crate to parse command-line arguments (host, port, and username). + - Environment variables (HOST, PORT, and USERNAME) are fallback options. +- **Networking:** + - A TcpStream is created to connect to the specified server. + - The Poll and Events are used to asynchronously handle events like reading from the TCP stream or stdin. +- **Handling Events:** + - The client listens for incoming messages from the server or inputs from the user. + - When the user types send , the message is sent to the server. + - If leave is typed, the client disconnects and exits. +- **Async I/O:** + - mio is used for non-blocking, event-based network communication. + - `Stdin` is handled in a separate thread, and input is sent to the main loop using an mpsc channel. +- **Interactive Prompt:** + The user can type send to send a message and leave to exit the program. + +### Usage + +Run the client using: +```sh +cargo run -- --host {} --port {} --username "{}" +``` diff --git a/async-chat-client/src/main.rs b/async-chat-client/src/main.rs new file mode 100644 index 0000000..25e4f54 --- /dev/null +++ b/async-chat-client/src/main.rs @@ -0,0 +1,206 @@ +use clap::Parser; +use mio::net::TcpStream; +use mio::unix::SourceFd; // For handling `Stdin` on Unix-like systems +use mio::{Events, Interest, Poll, Token}; +use std::env; +use std::io::{self, Read, Write}; +use std::net::SocketAddr; +use std::os::unix::io::AsRawFd; + +/// Command-line argument struct for configuring the chat application. +#[derive(Parser)] +struct Args { + /// The host of the server (default: 127.0.0.1) + #[arg(long, default_value = "127.0.0.1")] + host: String, + + /// The port of the server (default: 8080) + #[arg(short, long, default_value = "12345")] + port: String, + + /// The username used for identification + #[arg(short, long)] + username: String, +} + +// Constants for the server and stdin events. +const SERVER: Token = Token(0); +const STDIN: Token = Token(1); + +/// Entry point of the chat application. Manages connection and polling of events. +fn main() -> io::Result<()> { + // Parse the command-line arguments + let args = Args::parse(); + + let host = env::var("HOST").unwrap_or(args.host); + let port = env::var("PORT").unwrap_or(args.port); + let username = env::var("USERNAME").unwrap_or(args.username); + + // Create a stream socket and initiate a connection + let address = format!("{host}:{port}"); + let username = format!("{username}\n"); + let server_address: SocketAddr = address.parse().unwrap(); + let mut stream = TcpStream::connect(server_address)?; + println!("Connecting to server at {} as {}", &address, &username); + + // We'll need the raw file descriptor for the standard input stream + let stdin = io::stdin(); + let stdin_fd = stdin.as_raw_fd(); + + // Set up polling to handle both stdin and the TCP stream + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(128); + + // Register the connection with the Poll instance + poll.registry() + .register(&mut stream, SERVER, Interest::READABLE | Interest::WRITABLE)?; + + // Register `Stdin` as a source for polling + poll.registry() + .register(&mut SourceFd(&stdin_fd), STDIN, Interest::READABLE)?; + + const BUF_SIZE: usize = 512; + let mut input_buffer = Vec::new(); + let mut server_buffer = [0; BUF_SIZE]; + let mut bytes_to_send; + let mut bytes_written = 0; + let mut username_sent = false; + + // Main event loop + loop { + poll.poll(&mut events, None)?; + + for event in events.iter() { + match event.token() { + SERVER => { + if event.is_readable() { + match stream.read(&mut server_buffer) { + Ok(0) => { + println!("Connection closed by server."); + return Ok(()); + } + Ok(n) => { + let msg = String::from_utf8_lossy(&server_buffer[..n]); + println!("{}", msg.trim()); + } + Err(ref err) if would_block(err) => {} + Err(e) => { + eprintln!("Error reading from server: {e}"); + return Err(e); + } + } + } + + if event.is_writable() { + if !username_sent { + input_buffer.extend_from_slice(username.as_bytes()); + // In this simple chat app, we assume the username is short and will be sent in a single write. + // Note: This assumption may not hold in all cases, as `stream.write` does NOT guarantee that + // the entire buffer will be written at once. According to the documentation, we should loop + // until either a `WouldBlock` error occurs or the entire data buffer is sent. + let _ = stream.write(&input_buffer.as_slice()); + username_sent = true; + } + } + } + + STDIN => { + // Handle input from `Stdin` + let mut input = String::new(); + stdin.read_line(&mut input).expect("Failed to read input"); + input = input.trim().to_string(); + + if let Some(stripped) = input.strip_prefix("send ") { + let message = format!("{stripped}\n"); + let msg_len = message.len(); + input_buffer.clear(); + input_buffer.extend_from_slice(message.as_bytes()); + bytes_to_send = msg_len; + // If we receive a write readiness event but skip writing due to `!input_buffer.is_empty()` + // or an incomplete `input_buffer.extend_from_slice(message.as_bytes())` call, the code may + // not write to the stream as expected since we may miss the SERVER token. + + // To handle this, we write to the stream as soon as user input is received from stdin. + // Note: there are more robust solutions for handling this, but for a basic chat app, + // this approach should be sufficient while maintaining asynchronous behavior. + match stream.write(&input_buffer[bytes_written..bytes_to_send]) { + // Continue writing until we hit a `WouldBlock` + Ok(n) if n < bytes_to_send => { + bytes_written += n; + continue; + } + // Our data buffer has been exhausted i.e. we have sent everything we need to + Ok(_v) => { + input_buffer.clear(); + break; + } + // Encountered a `WouldBlock`, stop and poll again for readiness + Err(ref err) if would_block(err) => { + println!("{}", io::ErrorKind::WouldBlock); + break; + } + Err(e) => { + eprintln!("Error writing to server: {e}"); + return Err(e); + } + } + } else if input == "leave" { + println!("Disconnecting..."); + return Ok(()); + } else { + println!("Invalid command. Use 'send ' or 'leave'"); + } + } + + _token => { + println!("Got a spurious event!") + } + } + } + } +} + +fn would_block(err: &io::Error) -> bool { + err.kind() == io::ErrorKind::WouldBlock +} + +#[cfg(test)] +mod tests { + use super::*; + use clap::Parser; + + #[test] + fn test_args_parsing() { + // Arrange: create a sample set of arguments + let args = Args::parse_from(&[ + "test", + "--host", + "192.168.0.1", + "--port", + "9000", + "--username", + "testuser", + ]); + + // Assert: verify the parsed values match expected inputs + assert_eq!(args.host, "192.168.0.1"); + assert_eq!(args.port, "9000"); + assert_eq!(args.username, "testuser"); + } + + #[test] + fn test_username_initialization() { + // Arrange: simulate username setup + let username = "testuser\n"; + let mut input_buffer = Vec::new(); + + // Act: extend input_buffer with the username bytes + input_buffer.extend_from_slice(username.as_bytes()); + + // Assert: check that the input buffer has the username content + assert_eq!( + String::from_utf8(input_buffer.clone()).unwrap(), + "testuser\n" + ); + } +} diff --git a/chat-server/Cargo.toml b/chat-server/Cargo.toml new file mode 100644 index 0000000..d0ca3d0 --- /dev/null +++ b/chat-server/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "chat-server" +version.workspace = true +authors.workspace = true +description.workspace = true +documentation.workspace = true +edition.workspace = true + + +[dependencies] diff --git a/chat-server/notes.md b/chat-server/notes.md new file mode 100644 index 0000000..2cd8840 --- /dev/null +++ b/chat-server/notes.md @@ -0,0 +1,7 @@ +### Requirements: + +- **User Management:** Each user is uniquely identified by a username. It will prompt the user for a username and check for uniqueness. +- **Message Broadcasting:** Messages are broadcasted to all users except the sender. +- **Leave or Disconnect:** When a user sends a /leave message or disconnects, the server removes the user from the active user list. +- **Threaded for Concurrency:** Each client connection is handled in a separate thread for parallelism, ensuring low latency for multiple users. +- **Memory Efficient:** The server uses Arc> and Arc to share state (user connections and usernames) between threads with minimal memory overhead. \ No newline at end of file diff --git a/chat-server/src/main.rs b/chat-server/src/main.rs new file mode 100644 index 0000000..c4fdc49 --- /dev/null +++ b/chat-server/src/main.rs @@ -0,0 +1,105 @@ +use std::collections::{HashMap, HashSet}; +use std::io::{BufRead, BufReader, Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::sync::{Arc, Mutex}; +use std::thread; + +/// Type alias for the list of users connected to the chat server. +type UserList = Arc, TcpStream>>>; +/// Type alias for the list of active users/connections. +type ActiveUsers = Arc>>>; + +/// Handles a connected client. +/// +/// This function processes messages sent by the client and broadcasts them to +/// other connected users. It also removes the user from the list when they leave. +fn handle_client( + stream: TcpStream, + username: Arc, + user_list: UserList, + active_usrs: ActiveUsers, +) { + let reader = BufReader::new(stream); + for line in reader.lines() { + let message = match line { + Ok(msg) => msg, + Err(e) => e.to_string(), + }; + if message == "/leave" { + break; + } + // Broadcast message to everyone in the user_list, except the sender + let mut user_list = user_list.lock().unwrap(); + for (user, user_stream) in user_list.iter_mut() { + if user != &username { + writeln!(user_stream, "[{}]: {}", username, message) + .expect("Failed to send message"); + } + } + } + + // Cleanup after user leaves + user_list.lock().unwrap().remove(&username); + active_usrs.lock().unwrap().remove(&username); + println!("User {} has left", username); +} + +/// Main function that initializes the server and listens for incoming connections. +/// The server waits for a username from the client, verifies its uniqueness, and then +/// allows the user to join the chat room. +fn main() { + let listener = TcpListener::bind("0.0.0.0:12345").expect("Failed to bind"); + let user_list = Arc::new(Mutex::new(HashMap::new())); + let active_usernames = Arc::new(Mutex::new(HashSet::new())); + let mut stream; + + for s in listener.incoming() { + match s { + Ok(s) => { + println!("Received a connection from: {:?}", s.peer_addr().unwrap()); + stream = s + } + Err(e) => { + println!("Failed to accept new connection: {}", e); + continue; + } + }; + + // Get a unique username from the client. + let mut buffer = [0; 512]; + let mut username = String::new(); + loop { + let bytes_read = stream.read(&mut buffer).expect("Failed to read username"); + username.push_str(String::from_utf8_lossy(&buffer[..bytes_read]).trim()); + + if username.contains(" ") || username.contains("/leave") { + writeln!(&mut stream, "Invalid username").expect("Failed to write"); + } + + // Ensure the username is unique + if active_usernames.lock().unwrap().contains(&username) { + writeln!(&mut stream, "Username is already taken").expect("Failed to write"); + continue; + } + break; + } + + // Arc avoids unecessary `String` allocations + let usr = Arc::new(username); + + // Register user + println!("User {} has joined", usr.as_str()); + active_usernames.lock().unwrap().insert(usr.clone()); + user_list + .lock() + .unwrap() + .insert(usr.clone(), stream.try_clone().expect("Failed to clone")); + + // Spawn a new thread to handle this client's connection + let user_list_clone = Arc::clone(&user_list); + let active_usrs_clone = Arc::clone(&active_usernames); + thread::spawn(move || { + handle_client(stream, usr, user_list_clone, active_usrs_clone); + }); + } +} diff --git a/src/mod.rs b/src/mod.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/mod.rs @@ -0,0 +1 @@ + From 421ed55ed8913fcba6d9397a9c2ec587665f2a75 Mon Sep 17 00:00:00 2001 From: nihalpasham Date: Sun, 27 Oct 2024 16:50:02 +0530 Subject: [PATCH 2/2] added demo video and links --- Demo.md | 8 ++++++++ async-chat-client/notes.md | 6 +++--- chat-server/notes.md | 2 +- 3 files changed, 12 insertions(+), 4 deletions(-) create mode 100644 Demo.md diff --git a/Demo.md b/Demo.md new file mode 100644 index 0000000..47ef6d1 --- /dev/null +++ b/Demo.md @@ -0,0 +1,8 @@ +# Demo + +![simple_chat](https://github.com/user-attachments/assets/298cb223-aaa3-45a4-bb85-0582fd88427e) + +## Links + +- [chat-server](https://github.com/nihalpasham/simple-chat/blob/main/chat-server/notes.md) +- [async-chat-client](https://github.com/nihalpasham/simple-chat/blob/main/async-chat-client/notes.md) \ No newline at end of file diff --git a/async-chat-client/notes.md b/async-chat-client/notes.md index 56aa476..f40e27f 100644 --- a/async-chat-client/notes.md +++ b/async-chat-client/notes.md @@ -1,11 +1,11 @@ ### Requirements: - **Command-line Arguments & Environment Variables:** - - Uses the clap crate to parse command-line arguments (host, port, and username). + - Uses the `clap` crate to parse command-line arguments (host, port, and username). - Environment variables (HOST, PORT, and USERNAME) are fallback options. - **Networking:** - - A TcpStream is created to connect to the specified server. - - The Poll and Events are used to asynchronously handle events like reading from the TCP stream or stdin. + - A `TcpStream` is created to connect to the specified server. + - The `Poll` and `Events` are used to asynchronously handle events like reading from the TCP stream or stdin. - **Handling Events:** - The client listens for incoming messages from the server or inputs from the user. - When the user types send , the message is sent to the server. diff --git a/chat-server/notes.md b/chat-server/notes.md index 2cd8840..d1eced6 100644 --- a/chat-server/notes.md +++ b/chat-server/notes.md @@ -4,4 +4,4 @@ - **Message Broadcasting:** Messages are broadcasted to all users except the sender. - **Leave or Disconnect:** When a user sends a /leave message or disconnects, the server removes the user from the active user list. - **Threaded for Concurrency:** Each client connection is handled in a separate thread for parallelism, ensuring low latency for multiple users. -- **Memory Efficient:** The server uses Arc> and Arc to share state (user connections and usernames) between threads with minimal memory overhead. \ No newline at end of file +- **Memory Efficient:** The server uses `Arc>` and `Arc` to share state (user connections and usernames) between threads with minimal memory overhead. \ No newline at end of file