diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..73f4de2 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,98 @@ +name: Rust CI + +on: + push: + branches: [ "*" ] + pull_request: + branches: [ "*" ] + +env: + CARGO_TERM_COLOR: always + +jobs: + build-and-test: + name: Build & Test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Cache Cargo Registry + uses: actions/cache@v3 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt, clippy + + - name: Check Formatting + run: cargo fmt --all -- --check + + - name: Lint with Clippy + run: cargo clippy --workspace --all-targets --all-features -- -D warnings + + - name: Run Unit & Integration Tests + run: cargo test --workspace + + smoke-test: + name: End-to-End Smoke Test + runs-on: ubuntu-latest + needs: build-and-test + steps: + - uses: actions/checkout@v3 + - name: Build Release Binaries + run: cargo build --release --workspace + + - name: Run Smoke Test Script + run: | + # 1. Start Server in background + ./target/release/chat-server & + SERVER_PID=$! + echo "Server started (PID: $SERVER_PID)" + sleep 3 # Wait for startup + + # 2. Start Client A (Roku1) in the background + # She stays online longer (sleep 5) to hear Roku2 + (sleep 1; echo "send Hello I am waiting"; sleep 5; echo "leave") | ./target/release/chat-client 127.0.0.1 8080 Roku1 > roku1_output.txt & + ROKU1_PID=$! + + # 3. Start Client B (Roku2) + # Roku2 joins AFTER Roku1, sends a message, then leaves + sleep 2 + (echo "send Hello Roku1 from Roku2"; sleep 1; echo "leave") | ./target/release/chat-client 127.0.0.1 8080 Roku2 > roku2_output.txt & + ROKU2_PID=$! + + # Wait for both clients to finish their scripts + wait $ROKU1_PID + wait $ROKU2_PID + + # 4. Analyze Output + echo "--- Roku1's Logs ---" + cat roku1_output.txt + echo "--- Roku2's Logs ---" + cat roku2_output.txt + + # Check 1: Did Roku2 connect successfully? + if grep -q "Welcome Roku2" roku2_output.txt; then + echo "✅ Roku2 Handshake Success" + else + echo "❌ Roku2 Handshake Failed" + kill $SERVER_PID + exit 1 + fi + + # Check 2: Did Roku1 receive Roku2's message? + if grep -q "Roku2: Hello Roku1 from Roku2" roku1_output.txt; then + echo "✅ Broadcast Success (Roku1 heard Roku2)" + else + echo "❌ Broadcast Failed (Roku1 did not hear Roku2)" + kill $SERVER_PID + exit 1 + fi + + kill $SERVER_PID \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..7343c3b --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,9 @@ +[workspace] +members = [ + "chat-common", + "chat-server-lib", + "chat-server", + "chat-client-lib", + "chat-client", +] +resolver = "2" \ No newline at end of file diff --git a/READ.md b/READ.md new file mode 100644 index 0000000..2dc38ae --- /dev/null +++ b/READ.md @@ -0,0 +1,30 @@ + + +```bash +#!/bin/bash +set -e + +echo "Running Pre-commit Checks..." + +# 1. Formatting +# We use --check to fail if files are unformatted +if ! cargo fmt --all -- --check; then + echo "❌ Cargo Fmt failed. Run 'cargo fmt' to fix." + exit 1 +fi + +# 2. Clippy (Linting) +# We deny warnings to ensure clean code +if ! cargo clippy --workspace --all-targets --all-features -- -D warnings; then + echo "❌ Clippy failed. Fix warnings before committing." + exit 1 +fi + +# 3. Tests +if ! cargo test --workspace; then + echo "❌ Tests failed." + exit 1 +fi + +echo "✅ All checks passed." +``` diff --git a/chat-client-lib/Cargo.toml b/chat-client-lib/Cargo.toml new file mode 100644 index 0000000..36d4cfc --- /dev/null +++ b/chat-client-lib/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "chat-client-lib" +version = "0.1.0" +edition = "2024" + +[dependencies] +tokio = { version = "1.36", features = ["full"] } +tokio-util = { version = "0.7", features = ["codec"] } +futures = "0.3" +anyhow = "1.0" +thiserror = "1.0" \ No newline at end of file diff --git a/chat-client-lib/src/lib.rs b/chat-client-lib/src/lib.rs new file mode 100644 index 0000000..e43dbf9 --- /dev/null +++ b/chat-client-lib/src/lib.rs @@ -0,0 +1,85 @@ +use anyhow::{Context, Result}; +use futures::{SinkExt, StreamExt}; +use tokio::net::TcpStream; +use tokio::sync::mpsc; +use tokio_util::codec::{Framed, LinesCodec}; + +pub async fn run_client(addr: String, username: String) -> Result<()> { + println!("Connecting to {}...", addr); + let stream = TcpStream::connect(&addr) + .await + .context("Failed to connect to chat server")?; + + let mut lines = Framed::new(stream, LinesCodec::new()); + + if let Some(Ok(msg)) = lines.next().await { + println!("{}", msg); + } + + lines.send(&username).await?; + + match lines.next().await { + Some(Ok(msg)) => { + println!("{}", msg); + if msg.starts_with("Invalid") || msg.starts_with("Username") { + return Ok(()); + } + } + _ => return Ok(()), + } + + println!("Session started. Type 'send ' to chat, or 'leave' to quit."); + + let (input_tx, mut input_rx) = mpsc::unbounded_channel::(); + + std::thread::spawn(move || { + let stdin = std::io::stdin(); + let mut line = String::new(); + loop { + line.clear(); + if stdin.read_line(&mut line).is_ok() { + if input_tx.send(line.trim().to_string()).is_err() { + break; + } + } else { + break; + } + } + }); + + loop { + tokio::select! { + msg = lines.next() => { + match msg { + Some(Ok(text)) => println!("{}", text), + Some(Err(e)) => { + eprintln!("Network error: {}", e); + break; + } + None => { + println!("Server disconnected."); + break; + } + } + } + + input = input_rx.recv() => { + match input { + Some(cmd) => { + if cmd == "leave" { + lines.send("leave").await?; + break; + } else if cmd.starts_with("send ") { + lines.send(&cmd).await?; + } else { + println!("Unknown command. Use 'send ' or 'leave'."); + } + } + None => break, + } + } + } + } + + Ok(()) +} diff --git a/chat-client/Cargo.toml b/chat-client/Cargo.toml new file mode 100644 index 0000000..3035f09 --- /dev/null +++ b/chat-client/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "chat-client" +version = "0.1.0" +edition = "2024" + +[dependencies] +chat-client-lib = { path = "../chat-client-lib" } +tokio = { version = "1.36", features = ["full"] } +anyhow = "1.0" +clap = { version = "4.4", features = ["derive", "env"] } \ No newline at end of file diff --git a/chat-client/src/main.rs b/chat-client/src/main.rs new file mode 100644 index 0000000..f1ed394 --- /dev/null +++ b/chat-client/src/main.rs @@ -0,0 +1,21 @@ +use anyhow::Result; +use chat_client_lib::run_client; +use clap::Parser; + +#[derive(Parser, Debug)] +#[command(author, version, about)] +struct Args { + host: String, + + port: u16, + + username: String, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + let addr = format!("{}:{}", args.host, args.port); + + run_client(addr, args.username).await +} diff --git a/chat-common/Cargo.toml b/chat-common/Cargo.toml new file mode 100644 index 0000000..be61001 --- /dev/null +++ b/chat-common/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "chat-common" +version = "0.1.0" +edition = "2024" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } \ No newline at end of file diff --git a/chat-common/src/lib.rs b/chat-common/src/lib.rs new file mode 100644 index 0000000..a0ba3e9 --- /dev/null +++ b/chat-common/src/lib.rs @@ -0,0 +1,47 @@ +//! Common definitions for the chat application. + +pub const MAX_USERNAME_LEN: usize = 32; + +pub const MAX_MESSAGE_LEN: usize = 1024; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BroadcastMessage { + pub sender_id: String, + pub content: String, + pub is_system: bool, +} + +impl BroadcastMessage { + pub fn user_msg(sender: impl Into, content: impl Into) -> Self { + Self { + sender_id: sender.into(), + content: content.into(), + is_system: false, + } + } + + pub fn system_msg(content: impl Into) -> Self { + Self { + sender_id: "SYSTEM".to_string(), + content: content.into(), + is_system: true, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_message_creation() { + let msg = BroadcastMessage::user_msg("Alice", "Hello"); + assert_eq!(msg.sender_id, "Alice"); + assert_eq!(msg.content, "Hello"); + assert!(!msg.is_system); + + let sys = BroadcastMessage::system_msg("Alert"); + assert_eq!(sys.sender_id, "SYSTEM"); + assert!(sys.is_system); + } +} diff --git a/chat-server-lib/Cargo.toml b/chat-server-lib/Cargo.toml new file mode 100644 index 0000000..069e738 --- /dev/null +++ b/chat-server-lib/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "chat-server-lib" +version = "0.1.0" +edition = "2024" + +[dependencies] +chat-common = { path = "../chat-common" } +tokio = { version = "1.36", features = ["full"] } +tokio-util = { version = "0.7", features = ["codec"] } +futures = "0.3" +tracing = "0.1" +dashmap = "5.5" +anyhow = "1.0" +thiserror = "1.0" + +[dev-dependencies] +tokio-test = "0.4" \ No newline at end of file diff --git a/chat-server-lib/src/lib.rs b/chat-server-lib/src/lib.rs new file mode 100644 index 0000000..4a285fc --- /dev/null +++ b/chat-server-lib/src/lib.rs @@ -0,0 +1,148 @@ +use anyhow::Result; +use chat_common::{BroadcastMessage, MAX_USERNAME_LEN}; +use dashmap::DashMap; +use futures::{SinkExt, StreamExt}; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio::sync::broadcast; +use tokio_util::codec::{Framed, LinesCodec}; +use tracing::{info, warn}; + +const BROADCAST_CAPACITY: usize = 100; + +pub struct ServerState { + pub active_users: DashMap, + pub tx: broadcast::Sender, +} + +impl ServerState { + pub fn new() -> Self { + let (tx, _rx) = broadcast::channel(BROADCAST_CAPACITY); + Self { + active_users: DashMap::new(), + tx, + } + } +} + +impl Default for ServerState { + fn default() -> Self { + Self::new() + } +} + +pub async fn handle_connection(socket: TcpStream, state: Arc) -> Result<()> { + let addr = socket + .peer_addr() + .unwrap_or_else(|_| "0.0.0.0:0".parse().unwrap()); + + let mut lines = Framed::new(socket, LinesCodec::new()); + + lines.send("Welcome! Please enter your username:").await?; + + let username = match lines.next().await { + Some(Ok(line)) => line.trim().to_string(), + _ => return Ok(()), + }; + + if username.is_empty() || username.len() > MAX_USERNAME_LEN { + lines.send("Invalid username. Bye.").await?; + return Ok(()); + } + + if state.active_users.contains_key(&username) { + lines.send("Username already taken. Bye.").await?; + return Ok(()); + } + state.active_users.insert(username.clone(), ()); + + info!("User '{}' joined from {}.", username, addr); + lines + .send(format!("Welcome {}, you are now connected.", username)) + .await?; + + let _ = state.tx.send(BroadcastMessage::system_msg(format!( + "{} has joined the room.", + username + ))); + + let mut rx = state.tx.subscribe(); + let my_username = username.clone(); + + loop { + tokio::select! { + result = lines.next() => { + match result { + Some(Ok(msg)) => { + let msg = msg.trim(); + if msg == "leave" { + break; + } else if let Some(content) = msg.strip_prefix("send ") { + let _ = state.tx.send(BroadcastMessage::user_msg(my_username.clone(), content)); + } else { + lines.send("Usage: 'send ' or 'leave'.").await?; + } + } + Some(Err(e)) => { + warn!("Error reading from {}: {}", my_username, e); + break; + } + None => break, + } + } + + result = rx.recv() => { + match result { + Ok(msg) => { + if msg.sender_id!= my_username { + let text = if msg.is_system { + format!("System: {}", msg.content) + } else { + format!("{}: {}", msg.sender_id, msg.content) + }; + + if let Err(e) = lines.send(text).await { + warn!("Failed to send to {}: {}", my_username, e); + break; + } + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(count)) => { + warn!("Client {} lagged by {} messages", my_username, count); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + } + } + + state.active_users.remove(&my_username); + let _ = state.tx.send(BroadcastMessage::system_msg(format!( + "{} has left the room.", + my_username + ))); + info!("User '{}' disconnected.", my_username); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_state_initialization() { + let state = ServerState::new(); + assert!(state.active_users.is_empty()); + assert!(state.tx.receiver_count() == 0); + } + + #[tokio::test] + async fn test_user_registration() { + let state = ServerState::new(); + state.active_users.insert("Bob".to_string(), ()); + + assert!(state.active_users.contains_key("Bob")); + assert!(!state.active_users.contains_key("Alice")); + } +} diff --git a/chat-server/Cargo.toml b/chat-server/Cargo.toml new file mode 100644 index 0000000..241c1ec --- /dev/null +++ b/chat-server/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "chat-server" +version = "0.1.0" +edition = "2024" + +[dependencies] +chat-server-lib = { path = "../chat-server-lib" } +tokio = { version = "1.36", features = ["full"] } +tracing = "0.1" +tracing-subscriber = "0.3" +anyhow = "1.0" \ No newline at end of file diff --git a/chat-server/src/main.rs b/chat-server/src/main.rs new file mode 100644 index 0000000..f920d1b --- /dev/null +++ b/chat-server/src/main.rs @@ -0,0 +1,28 @@ +use anyhow::Result; +use chat_server_lib::{ServerState, handle_connection}; +use std::sync::Arc; +use tokio::net::TcpListener; +use tracing::{error, info}; + +const SERVER_ADDR: &str = "0.0.0.0:8080"; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt::init(); + + let listener = TcpListener::bind(SERVER_ADDR).await?; + info!("Chat server listening on {}", SERVER_ADDR); + + let state = Arc::new(ServerState::new()); + + loop { + let (socket, _) = listener.accept().await?; + let state = state.clone(); + + tokio::spawn(async move { + if let Err(e) = handle_connection(socket, state).await { + error!("Connection error: {:?}", e); + } + }); + } +} diff --git a/chat-server/tests/integration.rs b/chat-server/tests/integration.rs new file mode 100644 index 0000000..e45114e --- /dev/null +++ b/chat-server/tests/integration.rs @@ -0,0 +1,49 @@ +use chat_server_lib::{ServerState, handle_connection}; +use std::sync::Arc; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; + +#[tokio::test] +async fn test_full_chat_flow() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let state = Arc::new(ServerState::new()); + + let state_clone = state.clone(); + tokio::spawn(async move { + loop { + let (socket, _) = listener.accept().await.unwrap(); + let s = state_clone.clone(); + tokio::spawn(async move { + handle_connection(socket, s).await.ok(); + }); + } + }); + + let mut client_a = TcpStream::connect(addr).await.unwrap(); + let mut buf_a = [0u8; 1024]; + + let n = client_a.read(&mut buf_a).await.unwrap(); + assert!(String::from_utf8_lossy(&buf_a[..n]).contains("Please enter your username")); + + client_a.write_all(b"Alice\n").await.unwrap(); + + let n = client_a.read(&mut buf_a).await.unwrap(); + assert!(String::from_utf8_lossy(&buf_a[..n]).contains("Welcome Alice")); + + let mut client_b = TcpStream::connect(addr).await.unwrap(); + let mut buf_b = [0u8; 1024]; + let _ = client_b.read(&mut buf_b).await.unwrap(); // Prompt + client_b.write_all(b"Bob\n").await.unwrap(); // Username + let _ = client_b.read(&mut buf_b).await.unwrap(); // Welcome + + let n = client_a.read(&mut buf_a).await.unwrap(); + let msg = String::from_utf8_lossy(&buf_a[..n]); + assert!(msg.contains("Bob has joined")); + + client_b.write_all(b"send Hello World\n").await.unwrap(); + + let n = client_a.read(&mut buf_a).await.unwrap(); + let msg = String::from_utf8_lossy(&buf_a[..n]); + assert!(msg.contains("Bob: Hello World")); +}