Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/sshConn/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ var (
workerRunner = worker
)

const brokerChannelBufferSize = 1

type ProxyWriter struct {
events chan<- OutputEvent
host *Host
Expand Down Expand Up @@ -105,7 +107,7 @@ func worker(host *Host, input <-chan CommandRequest, events chan<- OutputEvent)

func Broker(hostList *HostList, input <-chan CommandRequest, events chan<- OutputEvent) {
for _, host := range hostList.Hosts() {
host.Channel = make(chan CommandRequest)
host.Channel = make(chan CommandRequest, brokerChannelBufferSize)
go workerRunner(host, host.Channel, events)
}

Expand Down
82 changes: 82 additions & 0 deletions internal/sshConn/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package sshConn

import (
"errors"
"fmt"
"io"
"strings"
"sync"
"sync/atomic"
"testing"

Expand Down Expand Up @@ -219,3 +221,83 @@ func TestBrokerDispatchesOnlyToConnectedHosts(t *testing.T) {
default:
}
}

func BenchmarkBrokerDispatch(b *testing.B) {
for _, mode := range []struct {
name string
buffer int
}{
{name: "unbuffered", buffer: 0},
{name: "buffered_1", buffer: 1},
} {
for _, hostCount := range []int{1, 8, 32, 128} {
b.Run(fmt.Sprintf("%s/hosts_%d", mode.name, hostCount), func(b *testing.B) {
benchmarkBrokerDispatchMode(b, hostCount, mode.buffer)
})
}
}
}

func benchmarkBrokerDispatchMode(b *testing.B, hostCount, bufferSize int) {
prevWorker := workerRunner
defer func() {
workerRunner = prevWorker
}()

hostList := NewHostList()
for i := 0; i < hostCount; i++ {
hostList.AddHost(&Host{
Hostname: fmt.Sprintf("host-%d", i),
IsConnected: 1,
})
}

var ready sync.WaitGroup
var drained sync.WaitGroup
ready.Add(hostCount)
drained.Add(hostCount)

workerRunner = func(host *Host, input <-chan CommandRequest, events chan<- OutputEvent) {
ready.Done()
for i := 0; i < b.N; i++ {
<-input
}
drained.Done()
}

input := make(chan CommandRequest)
done := make(chan struct{})
go func() {
brokerWithBuffer(hostList, input, nil, bufferSize)
close(done)
}()

ready.Wait()

request := CommandRequest{Kind: CommandKindRun, JobID: 1, Command: "date"}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
input <- request
}
b.StopTimer()

close(input)
<-done
drained.Wait()
}

func brokerWithBuffer(hostList *HostList, input <-chan CommandRequest, events chan<- OutputEvent, bufferSize int) {
for _, host := range hostList.Hosts() {
host.Channel = make(chan CommandRequest, bufferSize)
go workerRunner(host, host.Channel, events)
}

for request := range input {
for _, host := range hostList.Hosts() {
if atomic.LoadInt32(&host.IsConnected) == 1 {
host.Channel <- request
}
}
}
}