From 1058661389b502ee66781f0b833d05d22887a81d Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Tue, 24 Feb 2026 17:36:42 +0100 Subject: [PATCH 01/38] refactor(handlers): refactor prepareOutput to prepareOutputAndRunCommand --- pkg/router/handlers/execution_handler.go | 37 ++++++++++++++++-------- 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index ea047cd..d9f2d1d 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -22,6 +22,8 @@ import ( "github.com/dkarczmarski/webcmd/pkg/httpx" ) +var ErrCommandFailed = errors.New("command failed") + // ExecutionHandler returns a WebHandler that executes the command associated with the URLCommand stored in the // request context. // It builds the command from the configured template and request parameters, prepares the response output, @@ -52,19 +54,12 @@ func ExecutionHandler(runner cmdrunner.Runner, registry *callgate.Registry) http return err } - writer, async, err := prepareOutput(responseWriter, cmd.CommandConfig.OutputType) - if err != nil { - return err - } - - return runCommand( + return prepareOutputAndRunCommand( request.Context(), runner, registry, cmd, cmdResult, - writer, - async, responseWriter, ) }) @@ -220,19 +215,28 @@ func executeCommand( return handleSyncWait(ctx, runner, cmd, graceTerminationTimeout) } -func prepareOutput(responseWriter http.ResponseWriter, outputType string) (io.Writer, bool, error) { +func prepareOutputAndRunCommand( + ctx context.Context, + runner cmdrunner.Runner, + registry *callgate.Registry, + cmd *config.URLCommand, + cmdResult *cmdbuilder.Result, + responseWriter http.ResponseWriter, +) error { var ( writer io.Writer async bool ) + outputType := cmd.CommandConfig.OutputType + switch outputType { case "none": writer = io.Discard async = true case "stream": if _, ok := responseWriter.(http.Flusher); !ok { - return nil, false, httpx.NewWebError( + return httpx.NewWebError( fmt.Errorf("streaming not supported: %w", ErrBadConfiguration), http.StatusInternalServerError, "", @@ -250,14 +254,23 @@ func prepareOutput(responseWriter http.ResponseWriter, outputType string) (io.Wr responseWriter.Header().Set("Content-Type", "text/plain; charset=utf-8") default: - return nil, false, httpx.NewWebError( + return httpx.NewWebError( fmt.Errorf("%w: unknown output type %q", ErrBadConfiguration, outputType), http.StatusInternalServerError, "", ) } - return writer, async, nil + return runCommand( + ctx, + runner, + registry, + cmd, + cmdResult, + writer, + async, + responseWriter, + ) } func extractQueryParams(request *http.Request) map[string]string { From 872049d8cd906ab6e96e0cd092c0a6251ad2aba8 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 25 Feb 2026 16:41:04 +0100 Subject: [PATCH 02/38] refactor(handlers): prepareOutputAndRunCommand --- pkg/router/handlers/execution_handler.go | 97 ++++++++++++++++++------ 1 file changed, 72 insertions(+), 25 deletions(-) diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index d9f2d1d..f680611 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -223,36 +223,15 @@ func prepareOutputAndRunCommand( cmdResult *cmdbuilder.Result, responseWriter http.ResponseWriter, ) error { - var ( - writer io.Writer - async bool - ) - outputType := cmd.CommandConfig.OutputType switch outputType { case "none": - writer = io.Discard - async = true + return prepareOutputAndRunAsyncCommand(ctx, runner, registry, cmd, cmdResult, responseWriter) case "stream": - if _, ok := responseWriter.(http.Flusher); !ok { - return httpx.NewWebError( - fmt.Errorf("streaming not supported: %w", ErrBadConfiguration), - http.StatusInternalServerError, - "", - ) - } - - writer = newFlushResponseWriter(responseWriter) - - responseWriter.Header().Set("Content-Type", "text/plain; charset=utf-8") - responseWriter.Header().Set("Cache-Control", "no-cache") - // nginx: - responseWriter.Header().Set("X-Accel-Buffering", "no") + return prepareOutputAndRunStreamCommand(ctx, runner, registry, cmd, cmdResult, responseWriter) case "", "text": - writer = responseWriter - - responseWriter.Header().Set("Content-Type", "text/plain; charset=utf-8") + return prepareOutputAndRunSyncCommand(ctx, runner, registry, cmd, cmdResult, responseWriter) default: return httpx.NewWebError( fmt.Errorf("%w: unknown output type %q", ErrBadConfiguration, outputType), @@ -260,6 +239,74 @@ func prepareOutputAndRunCommand( "", ) } +} + +func prepareOutputAndRunAsyncCommand( + ctx context.Context, + runner cmdrunner.Runner, + registry *callgate.Registry, + cmd *config.URLCommand, + cmdResult *cmdbuilder.Result, + responseWriter http.ResponseWriter, +) error { + return runCommand( + ctx, + runner, + registry, + cmd, + cmdResult, + io.Discard, + true, + responseWriter, + ) +} + +func prepareOutputAndRunStreamCommand( + ctx context.Context, + runner cmdrunner.Runner, + registry *callgate.Registry, + cmd *config.URLCommand, + cmdResult *cmdbuilder.Result, + responseWriter http.ResponseWriter, +) error { + if _, ok := responseWriter.(http.Flusher); !ok { + return httpx.NewWebError( + fmt.Errorf("streaming not supported: %w", ErrBadConfiguration), + http.StatusInternalServerError, + "", + ) + } + + writer := newFlushResponseWriter(responseWriter) + + responseWriter.Header().Set("Content-Type", "text/plain; charset=utf-8") + responseWriter.Header().Set("Cache-Control", "no-cache") + // nginx: + responseWriter.Header().Set("X-Accel-Buffering", "no") + + return runCommand( + ctx, + runner, + registry, + cmd, + cmdResult, + writer, + false, + responseWriter, + ) +} + +func prepareOutputAndRunSyncCommand( + ctx context.Context, + runner cmdrunner.Runner, + registry *callgate.Registry, + cmd *config.URLCommand, + cmdResult *cmdbuilder.Result, + responseWriter http.ResponseWriter, +) error { + writer := responseWriter + + responseWriter.Header().Set("Content-Type", "text/plain; charset=utf-8") return runCommand( ctx, @@ -268,7 +315,7 @@ func prepareOutputAndRunCommand( cmd, cmdResult, writer, - async, + false, responseWriter, ) } From 849c0b991a03174313edbf9e818ce760cf5e1acb Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 25 Feb 2026 16:56:09 +0100 Subject: [PATCH 03/38] refactor(handlers): refactor ExecutionHandler to translate errors --- pkg/router/handlers/execution_handler.go | 59 +++++++++++++++--------- 1 file changed, 36 insertions(+), 23 deletions(-) diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index f680611..dc6d4cd 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -36,33 +36,46 @@ var ErrCommandFailed = errors.New("command failed") // response body. func ExecutionHandler(runner cmdrunner.Runner, registry *callgate.Registry) httpx.WebHandler { //nolint:ireturn return httpx.WebHandlerFunc(func(responseWriter http.ResponseWriter, request *http.Request) error { - rid := requestIDFromContext(request.Context()) - log.Printf("[INFO] rid=%s Executing command for: %s %s", rid, request.Method, request.URL.Path) + return translateError(executionHandler(responseWriter, request, runner, registry)) + }) +} - cmd, err := getURLCommandFromContext(request) - if err != nil { - return httpx.NewWebError(err, http.StatusNotFound, "Command not found") - } +func executionHandler( + responseWriter http.ResponseWriter, + request *http.Request, + runner cmdrunner.Runner, + registry *callgate.Registry, +) error { + rid := requestIDFromContext(request.Context()) + log.Printf("[INFO] rid=%s Executing command for: %s %s", rid, request.Method, request.URL.Path) - params, err := extractParams(request, cmd) - if err != nil { - return err - } + cmd, err := getURLCommandFromContext(request) + if err != nil { + return httpx.NewWebError(err, http.StatusNotFound, "Command not found") + } - cmdResult, err := buildCommand(cmd.CommandConfig.CommandTemplate, params) - if err != nil { - return err - } + params, err := extractParams(request, cmd) + if err != nil { + return err + } - return prepareOutputAndRunCommand( - request.Context(), - runner, - registry, - cmd, - cmdResult, - responseWriter, - ) - }) + cmdResult, err := buildCommand(cmd.CommandConfig.CommandTemplate, params) + if err != nil { + return err + } + + return prepareOutputAndRunCommand( + request.Context(), + runner, + registry, + cmd, + cmdResult, + responseWriter, + ) +} + +func translateError(err error) error { + return err } func runCommand( From acac9984edbce15902e9c6048f9812bf5b3a7a35 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 25 Feb 2026 18:14:17 +0100 Subject: [PATCH 04/38] refactor(handlers): refactor tests to use X-Error-Message instead of body string --- pkg/router/handlers/execution_handler.go | 6 +-- pkg/router/handlers/execution_handler_test.go | 41 ++++++++++++++++--- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index dc6d4cd..98f87f3 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -249,7 +249,7 @@ func prepareOutputAndRunCommand( return httpx.NewWebError( fmt.Errorf("%w: unknown output type %q", ErrBadConfiguration, outputType), http.StatusInternalServerError, - "", + "unknown output type", ) } } @@ -286,7 +286,7 @@ func prepareOutputAndRunStreamCommand( return httpx.NewWebError( fmt.Errorf("streaming not supported: %w", ErrBadConfiguration), http.StatusInternalServerError, - "", + "response writer does not support flushing", ) } @@ -377,7 +377,7 @@ func processBodyAsJSON(bodyBytes []byte, params map[string]interface{}) error { return httpx.NewWebError( fmt.Errorf("failed to parse JSON body: %w", err), http.StatusBadRequest, - "", + "must be a JSON object", ) } diff --git a/pkg/router/handlers/execution_handler_test.go b/pkg/router/handlers/execution_handler_test.go index 017ab6b..0e4c554 100644 --- a/pkg/router/handlers/execution_handler_test.go +++ b/pkg/router/handlers/execution_handler_test.go @@ -171,8 +171,9 @@ func TestExecutionHandler_NoCommandInContext(t *testing.T) { t.Errorf("expected status 404, got %d", rr.Code) } - if !strings.Contains(rr.Body.String(), "Command not found") { - t.Errorf("expected body to contain 'Command not found', got %q", rr.Body.String()) + errMsg := rr.Header().Get("X-Error-Message") + if !strings.Contains(errMsg, "Command not found") { + t.Errorf("expected X-Error-Message to contain 'Command not found', got %q", errMsg) } } @@ -305,6 +306,11 @@ func TestExecutionHandler_ExtractParams_BodyReadError(t *testing.T) { if rr.Code != http.StatusInternalServerError { t.Errorf("expected status 500, got %d", rr.Code) } + + errMsg := rr.Header().Get("X-Error-Message") + if !strings.Contains(errMsg, "read error") { + t.Errorf("expected X-Error-Message to contain 'read error', got %q", errMsg) + } } //nolint:dupl @@ -438,6 +444,11 @@ func TestExecutionHandler_BodyAsJSON_Invalid(t *testing.T) { if rr.Code != http.StatusBadRequest { t.Errorf("expected status 400, got %d", rr.Code) } + + errMsg := rr.Header().Get("X-Error-Message") + if !strings.Contains(errMsg, "must be a JSON object") { + t.Errorf("expected X-Error-Message to contain 'must be a JSON object', got %q", errMsg) + } } func TestExecutionHandler_BodyAsJSON_NonObject(t *testing.T) { @@ -485,6 +496,11 @@ func TestExecutionHandler_BodyAsJSON_NonObject(t *testing.T) { if rr.Code != http.StatusBadRequest { t.Errorf("%s: expected status 400, got %d", tc.name, rr.Code) } + + errMsg := rr.Header().Get("X-Error-Message") + if !strings.Contains(errMsg, "must be a JSON object") { + t.Errorf("%s: expected X-Error-Message to contain 'must be a JSON object', got %q", tc.name, errMsg) + } }) } } @@ -578,6 +594,11 @@ func TestExecutionHandler_BuildCommand_Error(t *testing.T) { if rr.Code != http.StatusInternalServerError { t.Errorf("%s: expected status 500, got %d", tc.name, rr.Code) } + + errMsg := rr.Header().Get("X-Error-Message") + if !strings.Contains(errMsg, "error building command") { + t.Errorf("%s: expected X-Error-Message to contain 'error building command', got %q", tc.name, errMsg) + } }) } } @@ -751,6 +772,11 @@ func TestExecutionHandler_PrepareOutput_Stream_Failure(t *testing.T) { if rr.Code != http.StatusInternalServerError { t.Errorf("expected status 500, got %d", rr.Code) } + + errMsg := rr.Header().Get("X-Error-Message") + if !strings.Contains(errMsg, "response writer does not support flushing") { + t.Errorf("expected X-Error-Message to contain 'response writer does not support flushing', got %q", errMsg) + } } func TestExecutionHandler_PrepareOutput_None(t *testing.T) { @@ -884,9 +910,9 @@ func TestExecutionHandler_UnknownCallGateMode(t *testing.T) { t.Fatalf("expected non-200 status for invalid callgate mode, got %d, body=%q", rr.Code, rr.Body.String()) } - body := rr.Body.String() - if !strings.Contains(body, "callgate registry") { - t.Errorf("expected response body to contain %q, got %q", "callgate registry", body) + errMsg := rr.Header().Get("X-Error-Message") + if !strings.Contains(errMsg, "callgate registry") { + t.Errorf("expected X-Error-Message to contain %q, got %q", "callgate registry", errMsg) } } @@ -919,6 +945,11 @@ func TestExecutionHandler_PrepareOutput_Unknown(t *testing.T) { if rr.Code != http.StatusInternalServerError { t.Errorf("expected status 500, got %d", rr.Code) } + + errMsg := rr.Header().Get("X-Error-Message") + if !strings.Contains(errMsg, "unknown output type") { + t.Errorf("expected X-Error-Message to contain 'unknown output type', got %q", errMsg) + } } func TestExecutionHandler_ExecuteCommand_StartError_WritesFailedToStart(t *testing.T) { From cf1f5ca22c8ce59f198a0206e826fb097f00b120 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 25 Feb 2026 18:20:47 +0100 Subject: [PATCH 05/38] refactor(handlers): refactor error handling --- pkg/router/handlers/execution_handler.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index 98f87f3..fe43a3d 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -75,6 +75,14 @@ func executionHandler( } func translateError(err error) error { + if err == nil { + return nil + } + + if errors.Is(err, callgate.ErrBusy) { + return httpx.NewWebError(err, http.StatusTooManyRequests, "Too many requests") + } + return err } @@ -111,14 +119,12 @@ func runCommand( gate, gateErr := registry.GetOrCreate(groupName, cmd.CallGate.Mode) if gateErr != nil { - return httpx.NewWebError( - gateErr, http.StatusInternalServerError, fmt.Sprintf("callgate registry: %v", gateErr), - ) + return fmt.Errorf("callgate registry: %w", gateErr) } exitCode, err = runWithGate(ctx, action, gate) - if err != nil && errors.Is(err, callgate.ErrBusy) { - return httpx.NewWebError(err, http.StatusTooManyRequests, "Too many requests") + if err != nil { + return err } } else { exitCode, err = action(ctx) From 77810d7f6e5b4e815f68c6a6f5c309fd04df06a4 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 25 Feb 2026 18:39:24 +0100 Subject: [PATCH 06/38] refactor(handlers): refactor logging --- pkg/router/handlers/execution_handler.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index fe43a3d..a9c8f26 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -99,8 +99,13 @@ func runCommand( rid := requestIDFromContext(ctx) action := func(ctx context.Context) (int, error) { + command := cmdResult.Command + arguments := cmdResult.Arguments + rid := requestIDFromContext(ctx) + log.Printf("[INFO] rid=%s Executing command: %s %v", rid, command, arguments) + return executeCommand( - ctx, runner, cmdResult.Command, cmdResult.Arguments, writer, async, cmd.GraceTerminationTimeout, + ctx, runner, command, arguments, writer, async, cmd.GraceTerminationTimeout, ) } @@ -209,9 +214,6 @@ func executeCommand( async bool, graceTerminationTimeout *time.Duration, ) (int, error) { - rid := requestIDFromContext(ctx) - log.Printf("[INFO] rid=%s Executing command: %s %v", rid, command, arguments) - cmd := runner.Command(command, arguments...) //nolint:exhaustruct From 362cade5b74d2be7ff534af03b54cae432137787 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 25 Feb 2026 18:53:04 +0100 Subject: [PATCH 07/38] refactor(handlers): extract executeCommand logic into processrunner package --- pkg/processrunner/process_runner.go | 183 +++++++++++++++++++++++ pkg/router/handlers/execution_handler.go | 153 ++----------------- pkg/router/handlers/handlers.go | 8 +- 3 files changed, 198 insertions(+), 146 deletions(-) create mode 100644 pkg/processrunner/process_runner.go diff --git a/pkg/processrunner/process_runner.go b/pkg/processrunner/process_runner.go new file mode 100644 index 0000000..19a5c95 --- /dev/null +++ b/pkg/processrunner/process_runner.go @@ -0,0 +1,183 @@ +package processrunner + +import ( + "context" + "errors" + "fmt" + "io" + "log" + "os/exec" + "syscall" + "time" + + "github.com/dkarczmarski/webcmd/pkg/cmdrunner" +) + +type contextKey string + +const RequestIDKey contextKey = "requestID" + +type Process struct { + cmd cmdrunner.Command + runner cmdrunner.Runner + timeout *time.Duration +} + +func StartProcess( + runner cmdrunner.Runner, + command string, + args []string, + writer io.Writer, + graceTimeout *time.Duration, +) (*Process, error) { + cmd := runner.Command(command, args...) + + //nolint:exhaustruct + cmd.SetSysProcAttr(&syscall.SysProcAttr{ + Setpgid: true, + }) + cmd.SetStdout(writer) + cmd.SetStderr(writer) + + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start command: %w", err) + } + + return &Process{ + cmd: cmd, + runner: runner, + timeout: graceTimeout, + }, nil +} + +func (p *Process) WaitSync(ctx context.Context) (int, error) { + done := make(chan struct{}) + + go func() { + p.terminateOnContextDone(ctx, done) + }() + + err := p.cmd.Wait() + + close(done) + + return p.determineExitCodeAndError(ctx, err) +} + +func (p *Process) WaitAsync(ctx context.Context) { + rid := requestIDFromContext(ctx) + done := make(chan struct{}) + + go func() { + log.Printf("[INFO] rid=%s Asynchronously waiting for command to finish", rid) + + waitErr := p.cmd.Wait() + + close(done) + + if waitErr != nil { + log.Printf("[ERROR] rid=%s Asynchronous command failed, error: %v", rid, waitErr) + } else { + log.Printf("[INFO] rid=%s Asynchronous command finished successfully", rid) + } + }() + + go func() { + p.terminateOnContextDone(ctx, done) + }() +} + +func (p *Process) terminateOnContextDone( + ctx context.Context, + done <-chan struct{}, +) { + rid := requestIDFromContext(ctx) + select { + case <-ctx.Done(): + pid := p.cmd.Pid() + + if p.timeout == nil { + log.Printf( + "[INFO] rid=%s Context closed, no grace termination timeout set, sending SIGKILL to process group", + rid, + ) + p.signalProcessGroup(pid, syscall.SIGKILL) + + return + } + + log.Printf("[INFO] rid=%s Context closed, sending SIGTERM to process group", rid) + p.signalProcessGroup(pid, syscall.SIGTERM) + + t := time.NewTimer(*p.timeout) + defer t.Stop() + + select { + case <-t.C: + log.Printf("[INFO] rid=%s Process still running after %v, sending SIGKILL to process group", + rid, *p.timeout) + p.signalProcessGroup(pid, syscall.SIGKILL) + case <-done: + } + + case <-done: + } +} + +func (p *Process) signalProcessGroup(pid int, sig syscall.Signal) { + if pid <= 0 { + log.Printf("[WARN] Cannot send %s to process group: PID is %d", sig, pid) + + return + } + + pgid := -pid + if err := p.runner.Kill(pgid, sig); err != nil { + log.Printf("[ERROR] Failed to send %s to process group %d: %v", sig, pgid, err) + } +} + +func (p *Process) determineExitCodeAndError(ctx context.Context, err error) (int, error) { + if err != nil { + if p.isTimeoutOrCanceled(ctx) { + // Timeout or cancellation takes precedence over other errors as this is intentional. + //nolint:wrapcheck + return -1, ctx.Err() + } + + var exitError *exec.ExitError + if errors.As(err, &exitError) { + return exitError.ExitCode(), err + } + + return -1, err + } + + if p.cmd.ProcessState() != nil { + return p.cmd.ProcessState().ExitCode(), nil + } + + return 0, nil +} + +func (p *Process) isTimeoutOrCanceled(ctx context.Context) bool { + return ctx.Err() != nil && (errors.Is(ctx.Err(), context.DeadlineExceeded) || errors.Is(ctx.Err(), context.Canceled)) +} + +// requestIDFromContext extracts request ID from context. +func requestIDFromContext(ctx context.Context) string { + if v := ctx.Value(RequestIDKey); v != nil { + if rid, ok := v.(string); ok && rid != "" { + return rid + } + } + + // Try with string key for compatibility + if v := ctx.Value("requestID"); v != nil { + if rid, ok := v.(string); ok && rid != "" { + return rid + } + } + + return "-" +} diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index a9c8f26..006c2b3 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -10,9 +10,7 @@ import ( "io" "log" "net/http" - "os/exec" "strings" - "syscall" "time" "github.com/dkarczmarski/webcmd/pkg/callgate" @@ -20,6 +18,7 @@ import ( "github.com/dkarczmarski/webcmd/pkg/cmdrunner" "github.com/dkarczmarski/webcmd/pkg/config" "github.com/dkarczmarski/webcmd/pkg/httpx" + "github.com/dkarczmarski/webcmd/pkg/processrunner" ) var ErrCommandFailed = errors.New("command failed") @@ -214,26 +213,23 @@ func executeCommand( async bool, graceTerminationTimeout *time.Duration, ) (int, error) { - cmd := runner.Command(command, arguments...) - - //nolint:exhaustruct - cmd.SetSysProcAttr(&syscall.SysProcAttr{ - Setpgid: true, - }) - cmd.SetStdout(writer) - cmd.SetStderr(writer) - - if err := cmd.Start(); err != nil { - return -1, fmt.Errorf("failed to start command: %w", err) + proc, err := processrunner.StartProcess(runner, command, arguments, writer, graceTerminationTimeout) + if err != nil { + return -1, fmt.Errorf("failed to start process: %w", err) } if async { - handleAsyncWait(ctx, runner, cmd, graceTerminationTimeout) + proc.WaitAsync(ctx) return 0, nil } - return handleSyncWait(ctx, runner, cmd, graceTerminationTimeout) + exitCode, err := proc.WaitSync(ctx) + if err != nil { + return exitCode, fmt.Errorf("process wait failed: %w", err) + } + + return exitCode, nil } func prepareOutputAndRunCommand( @@ -394,133 +390,6 @@ func processBodyAsJSON(bodyBytes []byte, params map[string]interface{}) error { return nil } -func handleSyncWait( - ctx context.Context, - runner cmdrunner.Runner, - cmd cmdrunner.Command, - graceTerminationTimeout *time.Duration, -) (int, error) { - done := make(chan struct{}) - - go func() { - terminateOnContextDone(ctx, runner, done, cmd, graceTerminationTimeout) - }() - - err := cmd.Wait() - - close(done) - - return determineExitCodeAndError(ctx, cmd, err) -} - -func handleAsyncWait( - ctx context.Context, - runner cmdrunner.Runner, - cmd cmdrunner.Command, - graceTerminationTimeout *time.Duration, -) { - rid := requestIDFromContext(ctx) - done := make(chan struct{}) - - go func() { - log.Printf("[INFO] rid=%s Asynchronously waiting for command to finish", rid) - - waitErr := cmd.Wait() - - close(done) - - if waitErr != nil { - log.Printf("[ERROR] rid=%s Asynchronous command failed, error: %v", rid, waitErr) - } else { - log.Printf("[INFO] rid=%s Asynchronous command finished successfully", rid) - } - }() - - go func() { - terminateOnContextDone(ctx, runner, done, cmd, graceTerminationTimeout) - }() -} - -func terminateOnContextDone( - ctx context.Context, - runner cmdrunner.Runner, - done <-chan struct{}, - cmd cmdrunner.Command, - graceTerminationTimeout *time.Duration, -) { - rid := requestIDFromContext(ctx) - select { - case <-ctx.Done(): - pid := cmd.Pid() - - if graceTerminationTimeout == nil { - log.Printf( - "[INFO] rid=%s Context closed, no grace termination timeout set, sending SIGKILL to process group", - rid, - ) - signalProcessGroup(runner, pid, syscall.SIGKILL) - - return - } - - log.Printf("[INFO] rid=%s Context closed, sending SIGTERM to process group", rid) - signalProcessGroup(runner, pid, syscall.SIGTERM) - - t := time.NewTimer(*graceTerminationTimeout) - defer t.Stop() - - select { - case <-t.C: - log.Printf("[INFO] rid=%s Process still running after %v, sending SIGKILL to process group", - rid, *graceTerminationTimeout) - signalProcessGroup(runner, pid, syscall.SIGKILL) - case <-done: - } - - case <-done: - } -} - -func signalProcessGroup(runner cmdrunner.Runner, pid int, sig syscall.Signal) { - if pid <= 0 { - log.Printf("[WARN] Cannot send %s to process group: PID is %d", sig, pid) - - return - } - - pgid := -pid - if err := runner.Kill(pgid, sig); err != nil { - log.Printf("[ERROR] Failed to send %s to process group %d: %v", sig, pgid, err) - } -} - -func determineExitCodeAndError(ctx context.Context, cmd cmdrunner.Command, err error) (int, error) { - if err != nil { - if isTimeoutOrCanceled(ctx) { - // Timeout or cancellation takes precedence over other errors as this is intentional. - //nolint:wrapcheck - return -1, ctx.Err() - } - - var exitError *exec.ExitError - if errors.As(err, &exitError) { - return exitError.ExitCode(), err - } - - return -1, err - } - - if cmd.ProcessState() != nil { - return cmd.ProcessState().ExitCode(), nil - } - - return 0, nil -} - -func isTimeoutOrCanceled(ctx context.Context) bool { - return ctx.Err() != nil && (errors.Is(ctx.Err(), context.DeadlineExceeded) || errors.Is(ctx.Err(), context.Canceled)) -} - func setNestedParam(params map[string]interface{}, parentKey, childKey string, value interface{}) { if _, ok := params[parentKey]; !ok { params[parentKey] = make(map[string]interface{}) diff --git a/pkg/router/handlers/handlers.go b/pkg/router/handlers/handlers.go index de0954f..16d72a3 100644 --- a/pkg/router/handlers/handlers.go +++ b/pkg/router/handlers/handlers.go @@ -20,16 +20,16 @@ var ( ErrBadConfiguration = errors.New("bad configuration") ) -type contextKey string +type ContextKey string // AuthNameKey is the context key used to store and retrieve the authorization name. -const AuthNameKey contextKey = "authName" +const AuthNameKey ContextKey = "authName" // URLCommandKey is the context key used to store and retrieve the URL command. -const URLCommandKey contextKey = "urlCommand" +const URLCommandKey ContextKey = "urlCommand" // RequestIDKey is the context key used to store and retrieve the request ID. -const RequestIDKey contextKey = "requestID" +const RequestIDKey ContextKey = "requestID" // RequestIDMiddleware creates a new Middleware that extracts the request ID from the X-Request-Id header, // or generates a new one if not present, and adds it to the request context under RequestIDKey. From e3c7fdaff04c9bd28103480d6d81d6f3f128f922 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 25 Feb 2026 19:10:33 +0100 Subject: [PATCH 08/38] refactor(processrunner): refactor WaitAsync() --- pkg/processrunner/process_runner.go | 23 +++++++++-------------- pkg/router/handlers/execution_handler.go | 12 +++++++++++- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/pkg/processrunner/process_runner.go b/pkg/processrunner/process_runner.go index 19a5c95..bb6865a 100644 --- a/pkg/processrunner/process_runner.go +++ b/pkg/processrunner/process_runner.go @@ -64,27 +64,22 @@ func (p *Process) WaitSync(ctx context.Context) (int, error) { return p.determineExitCodeAndError(ctx, err) } -func (p *Process) WaitAsync(ctx context.Context) { - rid := requestIDFromContext(ctx) +func (p *Process) WaitAsync(ctx context.Context) error { done := make(chan struct{}) go func() { - log.Printf("[INFO] rid=%s Asynchronously waiting for command to finish", rid) - - waitErr := p.cmd.Wait() + p.terminateOnContextDone(ctx, done) + }() + if err := p.cmd.Wait(); err != nil { close(done) - if waitErr != nil { - log.Printf("[ERROR] rid=%s Asynchronous command failed, error: %v", rid, waitErr) - } else { - log.Printf("[INFO] rid=%s Asynchronous command finished successfully", rid) - } - }() + return fmt.Errorf("command wait: %w", err) + } - go func() { - p.terminateOnContextDone(ctx, done) - }() + close(done) + + return nil } func (p *Process) terminateOnContextDone( diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index 006c2b3..c48fca0 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -219,7 +219,17 @@ func executeCommand( } if async { - proc.WaitAsync(ctx) + rid := requestIDFromContext(ctx) + + go func() { + log.Printf("[INFO] rid=%s Asynchronously waiting for command to finish", rid) + + if err := proc.WaitAsync(ctx); err != nil { + log.Printf("[ERROR] rid=%s Asynchronous command failed, error: %v", rid, err) + } else { + log.Printf("[INFO] rid=%s Asynchronous command finished successfully", rid) + } + }() return 0, nil } From 51ac46b885565f4609cd173f6cfa3f040296a8a8 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 25 Feb 2026 20:13:30 +0100 Subject: [PATCH 09/38] refactor(processrunner): refactor WaitAsync() --- pkg/processrunner/process_runner.go | 30 +++++++++++++++++------- pkg/router/handlers/execution_handler.go | 9 ++++--- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/pkg/processrunner/process_runner.go b/pkg/processrunner/process_runner.go index bb6865a..c4f477f 100644 --- a/pkg/processrunner/process_runner.go +++ b/pkg/processrunner/process_runner.go @@ -64,22 +64,34 @@ func (p *Process) WaitSync(ctx context.Context) (int, error) { return p.determineExitCodeAndError(ctx, err) } -func (p *Process) WaitAsync(ctx context.Context) error { +type Result struct { + ExitCode int + Err error +} + +func (p *Process) WaitAsync(ctx context.Context) <-chan Result { + resultCh := make(chan Result, 1) done := make(chan struct{}) go func() { - p.terminateOnContextDone(ctx, done) - }() + defer close(done) + defer close(resultCh) - if err := p.cmd.Wait(); err != nil { - close(done) + err := p.cmd.Wait() - return fmt.Errorf("command wait: %w", err) - } + exitCode, finalErr := p.determineExitCodeAndError(ctx, err) - close(done) + resultCh <- Result{ + ExitCode: exitCode, + Err: finalErr, + } + }() + + go func() { + p.terminateOnContextDone(ctx, done) + }() - return nil + return resultCh } func (p *Process) terminateOnContextDone( diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index c48fca0..aedb4fe 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -224,10 +224,13 @@ func executeCommand( go func() { log.Printf("[INFO] rid=%s Asynchronously waiting for command to finish", rid) - if err := proc.WaitAsync(ctx); err != nil { - log.Printf("[ERROR] rid=%s Asynchronous command failed, error: %v", rid, err) + result := <-proc.WaitAsync(ctx) + if result.Err != nil { + log.Printf("[ERROR] rid=%s Asynchronous command failed (exit code: %d), error: %v", + rid, result.ExitCode, result.Err) } else { - log.Printf("[INFO] rid=%s Asynchronous command finished successfully", rid) + log.Printf("[INFO] rid=%s Asynchronous command finished successfully (exit code: %d)", + rid, result.ExitCode) } }() From 95646cf2a46fba5a5219e53c6612985cf2d75b9f Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 25 Feb 2026 21:07:56 +0100 Subject: [PATCH 10/38] refactor(handlers): extract runCommand logic into gateexec package --- pkg/gateexec/gateexec.go | 58 ++++++++++++++++++++++ pkg/router/handlers/execution_handler.go | 63 ++++++++++-------------- 2 files changed, 83 insertions(+), 38 deletions(-) create mode 100644 pkg/gateexec/gateexec.go diff --git a/pkg/gateexec/gateexec.go b/pkg/gateexec/gateexec.go new file mode 100644 index 0000000..10e2e30 --- /dev/null +++ b/pkg/gateexec/gateexec.go @@ -0,0 +1,58 @@ +package gateexec + +import ( + "context" + "fmt" + + "github.com/dkarczmarski/webcmd/pkg/callgate" + "github.com/dkarczmarski/webcmd/pkg/config" +) + +type Action func(context.Context) (result int, done <-chan struct{}, err error) + +type Executor struct { + registry *callgate.Registry +} + +func New(registry *callgate.Registry) *Executor { + return &Executor{registry: registry} +} + +func (e *Executor) Run( + ctx context.Context, + gateCfg *config.CallGateConfig, + defaultGroup string, + action Action, +) (int, error) { + if gateCfg == nil || e.registry == nil { + exit, _, err := action(ctx) + + return exit, err + } + + group := defaultGroup + if gateCfg.GroupName != nil { + group = *gateCfg.GroupName + } + + gate, err := e.registry.GetOrCreate(group, gateCfg.Mode) + if err != nil { + return -1, fmt.Errorf("callgate registry: %w", err) + } + + release, err := gate.Acquire(ctx) + if err != nil { + return -1, fmt.Errorf("callgate acquire: %w", err) + } + + exit, done, runErr := action(ctx) + + release() + + //nolint:godox + // TODO: This channel will be used in the future to keep the gate held until async process finishes. + // Currently, the lock is released immediately after StartProcess, which is a known bug for async mode. + _ = done + + return exit, runErr +} diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index aedb4fe..88dc5de 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -17,6 +17,7 @@ import ( "github.com/dkarczmarski/webcmd/pkg/cmdbuilder" "github.com/dkarczmarski/webcmd/pkg/cmdrunner" "github.com/dkarczmarski/webcmd/pkg/config" + "github.com/dkarczmarski/webcmd/pkg/gateexec" "github.com/dkarczmarski/webcmd/pkg/httpx" "github.com/dkarczmarski/webcmd/pkg/processrunner" ) @@ -97,47 +98,44 @@ func runCommand( ) error { rid := requestIDFromContext(ctx) - action := func(ctx context.Context) (int, error) { + exec := gateexec.New(registry) + + action := func(ctx context.Context) (int, <-chan struct{}, error) { command := cmdResult.Command arguments := cmdResult.Arguments + rid := requestIDFromContext(ctx) log.Printf("[INFO] rid=%s Executing command: %s %v", rid, command, arguments) - return executeCommand( - ctx, runner, command, arguments, writer, async, cmd.GraceTerminationTimeout, - ) - } + exit, err := executeCommand(ctx, runner, command, arguments, writer, async, cmd.GraceTerminationTimeout) - var ( - exitCode int - err error - ) + //nolint:godox + // TODO: Currently we don't have done signaling (because async returns immediately) + return exit, nil, err + } - if cmd.CallGate != nil && registry != nil { - // Default to the unique endpoint identifier (Verb + Path) if groupName is not explicitly provided. - // This ensures that concurrency limits apply per-endpoint by default. - groupName := cmd.URL - if cmd.CallGate.GroupName != nil { - groupName = *cmd.CallGate.GroupName + // Default to the unique endpoint identifier (Verb + Path) from cmd.URL if groupName is not explicitly provided. + // This ensures that concurrency limits apply per-endpoint by default. + exitCode, err := exec.Run(ctx, cmd.CallGate, cmd.URL, action) + if err != nil { + if errors.Is(err, callgate.ErrBusy) || strings.Contains(err.Error(), "callgate registry") { + return translateError(err) } - gate, gateErr := registry.GetOrCreate(groupName, cmd.CallGate.Mode) - if gateErr != nil { - return fmt.Errorf("callgate registry: %w", gateErr) - } + log.Printf("[WARN] rid=%s Command failed with exit code: %d, error: %v", rid, exitCode, err) - exitCode, err = runWithGate(ctx, action, gate) - if err != nil { - return err + errorMessage := fmt.Sprintf("Command failed with exit code: %d, error: %v", exitCode, err) + if _, writeErr := responseWriter.Write([]byte(errorMessage)); writeErr != nil { + log.Printf("[ERROR] rid=%s Failed to write error message: %v", rid, writeErr) } - } else { - exitCode, err = action(ctx) + + return nil } - if exitCode != 0 || err != nil { - log.Printf("[WARN] rid=%s Command failed with exit code: %d, error: %v", rid, exitCode, err) + if exitCode != 0 { + log.Printf("[WARN] rid=%s Command failed with exit code: %d", rid, exitCode) - errorMessage := fmt.Sprintf("Command failed with exit code: %d, error: %v", exitCode, err) + errorMessage := fmt.Sprintf("Command failed with exit code: %d", exitCode) if _, writeErr := responseWriter.Write([]byte(errorMessage)); writeErr != nil { log.Printf("[ERROR] rid=%s Failed to write error message: %v", rid, writeErr) } @@ -412,14 +410,3 @@ func setNestedParam(params map[string]interface{}, parentKey, childKey string, v parentMap[childKey] = value } } - -func runWithGate(ctx context.Context, action func(context.Context) (int, error), gate callgate.CallGate) (int, error) { - release, err := gate.Acquire(ctx) - if err != nil { - return -1, fmt.Errorf("callgate acquire: %w", err) - } - - defer release() - - return action(ctx) -} From 78c1b3e2f48bab3c543c65b5db76d881200956b9 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 25 Feb 2026 21:14:51 +0100 Subject: [PATCH 11/38] refactor(gateexec): refactor error handling --- pkg/gateexec/gateexec.go | 10 ++++++++-- pkg/router/handlers/execution_handler.go | 6 +++++- pkg/router/handlers/execution_handler_test.go | 8 ++++---- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/pkg/gateexec/gateexec.go b/pkg/gateexec/gateexec.go index 10e2e30..4913f01 100644 --- a/pkg/gateexec/gateexec.go +++ b/pkg/gateexec/gateexec.go @@ -2,12 +2,18 @@ package gateexec import ( "context" + "errors" "fmt" "github.com/dkarczmarski/webcmd/pkg/callgate" "github.com/dkarczmarski/webcmd/pkg/config" ) +var ( + ErrRegistry = errors.New("gate executor: registry error") + ErrAcquire = errors.New("gate executor: acquire error") +) + type Action func(context.Context) (result int, done <-chan struct{}, err error) type Executor struct { @@ -37,12 +43,12 @@ func (e *Executor) Run( gate, err := e.registry.GetOrCreate(group, gateCfg.Mode) if err != nil { - return -1, fmt.Errorf("callgate registry: %w", err) + return -1, fmt.Errorf("%w: %w", ErrRegistry, err) } release, err := gate.Acquire(ctx) if err != nil { - return -1, fmt.Errorf("callgate acquire: %w", err) + return -1, fmt.Errorf("%w: %w", ErrAcquire, err) } exit, done, runErr := action(ctx) diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index 88dc5de..2a851d4 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -83,6 +83,10 @@ func translateError(err error) error { return httpx.NewWebError(err, http.StatusTooManyRequests, "Too many requests") } + if errors.Is(err, gateexec.ErrRegistry) { + return httpx.NewWebError(err, http.StatusInternalServerError, "Invalid callgate configuration") + } + return err } @@ -118,7 +122,7 @@ func runCommand( // This ensures that concurrency limits apply per-endpoint by default. exitCode, err := exec.Run(ctx, cmd.CallGate, cmd.URL, action) if err != nil { - if errors.Is(err, callgate.ErrBusy) || strings.Contains(err.Error(), "callgate registry") { + if errors.Is(err, callgate.ErrBusy) || errors.Is(err, gateexec.ErrRegistry) || errors.Is(err, gateexec.ErrAcquire) { return translateError(err) } diff --git a/pkg/router/handlers/execution_handler_test.go b/pkg/router/handlers/execution_handler_test.go index 0e4c554..6a4aff9 100644 --- a/pkg/router/handlers/execution_handler_test.go +++ b/pkg/router/handlers/execution_handler_test.go @@ -906,13 +906,13 @@ func TestExecutionHandler_UnknownCallGateMode(t *testing.T) { rr := httptest.NewRecorder() h.ServeHTTP(rr, req) - if rr.Code == http.StatusOK { - t.Fatalf("expected non-200 status for invalid callgate mode, got %d, body=%q", rr.Code, rr.Body.String()) + if rr.Code != http.StatusInternalServerError { + t.Errorf("expected status 500 for invalid callgate mode, got %d, body=%q", rr.Code, rr.Body.String()) } errMsg := rr.Header().Get("X-Error-Message") - if !strings.Contains(errMsg, "callgate registry") { - t.Errorf("expected X-Error-Message to contain %q, got %q", "callgate registry", errMsg) + if !strings.Contains(errMsg, "Invalid callgate configuration") { + t.Errorf("expected X-Error-Message to contain %q, got %q", "Invalid callgate configuration", errMsg) } } From 91189cdef84210e0ea42dfb0091086cfdc038ae1 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 25 Feb 2026 21:34:14 +0100 Subject: [PATCH 12/38] fix(gateexec): release lock after process completion in async mode --- pkg/gateexec/gateexec.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/gateexec/gateexec.go b/pkg/gateexec/gateexec.go index 4913f01..ebace16 100644 --- a/pkg/gateexec/gateexec.go +++ b/pkg/gateexec/gateexec.go @@ -53,12 +53,14 @@ func (e *Executor) Run( exit, done, runErr := action(ctx) - release() - - //nolint:godox - // TODO: This channel will be used in the future to keep the gate held until async process finishes. - // Currently, the lock is released immediately after StartProcess, which is a known bug for async mode. - _ = done + if done != nil { + go func() { + <-done + release() + }() + } else { + release() + } return exit, runErr } From 9aa4acea9cb2bc08f5257c0e80c4846960354c83 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 25 Feb 2026 21:50:06 +0100 Subject: [PATCH 13/38] fix(handlers): release lock after process completion in async mode --- pkg/router/handlers/execution_handler.go | 118 ++++++++++++----------- 1 file changed, 63 insertions(+), 55 deletions(-) diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index 2a851d4..e50b22e 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -111,16 +111,50 @@ func runCommand( rid := requestIDFromContext(ctx) log.Printf("[INFO] rid=%s Executing command: %s %v", rid, command, arguments) - exit, err := executeCommand(ctx, runner, command, arguments, writer, async, cmd.GraceTerminationTimeout) + proc, err := startCommandProcess(runner, command, arguments, writer, cmd.GraceTerminationTimeout) + if err != nil { + return -1, nil, err + } + + if async { + return 0, waitAsyncAndLog(ctx, proc, rid), nil + } - //nolint:godox - // TODO: Currently we don't have done signaling (because async returns immediately) - return exit, nil, err + exitCode, err := proc.WaitSync(ctx) + if err != nil { + return exitCode, nil, fmt.Errorf("process wait failed: %w", err) + } + + return exitCode, nil, nil } - // Default to the unique endpoint identifier (Verb + Path) from cmd.URL if groupName is not explicitly provided. - // This ensures that concurrency limits apply per-endpoint by default. exitCode, err := exec.Run(ctx, cmd.CallGate, cmd.URL, action) + + return handleCommandResult(rid, exitCode, err, responseWriter) +} + +func waitAsyncAndLog(ctx context.Context, proc *processrunner.Process, rid string) <-chan struct{} { + resCh := proc.WaitAsync(ctx) + + done := make(chan struct{}) + + go func() { + defer close(done) + + result := <-resCh + if result.Err != nil { + log.Printf("[ERROR] rid=%s Asynchronous command failed (exit code: %d), error: %v", + rid, result.ExitCode, result.Err) + } else { + log.Printf("[INFO] rid=%s Asynchronous command finished successfully (exit code: %d)", + rid, result.ExitCode) + } + }() + + return done +} + +func handleCommandResult(rid string, exitCode int, err error, responseWriter http.ResponseWriter) error { if err != nil { if errors.Is(err, callgate.ErrBusy) || errors.Is(err, gateexec.ErrRegistry) || errors.Is(err, gateexec.ErrAcquire) { return translateError(err) @@ -128,10 +162,7 @@ func runCommand( log.Printf("[WARN] rid=%s Command failed with exit code: %d, error: %v", rid, exitCode, err) - errorMessage := fmt.Sprintf("Command failed with exit code: %d, error: %v", exitCode, err) - if _, writeErr := responseWriter.Write([]byte(errorMessage)); writeErr != nil { - log.Printf("[ERROR] rid=%s Failed to write error message: %v", rid, writeErr) - } + writeErrorMessage(rid, responseWriter, fmt.Sprintf("Command failed with exit code: %d, error: %v", exitCode, err)) return nil } @@ -139,15 +170,33 @@ func runCommand( if exitCode != 0 { log.Printf("[WARN] rid=%s Command failed with exit code: %d", rid, exitCode) - errorMessage := fmt.Sprintf("Command failed with exit code: %d", exitCode) - if _, writeErr := responseWriter.Write([]byte(errorMessage)); writeErr != nil { - log.Printf("[ERROR] rid=%s Failed to write error message: %v", rid, writeErr) - } + writeErrorMessage(rid, responseWriter, fmt.Sprintf("Command failed with exit code: %d", exitCode)) } return nil } +func writeErrorMessage(rid string, responseWriter http.ResponseWriter, message string) { + if _, writeErr := responseWriter.Write([]byte(message)); writeErr != nil { + log.Printf("[ERROR] rid=%s Failed to write error message: %v", rid, writeErr) + } +} + +func startCommandProcess( + runner cmdrunner.Runner, + command string, + arguments []string, + writer io.Writer, + graceTerminationTimeout *time.Duration, +) (*processrunner.Process, error) { + proc, err := processrunner.StartProcess(runner, command, arguments, writer, graceTerminationTimeout) + if err != nil { + return nil, fmt.Errorf("failed to start process: %w", err) + } + + return proc, nil +} + func getURLCommandFromContext(request *http.Request) (*config.URLCommand, error) { valCmd := request.Context().Value(URLCommandKey) if valCmd == nil { @@ -206,47 +255,6 @@ func buildCommand( return &cmdResult, nil } -func executeCommand( - ctx context.Context, - runner cmdrunner.Runner, - command string, - arguments []string, - writer io.Writer, - async bool, - graceTerminationTimeout *time.Duration, -) (int, error) { - proc, err := processrunner.StartProcess(runner, command, arguments, writer, graceTerminationTimeout) - if err != nil { - return -1, fmt.Errorf("failed to start process: %w", err) - } - - if async { - rid := requestIDFromContext(ctx) - - go func() { - log.Printf("[INFO] rid=%s Asynchronously waiting for command to finish", rid) - - result := <-proc.WaitAsync(ctx) - if result.Err != nil { - log.Printf("[ERROR] rid=%s Asynchronous command failed (exit code: %d), error: %v", - rid, result.ExitCode, result.Err) - } else { - log.Printf("[INFO] rid=%s Asynchronous command finished successfully (exit code: %d)", - rid, result.ExitCode) - } - }() - - return 0, nil - } - - exitCode, err := proc.WaitSync(ctx) - if err != nil { - return exitCode, fmt.Errorf("process wait failed: %w", err) - } - - return exitCode, nil -} - func prepareOutputAndRunCommand( ctx context.Context, runner cmdrunner.Runner, From 480ac26aed4e3106f0d2ed77bbd808fbf0d17783 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 25 Feb 2026 22:13:34 +0100 Subject: [PATCH 14/38] refactor(processrunner): remove logging and request ID handling --- pkg/processrunner/process_runner.go | 63 +++++++++-------------------- 1 file changed, 19 insertions(+), 44 deletions(-) diff --git a/pkg/processrunner/process_runner.go b/pkg/processrunner/process_runner.go index c4f477f..518f184 100644 --- a/pkg/processrunner/process_runner.go +++ b/pkg/processrunner/process_runner.go @@ -1,3 +1,6 @@ +// Package processrunner provides process lifecycle management: +// start process in its own process group, wait synchronously/asynchronously, +// and terminate the process group on context cancellation with optional grace timeout. package processrunner import ( @@ -5,7 +8,6 @@ import ( "errors" "fmt" "io" - "log" "os/exec" "syscall" "time" @@ -13,9 +15,11 @@ import ( "github.com/dkarczmarski/webcmd/pkg/cmdrunner" ) -type contextKey string - -const RequestIDKey contextKey = "requestID" +var ( + ErrStartCommand = errors.New("failed to start command") + ErrInvalidPID = errors.New("invalid PID") + ErrProcessGroupSignal = errors.New("failed to send signal to process group") +) type Process struct { cmd cmdrunner.Command @@ -40,7 +44,7 @@ func StartProcess( cmd.SetStderr(writer) if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("failed to start command: %w", err) + return nil, fmt.Errorf("%w: %w", ErrStartCommand, err) } return &Process{ @@ -69,6 +73,7 @@ type Result struct { Err error } +// WaitAsync starts waiting in background and returns a channel that receives exactly one Result. func (p *Process) WaitAsync(ctx context.Context) <-chan Result { resultCh := make(chan Result, 1) done := make(chan struct{}) @@ -78,7 +83,6 @@ func (p *Process) WaitAsync(ctx context.Context) <-chan Result { defer close(resultCh) err := p.cmd.Wait() - exitCode, finalErr := p.determineExitCodeAndError(ctx, err) resultCh <- Result{ @@ -94,36 +98,25 @@ func (p *Process) WaitAsync(ctx context.Context) <-chan Result { return resultCh } -func (p *Process) terminateOnContextDone( - ctx context.Context, - done <-chan struct{}, -) { - rid := requestIDFromContext(ctx) +func (p *Process) terminateOnContextDone(ctx context.Context, done <-chan struct{}) { select { case <-ctx.Done(): pid := p.cmd.Pid() if p.timeout == nil { - log.Printf( - "[INFO] rid=%s Context closed, no grace termination timeout set, sending SIGKILL to process group", - rid, - ) - p.signalProcessGroup(pid, syscall.SIGKILL) + _ = p.signalProcessGroup(pid, syscall.SIGKILL) return } - log.Printf("[INFO] rid=%s Context closed, sending SIGTERM to process group", rid) - p.signalProcessGroup(pid, syscall.SIGTERM) + _ = p.signalProcessGroup(pid, syscall.SIGTERM) t := time.NewTimer(*p.timeout) defer t.Stop() select { case <-t.C: - log.Printf("[INFO] rid=%s Process still running after %v, sending SIGKILL to process group", - rid, *p.timeout) - p.signalProcessGroup(pid, syscall.SIGKILL) + _ = p.signalProcessGroup(pid, syscall.SIGKILL) case <-done: } @@ -131,17 +124,17 @@ func (p *Process) terminateOnContextDone( } } -func (p *Process) signalProcessGroup(pid int, sig syscall.Signal) { +func (p *Process) signalProcessGroup(pid int, sig syscall.Signal) error { if pid <= 0 { - log.Printf("[WARN] Cannot send %s to process group: PID is %d", sig, pid) - - return + return fmt.Errorf("cannot send %s to process group: pid=%d: %w", sig, pid, ErrInvalidPID) } pgid := -pid if err := p.runner.Kill(pgid, sig); err != nil { - log.Printf("[ERROR] Failed to send %s to process group %d: %v", sig, pgid, err) + return fmt.Errorf("failed to send %s to process group %d: %w: %w", sig, pgid, err, ErrProcessGroupSignal) } + + return nil } func (p *Process) determineExitCodeAndError(ctx context.Context, err error) (int, error) { @@ -170,21 +163,3 @@ func (p *Process) determineExitCodeAndError(ctx context.Context, err error) (int func (p *Process) isTimeoutOrCanceled(ctx context.Context) bool { return ctx.Err() != nil && (errors.Is(ctx.Err(), context.DeadlineExceeded) || errors.Is(ctx.Err(), context.Canceled)) } - -// requestIDFromContext extracts request ID from context. -func requestIDFromContext(ctx context.Context) string { - if v := ctx.Value(RequestIDKey); v != nil { - if rid, ok := v.(string); ok && rid != "" { - return rid - } - } - - // Try with string key for compatibility - if v := ctx.Value("requestID"); v != nil { - if rid, ok := v.(string); ok && rid != "" { - return rid - } - } - - return "-" -} From b076e52291b2c37a28fc3d4512adba23b5eff83a Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 25 Feb 2026 22:29:39 +0100 Subject: [PATCH 15/38] feat(processrunner): treat non-zero exit codes as normal process completion --- pkg/processrunner/process_runner.go | 45 +++++++++++++------ pkg/router/handlers/execution_handler_test.go | 4 -- 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/pkg/processrunner/process_runner.go b/pkg/processrunner/process_runner.go index 518f184..e17d7e2 100644 --- a/pkg/processrunner/process_runner.go +++ b/pkg/processrunner/process_runner.go @@ -138,26 +138,45 @@ func (p *Process) signalProcessGroup(pid int, sig syscall.Signal) error { } func (p *Process) determineExitCodeAndError(ctx context.Context, err error) (int, error) { - if err != nil { - if p.isTimeoutOrCanceled(ctx) { - // Timeout or cancellation takes precedence over other errors as this is intentional. - //nolint:wrapcheck - return -1, ctx.Err() - } + // If the context was canceled or timed out, + // it means the process was terminated externally. + if p.isTimeoutOrCanceled(ctx) { + // Return -1 and the context error. + //nolint:wrapcheck + return -1, ctx.Err() + } - var exitError *exec.ExitError - if errors.As(err, &exitError) { - return exitError.ExitCode(), err + // If Wait() returned no error, + // the process exited normally (exit code available in ProcessState). + if err == nil { + if ps := p.cmd.ProcessState(); ps != nil { + return ps.ExitCode(), nil } - return -1, err + // This should not normally happen after Wait(), + // but return 0 as a safe default. + return 0, nil } - if p.cmd.ProcessState() != nil { - return p.cmd.ProcessState().ExitCode(), nil + // If the error is *exec.ExitError, + // the process exited by itself (possibly with non-zero exit code) + // OR it was terminated by a signal. + var exitErr *exec.ExitError + if errors.As(err, &exitErr) { + // On Unix systems, we can check the WaitStatus. + // If the process was terminated by a signal, + // it means external intervention (SIGTERM/SIGKILL). + if status, ok := exitErr.Sys().(syscall.WaitStatus); ok && status.Signaled() { + return -1, err + } + + // Otherwise, the process exited normally (even if exit code != 0). + // In this case, exit code is a valid result and error is nil. + return exitErr.ExitCode(), nil } - return 0, nil + // Any other error from Wait() is treated as an external/infrastructure error. + return -1, err } func (p *Process) isTimeoutOrCanceled(ctx context.Context) bool { diff --git a/pkg/router/handlers/execution_handler_test.go b/pkg/router/handlers/execution_handler_test.go index 6a4aff9..8e4dbeb 100644 --- a/pkg/router/handlers/execution_handler_test.go +++ b/pkg/router/handlers/execution_handler_test.go @@ -1775,10 +1775,6 @@ func TestExecutionHandler_RunCommand_AppendsErrorMessageToBody_OnNonZeroExit(t * if !strings.Contains(body, "Command failed with exit code: 7") { t.Errorf("expected body to contain exit code 7 failure message, got %q", body) } - - if !strings.Contains(body, "error:") { - t.Errorf("expected body to contain 'error:' part, got %q", body) - } } type erroringResponseWriter struct { From 69356bfe412a22facc0714588ad441b7570a725a Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Fri, 27 Feb 2026 17:26:35 +0100 Subject: [PATCH 16/38] fix(handlers): fix context cancellation in async mode --- pkg/router/handlers/execution_handler.go | 27 +++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index e50b22e..a335f80 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -117,7 +117,15 @@ func runCommand( } if async { - return 0, waitAsyncAndLog(ctx, proc, rid), nil + asyncCtx := context.WithoutCancel(ctx) + + var cancel context.CancelFunc = func() {} + + if cmd.CommandConfig.Timeout != nil { + asyncCtx, cancel = context.WithTimeout(asyncCtx, *cmd.CommandConfig.Timeout) + } + + return 0, waitAsyncAndLog(asyncCtx, proc, cancel), nil } exitCode, err := proc.WaitSync(ctx) @@ -133,22 +141,31 @@ func runCommand( return handleCommandResult(rid, exitCode, err, responseWriter) } -func waitAsyncAndLog(ctx context.Context, proc *processrunner.Process, rid string) <-chan struct{} { +func waitAsyncAndLog( + ctx context.Context, + proc *processrunner.Process, + cancel context.CancelFunc, +) <-chan struct{} { resCh := proc.WaitAsync(ctx) done := make(chan struct{}) go func() { defer close(done) + defer cancel() + + rid := requestIDFromContext(ctx) result := <-resCh if result.Err != nil { log.Printf("[ERROR] rid=%s Asynchronous command failed (exit code: %d), error: %v", rid, result.ExitCode, result.Err) - } else { - log.Printf("[INFO] rid=%s Asynchronous command finished successfully (exit code: %d)", - rid, result.ExitCode) + + return } + + log.Printf("[INFO] rid=%s Asynchronous command finished successfully (exit code: %d)", + rid, result.ExitCode) }() return done From 3a2dbf2176465945013d8a9094d4592b8c32ac4e Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Fri, 27 Feb 2026 19:30:11 +0100 Subject: [PATCH 17/38] refactor(handlers): simplify error handling --- pkg/router/handlers/execution_handler.go | 42 +++++++++++------------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index a335f80..14b3fe3 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -22,7 +22,11 @@ import ( "github.com/dkarczmarski/webcmd/pkg/processrunner" ) -var ErrCommandFailed = errors.New("command failed") +var ( + ErrCommandFailed = errors.New("command failed") + ErrCommandNotFound = errors.New("command not found") + ErrInvalidJSONBody = errors.New("invalid JSON body") +) // ExecutionHandler returns a WebHandler that executes the command associated with the URLCommand stored in the // request context. @@ -51,7 +55,7 @@ func executionHandler( cmd, err := getURLCommandFromContext(request) if err != nil { - return httpx.NewWebError(err, http.StatusNotFound, "Command not found") + return err } params, err := extractParams(request, cmd) @@ -87,6 +91,14 @@ func translateError(err error) error { return httpx.NewWebError(err, http.StatusInternalServerError, "Invalid callgate configuration") } + if errors.Is(err, ErrCommandNotFound) { + return httpx.NewWebError(err, http.StatusNotFound, "Command not found") + } + + if errors.Is(err, ErrInvalidJSONBody) { + return httpx.NewWebError(err, http.StatusBadRequest, "must be a JSON object") + } + return err } @@ -217,7 +229,7 @@ func startCommandProcess( func getURLCommandFromContext(request *http.Request) (*config.URLCommand, error) { valCmd := request.Context().Value(URLCommandKey) if valCmd == nil { - return nil, fmt.Errorf("URLCommand not found in context: %w", ErrInvalidRequestContext) + return nil, fmt.Errorf("URLCommand not found in context: %w", ErrCommandNotFound) } cmd, ok := valCmd.(*config.URLCommand) @@ -238,11 +250,7 @@ func extractParams(request *http.Request, cmd *config.URLCommand) (map[string]in bodyBytes, err := io.ReadAll(request.Body) if err != nil { - return nil, httpx.NewWebError( - fmt.Errorf("failed to read request body: %w", err), - http.StatusInternalServerError, - "", - ) + return nil, fmt.Errorf("failed to read request body: %w", err) } setNestedParam(params, "body", "text", string(bodyBytes)) @@ -262,11 +270,7 @@ func buildCommand( ) (*cmdbuilder.Result, error) { cmdResult, err := cmdbuilder.BuildCommand(template, params) if err != nil { - return nil, httpx.NewWebError( - fmt.Errorf("error building command: %w", err), - http.StatusInternalServerError, - "", - ) + return nil, fmt.Errorf("error building command: %w", err) } return &cmdResult, nil @@ -290,11 +294,7 @@ func prepareOutputAndRunCommand( case "", "text": return prepareOutputAndRunSyncCommand(ctx, runner, registry, cmd, cmdResult, responseWriter) default: - return httpx.NewWebError( - fmt.Errorf("%w: unknown output type %q", ErrBadConfiguration, outputType), - http.StatusInternalServerError, - "unknown output type", - ) + return fmt.Errorf("%w: unknown output type %q", ErrBadConfiguration, outputType) } } @@ -418,11 +418,7 @@ func (j JSONBody) String() string { func processBodyAsJSON(bodyBytes []byte, params map[string]interface{}) error { var bodyJSON JSONBody if err := json.Unmarshal(bodyBytes, &bodyJSON); err != nil { - return httpx.NewWebError( - fmt.Errorf("failed to parse JSON body: %w", err), - http.StatusBadRequest, - "must be a JSON object", - ) + return fmt.Errorf("%w: failed to parse JSON body: %w", ErrInvalidJSONBody, err) } setNestedParam(params, "body", "json", bodyJSON) From f6950272d61c5e5ffd124402b6827cb57715404e Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Fri, 27 Feb 2026 20:19:32 +0100 Subject: [PATCH 18/38] refactor(handlers): extract createGateAction from runCommand --- pkg/router/handlers/execution_handler.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index 14b3fe3..0832f23 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -113,10 +113,22 @@ func runCommand( responseWriter http.ResponseWriter, ) error { rid := requestIDFromContext(ctx) - exec := gateexec.New(registry) + action := createGateAction(runner, cmd, cmdResult, writer, async) + + exitCode, err := exec.Run(ctx, cmd.CallGate, cmd.URL, action) + + return handleCommandResult(rid, exitCode, err, responseWriter) +} - action := func(ctx context.Context) (int, <-chan struct{}, error) { +func createGateAction( + runner cmdrunner.Runner, + cmd *config.URLCommand, + cmdResult *cmdbuilder.Result, + writer io.Writer, + async bool, +) gateexec.Action { + return func(ctx context.Context) (int, <-chan struct{}, error) { command := cmdResult.Command arguments := cmdResult.Arguments @@ -147,10 +159,6 @@ func runCommand( return exitCode, nil, nil } - - exitCode, err := exec.Run(ctx, cmd.CallGate, cmd.URL, action) - - return handleCommandResult(rid, exitCode, err, responseWriter) } func waitAsyncAndLog( From a5017c0b9eb2232c558111b8beabe9226ddb5c1d Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Fri, 27 Feb 2026 23:30:59 +0100 Subject: [PATCH 19/38] feat(handlers): add command execution status headers to response --- pkg/router/handlers/execution_handler.go | 93 ++++++++++--------- pkg/router/handlers/execution_handler_test.go | 65 +++++++------ test/integration/server_test.go | 22 +++-- 3 files changed, 97 insertions(+), 83 deletions(-) diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index 0832f23..b1d876b 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -3,6 +3,7 @@ package handlers //go:generate go run go.uber.org/mock/mockgen -typed -destination=./internal/mocks/mock_cmdrunner.go -package=mocks github.com/dkarczmarski/webcmd/pkg/cmdrunner Runner,Command import ( + "bytes" "context" "encoding/json" "errors" @@ -10,6 +11,7 @@ import ( "io" "log" "net/http" + "strconv" "strings" "time" @@ -23,9 +25,9 @@ import ( ) var ( - ErrCommandFailed = errors.New("command failed") - ErrCommandNotFound = errors.New("command not found") - ErrInvalidJSONBody = errors.New("invalid JSON body") + ErrStreamingNotSupported = errors.New("streaming not supported") + ErrCommandNotFound = errors.New("command not found") + ErrInvalidJSONBody = errors.New("invalid JSON body") ) // ExecutionHandler returns a WebHandler that executes the command associated with the URLCommand stored in the @@ -40,7 +42,12 @@ var ( // response body. func ExecutionHandler(runner cmdrunner.Runner, registry *callgate.Registry) httpx.WebHandler { //nolint:ireturn return httpx.WebHandlerFunc(func(responseWriter http.ResponseWriter, request *http.Request) error { - return translateError(executionHandler(responseWriter, request, runner, registry)) + err := executionHandler(responseWriter, request, runner, registry) + if err != nil { + return translateError(err) + } + + return nil }) } @@ -83,6 +90,10 @@ func translateError(err error) error { return nil } + if errors.Is(err, ErrStreamingNotSupported) { + return httpx.NewWebError(err, http.StatusInternalServerError, ErrStreamingNotSupported.Error()) + } + if errors.Is(err, callgate.ErrBusy) { return httpx.NewWebError(err, http.StatusTooManyRequests, "Too many requests") } @@ -117,8 +128,25 @@ func runCommand( action := createGateAction(runner, cmd, cmdResult, writer, async) exitCode, err := exec.Run(ctx, cmd.CallGate, cmd.URL, action) + if err != nil { + if errors.Is(err, callgate.ErrBusy) || errors.Is(err, gateexec.ErrRegistry) || errors.Is(err, gateexec.ErrAcquire) { + return fmt.Errorf("failed to start command: %w", err) + } + + responseWriter.Header().Set("X-Success", "false") + responseWriter.Header().Set("X-Error-Message", err.Error()) + responseWriter.Header().Set("X-Exit-Code", "") + log.Printf("[ERROR] rid=%s Command failed with error: %v", rid, err) - return handleCommandResult(rid, exitCode, err, responseWriter) + return httpx.NewSilentError(err) + } + + responseWriter.Header().Set("X-Success", strconv.FormatBool(exitCode == 0)) + responseWriter.Header().Set("X-Error-Message", "") + responseWriter.Header().Set("X-Exit-Code", strconv.Itoa(exitCode)) + log.Printf("[INFO] rid=%s Command failed with exit code: %d", rid, exitCode) + + return nil } func createGateAction( @@ -191,34 +219,6 @@ func waitAsyncAndLog( return done } -func handleCommandResult(rid string, exitCode int, err error, responseWriter http.ResponseWriter) error { - if err != nil { - if errors.Is(err, callgate.ErrBusy) || errors.Is(err, gateexec.ErrRegistry) || errors.Is(err, gateexec.ErrAcquire) { - return translateError(err) - } - - log.Printf("[WARN] rid=%s Command failed with exit code: %d, error: %v", rid, exitCode, err) - - writeErrorMessage(rid, responseWriter, fmt.Sprintf("Command failed with exit code: %d, error: %v", exitCode, err)) - - return nil - } - - if exitCode != 0 { - log.Printf("[WARN] rid=%s Command failed with exit code: %d", rid, exitCode) - - writeErrorMessage(rid, responseWriter, fmt.Sprintf("Command failed with exit code: %d", exitCode)) - } - - return nil -} - -func writeErrorMessage(rid string, responseWriter http.ResponseWriter, message string) { - if _, writeErr := responseWriter.Write([]byte(message)); writeErr != nil { - log.Printf("[ERROR] rid=%s Failed to write error message: %v", rid, writeErr) - } -} - func startCommandProcess( runner cmdrunner.Runner, command string, @@ -335,20 +335,20 @@ func prepareOutputAndRunStreamCommand( responseWriter http.ResponseWriter, ) error { if _, ok := responseWriter.(http.Flusher); !ok { - return httpx.NewWebError( - fmt.Errorf("streaming not supported: %w", ErrBadConfiguration), - http.StatusInternalServerError, - "response writer does not support flushing", - ) + return ErrStreamingNotSupported } - writer := newFlushResponseWriter(responseWriter) + responseWriter.Header().Add("Trailer", "X-Success") + responseWriter.Header().Add("Trailer", "X-Error-Message") + responseWriter.Header().Add("Trailer", "X-Exit-Code") responseWriter.Header().Set("Content-Type", "text/plain; charset=utf-8") responseWriter.Header().Set("Cache-Control", "no-cache") // nginx: responseWriter.Header().Set("X-Accel-Buffering", "no") + writer := newFlushResponseWriter(responseWriter) + return runCommand( ctx, runner, @@ -369,20 +369,29 @@ func prepareOutputAndRunSyncCommand( cmdResult *cmdbuilder.Result, responseWriter http.ResponseWriter, ) error { - writer := responseWriter + var buf bytes.Buffer responseWriter.Header().Set("Content-Type", "text/plain; charset=utf-8") - return runCommand( + err := runCommand( ctx, runner, registry, cmd, cmdResult, - writer, + &buf, false, responseWriter, ) + if err != nil { + return err + } + + if _, writeErr := responseWriter.Write(buf.Bytes()); writeErr != nil { + log.Printf("[ERROR] failed to write buffered output: %v", writeErr) + } + + return nil } func extractQueryParams(request *http.Request) map[string]string { diff --git a/pkg/router/handlers/execution_handler_test.go b/pkg/router/handlers/execution_handler_test.go index 8e4dbeb..70698c0 100644 --- a/pkg/router/handlers/execution_handler_test.go +++ b/pkg/router/handlers/execution_handler_test.go @@ -774,8 +774,8 @@ func TestExecutionHandler_PrepareOutput_Stream_Failure(t *testing.T) { } errMsg := rr.Header().Get("X-Error-Message") - if !strings.Contains(errMsg, "response writer does not support flushing") { - t.Errorf("expected X-Error-Message to contain 'response writer does not support flushing', got %q", errMsg) + if !strings.Contains(errMsg, "streaming not supported") { + t.Errorf("expected X-Error-Message to contain 'streaming not supported', got %q", errMsg) } } @@ -989,14 +989,13 @@ func TestExecutionHandler_ExecuteCommand_StartError_WritesFailedToStart(t *testi rr := httptest.NewRecorder() h.ServeHTTP(rr, req) - // runCommand returns nil and only writes the error to the response body, so the status is typically 200. if rr.Code != http.StatusOK { t.Errorf("expected status 200, got %d, body=%q", rr.Code, rr.Body.String()) } - body := rr.Body.String() - if !strings.Contains(body, "failed to start command") { - t.Errorf("expected body to contain %q, got %q", "failed to start command", body) + errMsg := rr.Header().Get("X-Error-Message") + if !strings.Contains(errMsg, "failed to start command") { + t.Errorf("expected X-Error-Message header to contain %q, got %q", "failed to start command", errMsg) } } @@ -1176,9 +1175,9 @@ func TestExecutionHandler_SyncWait_ExitError_NonZeroExit_WritesFailureMessage(t t.Errorf("expected status 200, got %d", rr.Code) } - body := rr.Body.String() - if !strings.Contains(body, "Command failed with exit code: 7") { - t.Errorf("expected body to contain exit code 7, got %q", body) + exitCodeHeader := rr.Header().Get("X-Exit-Code") + if exitCodeHeader != "7" { + t.Errorf("expected X-Exit-Code header to be 7, got %q", exitCodeHeader) } } @@ -1223,14 +1222,14 @@ func TestExecutionHandler_SyncWait_WaitReturnsNonExitError_WritesFailureMessage( t.Errorf("expected status 200, got %d", rr.Code) } - body := rr.Body.String() - - if !strings.Contains(body, "Command failed with exit code: -1") { - t.Errorf("expected body to contain exit code -1, got %q", body) + exitCodeHeader := rr.Header().Get("X-Exit-Code") + if exitCodeHeader != "" { + t.Errorf("expected X-Exit-Code header to be empty, got %q", exitCodeHeader) } - if !strings.Contains(body, "wait boom") { - t.Errorf("expected body to contain %q, got %q", "wait boom", body) + errorMessageHeader := rr.Header().Get("X-Error-Message") + if !strings.Contains(errorMessageHeader, "wait boom") { + t.Errorf("expected X-Error-Message header to contain %q, got %q", "wait boom", errorMessageHeader) } } @@ -1348,8 +1347,9 @@ func TestExecutionHandler_TerminateOnCancel_NoGrace_SendsSIGKILL(t *testing.T) { t.Errorf("expected status 200, got %d", rr.Code) } - if !strings.Contains(rr.Body.String(), "context canceled") { - t.Errorf("expected body to contain %q, got %q", "context canceled", rr.Body.String()) + errMsg := rr.Header().Get("X-Error-Message") + if !strings.Contains(errMsg, "context canceled") { + t.Errorf("expected X-Error-Message header to contain %q, got %q", "context canceled", errMsg) } } @@ -1528,19 +1528,19 @@ func TestExecutionHandler_DeadlineExceeded_PrioritizesCtxErrOverExitError(t *tes t.Errorf("expected status 200, got %d", rr.Code) } - body := rr.Body.String() - - if !strings.Contains(body, "context deadline exceeded") { - t.Errorf("expected body to contain %q, got %q", "context deadline exceeded", body) + exitCodeHeader := rr.Header().Get("X-Exit-Code") + if exitCodeHeader != "" { + t.Errorf("expected X-Exit-Code header to be empty, got %q", exitCodeHeader) } - if !strings.Contains(body, "Command failed with exit code: -1") { - t.Errorf("expected body to contain exit code -1, got %q", body) + errMsg := rr.Header().Get("X-Error-Message") + if !strings.Contains(errMsg, "context deadline exceeded") { + t.Errorf("expected X-Error-Message header to contain %q, got %q", "context deadline exceeded", errMsg) } // Make sure it did not report the process exit code (7) as primary. - if strings.Contains(body, "Command failed with exit code: 7") { - t.Errorf("did not expect exit code 7 to be reported, got %q", body) + if exitCodeHeader == "7" { + t.Errorf("did not expect exit code 7 to be reported, got %q", exitCodeHeader) } } @@ -1713,7 +1713,7 @@ func TestExecutionHandler_AsyncNone_WaitError_LogsButDoesNotAffectResponse(t *te } } -func TestExecutionHandler_RunCommand_AppendsErrorMessageToBody_OnNonZeroExit(t *testing.T) { +func TestExecutionHandler_RunCommand_SetsExitCodeHeader_OnNonZeroExit(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) @@ -1743,7 +1743,7 @@ func TestExecutionHandler_RunCommand_AppendsErrorMessageToBody_OnNonZeroExit(t * mockCmd.EXPECT().SetSysProcAttr(gomock.Any()) mockCmd.EXPECT().SetStdout(gomock.Any()).Do(func(w io.Writer) { - // simulate process output written before failure is appended + // simulate process output _, _ = w.Write([]byte("PROC_OUT\n")) }) mockCmd.EXPECT().SetStderr(gomock.Any()) @@ -1766,14 +1766,13 @@ func TestExecutionHandler_RunCommand_AppendsErrorMessageToBody_OnNonZeroExit(t * } body := rr.Body.String() - - // may be mixed with earlier output; we only assert that the error message is appended somewhere if !strings.Contains(body, "PROC_OUT") { t.Errorf("expected body to contain process output, got %q", body) } - if !strings.Contains(body, "Command failed with exit code: 7") { - t.Errorf("expected body to contain exit code 7 failure message, got %q", body) + exitCodeHeader := rr.Header().Get("X-Exit-Code") + if exitCodeHeader != "7" { + t.Errorf("expected X-Exit-Code header to be 7, got %q", exitCodeHeader) } } @@ -1857,7 +1856,7 @@ func TestExecutionHandler_RunCommand_WriteErrorMessageWriteFails_LogsError(t *te logs := buf.String() mu.Unlock() - if strings.Contains(logs, "Failed to write error message") { + if strings.Contains(logs, "failed to write buffered output") { if !strings.Contains(logs, "write failed") { t.Fatalf("expected logs to contain %q, got %q", "write failed", logs) } @@ -1866,7 +1865,7 @@ func TestExecutionHandler_RunCommand_WriteErrorMessageWriteFails_LogsError(t *te } if time.Now().After(deadline) { - t.Fatalf("expected logs to contain %q, got %q", "Failed to write error message", logs) + t.Fatalf("expected logs to contain %q, got %q", "failed to write buffered output", logs) } time.Sleep(5 * time.Millisecond) diff --git a/test/integration/server_test.go b/test/integration/server_test.go index 8c30efe..24f190c 100644 --- a/test/integration/server_test.go +++ b/test/integration/server_test.go @@ -176,7 +176,7 @@ func TestServerIntegration(t *testing.T) { } }) - t.Run("504 Gateway Timeout", func(t *testing.T) { + t.Run("Execution Timeout", func(t *testing.T) { t.Parallel() srv := setupServer(t) @@ -185,15 +185,17 @@ func TestServerIntegration(t *testing.T) { srv.ServeHTTP(rec, req) - // Intentional decision to return 200 OK because the command's exit status - // is unknown when headers are sent (especially for streaming). - // Error messages are appended to the response body. if rec.Code != http.StatusOK { t.Errorf("Expected status code %d, got %d", http.StatusOK, rec.Code) } - if !strings.Contains(rec.Body.String(), "context deadline exceeded") { - t.Errorf("Expected timeout error message in body, got %q", rec.Body.String()) + if rec.Header().Get("X-Success") != "false" { + t.Errorf("Expected X-Success: false, got %q", rec.Header().Get("X-Success")) + } + + errMsg := rec.Header().Get("X-Error-Message") + if !strings.Contains(errMsg, "context deadline exceeded") { + t.Errorf("Expected timeout error message in X-Error-Message header, got %q", errMsg) } }) @@ -210,8 +212,12 @@ func TestServerIntegration(t *testing.T) { t.Errorf("Expected status code %d, got %d", http.StatusOK, rec.Code) } - if !strings.Contains(rec.Body.String(), "Command failed with exit code: 1") { - t.Errorf("Expected error message in body, got %q", rec.Body.String()) + if rec.Header().Get("X-Success") != "false" { + t.Errorf("Expected X-Success: false, got %q", rec.Header().Get("X-Success")) + } + + if rec.Header().Get("X-Exit-Code") != "1" { + t.Errorf("Expected X-Exit-Code: 1, got %q", rec.Header().Get("X-Exit-Code")) } }) From c1ff384dab18bbe71e87ef6fcc69b2b192b75cc6 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Sat, 28 Feb 2026 19:38:33 +0100 Subject: [PATCH 20/38] refactor(gateexec): simplify error handling --- pkg/gateexec/gateexec.go | 9 +++------ pkg/router/handlers/execution_handler.go | 6 +----- pkg/router/handlers/execution_handler_test.go | 4 ++-- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/pkg/gateexec/gateexec.go b/pkg/gateexec/gateexec.go index ebace16..0744960 100644 --- a/pkg/gateexec/gateexec.go +++ b/pkg/gateexec/gateexec.go @@ -9,10 +9,7 @@ import ( "github.com/dkarczmarski/webcmd/pkg/config" ) -var ( - ErrRegistry = errors.New("gate executor: registry error") - ErrAcquire = errors.New("gate executor: acquire error") -) +var ErrPreAction = errors.New("gate executor: pre-action") type Action func(context.Context) (result int, done <-chan struct{}, err error) @@ -43,12 +40,12 @@ func (e *Executor) Run( gate, err := e.registry.GetOrCreate(group, gateCfg.Mode) if err != nil { - return -1, fmt.Errorf("%w: %w", ErrRegistry, err) + return -1, fmt.Errorf("%w: %w", ErrPreAction, err) } release, err := gate.Acquire(ctx) if err != nil { - return -1, fmt.Errorf("%w: %w", ErrAcquire, err) + return -1, fmt.Errorf("%w: %w", ErrPreAction, err) } exit, done, runErr := action(ctx) diff --git a/pkg/router/handlers/execution_handler.go b/pkg/router/handlers/execution_handler.go index b1d876b..e57b115 100644 --- a/pkg/router/handlers/execution_handler.go +++ b/pkg/router/handlers/execution_handler.go @@ -98,10 +98,6 @@ func translateError(err error) error { return httpx.NewWebError(err, http.StatusTooManyRequests, "Too many requests") } - if errors.Is(err, gateexec.ErrRegistry) { - return httpx.NewWebError(err, http.StatusInternalServerError, "Invalid callgate configuration") - } - if errors.Is(err, ErrCommandNotFound) { return httpx.NewWebError(err, http.StatusNotFound, "Command not found") } @@ -129,7 +125,7 @@ func runCommand( exitCode, err := exec.Run(ctx, cmd.CallGate, cmd.URL, action) if err != nil { - if errors.Is(err, callgate.ErrBusy) || errors.Is(err, gateexec.ErrRegistry) || errors.Is(err, gateexec.ErrAcquire) { + if errors.Is(err, gateexec.ErrPreAction) { return fmt.Errorf("failed to start command: %w", err) } diff --git a/pkg/router/handlers/execution_handler_test.go b/pkg/router/handlers/execution_handler_test.go index 70698c0..9e0c8fc 100644 --- a/pkg/router/handlers/execution_handler_test.go +++ b/pkg/router/handlers/execution_handler_test.go @@ -911,8 +911,8 @@ func TestExecutionHandler_UnknownCallGateMode(t *testing.T) { } errMsg := rr.Header().Get("X-Error-Message") - if !strings.Contains(errMsg, "Invalid callgate configuration") { - t.Errorf("expected X-Error-Message to contain %q, got %q", "Invalid callgate configuration", errMsg) + if !strings.Contains(errMsg, "invalid callgate mode") { + t.Errorf("expected X-Error-Message to contain %q, got %q", "invalid callgate mode", errMsg) } } From 31f0b58205572857382b4fb0e1957a79b07ccd16 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Sat, 28 Feb 2026 20:06:38 +0100 Subject: [PATCH 21/38] docs(gateexec): add godoc --- pkg/gateexec/gateexec.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/gateexec/gateexec.go b/pkg/gateexec/gateexec.go index 0744960..9daa9b3 100644 --- a/pkg/gateexec/gateexec.go +++ b/pkg/gateexec/gateexec.go @@ -1,3 +1,7 @@ +// Package gateexec provides a mechanism for executing actions under the control of call gates. +// +// It wraps an Action with gate-based concurrency control, handling gate acquisition, +// execution, and release (including asynchronous cleanup). package gateexec import ( @@ -9,8 +13,14 @@ import ( "github.com/dkarczmarski/webcmd/pkg/config" ) +// ErrPreAction is returned when an error occurs before the actual action starts, +// such as during gate acquisition or retrieval. var ErrPreAction = errors.New("gate executor: pre-action") +// Action represents a function to be executed under gate control. +// It returns a result code (e.g. process exit code), an optional channel +// that indicates when the action is fully finished (for async cleanup), +// and any execution error. type Action func(context.Context) (result int, done <-chan struct{}, err error) type Executor struct { From efd7eef0511c51a7b62dce6bb5dddf14c1e04ec9 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Sat, 28 Feb 2026 22:34:12 +0100 Subject: [PATCH 22/38] test(gateexec): add unit tests --- pkg/gateexec/fakes_test.go | 64 ++++++++ pkg/gateexec/gateexec.go | 18 ++- pkg/gateexec/gateexec_test.go | 289 ++++++++++++++++++++++++++++++++++ pkg/gateexec/helpers_test.go | 42 +++++ 4 files changed, 410 insertions(+), 3 deletions(-) create mode 100644 pkg/gateexec/fakes_test.go create mode 100644 pkg/gateexec/gateexec_test.go create mode 100644 pkg/gateexec/helpers_test.go diff --git a/pkg/gateexec/fakes_test.go b/pkg/gateexec/fakes_test.go new file mode 100644 index 0000000..0227b7c --- /dev/null +++ b/pkg/gateexec/fakes_test.go @@ -0,0 +1,64 @@ +package gateexec_test + +import ( + "context" + "sync/atomic" + + "github.com/dkarczmarski/webcmd/pkg/callgate" +) + +type fakeRegistry struct { + calls int32 + lastGroup string + lastMode string + + gateToReturn callgate.CallGate + errToReturn error +} + +func (r *fakeRegistry) GetOrCreate(group string, mode string) (callgate.CallGate, error) { //nolint:ireturn + atomic.AddInt32(&r.calls, 1) + r.lastGroup = group + r.lastMode = mode + + return r.gateToReturn, r.errToReturn +} + +func (r *fakeRegistry) callCount() int { + return int(atomic.LoadInt32(&r.calls)) +} + +type fakeGate struct { + acquireCalls int32 + lastCtx context.Context //nolint:containedctx + + acquireErr error + + releaseCalls int32 +} + +func (g *fakeGate) Acquire(ctx context.Context) (func(), error) { + atomic.AddInt32(&g.acquireCalls, 1) + g.lastCtx = ctx + + if g.acquireErr != nil { + return nil, g.acquireErr + } + + release := func() { + atomic.AddInt32(&g.releaseCalls, 1) + } + + return release, nil +} + +func (g *fakeGate) acquireCount() int { + return int(atomic.LoadInt32(&g.acquireCalls)) +} + +func (g *fakeGate) releaseCount() int { + return int(atomic.LoadInt32(&g.releaseCalls)) +} + +// compile-time interface check. +var _ callgate.CallGate = (*fakeGate)(nil) diff --git a/pkg/gateexec/gateexec.go b/pkg/gateexec/gateexec.go index 9daa9b3..d8bfe99 100644 --- a/pkg/gateexec/gateexec.go +++ b/pkg/gateexec/gateexec.go @@ -23,11 +23,23 @@ var ErrPreAction = errors.New("gate executor: pre-action") // and any execution error. type Action func(context.Context) (result int, done <-chan struct{}, err error) +// Registry provides access to execution gates identified +// by a group and mode name. +// +// Executor uses Registry to obtain or lazily create +// a Gate instance before running an action. +// +// Implementations are responsible for ensuring that +// repeated calls with the same group return the same Gate. +type Registry interface { + GetOrCreate(group string, name string) (callgate.CallGate, error) +} + type Executor struct { - registry *callgate.Registry + registry Registry } -func New(registry *callgate.Registry) *Executor { +func New(registry Registry) *Executor { return &Executor{registry: registry} } @@ -37,7 +49,7 @@ func (e *Executor) Run( defaultGroup string, action Action, ) (int, error) { - if gateCfg == nil || e.registry == nil { + if gateCfg == nil { exit, _, err := action(ctx) return exit, err diff --git a/pkg/gateexec/gateexec_test.go b/pkg/gateexec/gateexec_test.go new file mode 100644 index 0000000..ef43218 --- /dev/null +++ b/pkg/gateexec/gateexec_test.go @@ -0,0 +1,289 @@ +package gateexec_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/dkarczmarski/webcmd/pkg/config" + "github.com/dkarczmarski/webcmd/pkg/gateexec" +) + +func TestExecutor_Run_BasicPaths(t *testing.T) { + t.Parallel() + + t.Run("nil gate config: action called once, registry not used, exit/error propagated", func(t *testing.T) { + t.Parallel() + + ctx := t.Context() + reg := &fakeRegistry{} + exec := gateexec.New(reg) + + wantExit := 42 + wantErr := errors.New("action error") + + actionCalls := 0 + action := func(context.Context) (int, <-chan struct{}, error) { + actionCalls++ + + return wantExit, nil, wantErr + } + + gotExit, gotErr := exec.Run(ctx, nil, "X", action) + + assertEqual(t, actionCalls, 1, "action calls") + assertEqual(t, reg.callCount(), 0, "registry calls") + assertEqual(t, gotExit, wantExit, "exit") + assertIs(t, gotErr, wantErr) + }) + + cases := []struct { + name string + cfg *config.CallGateConfig + defaultGrp string + wantGroup string + wantMode string + }{ + { + name: "default group used when GroupName is nil", + cfg: &config.CallGateConfig{GroupName: nil, Mode: "single"}, + defaultGrp: "X", + wantGroup: "X", + wantMode: "single", + }, + { + name: "GroupName overrides default group", + cfg: &config.CallGateConfig{GroupName: strPtr("Y"), Mode: "single"}, + defaultGrp: "X", + wantGroup: "Y", + wantMode: "single", + }, + { + name: "mode is passed to registry as second argument", + cfg: &config.CallGateConfig{GroupName: nil, Mode: "custom-mode"}, + defaultGrp: "X", + wantGroup: "X", + wantMode: "custom-mode", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctx := t.Context() + gate := &fakeGate{} + reg := &fakeRegistry{gateToReturn: gate} + exec := gateexec.New(reg) + + actionCalls := 0 + action := func(context.Context) (int, <-chan struct{}, error) { + actionCalls++ + + return 0, nil, nil + } + + _, _ = exec.Run(ctx, tc.cfg, tc.defaultGrp, action) + + assertEqual(t, actionCalls, 1, "action calls") + assertEqual(t, reg.callCount(), 1, "registry calls") + assertEqual(t, reg.lastGroup, tc.wantGroup, "registry group argument") + assertEqual(t, reg.lastMode, tc.wantMode, "registry mode argument") + }) + } +} + +//nolint:lll +func TestExecutor_Run_PreActionErrors(t *testing.T) { + t.Parallel() + + t.Run("GetOrCreate error: returns -1 and wraps ErrPreAction and original error; does not call action or Acquire", func(t *testing.T) { + t.Parallel() + + ctx := t.Context() + someErr := errors.New("registry failure") + + gate := &fakeGate{} + reg := &fakeRegistry{ + gateToReturn: gate, + errToReturn: someErr, + } + exec := gateexec.New(reg) + + actionCalls := 0 + action := func(context.Context) (int, <-chan struct{}, error) { + actionCalls++ + + return 0, nil, nil + } + + cfg := &config.CallGateConfig{Mode: "single"} + exit, err := exec.Run(ctx, cfg, "X", action) + + assertEqual(t, exit, -1, "exit") + assertIs(t, err, gateexec.ErrPreAction) + assertIs(t, err, someErr) + + assertEqual(t, actionCalls, 0, "action calls") + assertEqual(t, reg.callCount(), 1, "registry calls") + assertEqual(t, gate.acquireCount(), 0, "gate Acquire calls") + }) + + t.Run("Acquire error: returns -1 and wraps ErrPreAction and original error; does not call action; does not call release", func(t *testing.T) { + t.Parallel() + + ctx := t.Context() + someErr := errors.New("acquire failure") + + gate := &fakeGate{acquireErr: someErr} + reg := &fakeRegistry{gateToReturn: gate} + exec := gateexec.New(reg) + + actionCalls := 0 + action := func(context.Context) (int, <-chan struct{}, error) { + actionCalls++ + + return 0, nil, nil + } + + cfg := &config.CallGateConfig{Mode: "single"} + exit, err := exec.Run(ctx, cfg, "X", action) + + assertEqual(t, exit, -1, "exit") + assertIs(t, err, gateexec.ErrPreAction) + assertIs(t, err, someErr) + + assertEqual(t, actionCalls, 0, "action calls") + assertEqual(t, gate.acquireCount(), 1, "gate Acquire calls") + assertEqual(t, gate.releaseCount(), 0, "release calls") + }) + + t.Run("Acquire receives the same context passed to Run", func(t *testing.T) { + t.Parallel() + + ctx := context.WithValue(t.Context(), struct{}{}, "marker") //nolint:staticcheck + + gate := &fakeGate{} + reg := &fakeRegistry{gateToReturn: gate} + exec := gateexec.New(reg) + + action := func(context.Context) (int, <-chan struct{}, error) { + return 0, nil, nil + } + + cfg := &config.CallGateConfig{Mode: "single"} + _, _ = exec.Run(ctx, cfg, "X", action) + + if gate.lastCtx != ctx { + t.Fatalf("Acquire context: got %v, want the exact same instance", gate.lastCtx) + } + }) +} + +//nolint:lll +func TestExecutor_Run_ReleaseAndDone(t *testing.T) { + t.Parallel() + + t.Run("done is nil: release is called synchronously before Run returns", func(t *testing.T) { + t.Parallel() + + ctx := t.Context() + gate := &fakeGate{} + reg := &fakeRegistry{gateToReturn: gate} + exec := gateexec.New(reg) + + action := func(context.Context) (int, <-chan struct{}, error) { + return 0, nil, nil + } + + cfg := &config.CallGateConfig{Mode: "single"} + _, _ = exec.Run(ctx, cfg, "X", action) + + // If release is synchronous, it must already be 1 here. + assertEqual(t, gate.releaseCount(), 1, "release calls") + }) + + t.Run("done is not nil: release is not called immediately; called once after done is closed", func(t *testing.T) { + t.Parallel() + + ctx := t.Context() + gate := &fakeGate{} + reg := &fakeRegistry{gateToReturn: gate} + exec := gateexec.New(reg) + + doneCh := make(chan struct{}) + + action := func(context.Context) (int, <-chan struct{}, error) { + return 0, doneCh, nil + } + + cfg := &config.CallGateConfig{Mode: "single"} + _, _ = exec.Run(ctx, cfg, "X", action) + + assertEqual(t, gate.releaseCount(), 0, "release calls immediately after Run") + + close(doneCh) + + eventually(t, 300*time.Millisecond, func() bool { + return gate.releaseCount() == 1 + }) + + assertEqual(t, gate.releaseCount(), 1, "release calls after done is closed") + }) + + t.Run("action returns error with done=nil: release still called once and Run returns the action error", func(t *testing.T) { + t.Parallel() + + ctx := t.Context() + gate := &fakeGate{} + reg := &fakeRegistry{gateToReturn: gate} + exec := gateexec.New(reg) + + runErr := errors.New("run failed") + action := func(context.Context) (int, <-chan struct{}, error) { + return 123, nil, runErr + } + + cfg := &config.CallGateConfig{Mode: "single"} + exit, err := exec.Run(ctx, cfg, "X", action) + + assertEqual(t, exit, 123, "exit") + assertIs(t, err, runErr) + + assertEqual(t, gate.releaseCount(), 1, "release calls") + }) + + t.Run("action returns error with done!=nil: Run returns the action error; release happens only after done is closed", func(t *testing.T) { + t.Parallel() + + ctx := t.Context() + gate := &fakeGate{} + reg := &fakeRegistry{gateToReturn: gate} + exec := gateexec.New(reg) + + doneCh := make(chan struct{}) + runErr := errors.New("run failed") + + action := func(context.Context) (int, <-chan struct{}, error) { + return 5, doneCh, runErr + } + + cfg := &config.CallGateConfig{Mode: "single"} + exit, err := exec.Run(ctx, cfg, "X", action) + + assertEqual(t, exit, 5, "exit") + assertIs(t, err, runErr) + + // Not yet released until done is closed. + assertEqual(t, gate.releaseCount(), 0, "release calls immediately after Run") + + close(doneCh) + + eventually(t, 300*time.Millisecond, func() bool { + return gate.releaseCount() == 1 + }) + + assertEqual(t, gate.releaseCount(), 1, "release calls after done is closed") + }) +} diff --git a/pkg/gateexec/helpers_test.go b/pkg/gateexec/helpers_test.go new file mode 100644 index 0000000..244ef07 --- /dev/null +++ b/pkg/gateexec/helpers_test.go @@ -0,0 +1,42 @@ +package gateexec_test + +import ( + "errors" + "testing" + "time" +) + +func strPtr(s string) *string { + return &s +} + +func assertIs(t *testing.T, err error, target error) { + t.Helper() + + if !errors.Is(err, target) { + t.Fatalf("expected errors.Is(err, %v) == true, got false; err=%v", target, err) + } +} + +func assertEqual[T comparable](t *testing.T, got, want T, msg string) { + t.Helper() + + if got != want { + t.Fatalf("%s: got %v, want %v", msg, got, want) + } +} + +func eventually(t *testing.T, timeout time.Duration, check func() bool) { + t.Helper() + + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if check() { + return + } + + time.Sleep(5 * time.Millisecond) + } + + t.Fatalf("condition not satisfied within %s", timeout) +} From a527cd3209fab85eb31b3342d25c712aa98bfec6 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Mon, 2 Mar 2026 20:29:04 +0100 Subject: [PATCH 23/38] fix(callgate): Sequence serves waiters in FIFO --- pkg/callgate/callgate_queue.go | 87 +++++++-- pkg/callgate/callgate_queue_test.go | 263 ++++++++++++++++++++++++++++ 2 files changed, 336 insertions(+), 14 deletions(-) diff --git a/pkg/callgate/callgate_queue.go b/pkg/callgate/callgate_queue.go index de373fa..6c148da 100644 --- a/pkg/callgate/callgate_queue.go +++ b/pkg/callgate/callgate_queue.go @@ -7,32 +7,61 @@ import ( ) type Sequence struct { - token chan struct{} + mu sync.Mutex + busy bool + queue []chan struct{} // FIFO of waiters } func NewSequence() *Sequence { - l := &Sequence{ - token: make(chan struct{}, 1), - } - l.token <- struct{}{} - - return l + return &Sequence{} //nolint:exhaustruct } // Acquire blocks until the execution slot is available. // -// This gate allows only one execution at a time. Calls are processed in sequence: -// if one is running, the next call waits until it completes. +// This gate allows only one execution at a time. +// Waiters are served in strict FIFO order. // -// When the slot is acquired, Acquire returns a release function. The caller must -// call release() when the work is done. +// When the slot is acquired, Acquire returns a release function. +// The caller must call release() when the work is done. // // If the context is canceled before acquiring the slot, Acquire returns ctx.Err(). func (cg *Sequence) Acquire(ctx context.Context) (func(), error) { + // Fast path: not busy and no queue => acquire immediately. + cg.mu.Lock() + if !cg.busy && len(cg.queue) == 0 { + cg.busy = true + + cg.mu.Unlock() + + return cg.releaseFunc(), nil + } + + // Otherwise, enqueue and wait for our turn. + waiter := make(chan struct{}) + + cg.queue = append(cg.queue, waiter) + cg.mu.Unlock() + select { case <-ctx.Done(): - return nil, fmt.Errorf("acquire sequence: %w", ctx.Err()) - case <-cg.token: + // Remove ourselves from the queue if we haven't been granted the slot yet. + cg.mu.Lock() + removed := cg.removeWaiterLocked(waiter) + cg.mu.Unlock() + + // If we were NOT removed, it means we were already granted (channel closed) + // roughly concurrently. In that case, we must return success, not ctx.Err(). + // We detect "granted" by checking removed==false; but there is a race: + // - if channel closed just before we locked, removeWaiterLocked won't find it. + // In that case, proceed as acquired. + if removed { + return nil, fmt.Errorf("acquire sequence: %w", ctx.Err()) + } + + return cg.releaseFunc(), nil + + case <-waiter: + // Granted in FIFO order. return cg.releaseFunc(), nil } } @@ -42,7 +71,37 @@ func (cg *Sequence) releaseFunc() func() { return func() { once.Do(func() { - cg.token <- struct{}{} + cg.mu.Lock() + defer cg.mu.Unlock() + + // If someone is waiting, grant the next one in FIFO order. + if len(cg.queue) > 0 { + next := cg.queue[0] + // pop front + copy(cg.queue[0:], cg.queue[1:]) + cg.queue = cg.queue[:len(cg.queue)-1] + + // Keep busy=true, transfer ownership to the next waiter. + close(next) + + return + } + + // No waiters => free the slot. + cg.busy = false }) } } + +func (cg *Sequence) removeWaiterLocked(waiter chan struct{}) bool { + for i := range cg.queue { + if cg.queue[i] == waiter { + copy(cg.queue[i:], cg.queue[i+1:]) + cg.queue = cg.queue[:len(cg.queue)-1] + + return true + } + } + + return false +} diff --git a/pkg/callgate/callgate_queue_test.go b/pkg/callgate/callgate_queue_test.go index b3c24d2..45a0097 100644 --- a/pkg/callgate/callgate_queue_test.go +++ b/pkg/callgate/callgate_queue_test.go @@ -252,3 +252,266 @@ func TestSequence_SerializesCriticalSection_NoOverlap(t *testing.T) { t.Fatalf("critical section overlapped: max concurrent inCS = %d, want 1", maxSeen) } } + +// FIFO / fairness: waiters should be granted in the same order they started waiting. +func TestSequence_Acquire_FIFOOrder(t *testing.T) { + t.Parallel() + + ctx := t.Context() + g := callgate.NewSequence() + + release1, err := g.Acquire(ctx) + if err != nil { + t.Fatalf("Acquire #1 error: %v", err) + } + + const n = 500 + orderCh := make(chan int, n) + + var ( + releases [n]func() + errs [n]error + ) + + // Start goroutines in a known order. + // Small sleeps help ensure they enqueue in that same order (keeps the test stable). + for i := range n { + go func(i int) { + releases[i], errs[i] = g.Acquire(ctx) + orderCh <- i + }(i) + + time.Sleep(10 * time.Millisecond) + } + + // Let the first waiter through. + release1() + + // We expect acquisition order in strict FIFO. + for want := range n { + select { + case got := <-orderCh: + if got != want { + t.Fatalf("FIFO violated: got=%d, want=%d", got, want) + } + + if errs[got] != nil { + t.Fatalf("Acquire waiter #%d error: %v", got, errs[got]) + } + + if releases[got] == nil { + t.Fatalf("Acquire waiter #%d returned nil release", got) + } + + // Release to allow the next waiter. + releases[got]() + + case <-time.After(2 * time.Second): + t.Fatalf("timed out waiting for waiter #%d to acquire", want) + } + } +} + +// Race: ctx cancel vs grant happening "at the same time". +// Invariant: +// - If Acquire returns error => release must be nil. +// - If Acquire returns success => it must truly own the slot (next Acquire blocks until release()). +func TestSequence_Acquire_CancelVsGrantRace(t *testing.T) { + t.Parallel() + + root := t.Context() + g := callgate.NewSequence() + + const iters = 500 + + for iter := range iters { + // Hold the slot. + release1, err := g.Acquire(root) + if err != nil { + t.Fatalf("iter %d: Acquire #1 error: %v", iter, err) + } + + ctx, cancel := context.WithCancel(root) + + var ( + release2 func() + err2 error + ) + + done := make(chan struct{}) + + go func() { + release2, err2 = g.Acquire(ctx) + + close(done) + }() + + // Give the waiter a moment to enqueue. + time.Sleep(1 * time.Millisecond) + + // Try to make cancel and release happen as simultaneously as possible. + start := make(chan struct{}) + + var wg sync.WaitGroup + + wg.Add(2) + + go func() { + defer wg.Done() + <-start + cancel() + }() + go func() { + defer wg.Done() + <-start + release1() + }() + + close(start) + wg.Wait() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatalf("iter %d: waiter did not finish", iter) + } + + if err2 != nil { + // If canceled "won", must be an error and release must be nil. + if release2 != nil { + t.Fatalf("iter %d: expected nil release on error, got non-nil", iter) + } + + if !errors.Is(err2, context.Canceled) && !errors.Is(err2, context.DeadlineExceeded) { + t.Fatalf("iter %d: expected ctx error, got: %v", iter, err2) + } + + continue + } + + // Success path: must have a release func. + if release2 == nil { + t.Fatalf("iter %d: Acquire succeeded but release is nil", iter) + } + + // Verify it *really* owns the slot: a third Acquire should block until release2(). + acquired3 := make(chan struct{}) + go func() { + rel3, err3 := g.Acquire(root) + if err3 == nil && rel3 != nil { + rel3() + } + + close(acquired3) + }() + + select { + case <-acquired3: + // If it returned immediately, then release2 didn't actually own the slot. + t.Fatalf("iter %d: Acquire #3 returned early; release2 likely didn't own the slot", iter) + case <-time.After(30 * time.Millisecond): + } + + release2() + + select { + case <-acquired3: + case <-time.After(2 * time.Second): + t.Fatalf("iter %d: Acquire #3 did not return after release2()", iter) + } + } +} + +// A canceled waiter should be removed from the queue and not block later waiters. +// Scenario: +// - Hold slot with #1. +// - Waiter #2 times out while waiting (must return error). +// - Waiter #3 waits normally. +// - Release #1: waiter #3 must acquire (i.e., canceled waiter must not "sit" at the head forever). +func TestSequence_Acquire_CanceledWaiterDoesNotBlockQueue(t *testing.T) { + t.Parallel() + + root := t.Context() + g := callgate.NewSequence() + + release1, err := g.Acquire(root) + if err != nil { + t.Fatalf("Acquire #1 error: %v", err) + } + defer release1() + + // Waiter #2: should time out while waiting. + ctx2, cancel2 := context.WithTimeout(root, 30*time.Millisecond) + defer cancel2() + + done2 := make(chan struct{}) + + var ( + release2 func() + err2 error + ) + + go func() { + release2, err2 = g.Acquire(ctx2) + + close(done2) + }() + + select { + case <-done2: + case <-time.After(2 * time.Second): + t.Fatalf("waiter #2 did not return") + } + + if release2 != nil { + t.Fatalf("expected nil release for waiter #2, got non-nil") + } + + if err2 == nil { + t.Fatalf("expected error for waiter #2, got nil") + } + + if !errors.Is(err2, context.DeadlineExceeded) { + t.Fatalf("expected waiter #2 to wrap context.DeadlineExceeded, got: %v", err2) + } + + // Waiter #3: should be able to acquire once #1 releases. + acquired3 := make(chan struct{}) + + var ( + release3 func() + err3 error + ) + + go func() { + release3, err3 = g.Acquire(root) + + close(acquired3) + }() + + // Should still be blocked because #1 is held. + select { + case <-acquired3: + t.Fatalf("Acquire #3 unexpectedly returned early (err=%v)", err3) + case <-time.After(30 * time.Millisecond): + } + + // Now release #1: #3 must get it (canceled #2 must not block). + release1() + + select { + case <-acquired3: + case <-time.After(2 * time.Second): + t.Fatalf("Acquire #3 did not return after release #1") + } + + if err3 != nil { + t.Fatalf("Acquire #3 error: %v", err3) + } + + if release3 == nil { + t.Fatalf("Acquire #3 returned nil release") + } + + release3() +} From a1067d57c0bd4bf65d86d83f744d6d4d864b2919 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Mon, 2 Mar 2026 11:36:54 +0100 Subject: [PATCH 24/38] test(gateexec): add integration tests --- pkg/gateexec/gateexec_integration_test.go | 207 ++++++++++++++++++++++ 1 file changed, 207 insertions(+) create mode 100644 pkg/gateexec/gateexec_integration_test.go diff --git a/pkg/gateexec/gateexec_integration_test.go b/pkg/gateexec/gateexec_integration_test.go new file mode 100644 index 0000000..25c63ac --- /dev/null +++ b/pkg/gateexec/gateexec_integration_test.go @@ -0,0 +1,207 @@ +package gateexec_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/dkarczmarski/webcmd/pkg/callgate" + "github.com/dkarczmarski/webcmd/pkg/config" + "github.com/dkarczmarski/webcmd/pkg/gateexec" +) + +func TestExecutor_Integration_Single_ConcurrentSecondGetsErrBusy(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + // Real registry with default gates. + registry := callgate.NewRegistry(callgate.WithDefaults()) + exec := gateexec.New(registry) + + cfg := &config.CallGateConfig{ + Mode: "single", + GroupName: strPtr("G"), + } + + firstStarted := make(chan struct{}) + firstDone := make(chan struct{}) + + // Action #1 acquires the slot and holds it until firstDone is closed. + action1 := func(context.Context) (int, <-chan struct{}, error) { //nolint:unparam + close(firstStarted) + + return 1, firstDone, nil + } + + // Action #2 should not run; it should fail with ErrBusy at Acquire time. + action2Called := make(chan struct{}, 1) + action2 := func(context.Context) (int, <-chan struct{}, error) { + action2Called <- struct{}{} + + return 2, nil, nil + } + + // Start action1 and wait until it definitely holds the slot. + errCh1 := make(chan error, 1) + go func() { + _, err := exec.Run(ctx, cfg, "DEFAULT", action1) + errCh1 <- err + }() + + waitClosed(t, firstStarted, 700*time.Millisecond, "firstStarted") + + // Now attempt to run action2 while action1 is still holding the slot. + exit2, err2 := exec.Run(ctx, cfg, "DEFAULT", action2) + + // Ensure action2 wasn't called (Acquire should fail before action is executed). + select { + case <-action2Called: + t.Fatalf("action2 should not be called when gate is busy") + default: + } + + // gateexec should report a pre-action error and include ErrBusy. + if exit2 != -1 { + t.Fatalf("expected exit=-1 for pre-action error, got %d", exit2) + } + + if err2 == nil { + t.Fatalf("expected non-nil error") + } + + if !errors.Is(err2, gateexec.ErrPreAction) { + t.Fatalf("expected errors.Is(err, gateexec.ErrPreAction)=true, err=%v", err2) + } + + if !errors.Is(err2, callgate.ErrBusy) { + t.Fatalf("expected errors.Is(err, callgate.ErrBusy)=true, err=%v", err2) + } + + // Now release action1 and ensure it completes without error. + close(firstDone) + + select { + case err1 := <-errCh1: + if err1 != nil { + t.Fatalf("action1 Run returned unexpected error: %v", err1) + } + case <-time.After(700 * time.Millisecond): + t.Fatalf("timeout waiting for action1 Run to finish") + } +} + +func TestExecutor_Integration_Sequence_SerializesAndMakesProgress(t *testing.T) { + t.Parallel() + + const ( + N = 50 + timeout = 700 * time.Millisecond + ) + + ctx := t.Context() + + // Real registry with default gates (production-like). + registry := callgate.NewRegistry(callgate.WithDefaults()) + exec := gateexec.New(registry) + + cfg := &config.CallGateConfig{ + Mode: "sequence", + GroupName: strPtr("G"), + } + + started := make(chan int, N) + errCh := make(chan error, N) + + // Each action will block until its done[i] is closed. + done := make([]chan struct{}, N) + for i := range N { + done[i] = make(chan struct{}) + + //nolint:unparam + action := func(context.Context) (int, <-chan struct{}, error) { + // If gate is correct, this should happen strictly one-at-a-time. + started <- i + + return i, done[i], nil + } + + go func() { + _, err := exec.Run(ctx, cfg, "DEFAULT", action) + errCh <- err + }() + } + + // Wait for the first action to enter. + var current int + select { + case current = <-started: + case <-time.After(timeout): + t.Fatalf("timeout waiting for first action to start") + } + + seen := make(map[int]bool, N) + seen[current] = true + + // While current is holding the slot, nobody else should enter. + select { + case v := <-started: + t.Fatalf("unexpected concurrent entry (sequence broken): %d", v) + default: // OK + } + + // Now release actions one by one. Each release should allow exactly one next entry. + for step := 1; step < N; step++ { + close(done[current]) + + select { + case next := <-started: + if seen[next] { + t.Fatalf("action %d started more than once", next) + } + + seen[next] = true + current = next + case <-time.After(timeout): + t.Fatalf("timeout waiting for next action to start (step=%d)", step) + } + + // Again: still only one action can be in at a time. + select { + case v := <-started: + t.Fatalf("unexpected concurrent entry (sequence broken): %d", v) + default: // OK + } + } + + // Release the last one. + close(done[current]) + + // All Run() calls should complete with nil error. + for range N { + select { + case err := <-errCh: + if err != nil { + t.Fatalf("Run returned unexpected error: %v", err) + } + case <-time.After(timeout): + t.Fatalf("timeout waiting for Run results") + } + } + + // Sanity: everyone entered exactly once. + if len(seen) != N { + t.Fatalf("expected %d actions to start, got %d", N, len(seen)) + } +} + +func waitClosed(t *testing.T, ch <-chan struct{}, timeout time.Duration, name string) { + t.Helper() + select { + case <-ch: + return + case <-time.After(timeout): + t.Fatalf("timeout waiting for %s (%s)", name, timeout) + } +} From 349106969ddaa132d95e5a29d45088ceb431d4b0 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Tue, 3 Mar 2026 11:55:55 +0100 Subject: [PATCH 25/38] fix(processrunner): prioritize Wait() result over context cancellation --- pkg/processrunner/process_runner.go | 114 ++++++++++++------ pkg/router/handlers/execution_handler_test.go | 28 +++-- test/integration/server_test.go | 6 +- 3 files changed, 95 insertions(+), 53 deletions(-) diff --git a/pkg/processrunner/process_runner.go b/pkg/processrunner/process_runner.go index e17d7e2..74a45fb 100644 --- a/pkg/processrunner/process_runner.go +++ b/pkg/processrunner/process_runner.go @@ -1,6 +1,8 @@ // Package processrunner provides process lifecycle management: -// start process in its own process group, wait synchronously/asynchronously, -// and terminate the process group on context cancellation with optional grace timeout. +// - starts a process in its own process group, +// - allows synchronous and asynchronous waiting, +// - and terminates the whole process group on context cancellation +// with an optional grace timeout (SIGTERM -> SIGKILL). package processrunner import ( @@ -16,14 +18,25 @@ import ( ) var ( - ErrStartCommand = errors.New("failed to start command") - ErrInvalidPID = errors.New("invalid PID") + // ErrStartCommand indicates that the underlying command failed to start. + ErrStartCommand = errors.New("failed to start command") + + // ErrInvalidPID indicates that an invalid PID was used when attempting to signal. + ErrInvalidPID = errors.New("invalid PID") + + // ErrProcessGroupSignal indicates failure when sending a signal to a process group. ErrProcessGroupSignal = errors.New("failed to send signal to process group") ) type Process struct { - cmd cmdrunner.Command - runner cmdrunner.Runner + // cmd is the underlying command abstraction. + cmd cmdrunner.Command + + // runner is used to send signals (e.g., kill process group). + runner cmdrunner.Runner + + // timeout defines how long to wait after SIGTERM before sending SIGKILL. + // If nil, the process group is killed immediately with SIGKILL. timeout *time.Duration } @@ -36,10 +49,15 @@ func StartProcess( ) (*Process, error) { cmd := runner.Command(command, args...) + // Ensure the command runs in its own process group. + // This allows signaling the entire group (including children) + // by sending a signal to -pid. //nolint:exhaustruct cmd.SetSysProcAttr(&syscall.SysProcAttr{ Setpgid: true, }) + + // Redirect both stdout and stderr to the provided writer. cmd.SetStdout(writer) cmd.SetStderr(writer) @@ -55,17 +73,24 @@ func StartProcess( } func (p *Process) WaitSync(ctx context.Context) (int, error) { + // done is closed when Wait() finishes. + // It is used to coordinate with terminateOnContextDone + // to avoid sending signals after the process already exited. done := make(chan struct{}) + defer close(done) + // Start a goroutine that listens for context cancellation + // and attempts to terminate the process group if needed. go func() { p.terminateOnContextDone(ctx, done) }() + // Wait blocks until the process exits. + // According to the intended semantics, the result of Wait() + // has priority over a later ctx.Done(). err := p.cmd.Wait() - close(done) - - return p.determineExitCodeAndError(ctx, err) + return p.exitFromWaitError(err) } type Result struct { @@ -83,12 +108,9 @@ func (p *Process) WaitAsync(ctx context.Context) <-chan Result { defer close(resultCh) err := p.cmd.Wait() - exitCode, finalErr := p.determineExitCodeAndError(ctx, err) + exitCode, finalErr := p.exitFromWaitError(err) - resultCh <- Result{ - ExitCode: exitCode, - Err: finalErr, - } + resultCh <- Result{ExitCode: exitCode, Err: finalErr} }() go func() { @@ -101,26 +123,46 @@ func (p *Process) WaitAsync(ctx context.Context) <-chan Result { func (p *Process) terminateOnContextDone(ctx context.Context, done <-chan struct{}) { select { case <-ctx.Done(): + // If Wait() has already completed (done is closed), + // do not attempt to signal the process group. + // The result from Wait() is considered authoritative. + select { + case <-done: + return + default: + } + pid := p.cmd.Pid() + if pid <= 0 { + // Invalid PID: nothing to signal. + return + } + // If no grace timeout is defined, + // immediately kill the entire process group. if p.timeout == nil { _ = p.signalProcessGroup(pid, syscall.SIGKILL) return } + // First attempt graceful shutdown with SIGTERM. _ = p.signalProcessGroup(pid, syscall.SIGTERM) + // Wait for either: + // - the grace timeout to expire (then send SIGKILL), + // - or the process to exit naturally (done closed). t := time.NewTimer(*p.timeout) defer t.Stop() select { case <-t.C: _ = p.signalProcessGroup(pid, syscall.SIGKILL) - case <-done: + case <-done: // Process exited during grace period. } - case <-done: + case <-done: // Process exited before context cancellation. + return } } @@ -129,25 +171,25 @@ func (p *Process) signalProcessGroup(pid int, sig syscall.Signal) error { return fmt.Errorf("cannot send %s to process group: pid=%d: %w", sig, pid, ErrInvalidPID) } + // Negative PID means: send signal to the process group + // whose PGID equals the absolute value of pid. pgid := -pid + if err := p.runner.Kill(pgid, sig); err != nil { - return fmt.Errorf("failed to send %s to process group %d: %w: %w", sig, pgid, err, ErrProcessGroupSignal) + return fmt.Errorf( + "failed to send %s to process group %d: %w: %w", + sig, + pgid, + err, + ErrProcessGroupSignal, + ) } return nil } -func (p *Process) determineExitCodeAndError(ctx context.Context, err error) (int, error) { - // If the context was canceled or timed out, - // it means the process was terminated externally. - if p.isTimeoutOrCanceled(ctx) { - // Return -1 and the context error. - //nolint:wrapcheck - return -1, ctx.Err() - } - - // If Wait() returned no error, - // the process exited normally (exit code available in ProcessState). +func (p *Process) exitFromWaitError(err error) (int, error) { + // No error from Wait(): process exited normally. if err == nil { if ps := p.cmd.ProcessState(); ps != nil { return ps.ExitCode(), nil @@ -163,22 +205,18 @@ func (p *Process) determineExitCodeAndError(ctx context.Context, err error) (int // OR it was terminated by a signal. var exitErr *exec.ExitError if errors.As(err, &exitErr) { - // On Unix systems, we can check the WaitStatus. - // If the process was terminated by a signal, - // it means external intervention (SIGTERM/SIGKILL). + // On Unix systems, WaitStatus allows distinguishing + // between normal exit and signal-based termination. if status, ok := exitErr.Sys().(syscall.WaitStatus); ok && status.Signaled() { + // Terminated by a signal (e.g., SIGTERM/SIGKILL). return -1, err } - // Otherwise, the process exited normally (even if exit code != 0). - // In this case, exit code is a valid result and error is nil. + // Exited normally with a non-zero exit code. + // In this case, the exit code is returned and error is nil. return exitErr.ExitCode(), nil } - // Any other error from Wait() is treated as an external/infrastructure error. + // Any other error from Wait() is treated as an infrastructure/runtime error. return -1, err } - -func (p *Process) isTimeoutOrCanceled(ctx context.Context) bool { - return ctx.Err() != nil && (errors.Is(ctx.Err(), context.DeadlineExceeded) || errors.Is(ctx.Err(), context.Canceled)) -} diff --git a/pkg/router/handlers/execution_handler_test.go b/pkg/router/handlers/execution_handler_test.go index 9e0c8fc..28257ec 100644 --- a/pkg/router/handlers/execution_handler_test.go +++ b/pkg/router/handlers/execution_handler_test.go @@ -1348,8 +1348,12 @@ func TestExecutionHandler_TerminateOnCancel_NoGrace_SendsSIGKILL(t *testing.T) { } errMsg := rr.Header().Get("X-Error-Message") - if !strings.Contains(errMsg, "context canceled") { - t.Errorf("expected X-Error-Message header to contain %q, got %q", "context canceled", errMsg) + if errMsg == "" { + t.Fatalf("expected X-Error-Message header to be set") + } + + if !strings.Contains(errMsg, "wait error") { + t.Errorf("expected X-Error-Message header to contain %q, got %q", "wait error", errMsg) } } @@ -1471,7 +1475,7 @@ func TestExecutionHandler_TerminateOnCancel_WithGrace_ProcessEndsBeforeTimer_Sen } } -func TestExecutionHandler_DeadlineExceeded_PrioritizesCtxErrOverExitError(t *testing.T) { +func TestExecutionHandler_DeadlineExceeded_DoesNotOverrideWaitResult(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) @@ -1484,7 +1488,6 @@ func TestExecutionHandler_DeadlineExceeded_PrioritizesCtxErrOverExitError(t *tes runErr := exec.Command("sh", "-c", "exit 7").Run() var exitErr *exec.ExitError - if !errors.As(runErr, &exitErr) { t.Fatalf("expected *exec.ExitError, got %T: %v", runErr, runErr) } @@ -1507,6 +1510,7 @@ func TestExecutionHandler_DeadlineExceeded_PrioritizesCtxErrOverExitError(t *tes // Prevent signalProcessGroup from calling runner.Kill. mockCmd.EXPECT().Pid().Return(0).AnyTimes() + // Wait returns exit error (exit code 7). mockCmd.EXPECT().Wait().Return(exitErr) mockCmd.EXPECT().ProcessState().Return(nil).AnyTimes() @@ -1528,19 +1532,17 @@ func TestExecutionHandler_DeadlineExceeded_PrioritizesCtxErrOverExitError(t *tes t.Errorf("expected status 200, got %d", rr.Code) } + // Wait() result wins over context cancellation, so we should report exit code 7. exitCodeHeader := rr.Header().Get("X-Exit-Code") - if exitCodeHeader != "" { - t.Errorf("expected X-Exit-Code header to be empty, got %q", exitCodeHeader) + if exitCodeHeader != "7" { + t.Errorf("expected X-Exit-Code header to be %q, got %q", "7", exitCodeHeader) } + // Since non-zero exit is treated as a normal result (exit code + nil error), + // there should be no context-related error message. errMsg := rr.Header().Get("X-Error-Message") - if !strings.Contains(errMsg, "context deadline exceeded") { - t.Errorf("expected X-Error-Message header to contain %q, got %q", "context deadline exceeded", errMsg) - } - - // Make sure it did not report the process exit code (7) as primary. - if exitCodeHeader == "7" { - t.Errorf("did not expect exit code 7 to be reported, got %q", exitCodeHeader) + if errMsg != "" { + t.Errorf("expected X-Error-Message header to be empty, got %q", errMsg) } } diff --git a/test/integration/server_test.go b/test/integration/server_test.go index 24f190c..631cdd6 100644 --- a/test/integration/server_test.go +++ b/test/integration/server_test.go @@ -194,8 +194,10 @@ func TestServerIntegration(t *testing.T) { } errMsg := rec.Header().Get("X-Error-Message") - if !strings.Contains(errMsg, "context deadline exceeded") { - t.Errorf("Expected timeout error message in X-Error-Message header, got %q", errMsg) + expectedMsg := "process wait failed: signal: killed" + + if errMsg != expectedMsg { + t.Errorf("Expected timeout error message in X-Error-Message header to be %q, got %q", expectedMsg, errMsg) } }) From 7ae3ccb1d64a866a3aaa93e6f4494c77ba9d7b93 Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Tue, 3 Mar 2026 21:14:14 +0100 Subject: [PATCH 26/38] fix(cmdrunner): send signal to provided PID without implicit PGID negation --- pkg/cmdrunner/cmdrunner.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/cmdrunner/cmdrunner.go b/pkg/cmdrunner/cmdrunner.go index 648ac80..2560252 100644 --- a/pkg/cmdrunner/cmdrunner.go +++ b/pkg/cmdrunner/cmdrunner.go @@ -60,6 +60,9 @@ func (c *realCommand) Pid() int { return c.Cmd.Process.Pid } +// compile-time interface check. +var _ Command = (*realCommand)(nil) + // RealRunner is a real implementation of the Runner interface. type RealRunner struct{} @@ -68,8 +71,13 @@ func (r *RealRunner) Command(name string, arg ...string) Command { return &realCommand{exec.Command(name, arg...)} } -// Kill sends a signal to a process group. +// Kill sends a signal to a process or process group. +// If pid > 0, the signal is sent to the process with that PID. +// If pid < 0, the signal is sent to the process group with ID = -pid. func (r *RealRunner) Kill(pid int, sig syscall.Signal) error { //nolint:wrapcheck - return syscall.Kill(-pid, sig) + return syscall.Kill(pid, sig) } + +// compile-time interface check. +var _ Runner = (*RealRunner)(nil) From 61c3fac3e652d9f77d62b5fe658939e3692483cb Mon Sep 17 00:00:00 2001 From: dkarczmarski Date: Wed, 4 Mar 2026 09:56:15 +0100 Subject: [PATCH 27/38] test(cmdrunner): add integration tests --- .gitignore | 4 + pkg/cmdrunner/cmdrunner_integration_test.go | 368 ++++++++++++++++++++ 2 files changed, 372 insertions(+) create mode 100644 pkg/cmdrunner/cmdrunner_integration_test.go diff --git a/.gitignore b/.gitignore index e6495f0..9ed6936 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,7 @@ webcmd key.pem cert.pem + +*notes*.txt +*notes*.md + diff --git a/pkg/cmdrunner/cmdrunner_integration_test.go b/pkg/cmdrunner/cmdrunner_integration_test.go new file mode 100644 index 0000000..9edb218 --- /dev/null +++ b/pkg/cmdrunner/cmdrunner_integration_test.go @@ -0,0 +1,368 @@ +//go:build integration + +//nolint:paralleltest +package cmdrunner_test + +import ( + "bytes" + "errors" + "fmt" + "os/exec" + "strings" + "sync" + "syscall" + "testing" + "time" + + "github.com/dkarczmarski/webcmd/pkg/cmdrunner" +) + +const ( + shortTimeout = 2 * time.Second + longTimeout = 5 * time.Second +) + +func TestSuccessfulCommandExitCodeZero(t *testing.T) { + r := &cmdrunner.RealRunner{} + + c := startShellCommand(t, r, "echo hello") + if err := waitWithTimeout(t, c, shortTimeout); err != nil { + t.Fatalf("Wait() returned error for successful command: %v", err) + } + + ps := c.ProcessState() + if ps == nil { + t.Fatalf("expected ProcessState() to be non-nil after Wait()") + } + + if got := ps.ExitCode(); got != 0 { + t.Fatalf("expected exit code 0, got %d", got) + } +} + +func TestFailingCommandExitCodeNonZero(t *testing.T) { + r := &cmdrunner.RealRunner{} + + c := startShellCommand(t, r, "exit 42") + err := waitWithTimeout(t, c, shortTimeout) + + if err == nil { + t.Fatalf("expected Wait() to return error for non-zero exit") + } + + var ee *exec.ExitError + if !errors.As(err, &ee) { + t.Fatalf("expected *exec.ExitError, got %T: %v", err, err) + } + + ps := c.ProcessState() + if ps == nil { + t.Fatalf("expected ProcessState() to be non-nil after Wait()") + } + + if got := ps.ExitCode(); got != 42 { + t.Fatalf("expected exit code 42, got %d", got) + } +} + +func TestStartNonExistingBinaryReturnsError(t *testing.T) { + r := &cmdrunner.RealRunner{} + + c := r.Command("__not_a_cmd__") + if err := c.Start(); err == nil { + t.Fatalf("expected Start() to fail for non-existing binary") + } + + // Pid() should remain 0 because the process was never started. + if got := c.Pid(); got != 0 { + t.Fatalf("expected Pid() == 0 when Start() fails, got %d", got) + } +} + +func TestStdoutRedirectionWorks(t *testing.T) { + r := &cmdrunner.RealRunner{} + + var out bytes.Buffer + + c := r.Command("sh", "-c", "printf 'abc'") + c.SetStdout(&out) + + if err := c.Start(); err != nil { + t.Fatalf("Start() failed: %v", err) + } + + if err := waitWithTimeout(t, c, shortTimeout); err != nil { + t.Fatalf("Wait() failed: %v", err) + } + + if got := out.String(); got != "abc" { + t.Fatalf("expected stdout %q, got %q", "abc", got) + } +} + +func TestStderrRedirectionWorks(t *testing.T) { + r := &cmdrunner.RealRunner{} + + var errBuf bytes.Buffer + + c := r.Command("sh", "-c", "printf 'err' 1>&2") + + c.SetStderr(&errBuf) + + if err := c.Start(); err != nil { + t.Fatalf("Start() failed: %v", err) + } + + if err := waitWithTimeout(t, c, shortTimeout); err != nil { + t.Fatalf("Wait() failed: %v", err) + } + + if got := errBuf.String(); got != "err" { + t.Fatalf("expected stderr %q, got %q", "err", got) + } +} + +func TestCombinedStdoutAndStderrToSingleWriter(t *testing.T) { + r := &cmdrunner.RealRunner{} + + var combined bytes.Buffer + + c := r.Command("sh", "-c", "echo out; echo err 1>&2") + + c.SetStdout(&combined) + c.SetStderr(&combined) + + if err := c.Start(); err != nil { + t.Fatalf("Start() failed: %v", err) + } + + if err := waitWithTimeout(t, c, shortTimeout); err != nil { + t.Fatalf("Wait() failed: %v", err) + } + + got := combined.String() + // Order is not guaranteed; just verify both are present. + if !strings.Contains(got, "out") || !strings.Contains(got, "err") { + t.Fatalf("expected combined output to contain both 'out' and 'err', got %q", got) + } +} + +func TestKillProcessByPidSIGTERM(t *testing.T) { + r := &cmdrunner.RealRunner{} + + c := startShellCommand(t, r, "sleep 30") + pid := c.Pid() + + if pid <= 0 { + t.Fatalf("expected valid pid, got %d", pid) + } + + if err := r.Kill(pid, syscall.SIGTERM); err != nil { + t.Fatalf("Kill(pid, SIGTERM) failed: %v", err) + } + + // Process should exit quickly after SIGTERM. + _ = waitWithTimeout(t, c, longTimeout) + + // We avoid asserting a specific exit code because shells/wrappers can map signals differently. + + ps := c.ProcessState() + if ps == nil { + t.Fatalf("expected ProcessState() to be non-nil after Wait()") + } +} + +func TestKillProcessGroupByNegativePidSIGTERM(t *testing.T) { + r := &cmdrunner.RealRunner{} + + // This command creates a new process group so that killing -pid targets the group. + // The script starts a background sleep plus a foreground sleep. + c := r.Command("sh", "-c", "(sleep 30) & sleep 30") + c.SetSysProcAttr(&syscall.SysProcAttr{Setpgid: true}) + + if err := c.Start(); err != nil { + t.Fatalf("Start() failed: %v", err) + } + + pid := c.Pid() + if pid <= 0 { + t.Fatalf("expected valid pid, got %d", pid) + } + + // Give the shell a moment to spawn the background process. + time.Sleep(150 * time.Millisecond) + + // Kill the entire process group: pid < 0 targets process group id = -pid. + if err := r.Kill(-pid, syscall.SIGTERM); err != nil { + t.Fatalf("Kill(-pid, SIGTERM) failed: %v", err) + } + + // The group should terminate quickly. + _ = waitWithTimeout(t, c, longTimeout) + + ps := c.ProcessState() + if ps == nil { + t.Fatalf("expected ProcessState() to be non-nil after Wait()") + } +} + +func TestKillAfterProcessExitReturnsESRCH(t *testing.T) { + r := &cmdrunner.RealRunner{} + + c := startShellCommand(t, r, "true") + pid := c.Pid() + + if pid <= 0 { + t.Fatalf("expected valid pid, got %d", pid) + } + + if err := waitWithTimeout(t, c, shortTimeout); err != nil { + t.Fatalf("Wait() failed: %v", err) + } + + err := r.Kill(pid, syscall.SIGTERM) + if err == nil { + t.Fatalf("expected Kill() to fail for already-exited process") + } + + // On Linux, killing a non-existing pid typically returns ESRCH. + if !errors.Is(err, syscall.ESRCH) { + t.Fatalf("expected ESRCH, got %T: %v", err, err) + } +} + +func TestParallelStartsAndWaits(t *testing.T) { + r := &cmdrunner.RealRunner{} + + const n = 10 + + var wg sync.WaitGroup + + wg.Add(n) + + errCh := make(chan error, n) + + for i := range n { + go func(i int) { + defer wg.Done() + + // Each command prints a unique token. + script := fmt.Sprintf("echo token_%d", i) + c := r.Command("sh", "-c", script) + + var out bytes.Buffer + + c.SetStdout(&out) + + if err := c.Start(); err != nil { + errCh <- fmt.Errorf("function Start() failed (i=%d): %w", i, err) + + return + } + + if c.Pid() <= 0 { + errCh <- fmt.Errorf("invalid pid (i=%d): %d", i, c.Pid()) + + return + } + + if err := waitWithTimeout(t, c, shortTimeout); err != nil { + errCh <- fmt.Errorf("function Wait() failed (i=%d): %w", i, err) + + return + } + + if !strings.Contains(out.String(), fmt.Sprintf("token_%d", i)) { + errCh <- fmt.Errorf("unexpected stdout (i=%d): %q", i, out.String()) + + return + } + }(i) + } + + wg.Wait() + close(errCh) + + for err := range errCh { + t.Error(err) + } +} + +func TestPidIsZeroBeforeStartAndNonZeroAfterStart(t *testing.T) { + r := &cmdrunner.RealRunner{} + + c := r.Command("sh", "-c", "sleep 0.2") + if got := c.Pid(); got != 0 { + t.Fatalf("expected Pid() == 0 before Start(), got %d", got) + } + + if err := c.Start(); err != nil { + t.Fatalf("Start() failed: %v", err) + } + + if got := c.Pid(); got <= 0 { + t.Fatalf("expected Pid() > 0 after Start(), got %d", got) + } + + _ = waitWithTimeout(t, c, shortTimeout) +} + +func TestProcessStateNilBeforeWaitNonNilAfterWait(t *testing.T) { + r := &cmdrunner.RealRunner{} + + c := r.Command("sh", "-c", "sleep 0.2") + + // Before Start(), ProcessState should be nil. + if ps := c.ProcessState(); ps != nil { + t.Fatalf("expected ProcessState() == nil before Start(), got %+v", ps) + } + + if err := c.Start(); err != nil { + t.Fatalf("Start() failed: %v", err) + } + + // After Start() but before Wait(), ProcessState is typically still nil. + if ps := c.ProcessState(); ps != nil { + // We keep this as a strict assertion to catch unexpected behavior changes. + t.Fatalf("expected ProcessState() == nil before Wait(), got %+v", ps) + } + + if err := waitWithTimeout(t, c, shortTimeout); err != nil { + t.Fatalf("Wait() failed: %v", err) + } + + if ps := c.ProcessState(); ps == nil { + t.Fatalf("expected ProcessState() to be non-nil after Wait()") + } +} + +// startShellCommand creates and starts: sh -c