Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ jobs:
# uses: mxschmitt/action-tmate@v3
- run: make test-integration-docker
name: Run integration tests inside Docker

#- name: Setup tmate session
# uses: mxschmitt/action-tmate@v3
- run: make test
name: Unit tests
- run: make build
Expand Down
3 changes: 2 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ var RunCmd = &cobra.Command{
queueManager := services.NewQueueManager(scrollService, processLauncher)
snapshotService := snapshotService.NewSnapshotService()

_, err = initScroll(scrollService, snapshotService, processLauncher, queueManager)
go queueManager.Work()
_, err = initScroll(scrollService, snapshotService, processLauncher)
if err != nil {
return fmt.Errorf("error initializing scroll: %w", err)
}
Expand Down
14 changes: 8 additions & 6 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ to interact and monitor the Scroll Application`,
coldStarter := services.NewColdStarter(portService, queueManager, snapshotService, scrollService.GetDir())

uiService := services.NewUiService(scrollService)
uiDevService := services.NewUiDevService()
uiDevService := services.NewUiDevService(
queueManager, scrollService,
)

scrollHandler := handler.NewScrollHandler(scrollService, pluginManager, processLauncher, queueManager, processManager)
processHandler := handler.NewProcessHandler(processManager)
Expand Down Expand Up @@ -163,6 +165,9 @@ to interact and monitor the Scroll Application`,
go portService.StartMonitoring(ctx, watchPortsInterfaces, currentScroll.KeepAlivePPM)
}

logger.Log().Info("Starting queue manager")
go queueManager.Work()

