Skip to content

Commit da66cd1

Browse files
authored
Merge pull request #1 from Gladium-AI/fix/drain-cloudflared-stdout
Fix: drain cloudflared stdout/stderr to prevent tunnel error 1033
2 parents bfad3af + cdf086b commit da66cd1

File tree

4 files changed

+285
-37
lines changed

4 files changed

+285
-37
lines changed

internal/exec/runner.go

Lines changed: 179 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,93 @@
11
package exec
22

33
import (
4-
"context"
4+
"bytes"
55
"fmt"
66
"io"
77
"os"
88
"os/exec"
9+
"sync"
910
"syscall"
1011
"time"
1112
)
1213

1314
// Runner manages subprocess execution.
1415
type Runner struct {
15-
cmd *exec.Cmd
16-
logPipe io.ReadCloser
16+
cmd *exec.Cmd
17+
18+
// exitCh is closed when the process exits.
19+
exitCh chan struct{}
20+
// exitErr holds the result of cmd.Wait().
21+
exitErr error
22+
23+
mu sync.Mutex
24+
logBuf *ringBuffer // Circular buffer that stores recent log output.
25+
logFile *os.File // Optional persistent log file on disk.
26+
}
27+
28+
// ringBuffer is a fixed-size circular buffer that implements io.Writer.
29+
// Older data is silently discarded when the buffer wraps.
30+
type ringBuffer struct {
31+
mu sync.Mutex
32+
buf []byte
33+
pos int // next write position
34+
full bool // whether the buffer has wrapped at least once
35+
}
36+
37+
func newRingBuffer(size int) *ringBuffer {
38+
return &ringBuffer{buf: make([]byte, size)}
39+
}
40+
41+
func (rb *ringBuffer) Write(p []byte) (int, error) {
42+
rb.mu.Lock()
43+
defer rb.mu.Unlock()
44+
n := len(p)
45+
for len(p) > 0 {
46+
k := copy(rb.buf[rb.pos:], p)
47+
rb.pos += k
48+
p = p[k:]
49+
if rb.pos == len(rb.buf) {
50+
rb.pos = 0
51+
rb.full = true
52+
}
53+
}
54+
return n, nil
55+
}
56+
57+
// Bytes returns the buffered data in chronological order.
58+
func (rb *ringBuffer) Bytes() []byte {
59+
rb.mu.Lock()
60+
defer rb.mu.Unlock()
61+
if !rb.full {
62+
return bytes.Clone(rb.buf[:rb.pos])
63+
}
64+
out := make([]byte, len(rb.buf))
65+
n := copy(out, rb.buf[rb.pos:])
66+
copy(out[n:], rb.buf[:rb.pos])
67+
return out
1768
}
1869

1970
// RunOpts configures a subprocess.
2071
type RunOpts struct {
21-
Name string // Binary name or path.
22-
Args []string // Command arguments.
23-
Dir string // Working directory (optional).
24-
Env map[string]string // Additional environment variables.
72+
Name string // Binary name or path.
73+
Args []string // Command arguments.
74+
Dir string // Working directory (optional).
75+
Env map[string]string // Additional environment variables.
76+
LogFile string // Optional path to persist subprocess logs to disk.
2577
}
2678

