From 71a0e11cc0e61caaf12e5c275dc55bdb72841e07 Mon Sep 17 00:00:00 2001 From: Juliano Martinez Date: Sat, 4 Apr 2026 17:14:55 +0200 Subject: [PATCH] Benchmark broker fan-out and buffer dispatch --- internal/sshConn/message.go | 4 +- internal/sshConn/message_test.go | 82 ++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 1 deletion(-) diff --git a/internal/sshConn/message.go b/internal/sshConn/message.go index 796395b..5bc399e 100644 --- a/internal/sshConn/message.go +++ b/internal/sshConn/message.go @@ -12,6 +12,8 @@ var ( workerRunner = worker ) +const brokerChannelBufferSize = 1 + type ProxyWriter struct { events chan<- OutputEvent host *Host @@ -105,7 +107,7 @@ func worker(host *Host, input <-chan CommandRequest, events chan<- OutputEvent) func Broker(hostList *HostList, input <-chan CommandRequest, events chan<- OutputEvent) { for _, host := range hostList.Hosts() { - host.Channel = make(chan CommandRequest) + host.Channel = make(chan CommandRequest, brokerChannelBufferSize) go workerRunner(host, host.Channel, events) } diff --git a/internal/sshConn/message_test.go b/internal/sshConn/message_test.go index c57f0ef..1982b98 100644 --- a/internal/sshConn/message_test.go +++ b/internal/sshConn/message_test.go @@ -2,8 +2,10 @@ package sshConn import ( "errors" + "fmt" "io" "strings" + "sync" "sync/atomic" "testing" @@ -219,3 +221,83 @@ func TestBrokerDispatchesOnlyToConnectedHosts(t *testing.T) { default: } } + +func BenchmarkBrokerDispatch(b *testing.B) { + for _, mode := range []struct { + name string + buffer int + }{ + {name: "unbuffered", buffer: 0}, + {name: "buffered_1", buffer: 1}, + } { + for _, hostCount := range []int{1, 8, 32, 128} { + b.Run(fmt.Sprintf("%s/hosts_%d", mode.name, hostCount), func(b *testing.B) { + benchmarkBrokerDispatchMode(b, hostCount, mode.buffer) + }) + } + } +} + +func benchmarkBrokerDispatchMode(b *testing.B, hostCount, bufferSize int) { + prevWorker := workerRunner + defer func() { + workerRunner = prevWorker + }() + + hostList := NewHostList() + for i := 0; i < hostCount; i++ { + hostList.AddHost(&Host{ + Hostname: fmt.Sprintf("host-%d", i), + IsConnected: 1, + }) + } + + var ready sync.WaitGroup + var drained sync.WaitGroup + ready.Add(hostCount) + drained.Add(hostCount) + + workerRunner = func(host *Host, input <-chan CommandRequest, events chan<- OutputEvent) { + ready.Done() + for i := 0; i < b.N; i++ { + <-input + } + drained.Done() + } + + input := make(chan CommandRequest) + done := make(chan struct{}) + go func() { + brokerWithBuffer(hostList, input, nil, bufferSize) + close(done) + }() + + ready.Wait() + + request := CommandRequest{Kind: CommandKindRun, JobID: 1, Command: "date"} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + input <- request + } + b.StopTimer() + + close(input) + <-done + drained.Wait() +} + +func brokerWithBuffer(hostList *HostList, input <-chan CommandRequest, events chan<- OutputEvent, bufferSize int) { + for _, host := range hostList.Hosts() { + host.Channel = make(chan CommandRequest, bufferSize) + go workerRunner(host, host.Channel, events) + } + + for request := range input { + for _, host := range hostList.Hosts() { + if atomic.LoadInt32(&host.IsConnected) == 1 { + host.Channel <- request + } + } + } +}