Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 50 additions & 54 deletions internal/core/domain/broadcast_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,98 +2,94 @@ package domain

import (
"sync"

"github.com/highcard-dev/daemon/internal/utils/logger"
"time"
)

type BroadcastChannel struct {
// Registered clients.
Clients map[chan *[]byte]bool

// Inbound messages from the clients.
Broadcast chan []byte

// Mutex to protect concurrent access to Clients map
mu sync.RWMutex
clients map[chan *[]byte]bool
broadcast chan []byte
mu sync.RWMutex
closed bool
}

func NewHub() *BroadcastChannel {
return &BroadcastChannel{
Broadcast: make(chan []byte, 100), // Buffered channel to handle bursts of file changes
Clients: make(map[chan *[]byte]bool),
broadcast: make(chan []byte, 100), // Buffered to prevent blocking
clients: make(map[chan *[]byte]bool),
}
}

func (h *BroadcastChannel) Subscribe() chan *[]byte {
h.mu.Lock()
defer h.mu.Unlock()

client := make(chan *[]byte, 50) // Increased buffer size to handle message bursts
h.Clients[client] = true
if h.closed {
return nil
}

client := make(chan *[]byte, 25) // Reasonable buffer
h.clients[client] = true
return client
}

func (h *BroadcastChannel) Unsubscribe(client chan *[]byte) {
if client == nil {
return
}

h.mu.Lock()
defer h.mu.Unlock()

if _, exists := h.Clients[client]; exists {
delete(h.Clients, client)
if _, exists := h.clients[client]; exists {
delete(h.clients, client)
close(client)
}
}

func (h *BroadcastChannel) CloseChannel() {
// Broadcast sends data to all clients, returns false if dropped
func (h *BroadcastChannel) Broadcast(data []byte) bool {
if h.closed {
return false
}

select {
case h.broadcast <- data:
return true
case <-time.After(50 * time.Millisecond): // Quick timeout
return false
}
}

func (h *BroadcastChannel) Close() {
h.mu.Lock()
defer h.mu.Unlock()

close(h.Broadcast)
for client := range h.Clients {
if h.closed {
return
}

h.closed = true
close(h.broadcast)

for client := range h.clients {
close(client)
}
h.Clients = make(map[chan *[]byte]bool)
h.clients = make(map[chan *[]byte]bool)
}

func (h *BroadcastChannel) Run() {
for {
message, more := <-h.Broadcast
if !more {
logger.Log().Debug("Broadcast channel closed")
return
}
defer h.Close()

for data := range h.broadcast {
h.mu.RLock()
clients := make([]chan *[]byte, 0, len(h.Clients))
for client := range h.Clients {
clients = append(clients, client)
}
h.mu.RUnlock()

// Track clients to remove (dead connections)
var deadClients []chan *[]byte

for _, client := range clients {
for client := range h.clients {
select {
case client <- &message:
// Successfully sent the message
case client <- &data:
// Sent successfully
default:
// Client channel is blocked or closed, mark for removal
logger.Log().Debug("Client channel blocked or closed, marking for removal")
deadClients = append(deadClients, client)
}
}

// Remove dead clients
if len(deadClients) > 0 {
h.mu.Lock()
for _, deadClient := range deadClients {
if _, exists := h.Clients[deadClient]; exists {
delete(h.Clients, deadClient)
// Don't close the channel here as it might be closed elsewhere
logger.Log().Debug("Removed dead client from broadcast channel")
}
// Client blocked, skip (will be cleaned up later if persistent)
}
h.mu.Unlock()
}
h.mu.RUnlock()
}
}
2 changes: 1 addition & 1 deletion internal/core/services/console_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (cm *ConsoleManager) AddConsoleWithChannel(id string, consoleType domain.Co
go func() {
for data := range channel {
b := []byte(data)
newChannel.Broadcast <- b
newChannel.Broadcast(b)
cm.logManager.AddLine(id, b)
}
close(done)
Expand Down
2 changes: 1 addition & 1 deletion internal/core/services/procedure_launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (sc *ProcedureLauncher) LaunchPlugins() error {
if !ok {
console, _ = sc.consoleManager.AddConsoleWithChannel(item.Stream, domain.ConsoleTypePlugin, item.Stream, make(chan string))
}
console.Channel.Broadcast <- []byte(item.Data)
console.Channel.Broadcast([]byte(item.Data))
}
}
}()
Expand Down
12 changes: 5 additions & 7 deletions internal/core/services/ui_dev_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (uds *UiDevService) StopWatching() error {

// Close the broadcast channel to clean up subscribers
if uds.broadcastChannel != nil {
uds.broadcastChannel.CloseChannel()
uds.broadcastChannel.Close()
uds.broadcastChannel = nil
}

Expand Down Expand Up @@ -252,12 +252,10 @@ func (uds *UiDevService) handleFileEvent(event fsnotify.Event) {
return
}

// Broadcast the event to all subscribers with a timeout to prevent blocking
select {
case uds.broadcastChannel.Broadcast <- eventData:
logger.Log().Debug("File change event broadcasted", zap.String("path", event.Name), zap.String("op", event.Op.String()))
case <-time.After(100 * time.Millisecond): // Timeout after 100ms
logger.Log().Warn("Failed to broadcast file change event - channel full (timed out)")
// Broadcast the event to all subscribers
if !uds.broadcastChannel.Broadcast(eventData) {
// Silently drop if channel is full - this is normal during high activity
logger.Log().Debug("Dropped file change event (channel busy)", zap.String("path", event.Name))
}

// Handle directory creation - add new directories to watcher
Expand Down
Loading