2779
// Start launches a subprocess with its own process group.
28-
func Start(ctx context.Context, opts RunOpts) (*Runner, error) {
29-
cmd := exec.CommandContext(ctx, opts.Name, opts.Args...)
80+
//
81+
// IMPORTANT: We intentionally use exec.Command (NOT exec.CommandContext).
82+
// exec.CommandContext sends SIGKILL to the subprocess when the context is
83+
// cancelled, which kills cloudflared instantly without any chance for graceful
84+
// shutdown or log capture. Instead, we manage the process lifecycle ourselves
85+
// via the Stop() method which sends SIGTERM first, then SIGKILL after a timeout.
86+
//
87+
// Stdout/stderr are continuously drained into a 1 MB ring buffer (and optionally
88+
// to a persistent log file on disk) so the subprocess never blocks on log output.
89+
func Start(_ /* ctx not used intentionally */ interface{}, opts RunOpts) (*Runner, error) {
90+
cmd := exec.Command(opts.Name, opts.Args...)
3091

3192
if opts.Dir != "" {
3293
cmd.Dir = opts.Dir
@@ -41,24 +102,79 @@ func Start(ctx context.Context, opts RunOpts) (*Runner, error) {
41102
// Own process group so we can kill the tree.
42103
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
43104

44-
// Capture combined output.
45-
pr, pw := io.Pipe()
46-
cmd.Stdout = pw
47-
cmd.Stderr = pw
105+
// Use OS pipes (buffered by kernel, typically 64KB+) instead of io.Pipe
106+
// (which has zero buffering and blocks the writer immediately).
107+
stdoutR, stdoutW, err := os.Pipe()
108+
if err != nil {
109+
return nil, fmt.Errorf("creating stdout pipe: %w", err)
110+
}
111+
stderrR, stderrW, err := os.Pipe()
112+
if err != nil {
113+
stdoutR.Close()
114+
stdoutW.Close()
115+
return nil, fmt.Errorf("creating stderr pipe: %w", err)
116+
}
117+
118+
cmd.Stdout = stdoutW
119+
cmd.Stderr = stderrW
48120

49121
if err := cmd.Start(); err != nil {
50-
pw.Close()
51-
pr.Close()
122+
stdoutR.Close()
123+
stdoutW.Close()
124+
stderrR.Close()
125+
stderrW.Close()
52126
return nil, fmt.Errorf("starting %s: %w", opts.Name, err)
53127
}
54128

55-
// Close the write end when the process exits so readers see EOF.
129+
// Close the write ends in our process — the child owns them now.
130+
stdoutW.Close()
131+
stderrW.Close()
132+
133+
r := &Runner{
134+
cmd: cmd,
135+
logBuf: newRingBuffer(1 << 20), // 1 MB ring buffer
136+
exitCh: make(chan struct{}),
137+
}
138+
139+
// Open persistent log file if requested.
140+
var logFileWriter io.Writer
141+
if opts.LogFile != "" {
142+
f, err := os.OpenFile(opts.LogFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
143+
if err != nil {
144+
// Non-fatal: log to ring buffer only.
145+
logFileWriter = nil
146+
} else {
147+
r.logFile = f
148+
logFileWriter = f
149+
}
150+
}
151+
152+
// Build the combined writer: ring buffer + optional log file.
153+
var sink io.Writer
154+
if logFileWriter != nil {
155+
sink = io.MultiWriter(r.logBuf, logFileWriter)
156+
} else {
157+
sink = r.logBuf
158+
}
159+
160+
// Continuously drain stdout and stderr into the sink.
161+
// This runs until the read ends return EOF (i.e., the process exits).
162+
var drainWg sync.WaitGroup
163+
drainWg.Add(2)
164+
go func() { defer drainWg.Done(); io.Copy(sink, stdoutR); stdoutR.Close() }()
165+
go func() { defer drainWg.Done(); io.Copy(sink, stderrR); stderrR.Close() }()
166+
167+
// Wait for the process to exit, then close exitCh to notify watchers.
56168
go func() {
57-
cmd.Wait()
58-
pw.Close()
169+
r.exitErr = cmd.Wait()
170+
drainWg.Wait() // ensure all output is captured before signalling
171+
if r.logFile != nil {
172+
r.logFile.Close()
173+
}
174+
close(r.exitCh)
59175
}()
60176

61-
return &Runner{cmd: cmd, logPipe: pr}, nil
177+
return r, nil
62178
}
63179

64180
// PID returns the process ID.
@@ -70,8 +186,34 @@ func (r *Runner) PID() int {
70186
}
71187

72188
// Logs returns a reader for the process's combined stdout/stderr.
189+
// The returned reader contains a snapshot of the most recent log output
190+
// (up to 1 MB) from the ring buffer.
73191
func (r *Runner) Logs() io.ReadCloser {
74-
return r.logPipe
192+
return io.NopCloser(bytes.NewReader(r.logBuf.Bytes()))
193+
}
194+
195+
// LogFilePath returns the path to the persistent log file, or "" if none.
196+
func (r *Runner) LogFilePath() string {
197+
if r.logFile != nil {
198+
return r.logFile.Name()
199+
}
200+
return ""
201+
}
202+
203+
// ExitCh returns a channel that is closed when the process exits.
204+
func (r *Runner) ExitCh() <-chan struct{} {
205+
return r.exitCh
206+
}
207+
208+
// ExitError returns the exit error after the process has finished.
209+
// Returns nil if the process hasn't exited yet or exited cleanly.
210+
func (r *Runner) ExitError() error {
211+
select {
212+
case <-r.exitCh:
213+
return r.exitErr
214+
default:
215+
return nil
216+
}
75217
}
76218

77219
// Stop sends SIGTERM to the process group, waits up to 10s, then SIGKILL.
@@ -80,47 +222,50 @@ func (r *Runner) Stop() error {
80222
return nil
81223
}
82224

225+
// If the process already exited, nothing to do.
226+
select {
227+
case <-r.exitCh:
228+
return nil
229+
default:
230+
}
231+
83232
// Send SIGTERM to the process group.
84233
pgid := -r.cmd.Process.Pid
85234
if err := syscall.Kill(pgid, syscall.SIGTERM); err != nil {
86235
// Process may have already exited.
87236
return nil
88237
}
89238

90-
// Wait for exit with timeout.
91-
done := make(chan error, 1)
92-
go func() {
93-
done <- r.cmd.Wait()
94-
}()
95-
96239
select {
97-
case <-done:
240+
case <-r.exitCh:
98241
return nil
99242
case <-time.After(10 * time.Second):
100243
// Force kill.
101244
_ = syscall.Kill(pgid, syscall.SIGKILL)
102-
<-done // Wait for the process to be reaped.
245+
<-r.exitCh
103246
return fmt.Errorf("process did not exit after SIGTERM; sent SIGKILL")
104247
}
105248
}
106249

107250
// Wait blocks until the process exits and returns its error.
108251
func (r *Runner) Wait() error {
109-
return r.cmd.Wait()
252+
<-r.exitCh
253+
return r.exitErr
110254
}
111255

112256
// Running returns true if the process is still alive.
113257
func (r *Runner) Running() bool {
114-
if r.cmd.Process == nil {
258+
select {
259+
case <-r.exitCh:
115260
return false
261+
default:
262+
return r.cmd.Process != nil
116263
}
117-
// Signal 0 checks if the process exists.
118-
return r.cmd.Process.Signal(syscall.Signal(0)) == nil
119264
}
120265

121266
// Run executes a command synchronously and returns its combined output.
122-
func Run(ctx context.Context, name string, args ...string) (string, error) {
123-
cmd := exec.CommandContext(ctx, name, args...)
267+
func Run(_ interface{}, name string, args ...string) (string, error) {
268+
cmd := exec.Command(name, args...)
124269
out, err := cmd.CombinedOutput()
125270
return string(out), err
126271
}

internal/pipeline/serve.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package pipeline
33
import (
44
"context"
55
"fmt"
6+
"io"
7+
"os"
68
"strings"
79
"time"
810

@@ -232,6 +234,9 @@ func (p *Pipeline) Serve(ctx context.Context, params ServeParams) error {
232234
p.connector.Stop(cleanupCtx)
233235
})
234236
ui.PrintSuccess("cloudflared running (PID %d)", pid)
237+
if logPath := p.connector.LogFilePath(); logPath != "" {
238+
ui.PrintInfo("cloudflared logs: %s", logPath)
239+
}
235240

236241
// Step 8: Print success summary.
237242
fmt.Println()
@@ -257,11 +262,37 @@ func (p *Pipeline) waitForShutdown(ctx context.Context, sess *session.Session, p
257262
ttlTimer = time.After(params.TTL)
258263
}
259264

265+
// Get cloudflared's exit channel so we detect crashes immediately.
266+
exitCh := p.connector.ExitCh()
267+
260268
select {
261269
case <-ctx.Done():
262270
ui.PrintInfo("Shutdown signal received...")
263271
case <-ttlTimer:
264272
ui.PrintInfo("TTL expired, shutting down...")
273+
case <-exitCh:
274+
ui.PrintWarning("cloudflared process exited unexpectedly!")
275+
// Dump last 50 lines of cloudflared logs for diagnostics.
276+
if logs := p.connector.Logs(); logs != nil {
277+
data, _ := io.ReadAll(logs)
278+
logs.Close()
279+
if len(data) > 0 {
280+
ui.PrintWarning("Last cloudflared output:")
281+
// Print last ~50 lines.
282+
lines := strings.Split(strings.TrimSpace(string(data)), "\n")
283+
start := 0
284+
if len(lines) > 50 {
285+
start = len(lines) - 50
286+
}
287+
for _, line := range lines[start:] {
288+
fmt.Fprintf(os.Stderr, " %s\n", line)
289+
}
290+
}
291+
}
292+
if logPath := p.connector.LogFilePath(); logPath != "" {
293+
ui.PrintInfo("Full cloudflared logs at: %s", logPath)
294+
}
295+
ui.PrintInfo("Shutting down session...")
265296
}
266297

267298
return p.Teardown(sess, cleanups)

internal/testutil/mocks.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ type MockConnector struct {
8585
FnStop func(ctx context.Context) error
8686
FnLogs func() io.ReadCloser
8787
FnHealthy func() error
88+
FnExitCh func() <-chan struct{}
8889

8990
RunCalls []string
9091
StopCalled bool
@@ -126,6 +127,18 @@ func (m *MockConnector) Healthy() error {
126127
return nil
127128
}
128129

130+
func (m *MockConnector) ExitCh() <-chan struct{} {
131+
if m.FnExitCh != nil {
132+
return m.FnExitCh()
133+
}
134+
// Default: return a channel that never closes (process stays alive).
135+
return make(chan struct{})
136+
}
137+
138+
func (m *MockConnector) LogFilePath() string {
139+
return ""
140+
}
141+
129142
// --- MockAccessManager ---
130143

131144
type MockAccessManager struct {

0 commit comments

Comments
 (0)