Skip to content

Commit bd06dd9

Browse files
authored
Merge pull request #24 from ncode/codex/benchmark-broker-fanout
Benchmark broker fan-out and buffer dispatch
2 parents b4cb499 + 71a0e11 commit bd06dd9

File tree

2 files changed

+85
-1
lines changed

2 files changed

+85
-1
lines changed

internal/sshConn/message.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ var (
1212
workerRunner = worker
1313
)
1414

15+
const brokerChannelBufferSize = 1
16+
1517
type ProxyWriter struct {
1618
events chan<- OutputEvent
1719
host *Host
@@ -105,7 +107,7 @@ func worker(host *Host, input <-chan CommandRequest, events chan<- OutputEvent)
105107

106108
func Broker(hostList *HostList, input <-chan CommandRequest, events chan<- OutputEvent) {
107109
for _, host := range hostList.Hosts() {
108-
host.Channel = make(chan CommandRequest)
110+
host.Channel = make(chan CommandRequest, brokerChannelBufferSize)
109111
go workerRunner(host, host.Channel, events)
110112
}
111113

internal/sshConn/message_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package sshConn
22

33
import (
44
"errors"
5+
"fmt"
56
"io"
67
"strings"
8+
"sync"
79
"sync/atomic"
810
"testing"
911

@@ -219,3 +221,83 @@ func TestBrokerDispatchesOnlyToConnectedHosts(t *testing.T) {
219221
default:
220222
}
221223
}
224+
225+
func BenchmarkBrokerDispatch(b *testing.B) {
226+
for _, mode := range []struct {
227+
name string
228+
buffer int
229+
}{
230+
{name: "unbuffered", buffer: 0},
231+
{name: "buffered_1", buffer: 1},
232+
} {
233+
for _, hostCount := range []int{1, 8, 32, 128} {
234+
b.Run(fmt.Sprintf("%s/hosts_%d", mode.name, hostCount), func(b *testing.B) {
235+
benchmarkBrokerDispatchMode(b, hostCount, mode.buffer)
236+
})
237+
}
238+
}
239+
}
240+
241+
func benchmarkBrokerDispatchMode(b *testing.B, hostCount, bufferSize int) {
242+
prevWorker := workerRunner
243+
defer func() {
244+
workerRunner = prevWorker
245+
}()
246+
247+
hostList := NewHostList()
248+
for i := 0; i < hostCount; i++ {
249+
hostList.AddHost(&Host{
250+
Hostname: fmt.Sprintf("host-%d", i),
251+
IsConnected: 1,
252+
})
253+
}
254+
255+
var ready sync.WaitGroup
256+
var drained sync.WaitGroup
257+
ready.Add(hostCount)
258+
drained.Add(hostCount)
259+
260+
workerRunner = func(host *Host, input <-chan CommandRequest, events chan<- OutputEvent) {
261+
ready.Done()
262+
for i := 0; i < b.N; i++ {
263+
<-input
264+
}
265+
drained.Done()
266+
}
267+
268+
input := make(chan CommandRequest)
269+
done := make(chan struct{})
270+
go func() {
271+
brokerWithBuffer(hostList, input, nil, bufferSize)
272+
close(done)
273+
}()
274+
275+
ready.Wait()
276+
277+
request := CommandRequest{Kind: CommandKindRun, JobID: 1, Command: "date"}
278+
b.ReportAllocs()
279+
b.ResetTimer()
280+
for i := 0; i < b.N; i++ {
281+
input <- request
282+
}
283+
b.StopTimer()
284+
285+
close(input)
286+
<-done
287+
drained.Wait()
288+
}
289+
290+
func brokerWithBuffer(hostList *HostList, input <-chan CommandRequest, events chan<- OutputEvent, bufferSize int) {
291+
for _, host := range hostList.Hosts() {
292+
host.Channel = make(chan CommandRequest, bufferSize)
293+
go workerRunner(host, host.Channel, events)
294+
}
295+
296+
for request := range input {
297+
for _, host := range hostList.Hosts() {
298+
if atomic.LoadInt32(&host.IsConnected) == 1 {
299+
host.Channel <- request
300+
}
301+
}
302+
}
303+
}

0 commit comments

Comments
 (0)