diff --git a/.gitignore b/.gitignore index e6495f0..9ed6936 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,7 @@ webcmd key.pem cert.pem + +*notes*.txt +*notes*.md + diff --git a/README.md b/README.md index 3301873..93e65d6 100644 --- a/README.md +++ b/README.md @@ -228,6 +228,35 @@ The `commandTemplate` uses Go's `text/template` syntax to inject data from the H Header names are normalized by replacing hyphens (`-`) with underscores (`_`). Example: `{{.headers.X_Api_Key}}` or `{{.headers.User_Agent}}` . +## HTTP Response and Error Handling + +The server returns different HTTP status codes depending on the outcome of the request and the command execution: + +- **200 OK** + Returned when the command starts successfully, regardless of whether the command later exits with code 0 or non-zero, or fails while executing. + In this case, the handler sets the following response headers: + - `X-Success`: `"true"` if the process exit code is 0, otherwise `"false"`. + - `X-Exit-Code`: The process exit code (if available). + - `X-Error-Message`: Empty on success, or contains the execution error message if the command fails (only if `server.withErrorHeader` is enabled in the configuration). + +- **429 Too Many Requests** + Returned when command execution cannot start because the call gate rejects the request as busy (e.g., when `mode: single` is used). + +- **404 Not Found** + Returned when the URL command is missing or the endpoint is not configured. + +- **400 Bad Request** + Returned when `bodyAsJson` is enabled but the request body is not a valid JSON object. + +- **500 Internal Server Error** + Returned when the command cannot be prepared or started at all, for example: + - Streaming was requested but the `ResponseWriter` does not support flushing. + - Command template rendering/building failed. + - Gate or pre-action setup failed before the process was started. + - Handler configuration is invalid. + +**Important distinction:** A command that starts successfully but later fails (e.g., returns a non-zero exit code) is still treated as an HTTP-level success and returns **200 OK**. Detailed information about the process outcome is available in the `X-Success` and `X-Exit-Code` headers. The `X-Error-Message` header is also provided if `server.withErrorHeader` is set to `true` in the configuration. + ## Configuration (`config.yaml`) ### `server` @@ -240,6 +269,8 @@ The `commandTemplate` uses Go's `text/template` syntax to inject data from the H * `shutdownGracePeriod` *(optional)* - the time to wait for active requests to finish before the server shuts down (e.g., `5s`, `30s`). Format: [Go Duration](https://pkg.go.dev/time#ParseDuration). Default: `5s`. +* `withErrorHeader` *(optional)* - if set to `true`, the `X-Error-Message` header will be included in the HTTP response when a command execution fails. Default: `false`. + * `https` *(optional)* - HTTPS configuration: * `enabled` - enable or disable HTTPS. Default: `false`. * `certFile` - path to the SSL certificate file. diff --git a/config.sample-ssl.yaml b/config.sample-ssl.yaml index e621c6f..9784f6b 100644 --- a/config.sample-ssl.yaml +++ b/config.sample-ssl.yaml @@ -1,5 +1,6 @@ server: # address: "127.0.0.1:8443" + withErrorHeader: true shutdownGracePeriod: 5s https: enabled: true diff --git a/config.sample.yaml b/config.sample.yaml index 915c42b..3ce1be4 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -1,5 +1,6 @@ server: # address: "127.0.0.1:8080" + withErrorHeader: true shutdownGracePeriod: 5s authorization: - name: auth-name1 diff --git a/pkg/callgate/callgate_queue.go b/pkg/callgate/callgate_queue.go index de373fa..6c148da 100644 --- a/pkg/callgate/callgate_queue.go +++ b/pkg/callgate/callgate_queue.go @@ -7,32 +7,61 @@ import ( ) type Sequence struct { - token chan struct{} + mu sync.Mutex + busy bool + queue []chan struct{} // FIFO of waiters } func NewSequence() *Sequence { - l := &Sequence{ - token: make(chan struct{}, 1), - } - l.token <- struct{}{} - - return l + return &Sequence{} //nolint:exhaustruct } // Acquire blocks until the execution slot is available. // -// This gate allows only one execution at a time. Calls are processed in sequence: -// if one is running, the next call waits until it completes. +// This gate allows only one execution at a time. +// Waiters are served in strict FIFO order. // -// When the slot is acquired, Acquire returns a release function. The caller must -// call release() when the work is done. +// When the slot is acquired, Acquire returns a release function. +// The caller must call release() when the work is done. // // If the context is canceled before acquiring the slot, Acquire returns ctx.Err(). func (cg *Sequence) Acquire(ctx context.Context) (func(), error) { + // Fast path: not busy and no queue => acquire immediately. + cg.mu.Lock() + if !cg.busy && len(cg.queue) == 0 { + cg.busy = true + + cg.mu.Unlock() + + return cg.releaseFunc(), nil + } + + // Otherwise, enqueue and wait for our turn. + waiter := make(chan struct{}) + + cg.queue = append(cg.queue, waiter) + cg.mu.Unlock() + select { case <-ctx.Done(): - return nil, fmt.Errorf("acquire sequence: %w", ctx.Err()) - case <-cg.token: + // Remove ourselves from the queue if we haven't been granted the slot yet. + cg.mu.Lock() + removed := cg.removeWaiterLocked(waiter) + cg.mu.Unlock() + + // If we were NOT removed, it means we were already granted (channel closed) + // roughly concurrently. In that case, we must return success, not ctx.Err(). + // We detect "granted" by checking removed==false; but there is a race: + // - if channel closed just before we locked, removeWaiterLocked won't find it. + // In that case, proceed as acquired. + if removed { + return nil, fmt.Errorf("acquire sequence: %w", ctx.Err()) + } + + return cg.releaseFunc(), nil + + case <-waiter: + // Granted in FIFO order. return cg.releaseFunc(), nil } } @@ -42,7 +71,37 @@ func (cg *Sequence) releaseFunc() func() { return func() { once.Do(func() { - cg.token <- struct{}{} + cg.mu.Lock() + defer cg.mu.Unlock() + + // If someone is waiting, grant the next one in FIFO order. + if len(cg.queue) > 0 { + next := cg.queue[0] + // pop front + copy(cg.queue[0:], cg.queue[1:]) + cg.queue = cg.queue[:len(cg.queue)-1] + + // Keep busy=true, transfer ownership to the next waiter. + close(next) + + return + } + + // No waiters => free the slot. + cg.busy = false }) } } + +func (cg *Sequence) removeWaiterLocked(waiter chan struct{}) bool { + for i := range cg.queue { + if cg.queue[i] == waiter { + copy(cg.queue[i:], cg.queue[i+1:]) + cg.queue = cg.queue[:len(cg.queue)-1] + + return true + } + } + + return false +} diff --git a/pkg/callgate/callgate_queue_test.go b/pkg/callgate/callgate_queue_test.go index b3c24d2..45a0097 100644 --- a/pkg/callgate/callgate_queue_test.go +++ b/pkg/callgate/callgate_queue_test.go @@ -252,3 +252,266 @@ func TestSequence_SerializesCriticalSection_NoOverlap(t *testing.T) { t.Fatalf("critical section overlapped: max concurrent inCS = %d, want 1", maxSeen) } } + +// FIFO / fairness: waiters should be granted in the same order they started waiting. +func TestSequence_Acquire_FIFOOrder(t *testing.T) { + t.Parallel() + + ctx := t.Context() + g := callgate.NewSequence() + + release1, err := g.Acquire(ctx) + if err != nil { + t.Fatalf("Acquire #1 error: %v", err) + } + + const n = 500 + orderCh := make(chan int, n) + + var ( + releases [n]func() + errs [n]error + ) + + // Start goroutines in a known order. + // Small sleeps help ensure they enqueue in that same order (keeps the test stable). + for i := range n { + go func(i int) { + releases[i], errs[i] = g.Acquire(ctx) + orderCh <- i + }(i) + + time.Sleep(10 * time.Millisecond) + } + + // Let the first waiter through. + release1() + + // We expect acquisition order in strict FIFO. + for want := range n { + select { + case got := <-orderCh: + if got != want { + t.Fatalf("FIFO violated: got=%d, want=%d", got, want) + } + + if errs[got] != nil { + t.Fatalf("Acquire waiter #%d error: %v", got, errs[got]) + } + + if releases[got] == nil { + t.Fatalf("Acquire waiter #%d returned nil release", got) + } + + // Release to allow the next waiter. + releases[got]() + + case <-time.After(2 * time.Second): + t.Fatalf("timed out waiting for waiter #%d to acquire", want) + } + } +} + +// Race: ctx cancel vs grant happening "at the same time". +// Invariant: +// - If Acquire returns error => release must be nil. +// - If Acquire returns success => it must truly own the slot (next Acquire blocks until release()). +func TestSequence_Acquire_CancelVsGrantRace(t *testing.T) { + t.Parallel() + + root := t.Context() + g := callgate.NewSequence() + + const iters = 500 + + for iter := range iters { + // Hold the slot. + release1, err := g.Acquire(root) + if err != nil { + t.Fatalf("iter %d: Acquire #1 error: %v", iter, err) + } + + ctx, cancel := context.WithCancel(root) + + var ( + release2 func() + err2 error + ) + + done := make(chan struct{}) + + go func() { + release2, err2 = g.Acquire(ctx) + + close(done) + }() + + // Give the waiter a moment to enqueue. + time.Sleep(1 * time.Millisecond) + + // Try to make cancel and release happen as simultaneously as possible. + start := make(chan struct{}) + + var wg sync.WaitGroup + + wg.Add(2) + + go func() { + defer wg.Done() + <-start + cancel() + }() + go func() { + defer wg.Done() + <-start + release1() + }() + + close(start) + wg.Wait() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatalf("iter %d: waiter did not finish", iter) + } + + if err2 != nil { + // If canceled "won", must be an error and release must be nil. + if release2 != nil { + t.Fatalf("iter %d: expected nil release on error, got non-nil", iter) + } + + if !errors.Is(err2, context.Canceled) && !errors.Is(err2, context.DeadlineExceeded) { + t.Fatalf("iter %d: expected ctx error, got: %v", iter, err2) + } + + continue + } + + // Success path: must have a release func. + if release2 == nil { + t.Fatalf("iter %d: Acquire succeeded but release is nil", iter) + } + + // Verify it *really* owns the slot: a third Acquire should block until release2(). + acquired3 := make(chan struct{}) + go func() { + rel3, err3 := g.Acquire(root) + if err3 == nil && rel3 != nil { + rel3() + } + + close(acquired3) + }() + + select { + case <-acquired3: + // If it returned immediately, then release2 didn't actually own the slot. + t.Fatalf("iter %d: Acquire #3 returned early; release2 likely didn't own the slot", iter) + case <-time.After(30 * time.Millisecond): + } + + release2() + + select { + case <-acquired3: + case <-time.After(2 * time.Second): + t.Fatalf("iter %d: Acquire #3 did not return after release2()", iter) + } + } +} + +// A canceled waiter should be removed from the queue and not block later waiters. +// Scenario: +// - Hold slot with #1. +// - Waiter #2 times out while waiting (must return error). +// - Waiter #3 waits normally. +// - Release #1: waiter #3 must acquire (i.e., canceled waiter must not "sit" at the head forever). +func TestSequence_Acquire_CanceledWaiterDoesNotBlockQueue(t *testing.T) { + t.Parallel() + + root := t.Context() + g := callgate.NewSequence() + + release1, err := g.Acquire(root) + if err != nil { + t.Fatalf("Acquire #1 error: %v", err) + } + defer release1() + + // Waiter #2: should time out while waiting. + ctx2, cancel2 := context.WithTimeout(root, 30*time.Millisecond) + defer cancel2() + + done2 := make(chan struct{}) + + var ( + release2 func() + err2 error + ) + + go func() { + release2, err2 = g.Acquire(ctx2) + + close(done2) + }() + + select { + case <-done2: + case <-time.After(2 * time.Second): + t.Fatalf("waiter #2 did not return") + } + + if release2 != nil { + t.Fatalf("expected nil release for waiter #2, got non-nil") + } + + if err2 == nil { + t.Fatalf("expected error for waiter #2, got nil") + } + + if !errors.Is(err2, context.DeadlineExceeded) { + t.Fatalf("expected waiter #2 to wrap context.DeadlineExceeded, got: %v", err2) + } + + // Waiter #3: should be able to acquire once #1 releases. + acquired3 := make(chan struct{}) + + var ( + release3 func() + err3 error + ) + + go func() { + release3, err3 = g.Acquire(root) + + close(acquired3) + }() + + // Should still be blocked because #1 is held. + select { + case <-acquired3: + t.Fatalf("Acquire #3 unexpectedly returned early (err=%v)", err3) + case <-time.After(30 * time.Millisecond): + } + + // Now release #1: #3 must get it (canceled #2 must not block). + release1() + + select { + case <-acquired3: + case <-time.After(2 * time.Second): + t.Fatalf("Acquire #3 did not return after release #1") + } + + if err3 != nil { + t.Fatalf("Acquire #3 error: %v", err3) + } + + if release3 == nil { + t.Fatalf("Acquire #3 returned nil release") + } + + release3() +} diff --git a/pkg/cmdrunner/cmdrunner.go b/pkg/cmdrunner/cmdrunner.go index 648ac80..2560252 100644 --- a/pkg/cmdrunner/cmdrunner.go +++ b/pkg/cmdrunner/cmdrunner.go @@ -60,6 +60,9 @@ func (c *realCommand) Pid() int { return c.Cmd.Process.Pid } +// compile-time interface check. +var _ Command = (*realCommand)(nil) + // RealRunner is a real implementation of the Runner interface. type RealRunner struct{} @@ -68,8 +71,13 @@ func (r *RealRunner) Command(name string, arg ...string) Command { return &realCommand{exec.Command(name, arg...)} } -// Kill sends a signal to a process group. +// Kill sends a signal to a process or process group. +// If pid > 0, the signal is sent to the process with that PID. +// If pid < 0, the signal is sent to the process group with ID = -pid. func (r *RealRunner) Kill(pid int, sig syscall.Signal) error { //nolint:wrapcheck - return syscall.Kill(-pid, sig) + return syscall.Kill(pid, sig) } + +// compile-time interface check. +var _ Runner = (*RealRunner)(nil) diff --git a/pkg/cmdrunner/cmdrunner_integration_test.go b/pkg/cmdrunner/cmdrunner_integration_test.go new file mode 100644 index 0000000..9edb218 --- /dev/null +++ b/pkg/cmdrunner/cmdrunner_integration_test.go @@ -0,0 +1,368 @@ +//go:build integration + +//nolint:paralleltest +package cmdrunner_test + +import ( + "bytes" + "errors" + "fmt" + "os/exec" + "strings" + "sync" + "syscall" + "testing" + "time" + + "github.com/dkarczmarski/webcmd/pkg/cmdrunner" +) + +const ( + shortTimeout = 2 * time.Second + longTimeout = 5 * time.Second +) + +func TestSuccessfulCommandExitCodeZero(t *testing.T) { + r := &cmdrunner.RealRunner{} + + c := startShellCommand(t, r, "echo hello") + if err := waitWithTimeout(t, c, shortTimeout); err != nil { + t.Fatalf("Wait() returned error for successful command: %v", err) + } + + ps := c.ProcessState() + if ps == nil { + t.Fatalf("expected ProcessState() to be non-nil after Wait()") + } + + if got := ps.ExitCode(); got != 0 { + t.Fatalf("expected exit code 0, got %d", got) + } +} + +func TestFailingCommandExitCodeNonZero(t *testing.T) { + r := &cmdrunner.RealRunner{} + + c := startShellCommand(t, r, "exit 42") + err := waitWithTimeout(t, c, shortTimeout) + + if err == nil { + t.Fatalf("expected Wait() to return error for non-zero exit") + } + + var ee *exec.ExitError + if !errors.As(err, &ee) { + t.Fatalf("expected *exec.ExitError, got %T: %v", err, err) + } + + ps := c.ProcessState() + if ps == nil { + t.Fatalf("expected ProcessState() to be non-nil after Wait()") + } + + if got := ps.ExitCode(); got != 42 { + t.Fatalf("expected exit code 42, got %d", got) + } +} + +func TestStartNonExistingBinaryReturnsError(t *testing.T) { + r := &cmdrunner.RealRunner{} + + c := r.Command("__not_a_cmd__") + if err := c.Start(); err == nil { + t.Fatalf("expected Start() to fail for non-existing binary") + } + + // Pid() should remain 0 because the process was never started. + if got := c.Pid(); got != 0 { + t.Fatalf("expected Pid() == 0 when Start() fails, got %d", got) + } +} + +func TestStdoutRedirectionWorks(t *testing.T) { + r := &cmdrunner.RealRunner{} + + var out bytes.Buffer + + c := r.Command("sh", "-c", "printf 'abc'") + c.SetStdout(&out) + + if err := c.Start(); err != nil { + t.Fatalf("Start() failed: %v", err) + } + + if err := waitWithTimeout(t, c, shortTimeout); err != nil { + t.Fatalf("Wait() failed: %v", err) + } + + if got := out.String(); got != "abc" { + t.Fatalf("expected stdout %q, got %q", "abc", got) + } +} + +func TestStderrRedirectionWorks(t *testing.T) { + r := &cmdrunner.RealRunner{} + + var errBuf bytes.Buffer + + c := r.Command("sh", "-c", "printf 'err' 1>&2") + + c.SetStderr(&errBuf) + + if err := c.Start(); err != nil { + t.Fatalf("Start() failed: %v", err) + } + + if err := waitWithTimeout(t, c, shortTimeout); err != nil { + t.Fatalf("Wait() failed: %v", err) + } + + if got := errBuf.String(); got != "err" { + t.Fatalf("expected stderr %q, got %q", "err", got) + } +} + +func TestCombinedStdoutAndStderrToSingleWriter(t *testing.T) { + r := &cmdrunner.RealRunner{} + + var combined bytes.Buffer + + c := r.Command("sh", "-c", "echo out; echo err 1>&2") + + c.SetStdout(&combined) + c.SetStderr(&combined) + + if err := c.Start(); err != nil { + t.Fatalf("Start() failed: %v", err) + } + + if err := waitWithTimeout(t, c, shortTimeout); err != nil { + t.Fatalf("Wait() failed: %v", err) + } + + got := combined.String() + // Order is not guaranteed; just verify both are present. + if !strings.Contains(got, "out") || !strings.Contains(got, "err") { + t.Fatalf("expected combined output to contain both 'out' and 'err', got %q", got) + } +} + +func TestKillProcessByPidSIGTERM(t *testing.T) { + r := &cmdrunner.RealRunner{} + + c := startShellCommand(t, r, "sleep 30") + pid := c.Pid() + + if pid <= 0 { + t.Fatalf("expected valid pid, got %d", pid) + } + + if err := r.Kill(pid, syscall.SIGTERM); err != nil { + t.Fatalf("Kill(pid, SIGTERM) failed: %v", err) + } + + // Process should exit quickly after SIGTERM. + _ = waitWithTimeout(t, c, longTimeout) + + // We avoid asserting a specific exit code because shells/wrappers can map signals differently. + + ps := c.ProcessState() + if ps == nil { + t.Fatalf("expected ProcessState() to be non-nil after Wait()") + } +} + +func TestKillProcessGroupByNegativePidSIGTERM(t *testing.T) { + r := &cmdrunner.RealRunner{} + + // This command creates a new process group so that killing -pid targets the group. + // The script starts a background sleep plus a foreground sleep. + c := r.Command("sh", "-c", "(sleep 30) & sleep 30") + c.SetSysProcAttr(&syscall.SysProcAttr{Setpgid: true}) + + if err := c.Start(); err != nil { + t.Fatalf("Start() failed: %v", err) + } + + pid := c.Pid() + if pid <= 0 { + t.Fatalf("expected valid pid, got %d", pid) + } + + // Give the shell a moment to spawn the background process. + time.Sleep(150 * time.Millisecond) + + // Kill the entire process group: pid < 0 targets process group id = -pid. + if err := r.Kill(-pid, syscall.SIGTERM); err != nil { + t.Fatalf("Kill(-pid, SIGTERM) failed: %v", err) + } + + // The group should terminate quickly. + _ = waitWithTimeout(t, c, longTimeout) + + ps := c.ProcessState() + if ps == nil { + t.Fatalf("expected ProcessState() to be non-nil after Wait()") + } +} + +func TestKillAfterProcessExitReturnsESRCH(t *testing.T) { + r := &cmdrunner.RealRunner{} + + c := startShellCommand(t, r, "true") + pid := c.Pid() + + if pid <= 0 { + t.Fatalf("expected valid pid, got %d", pid) + } + + if err := waitWithTimeout(t, c, shortTimeout); err != nil { + t.Fatalf("Wait() failed: %v", err) + } + + err := r.Kill(pid, syscall.SIGTERM) + if err == nil { + t.Fatalf("expected Kill() to fail for already-exited process") + } + + // On Linux, killing a non-existing pid typically returns ESRCH. + if !errors.Is(err, syscall.ESRCH) { + t.Fatalf("expected ESRCH, got %T: %v", err, err) + } +} + +func TestParallelStartsAndWaits(t *testing.T) { + r := &cmdrunner.RealRunner{} + + const n = 10 + + var wg sync.WaitGroup + + wg.Add(n) + + errCh := make(chan error, n) + + for i := range n { + go func(i int) { + defer wg.Done() + + // Each command prints a unique token. + script := fmt.Sprintf("echo token_%d", i) + c := r.Command("sh", "-c", script) + + var out bytes.Buffer + + c.SetStdout(&out) + + if err := c.Start(); err != nil { + errCh <- fmt.Errorf("function Start() failed (i=%d): %w", i, err) + + return + } + + if c.Pid() <= 0 { + errCh <- fmt.Errorf("invalid pid (i=%d): %d", i, c.Pid()) + + return + } + + if err := waitWithTimeout(t, c, shortTimeout); err != nil { + errCh <- fmt.Errorf("function Wait() failed (i=%d): %w", i, err) + + return + } + + if !strings.Contains(out.String(), fmt.Sprintf("token_%d", i)) { + errCh <- fmt.Errorf("unexpected stdout (i=%d): %q", i, out.String()) + + return + } + }(i) + } + + wg.Wait() + close(errCh) + + for err := range errCh { + t.Error(err) + } +} + +func TestPidIsZeroBeforeStartAndNonZeroAfterStart(t *testing.T) { + r := &cmdrunner.RealRunner{} + + c := r.Command("sh", "-c", "sleep 0.2") + if got := c.Pid(); got != 0 { + t.Fatalf("expected Pid() == 0 before Start(), got %d", got) + } + + if err := c.Start(); err != nil { + t.Fatalf("Start() failed: %v", err) + } + + if got := c.Pid(); got <= 0 { + t.Fatalf("expected Pid() > 0 after Start(), got %d", got) + } + + _ = waitWithTimeout(t, c, shortTimeout) +} + +func TestProcessStateNilBeforeWaitNonNilAfterWait(t *testing.T) { + r := &cmdrunner.RealRunner{} + + c := r.Command("sh", "-c", "sleep 0.2") + + // Before Start(), ProcessState should be nil. + if ps := c.ProcessState(); ps != nil { + t.Fatalf("expected ProcessState() == nil before Start(), got %+v", ps) + } + + if err := c.Start(); err != nil { + t.Fatalf("Start() failed: %v", err) + } + + // After Start() but before Wait(), ProcessState is typically still nil. + if ps := c.ProcessState(); ps != nil { + // We keep this as a strict assertion to catch unexpected behavior changes. + t.Fatalf("expected ProcessState() == nil before Wait(), got %+v", ps) + } + + if err := waitWithTimeout(t, c, shortTimeout); err != nil { + t.Fatalf("Wait() failed: %v", err) + } + + if ps := c.ProcessState(); ps == nil { + t.Fatalf("expected ProcessState() to be non-nil after Wait()") + } +} + +// startShellCommand creates and starts: sh -c