Skip to content

Commit ebbc276

Browse files
committed
test: add deterministic broker-worker coverage
1 parent fb4eb2d commit ebbc276

File tree

2 files changed

+144
-3
lines changed

2 files changed

+144
-3
lines changed

internal/sshConn/message.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ import (
66
"sync/atomic"
77
)
88

9+
var (
10+
connectionFunc = Connection
11+
sessionFunc = Session
12+
workerRunner = worker
13+
)
14+
915
type ProxyWriter struct {
1016
events chan<- OutputEvent
1117
host *Host
@@ -62,7 +68,7 @@ func emitSystem(events chan<- OutputEvent, host *Host, line string) {
6268
}
6369

6470
func worker(host *Host, input <-chan CommandRequest, events chan<- OutputEvent) {
65-
connection, err := Connection(host)
71+
connection, err := connectionFunc(host)
6672
if err != nil {
6773
emitSystem(events, host, fmt.Sprintf("error connection to host %s: %v", host.Hostname, err))
6874
return
@@ -72,7 +78,7 @@ func worker(host *Host, input <-chan CommandRequest, events chan<- OutputEvent)
7278
stdoutWriter := NewProxyWriter(events, host, 0)
7379
stderrWriter := NewProxyWriter(events, host, 0)
7480
stderrWriter.system = true
75-
stdin, session, err := Session(connection, host, stdoutWriter, stderrWriter)
81+
stdin, session, err := sessionFunc(connection, host, stdoutWriter, stderrWriter)
7682
if err != nil {
7783
emitSystem(events, host, fmt.Sprintf("unable to open session: %v", err))
7884
atomic.StoreInt32(&host.IsConnected, 0)
@@ -100,7 +106,7 @@ func worker(host *Host, input <-chan CommandRequest, events chan<- OutputEvent)
100106
func Broker(hostList *HostList, input <-chan CommandRequest, events chan<- OutputEvent) {
101107
for _, host := range hostList.Hosts() {
102108
host.Channel = make(chan CommandRequest)
103-
go worker(host, host.Channel, events)
109+
go workerRunner(host, host.Channel, events)
104110
}
105111

106112
for request := range input {

internal/sshConn/message_test.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package sshConn
2+
3+
import (
4+
"errors"
5+
"io"
6+
"strings"
7+
"sync/atomic"
8+
"testing"
9+
10+
"golang.org/x/crypto/ssh"
11+
)
12+
13+
type captureWriteCloser struct {
14+
buf []byte
15+
}
16+
17+
func (w *captureWriteCloser) Write(p []byte) (int, error) {
18+
w.buf = append(w.buf, p...)
19+
return len(p), nil
20+
}
21+
22+
func (w *captureWriteCloser) Close() error { return nil }
23+
24+
func TestWorkerEmitsConnectionError(t *testing.T) {
25+
prevConnection := connectionFunc
26+
prevSession := sessionFunc
27+
t.Cleanup(func() {
28+
connectionFunc = prevConnection
29+
sessionFunc = prevSession
30+
})
31+
32+
connectionFunc = func(host *Host) (*ssh.Client, error) {
33+
return nil, errors.New("dial failed")
34+
}
35+
sessionFunc = prevSession
36+
37+
host := &Host{Hostname: "host1"}
38+
events := make(chan OutputEvent, 1)
39+
input := make(chan CommandRequest)
40+
close(input)
41+
42+
worker(host, input, events)
43+
44+
select {
45+
case evt := <-events:
46+
if !evt.System {
47+
t.Fatalf("expected system event")
48+
}
49+
if !strings.Contains(evt.Line, "dial failed") {
50+
t.Fatalf("unexpected line: %q", evt.Line)
51+
}
52+
default:
53+
t.Fatalf("expected connection error event")
54+
}
55+
}
56+
57+
func TestWorkerHandlesRequestsWithStubSession(t *testing.T) {
58+
prevConnection := connectionFunc
59+
prevSession := sessionFunc
60+
t.Cleanup(func() {
61+
connectionFunc = prevConnection
62+
sessionFunc = prevSession
63+
})
64+
65+
connectionFunc = func(host *Host) (*ssh.Client, error) {
66+
return &ssh.Client{}, nil
67+
}
68+
stdin := &captureWriteCloser{}
69+
sessionFunc = func(connection *ssh.Client, host *Host, stdout, stderr io.Writer) (io.WriteCloser, *ssh.Session, error) {
70+
return stdin, nil, nil
71+
}
72+
73+
host := &Host{Hostname: "host1"}
74+
events := make(chan OutputEvent, 1)
75+
input := make(chan CommandRequest, 2)
76+
input <- CommandRequest{Kind: CommandKindRun, JobID: 7, Command: "uptime"}
77+
input <- CommandRequest{Kind: CommandKindControl, JobID: 8, ControlByte: 0x03}
78+
close(input)
79+
80+
worker(host, input, events)
81+
82+
written := string(stdin.buf)
83+
if !strings.Contains(written, "uptime\n") {
84+
t.Fatalf("expected command write, got %q", written)
85+
}
86+
if !strings.Contains(written, string([]byte{0x03})) {
87+
t.Fatalf("expected control byte write, got %q", written)
88+
}
89+
if atomic.LoadInt32(&host.IsConnected) != 1 {
90+
t.Fatalf("expected host connected")
91+
}
92+
if atomic.LoadInt32(&host.IsWaiting) != 0 {
93+
t.Fatalf("expected host not waiting")
94+
}
95+
}
96+
97+
func TestBrokerDispatchesOnlyToConnectedHosts(t *testing.T) {
98+
prevWorker := workerRunner
99+
t.Cleanup(func() {
100+
workerRunner = prevWorker
101+
})
102+
103+
dispatched := make(chan string, 2)
104+
workerRunner = func(host *Host, input <-chan CommandRequest, events chan<- OutputEvent) {
105+
request := <-input
106+
dispatched <- host.Hostname + ":" + request.Command
107+
}
108+
109+
hostList := NewHostList()
110+
host1 := &Host{Hostname: "host1", IsConnected: 1}
111+
host2 := &Host{Hostname: "host2", IsConnected: 0}
112+
hostList.AddHost(host1)
113+
hostList.AddHost(host2)
114+
115+
input := make(chan CommandRequest, 1)
116+
done := make(chan struct{})
117+
go func() {
118+
Broker(hostList, input, nil)
119+
close(done)
120+
}()
121+
122+
input <- CommandRequest{Kind: CommandKindRun, JobID: 1, Command: "date"}
123+
close(input)
124+
<-done
125+
126+
got := <-dispatched
127+
if got != "host1:date" {
128+
t.Fatalf("unexpected dispatch %q", got)
129+
}
130+
select {
131+
case extra := <-dispatched:
132+
t.Fatalf("unexpected extra dispatch: %s", extra)
133+
default:
134+
}
135+
}

0 commit comments

Comments
 (0)