diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 7749462..de9eaf2 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -23,6 +23,9 @@ jobs: # uses: mxschmitt/action-tmate@v3 - run: make test-integration-docker name: Run integration tests inside Docker + + #- name: Setup tmate session + # uses: mxschmitt/action-tmate@v3 - run: make test name: Unit tests - run: make build diff --git a/cmd/run.go b/cmd/run.go index 6d240ae..a9b20e0 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -45,7 +45,8 @@ var RunCmd = &cobra.Command{ queueManager := services.NewQueueManager(scrollService, processLauncher) snapshotService := snapshotService.NewSnapshotService() - _, err = initScroll(scrollService, snapshotService, processLauncher, queueManager) + go queueManager.Work() + _, err = initScroll(scrollService, snapshotService, processLauncher) if err != nil { return fmt.Errorf("error initializing scroll: %w", err) } diff --git a/cmd/serve.go b/cmd/serve.go index 9bc3b03..58a2f9e 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -128,7 +128,9 @@ to interact and monitor the Scroll Application`, coldStarter := services.NewColdStarter(portService, queueManager, snapshotService, scrollService.GetDir()) uiService := services.NewUiService(scrollService) - uiDevService := services.NewUiDevService() + uiDevService := services.NewUiDevService( + queueManager, scrollService, + ) scrollHandler := handler.NewScrollHandler(scrollService, pluginManager, processLauncher, queueManager, processManager) processHandler := handler.NewProcessHandler(processManager) @@ -163,6 +165,9 @@ to interact and monitor the Scroll Application`, go portService.StartMonitoring(ctx, watchPortsInterfaces, currentScroll.KeepAlivePPM) } + logger.Log().Info("Starting queue manager") + go queueManager.Work() + if !idleScroll { doneChan := make(chan error, 1) @@ -306,7 +311,7 @@ func startup(scrollService *services.ScrollService, snapshotService ports.Snapsh logger.Log().Info("Initializing scroll") - newScroll, err := initScroll(scrollService, snapshotService, processLauncher, queueManager) + newScroll, err := initScroll(scrollService, snapshotService, processLauncher) if err != nil { doneChan <- err @@ -380,7 +385,7 @@ func startup(scrollService *services.ScrollService, snapshotService ports.Snapsh } -func initScroll(scrollService *services.ScrollService, snapshotService ports.SnapshotService, processLauncher *services.ProcedureLauncher, queueManager *services.QueueManager) (bool, error) { +func initScroll(scrollService *services.ScrollService, snapshotService ports.SnapshotService, processLauncher *services.ProcedureLauncher) (bool, error) { lock, err := scrollService.ReloadLock(ignoreVersionCheck) if err != nil { @@ -450,8 +455,5 @@ func initScroll(scrollService *services.ScrollService, snapshotService ports.Sna } } - logger.Log().Info("Starting queue manager") - go queueManager.Work() - return newScroll, nil } diff --git a/internal/core/domain/queue_item.go b/internal/core/domain/queue_item.go index 56bd402..20a3c12 100644 --- a/internal/core/domain/queue_item.go +++ b/internal/core/domain/queue_item.go @@ -1,8 +1,10 @@ package domain type QueueItem struct { + Name string Status ScrollLockStatus Error error UpdateLockStatus bool RunAfterExecution func() + DoneChan chan struct{} } diff --git a/internal/core/domain/scroll.go b/internal/core/domain/scroll.go index 7f60a53..99cca76 100644 --- a/internal/core/domain/scroll.go +++ b/internal/core/domain/scroll.go @@ -15,9 +15,9 @@ import ( type RunMode string const ( - RunModeAlways RunMode = "always" //default - RunModeOnce RunMode = "once" - RunModeRestart RunMode = "restart" + RunModeAlways RunMode = "always" //default + RunModeOnce RunMode = "once" //runs only once + RunModeRestart RunMode = "restart" //restarts on failure RunModePersistent RunMode = "persistent" //restarts on failure and on program restart ) diff --git a/internal/core/ports/services_ports.go b/internal/core/ports/services_ports.go index 22e9940..6dcb1ac 100644 --- a/internal/core/ports/services_ports.go +++ b/internal/core/ports/services_ports.go @@ -36,6 +36,7 @@ type ScrollServiceInterface interface { GetCommand(cmd string) (*domain.CommandInstructionSet, error) InitFiles(files ...string) error InitTemplateFiles(files ...string) error + AddTemporaryCommand(cmd string, instructions *domain.CommandInstructionSet) } type ProcedureLauchnerInterface interface { @@ -107,6 +108,7 @@ type QueueManagerInterface interface { AddAndRememberItem(cmd string) error AddTempItem(cmd string) error AddShutdownItem(cmd string) error + AddTempItemWithWait(cmd string) error GetQueue() map[string]domain.ScrollLockStatus } @@ -180,4 +182,5 @@ type UiDevServiceInterface interface { Unsubscribe(client chan *[]byte) GetWatchedPaths() []string IsWatching() bool + SetCommands(procs map[string]*domain.CommandInstructionSet) } diff --git a/internal/core/services/process_manager.go b/internal/core/services/process_manager.go index bf6d8a3..4044c78 100644 --- a/internal/core/services/process_manager.go +++ b/internal/core/services/process_manager.go @@ -242,7 +242,6 @@ func (po *ProcessManager) Run(commandName string, command []string, dir string) // Wait for goroutine to print everything (watchdog closes stdin) exitCode := process.Cmd.ProcessState.ExitCode() - println("Exit code", exitCode) console.MarkExited(exitCode) close(combinedChannel) diff --git a/internal/core/services/queue_manager.go b/internal/core/services/queue_manager.go index 0da7242..c22f641 100644 --- a/internal/core/services/queue_manager.go +++ b/internal/core/services/queue_manager.go @@ -16,6 +16,7 @@ var ErrCommandDoneOnce = fmt.Errorf("command is already done and has run mode on type AddItemOptions struct { Remember bool + Wait bool RunAfterExecution func() } @@ -40,8 +41,8 @@ func NewQueueManager( scrollService: scrollService, processLauncher: processLauncher, commandQueue: make(map[string]*domain.QueueItem), - taskChan: make(chan string), - taskDoneChan: make(chan struct{}), + taskChan: make(chan string, 100), // FIXED: Buffered channel + taskDoneChan: make(chan struct{}, 1), // FIXED: Buffered channel shutdownChan: make(chan struct{}), notifierChan: make([]chan []string, 0), callbacksPostRun: make(map[string]func()), @@ -66,15 +67,20 @@ func (sc *QueueManager) workItem(cmd string) error { } func (sc *QueueManager) notify() { + sc.mu.Lock() queuedCommands := make([]string, 0) - for cmd, _ := range sc.commandQueue { - if sc.getStatus(cmd) != domain.ScrollLockStatusDone && sc.getStatus(cmd) != domain.ScrollLockStatusError { + for cmd, item := range sc.commandQueue { + if item.Status != domain.ScrollLockStatusDone && item.Status != domain.ScrollLockStatusError { queuedCommands = append(queuedCommands, cmd) } } - for _, notifier := range sc.notifierChan { + notifiers := make([]chan []string, len(sc.notifierChan)) + copy(notifiers, sc.notifierChan) + sc.mu.Unlock() + + for _, notifier := range notifiers { select { case notifier <- queuedCommands: // Successfully sent queuedCommands to the notifier channel @@ -111,9 +117,15 @@ func (sc *QueueManager) AddItemWithCallback(cmd string, cb func()) error { }) } +func (sc *QueueManager) AddTempItemWithWait(cmd string) error { + return sc.addQueueItem(cmd, AddItemOptions{ + Remember: false, + Wait: true, + }) +} + func (sc *QueueManager) addQueueItem(cmd string, options AddItemOptions) error { sc.mu.Lock() - defer sc.mu.Unlock() setLock := options.Remember @@ -124,6 +136,7 @@ func (sc *QueueManager) addQueueItem(cmd string, options AddItemOptions) error { command, err := sc.scrollService.GetCommand(cmd) if err != nil { + sc.mu.Unlock() return err } @@ -135,17 +148,25 @@ func (sc *QueueManager) addQueueItem(cmd string, options AddItemOptions) error { if value, ok := sc.commandQueue[cmd]; ok { if value.Status != domain.ScrollLockStatusDone && value.Status != domain.ScrollLockStatusError { + sc.mu.Unlock() return ErrAlreadyInQueue } if value.Status == domain.ScrollLockStatusDone && command.Run == domain.RunModeOnce { + sc.mu.Unlock() return ErrCommandDoneOnce } } + var doneChan chan struct{} + if options.Wait { + doneChan = make(chan struct{}) + } + item := &domain.QueueItem{ Status: domain.ScrollLockStatusWaiting, UpdateLockStatus: setLock, + DoneChan: doneChan, } if options.RunAfterExecution != nil { @@ -157,12 +178,27 @@ func (sc *QueueManager) addQueueItem(cmd string, options AddItemOptions) error { if setLock { lock, err := sc.scrollService.GetLock() if err != nil { + sc.mu.Unlock() return err } lock.SetStatus(cmd, domain.ScrollLockStatusWaiting, nil) } + + sc.mu.Unlock() + + // FIXED: Non-blocking send to buffered channel sc.taskChan <- cmd + // Wait for completion if requested + if options.Wait { + <-doneChan + // Return error if command failed + item := sc.GetQueueItem(cmd) + if item != nil && item.Error != nil { + return item.Error + } + } + return nil } @@ -303,6 +339,11 @@ func (sc *QueueManager) RunQueue() { logger.Log().Info("Running command", zap.String("command", cmd)) go func(c string, i *domain.QueueItem) { defer func() { + // Signal completion if someone is waiting + if i.DoneChan != nil { + close(i.DoneChan) + } + if i.RunAfterExecution != nil { i.RunAfterExecution() } @@ -310,6 +351,7 @@ func (sc *QueueManager) RunQueue() { callback() } + // FIXED: Non-blocking send to buffered channel sc.taskDoneChan <- struct{}{} }() err := sc.workItem(c) @@ -340,33 +382,30 @@ func (sc *QueueManager) Shutdown() { } func (sc *QueueManager) WaitUntilEmpty() { - notifier := make(chan []string) + notifier := make(chan []string, 10) // FIXED: Buffered channel + + sc.mu.Lock() sc.notifierChan = append(sc.notifierChan, notifier) - println("WaitUntilEmpty") - for k, n := range sc.commandQueue { - if n.Status == domain.ScrollLockStatusError { - println(k + "---: " + string(n.Status) + " " + n.Error.Error()) - } else { - println(k + "---: " + string(n.Status)) - } - } + sc.mu.Unlock() + for { + sc.mu.Lock() + for cmd, item := range sc.commandQueue { + println(cmd + ": " + string(item.Status)) + } + sc.mu.Unlock() + cmds := <-notifier if len(cmds) == 0 { // remove notifier + sc.mu.Lock() for i, n := range sc.notifierChan { if n == notifier { sc.notifierChan = append(sc.notifierChan[:i], sc.notifierChan[i+1:]...) break } } - for k, n := range sc.commandQueue { - if n.Status == domain.ScrollLockStatusError { - println(k + ": " + string(n.Status) + " " + n.Error.Error()) - } else { - println(k + ": " + string(n.Status)) - } - } + sc.mu.Unlock() return } } diff --git a/internal/core/services/scroll_service.go b/internal/core/services/scroll_service.go index 280161e..c697b1c 100644 --- a/internal/core/services/scroll_service.go +++ b/internal/core/services/scroll_service.go @@ -266,3 +266,11 @@ func (sc *ScrollService) GetCommand(cmd string) (*domain.CommandInstructionSet, return nil, errors.New("command " + cmd + " not found") } } + +func (sc *ScrollService) AddTemporaryCommand(cmd string, instructions *domain.CommandInstructionSet) { + scroll := sc.GetFile() + if scroll.Commands == nil { + scroll.Commands = make(map[string]*domain.CommandInstructionSet) + } + scroll.Commands[cmd] = instructions +} diff --git a/internal/core/services/ui_dev_service.go b/internal/core/services/ui_dev_service.go index 8a60ef4..ab15459 100644 --- a/internal/core/services/ui_dev_service.go +++ b/internal/core/services/ui_dev_service.go @@ -33,13 +33,30 @@ type UiDevService struct { ctx context.Context cancel context.CancelFunc isWatching bool + commands map[string]*domain.CommandInstructionSet + queueManager ports.QueueManagerInterface + scrollService ports.ScrollServiceInterface + events uint32 } // NewUiDevService creates a new instance of UiDevService -func NewUiDevService() ports.UiDevServiceInterface { +func NewUiDevService( + queueManager ports.QueueManagerInterface, scrollService ports.ScrollServiceInterface, +) ports.UiDevServiceInterface { return &UiDevService{ - watchPaths: make([]string, 0), - isWatching: false, + watchPaths: make([]string, 0), + isWatching: false, + queueManager: queueManager, + scrollService: scrollService, + } +} + +func (uds *UiDevService) SetCommands(commands map[string]*domain.CommandInstructionSet) { + uds.mu.Lock() + defer uds.mu.Unlock() + uds.commands = commands + for key, cmd := range commands { + uds.scrollService.AddTemporaryCommand(key, cmd) } } @@ -215,7 +232,10 @@ func (uds *UiDevService) processEvents() { logger.Log().Info("File watcher events channel closed") return } - + if event.Op&fsnotify.Chmod == fsnotify.Chmod { + // Ignore chmod events + continue + } uds.handleFileEvent(event) case err, ok := <-uds.watcher.Errors: @@ -238,6 +258,18 @@ func (uds *UiDevService) handleFileEvent(event fsnotify.Event) { relativePath = relPath } } + uds.events++ + if uds.commands != nil && len(uds.commands) > 0 { + go func() { + e := uds.events - 1 + for uds.events != e { + e = uds.events + for key, _ := range uds.commands { + uds.queueManager.AddTempItemWithWait(key) + } + } + }() + } changeEvent := FileChangeEvent{ Path: relativePath, diff --git a/internal/core/services/ui_dev_service_test.go b/internal/core/services/ui_dev_service_test.go index a9507df..233f05c 100644 --- a/internal/core/services/ui_dev_service_test.go +++ b/internal/core/services/ui_dev_service_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + mock_ports "github.com/highcard-dev/daemon/test/mock" "go.uber.org/mock/gomock" ) @@ -12,8 +13,11 @@ func TestUiDevService_BasicFunctionality(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + queueManager := mock_ports.NewMockQueueManagerInterface(ctrl) + scrollService := mock_ports.NewMockScrollServiceInterface(ctrl) + // Create the UI dev service - uiDevService := NewUiDevService() + uiDevService := NewUiDevService(queueManager, scrollService) // Check initial state if uiDevService.IsWatching() { @@ -67,7 +71,11 @@ func TestUiDevService_MultipleSubscribers(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - uiDevService := NewUiDevService() + queueManager := mock_ports.NewMockQueueManagerInterface(ctrl) + scrollService := mock_ports.NewMockScrollServiceInterface(ctrl) + + // Create the UI dev service + uiDevService := NewUiDevService(queueManager, scrollService) // Start watching first err := uiDevService.StartWatching("/tmp/test", "/tmp/test/ui") @@ -97,7 +105,11 @@ func TestUiDevService_ContinuousStartStop(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - uiDevService := NewUiDevService() + queueManager := mock_ports.NewMockQueueManagerInterface(ctrl) + scrollService := mock_ports.NewMockScrollServiceInterface(ctrl) + + // Create the UI dev service + uiDevService := NewUiDevService(queueManager, scrollService) // Test multiple start/stop cycles for i := 0; i < 5; i++ { @@ -165,7 +177,11 @@ func TestUiDevService_SubscribeBeforeStart(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - uiDevService := NewUiDevService() + queueManager := mock_ports.NewMockQueueManagerInterface(ctrl) + scrollService := mock_ports.NewMockScrollServiceInterface(ctrl) + + // Create the UI dev service + uiDevService := NewUiDevService(queueManager, scrollService) // Try to subscribe before starting sub := uiDevService.Subscribe() diff --git a/internal/handler/ui_dev_handler.go b/internal/handler/ui_dev_handler.go index 9ea0c69..f672869 100644 --- a/internal/handler/ui_dev_handler.go +++ b/internal/handler/ui_dev_handler.go @@ -7,6 +7,7 @@ import ( "github.com/gofiber/contrib/websocket" "github.com/gofiber/fiber/v2" + "github.com/highcard-dev/daemon/internal/core/domain" "github.com/highcard-dev/daemon/internal/core/ports" "github.com/highcard-dev/daemon/internal/utils/logger" "go.uber.org/zap" @@ -35,6 +36,10 @@ type UiDevHandler struct { scrollService ports.ScrollServiceInterface } +type DevModeBody struct { + Commands map[string]*domain.CommandInstructionSet `json:"commands,omitempty"` +} + func NewUiDevHandler(uiDevService ports.UiDevServiceInterface, scrollService ports.ScrollServiceInterface) *UiDevHandler { return &UiDevHandler{ uiDevService: uiDevService, @@ -51,7 +56,7 @@ func NewUiDevHandler(uiDevService ports.UiDevServiceInterface, scrollService por func (udh *UiDevHandler) Enable(ctx *fiber.Ctx) error { if udh.uiDevService.IsWatching() { response := DevModeResponse{ - Status: "success", + Status: "already-active", Enabled: true, } return ctx.JSON(response) @@ -69,10 +74,17 @@ func (udh *UiDevHandler) Enable(ctx *fiber.Ctx) error { return ctx.Status(400).JSON(errorResponse) } + var requestBody DevModeBody + + err := ctx.BodyParser(&requestBody) + if err == nil { + udh.uiDevService.SetCommands(requestBody.Commands) + } + watchPaths = append(watchPaths, filepath.Join(scrollDir, "public"), filepath.Join(scrollDir, "private")) // Start file watching with scroll directory as base path - err := udh.uiDevService.StartWatching(scrollDir, watchPaths...) + err = udh.uiDevService.StartWatching(scrollDir, watchPaths...) if err != nil { logger.Log().Error("Failed to start file watcher", zap.Error(err)) errorResponse := ErrorResponse{ diff --git a/test/mock/services.go b/test/mock/services.go index 2085eab..45bbac9 100644 --- a/test/mock/services.go +++ b/test/mock/services.go @@ -115,6 +115,18 @@ func (m *MockScrollServiceInterface) EXPECT() *MockScrollServiceInterfaceMockRec return m.recorder } +// AddTemporaryCommand mocks base method. +func (m *MockScrollServiceInterface) AddTemporaryCommand(cmd string, instructions *domain.CommandInstructionSet) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddTemporaryCommand", cmd, instructions) +} + +// AddTemporaryCommand indicates an expected call of AddTemporaryCommand. +func (mr *MockScrollServiceInterfaceMockRecorder) AddTemporaryCommand(cmd, instructions any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTemporaryCommand", reflect.TypeOf((*MockScrollServiceInterface)(nil).AddTemporaryCommand), cmd, instructions) +} + // GetCommand mocks base method. func (m *MockScrollServiceInterface) GetCommand(cmd string) (*domain.CommandInstructionSet, error) { m.ctrl.T.Helper() @@ -1064,6 +1076,20 @@ func (mr *MockQueueManagerInterfaceMockRecorder) AddTempItem(cmd any) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTempItem", reflect.TypeOf((*MockQueueManagerInterface)(nil).AddTempItem), cmd) } +// AddTempItemWithWait mocks base method. +func (m *MockQueueManagerInterface) AddTempItemWithWait(cmd string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddTempItemWithWait", cmd) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddTempItemWithWait indicates an expected call of AddTempItemWithWait. +func (mr *MockQueueManagerInterfaceMockRecorder) AddTempItemWithWait(cmd any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTempItemWithWait", reflect.TypeOf((*MockQueueManagerInterface)(nil).AddTempItemWithWait), cmd) +} + // GetQueue mocks base method. func (m *MockQueueManagerInterface) GetQueue() map[string]domain.ScrollLockStatus { m.ctrl.T.Helper() @@ -1620,6 +1646,18 @@ func (mr *MockUiDevServiceInterfaceMockRecorder) IsWatching() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsWatching", reflect.TypeOf((*MockUiDevServiceInterface)(nil).IsWatching)) } +// SetCommands mocks base method. +func (m *MockUiDevServiceInterface) SetCommands(procs map[string]*domain.CommandInstructionSet) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetCommands", procs) +} + +// SetCommands indicates an expected call of SetCommands. +func (mr *MockUiDevServiceInterfaceMockRecorder) SetCommands(procs any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetCommands", reflect.TypeOf((*MockUiDevServiceInterface)(nil).SetCommands), procs) +} + // StartWatching mocks base method. func (m *MockUiDevServiceInterface) StartWatching(basePath string, paths ...string) error { m.ctrl.T.Helper()