From cbd7066cfcbb6ca1edd493a13a4b932dbd92d578 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc=20Schottst=C3=A4dt?= Date: Fri, 3 Oct 2025 23:14:02 +0200 Subject: [PATCH] chore: better channels --- internal/core/domain/broadcast_channel.go | 104 +++++++------ internal/core/services/console_manager.go | 2 +- internal/core/services/procedure_launcher.go | 2 +- internal/core/services/ui_dev_service.go | 12 +- internal/handler/ui_dev_handler.go | 148 +++++-------------- internal/handler/websocket_handler.go | 57 ++++--- internal/utils/websocket.go | 85 +++++++++++ 7 files changed, 201 insertions(+), 209 deletions(-) create mode 100644 internal/utils/websocket.go diff --git a/internal/core/domain/broadcast_channel.go b/internal/core/domain/broadcast_channel.go index 5ca01ed..07fea1c 100644 --- a/internal/core/domain/broadcast_channel.go +++ b/internal/core/domain/broadcast_channel.go @@ -2,25 +2,20 @@ package domain import ( "sync" - - "github.com/highcard-dev/daemon/internal/utils/logger" + "time" ) type BroadcastChannel struct { - // Registered clients. - Clients map[chan *[]byte]bool - - // Inbound messages from the clients. - Broadcast chan []byte - - // Mutex to protect concurrent access to Clients map - mu sync.RWMutex + clients map[chan *[]byte]bool + broadcast chan []byte + mu sync.RWMutex + closed bool } func NewHub() *BroadcastChannel { return &BroadcastChannel{ - Broadcast: make(chan []byte, 100), // Buffered channel to handle bursts of file changes - Clients: make(map[chan *[]byte]bool), + broadcast: make(chan []byte, 100), // Buffered to prevent blocking + clients: make(map[chan *[]byte]bool), } } @@ -28,72 +23,73 @@ func (h *BroadcastChannel) Subscribe() chan *[]byte { h.mu.Lock() defer h.mu.Unlock() - client := make(chan *[]byte, 50) // Increased buffer size to handle message bursts - h.Clients[client] = true + if h.closed { + return nil + } + + client := make(chan *[]byte, 25) // Reasonable buffer + h.clients[client] = true return client } func (h *BroadcastChannel) Unsubscribe(client chan *[]byte) { + if client == nil { + return + } + h.mu.Lock() defer h.mu.Unlock() - if _, exists := h.Clients[client]; exists { - delete(h.Clients, client) + if _, exists := h.clients[client]; exists { + delete(h.clients, client) close(client) } } -func (h *BroadcastChannel) CloseChannel() { +// Broadcast sends data to all clients, returns false if dropped +func (h *BroadcastChannel) Broadcast(data []byte) bool { + if h.closed { + return false + } + + select { + case h.broadcast <- data: + return true + case <-time.After(50 * time.Millisecond): // Quick timeout + return false + } +} + +func (h *BroadcastChannel) Close() { h.mu.Lock() defer h.mu.Unlock() - close(h.Broadcast) - for client := range h.Clients { + if h.closed { + return + } + + h.closed = true + close(h.broadcast) + + for client := range h.clients { close(client) } - h.Clients = make(map[chan *[]byte]bool) + h.clients = make(map[chan *[]byte]bool) } func (h *BroadcastChannel) Run() { - for { - message, more := <-h.Broadcast - if !more { - logger.Log().Debug("Broadcast channel closed") - return - } + defer h.Close() + for data := range h.broadcast { h.mu.RLock() - clients := make([]chan *[]byte, 0, len(h.Clients)) - for client := range h.Clients { - clients = append(clients, client) - } - h.mu.RUnlock() - - // Track clients to remove (dead connections) - var deadClients []chan *[]byte - - for _, client := range clients { + for client := range h.clients { select { - case client <- &message: - // Successfully sent the message + case client <- &data: + // Sent successfully default: - // Client channel is blocked or closed, mark for removal - logger.Log().Debug("Client channel blocked or closed, marking for removal") - deadClients = append(deadClients, client) - } - } - - // Remove dead clients - if len(deadClients) > 0 { - h.mu.Lock() - for _, deadClient := range deadClients { - if _, exists := h.Clients[deadClient]; exists { - delete(h.Clients, deadClient) - // Don't close the channel here as it might be closed elsewhere - logger.Log().Debug("Removed dead client from broadcast channel") - } + // Client blocked, skip (will be cleaned up later if persistent) } - h.mu.Unlock() } + h.mu.RUnlock() } } diff --git a/internal/core/services/console_manager.go b/internal/core/services/console_manager.go index 4b166e7..8e2168f 100644 --- a/internal/core/services/console_manager.go +++ b/internal/core/services/console_manager.go @@ -42,7 +42,7 @@ func (cm *ConsoleManager) AddConsoleWithChannel(id string, consoleType domain.Co go func() { for data := range channel { b := []byte(data) - newChannel.Broadcast <- b + newChannel.Broadcast(b) cm.logManager.AddLine(id, b) } close(done) diff --git a/internal/core/services/procedure_launcher.go b/internal/core/services/procedure_launcher.go index 4011794..c2d69d2 100644 --- a/internal/core/services/procedure_launcher.go +++ b/internal/core/services/procedure_launcher.go @@ -71,7 +71,7 @@ func (sc *ProcedureLauncher) LaunchPlugins() error { if !ok { console, _ = sc.consoleManager.AddConsoleWithChannel(item.Stream, domain.ConsoleTypePlugin, item.Stream, make(chan string)) } - console.Channel.Broadcast <- []byte(item.Data) + console.Channel.Broadcast([]byte(item.Data)) } } }() diff --git a/internal/core/services/ui_dev_service.go b/internal/core/services/ui_dev_service.go index bb6256a..8a60ef4 100644 --- a/internal/core/services/ui_dev_service.go +++ b/internal/core/services/ui_dev_service.go @@ -122,7 +122,7 @@ func (uds *UiDevService) StopWatching() error { // Close the broadcast channel to clean up subscribers if uds.broadcastChannel != nil { - uds.broadcastChannel.CloseChannel() + uds.broadcastChannel.Close() uds.broadcastChannel = nil } @@ -252,12 +252,10 @@ func (uds *UiDevService) handleFileEvent(event fsnotify.Event) { return } - // Broadcast the event to all subscribers with a timeout to prevent blocking - select { - case uds.broadcastChannel.Broadcast <- eventData: - logger.Log().Debug("File change event broadcasted", zap.String("path", event.Name), zap.String("op", event.Op.String())) - case <-time.After(100 * time.Millisecond): // Timeout after 100ms - logger.Log().Warn("Failed to broadcast file change event - channel full (timed out)") + // Broadcast the event to all subscribers + if !uds.broadcastChannel.Broadcast(eventData) { + // Silently drop if channel is full - this is normal during high activity + logger.Log().Debug("Dropped file change event (channel busy)", zap.String("path", event.Name)) } // Handle directory creation - add new directories to watcher diff --git a/internal/handler/ui_dev_handler.go b/internal/handler/ui_dev_handler.go index 593705a..9ea0c69 100644 --- a/internal/handler/ui_dev_handler.go +++ b/internal/handler/ui_dev_handler.go @@ -2,10 +2,7 @@ package handler import ( "encoding/json" - "net" "path/filepath" - "strings" - "syscall" "time" "github.com/gofiber/contrib/websocket" @@ -43,39 +40,7 @@ func NewUiDevHandler(uiDevService ports.UiDevServiceInterface, scrollService por uiDevService: uiDevService, scrollService: scrollService, } -} - -// isConnectionError checks if the error is related to a broken connection -func isConnectionError(err error) bool { - if err == nil { - return false - } - - // Check for common connection error patterns - errStr := strings.ToLower(err.Error()) - if strings.Contains(errStr, "broken pipe") || - strings.Contains(errStr, "connection reset") || - strings.Contains(errStr, "connection refused") || - strings.Contains(errStr, "use of closed network connection") { - return true - } - - // Check for specific error types - if netErr, ok := err.(*net.OpError); ok { - if netErr.Op == "write" { - return true - } - } - - // Check for syscall errors - if errno, ok := err.(syscall.Errno); ok { - return errno == syscall.EPIPE || errno == syscall.ECONNRESET - } - - return false -} - -// @Summary Enable development mode +} // @Summary Enable development mode // @ID enableDev // @Tags ui, dev, druid, daemon // @Accept json @@ -181,37 +146,15 @@ func (udh *UiDevHandler) Status(ctx *fiber.Ctx) error { // NotifyChange handles WebSocket connections for real-time file change notifications func (udh *UiDevHandler) NotifyChange(c *websocket.Conn) { - // Set connection timeouts - const ( - writeWait = 10 * time.Second - pongWait = 60 * time.Second - pingPeriod = (pongWait * 9) / 10 - ) - - // Create a done channel to signal when the connection should be closed - done := make(chan struct{}) - - defer func() { - close(done) - if err := c.Close(); err != nil { - logger.Log().Debug("Error closing WebSocket connection", zap.Error(err)) - } - }() - - c.SetReadDeadline(time.Now().Add(pongWait)) - c.SetPongHandler(func(string) error { - c.SetReadDeadline(time.Now().Add(pongWait)) - return nil - }) + defer c.Close() // Check if development mode is enabled if !udh.uiDevService.IsWatching() { logger.Log().Warn("WebSocket connection attempted but development mode is not enabled") - errorMsg := map[string]interface{}{ + c.WriteJSON(map[string]interface{}{ "type": "error", "message": "Development mode is not enabled", - } - c.WriteJSON(errorMsg) + }) return } @@ -219,109 +162,84 @@ func (udh *UiDevHandler) NotifyChange(c *websocket.Conn) { changesChan := udh.uiDevService.Subscribe() if changesChan == nil { logger.Log().Error("Failed to subscribe to file changes") - errorMsg := map[string]interface{}{ + c.WriteJSON(map[string]interface{}{ "type": "error", "message": "Failed to subscribe to file changes", - } - c.WriteJSON(errorMsg) + }) return } - defer udh.uiDevService.Unsubscribe(changesChan) + // Set up ping/pong + c.SetReadDeadline(time.Now().Add(60 * time.Second)) + c.SetPongHandler(func(string) error { + c.SetReadDeadline(time.Now().Add(60 * time.Second)) + return nil + }) + // Send initial connection message - connectMsg := map[string]interface{}{ + c.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := c.WriteJSON(map[string]interface{}{ "type": "connected", "message": "Connected to file watcher", "watchedPaths": udh.uiDevService.GetWatchedPaths(), "timestamp": time.Now(), - } - - c.SetWriteDeadline(time.Now().Add(writeWait)) - if err := c.WriteJSON(connectMsg); err != nil { - logger.Log().Error("Failed to send initial connection message", zap.Error(err)) + }); err != nil { + logger.Log().Debug("Failed to send initial message, client disconnected", zap.Error(err)) return } logger.Log().Info("WebSocket client connected for file change notifications") - // Start ping ticker - ticker := time.NewTicker(pingPeriod) - defer ticker.Stop() + // Create ping ticker + pingTicker := time.NewTicker(54 * time.Second) + defer pingTicker.Stop() - // Start a goroutine to read messages (to handle pong responses and detect broken connections) + // Start reader goroutine to detect disconnects + done := make(chan struct{}) go func() { - defer func() { - select { - case <-done: - // Connection is already being closed - default: - close(done) - } - }() - + defer close(done) for { _, _, err := c.ReadMessage() if err != nil { - if isConnectionError(err) { - logger.Log().Debug("WebSocket client disconnected", zap.Error(err)) - } else { - logger.Log().Warn("WebSocket read error", zap.Error(err)) - } + logger.Log().Debug("WebSocket client disconnected", zap.Error(err)) return } } }() - // Handle messages and file changes + // Main event loop for { select { case <-done: - logger.Log().Debug("WebSocket connection done signal received") return case data := <-changesChan: if data == nil { - logger.Log().Info("File change channel closed") return } - // Parse the file change event + // Parse and send file change event var fileEvent map[string]interface{} if err := json.Unmarshal(*data, &fileEvent); err != nil { logger.Log().Error("Failed to parse file change event", zap.Error(err)) continue } - // Add message type and send to client - changeMessage := map[string]interface{}{ + c.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := c.WriteJSON(map[string]interface{}{ "type": "file_change", "data": fileEvent, "timestamp": time.Now(), - } - - c.SetWriteDeadline(time.Now().Add(writeWait)) - if err := c.WriteJSON(changeMessage); err != nil { - if isConnectionError(err) { - logger.Log().Debug("WebSocket client disconnected while sending file change", zap.Error(err)) - } else { - logger.Log().Error("Failed to write file change to WebSocket", zap.Error(err)) - } + }); err != nil { + logger.Log().Debug("Failed to send file change, client disconnected", zap.Error(err)) return } - logger.Log().Debug("File change event sent to WebSocket client", - zap.String("path", fileEvent["path"].(string)), - zap.String("operation", fileEvent["operation"].(string))) - - case <-ticker.C: - c.SetWriteDeadline(time.Now().Add(writeWait)) + case <-pingTicker.C: + c.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := c.WriteMessage(websocket.PingMessage, nil); err != nil { - if isConnectionError(err) { - logger.Log().Debug("WebSocket client disconnected during ping", zap.Error(err)) - } else { - logger.Log().Error("Failed to send ping", zap.Error(err)) - } + logger.Log().Debug("Failed to send ping, client disconnected", zap.Error(err)) return } } diff --git a/internal/handler/websocket_handler.go b/internal/handler/websocket_handler.go index 1e104b5..668324b 100644 --- a/internal/handler/websocket_handler.go +++ b/internal/handler/websocket_handler.go @@ -83,40 +83,37 @@ func (ah WebsocketHandler) Consoles(c *fiber.Ctx) error { func (wh WebsocketHandler) HandleProcess(c *websocket.Conn) { param := c.Params("console") + defer c.Close() - // Create a done channel to signal when the connection should be closed - done := make(chan struct{}) - - ticker := time.NewTicker(pingPeriod) - defer func() { - close(done) - ticker.Stop() - c.Close() - }() - + // Get console channel channel := wh.consoleService.GetConsole(param) if channel == nil { + logger.Log().Warn("Console not found", zap.String("console", param)) return } + // Subscribe to console output subscriptionChannel := channel.Channel.Subscribe() defer channel.Channel.Unsubscribe(subscriptionChannel) + // Set up ping/pong c.SetReadLimit(maxMessageSize) c.SetReadDeadline(time.Now().Add(pongWait)) - c.SetPongHandler(func(string) error { c.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + c.SetPongHandler(func(string) error { + c.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) - // Start a goroutine to read messages (to handle pong responses and detect broken connections) - go func() { - defer func() { - select { - case <-done: - // Connection is already being closed - default: - close(done) - } - }() + logger.Log().Info("WebSocket client connected to console", zap.String("console", param)) + + // Create ping ticker + pingTicker := time.NewTicker(pingPeriod) + defer pingTicker.Stop() + // Start reader goroutine to detect disconnects + done := make(chan struct{}) + go func() { + defer close(done) for { _, _, err := c.ReadMessage() if err != nil { @@ -126,29 +123,27 @@ func (wh WebsocketHandler) HandleProcess(c *websocket.Conn) { } }() - //fetch channel and send to websocket + // Main event loop for { select { case <-done: - logger.Log().Debug("WebSocket connection done signal received") return - //send 1024 bytes at a time case buffer, ok := <-subscriptionChannel: - c.SetWriteDeadline(time.Now().Add(writeWait)) - //if nil is send, assume the channel is closed if buffer == nil || !ok { return } - err := c.WriteMessage(websocket.TextMessage, *buffer) - if err != nil { - logger.Log().Debug("WebSocket client disconnected while sending message", zap.Error(err)) + + c.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.WriteMessage(websocket.TextMessage, *buffer); err != nil { + logger.Log().Debug("Failed to send console output, client disconnected", zap.Error(err)) return } - case <-ticker.C: + + case <-pingTicker.C: c.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.WriteMessage(websocket.PingMessage, nil); err != nil { - logger.Log().Debug("WebSocket client disconnected during ping", zap.Error(err)) + logger.Log().Debug("Failed to send ping, client disconnected", zap.Error(err)) return } } diff --git a/internal/utils/websocket.go b/internal/utils/websocket.go new file mode 100644 index 0000000..10213bd --- /dev/null +++ b/internal/utils/websocket.go @@ -0,0 +1,85 @@ +package utils + +import ( + "context" + "time" + + "github.com/gofiber/contrib/websocket" + "github.com/highcard-dev/daemon/internal/utils/logger" + "go.uber.org/zap" +) + +// WebSocketConnection is a lean wrapper for WebSocket connections with basic safety +type WebSocketConnection struct { + conn *websocket.Conn + ctx context.Context + cancel context.CancelFunc +} + +// NewWebSocketConnection creates a managed WebSocket connection +func NewWebSocketConnection(c *websocket.Conn) *WebSocketConnection { + ctx, cancel := context.WithCancel(context.Background()) + + wsc := &WebSocketConnection{ + conn: c, + ctx: ctx, + cancel: cancel, + } + + // Simple connection monitoring + go wsc.monitor() + + return wsc +} + +// Close safely closes the connection +func (wsc *WebSocketConnection) Close() { + wsc.cancel() + wsc.conn.Close() +} + +// WriteJSON writes JSON with timeout +func (wsc *WebSocketConnection) WriteJSON(v interface{}) error { + wsc.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + return wsc.conn.WriteJSON(v) +} + +// WritePing sends a ping +func (wsc *WebSocketConnection) WritePing() error { + wsc.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + return wsc.conn.WriteMessage(websocket.PingMessage, nil) +} + +// WriteMessage writes a message +func (wsc *WebSocketConnection) WriteMessage(messageType int, data []byte) error { + wsc.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + return wsc.conn.WriteMessage(messageType, data) +} + +// Context returns the connection context +func (wsc *WebSocketConnection) Context() context.Context { + return wsc.ctx +} + +// monitor detects disconnections +func (wsc *WebSocketConnection) monitor() { + wsc.conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + wsc.conn.SetPongHandler(func(string) error { + wsc.conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + return nil + }) + + for { + select { + case <-wsc.ctx.Done(): + return + default: + _, _, err := wsc.conn.ReadMessage() + if err != nil { + logger.Log().Debug("WebSocket disconnected", zap.Error(err)) + wsc.cancel() + return + } + } + } +}