if !idleScroll {

doneChan := make(chan error, 1)
Expand Down Expand Up @@ -306,7 +311,7 @@ func startup(scrollService *services.ScrollService, snapshotService ports.Snapsh

logger.Log().Info("Initializing scroll")

newScroll, err := initScroll(scrollService, snapshotService, processLauncher, queueManager)
newScroll, err := initScroll(scrollService, snapshotService, processLauncher)

if err != nil {
doneChan <- err
Expand Down Expand Up @@ -380,7 +385,7 @@ func startup(scrollService *services.ScrollService, snapshotService ports.Snapsh

}

func initScroll(scrollService *services.ScrollService, snapshotService ports.SnapshotService, processLauncher *services.ProcedureLauncher, queueManager *services.QueueManager) (bool, error) {
func initScroll(scrollService *services.ScrollService, snapshotService ports.SnapshotService, processLauncher *services.ProcedureLauncher) (bool, error) {

lock, err := scrollService.ReloadLock(ignoreVersionCheck)
if err != nil {
Expand Down Expand Up @@ -450,8 +455,5 @@ func initScroll(scrollService *services.ScrollService, snapshotService ports.Sna
}
}

logger.Log().Info("Starting queue manager")
go queueManager.Work()

return newScroll, nil
}
2 changes: 2 additions & 0 deletions internal/core/domain/queue_item.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package domain

type QueueItem struct {
Name string
Status ScrollLockStatus
Error error
UpdateLockStatus bool
RunAfterExecution func()
DoneChan chan struct{}
}
6 changes: 3 additions & 3 deletions internal/core/domain/scroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
type RunMode string

const (
RunModeAlways RunMode = "always" //default
RunModeOnce RunMode = "once"
RunModeRestart RunMode = "restart"
RunModeAlways RunMode = "always" //default
RunModeOnce RunMode = "once" //runs only once
RunModeRestart RunMode = "restart" //restarts on failure
RunModePersistent RunMode = "persistent" //restarts on failure and on program restart
)

Expand Down
3 changes: 3 additions & 0 deletions internal/core/ports/services_ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type ScrollServiceInterface interface {
GetCommand(cmd string) (*domain.CommandInstructionSet, error)
InitFiles(files ...string) error
InitTemplateFiles(files ...string) error
AddTemporaryCommand(cmd string, instructions *domain.CommandInstructionSet)
}

type ProcedureLauchnerInterface interface {
Expand Down Expand Up @@ -107,6 +108,7 @@ type QueueManagerInterface interface {
AddAndRememberItem(cmd string) error
AddTempItem(cmd string) error
AddShutdownItem(cmd string) error
AddTempItemWithWait(cmd string) error
GetQueue() map[string]domain.ScrollLockStatus
}

Expand Down Expand Up @@ -180,4 +182,5 @@ type UiDevServiceInterface interface {
Unsubscribe(client chan *[]byte)
GetWatchedPaths() []string
IsWatching() bool
SetCommands(procs map[string]*domain.CommandInstructionSet)
}
1 change: 0 additions & 1 deletion internal/core/services/process_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ func (po *ProcessManager) Run(commandName string, command []string, dir string)
// Wait for goroutine to print everything (watchdog closes stdin)
exitCode := process.Cmd.ProcessState.ExitCode()

println("Exit code", exitCode)
console.MarkExited(exitCode)

close(combinedChannel)
Expand Down
83 changes: 61 additions & 22 deletions internal/core/services/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var ErrCommandDoneOnce = fmt.Errorf("command is already done and has run mode on

type AddItemOptions struct {
Remember bool
Wait bool
RunAfterExecution func()
}

Expand All @@ -40,8 +41,8 @@ func NewQueueManager(
scrollService: scrollService,
processLauncher: processLauncher,
commandQueue: make(map[string]*domain.QueueItem),
taskChan: make(chan string),
taskDoneChan: make(chan struct{}),
taskChan: make(chan string, 100), // FIXED: Buffered channel
taskDoneChan: make(chan struct{}, 1), // FIXED: Buffered channel
shutdownChan: make(chan struct{}),
notifierChan: make([]chan []string, 0),
callbacksPostRun: make(map[string]func()),
Expand All @@ -66,15 +67,20 @@ func (sc *QueueManager) workItem(cmd string) error {
}

func (sc *QueueManager) notify() {
sc.mu.Lock()
queuedCommands := make([]string, 0)

for cmd, _ := range sc.commandQueue {
if sc.getStatus(cmd) != domain.ScrollLockStatusDone && sc.getStatus(cmd) != domain.ScrollLockStatusError {
for cmd, item := range sc.commandQueue {
if item.Status != domain.ScrollLockStatusDone && item.Status != domain.ScrollLockStatusError {
queuedCommands = append(queuedCommands, cmd)
}
}

for _, notifier := range sc.notifierChan {
notifiers := make([]chan []string, len(sc.notifierChan))
copy(notifiers, sc.notifierChan)
sc.mu.Unlock()

for _, notifier := range notifiers {
select {
case notifier <- queuedCommands:
// Successfully sent queuedCommands to the notifier channel
Expand Down Expand Up @@ -111,9 +117,15 @@ func (sc *QueueManager) AddItemWithCallback(cmd string, cb func()) error {
})
}

func (sc *QueueManager) AddTempItemWithWait(cmd string) error {
return sc.addQueueItem(cmd, AddItemOptions{
Remember: false,
Wait: true,
})
}

func (sc *QueueManager) addQueueItem(cmd string, options AddItemOptions) error {
sc.mu.Lock()
defer sc.mu.Unlock()

setLock := options.Remember

Expand All @@ -124,6 +136,7 @@ func (sc *QueueManager) addQueueItem(cmd string, options AddItemOptions) error {
command, err := sc.scrollService.GetCommand(cmd)

if err != nil {
sc.mu.Unlock()
return err
}

Expand All @@ -135,17 +148,25 @@ func (sc *QueueManager) addQueueItem(cmd string, options AddItemOptions) error {
if value, ok := sc.commandQueue[cmd]; ok {

if value.Status != domain.ScrollLockStatusDone && value.Status != domain.ScrollLockStatusError {
sc.mu.Unlock()
return ErrAlreadyInQueue
}

if value.Status == domain.ScrollLockStatusDone && command.Run == domain.RunModeOnce {
sc.mu.Unlock()
return ErrCommandDoneOnce
}
}

var doneChan chan struct{}
if options.Wait {
doneChan = make(chan struct{})
}

item := &domain.QueueItem{
Status: domain.ScrollLockStatusWaiting,
UpdateLockStatus: setLock,
DoneChan: doneChan,
}

if options.RunAfterExecution != nil {
Expand All @@ -157,12 +178,27 @@ func (sc *QueueManager) addQueueItem(cmd string, options AddItemOptions) error {
if setLock {
lock, err := sc.scrollService.GetLock()
if err != nil {
sc.mu.Unlock()
return err
}
lock.SetStatus(cmd, domain.ScrollLockStatusWaiting, nil)
}

sc.mu.Unlock()

// FIXED: Non-blocking send to buffered channel
sc.taskChan <- cmd

// Wait for completion if requested
if options.Wait {
<-doneChan
// Return error if command failed
item := sc.GetQueueItem(cmd)
if item != nil && item.Error != nil {
return item.Error
}
}

return nil
}

Expand Down Expand Up @@ -303,13 +339,19 @@ func (sc *QueueManager) RunQueue() {
logger.Log().Info("Running command", zap.String("command", cmd))
go func(c string, i *domain.QueueItem) {
defer func() {
// Signal completion if someone is waiting
if i.DoneChan != nil {
close(i.DoneChan)
}

if i.RunAfterExecution != nil {
i.RunAfterExecution()
}
if callback, ok := sc.callbacksPostRun[c]; ok && callback != nil {
callback()
}

// FIXED: Non-blocking send to buffered channel
sc.taskDoneChan <- struct{}{}
}()
err := sc.workItem(c)
Expand Down Expand Up @@ -340,33 +382,30 @@ func (sc *QueueManager) Shutdown() {
}

func (sc *QueueManager) WaitUntilEmpty() {
notifier := make(chan []string)
notifier := make(chan []string, 10) // FIXED: Buffered channel

sc.mu.Lock()
sc.notifierChan = append(sc.notifierChan, notifier)
println("WaitUntilEmpty")
for k, n := range sc.commandQueue {
if n.Status == domain.ScrollLockStatusError {
println(k + "---: " + string(n.Status) + " " + n.Error.Error())
} else {
println(k + "---: " + string(n.Status))
}
}
sc.mu.Unlock()

for {
sc.mu.Lock()
for cmd, item := range sc.commandQueue {
println(cmd + ": " + string(item.Status))
}
sc.mu.Unlock()

cmds := <-notifier
if len(cmds) == 0 {
// remove notifier
sc.mu.Lock()
for i, n := range sc.notifierChan {
if n == notifier {
sc.notifierChan = append(sc.notifierChan[:i], sc.notifierChan[i+1:]...)
break
}
}
for k, n := range sc.commandQueue {
if n.Status == domain.ScrollLockStatusError {
println(k + ": " + string(n.Status) + " " + n.Error.Error())
} else {
println(k + ": " + string(n.Status))
}
}
sc.mu.Unlock()
return
}
}
Expand Down
8 changes: 8 additions & 0 deletions internal/core/services/scroll_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,11 @@ func (sc *ScrollService) GetCommand(cmd string) (*domain.CommandInstructionSet,
return nil, errors.New("command " + cmd + " not found")
}
}

func (sc *ScrollService) AddTemporaryCommand(cmd string, instructions *domain.CommandInstructionSet) {
scroll := sc.GetFile()
if scroll.Commands == nil {
scroll.Commands = make(map[string]*domain.CommandInstructionSet)
}
scroll.Commands[cmd] = instructions
}
40 changes: 36 additions & 4 deletions internal/core/services/ui_dev_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,30 @@ type UiDevService struct {
ctx context.Context
cancel context.CancelFunc
isWatching bool
commands map[string]*domain.CommandInstructionSet
queueManager ports.QueueManagerInterface
scrollService ports.ScrollServiceInterface
events uint32
}

// NewUiDevService creates a new instance of UiDevService
func NewUiDevService() ports.UiDevServiceInterface {
func NewUiDevService(
queueManager ports.QueueManagerInterface, scrollService ports.ScrollServiceInterface,
) ports.UiDevServiceInterface {
return &UiDevService{
watchPaths: make([]string, 0),
isWatching: false,
watchPaths: make([]string, 0),
isWatching: false,
queueManager: queueManager,
scrollService: scrollService,
}
}

func (uds *UiDevService) SetCommands(commands map[string]*domain.CommandInstructionSet) {
uds.mu.Lock()
defer uds.mu.Unlock()
uds.commands = commands
for key, cmd := range commands {
uds.scrollService.AddTemporaryCommand(key, cmd)
}
}

Expand Down Expand Up @@ -215,7 +232,10 @@ func (uds *UiDevService) processEvents() {
logger.Log().Info("File watcher events channel closed")
return
}

if event.Op&fsnotify.Chmod == fsnotify.Chmod {
// Ignore chmod events
continue
}
uds.handleFileEvent(event)

case err, ok := <-uds.watcher.Errors:
Expand All @@ -238,6 +258,18 @@ func (uds *UiDevService) handleFileEvent(event fsnotify.Event) {
relativePath = relPath
}
}
uds.events++
if uds.commands != nil && len(uds.commands) > 0 {
go func() {
e := uds.events - 1
for uds.events != e {
e = uds.events
for key, _ := range uds.commands {
uds.queueManager.AddTempItemWithWait(key)
}
}
}()
}

changeEvent := FileChangeEvent{
Path: relativePath,
Expand Down
Loading