diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..b343e6c --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "frontend"] + path = frontend + url = https://github.com/maximka76667/TM-software-H11 diff --git a/README.md b/README.md index b9ab751..631c614 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,12 @@ -# TM Software H11: Task 3 +# TM Software H11: Final Stage This Go project simulates a complete vehicle telemetry system. -It connects multiple components — a **Generator**, **Hub**, **Consumer**, and a **Frontend (WebSocket)** — that exchange telemetry and control commands in real time. +It connects multiple components — a **Generator**, **Hub**, **Consumer**, and a **Frontend (WebSocket)** — that exchange telemetry, control commands in real time and visualizes metrics. + +> This project was developed over the course of **four weeks** as part of the **Training Month (TM)** as a **Backend Engineer** for **Hyperloop UPV**. +> The different *branches* reflect the project’s progress throughout the four weeks. +> **Backend** components (contained in this repository) were implemented by me, while the **frontend** —included as a Git [*submodule*](https://github.com/maximka76667/TM-software-H11) — was developed by **[maximka76667](https://github.com/maximka76667)**. +> In the final stage, both parts were integrated into a fully functional end-to-end system. ## Table of Contents * [Features](#features) @@ -12,40 +17,53 @@ It connects multiple components — a **Generator**, **Hub**, **Consumer**, and * [Development Notes](#development-notes) ## Features -* Synthetic **Generator** that simulates a vehicle, producing random sensor data (speed, pressure, temperature). +* Synthetic **Generator** models speed, pressure, and temperature, reacting to `start`, `stop`, `accelerate`, and `mode` commands with mode-aware speed caps. * **Hub** that routes data and commands between Generator, Consumer, and Frontend: - + * fans telemetry to the frontend (WebSocket) and consumer (UDP) while forwarding commands from the frontend to the generator (channels) and consumer (TCP). * `ResultData` is sent to both the Frontend (WS) and Consumer (UDP). * `Command` messages flow from the Frontend (WS) to the Generator and Consumer (TCP). -* **Consumer** that receives and logs both telemetry results and commands in rotating `.jsonl` files. -* Real-time **WebSocket communication** with the Frontend for live telemetry and remote control. -* Dynamic **command handling**: the vehicle can start, stop, accelerate, or change driving mode (`eco`, `normal`, `speed`). -* Centralized **config system** controlling intervals, modes, and ports. -* Includes an automated **test suite** that simulates a frontend connection sending commands and logging responses. +* **Consumer** listens on UDP/TCP, autodetects `ResultData` vs `Command` payloads, and rotates structured `.jsonl` logs across `logs/`, `logs/data/`, and `logs/commands/`. +* **React frontend** (Vite + Tailwind) offers connect/disconnect controls, command groups, toast feedback, and metric tiles that track the latest batch stats in real time. +* Central **config package** exposes runtime tuning parameters — settings that define how the system behaves when running, such as sensor cadence, aggregation windows, port bindings, log rotation, and vehicle identity. +* End-to-end **integration test** (`cmd/app/main_test.go`) spins up the stack, drives scripted WebSocket commands, and records the telemetry stream under `test/`. +* `start.sh` boots the Go backend and frontend dev server together for a single command developer experience. ## Repository Layout - ``` TM-software-H11/ │ .gitignore +│ .gitmodules │ config.json │ go.mod │ go.sum -│ main_test.go -│ main.go │ README.md +│ start.sh +│ +├───cmd +│ └───app +│ main.go +│ main_test.go +│ ├───config │ config.go +│ +├───frontend +│ │ package.json +│ │ package-lock.json +│ └───src +│ ├───internal │ ├───consumer │ │ consumer.go │ │ listener.go │ │ logger.go │ │ parser.go +│ │ │ ├───generator │ │ generator.go │ │ processor.go │ │ sensor.go +│ │ │ ├───hub │ │ hub.go │ │ tcphandler.go @@ -56,79 +74,96 @@ TM-software-H11/ │ command.go │ resultData.go │ sensorData.go +│ ├───logs -└───test_logs +│ ├───commands +│ └───data +└───test + test_logs.jsonl ``` ## Getting Started - -1. Install Go 1.21 or newer. -2. Clone the repository and enter the project directory: +1. Install Go 1.25.1 or newer and Node.js 20+ (with npm). +2. Clone the repository, change into it and update the submodule: ```bash git clone https://github.com/vasyl-ks/TM-software-H11.git cd TM-software-H11 + git submodule update --init --recursive + + ``` +3. Inspect and adjust `config.json` for your desired intervals, ports, range and mode ratios. +4. Install frontend dependencies: + + ```bash + cd frontend + npm install + ``` +5. Run the stack: + + ```bash + cd .. + ./start.sh ``` -3. Inspect and adjust `config.json` for your desired intervals, ports, and speed mode ratios. -4. Run the system: + + > The script launches `go run ./cmd/app/main.go` and `npm run dev` (Vite). Stop with `Ctrl+C`. +6. To run only the backend: ```bash - go run main.go + go run ./cmd/app/main.go ``` -5. Optionally, execute tests to simulate a frontend: + + > Then start the frontend separately with `npm run dev` inside `frontend/` (use `-- --host` if you need LAN access). +7. To execute the integration test (writes logs under `test/`): ```bash - go test -v + go test ./cmd/app -run TestFrontendSimulation -v ``` -6. Watch logs under `/logs` — telemetry and commands are saved as `.jsonl` files. +8. Inspect telemetry and command logs in `logs/` after running. Files rotate automatically when `maxLines` is reached. ## Configuration -`config.json` defines the runtime behavior and communication parameters: -* **Vehicle** - * `vehicleID`: unique identifier for telemetry. -* **Sensor** - * `intervalMilliSeconds`: how often new sensor data is generated. - * `minSpeed`, `maxSpeed`, `minPressure`, `maxPressure`, `minTemp`, `maxTemp`: generation ranges. - * `ecoMode`, `normalMode`, `speedMode`: scaling ratios for maximum speed behavior. -* **Processor** - * `intervalMilliSeconds`: how often readings are aggregated into statistics. -* **Logger** - * `maxLines`: maximum lines per log file before rotation. - * `fileDir`: directory for log storage. -* **Hub** - * `udpPort`, `tcpPort`, `wsPort`: network ports for communication. - * `bufferSize`: size for UDP/TCP packet buffers. -Configuration is read once at process start; update the file and restart the application to apply changes. +`config.json` governs how the system behaves: +* **vehicle** + * `vehicleID`: identifier stamped on telemetry batches. +* **sensor** + * `intervalMilliSeconds`: cadence for raw SensorData generation. + * `minSpeed`, `maxSpeed`, `minPressure`, `maxPressure`, `minTemp`, `maxTemp`: randomization bounds. + * `ecoMode`, `normalMode`, `speedMode`: relative limits applied when each driving mode is active. +* **processor** + * `intervalMilliSeconds`: aggregation window for computing averages/min/max. +* **logger** + * `maxLines`: number of log entries before a new file is created. + * `fileDir`: root folder for combined, data-only, and command-only `.jsonl` logs. +* **hub** + * `udpPort`, `tcpPort`, `wsPort`: loopback endpoints used by consumer and frontend. + * `bufferSize`: byte buffer used by UDP/TCP readers. + +Configuration loads once on startup via `config.LoadConfig()`. Update the file and restart to apply changes. ## System Flow 1. **Generator** - * `Sensor` continuously emits simulated sensor readings. - * Receives `Command` messages to alter vehicle behavior (start, stop, accelerate, mode). - * `Process` aggregates data into `ResultData` summaries and sends them to the Hub. + * `Sensor` emits random-but-mode-aware speed, pressure, and temperature readings and reacts to incoming commands. + * `Process` batches readings for the configured interval, fan-outs calculations across goroutines, and forwards summarized `ResultData`. 2. **Hub** - * Acts as the central bridge between Generator, Frontend, and Consumer. - * Forwards telemetry (`ResultData`) to: - * Consumer (via UDP) - * Frontend (via WebSocket) - * Forwards control commands (`Command`) from the Frontend (via WebSocket) to: - * Generator (via internal channel) - * Consumer (via TCP) + * Registers `/api/stream` and upgrades HTTP requests to WebSocket connections. + * Streams each `ResultData` batch to connected frontend and the consumer (UDP) while duplicating commands to generator (channels) and consumer (TCP). 3. **Consumer** - * Listens for UDP results and TCP commands. - * Parses both `ResultData` and `Command` messages. - * Logs entries into rotating `.jsonl` files with timestamps. + * Opens UDP and TCP listeners (signalling readiness through `consumer.Ready`). + * Differentiates telemetry vs command payloads, then logs each to rotating files with timestamps. 4. **Frontend** - * Connects via WebSocket to `/api/stream`. - * Sends commands (`{"action": "start"}` etc.) and receives live telemetry. + * Uses a WebSocket hook to connect on demand, show connection status, render the latest metrics, and send predefined commands or custom acceleration values. + * Provides toast notifications for connect/disconnect, command results, and validation feedback. 5. **Tests** - * `main_test.go` simulates a frontend connection, sends commands with delays, and validates Hub responses. - * Watch test logs under `/test_logs` — saved as `.jsonl` files. + * `TestFrontendSimulation` spins up the services, drives a scripted command sequence, captures the WebSocket stream, and persists the interaction under `test/test_logs.jsonl`. ## Development Notes * The system is fully concurrent, using goroutines and channels for communication. +* Goroutines and channels orchestrate concurrency; `consumer.Ready` ensures network listeners are up before the hub dials TCP/UDP. +* The system is fully concurrent, using goroutines and channels for communication. * Each transport layer (UDP, TCP, WS) runs independently but shares data via the Hub. +* WebSocket handlers handle graceful close frames and distinguish expected vs unexpected disconnects for cleaner logs. * Generator speed adjusts based on commands in real time. +* Temperature and pressure are randomly generated based on speed, and they increase or decrease at different rates depending on the mode. * Frontend tests provide an end-to-end check of the communication pipeline. * Logs in `.jsonl` format are machine- and human-readable, suitable for further analysis. -* At this moment, temperature and pressure values are independent and randomly generated; they are not linked to vehicle state. * Once the program starts, the vehicle begins sending telemetry automatically, but it must be started and accelerated through commands to simulate motion. \ No newline at end of file diff --git a/main.go b/cmd/app/main.go similarity index 84% rename from main.go rename to cmd/app/main.go index 6ea95ba..36d1fa6 100644 --- a/main.go +++ b/cmd/app/main.go @@ -1,6 +1,8 @@ package main import ( + "log" + "github.com/vasyl-ks/TM-software-H11/config" consumer "github.com/vasyl-ks/TM-software-H11/internal/consumer" generator "github.com/vasyl-ks/TM-software-H11/internal/generator" @@ -16,23 +18,23 @@ Start loads configuration values, creates an internal channel, and then calls th The final "select {}" keep the program running indefinitely. */ func Start() { + log.Println("[INFO][Main] Running") + // Load configuration (const variables) config.LoadConfig() + // Wait for config to finish + <-config.Done + // Creates internal channel of ResultData and Command between Generator and Hub. resultChan := make(chan modelPkg.ResultData) commandChan := make(chan modelPkg.Command) // Run Generator, Hub and Consumer. go generator.Run(commandChan, resultChan) - - // Consumer must initialize UDP&TCP listeners, before Hub tries to connect. - ready := make(chan struct{}) - go consumer.Run(ready) - <-ready - + go consumer.Run() + <-consumer.Ready // Wait for consumer to initialize UDP&TCP listeners, before Hub tries to connect. go hub.Run(resultChan, commandChan) - select {} } diff --git a/main_test.go b/cmd/app/main_test.go similarity index 96% rename from main_test.go rename to cmd/app/main_test.go index 6fbd5ca..4d65fec 100644 --- a/main_test.go +++ b/cmd/app/main_test.go @@ -35,6 +35,9 @@ func TestFrontendSimulation(t *testing.T) { defer logFile.Close() logger := log.New(logFile, "", log.LstdFlags) + // Wait for config to finish + <-config.Done + // Connect to running Hub wsURL := fmt.Sprintf("ws://localhost:%d/api/stream", config.Hub.WSPort) dialer := websocket.Dialer{HandshakeTimeout: 3 * time.Second} @@ -56,7 +59,7 @@ func TestFrontendSimulation(t *testing.T) { {"action": "start"}, {"action": "accelerate", "params": 130}, {"action": "accelerate", "params": -10}, - {"action": "mode", "params": "speed"}, + {"action": "mode", "params": "sport"}, {"action": "accelerate"}, {"action": "accelerate", "params": 40}, {"action": "mode", "params": "eco"}, diff --git a/config.json b/config.json index e848869..fc73e4f 100644 --- a/config.json +++ b/config.json @@ -24,7 +24,7 @@ "hub": { "udpPort": 10000, "tcpPort": 10000, - "wsPort": 10000, + "wsPort": 3000, "bufferSize": 1024 } } \ No newline at end of file diff --git a/config/config.go b/config/config.go index 7fc27d7..fb1e544 100644 --- a/config/config.go +++ b/config/config.go @@ -2,7 +2,7 @@ package config import ( "encoding/json" - "fmt" + "log" "os" "time" ) @@ -49,12 +49,17 @@ var Processor processor var Logger logger var Hub hub +// Exported channel to signal when config finishes loading +var Done = make(chan struct{}) + // LoadConfig reads config.json and configures func LoadConfig() { + defer log.Println("[INFO][Config] Loaded.") + // Open the file file, err := os.Open("config.json") if err != nil { - fmt.Println("Error loading configuration: ", err) + log.Println("[ERROR][Config] Error opening config file: ", err) return } defer file.Close() @@ -72,7 +77,7 @@ func LoadConfig() { }{} err = decoder.Decode(&temp) if err != nil { - fmt.Println("Error loading configuration: ", err) + log.Println("[ERROR][Config] Error decoding config struct: ", err) return } @@ -86,4 +91,6 @@ func LoadConfig() { // Derive time.Duration to Seconds Sensor.Interval = time.Duration(Sensor.I) * time.Millisecond Processor.Interval = time.Duration(Processor.I) * time.Millisecond + + close(Done) } diff --git a/frontend b/frontend new file mode 160000 index 0000000..c074852 --- /dev/null +++ b/frontend @@ -0,0 +1 @@ +Subproject commit c0748524f4477542140053f53d23905c3338473f diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go index 8333c04..d443081 100644 --- a/internal/consumer/consumer.go +++ b/internal/consumer/consumer.go @@ -1,6 +1,8 @@ package consumer import ( + "log" + "github.com/vasyl-ks/TM-software-H11/internal/model" ) @@ -10,14 +12,16 @@ Consumer initializes the byteChan and jsonChan channels, and calls the Listen, P - Parse receives a JSON from byteChan, parses it to ResultData and sends it through resultChan. - Log receives a ResultData from resultChan and logs it. */ -func Run(ready chan<- struct{}) { +func Run() { + defer log.Println("[INFO][Consumer] Running.") + // Create unbuffered channels. byteChan := make(chan []byte) resultChan := make(chan model.ResultData) commandChan := make(chan model.Command) // Launch concurrent goroutines. - go Listen(byteChan, ready) + go Listen(byteChan) go Parse(byteChan, resultChan, commandChan) go Log(resultChan, commandChan) } diff --git a/internal/consumer/listener.go b/internal/consumer/listener.go index ffeb302..1ab3268 100644 --- a/internal/consumer/listener.go +++ b/internal/consumer/listener.go @@ -3,36 +3,44 @@ package consumer import ( "fmt" "io" + "log" "net" + "github.com/vasyl-ks/TM-software-H11/config" ) +var Ready = make(chan struct{}) + /* Listen binds a UDP socket on config.Sender.ClientPort and forwards incoming datagrams to out. - Copies each datagram into a new slice to avoid buffer reuse. */ -func Listen(outChan chan<- []byte, ready chan<- struct{}) { - // Listen for UDP Traffic - udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("127.0.0.1:%d", config.Hub.UDPPort)) +func Listen(outChan chan<- []byte) { + addrUDP := fmt.Sprintf("127.0.0.1:%d", config.Hub.UDPPort) + addrTCP := fmt.Sprintf("127.0.0.1:%d", config.Hub.TCPPort) + + // Resolve and bind UDP + udpAddr, err := net.ResolveUDPAddr("udp", addrUDP) if err != nil { - fmt.Println("Error resolving UDP address:", err) + log.Printf("[ERROR][Consumer][Listen] Failed to resolve UDP address %s: %v", addrUDP, err) return } udpConn, err := net.ListenUDP("udp", udpAddr) if err != nil { - fmt.Println("Error listening on UDP:", err) + log.Printf("[ERROR][Consumer][Listen] Failed to listen on UDP %s: %v", addrUDP, err) return } - // Listen for TCP Traffic - tcpListener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", config.Hub.TCPPort)) + // Bind TCP + tcpListener, err := net.Listen("tcp", addrTCP) if err != nil { - fmt.Println("Error listening on TCP:", err) + log.Printf("[ERROR][Consumer][Listen] Failed to listen on TCP %s: %v", addrTCP, err) return } // Notify that listeners are ready - close(ready) + close(Ready) + log.Printf("[INFO][Consumer][Listen] Listening on UDP %s and TCP %s", addrUDP, addrTCP) // UDP handler goroutine go func() { @@ -41,7 +49,7 @@ func Listen(outChan chan<- []byte, ready chan<- struct{}) { for { n, _, err := udpConn.ReadFromUDP(buf) if err != nil { - fmt.Printf("Error reading from UDP: %v\n", err) + log.Printf("[ERROR][Consumer][Listen] Error reading from UDP: %v\n", err) continue } payload := make([]byte, n) @@ -55,7 +63,7 @@ func Listen(outChan chan<- []byte, ready chan<- struct{}) { defer tcpListener.Close() conn, err := tcpListener.Accept() if err != nil { - fmt.Println("Error accepting TCP connection:", err) + fmt.Println("[ERROR][Consumer][Listen] Error accepting TCP connection:", err) return } defer conn.Close() @@ -64,7 +72,7 @@ func Listen(outChan chan<- []byte, ready chan<- struct{}) { n, err := conn.Read(buf) if err != nil { if err != io.EOF { - fmt.Printf("Error reading TCP: %v\n", err) + fmt.Printf("[ERROR][Consumer][Listen] Error reading TCP: %v\n", err) } break } @@ -73,30 +81,4 @@ func Listen(outChan chan<- []byte, ready chan<- struct{}) { outChan <- payload } }() - - - for { - conn, err := tcpListener.Accept() - if err != nil { - fmt.Println("Error accepting TCP connection:", err) - continue - } - - go func(c net.Conn) { - defer c.Close() - buf := make([]byte, config.Hub.BufferSize) - for { - n, err := c.Read(buf) - if err != nil { - if err != io.EOF { - fmt.Printf("read TCP: %v\n", err) - } - break - } - payload := make([]byte, n) - copy(payload, buf[:n]) - outChan <- payload - } - }(conn) - } } diff --git a/internal/consumer/logger.go b/internal/consumer/logger.go index cdacf9b..6e325a6 100644 --- a/internal/consumer/logger.go +++ b/internal/consumer/logger.go @@ -2,13 +2,73 @@ package consumer import ( "fmt" + "log" "os" + "path/filepath" "time" - "log" + "github.com/vasyl-ks/TM-software-H11/config" "github.com/vasyl-ks/TM-software-H11/internal/model" ) +// Helper type for grouped loggers +type Loggers struct { + Main *log.Logger + Data *log.Logger + Command *log.Logger +} + +// Helper function to create a logger for a given subdirectory and prefix +func createLogger(baseDir, subDir, prefix string) (*log.Logger, *os.File, error) { + dirPath := filepath.Join(baseDir, subDir) + if err := os.MkdirAll(dirPath, 0755); err != nil { + return nil, nil, fmt.Errorf("[ERROR][Consumer][Log] Error creating directory %s: %w", dirPath, err) + } + + filename := fmt.Sprintf("%s_%s.jsonl", prefix, time.Now().Format("20060102_150405")) + fullPath := filepath.Join(dirPath, filename) + + file, err := os.OpenFile(fullPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return nil, nil, fmt.Errorf("[ERROR][Consumer][Log] Error opening log file %s: %w", fullPath, err) + } + + logger := log.New(file, "", 0) + return logger, file, nil +} + +// Helper function to write a ResultData +func writeResult(loggers Loggers, r model.ResultData) { + msg := fmt.Sprintf( + "[DATA] Created at %s, Processed at %s, Logged at %s | "+ + "AvgSpeed: %5.2f, MinSpeed: %5.2f, MaxSpeed: %5.2f | "+ + "AvgTemp: %5.2f, MinTemp: %5.2f, MaxTemp: %5.2f | "+ + "AvgPressure: %4.2f, MinPressure: %4.2f, MaxPressure: %4.2f", + r.CreatedAt.Format("15:04:05.000000"), + r.ProcessedAt.Format("15:04:05.000000"), + time.Now().Local().Format("15:04:05.000000"), + r.AverageSpeed, r.MinimumSpeed, r.MaximumSpeed, + r.AverageTemperature, r.MinimumTemperature, r.MaximumTemperature, + r.AveragePressure, r.MinimumPressure, r.MaximumPressure, + ) + + loggers.Main.Println(msg) + loggers.Data.Println(msg) +} + +// Helper function to write a Command +func writeCommand(loggers Loggers, cmd model.Command) { + msg := fmt.Sprintf( + "[COMMAND] Received at %s | Action: %-12s | Params: %-8v", + time.Now().Local().Format("15:04:05.000000"), + cmd.Action, + cmd.Params, + ) + + loggers.Main.Println(msg) + loggers.Command.Println(msg) +} + /* Log receives ResultData and Command messages from their respective channels and logs them to rotating log files. @@ -19,58 +79,65 @@ and logs them to rotating log files. */ func Log(inResultChan <-chan model.ResultData, inCommandChan <-chan model.Command) { lineCount := 0 - fileDir := config.Logger.FileDir // defines directory where the log is saved. + fileDir := config.Logger.FileDir // defines directory where the log is saved. maxLines := config.Logger.MaxLines // defines the maximum number of ResultData to log in a single file. - // Check directory - err := os.MkdirAll(fileDir, 0755) + // Create base directory + if err := os.MkdirAll(fileDir, 0755); err != nil { + log.Println("[ERROR][Consumer][Log] Error creating directory:", err) + return + } + + // Create all loggers + mainLogger, mainFile, err := createLogger(fileDir, "", "log") + if err != nil { + log.Println(err) + return + } + defer mainFile.Close() + + dataLogger, dataFile, err := createLogger(fileDir, "data", "data") if err != nil { - fmt.Println("Error creating directory:", err) + log.Println(err) return } + defer dataFile.Close() - // Create a new file - filename := fmt.Sprintf("%s/sensor_log_%s.jsonl", fileDir, time.Now().Format("20060102_150405")) - file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + commandLogger, commandFile, err := createLogger(fileDir, "commands", "command") if err != nil { - fmt.Println("Error opening log file:", err) + log.Println(err) return } - defer file.Close() - log.SetOutput(file) - log.SetFlags(0) + defer commandFile.Close() + + // Group them for easier access + loggers := Loggers{ + Main: mainLogger, + Data: dataLogger, + Command: commandLogger, + } + + log.Println("[INFO][Consumer][Log] Running.") for { select { - // Receive ResultData - case resultData, ok := <-inResultChan: + // Receive ResultData + case resultData, ok := <-inResultChan: if !ok { inResultChan = nil // channel closed continue } // Log in the file - log.Printf( - "[DATA] || Created at %s, Processed at %s, Logged at %s | AvgSpeed: %5.2f, MinSpeed: %5.2f, MaxSpeed: %5.2f | AvgTemp: %5.2f, MinTemp: %5.2f, MaxTemp: %5.2f | AvgPressure: %4.2f, MinPressure: %4.2f, MaxPressure: %4.2f ||\n", - resultData.CreatedAt.Format("15:04:05.000000"), - resultData.ProcessedAt.Format("15:04:05.000000"), - time.Now().Local().Format("15:04:05.000000"), - resultData.AverageSpeed, resultData.MinSpeed, resultData.MaxSpeed, - resultData.AverageTemp, resultData.MinTemp, resultData.MaxTemp, - resultData.AveragePressure, resultData.MinPressure, resultData.MaxPressure, - ) - - // Receive ResultData - case cmd, ok := <-inCommandChan: + writeResult(loggers, resultData) + + // Receive Command + case cmd, ok := <-inCommandChan: if !ok { inCommandChan = nil // channel closed continue } // Log in the file - log.Printf("[COMMAND] || Received at %s | Action: %s | Params: %+v ||", - time.Now().Local().Format("15:04:05.000000"), - cmd.Action, - cmd.Params, - ) + writeCommand(loggers, cmd) } // Exit if both channels are closed @@ -80,17 +147,35 @@ func Log(inResultChan <-chan model.ResultData, inCommandChan <-chan model.Comman lineCount++ if lineCount >= maxLines { - file.Close() - + mainFile.Close() + dataFile.Close() + commandFile.Close() + // Create a new file - filename := fmt.Sprintf("%s/sensor_log_%s.jsonl", fileDir, time.Now().Format("20060102_150405")) - file, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + mainLogger, mainFile, err = createLogger(fileDir, "", "log") if err != nil { - fmt.Println("Error opening new log file:", err) + fmt.Println(err) return } - defer file.Close() - log.SetOutput(file) + + dataLogger, dataFile, err = createLogger(fileDir, "data", "data") + if err != nil { + fmt.Println(err) + return + } + + commandLogger, commandFile, err = createLogger(fileDir, "commands", "command") + if err != nil { + fmt.Println(err) + return + } + + // Group them for easier access + loggers = Loggers{ + Main: mainLogger, + Data: dataLogger, + Command: commandLogger, + } lineCount = 0 } diff --git a/internal/consumer/parser.go b/internal/consumer/parser.go index 73feb3f..fd86bae 100644 --- a/internal/consumer/parser.go +++ b/internal/consumer/parser.go @@ -2,7 +2,8 @@ package consumer import ( "encoding/json" - "fmt" + "log" + "github.com/vasyl-ks/TM-software-H11/internal/model" ) @@ -12,6 +13,8 @@ attempts to decode each into either a ResultData or a Command object, then send parsed messages to their respective output channels. */ func Parse(inChan <-chan []byte, outResultChan chan<- model.ResultData, outCommandChan chan<- model.Command) { + log.Println("[INFO][Consumer][Parse] Running.") + for payload := range inChan { // First, try to unmarshal as ResultData var res model.ResultData @@ -28,6 +31,6 @@ func Parse(inChan <-chan []byte, outResultChan chan<- model.ResultData, outComma } // If neither works, log error - fmt.Printf("Parse: unrecognized JSON payload: %s\n", string(payload)) + log.Printf("[Error][Consumer][Parse] Unrecognized JSON payload: %s\n", string(payload)) } } \ No newline at end of file diff --git a/internal/generator/generator.go b/internal/generator/generator.go index b69ab2b..014a668 100644 --- a/internal/generator/generator.go +++ b/internal/generator/generator.go @@ -1,6 +1,8 @@ package generator import ( + "log" + "github.com/vasyl-ks/TM-software-H11/internal/model" ) @@ -11,6 +13,8 @@ Generator initializes the dataChan channel, then calls the Sensor and Process go - Process receives SensorData, calculates statistics, builds a Result, and sends it through outResultChan. */ func Run(inCommandChan <-chan model.Command, outResultChan chan<- model.ResultData) { + defer log.Println("[INFO][Generator] Running.") + // Create unbuffered channel. dataChan := make(chan model.SensorData) diff --git a/internal/generator/processor.go b/internal/generator/processor.go index 1e12e0a..5e69a91 100644 --- a/internal/generator/processor.go +++ b/internal/generator/processor.go @@ -1,6 +1,7 @@ package generator import ( + "log" "time" "github.com/vasyl-ks/TM-software-H11/config" @@ -30,9 +31,9 @@ func calculateAverage(data []model.SensorData) model.ResultData { } return model.ResultData{ - AverageSpeed: sumSpeed / n, - AverageTemp: sumTemp / n, - AveragePressure: sumPressure / n, + AverageSpeed: sumSpeed / n, + AverageTemperature: sumTemp / n, + AveragePressure: sumPressure / n, } } @@ -55,9 +56,9 @@ func calculateMin(data []model.SensorData) model.ResultData { } return model.ResultData{ - MinSpeed: minSpeed, - MinTemp: minTemp, - MinPressure: minPressure, + MinimumSpeed: minSpeed, + MinimumTemperature: minTemp, + MinimumPressure: minPressure, } } @@ -80,9 +81,9 @@ func calculateMax(data []model.SensorData) model.ResultData { } return model.ResultData{ - MaxSpeed: maxSpeed, - MaxTemp: maxTemp, - MaxPressure: maxPressure, + MaximumSpeed: maxSpeed, + MaximumTemperature: maxTemp, + MaximumPressure: maxPressure, } } @@ -103,6 +104,8 @@ func Process(inChan <-chan model.SensorData, outChan chan<- model.ResultData) { ticker := time.NewTicker(batchInterval) defer ticker.Stop() + log.Println("[INFO][Generator][Process] Running.") + for { select { case data := <-inChan: @@ -128,18 +131,18 @@ func Process(inChan <-chan model.SensorData, outChan chan<- model.ResultData) { // Build ResultData result := model.ResultData{ - AverageSpeed: avg.AverageSpeed, - MinSpeed: min.MinSpeed, - MaxSpeed: max.MaxSpeed, - AverageTemp: avg.AverageTemp, - MinTemp: min.MinTemp, - MaxTemp: max.MaxTemp, - AveragePressure: avg.AveragePressure, - MinPressure: min.MinPressure, - MaxPressure: max.MaxPressure, - VehicleID: dataSlice[0].VehicleID, - CreatedAt: tme, - ProcessedAt: time.Now().Local(), + AverageSpeed: avg.AverageSpeed, + MinimumSpeed: min.MinimumSpeed, + MaximumSpeed: max.MaximumSpeed, + AverageTemperature: avg.AverageTemperature, + MinimumTemperature: min.MinimumTemperature, + MaximumTemperature: max.MaximumTemperature, + AveragePressure: avg.AveragePressure, + MinimumPressure: min.MinimumPressure, + MaximumPressure: max.MaximumPressure, + VehicleID: dataSlice[0].VehicleID, + CreatedAt: tme, + ProcessedAt: time.Now().Local(), } outChan <- result diff --git a/internal/generator/sensor.go b/internal/generator/sensor.go index f51248b..f13c460 100644 --- a/internal/generator/sensor.go +++ b/internal/generator/sensor.go @@ -1,6 +1,7 @@ package generator import ( + "log" "math/rand" "strings" "time" @@ -17,7 +18,7 @@ Speed behavior responds to control commands: - "Start" → enables movement. - "Stop" → sets speed to 0. - "Accelerate n" → increases current speed by n. -- "Mode" → changes driving mode (eco|normal|speed). +- "Mode" → changes driving mode (eco|normal|sport). */ func Sensor(inCommandChan <-chan model.Command, outChan chan<- model.SensorData) { sensorInterval := config.Sensor.Interval // defines how often a new sensor reading is generated. @@ -34,60 +35,112 @@ func Sensor(inCommandChan <-chan model.Command, outChan chan<- model.SensorData) mode := "normal" started := false + log.Println("[INFO][Generator][Sensor] Running.") + for { select { - case cmd := <- inCommandChan: - switch strings.ToLower(cmd.Action) { - case "start": - started = true - case "stop": - started = false - currentSpeed = 0 - case "accelerate": - // Try to read numeric parameter - if val, ok := cmd.Params.(float64); ok { - currentSpeed += float32(val) - } - case "mode": - if val, ok := cmd.Params.(string); ok { - mode = strings.ToLower(val) - } + case cmd := <-inCommandChan: + switch strings.ToLower(cmd.Action) { + case "start": + started = true + log.Println("[INFO][Generator][Sensor] Started.") + case "stop": + started = false + currentSpeed = 0 + log.Println("[INFO][Generator][Sensor] Stopped.") + case "accelerate": + // Try to read numeric parameter + if val, ok := cmd.Params.(float64); ok { + currentSpeed += float32(val) + log.Printf("[INFO][Generator][Sensor] Accelerated by %f.", val) } - - case <- ticker.C: - // adjust max speed depending on mode - var maxAllowed float32 - switch mode { - case "eco": - maxAllowed = maxS * 0.5 - case "normal": - maxAllowed = maxS * 0.8 - case "speed": - maxAllowed = maxS - default: - maxAllowed = maxS * 0.8 + case "mode": + if val, ok := cmd.Params.(string); ok { + mode = strings.ToLower(val) + log.Printf("[INFO][Generator][Sensor] Mode changed to %s.", val) } + } - // simulate speed - if started { - if currentSpeed < minS { - currentSpeed = minS - } - if currentSpeed > maxAllowed { - currentSpeed = maxAllowed - } - } else { - currentSpeed = 0 - } + case <-ticker.C: + // adjust max speed depending on mode + var maxAllowed float32 + switch mode { + case "eco": + maxAllowed = maxS * 0.5 + case "normal": + maxAllowed = maxS * 0.8 + case "sport": + maxAllowed = maxS + default: + maxAllowed = maxS * 0.8 + } + + // adjust growth factors based on mode + var growthFactor float32 + switch mode { + case "eco": + growthFactor = 0.7 // slowest growth + case "normal": + growthFactor = 1.0 // standard growth + case "sport": + growthFactor = 1.3 // fastest growth + default: + growthFactor = 1.0 + } - data := model.SensorData{ - VehicleID: config.Vehicle.VehicleID, - Speed: currentSpeed, - Pressure: rand.Float32()*(maxP-minP) + minP, // 0-10 bar - Temperature: rand.Float32()*(maxT-minT) + minT, // 0-50 °C - CreatedAt: time.Now().Local(), + // simulate speed + if started { + if currentSpeed < minS { + currentSpeed = minS } - outChan <- data + if currentSpeed > maxAllowed { + currentSpeed = maxAllowed + } + } else { + currentSpeed = 0 + } + + // Normalize the current speed into a [0,1] range + // 0 means minimum speed, 1 means maximum speed + speedRatio := (currentSpeed - minS) / (maxS - minS) + if speedRatio < 0 { + speedRatio = 0 + } else if speedRatio > 1 { + speedRatio = 1 + } + + // Make pressure and temperature increase with speed + // Both grow linearly from their minimum to maximum values + // They grow faster on the sport mode and slower on eco. + pressure := minP + (speedRatio*growthFactor)*(maxP-minP) + temperature := minT + (speedRatio*growthFactor)*(maxT-minT) + + // Add a small random noise to simulate sensor variability + pressure += rand.Float32()*0.1 - 0.05 + temperature += rand.Float32()*0.5 - 0.25 + + // Adjust pressure to avoid negative values + if pressure < 0 { + pressure = 0 + } + + // Adjust pressure to avoid values above maximum. + if pressure > maxP { + pressure = maxP + } + if temperature > maxT { + temperature = maxT + } + + // Create SensorData + data := model.SensorData{ + VehicleID: config.Vehicle.VehicleID, + Speed: currentSpeed, + Pressure: pressure, + Temperature: temperature, + CreatedAt: time.Now().Local(), + } + outChan <- data } - } -} \ No newline at end of file + } +} diff --git a/internal/hub/hub.go b/internal/hub/hub.go index a8bc907..b16b91b 100644 --- a/internal/hub/hub.go +++ b/internal/hub/hub.go @@ -1,6 +1,7 @@ package hub import ( + "log" "fmt" "net/http" "github.com/vasyl-ks/TM-software-H11/config" @@ -14,6 +15,8 @@ Hub acts as a central bridge between the Generator, Frontend, and Consumer. - Consumer ↔ Hub: sends ResultData via UDP and Command via TCP. */ func Run(inResultChan <-chan model.ResultData, outCommandChan chan<- model.Command) { + defer log.Println("[INFO][Hub] Running.") + // Create unbuffered channel. internalCommandChan := make(chan model.Command) @@ -30,7 +33,7 @@ func Run(inResultChan <-chan model.ResultData, outCommandChan chan<- model.Comma go SendResultToFrontEnd(conn, inResultChan) }) go func() { - http.ListenAndServe(":"+fmt.Sprintf("%d", config.Hub.WSPort), nil) + http.ListenAndServe("127.0.0.1:"+fmt.Sprintf("%d", config.Hub.WSPort), nil) }() // UDP diff --git a/internal/hub/tcphandler.go b/internal/hub/tcphandler.go index cb88d1e..6a98c66 100644 --- a/internal/hub/tcphandler.go +++ b/internal/hub/tcphandler.go @@ -3,6 +3,7 @@ package hub import ( "encoding/json" "fmt" + "log" "net" "github.com/vasyl-ks/TM-software-H11/config" @@ -14,9 +15,10 @@ func CreateConnTCP() net.Conn { address := fmt.Sprintf("127.0.0.1:%d", config.Hub.TCPPort) conn, err := net.Dial("tcp", address) if err != nil { - fmt.Println("Error connecting via TCP:", err) + log.Println("[ERROR][Hub][TCP] Error connecting via TCP:", err) panic(err) } + log.Printf("[INFO][Hub][TCP] Established TCP connection from Hub to Consumer, on %s", address) return conn } @@ -33,7 +35,7 @@ func SendCommandToConsumer(conn net.Conn, inChan <-chan model.Command) { // Marshal ResultData to JSON-encoded []byte data, err := json.Marshal(command) if err != nil { - fmt.Println("Error marshalling WS command JSON:", err) + log.Println("[ERROR][Hub][TCP] Error marshalling WS command JSON:", err) continue } @@ -43,7 +45,7 @@ func SendCommandToConsumer(conn net.Conn, inChan <-chan model.Command) { // Send JSON via TCP _, err = conn.Write(data) if err != nil { - fmt.Println("Error sending via TCP:", err) + log.Println("[ERROR][Hub][TCP] Error sending via TCP:", err) break } } diff --git a/internal/hub/updhandler.go b/internal/hub/updhandler.go index ea0944b..d273451 100644 --- a/internal/hub/updhandler.go +++ b/internal/hub/updhandler.go @@ -3,6 +3,7 @@ package hub import ( "encoding/json" "fmt" + "log" "net" "github.com/vasyl-ks/TM-software-H11/config" @@ -12,15 +13,16 @@ import ( // CreateConnUDP establishes a UDP connection to the configured address and port. func CreateConnUDP() *net.UDPConn { // Client address - clientAddr := net.UDPAddr{ + address := net.UDPAddr{ IP: net.ParseIP("127.0.0.1"), Port: config.Hub.UDPPort, } - conn, err := net.DialUDP("udp", nil, &clientAddr) + conn, err := net.DialUDP("udp", nil, &address) if err != nil { - fmt.Println("Error connecting via UDP", err) + log.Println("[ERROR][Hub][UDP] Error connecting via UDP", err) panic(err) } + log.Printf("[INFO][Hub][UDP] Established UDP connection from Hub to Consumer, on %s", fmt.Sprintf("%s:%d", address.IP, address.Port)) return conn } @@ -37,14 +39,14 @@ func SendResultToConsumer(conn *net.UDPConn, inChan <-chan model.ResultData) { // Marshal ResultData to JSON-encoded []byte data, err := json.Marshal(resultData) if err != nil { - fmt.Println("Error marshalling WS result JSON", err) + log.Println("[ERROR][Hub][UDP] Error marshalling WS result JSON", err) continue } // Send JSON via UDP _, err = conn.Write(data) if err != nil { - fmt.Println("Error sending via UDP:", err) + log.Println("[ERROR][Hub][UDP] Error sending via UDP:", err) continue } } diff --git a/internal/hub/wshandler.go b/internal/hub/wshandler.go index e940aef..4c15f3e 100644 --- a/internal/hub/wshandler.go +++ b/internal/hub/wshandler.go @@ -4,6 +4,7 @@ import ( "encoding/json" "log" "net/http" + "strings" "github.com/gorilla/websocket" "github.com/vasyl-ks/TM-software-H11/internal/model" @@ -17,9 +18,10 @@ var upgrader = websocket.Upgrader{ func CreateConnWS(w http.ResponseWriter, r *http.Request) *websocket.Conn { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { - log.Println("Error upgrading to WebSocket:", err) + log.Println("[ERROR][Hub][WS] Error upgrading to WebSocket:", err) return nil } + log.Printf("[INFO][Hub][WS] Established WS connection between Hub and Frontend, on %s", conn.LocalAddr()) return conn } @@ -30,19 +32,26 @@ and forwards it to a channel. */ func ReceiveCommandFromFrontEnd(conn *websocket.Conn, outChan1 chan<- model.Command, outChan2 chan<- model.Command) { defer conn.Close() - + for { // Listen for WS Command JSON _, msg, err := conn.ReadMessage() if err != nil { - log.Println("Error reading WS command:", err) + if websocket.IsCloseError(err, + websocket.CloseNormalClosure, + websocket.CloseGoingAway, + websocket.CloseNoStatusReceived) { + log.Printf("[INFO][Hub][WS] Client disconnected normally: %v", err) // Expected error + } else { + log.Printf("[ERROR][Hub][WS] Error reading WS command: %v", err) // Unexpected error + } break } // Parse it to Command struct var cmd model.Command if err := json.Unmarshal(msg, &cmd); err != nil { - log.Println("Error parsing WS command JSON:", err) + log.Println("[ERROR][Hub][WS] Error parsing WS command JSON:", err) continue } @@ -53,23 +62,36 @@ func ReceiveCommandFromFrontEnd(conn *websocket.Conn, outChan1 chan<- model.Comm } /* -SendResultToFrontEnd receives ResultData from a channel, -marshals it to JSON-encoded []byte +SendResultToFrontEnd receives ResultData from a channel, +marshals it to JSON-encoded []byte and sends it via WS to the WebSocket client. */ func SendResultToFrontEnd(conn *websocket.Conn, inChan <-chan model.ResultData) { + defer func() { + conn.Close() + log.Printf("[INFO][Hub][WS] Writer closed connection: %s", conn.RemoteAddr()) + }() + // Receive ResultData from channel for result := range inChan { // Marshal ResultData to JSON-encoded []byte data, err := json.Marshal(result) if err != nil { - log.Println("Error marshalling WS result JSON:", err) + log.Println("[ERROR][Hub][WS] Error marshalling WS result JSON:", err) continue } // Send JSON via WS if err := conn.WriteMessage(websocket.TextMessage, data); err != nil { - log.Println("Error sending via WS:", err) + if websocket.IsCloseError(err, + websocket.CloseNormalClosure, + websocket.CloseGoingAway, + websocket.CloseNoStatusReceived) || + strings.Contains(err.Error(), "close sent") { + log.Printf("[INFO][Hub][WS] Client disconnected during write: %v", err) // Expected error + } else { + log.Printf("[ERROR][Hub][WS] Error sending via WS: %v", err) // Unexpected error + } break } } diff --git a/internal/model/resultData.go b/internal/model/resultData.go index 703e472..8ffc8ec 100644 --- a/internal/model/resultData.go +++ b/internal/model/resultData.go @@ -8,16 +8,16 @@ containing average, minimum, and maximum values for both speed, temperature and and indemnifications such as its ID and the time it was generated and processed. */ type ResultData struct { - AverageSpeed float32 - MinSpeed float32 - MaxSpeed float32 - AverageTemp float32 - MinTemp float32 - MaxTemp float32 - AveragePressure float32 - MinPressure float32 - MaxPressure float32 - VehicleID string - CreatedAt time.Time - ProcessedAt time.Time + AverageSpeed float32 + MinimumSpeed float32 + MaximumSpeed float32 + AverageTemperature float32 + MinimumTemperature float32 + MaximumTemperature float32 + AveragePressure float32 + MinimumPressure float32 + MaximumPressure float32 + VehicleID string + CreatedAt time.Time + ProcessedAt time.Time } diff --git a/start.sh b/start.sh new file mode 100644 index 0000000..21551fd --- /dev/null +++ b/start.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +# start.sh - runs backend (Go) and frontend (Vite) for local development + +set -e + +echo "Starting backend..." +go run ./cmd/app/main.go & +BACK_PID=$! + +echo "Starting frontend..." +( + cd frontend + npm run dev +) + +echo "Shutting down backend..." +kill $BACK_PID 2>/dev/null || true \ No newline at end of file